# OpenStack TaskFlow
---
_Rick van de Loo_<br>
https://github.com/vdloo

DevOps @ Byte

# Before we begin:
https://www.byte.nl/vacatures    
### If you like this stuff and are looking for a new challenge, come talk to us


What is TaskFlow?
=================

### Python library to make tasks:
- consistent
- scalable
- resilient

In short: it helps when dealing with unreliable systems.

# Characteristics

- declarative
- distributed
- event driven

# Basic functionality


- Retries
- Dependency-driven ordering
- Reverts

## Example functions:

In [1]:
def create_volume(volume_name):
    return {'volume_name': volume_name}
    
def create_instance(instance_name):
    return {'instance_name': instance_name, 'attached_volume': None}
    
def attach_volume_to_instance(instance, volume):
    instance['attached_volume'] = volume
    return instance

# In a perfect world that would look like this:

In [2]:
volume = create_volume('vol1')
print(volume)

instance = create_instance('instance1')
print(instance)

instance = attach_volume_to_instance(instance, volume)
print(instance)

{'volume_name': 'vol1'}
{'instance_name': 'instance1', 'attached_volume': None}
{'instance_name': 'instance1', 'attached_volume': {'volume_name': 'vol1'}}


### But what if one of the creations fails?

## Simulating a flaky API

In [3]:
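try:
    from unittest.mock import Mock  # Python 3
except ImportError:
    from mock import Mock  # Python 2, needs the mock package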

# create_instance will fail the first 2 times, 
# the third time it will work as expected.
create_instance = Mock(side_effect=[
        RuntimeError('The API is down'), 
        RuntimeError('Still down!'), 
        {'instance_name': 'mynode', 'attached_volume': None}])

## The first time the call will fail

In [4]:
create_instance('mynode')

RuntimeError: The API is down

## The second time the call will fail as well

In [5]:
create_instance('mynode')

RuntimeError: Still down!

## The third time the call will succeed

In [6]:
create_instance('mynode')

{'attached_volume': None, 'instance_name': 'mynode'}

Retries
=======
### We need to retry create_instance a couple of times before it works.

In [7]:
# resetting the fake flaky function 
create_instance = Mock(side_effect=[
        RuntimeError('The API is down'), 
        RuntimeError('Still down!'), 
        {'instance_name': 'mynode', 'attached_volume': None}])
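Without TaskFlow, the retry would have to be hand-rolled around every flaky call. A minimal sketch in plain Python 3, using a hypothetical `call_with_retries` helper (not part of TaskFlow) and its own fake API so it doesn't use up the `create_instance` mock above:

```python
from unittest.mock import Mock


def call_with_retries(func, attempts, *args, **kwargs):
    """Hypothetical helper: call func until it succeeds or attempts run out."""
    if attempts < 1:
        raise ValueError('attempts must be at least 1')
    for _ in range(attempts):
        try:
            return func(*args, **kwargs)
        except Exception as error:
            # Remember the failure and try again.
            last_error = error
    # Every attempt failed: re-raise the last error.
    raise last_error


# A separate fake flaky API: fails twice, then succeeds.
flaky_create_instance = Mock(side_effect=[
    RuntimeError('The API is down'),
    RuntimeError('Still down!'),
    {'instance_name': 'mynode', 'attached_volume': None}])

instance = call_with_retries(flaky_create_instance, 3, 'mynode')
print(instance)
```

This handles one call at a time; it doesn't know which steps ran before it or how to undo them, which is what the flows, retries and reverts below are for.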

## Importing some TaskFlow components

- `run`, which loads a flow into an _engine_ and runs it
- `FunctorTask`, which wraps a plain function in a Task
- `linear_flow`, for sequential execution

In [8]:
from taskflow.engines import run
from taskflow.task import FunctorTask
from taskflow.patterns import linear_flow as lf

## Creating the flow

### A flow is a collection of Tasks

In [9]:
flow = lf.Flow('boot_instance_flow')  
flow.add(
    FunctorTask(execute=create_volume,
                inject={'volume_name': 'myvol'},
                provides='volume'),
    
    FunctorTask(execute=create_instance,
                inject={'instance_name': 'mynode'},
                provides='instance'),
    
    FunctorTask(execute=attach_volume_to_instance, 
                requires=['volume', 'instance']),
)


<taskflow.patterns.linear_flow.Flow at 0x7ffb7fe41550>

## Running the flow will result in a failure

In [10]:
run(flow)

RuntimeError: The API is down

In [None]:
# resetting the fake flaky function 
create_instance = Mock(side_effect=[
        RuntimeError('The API is down'), 
        RuntimeError('Still down!'), 
        {'instance_name': 'mynode', 'attached_volume': None}])

## By the way, instead of injecting the arguments per task, we could also provide them through a store

In [11]:
flow = lf.Flow('boot_instance_flow')  
flow.add(
    FunctorTask(execute=create_volume,
                provides='volume'),
    
    FunctorTask(execute=create_instance,
                provides='instance'),
    
    FunctorTask(execute=attach_volume_to_instance, 
                requires=['volume', 'instance']),
)
store = {'volume_name': 'myvol', 'instance_name': 'mynode'}
run(flow, store=store)


RuntimeError: Still down!
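Both variants work because arguments are matched to tasks by name. A toy sketch of that wiring, assuming a hypothetical `run_linear` runner (not TaskFlow's engine): each parameter is looked up in the task's own injected values first, then in a shared namespace that starts as a copy of the store and collects every `provides` result. The lookup order is a choice made for this sketch. The plain functions from In [1] are repeated so the block runs on its own.

```python
import inspect


def create_volume(volume_name):
    return {'volume_name': volume_name}


def create_instance(instance_name):
    return {'instance_name': instance_name, 'attached_volume': None}


def attach_volume_to_instance(instance, volume):
    instance['attached_volume'] = volume
    return instance


def run_linear(tasks, store=None):
    """Hypothetical toy runner for (function, inject, provides) triples."""
    namespace = dict(store or {})
    for func, inject, provides in tasks:
        kwargs = {}
        for name in inspect.signature(func).parameters:
            # Per-task injected values win; otherwise use the shared namespace.
            # A name that nothing supplies raises KeyError.
            kwargs[name] = inject[name] if name in inject else namespace[name]
        result = func(**kwargs)
        if provides is not None:
            namespace[provides] = result  # later tasks can now require it
    return namespace


# Arguments passed per task, like inject=...
injected = run_linear([
    (create_volume, {'volume_name': 'myvol'}, 'volume'),
    (create_instance, {'instance_name': 'mynode'}, 'instance'),
    (attach_volume_to_instance, {}, None),
])

# The same arguments supplied once up front, like store=...
stored = run_linear([
    (create_volume, {}, 'volume'),
    (create_instance, {}, 'instance'),
    (attach_volume_to_instance, {}, None),
], store={'volume_name': 'myvol', 'instance_name': 'mynode'})

print(stored['instance'])
```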

In [12]:
# resetting the fake flaky function 
create_instance = Mock(side_effect=[
        RuntimeError('The API is down'), 
        RuntimeError('Still down!'), 
        {'instance_name': 'mynode', 'attached_volume': None}])

## Adding a retry


In [13]:
from taskflow.retry import Times

# if a task fails, the whole flow is run again, up to 3 attempts in total
flow = lf.Flow('boot_instance_flow', retry=Times(3))  
flow.add(
    FunctorTask(
        execute=create_volume,
        inject={'volume_name': 'myvol'},
        provides='volume'
    ),
    FunctorTask(
        execute=create_instance,
        inject={'instance_name': 'mynode'},
        provides='instance'
    ),
    FunctorTask(
        execute=attach_volume_to_instance, 
        requires=['volume', 'instance']
    ),
)

<taskflow.patterns.linear_flow.Flow at 0x7ffb7f711e90>

# Running the flow again

In [14]:
result = run(flow)
result['instance']

{'attached_volume': {'volume_name': 'myvol'}, 'instance_name': 'mynode'}

### create_instance was attempted three times and the third time it succeeded
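Because the retry is attached to the flow, each attempt runs every task in it again, not only `create_instance`. A toy sketch of that behaviour with a hypothetical `run_with_times` helper (it mimics the idea behind `Times`, not its implementation, and leaves out the reverting TaskFlow does between attempts):

```python
from unittest.mock import Mock


def run_with_times(tasks, attempts):
    """Hypothetical toy retry: rerun the whole list of tasks when any task fails."""
    if attempts < 1:
        raise ValueError('attempts must be at least 1')
    for attempt in range(1, attempts + 1):
        try:
            # Every attempt starts again from the first task.
            return [task() for task in tasks]
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: let the last failure propagate


calls = []
flaky_api = Mock(side_effect=[
    RuntimeError('The API is down'),
    RuntimeError('Still down!'),
    {'instance_name': 'mynode'}])


def volume_step():
    calls.append('volume')
    return {'volume_name': 'myvol'}


def instance_step():
    calls.append('instance')
    return flaky_api()


results = run_with_times([volume_step, instance_step], attempts=3)
print(calls)  # ['volume', 'instance', 'volume', 'instance', 'volume', 'instance']
```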

Dependency-driven ordering
==========================

In [15]:
# let's reset create_instance first
# it is now a normal function again
def create_instance(instance_name):
    return {'instance_name': instance_name}

## The flow fails if the order of the tasks is changed

In [16]:
# changing the task order
flow = lf.Flow('boot_instance_flow', retry=Times(3))  
flow.add(
    FunctorTask(
        execute=attach_volume_to_instance,
        # fails because we don't have these yet
        requires=['volume', 'instance']
    ),
    FunctorTask(
        execute=create_volume,
        inject={'volume_name': 'myvol'},
        provides='volume'
    ),
    FunctorTask(
        execute=create_instance,
        inject={'instance_name': 'mynode'},
        provides='instance'
    ),
)

result = run(flow)

MissingDependencies: 'linear_flow.Flow: boot_instance_flow(len=3)' requires ['instance', 'volume'] but no other entity produces said requirements
  MissingDependencies: '__main__.attach_volume_to_instance==1.0' requires ['instance', 'volume'] but no other entity produces said requirements
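A linear flow runs its tasks in the given order, so every requirement must be provided by an earlier task (or injected). A toy version of that check, with a hypothetical `find_missing` function and injected values already left out of each task's requirements:

```python
def find_missing(tasks):
    """Hypothetical check for (name, requires, provides) tuples run in order."""
    available = set()
    missing = {}
    for name, requires, provides in tasks:
        # Only values produced by earlier tasks are available at this point.
        absent = sorted(set(requires) - available)
        if absent:
            missing[name] = absent
        available.update(provides)
    return missing


tasks = [
    ('attach_volume_to_instance', ['volume', 'instance'], []),
    ('create_volume', [], ['volume']),
    ('create_instance', [], ['instance']),
]
print(find_missing(tasks))  # {'attach_volume_to_instance': ['instance', 'volume']}
```

Moving the two create tasks to the front of the list makes the result empty.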

## Automatic task ordering with a graph flow

In [17]:
from taskflow.patterns import graph_flow as gf

# a graph flow orders its tasks by their dependencies
flow = gf.Flow('boot_instance_flow')  
flow.add(
    FunctorTask(
        execute=attach_volume_to_instance, 
        requires=['volume', 'instance']
    ),
    FunctorTask(
        execute=create_volume,
        inject={'volume_name': 'myvol'},
        provides='volume'
    ),
    FunctorTask(
        execute=create_instance,
        inject={'instance_name': 'mynode'},
        provides='instance'
    ),
)

<taskflow.patterns.graph_flow.Flow at 0x7ffb7f7e7b50>

#### TaskFlow will now resolve the dependencies and execute the tasks in the right order

In [18]:
result = run(flow)
result['instance']

{'attached_volume': {'volume_name': 'myvol'}, 'instance_name': 'mynode'}
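A graph flow derives the order from the requires/provides relations instead. The same idea sketched with Python 3.9's `graphlib.TopologicalSorter`; this is an illustration, not how TaskFlow builds its graph internally, and `dependency_order` is hypothetical:

```python
from graphlib import TopologicalSorter

# Hypothetical task descriptions: name -> (requires, provides).
# Injected values such as volume_name are left out.
tasks = {
    'attach_volume_to_instance': ({'volume', 'instance'}, set()),
    'create_volume': (set(), {'volume'}),
    'create_instance': (set(), {'instance'}),
}


def dependency_order(tasks):
    # Which task produces each named value.
    producer = {value: name
                for name, (_, provides) in tasks.items()
                for value in provides}
    # Each task depends on the producers of everything it requires.
    graph = {name: {producer[value] for value in requires}
             for name, (requires, _) in tasks.items()}
    return list(TopologicalSorter(graph).static_order())


order = dependency_order(tasks)
print(order)  # attach_volume_to_instance always comes last
```

A requirement that no task provides raises a `KeyError` in `producer[value]`, the toy counterpart of `MissingDependencies`.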

## Graph flows can also run in parallel

### creating the volume and booting the instance can be done at the same time

In [19]:
# parallel engine for simultaneous execution
# the default engine is 'serial'
result = run(flow, engine='parallel')  
result['instance']


{'attached_volume': {'volume_name': 'myvol'}, 'instance_name': 'mynode'}
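A parallel engine can start every task whose dependencies are done, instead of walking a fixed order. A hypothetical `run_parallel` scheduler sketching that with `graphlib` and a thread pool (Python 3.9+; tasks are zero-argument callables here, so no results are passed between them):

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_parallel(graph, functions, max_workers=4):
    """Hypothetical toy scheduler.

    graph maps task name -> set of task names it depends on;
    functions maps task name -> zero-argument callable.
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # also rejects cyclic graphs
    results = {}
    running = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Start every task whose dependencies have all finished.
            for name in sorter.get_ready():
                running[pool.submit(functions[name])] = name
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                name = running.pop(future)
                results[name] = future.result()  # re-raises a task's error
                sorter.done(name)  # may unlock dependent tasks
    return results


graph = {
    'create_volume': set(),
    'create_instance': set(),
    'attach_volume_to_instance': {'create_volume', 'create_instance'},
}
functions = {
    'create_volume': lambda: {'volume_name': 'myvol'},
    'create_instance': lambda: {'instance_name': 'mynode'},
    'attach_volume_to_instance': lambda: 'attached',
}
results = run_parallel(graph, functions)
print(results)
```

Here `create_volume` and `create_instance` are both ready at the start, so they can run at the same time; `attach_volume_to_instance` is only submitted once both have finished.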

Reverting
=========

### If attaching the volume fails, we need to clean up the volume and instance

## Defining reverts

In [20]:
from taskflow.task import Task

class AttachVolumeToInstance(Task):
    def execute(self, instance, volume):
        print('failing attaching the volume')
        raise RuntimeError("Something is wrong!")

class CreateInstance(Task):
    def execute(self):
        print('creating instance')

    def revert(self, result, **kwargs):
        print('cleaning up instance')

class CreateVolume(Task):
    def execute(self):
        print('creating volume')

    def revert(self, result, **kwargs):
        print('cleaning up volume')


## Cleaning up after the failed attempts

In [21]:
flow = gf.Flow('boot_instance_flow', retry=Times(3))
flow.add(
    CreateInstance(provides='instance'),
    CreateVolume(provides='volume'),
    AttachVolumeToInstance(
        requires=['instance', 'volume']
    )
)
run(flow, engine='serial')

creating instance
creating volume
failing attaching the volume
cleaning up instance
cleaning up volume
creating instance
creating volume
failing attaching the volume
cleaning up instance
cleaning up volume
creating instance
creating volume
failing attaching the volume
cleaning up instance
cleaning up volume


RuntimeError: Something is wrong!
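The revert logic boils down to: remember what finished, and undo it when something later fails. A toy sketch with a hypothetical `run_with_revert` helper; it reverts in reverse completion order and, unlike TaskFlow, does not give the failing task itself a chance to revert:

```python
def run_with_revert(tasks):
    """Hypothetical toy runner for (execute, revert) pairs of callables."""
    completed = []
    try:
        for execute, revert in tasks:
            # Only recorded once execute() has returned successfully.
            completed.append((revert, execute()))
    except Exception:
        # Undo the finished tasks, most recent first, then re-raise.
        for revert, result in reversed(completed):
            revert(result)
        raise
    return [result for _, result in completed]


log = []


def make_instance():
    log.append('creating instance')
    return 'instance'


def make_volume():
    log.append('creating volume')
    return 'volume'


def attach_volume():
    log.append('failing attaching the volume')
    raise RuntimeError('Something is wrong!')


def clean_up(result):
    log.append('cleaning up ' + result)


try:
    run_with_revert([
        (make_instance, clean_up),
        (make_volume, clean_up),
        (attach_volume, clean_up),
    ])
except RuntimeError as error:
    print(error)  # Something is wrong!
print(log)
```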

## Extended functionality
- Persistence backend
- Jobboard


## Persistence backend

<img src="media/persistence.png">

image source: https://wiki.openstack.org/wiki/TaskFlow#Design
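A persistence backend keeps flow and task state outside the running process, so an interrupted flow can be resumed instead of started over. A toy illustration of that idea (not TaskFlow's persistence API): the hypothetical `run_resumable` checkpoints each task's result to a JSON file and skips finished tasks on the next run:

```python
import json
import os
import tempfile


def run_resumable(tasks, checkpoint_path):
    """Hypothetical toy: tasks is a list of (name, callable) pairs.

    Results must be JSON-serialisable so they can be written to disk.
    """
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            saved = json.load(f)
    else:
        saved = {}
    for name, func in tasks:
        if name in saved:
            continue  # finished in an earlier run
        saved[name] = func()
        # Save after every task so a crash loses at most the current one.
        with open(checkpoint_path, 'w') as f:
            json.dump(saved, f)
    return saved


calls = []
api_is_down = [True]


def make_volume():
    calls.append('create_volume')
    return {'volume_name': 'myvol'}


def make_instance():
    calls.append('create_instance')
    if api_is_down[0]:
        raise RuntimeError('The API is down')
    return {'instance_name': 'mynode'}


path = os.path.join(tempfile.mkdtemp(), 'checkpoint.json')
tasks = [('volume', make_volume), ('instance', make_instance)]
try:
    run_resumable(tasks, path)  # first run: make_instance fails
except RuntimeError:
    pass
api_is_down[0] = False
result = run_resumable(tasks, path)  # second run: the volume step is skipped
print(calls)  # ['create_volume', 'create_instance', 'create_instance']
```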

## Jobboard

<img src="media/jobboard.png">

image source: https://wiki.openstack.org/wiki/TaskFlow#Design
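A jobboard is where jobs are posted and then claimed by workers, so each job has one owner at a time. A toy in-memory sketch of that post/claim idea; `ToyJobBoard` and its methods are hypothetical and not TaskFlow's jobboard API:

```python
import threading


class ToyJobBoard:
    """Hypothetical in-memory job board."""

    def __init__(self):
        self._lock = threading.Lock()
        self._owners = {}  # job name -> owner, or None while unclaimed

    def post(self, job_name):
        with self._lock:
            self._owners.setdefault(job_name, None)

    def claim(self, job_name, owner):
        """Return True if owner now holds the job, False if it is taken.

        Claiming a job that was never posted raises KeyError.
        """
        with self._lock:
            if self._owners[job_name] is not None:
                return False
            self._owners[job_name] = owner
            return True

    def abandon(self, job_name, owner):
        """Give the job back so another worker can claim it."""
        with self._lock:
            if self._owners.get(job_name) == owner:
                self._owners[job_name] = None

    def unclaimed(self):
        with self._lock:
            return [name for name, owner in self._owners.items() if owner is None]


board = ToyJobBoard()
board.post('boot_instance_flow')
print(board.claim('boot_instance_flow', 'worker-1'))  # True
print(board.claim('boot_instance_flow', 'worker-2'))  # False: already claimed
board.abandon('boot_instance_flow', 'worker-1')
print(board.claim('boot_instance_flow', 'worker-2'))  # True
```

The lock makes check-and-set in `claim` atomic, so two threads can never both get `True` for the same job.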

## Other cool features:
- flow factories
- atom versioning
- manual linking
- possibly in the future: [checkpointing](https://wiki.openstack.org/wiki/TaskFlow/Checkpointing)

# That's it. Questions?
---
## You can find these slides and the Jupyter notebook here: https://github.com/vdloo/python-meetup-taskflow
