https://docs.prefect.io/guide/

In [13]:
%matplotlib inline

ModuleNotFoundError: No module named 'matplotlib'

In [1]:
from prefect import task, Flow

In [2]:
@task
def say_hello():
    print("Hello, world!")


with Flow("My First Flow") as flow:
    say_hello()

In [3]:
flow.run() # "Hello, world!"

[2019-03-25 11:37:32,722] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
[2019-03-25 11:37:32,730] INFO - prefect.FlowRunner | Starting flow run.
[2019-03-25 11:37:32,732] DEBUG - prefect.FlowRunner | Flow 'My First Flow': Handling state change from Scheduled to Running
[2019-03-25 11:37:32,736] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2019-03-25 11:37:32,736] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Pending to Running
[2019-03-25 11:37:32,738] DEBUG - prefect.TaskRunner | Task 'say_hello': Calling task.run() method...
[2019-03-25 11:37:32,740] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Running to Success
[2019-03-25 11:37:32,740] INFO - prefect.TaskRunner | Task 'say_hello': finished task run for task with final state: 'Success'
[2019-03-25 11:37:32,742] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2019-03-25 11:37:32,743] DEBUG - prefect.FlowR

Hello, world!


Success("All reference tasks succeeded.")

## Prefect as calculator
https://docs.prefect.io/guide/tutorials/calculator.html

In [4]:
from prefect import task, Flow, Parameter

def run(flow, **parameters):
    state = flow.run(**parameters)
    terminal_task = list(flow.terminal_tasks())[0]
    return state.result[terminal_task].result

In [5]:
with Flow('Add one') as flow:
    result = Parameter('x') + 1

In [6]:
assert run(flow, x=1) == 2
assert run(flow, x=2) == 3
assert run(flow, x=-100) == -99

[2019-03-25 11:41:47,411] INFO - prefect.FlowRunner | Beginning Flow run for 'Add one'
[2019-03-25 11:41:47,413] INFO - prefect.FlowRunner | Starting flow run.
[2019-03-25 11:41:47,414] DEBUG - prefect.FlowRunner | Flow 'Add one': Handling state change from Scheduled to Running
[2019-03-25 11:41:47,420] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2019-03-25 11:41:47,422] DEBUG - prefect.TaskRunner | Task 'x': Handling state change from Pending to Running
[2019-03-25 11:41:47,423] DEBUG - prefect.TaskRunner | Task 'x': Calling task.run() method...
[2019-03-25 11:41:47,424] DEBUG - prefect.TaskRunner | Task 'x': Handling state change from Running to Success
[2019-03-25 11:41:47,425] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
[2019-03-25 11:41:47,427] INFO - prefect.TaskRunner | Task '1': Starting task run...
[2019-03-25 11:41:47,428] DEBUG - prefect.TaskRunner | Task '1': Handling state change from Pending to Running
[2019

In [7]:
with Flow('Add x and y') as flow:
    result = Parameter('x') + Parameter('y')

In [8]:
assert run(flow, x=1, y=1) == 2
assert run(flow, x=40, y=2) == 42

[2019-03-25 11:42:09,455] INFO - prefect.FlowRunner | Beginning Flow run for 'Add x and y'
[2019-03-25 11:42:09,457] INFO - prefect.FlowRunner | Starting flow run.
[2019-03-25 11:42:09,458] DEBUG - prefect.FlowRunner | Flow 'Add x and y': Handling state change from Scheduled to Running
[2019-03-25 11:42:09,463] INFO - prefect.TaskRunner | Task 'y': Starting task run...
[2019-03-25 11:42:09,465] DEBUG - prefect.TaskRunner | Task 'y': Handling state change from Pending to Running
[2019-03-25 11:42:09,467] DEBUG - prefect.TaskRunner | Task 'y': Calling task.run() method...
[2019-03-25 11:42:09,468] DEBUG - prefect.TaskRunner | Task 'y': Handling state change from Running to Success
[2019-03-25 11:42:09,469] INFO - prefect.TaskRunner | Task 'y': finished task run for task with final state: 'Success'
[2019-03-25 11:42:09,470] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2019-03-25 11:42:09,472] DEBUG - prefect.TaskRunner | Task 'x': Handling state change from Pending to Runni

In [9]:
from prefect.tasks.control_flow import switch, merge

# note: this will raise some warnings, but they're ok for this use case!
with Flow('Arithmetic') as flow:
    x, y = Parameter('x'), Parameter('y')
    operations = {
        '+': x + y,
        '-': x - y,
        '*': x * y,
        '/': x / y
    }
    switch(condition=Parameter('op'), cases=operations)
    result = merge(*operations.values())



In [10]:
assert run(flow, x=1, op='+', y=2) == 3
assert run(flow, x=1, op='-', y=2) == -1
assert run(flow, x=1, op='*', y=2) == 2
assert run(flow, x=1, op='/', y=2) == 0.5

[2019-03-25 11:42:43,517] INFO - prefect.FlowRunner | Beginning Flow run for 'Arithmetic'
[2019-03-25 11:42:43,519] INFO - prefect.FlowRunner | Starting flow run.
[2019-03-25 11:42:43,520] DEBUG - prefect.FlowRunner | Flow 'Arithmetic': Handling state change from Scheduled to Running
[2019-03-25 11:42:43,533] INFO - prefect.TaskRunner | Task 'op': Starting task run...
[2019-03-25 11:42:43,534] DEBUG - prefect.TaskRunner | Task 'op': Handling state change from Pending to Running
[2019-03-25 11:42:43,535] DEBUG - prefect.TaskRunner | Task 'op': Calling task.run() method...
[2019-03-25 11:42:43,536] DEBUG - prefect.TaskRunner | Task 'op': Handling state change from Running to Success
[2019-03-25 11:42:43,537] INFO - prefect.TaskRunner | Task 'op': finished task run for task with final state: 'Success'
[2019-03-25 11:42:43,539] INFO - prefect.TaskRunner | Task 'CompareValue: "/"': Starting task run...
[2019-03-25 11:42:43,540] DEBUG - prefect.TaskRunner | Task 'CompareValue: "/"': Handling

In [11]:
@task
def parse_input(expression):
    x, op, y = expression.split(' ')
    return dict(x=float(x), op=op, y=float(y))

with Flow('Arithmetic') as flow:
    inputs = parse_input(Parameter('expression'))

    # once we have our inputs, everything else is the same:
    x, y = inputs['x'], inputs['y']
    operations = {
        '+': x + y,
        '-': x - y,
        '*': x * y,
        '/': x / y
    }
    switch(condition=inputs['op'], cases=operations)
    result = merge(*operations.values())

In [12]:
assert run(flow, expression='1 + 2') == 3
assert run(flow, expression='1 - 2') == -1
assert run(flow, expression='1 * 2') == 2
assert run(flow, expression='1 / 2') == 0.5

[2019-03-25 11:43:08,710] INFO - prefect.FlowRunner | Beginning Flow run for 'Arithmetic'
[2019-03-25 11:43:08,711] INFO - prefect.FlowRunner | Starting flow run.
[2019-03-25 11:43:08,712] DEBUG - prefect.FlowRunner | Flow 'Arithmetic': Handling state change from Scheduled to Running
[2019-03-25 11:43:08,727] INFO - prefect.TaskRunner | Task 'expression': Starting task run...
[2019-03-25 11:43:08,728] DEBUG - prefect.TaskRunner | Task 'expression': Handling state change from Pending to Running
[2019-03-25 11:43:08,729] DEBUG - prefect.TaskRunner | Task 'expression': Calling task.run() method...
[2019-03-25 11:43:08,730] DEBUG - prefect.TaskRunner | Task 'expression': Handling state change from Running to Success
[2019-03-25 11:43:08,731] INFO - prefect.TaskRunner | Task 'expression': finished task run for task with final state: 'Success'
[2019-03-25 11:43:08,732] INFO - prefect.TaskRunner | Task 'parse_input': Starting task run...
[2019-03-25 11:43:08,733] DEBUG - prefect.TaskRunner | 

In [14]:
flow.visualize()

ImportError: This feature requires graphviz.
Try re-installing prefect with `pip install prefect[viz]`