
Commit

Merge branch 'release/0.1.0'
Álvaro Justen aka Turicas committed Oct 15, 2012
2 parents 2b8c1a5 + d4988e6 commit bcd9634
Showing 33 changed files with 3,445 additions and 0 deletions.
14 changes: 14 additions & 0 deletions .gitignore
@@ -0,0 +1,14 @@
.*.sw?
*.pyc
*~
.idea/*
dist/*
build/*
pypln.egg-info/*
.idea/*
.coverage
reg_settings.py
MANIFEST
.directory
*.db
.env
10 changes: 10 additions & 0 deletions CHANGELOG.markdown
@@ -0,0 +1,10 @@
pypelinin's ChangeLog
=====================


Version 0.1.0
-------------

- First version released!
- Router, Broker and Pipeliner available on the "server side"
- Job, Pipeline and PipelineManager available on the "client side"
2 changes: 2 additions & 0 deletions MANIFEST.in
@@ -0,0 +1,2 @@
include README.markdown
include CHANGELOG.markdown
33 changes: 33 additions & 0 deletions Makefile
@@ -0,0 +1,33 @@
TEST_RUNNER=nosetests -dsv --with-yanc

clean:
	find -regex '.*\.pyc' -exec rm {} \;
	find -regex '.*~' -exec rm {} \;
	rm -rf build/ reg_settings.py*

bootstrap-environment:
	pip install -r requirements/development.txt

bootstrap-tests: clean
	clear
	python setup.py install

test: bootstrap-tests
	${TEST_RUNNER} tests/

test-router: bootstrap-tests
	${TEST_RUNNER} -x tests/test_router.py

test-client: bootstrap-tests
	${TEST_RUNNER} --with-coverage --cover-package=pypelinin.client tests/test_client.py

test-broker: bootstrap-tests
	${TEST_RUNNER} -x tests/test_broker.py

test-pipeline: bootstrap-tests
	${TEST_RUNNER} --with-coverage --cover-package=pypelinin.pipeline tests/test_pipeline.py

test-pipeliner: bootstrap-tests
	${TEST_RUNNER} -x tests/test_pipeliner.py

.PHONY: clean bootstrap-environment bootstrap-tests test test-router test-client test-broker test-pipeline test-pipeliner
172 changes: 172 additions & 0 deletions README.markdown
@@ -0,0 +1,172 @@
pypelinin'
==========

`pypelinin` is a Python library to distribute jobs and pipelines across a
cluster. It uses ZeroMQ as the underlying framework for communication between
the daemons.


Architecture
------------

There are 3 daemons you need to run:

- **Router**: it's the central point of communication in the network. Every
  pipeline you want to execute must be submitted to the Router, and every
  other daemon talks to the Router to get pipelines to execute, configuration
  and so on. You can have only one Router running.
- **Broker**: it runs worker processes and executes jobs. It does not know
  about entire pipelines; it just receives a job to execute, retrieves the
  information that job needs, runs the worker and then saves the information
  returned by the worker. It uses a class defined by you (`StoreClass`) to
  retrieve/save information. You should run as many Brokers as possible in
  your cluster to increase job/pipeline throughput.
- **Pipeliner**: it takes care of pipelines. This daemon does not know how to
  save/retrieve data or even execute jobs, but it knows which job should be
  executed after which in a pipeline. The Router gives Pipeliner a pipeline
  and Pipeliner asks the Router for job execution (the jobs are then
  dispatched to Brokers). You can run as many Pipeliners as you want (though
  a single one can handle lots of pipelines simultaneously).


Installation
------------

First you need to install `libzmq`, its development headers and a compiler
(needed to build the Python bindings). On a Debian/Ubuntu machine, run:

sudo aptitude install libzmq libzmq-dev build-essential

Then, install the Python package:

pip install pypelinin


Usage
-----

### Daemons

For each daemon, you need to create a script that instantiates the daemon class
and starts it. Please check our
[example](https://github.com/turicas/pypelinin/tree/develop/example)
(files `example/my_router.py`, `example/my_broker.py` and
`example/my_pipeliner.py`).


### Client

You need to specify which jobs are in a pipeline and then send it to the Router.
A pipeline is a
[directed acyclic graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph)
(aka DAG) and is represented as a `dict`, where each key/value pair is an edge
(the key is the "from" node and the value is the "to" node --
[see notes about this representation](http://www.python.org/doc/essays/graphs/)).
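
Since keys and values are `Job` instances, a job with more than one successor
can be expressed as a fan-out. A minimal sketch (assuming the library accepts
a tuple of successors as a value, mirroring the dict-of-edges representation;
the worker names are the ones used in the example folder):

```python
from pypelinin import Job, Pipeline

# hypothetical fan-out: 'Downloader' runs before both successors
fan_out = Pipeline({Job('Downloader'): (Job('GetTextAndWords'),
                                        Job('GetLinks'))},
                   data={'foo': 'bar'})
```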


#### Example: creating a pipeline and submitting it for execution

```python
from pypelinin import Pipeline, Job, PipelineManager

pipeline = Pipeline({Job('WorkerName1'): Job('WorkerName2'),
                     Job('WorkerName2'): Job('WorkerName3')},
                    data={'foo': 'bar'})
```

In this pipeline, `Job('WorkerName2')` will be executed after
`Job('WorkerName1')` and `Job('WorkerName3')` after `Job('WorkerName2')` --
when you send it to `Pipeliner` (via `Router`), it'll take care of executing
the jobs in this order. `data` is what will be passed to `StoreClass` (which is
loaded by each `Broker`) when `Broker` needs to retrieve information from a
data store to pass to a worker for execution, or to save information returned
by the worker.


```python

manager = PipelineManager(api='tcp://localhost:5555',
                          broadcast='tcp://localhost:5556')
manager.start(pipeline)  # send it to the cluster to execute
while not manager.finished(pipeline):  # wait for pipeline to finish
    pass
print 'done'
```
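
The `while` loop above busy-waits just to keep the example short; in a real
client you will probably want to sleep between checks, along these lines:

```python
import time

while not manager.finished(pipeline):  # same check as above, but polling
    time.sleep(0.5)                    # gently instead of spinning the CPU
print 'done'
```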

Note that you need to create a `StoreClass` and the workers (each worker is
another class). These classes are passed to the `Broker` when it is instantiated.


Tutorial
--------

Let's learn by doing! Create a virtualenv, install `pypelinin` and then download
our `example` folder to see it working.

mkvirtualenv test-pypelinin
pip install pypelinin
wget https://github.com/turicas/pypelinin/tarball/develop -O pypelinin.tar.gz
tar xfz pypelinin.tar.gz && rm pypelinin.tar.gz
cd turicas-pypelinin-*/example/

Now your environment is created and you need to run the daemons, each one in a
separate terminal:

Router:

$ python my_router.py
2012-10-15 14:12:59,112 - My Router - INFO - Entering main loop

Broker:

$ python my_broker.py
2012-10-15 14:13:17,956 - Broker - INFO - Starting worker processes
2012-10-15 14:13:18,055 - Broker - INFO - Broker started
2012-10-15 14:13:18,056 - Broker - INFO - Trying to connect to router...
2012-10-15 14:13:18,057 - Broker - INFO - [API] Request to router: {'command': 'get configuration'}
2012-10-15 14:13:18,058 - Broker - INFO - [API] Reply from router: {u'monitoring interval': 60, u'store': {u'monitoring filename': u'/tmp/monitoring.log'}}

And Pipeliner:

$ python my_pipeliner.py
2012-10-15 14:13:56,476 - Pipeliner - INFO - Pipeliner started
2012-10-15 14:13:56,477 - Pipeliner - INFO - Entering main loop
2012-10-15 14:13:56,477 - Pipeliner - INFO - Bad bad router, no pipeline for me.

Please read the files:

- `file_store.py` - it has a simple `StoreClass` that saves and retrieves
  information from files. You can easily modify it to use a database.
- `workers.py` (and `test_workers.py`) - we have created 3 workers:
  `Downloader`, `GetTextAndWords` and `GetLinks`. The first one is required by
  the last two. Each worker is basically a class that inherits from
  `pypelinin.Worker`, has a `requires` attribute and a `process` method (see
  the sketch after this list).
- `send_pipelines.py` - this script creates some `Pipeline`s and sends them
  for execution using a `PipelineManager` (as in the example above). You need
  to run it to get the jobs executed.
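
As a rough idea of the shape of a worker (the real ones are in `workers.py`;
the class name, the `requires` keys and the returned dict below are made up,
and we assume `process` receives a dict with the required keys and returns a
dict):

```python
from pypelinin import Worker


class WordCounter(Worker):
    # keys the StoreClass must provide through its `retrieve` method
    requires = ['text']

    def process(self, data):
        # `data` has only the required keys; the returned dict is handed to
        # the StoreClass `save` method as 'worker_result'
        return {'number_of_words': len(data['text'].split())}
```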

After executing `send_pipelines.py` you can check the files
`/tmp/{0,1,2,3,4}.data` to see the results -- these files are Python
dictionaries encoded as JSON (this is done by `file_store.SimpleFileStore`).
To read one of these files, just call this function:

```python
import json

def read_result_file(filename):
    with open(filename, 'r') as fp:
        data = fp.read()
    return json.loads(data)
```
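
For example, to inspect the first result file:

```python
result = read_result_file('/tmp/0.data')
print result
```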

### Installing on other cluster nodes

If you want to process more jobs/pipelines per second, you need to run more
Brokers on other machines. To do that, you need to:

- Be sure `Router` is binding to an interface that is reachable from all
  machines that will run `Broker` and `Pipeliner` (change `my_router.py`);
- Change `my_broker.py` to use the new `Router` IP address/ports (see the
  sketch below);
- Install `pypelinin` on all cluster machines;
- Copy `my_broker.py`, `file_store.py` and `workers.py` to all
  "Broker machines";
- Run everything!
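
For instance, on a "Broker machine" the `Broker` created in `my_broker.py`
would point to the Router's address instead of `localhost` (the IP below is
just a placeholder):

```python
broker = Broker(api='tcp://192.0.2.10:5555',        # Router API socket
                broadcast='tcp://192.0.2.10:5556',  # Router broadcast socket
                store_class=SimpleFileStore,
                logger=logger,
                workers='workers',
                number_of_workers=cpu_count() * 4)
```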
52 changes: 52 additions & 0 deletions example/file_store.py
@@ -0,0 +1,52 @@
# coding: utf-8

import json


class SimpleFileStore(object):
    def __init__(self, **configuration):
        self.monitoring_fp = open(configuration['monitoring filename'], 'w')

    def retrieve(self, info):
        '''Retrieve data to pass to `WorkerClass.process`

        `info` has keys 'worker', 'worker_requires' and 'data':
        - 'data' comes from the pipeline data
        - 'worker' is the worker name
        - 'worker_requires' is the 'requires' attribute of WorkerClass
        '''
        filename = info['data']['filename']  # get the filename with the data
        worker_requires = info['worker_requires']
        with open(filename, 'r') as fp:
            file_data = json.loads(fp.read().strip())  # read the file
        # get only the information this worker needs
        worker_input = {key: file_data[key] for key in worker_requires}
        return worker_input

    def save(self, info):
        '''Save information returned by `WorkerClass.process`

        `info` has keys 'worker', 'worker_requires', 'worker_result' and 'data':
        - 'data' comes from the pipeline data
        - 'worker' is the worker name
        - 'worker_requires' is the 'requires' attribute of WorkerClass
        - 'worker_result' is what WorkerClass.process returned
        '''
        # read the current information from the file
        filename = info['data']['filename']
        with open(filename, 'r') as fp:
            file_data = json.loads(fp.read().strip())

        # update the file with the information returned by the worker
        worker_result = info['worker_result']
        file_data['_result-from-{}'.format(info['worker'])] = worker_result
        for key, value in worker_result.items():
            file_data[key] = value
        with open(filename, 'w') as fp:
            fp.write(json.dumps(file_data))

    def save_monitoring(self, data):
        # serialize monitoring information to JSON and append it to the file
        data_as_json_string = json.dumps(data)
        self.monitoring_fp.write(data_as_json_string + "\n")
        self.monitoring_fp.flush()
33 changes: 33 additions & 0 deletions example/my_broker.py
@@ -0,0 +1,33 @@
#!/usr/bin/env python
# coding: utf-8

from logging import Logger, StreamHandler, Formatter, NullHandler
from multiprocessing import cpu_count
from sys import stdout

from pypelinin import Broker

from file_store import SimpleFileStore


def main():
    logger = Logger('Broker')
    handler = StreamHandler(stdout)
    formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - '
                          '%(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    broker = Broker(api='tcp://localhost:5555',        # router API
                    broadcast='tcp://localhost:5556',  # router broadcast
                    # class that will be called to retrieve/store information
                    # to pass to/to save from workers
                    store_class=SimpleFileStore,
                    logger=logger,
                    # name of the module that contains the workers
                    workers='workers',
                    # each core will run 4 workers
                    number_of_workers=cpu_count() * 4)
    broker.start()


if __name__ == '__main__':
    main()
22 changes: 22 additions & 0 deletions example/my_pipeliner.py
@@ -0,0 +1,22 @@
#!/usr/bin/env python2
# coding: utf-8

from sys import stdout
from logging import Logger, StreamHandler, Formatter
from pypelinin import Pipeliner


def main():
    logger = Logger('Pipeliner')
    handler = StreamHandler(stdout)
    formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - '
                          '%(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    pipeliner = Pipeliner(api='tcp://localhost:5555',
                          broadcast='tcp://localhost:5556', logger=logger)
    pipeliner.start()


if __name__ == '__main__':
    main()

24 changes: 24 additions & 0 deletions example/my_router.py
@@ -0,0 +1,24 @@
#!/usr/bin/env python2
# coding: utf-8

from sys import stdout
from logging import Logger, StreamHandler, Formatter
from pypelinin import Router


def main():
    logger = Logger('My Router')
    handler = StreamHandler(stdout)
    formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - '
                          '%(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    api_host_port = ('*', 5555)
    broadcast_host_port = ('*', 5556)
    default_config = {'store': {'monitoring filename': '/tmp/monitoring.log'},
                      'monitoring interval': 60, }
    router = Router(api_host_port, broadcast_host_port, default_config, logger)
    router.start()


if __name__ == '__main__':
    main()
1 change: 1 addition & 0 deletions example/requirements.txt
@@ -0,0 +1 @@
pypelinin
