Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Threading parallelizer and README.rst Updated #9

Closed
wants to merge 17 commits into from

2 participants

Tuna VARGI kadirpekel
This page is out of date. Refresh to see the latest.
1  .gitignore
@@ -9,4 +9,5 @@ docs/api
9 9
 coverage
10 10
 .DS_Store
11 11
 .vagrant
  12
+/.idea
12 13
 manifests/build
187  README.rst
Source Rendered
@@ -3,13 +3,163 @@ Octavious
3 3
 =========
4 4
 .. image:: https://github.com/metglobal/octavious/raw/master/dococtopus.gif
5 5
 
6  
-Octavious is a very lightweight mutli processing framework helps you parallelize
7  
-your tasks within a plugin pipeline system.
  6
+Octavious is a very lightweight concurrency framework helps you parallelize
  7
+your tasks through a plugin pipeline system.
8 8
 
9  
-Run the example app:
10  
---------------------
  9
+Currently it supports concurrency technologies such as:
11 10
 
12  
-::
  11
+* Multiprocessing (python builtin)
  12
+* Threading (python builtin)
  13
+* Celery (message Queues)
  14
+* Gevent (green threads)
  15
+
  16
+Quick Overview
  17
+--------------
  18
+
  19
+Processor
  20
+~~~~~~~~~
  21
+
  22
+First, You have to define your processor that's going to be working as
  23
+parallelized. Let's have look at the example below
  24
+
  25
+.. code:: python
  26
+
  27
+    # processes/chucknjoke.py
  28
+    import urllib
  29
+    from octavious.processor import Processor
  30
+
  31
+    class ChuckNJokeProcessor(Processor):
  32
+        def process(self, input):
  33
+            url = 'http://api.icndb.com/jokes/%s' % input
  34
+            return urllib.urlopen(url % input).read()
  35
+
  36
+    processor = ChuckNJokeProcessor
  37
+
  38
+In this example defined, `ChuckNJokeProcessor` joins to given url
  39
+and brings the Chuck Norris Jokes as parallelized that are going to be
  40
+processed via the plugins.
  41
+
  42
+Plugin
  43
+~~~~~~
  44
+
  45
+Plugins already have methods such as `pre-process` and `post-process`. If you
  46
+want to make changes on your input data coming from your processor you have to implement the defined pre-process, and if you want to make changes on output data
  47
+you have to implement the defined post process.
  48
+
  49
+As you may see in the example below, this is our first plugin. We are going to
  50
+take the RAW JSON data and decode
  51
+
  52
+.. code:: python
  53
+
  54
+    # plugins/jsondeserializer.py
  55
+    import json
  56
+    from octavious.pipeline import Plugin
  57
+
  58
+    class JsonDeserializerPlugin(Plugin):
  59
+        def post_process(self, input, output):
  60
+            return json.loads(output)
  61
+
  62
+    plugin = JsonDeserializerPlugin
  63
+
  64
+And our second plugin
  65
+
  66
+.. code:: python
  67
+
  68
+    # plugins/dictdigger.py
  69
+    from octavious.pipeline import Plugin
  70
+
  71
+    class DictDiggerPlugin(Plugin):
  72
+        def __init__(self, path):
  73
+            self.path = path
  74
+
  75
+        def post_process(self, input, output):
  76
+            entry = output
  77
+            for component in self.path.split('.'):
  78
+                entry = entry.get(component)
  79
+            return entry
  80
+
  81
+    plugin = DictDiggerPlugin
  82
+
  83
+This is for getting what data we want from tree structured dictionary.
  84
+
  85
+Pipeline
  86
+~~~~~~~~
  87
+
  88
+Pipelines are for running the plugins in the sequence that we desire.
  89
+You can create a pipeline with a buch of plugins just like below
  90
+
  91
+.. code:: python
  92
+
  93
+    from octavious.utils import pipeline, plugin
  94
+
  95
+    chuck_norris_pipeline = pipeline(
  96
+        plugin('chuck_norris.plugins.dictdigger', 'value.joke'),
  97
+        plugin('chuck_norris.plugins.jsondeserializer'),
  98
+    )
  99
+
  100
+Parallelizer
  101
+~~~~~~~~~~~~
  102
+
  103
+As above we have 4 parallelizer options implemented for Octavious. You can set
  104
+the parallelizer settings like below
  105
+
  106
+.. code:: python
  107
+
  108
+    chuck_norris_parallellizer = parallelizer(
  109
+        'octavious.parallelizer.multiprocessing')
  110
+
  111
+You can also use desired parallelizer backend with choice of
  112
+
  113
+* `octavious.parallelizer.threading`
  114
+* `octavious.parallelizer.gevent`
  115
+* `octavious.parallelizer.celery`
  116
+
  117
+Convenient Processors
  118
+~~~~~~~~~~~~~~~~~~~~~
  119
+
  120
+There are some auxialary `Processor` implementations help you define
  121
+parallelizing workflows.
  122
+
  123
+* `OneToManyProcessor` wraps your processors to work with just one input.
  124
+* `ManyToOneProcessor` wraps your processor to works with multiple inputs.
  125
+
  126
+We are going to work with `ManyToOneProcessor` in ChuckNorris example.
  127
+This is our last setting below as a summary, that we will make our code work.
  128
+
  129
+.. code:: python
  130
+
  131
+    from octavious.utils import pipeline, processor, plugin, parallelizer
  132
+    from octavious.processor import ManyToOneProcessor, PipelineProcessor
  133
+
  134
+    chn_processor = processor('chuck_norris.processes.chucknjoke')
  135
+    chn_parallellizer = parallelizer('octavious.parallelizer.gevent')
  136
+    chn_pipeline = pipeline(
  137
+        plugin('chuck_norris.plugins.dictdigger', 'value.joke'),
  138
+        plugin('chuck_norris.plugins.jsondeserializer'),
  139
+    )
  140
+
  141
+    manytoone = ManyToOneProcessor(
  142
+                  PipelineProcessor(cnj_processor, chn_pipeline),
  143
+                  chn_parallelizer)
  144
+
  145
+    for result in manytoone(range(1, 4)):
  146
+        print '*', result
  147
+
  148
+Run the example app
  149
+
  150
+.. code:: console
  151
+
  152
+    $ python -m examples.simple
  153
+    * Chuck Norris uses ribbed condoms inside out, so he gets the pleasure.
  154
+    * Chuck Norris doesn't read books. He stares them down until he gets the information he wants.
  155
+    * MacGyver can build an airplane out of gum and paper clips. Chuck Norris can kill him and take it.
  156
+
  157
+Tests
  158
+-----
  159
+
  160
+Octavious has a bunch of unit tests. To run them, simply type
  161
+
  162
+.. code:: console
13 163
 
14 164
     $ python -m unittest -v octavious.tests
15 165
     test_pipeline (octavious.tests.TestPipeline) ... ok
@@ -20,22 +170,31 @@ Run the example app:
20 170
     Ran 3 tests in 0.001s
21 171
 
22 172
     OK
23  
-    $ python -m examples.simple
24  
-    * Chuck Norris uses ribbed condoms inside out, so he gets the pleasure.
25  
-
26  
-    * Chuck Norris doesn't read books. He stares them down until he gets the information he wants.
27  
-
28  
-    * MacGyver can build an airplane out of gum and paper clips. Chuck Norris can kill him and take it.
29 173
 
  174
+Status
  175
+------
30 176
 
31 177
 Currently in very early stages, please stay tuned...
32 178
 
33 179
 License
34 180
 -------
  181
+
35 182
 Copyright (c) 2013 Metglobal LLC.
36 183
 
37  
-Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
  184
+Permission is hereby granted, free of charge, to any person obtaining a copy of
  185
+this software and associated documentation files (the 'Software'), to deal in
  186
+the Software without restriction, including without limitation the rights to
  187
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
  188
+of the Software, and to permit persons to whom the Software is furnished to do
  189
+so, subject to the following conditions:
38 190
 
39  
-The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
  191
+The above copyright notice and this permission notice shall be included in all
  192
+copies or substantial portions of the Software.
40 193
 
41  
-THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  194
+THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  195
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  196
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  197
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  198
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  199
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  200
+SOFTWARE.
30  octavious/parallelizer/threading.py
... ...
@@ -0,0 +1,30 @@
  1
+from __future__ import absolute_import
  2
+import threading
  3
+from octavious.parallelizer import Parallelizer
  4
+from Queue import Queue
  5
+
  6
+
  7
+def parallelizer_task(q, processor, input, callback):
  8
+    output = q.put(processor(input))
  9
+    if callback:
  10
+        callback(output)
  11
+
  12
+
  13
+class ThreadsParallelizer(Parallelizer):
  14
+    """
  15
+    This is implementation for python parallelizing processors using
  16
+    python's threading higher level threading interface
  17
+
  18
+    """
  19
+    def parallelize(self, processors, input=None, callback=None):
  20
+        """Convenient ``Parallelizer.parallelize`` implementation
  21
+        using `threading` library.
  22
+        """
  23
+        q = Queue()
  24
+        for processor in processors:
  25
+            t = threading.Thread(target=parallelizer_task,
  26
+                                 args=(q, processor, input, callback))
  27
+            t.daemon = True
  28
+            t.start()
  29
+        return [q.get() for i in range(len(processors))]
  30
+parallelizer = ThreadsParallelizer
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.