Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
Migrated docs.
Browse files Browse the repository at this point in the history
  • Loading branch information
sdiehl committed Sep 23, 2012
1 parent a64f1d3 commit cb72025
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 32 deletions.
30 changes: 30 additions & 0 deletions README.md
Expand Up @@ -14,6 +14,36 @@ not generally a good idea for performance. But this lets us a implement
a simple shuffler using a Python defaultdict in just a few lines of code
which is easy to understand.

Theory
======

MapReduce can be thought of on a high level as being a list
homomorphism that can be written as a composition of two functions (
Reduce . Map ) . It is parallelizable because of the associativity of
the of map and reduce operations.

```haskell
MapReduce :: [(k1, v1)] -> [(k3, v3)]
MapReduce = Reduce . Map

MapReduce :: a -> [(k3, v3)]
MapReduce = reducefn . shuffle . mapfn . datafn
```

The implementation provides two functions
split ( datafn ) and shuffle.

```haskell
shuffle :: [ (k2, v2) ] -> [(k2, [v2])]
```

The user provides map and reduce.

```haskell
map :: (k1,v1) -> [ (k2, v2) ]
reduce :: (k2, [v2]) -> [ (k3, v3) ]
```

Directions:
===========

Expand Down
25 changes: 0 additions & 25 deletions example.py
Expand Up @@ -12,40 +12,18 @@
# This just enumerates all lines in the file, but is able to
# get data from disk into ZeroMQ much faster than read/writes.

# Do an HDF5 data source example!

# map :: a -> [ (k1, v1) ]
def datafn():
i = count(0)
total = mm.size()
while mm.tell() < total:
yield next(i), memoryview(mm.readline())
mm.close()

# MapReduce can be thought of on a high level as being a list
# homomorphism that can be written as a composition of two functions (
# Reduce . Map ) . It is parallelizable because of the associativity of
# the of map and reduce operations.
#
# MapReduce :: [(k1, v1)] -> [(k3, v3)]
# MapReduce = Reduce . Map

# MapReduce :: a -> [(k3, v3)]
# MapReduce = reducefn . shuffle . mapfn . datafn

# The implementation provides two functions
# split ( datafn ) and shuffle.

# map :: (k1,v1) -> [ (k2, v2) ]
def mapfn(k1, v):
for w in v.bytes.split():
yield w, 1

# shuffle :: [ (k2, v2) ] -> [(k2, [v2])]

# In the Haskell notation
# pmap reducefn ( shuffle ( pmap mapfn ( split a ) ) )

# reduce :: (k2, [v2]) -> [ (k3, v3) ]
def reducefn(k2, v):
return sum(v)
Expand All @@ -54,9 +32,6 @@ def reducefn(k2, v):
s = Server()
s.connect()

# yaml config
# Datastore backend, Redis kaylee://

s.mapfn = mapfn
s.reducefn = reducefn
s.datafn = datafn
Expand Down
8 changes: 1 addition & 7 deletions kaylee/server.py
Expand Up @@ -94,7 +94,7 @@ def main_loop(self):
break

# TODO: Specify number of nodes
if len(self.workers) > 5:
if len(self.workers) > 0:
if events.get(self.push_socket) == zmq.POLLOUT:
self.start_new_task()
if events.get(self.ctrl_socket) == zmq.POLLIN:
Expand Down Expand Up @@ -291,12 +291,6 @@ def collect_task(self):
else:
raise RuntimeError("Unknown wire chatter")

def on_map_done(self, command, data):
self.map_done(data)

def on_reduce_done(self, command, data):
self.reduce_done(data)

def gen_bytecode(self):
self.bytecode = (
marshal.dumps(self.mapfn.func_code),
Expand Down

0 comments on commit cb72025

Please sign in to comment.