# Prefect - workflow management system
### Common uses
- Define a set of tasks
- Execute tasks (on a schedule)
- Handle retries and logging

In [8]:
# Imports and setup

from prefect import task, Flow, Parameter
from prefect.schedules import IntervalSchedule
import datetime
import methods as m

# File names used by the ETL example below.
sourcef = "values.csv"     # input file
targetf = "newvalues.csv"  # output file

# define ETL tasks

## Extract 
@task
def extract(path):
    """Prefect task wrapping ``m.extract``.

    Passes ``path`` straight through to ``m.extract`` and returns its result
    unchanged, so downstream tasks receive exactly what the helper produced.
    """
    extracted = m.extract(path)
    return extracted

## Transform 
@task
def transform(data):
    """Prefect task wrapping ``m.transform``.

    Hands ``data`` to ``m.transform`` and returns the helper's result
    unchanged.
    """
    transformed = m.transform(data)
    return transformed

## Load
@task
def load(data, path):
    """Prefect task wrapping ``m.load``.

    Forwards ``data`` and ``path`` (in that order) to ``m.load`` and returns
    whatever the helper returns.
    """
    outcome = m.load(data, path)
    return outcome

In [None]:
def build_flow(schedule=None, target=None):
    """Assemble the ``my_etl`` flow: extract -> transform -> load.

    Args:
        schedule: Optional Prefect schedule attached to the flow. ``None``
            builds a flow without a schedule.
        target: Optional destination handed to the ``load`` task. When left
            as ``None`` the transformed data is loaded to the same ``path``
            run-time parameter it was extracted from, which is the original
            behaviour of this function.

    Returns:
        The constructed ``Flow``. It declares one required run-time
        parameter, ``path``.
    """
    with Flow("my_etl", schedule=schedule) as flow:
        path = Parameter(name="path", required=True)
        data = extract(path)
        newdata = transform(data)
        # Fall back to the source path so existing callers see no change.
        # NOTE(review): with target=None the load step receives the very path
        # that was extracted; whether that overwrites the input depends on
        # m.load -- confirm, and pass `target` to keep the two separate.
        destination = path if target is None else target
        load(newdata, destination)
    return flow

# First run one second from now, then every five seconds.
# The start date is timezone-aware (UTC) so the schedule does not depend on
# how a naive local timestamp would be interpreted.
# NOTE(review): Prefect presumably treats naive datetimes as UTC -- verify.
schedule = IntervalSchedule(
    start_date=datetime.datetime.now(datetime.timezone.utc)
    + datetime.timedelta(seconds=1),
    interval=datetime.timedelta(seconds=5),
)

flow = build_flow(schedule)

# Render the task graph before starting the run: with a schedule attached,
# flow.run() keeps waiting for the next scheduled run (see the captured log
# output), so a call placed after it is not reached while the schedule is
# active.
flow.visualize()

flow.run(parameters={"path": sourcef})

[2021-01-25 12:16:29-0700] INFO - prefect.my_etl | Waiting for next scheduled run at 2021-01-25T19:16:30.555591+00:00
[2021-01-25 12:16:30-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'my_etl'
[2021-01-25 12:16:30-0700] INFO - prefect.TaskRunner | Task 'path': Starting task run...
[2021-01-25 12:16:30-0700] INFO - prefect.TaskRunner | Task 'path': Finished task run for task with final state: 'Success'
[2021-01-25 12:16:30-0700] INFO - prefect.TaskRunner | Task 'extract': Starting task run...
[2021-01-25 12:16:30-0700] INFO - prefect.TaskRunner | Task 'extract': Finished task run for task with final state: 'Success'
[2021-01-25 12:16:30-0700] INFO - prefect.TaskRunner | Task 'transform': Starting task run...
[2021-01-25 12:16:30-0700] INFO - prefect.TaskRunner | Task 'transform': Finished task run for task with final state: 'Success'
[2021-01-25 12:16:30-0700] INFO - prefect.TaskRunner | Task 'load': Starting task run...
[2021-01-25 12:16:30-0700] INFO - prefect.TaskRunner |

[2021-01-25 12:17:00-0700] INFO - prefect.my_etl | Waiting for next scheduled run at 2021-01-25T19:17:05.555591+00:00
[2021-01-25 12:17:05-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'my_etl'
[2021-01-25 12:17:05-0700] INFO - prefect.TaskRunner | Task 'path': Starting task run...
[2021-01-25 12:17:05-0700] INFO - prefect.TaskRunner | Task 'path': Finished task run for task with final state: 'Success'
[2021-01-25 12:17:05-0700] INFO - prefect.TaskRunner | Task 'extract': Starting task run...
[2021-01-25 12:17:05-0700] INFO - prefect.TaskRunner | Task 'extract': Finished task run for task with final state: 'Success'
[2021-01-25 12:17:05-0700] INFO - prefect.TaskRunner | Task 'transform': Starting task run...
[2021-01-25 12:17:05-0700] INFO - prefect.TaskRunner | Task 'transform': Finished task run for task with final state: 'Success'
[2021-01-25 12:17:05-0700] INFO - prefect.TaskRunner | Task 'load': Starting task run...
[2021-01-25 12:17:05-0700] INFO - prefect.TaskRunner |

[2021-01-25 12:17:35-0700] INFO - prefect.my_etl | Waiting for next scheduled run at 2021-01-25T19:17:40.555591+00:00
[2021-01-25 12:17:40-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'my_etl'
[2021-01-25 12:17:40-0700] INFO - prefect.TaskRunner | Task 'path': Starting task run...
[2021-01-25 12:17:40-0700] INFO - prefect.TaskRunner | Task 'path': Finished task run for task with final state: 'Success'
[2021-01-25 12:17:40-0700] INFO - prefect.TaskRunner | Task 'extract': Starting task run...
[2021-01-25 12:17:40-0700] INFO - prefect.TaskRunner | Task 'extract': Finished task run for task with final state: 'Success'
[2021-01-25 12:17:40-0700] INFO - prefect.TaskRunner | Task 'transform': Starting task run...
[2021-01-25 12:17:40-0700] INFO - prefect.TaskRunner | Task 'transform': Finished task run for task with final state: 'Success'
[2021-01-25 12:17:40-0700] INFO - prefect.TaskRunner | Task 'load': Starting task run...
[2021-01-25 12:17:40-0700] INFO - prefect.TaskRunner |