Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

progress

  • Loading branch information...
commit fc8cb264ad27b899f99364ac60db9712f1da6f14 1 parent 2d8ddc7
Stephen Diehl authored
18 README.md
Source Rendered
... ... @@ -1,3 +1,19 @@
  1 +Motivation
  2 +==========
  3 +
  4 +Kaylee is a small MapReduce implementation mostly meant as a
  5 +proof of concept to illustrate the power of ZeroMQ and for
  6 +educational purposes.
  7 +
  8 +My goal was not to write a Hadoop clone but to build a starting point
  9 +that one could use to learn about MapReduce.
  10 +
  11 +The main bottleneck in this implementation is that the Shuffle
  12 +phase requires all data to be moved to the ``server`` instance
  13 +which is not generally a good idea for performance. But this lets
  14 +us implement a simple shuffler using a Python defaultdict in
  15 +just a few lines of code which is easy to understand.
  16 +
1 17 Directions:
2 18 ===========
3 19
@@ -9,6 +25,8 @@ For Arch Linux
9 25
10 26 For Ubuntu Linux
11 27
  28 + $ add-apt-repository ppa:chris-lea/zeromq
  29 + $ apt-get update
12 30 $ apt-get install zeromq-bin libzmq-dev libzmq0
13 31
14 32 For Macintosh:
38 example.py
... ... @@ -1,15 +1,25 @@
1 1 import time
2   -from kaylee import Server
  2 +import numpy
  3 +import mmap
  4 +from itertools import count
3 5
4   -# Example
5   -# -----------------------------------------------
  6 +from kaylee import Server
6 7
  8 +# Note, we never load the whole file into memory.
7 9 f = open('mobydick.txt')
  10 +mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
8 11
9   -data = dict(enumerate(f.readlines()))
  12 +# This just enumerates all lines in the file, but is able to
  13 +# get data from disk into ZeroMQ much faster than read/writes.
  14 +def datafn():
  15 + i = count(0)
  16 + total = mm.size()
  17 + while mm.tell() < total:
  18 + yield next(i), memoryview(mm.readline())
  19 + mm.close()
10 20
11 21 def mapfn(k, v):
12   - for w in v.split():
  22 + for w in v.bytes.split():
13 23 yield w, 1
14 24
15 25 def reducefn(k, v):
@@ -19,9 +29,21 @@ def reducefn(k, v):
19 29 s = Server()
20 30 s.connect()
21 31
  32 +# yaml config
  33 +# Datastore backend, Redis Moose kaylee://
  34 +
  35 +# kaylee
  36 +# /key1
  37 +# blob
  38 +# blob
  39 +# /key2
  40 +# blob
  41 +# blob
  42 +# /key3
  43 +
22 44 s.mapfn = mapfn
23 45 s.reducefn = reducefn
24   -s.data = data
  46 +s.datafn = datafn
25 47
26 48 start = time.time()
27 49 s.start()
@@ -29,3 +51,7 @@ def reducefn(k, v):
29 51
30 52 print stop-start
31 53 #print s.results()
  54 +print sorted(s.results().iteritems(), key=lambda x: x[1], reverse=True)[1:25]
  55 +
  56 +# Use a multiprocessing Pool example! Not the general use case
  57 +# though!
175 kaylee/client.py
... ... @@ -1,25 +1,29 @@
1 1 import sys
2 2 import uuid
3   -import cPickle as pickle
  3 +import numpy
4 4 import marshal
5 5 import types
6 6 import logging
  7 +
7 8 import gevent
8   -from gevent_zeromq import zmq
9   -from utils import cat
  9 +import zmq.green as zmq
  10 +
  11 +try:
  12 + import msgpack as srl
  13 +except ImportError:
  14 + import cPickle as srl
  15 +
10 16 from collections import defaultdict
11 17
12   -class Client:
  18 +class Client(object):
13 19
14 20 def __init__(self):
15 21
16 22 self.worker_id = str(uuid.uuid4())
17 23 self.push_socket = None
18 24 self.pull_socket = None
19   - self.control_socket = None
20   - self.delim = '::'
  25 + self.ctrl_socket = None
21 26
22   - # only listen for instructions for this specific worker
23 27 self.threaded = False
24 28 self.have_bytecode = False
25 29
@@ -86,12 +90,9 @@ def connect(self, push_addr = None,
86 90
87 91 print addr
88 92
89   - self.control_socket = c.socket(zmq.SUB)
90   - self.control_socket.connect(addr)
91   - self.control_socket.setsockopt(zmq.SUBSCRIBE, self.worker_id)
92   -
93   - def spawn(self):
94   - self.threaded = True
  93 + self.ctrl_socket = c.socket(zmq.ROUTER)
  94 + self.ctrl_socket.setsockopt(zmq.IDENTITY, self.worker_id)
  95 + self.ctrl_socket.connect(addr)
95 96
96 97 def start(self):
97 98 ''' Start processing work '''
@@ -99,98 +100,87 @@ def start(self):
99 100 self.collect()
100 101
101 102 def _kill(self):
102   - # Garbage collect the sockets to avoid weirdness
103   - self.control_socket.close()
  103 + self.ctrl_socket.close()
104 104 self.pull_socket.close()
105 105 self.push_socket.close()
106 106
107   - self.control_socket = None
108   - self.pull_socket = None
109   - self.push_socket = None
  107 + self.ctrl_socket = None
  108 + self.pull_socket = None
  109 + self.push_socket = None
110 110 self.logging.info('Stopped Worker')
111 111
112   - if self.threaded:
113   - gevent.getcurrent().kill()
114   - else:
115   - sys.exit(1)
  112 + sys.exit(0)
116 113
117 114 def collect(self):
118   - self.register()
119 115 poller = zmq.Poller()
120 116 poller.register(self.pull_socket, zmq.POLLIN)
121   - poller.register(self.control_socket, zmq.POLLIN)
  117 + poller.register(self.ctrl_socket, zmq.POLLIN)
122 118
123 119 # multiplex the pull and control ports
124 120 pull_socket = self.pull_socket
125   - control_socket = self.control_socket
  121 + ctrl_socket = self.ctrl_socket
126 122
127 123 while True:
128   - # Wait until the server pushes bytecode to use to
129   - # listening for data, ( this is a race condition
130   - # otherwise )
  124 +
131 125 if self.have_bytecode:
  126 +
132 127 try:
133   - socks = dict(poller.poll())
  128 + events = dict(poller.poll())
134 129 except zmq.ZMQError:
135 130 # Die gracefully if the user sends a SIGQUIT
136 131 self._kill()
137 132 break
138 133
139   - if pull_socket in socks and socks[pull_socket] == zmq.POLLIN:
140   - msg = self.pull_socket.recv()
  134 + if events.get(pull_socket) == zmq.POLLIN:
  135 +
  136 + command = self.pull_socket.recv(flags=zmq.SNDMORE)
  137 + key = self.pull_socket.recv(flags=zmq.SNDMORE)
  138 + data = self.pull_socket.recv(copy=False)
  139 + payload = (key, data)
141 140
142   - if msg:
143   - command, data = msg.split(self.delim)
144   - self.process_command(command, data)
  141 + self.process_command(command, payload)
145 142
146   - if control_socket in socks and socks[control_socket] == zmq.POLLIN:
147   - msg = self.control_socket.recv()
  143 + if events.get(ctrl_socket) == zmq.POLLIN:
  144 + worker_id, command = self.ctrl_socket.recv_multipart()
  145 + self.process_command(command, data)
148 146
149   - if msg:
150   - worker, command, data = msg.split(self.delim)
151   - self.process_command(command, data)
152 147 else:
153   - msg = self.control_socket.recv()
  148 + self.logging.info('Waiting for server')
154 149
155   - if msg:
156   - worker, command, data = msg.split(self.delim)
157   - self.process_command(command, data)
  150 + msg = srl.dumps(('connect', self.worker_id))
  151 + self.push_socket.send_multipart(['connect', self.worker_id])
158 152
159   - def register(self):
160   - '''
161   - Register the node with the server.
162   - '''
163   - self.send_command('connect', self.worker_id)
  153 + worker_id, payload = self.ctrl_socket.recv_multipart()
  154 + command, (mapbc, reducebc) = srl.loads(payload)
  155 +
  156 + assert command == 'bytecode'
  157 + self.set_bytecode(mapbc, reducebc)
  158 + self.logging.info('Received Bytecode')
164 159
165 160 def send_command(self, command, data=None):
166 161 '''
167   - Push a command to the sever.
  162 + Push a command to the server.
168 163 '''
169   - _d = self.delim
170   -
171 164 if data:
172   - pdata = pickle.dumps(data)
173   - self.push_socket.send(cat(command,_d,pdata))
174   - #logging.debug(command)
  165 + msg = srl.dumps((command, data))
  166 + self.push_socket.send(msg)
175 167 else:
176   - self.push_socket.send(cat(command,_d))
177   - #logging.debug(command)
  168 + msg = command
  169 + self.push_socket.send(msg)
178 170
179   - def set_bytecode(self, command, data):
  171 + def set_bytecode(self, mapbc, reducebc):
180 172 '''
181 173 Load the bytecode sent by the server and flag that we are
182 174 ready for work.
183 175 '''
184   - #self.logging.info('Received Bytecode')
185   - mapfn_bc, reducefn_bc = data
186 176
187 177 self.mapfn = types.FunctionType(
188   - marshal.loads(mapfn_bc),
  178 + marshal.loads(mapbc),
189 179 globals(),
190 180 'mapfn'
191 181 )
192 182 self.reducefn = types.FunctionType(
193   - marshal.loads(reducefn_bc),
  183 + marshal.loads(reducebc),
194 184 globals(),
195 185 'reducefn'
196 186 )
@@ -198,35 +188,60 @@ def set_bytecode(self, command, data):
198 188 self.have_bytecode = True
199 189
200 190 def on_done(self, command=None, data=None):
201   - #self.logging.info('Done')
202 191 self._kill()
203 192
204 193 def call_mapfn(self, command, data):
205   - results = defaultdict(list)
  194 + #results = defaultdict(list)
206 195 key, value = data
207 196
208 197 for k, v in self.mapfn(key, value):
209   - results[k].append(v)
  198 + print 'mapping', k, v
  199 + # Probably don't actually want to do this, but
  200 + # instead collect up a temporary batch and then do a
  201 + # tight loop where we send everything.
  202 +
  203 + self.push_socket.send('mapdone', flags=zmq.SNDMORE)
  204 + self.push_socket.send(key, flags=zmq.SNDMORE)
  205 + self.push_socket.send(k, flags=zmq.SNDMORE)
  206 + self.push_socket.send(srl.dumps(v))
  207 + #results[k].append(v)
210 208
211   - self.send_command('mapdone', (key, results))
  209 + self.push_socket.send('keydone', flags=zmq.SNDMORE)
  210 + self.push_socket.send(key)
  211 +
  212 + #print 'mapping', key
  213 + #import pdb; pdb.set_trace()
  214 +
  215 + #if isinstance(results, numpy.ndarray):
  216 + #self.push_socket.send(results, copy=False)
  217 + #else:
  218 + #self.push_socket.send(srl.dumps(results))
212 219
213 220 def call_reducefn(self, command, data):
214 221 key, value = data
215   - results = self.reducefn(key, value)
216   - self.send_command('reducedone', (key, results))
217   -
218   - def process_command(self, command, data=None):
219   - commands = {
220   - 'bytecode': self.set_bytecode,
221   - 'done': self.on_done,
222   - 'map': self.call_mapfn,
223   - 'reduce': self.call_reducefn,
224   - }
225   -
226   - if command in commands:
227   - if data:
228   - data = pickle.loads(data)
229   - commands[command](command, data)
  222 +
  223 + from itertools import imap
  224 + it = imap(srl.loads, srl.loads(value))
  225 +
  226 + results = self.reducefn(key, it)
  227 +
  228 + print 'reducing', key
  229 + self.push_socket.send('reducedone', flags=zmq.SNDMORE)
  230 + self.push_socket.send(key, flags=zmq.SNDMORE)
  231 +
  232 + if isinstance(results, numpy.ndarray):
  233 + self.push_socket.send(results, copy=False)
  234 + else:
  235 + self.push_socket.send(srl.dumps(results))
  236 +
  237 + def process_command(self, command, payload=None):
  238 + self.commands[command](self, command, payload)
  239 +
  240 + commands = {
  241 + 'done' : on_done,
  242 + 'map' : call_mapfn,
  243 + 'reduce' : call_reducefn,
  244 + }
230 245
231 246 if __name__ == "__main__":
232 247 c = Client()
385 kaylee/server.py
... ... @@ -1,18 +1,24 @@
1 1 import random
2 2 import marshal
3   -import cPickle as pickle
4 3 import logging
5 4 import gevent
6   -from gevent_zeromq import zmq
7   -from utils import cat
  5 +
  6 +import zmq.green as zmq
8 7 from collections import defaultdict
9 8
10   -START = 0
11   -MAP = 1
12   -REDUCE = 2
13   -FINISHED = 3
  9 +START = 0
  10 +MAP = 1
  11 +SHUFFLE = 2
  12 +PARTITION = 3
  13 +REDUCE = 4
  14 +COLLECT = 5
  15 +
  16 +try:
  17 + import msgpack as srl
  18 +except ImportError:
  19 + import cPickle as srl
14 20
15   -class Server:
  21 +class Server(object):
16 22
17 23 def __init__(self):
18 24
@@ -21,224 +27,242 @@ def __init__(self):
21 27
22 28 self.mapfn = None
23 29 self.reducefn = None
24   - self.data = None
  30 + self.datafn = None
  31 +
25 32 self.bytecode = None
26 33
27 34 self.started = False
28 35 self.completed = False
29   - self.delim = '::'
  36 +
  37 + self.working_maps = {}
30 38
31 39 logging.basicConfig(logging=logging.DEBUG,
32   - format="%(asctime)s [%(levelname)s] %(message)s")
  40 + format="%(asctime)s [%(levelname)s] %(message)s")
33 41 logging.getLogger("").setLevel(logging.INFO)
34 42 self.logging = logging
35 43
36   - def connect(self, push_addr = None,
37   - pull_addr = None,
38   - control_addr = None):
39   -
40   - c = zmq.Context()
41   -
42   - # Pull tasks across manager
43   - if not pull_addr:
44   - prot = 'tcp://'
45   - ip = '127.0.0.1'
46   - port = '6666'
47   - addr = ''.join([prot,ip,':',port])
48   - elif len(pull_addr) > 1:
49   - prot, ip, port = pull_addr
50   - addr = ''.join([prot,ip,':',port])
51   - else:
52   - addr = pull_addr
53   -
54   - print addr
55   -
56   - self.pull_socket = c.socket(zmq.PULL)
57   - self.pull_socket.bind(addr)
58   -
59   - # Pull tasks across manager
60   - if not push_addr:
61   - prot = 'tcp://'
62   - ip = '127.0.0.1'
63   - port = '5555'
64   - addr = ''.join([prot,ip,':',port])
65   - elif len(push_addr) > 1:
66   - prot, ip, port = push_addr
67   - addr = ''.join([prot,ip,':',port])
68   - else:
69   - addr = push_addr
70   -
71   - print addr
72   -
73   - self.push_socket = c.socket(zmq.PUSH)
74   - self.push_socket.bind(addr)
75   -
76   - # Pull tasks across manager
77   - if not control_addr:
78   - prot = 'tcp://'
79   - ip = '127.0.0.1'
80   - port = '7777'
81   - addr = ''.join([prot,ip,':',port])
82   - elif len(control_addr) > 1:
83   - prot, ip, port = control_addr
84   - addr = ''.join([prot,ip,':',port])
85   - else:
86   - addr = control_addr
  44 + def main_loop(self):
  45 + self.started = True
  46 +
  47 + poller = zmq.Poller()
87 48
88   - print addr
  49 + poller.register(self.pull_socket, zmq.POLLIN)
  50 + poller.register(self.push_socket, zmq.POLLOUT)
  51 + poller.register(self.ctrl_socket, zmq.POLLOUT)
89 52
90   - self.control_socket = c.socket(zmq.PUB)
91   - self.control_socket.bind(addr)
  53 + while self.started and not self.completed:
  54 + try:
  55 + events = dict(poller.poll())
  56 + except zmq.ZMQError:
  57 + self._kill()
  58 + break
  59 +
  60 + # Specify number of nodes to request
  61 + if len(self.workers) > 0:
  62 + if events.get(self.push_socket) == zmq.POLLOUT:
  63 + self.start_new_task()
  64 + if events.get(self.ctrl_socket) == zmq.POLLIN:
  65 + self.manage()
  66 + if events.get(self.pull_socket) == zmq.POLLIN:
  67 + self.collect_task()
  68 + else:
  69 + if events.get(self.pull_socket) == zmq.POLLIN:
  70 + self.collect_task()
  71 + if events.get(self.ctrl_socket) == zmq.POLLIN:
  72 + self.manage()
  73 +
  74 + def connect(self, push_addr = None, pull_addr = None, control_addr = None):
  75 + c = zmq.Context()
  76 +
  77 + # Pull tasks across manager
  78 + if not pull_addr:
  79 + prot = 'tcp://'
  80 + ip = '127.0.0.1'
  81 + port = '6666'
  82 + addr = ''.join([prot,ip,':',port])
  83 + elif len(pull_addr) > 1:
  84 + prot, ip, port = pull_addr
  85 + addr = ''.join([prot,ip,':',port])
  86 + else:
  87 + addr = pull_addr
92 88
  89 + print addr
93 90
94   - def start(self, timeout=None):
  91 + self.pull_socket = c.socket(zmq.PULL)
  92 + self.pull_socket.bind(addr)
95 93
96   - self.started = True
97   - self.logging.info('Started Server')
  94 + if not push_addr:
  95 + prot = 'tcp://'
  96 + ip = '127.0.0.1'
  97 + port = '5555'
  98 + addr = ''.join([prot,ip,':',port])
  99 + elif len(push_addr) > 1:
  100 + prot, ip, port = push_addr
  101 + addr = ''.join([prot,ip,':',port])
  102 + else:
  103 + addr = push_addr
  104 +
  105 + print addr
  106 +
  107 + self.push_socket = c.socket(zmq.PUSH)
  108 + self.push_socket.bind(addr)
  109 +
  110 + # Pull tasks across manager
  111 + if not control_addr:
  112 + prot = 'tcp://'
  113 + ip = '127.0.0.1'
  114 + port = '7777'
  115 + addr = ''.join([prot,ip,':',port])
  116 + elif len(control_addr) > 1:
  117 + prot, ip, port = control_addr
  118 + addr = ''.join([prot,ip,':',port])
  119 + else:
  120 + addr = control_addr
98 121
99   - try:
100   - if timeout:
101   - timeout = gevent.Timeout(timeout, gevent.Timeout)
  122 + print 'Control Socket', addr
102 123
103   - self.start_new_task()
104   - # Block until we collect all data
105   - gevent.spawn(self.collect).join(timeout=timeout)
  124 + self.ctrl_socket = c.socket(zmq.ROUTER)
  125 + self.ctrl_socket.bind(addr)
106 126
107   - except KeyboardInterrupt:
108   - self.started = False
109   - self.logging.info('Stopped Server')
  127 + def start(self, timeout=None):
  128 + self.gen_bytecode()
  129 + self.logging.info('Started Server')
110 130
111   - except gevent.Timeout:
112   - self.started = False
113   - self.logging.info('Timed out')
  131 + main = gevent.spawn(self.main_loop)
  132 + main.join()
114 133
115 134 self.done()
116 135
117 136 def done(self):
118 137 for worker in self.workers:
119   - self.send_control('done',None,worker)
  138 + self.ctrl_socket.send_multipart([worker, 'done'])
120 139
121 140 def _kill(self):
122 141 gevent.getcurrent().kill()
123 142
124   - def collect(self):
125   - while True:
126   - msg = self.pull_socket.recv()
127   -
128   - if msg:
129   - command, data = msg.split(self.delim)
130   - self.process_command(command, data)
131   -
132 143 def results(self):
133 144 if self.completed:
134 145 return self.reduce_results
135 146 else:
136 147 return None
137 148
138   - #@print_timing
139   - def send_control(self, command, data, worker):
140   - _d = self.delim
141   - self.logging.debug('Sending to: %s' % worker)
142   - if data:
143   - pdata = pickle.dumps(data)
144   - #logging.debug( "<- %s" % command)
145   - self.control_socket.send(cat(worker,_d,command,_d,pdata))
  149 + def send_datum(self, command, key, data):
  150 + self.push_socket.send(command, flags=zmq.SNDMORE)
  151 + self.push_socket.send(str(key), flags=zmq.SNDMORE)
  152 + # Do a multipart message since we want to do
  153 + # zero-copy of data.
  154 +
  155 + if self.state == MAP:
  156 + self.push_socket.send(data, copy=False)
146 157 else:
147   - #logging.debug( "<- %s" % command)
148   - self.control_socket.send(cat(worker,_d,command ,_d))
149   -
150   - #@print_timing
151   - def send_command(self, command, data=None):
152   - _d = self.delim
153   - if data:
154   - pdata = pickle.dumps(data)
155   - #logging.debug( "<- %s" % command)
156   - self.push_socket.send(cat(command,_d, pdata))
  158 + self.push_socket.send(srl.dumps(data))
  159 +
  160 + def send_command(self, command, payload=None):
  161 + if payload:
  162 + self.send_datum(command, *payload)
157 163 else:
158   - #logging.debug( "<- %s" % command)
159   - self.push_socket.send(cat(command ,_d))
  164 + self.push_socket.send(command)
160 165
161 166 def start_new_task(self):
162   - command, data = self.next_task()
163   - if command:
  167 + action = self.next_task()
  168 + if action:
  169 + command, data = action
164 170 self.send_command(command, data)
165   - #gevent.spawn(self.send_command, command, data)
166 171
167 172 def next_task(self):
168 173
169 174 if self.state == START:
170 175
171   - self.map_iter = iter(self.data)
172   - self.working_maps = {}
  176 + #self.job_id = 'foo'
  177 + self.map_iter = self.datafn()
173 178 self.map_results = defaultdict(list)
174 179 self.state = MAP
175 180 self.logging.info('Mapping')
176 181
177 182 if self.state == MAP:
  183 +
178 184 try:
  185 + map_key, map_item = self.map_iter.next()
  186 + self.working_maps[str(map_key)] = map_item
  187 + #print 'sending', map_key
  188 + return 'map', (map_key, map_item)
  189 + except StopIteration:
  190 + self.logging.info('Shuffling')
  191 + self.state = SHUFFLE
179 192
180   - map_key = self.map_iter.next()
181   - map_item = map_key, self.data[map_key]
182   - self.working_maps[map_item[0]] = map_item[1]
183   - return 'map', map_item
  193 + if self.state == SHUFFLE:
  194 + self.reduce_iter = self.map_results.iteritems()
  195 + self.working_reduces = set()
  196 + self.reduce_results = {}
184 197
185   - except StopIteration:
186   - if len(self.working_maps) > 0:
187   - key = random.choice(self.working_maps.keys())
188   - return 'map', (key, self.working_maps[key])
189   - self.state = REDUCE
190   - self.reduce_iter = self.map_results.iteritems()
191   - self.working_reduces = {}
192   - self.reduce_results = {}
  198 + if len(self.working_maps) == 0:
193 199 self.logging.info('Reducing')
  200 + self.state = PARTITION
  201 + #else:
  202 + #self.logging.info('Still shuffling %s ' % len(self.working_maps))
  203 +
  204 + if self.state == PARTITION:
  205 + self.state = REDUCE
194 206
195 207 if self.state == REDUCE:
  208 +
196 209 try:
  210 + reduce_key, reduce_value = self.reduce_iter.next()
  211 + self.working_reduces.add(reduce_key)
  212 + return 'reduce', (reduce_key, reduce_value)
  213 + except StopIteration:
  214 + self.logging.info('Collecting')
  215 + self.state = COLLECT
197 216
198   - reduce_item = self.reduce_iter.next()
199   - self.working_reduces[reduce_item[0]] = reduce_item[1]
200   - return 'reduce', reduce_item
  217 + if self.state == COLLECT:
201 218
202   - except StopIteration:
  219 + if len(self.working_reduces) == 0:
  220 + self.completed = True
  221 + self.logging.info('Finished')
  222 + #else:
  223 + #self.logging.info('Still collecting %s' % len(self.working_reduces))
203 224
204   - if len(self.working_reduces) > 0:
205   - key = random.choice(self.working_reduces.keys())
206   - return 'reduce', (key, self.working_reduces[key])
  225 + def collect_task(self):
  226 + # Don't use the results if they've already been counted
  227 + command = self.pull_socket.recv(flags=zmq.SNDMORE)
207 228
208   - self.state = FINISHED
  229 + if command == 'connect':
  230 + payload = self.pull_socket.recv()
  231 + self.on_connect(payload)
209 232
210   - if self.state == FINISHED:
211   - self.completed = True
212   - # Destroy the collector thread
213   - self._kill()
  233 + elif command == 'keydone':
  234 + key = self.pull_socket.recv()
  235 + del self.working_maps[key]
214 236
215   - def map_done(self, data):
216   - # Don't use the results if they've already been counted
217   - key, value = data
218   - if key not in self.working_maps:
219   - return
  237 + elif command == 'mapdone':
  238 + key = self.pull_socket.recv(flags=zmq.SNDMORE)
  239 + tkey = self.pull_socket.recv(flags=zmq.SNDMORE)
  240 + value = self.pull_socket.recv()
220 241
221   - for k, v in value.iteritems():
222   - self.map_results[k].extend(v)
  242 + #print tkey, key, value
  243 + self.map_results[tkey].extend(value)
223 244
224   - del self.working_maps[key]
  245 + #del self.working_maps[key]
225 246
226   - def reduce_done(self, data):
227   - # Don't use the results if they've already been counted
228   - key, value = data
229   - if key not in self.working_reduces:
230   - return
  247 + elif command == 'reducedone':
  248 + key = self.pull_socket.recv(flags=zmq.SNDMORE)
  249 + value = srl.loads(self.pull_socket.recv())
  250 +
  251 + # Don't use the results if they've already been counted
  252 + if key not in self.working_reduces:
  253 + return
  254 +
  255 + self.reduce_results[key] = value
  256 + self.working_reduces.remove(key)
231 257
232   - self.reduce_results[key] = value
233   - del self.working_reduces[key]
  258 + else:
  259 + raise RuntimeError()
234 260
235 261 def on_map_done(self, command, data):
236 262 self.map_done(data)
237   - self.start_new_task()
238 263
239 264 def on_reduce_done(self, command, data):
240 265 self.reduce_done(data)
241   - self.start_new_task()
242 266
243 267 def gen_bytecode(self):
244 268 self.bytecode = (
@@ -246,34 +270,31 @@ def gen_bytecode(self):
246 270 marshal.dumps(self.reducefn.func_code),
247 271 )
248 272
249   - def on_connect(self, command, data):
250   - self.logging.info('Worker Registered: %s' % data)
251   - self.workers.add(data)
252   - worker_id = data
  273 + def on_connect(self, worker_id):
  274 + if worker_id not in self.workers:
  275 + self.logging.info('Worker Registered: %s' % worker_id)
  276 + self.workers.add(worker_id)
253 277
254   - # Store this so we don't call it for every worker
255   - if not self.bytecode:
256   - self.gen_bytecode()
  278 + payload = ('bytecode', self.bytecode)
  279 + self.ctrl_socket.send_multipart([worker_id, srl.dumps(payload)])
  280 + self.logging.info('Sending Bytecode')
  281 + else:
  282 + print worker_id
257 283
258   - self.send_control(
259   - 'bytecode',
260   - self.bytecode,
261   - worker_id
262   - )
  284 + def process_command(self, command, data=None):
  285 + self.commands[command](self, command, data)
263 286
264   - self.logging.info('Sending Bytecode')
265   - self.start_new_task()
  287 + commands = {
  288 + 'mapdone' : on_map_done,
  289 + 'reducedone' : on_reduce_done,
  290 + 'connect' : on_connect
  291 + }
266 292
267   - def process_command(self, command, data=None):
268   - commands = {
269   - 'mapdone': self.on_map_done,
270   - 'reducedone': self.on_reduce_done,
271   - 'connect': self.on_connect
272   - }
273   -
274   - if command in commands:
275   - if data:
276   - data = pickle.loads(data)
277   - commands[command](command, data)
278   - else:
279   - self.process_command(self, command, data)
  293 +if __name__ == '__main__':
  294 +
  295 + # Support Cython!
  296 + import sys
  297 + import imp
  298 +
  299 + path = sys.argv[1]
  300 + imp.load_module(path)
8 kaylee/utils.py
... ... @@ -1,4 +1,5 @@
1 1 import time
  2 +import msgpack
2 3
3 4 def cat(*xs):
4 5 return "".join(xs)
@@ -11,3 +12,10 @@ def wrapper(*arg):
11 12 print '%s took %0.3f ms' % (func.func_name, (t2-t1)*1000.0)
12 13 return res
13 14 return wrapper
  15 +
  16 +def sub_subscription_prefix(worker_id, n=3):
  17 + """
  18 + Listen for n-tuples with the worker id prefix without
  19 + deserialization. Very fast.
  20 + """
  21 + return msgpack.dumps(tuple([worker_id] + [None]*(n-1)))[0:2]
22,108 mobydick.txt
22,108 additions, 0 deletions not shown
6 requirements.txt
... ... @@ -1,3 +1,3 @@
1   -gevent
2   -pyzmq
3   -gevent-zeromq
  1 +gevent==0.13.8
  2 +pyzmq>=2.2
  3 +msgpack-python==0.2.1

0 comments on commit fc8cb26

Please sign in to comment.
Something went wrong with that request. Please try again.