Add multiple workers in separate processes
Jeff Knupp committed Feb 11, 2014
1 parent ad942a7 commit e4604a1
Showing 6 changed files with 52 additions and 18 deletions.
4 changes: 2 additions & 2 deletions app.py
@@ -5,5 +5,5 @@ def count_words_in_page(url):
    resp = requests.get(url)
    return len(resp.text.split())

result = queue(count_words_in_page, 'http://www.jeffknupp.com')
print result
print queue(count_words_in_page, 'http://www.jeffknupp.com')
print queue(count_words_in_page, 'http://www.yahoo.com')
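With the round-robin worker selection added below in brokest.py, each queue() call is dispatched to the next worker in the rotation, and each call still blocks until that worker replies. A minimal client sketch in the same vein (assuming app.py imports queue from brokest, as the visible calls suggest; the loop is illustrative and not part of the commit):

import requests

from brokest import queue

def count_words_in_page(url):
    """Count whitespace-separated tokens on the page at *url*."""
    resp = requests.get(url)
    return len(resp.text.split())

# Each blocking queue() call goes to the next worker in the rotation.
for url in ('http://www.jeffknupp.com', 'http://www.yahoo.com'):
    print queue(count_words_in_page, url)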
59 changes: 43 additions & 16 deletions brokest.py
@@ -1,26 +1,33 @@
"""Broker-less distributed task queue."""
from __future__ import absolute_import
import pickle
import logging
import multiprocessing

import zmq
import cloud
from config.settings import CONFIG

HOST = '127.0.0.1'
PORT = 9090
TASK_SOCKET = zmq.Context().socket(zmq.REQ)
TASK_SOCKET.connect('tcp://{}:{}'.format(HOST, PORT))

LOGGER = logging.getLogger(__name__)

class Worker(object):
"""A remote task executor."""

def __init__(self, host=HOST, port=PORT):
def __init__(self, host, port, worker_id=0):
"""Initialize worker."""
print 'starting worker [{}]'.format(worker_id)
LOGGER.info('Starting worker [{}]'.format(worker_id))
self.host = host
self.port = port
self._context = zmq.Context()
self._socket = self._context.socket(zmq.REP)
self._id = worker_id
self._context = None
self._socket = None

def start(self):
"""Start listening for tasks."""
self._context = zmq.Context()
self._socket = self._context.socket(zmq.REP)
self._socket.bind('tcp://{}:{}'.format(self.host, self.port))
while True:
runnable_string = self._socket.recv_pyobj()
@@ -34,22 +41,42 @@ def start(self):

    def _do_work(self, task, args, kwargs):
        """Return the result of executing the given task."""
        print('Running [{}] with args [{}] and kwargs [{}]'.format(
        print('[{}] running [{}] with args [{}] and kwargs [{}]'.format(self._id,
            task, args, kwargs))
        return task(*args, **kwargs)

def next_worker():
    workers = []
    for worker in CONFIG['WORKERS']:
        workers.append(worker)
    while True:
        for worker in workers:
            yield worker

NEXT_WORKER = next_worker()

def queue(runnable, *args, **kwargs):
"""Return the result of running the task *runnable* with the given
arguments."""
worker = next(NEXT_WORKER)
host = worker[0]
port = worker[1]
socket = zmq.Context().socket(zmq.REQ)
print host, port
socket.connect('tcp://{}:{}'.format(host, port))
runnable_string = cloud.serialization.cloudpickle.dumps(runnable)
TASK_SOCKET.send_pyobj(runnable_string)
TASK_SOCKET.recv()
TASK_SOCKET.send_pyobj(args)
TASK_SOCKET.recv()
TASK_SOCKET.send_pyobj(kwargs)
results = TASK_SOCKET.recv_pyobj()
socket.send_pyobj(runnable_string)
socket.recv()
socket.send_pyobj(args)
socket.recv()
socket.send_pyobj(kwargs)
results = socket.recv_pyobj()
return results

if __name__ == '__main__':
    w = Worker()
    w.start()
    workers = []
    for worker in CONFIG['WORKERS']:
        w = Worker(worker[0], worker[1], worker[2])
        process = multiprocessing.Process(target=w.start)
        process.start()
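
The '@@ -34,22 +41,42 @@' hunk collapses the middle of Worker.start(), but the client side of queue() pins down the wire protocol: three send_pyobj messages (the cloudpickled callable, the args, the kwargs), with an acknowledgement read after the first two and the result after the third. Below is a sketch of the matching REP-side loop, reconstructed from that client code; the empty ack payloads and the worker_loop name are assumptions, not taken from the commit.

import pickle

import zmq

def worker_loop(host, port):
    """REP-side loop mirroring queue()'s REQ-side exchange (a sketch)."""
    socket = zmq.Context().socket(zmq.REP)
    socket.bind('tcp://{}:{}'.format(host, port))
    while True:
        runnable_string = socket.recv_pyobj()     # cloudpickled callable
        socket.send('')                           # ack; REQ side may now send args
        args = socket.recv_pyobj()
        socket.send('')                           # ack; REQ side may now send kwargs
        kwargs = socket.recv_pyobj()
        runnable = pickle.loads(runnable_string)  # cloudpickle output loads with stdlib pickle
        socket.send_pyobj(runnable(*args, **kwargs))

In the commit itself, the __main__ block above does the real work: one Worker per entry in CONFIG['WORKERS'], each started in its own multiprocessing.Process.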

Empty file added config/__init__.py
Empty file.
Binary file added config/__init__.pyc
Binary file not shown.
7 changes: 7 additions & 0 deletions config/settings.py
@@ -0,0 +1,7 @@
CONFIG = {
    'WORKERS': [
        ('127.0.0.1', 9091, 1),
        ('127.0.0.1', 9092, 2),
        ('127.0.0.1', 9093, 3),
    ],
}
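The next_worker() generator in brokest.py cycles through this table forever, handing one (host, port, worker_id) tuple to each queue() call. For reference, itertools.cycle from the standard library expresses the same rotation (a sketch, not part of the commit):

from itertools import cycle

from config.settings import CONFIG

NEXT_WORKER = cycle(CONFIG['WORKERS'])     # 9091, 9092, 9093, then back to 9091
host, port, worker_id = next(NEXT_WORKER)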
Binary file added config/settings.pyc
Binary file not shown.
