# Workflow + Redis + Celery 

* http://workflow.readthedocs.io/en/latest
* https://redis.io
* http://www.celeryproject.org

In [1]:
from celery import Celery

# linux:~> celery -A workflow_sample worker --loglevel=info
app = Celery('workflow_sample', broker='redis://localhost:6379/0')


# Define a couple of basic tasks.

# "workflow" Task 1

def add(obj, eng):
    obj["value"] += 2

# "workflow" Task 2

def print_res(obj, eng):
    print(obj.get("value"))

# Create a workflow out of them.

workflow_tasks = [add, print_res]


# "celery" Task

# Mark our execution process as a celery task with this decorator.
@app.task
def run_workflow(data):

    # Note that the imports that this function requires must be done inside
    # it since our code will not be running in the global context.

    from workflow.engine import GenericWorkflowEngine

    wfe = GenericWorkflowEngine()
    
    wfe.setWorkflow(workflow_tasks)
    
    wfe.process(data)


# linux:~> python workflow_sample.py

# Code that runs when we call this script directly. This way we can start
# as many workflows as we wish and let celery handle how they are
# distributed and when they run.

if __name__ == "__main__":
    run_workflow.delay([
                        {"value": 10}, 
                        {"value": 20}, 
                        {"value": 30}
                       ]
                      )

In [2]:
!pwd && ls

/var/tmp/python3/notebooks/pylibs/workflow/examples/celery_workflow
__pycache__		     workflow_sample.py
workflow_and_celery_1.ipynb  workflow_sample_worker.txt


# Worker

    celery -A workflow_sample worker --loglevel=info &

# Run the Task

    python workflow_sample.py 
    python workflow_sample.py 
    python workflow_sample.py 
    python workflow_sample.py 
    python workflow_sample.py 
    python workflow_sample.py 
    python workflow_sample.py 
    python workflow_sample.py 
    python workflow_sample.py 
    python workflow_sample.py 
    python workflow_sample.py 

# Analyzing the log files from worker

In [3]:
!gvim workflow_sample_worker.txt
# :v/\v(TaskPool)/d

In [4]:
!grep "TaskPool" workflow_sample_worker.txt

[2017-02-08 15:10:42,471: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', '263146a6-38d4-46e5-975f-51e2e66c1321', {'id': '263146a6-38d4-46e5-975f-51e2e66c1321', 'eta': None, 'kwargsrepr': '{}', 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'timelimit': [None, None], 'reply_to': '274ff7bf-f248-3fe4-953b-1b32173c0055', 'root_id': '263146a6-38d4-46e5-975f-51e2e66c1321', 'retries': 0, 'expires': None, 'lang': 'py', 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': '263146a6-38d4-46e5-975f-51e2e66c1321', 'origin': 'gen20077@goldbeef.anim.odw.com.cn', 'group': None, 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"callbacks": null, "chain": null, "chord": null, "errbacks": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:01,597: DEBUG/MainProcess] TaskPo

In [5]:
!grep "TaskPool" workflow_sample_worker.txt | wc -l

11
