diff --git a/README.md b/README.md index 834d7fd..67edaec 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,36 @@ not generally a good idea for performance. But this lets us a implement a simple shuffler using a Python defaultdict in just a few lines of code which is easy to understand. +Theory +====== + +MapReduce can be thought of on a high level as being a list +homomorphism that can be written as a composition of two functions ( +Reduce . Map ) . It is parallelizable because of the associativity of +the map and reduce operations. + +```haskell +MapReduce :: [(k1, v1)] -> [(k3, v3)] +MapReduce = Reduce . Map + +MapReduce :: a -> [(k3, v3)] +MapReduce = reducefn . shuffle . mapfn . datafn +``` + +The implementation provides two functions +split ( datafn ) and shuffle. + +```haskell +shuffle :: [ (k2, v2) ] -> [(k2, [v2])] +``` + +The user provides map and reduce. + +```haskell +map :: (k1,v1) -> [ (k2, v2) ] +reduce :: (k2, [v2]) -> [ (k3, v3) ] +``` + Directions: =========== diff --git a/example.py b/example.py index ee0b5f6..9b94ee3 100644 --- a/example.py +++ b/example.py @@ -12,9 +12,6 @@ # This just enumerates all lines in the file, but is able to # get data from disk into ZeroMQ much faster than read/writes. -# Do an HDF5 data source example! - -# map :: a -> [ (k1, v1) ] def datafn(): i = count(0) total = mm.size() @@ -22,30 +19,11 @@ def datafn(): yield next(i), memoryview(mm.readline()) mm.close() -# MapReduce can be thought of on a high level as being a list -# homomorphism that can be written as a composition of two functions ( -# Reduce . Map ) . It is parallelizable because of the associativity of -# the of map and reduce operations. -# -# MapReduce :: [(k1, v1)] -> [(k3, v3)] -# MapReduce = Reduce . Map - -# MapReduce :: a -> [(k3, v3)] -# MapReduce = reducefn . shuffle . mapfn . datafn - -# The implementation provides two functions -# split ( datafn ) and shuffle. 
- # map :: (k1,v1) -> [ (k2, v2) ] def mapfn(k1, v): for w in v.bytes.split(): yield w, 1 -# shuffle :: [ (k2, v2) ] -> [(k2, [v2])] - -# In the Haskell notation -# pmap reducefn ( shuffle ( pmap mapfn ( split a ) ) ) - # reduce :: (k2, [v2]) -> [ (k3, v3) ] def reducefn(k2, v): return sum(v) @@ -54,9 +32,6 @@ def reducefn(k2, v): s = Server() s.connect() -# yaml config -# Datastore backend, Redis kaylee:// - s.mapfn = mapfn s.reducefn = reducefn s.datafn = datafn diff --git a/kaylee/server.py b/kaylee/server.py index 9f92eec..f2d3aae 100644 --- a/kaylee/server.py +++ b/kaylee/server.py @@ -94,7 +94,7 @@ def main_loop(self): break # TODO: Specify number of nodes - if len(self.workers) > 5: + if len(self.workers) > 0: if events.get(self.push_socket) == zmq.POLLOUT: self.start_new_task() if events.get(self.ctrl_socket) == zmq.POLLIN: @@ -291,12 +291,6 @@ def collect_task(self): else: raise RuntimeError("Unknown wire chatter") - def on_map_done(self, command, data): - self.map_done(data) - - def on_reduce_done(self, command, data): - self.reduce_done(data) - def gen_bytecode(self): self.bytecode = ( marshal.dumps(self.mapfn.func_code),