Permalink
Browse files

added a primitive celery driver, added parallelizer switcher to simpl…

…e example
  • Loading branch information...
1 parent bf7450f commit edceefdd8b68812aa80ebbd254a97dfd8eca0598 @kadirpekel kadirpekel committed Mar 11, 2013
Showing with 44 additions and 15 deletions.
  1. +8 −4 examples/simple/__main__.py
  2. +4 −0 examples/simple/celeryconf.py
  3. +0 −11 examples/simple/plugins/echo.py
  4. +32 −0 octavious/parallelizer/celery.py
@@ -1,20 +1,24 @@
+import sys
+
from octavious.utils import pipeline, processor, plugin, parallelizer
from octavious.processor import ManyToOneProcessor, PipelineProcessor
if __name__ == '__main__':
+ driver = len(sys.argv) > 1 and sys.argv[1] or 'multiprocessing'
+
cnj_processor = processor('examples.simple.processes.chucknjoke')
cnj_pipeline = pipeline(
- plugin('examples.simple.plugins.echo'),
plugin('examples.simple.plugins.dictdigger', 'value.joke'),
plugin('examples.simple.plugins.jsondeserializer'))
- cnj_parallelizer = parallelizer('octavious.parallelizer.multiprocessing')
+ cnj_parallelizer = parallelizer('octavious.parallelizer.%s' % driver)
- onetomany = ManyToOneProcessor(PipelineProcessor(cnj_processor,
+ manytoone = ManyToOneProcessor(PipelineProcessor(cnj_processor,
cnj_pipeline),
cnj_parallelizer)
- onetomany(range(1, 4))
+ for result in manytoone(range(1, 4)):
+ print '*', result
@@ -0,0 +1,4 @@
+BROKER_URL = 'redis://localhost:6379/0'
+CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
+CELERY_IMPORTS = ("octavious.parallelizer.celery", )
+CELERY_TASK_RESULT_EXPIRES = 300
@@ -1,11 +0,0 @@
-from octavious.pipeline import Plugin
-
-
-class EchoPlugin(Plugin):
-
- def post_process(self, input, output):
- print "* %s" % output
- print
- return output
-
-plugin = EchoPlugin
@@ -0,0 +1,32 @@
+from __future__ import absolute_import
+
+import celery
+
+from octavious.parallelizer import Parallelizer
+
+
+@celery.task('parallelizer_task')
+def parallelizer_task(processor, input, callback):
+ output = processor(input)
+ if callback:
+ callback(output)
+ return output
+
+
+class CeleryParallelizer(Parallelizer):
+ """This class is basic implementation for parallelizing processors using
+ a messaging queue server through celery libraries.
+
+ """
+ def parallelize(self, processors, input=None, callback=None):
+ """Convenient ``Parallelizer.parallelize`` implementation to establish
+ concurrency using celery tasks
+
+ """
+ subtasks = []
+ for processor in processors:
+ subtasks.append(parallelizer_task.s(processor, input, callback))
+ async_result = celery.group(subtasks).apply_async()
+ return async_result.get()
+
+parallelizer = CeleryParallelizer

0 comments on commit edceefd

Please sign in to comment.