# Flow and task configuration

[Reference](https://orion-docs.prefect.io/tutorials/flow-task-config/)

## Basic flow configuration

In [1]:
from prefect import flow

In [2]:
@flow(name="My example flow")
def my_flow():
    print("nothing much")
    return 42

In [3]:
state = my_flow()

16:39:51.154 | INFO    | prefect.engine - Created flow run 'aquatic-magpie' for flow 'My example flow'
16:39:51.154 | INFO    | Flow run 'aquatic-magpie' - Using task runner 'ConcurrentTaskRunner'
16:39:51.221 | INFO    | Flow run 'aquatic-magpie' - Finished in state Completed()


nothing much


In [6]:
@flow(name="My example flow",
      description="An example flow for a tutorial.")
def my_flow():
    print("Flow with description")
    return 42

In [7]:
state = my_flow()

08:14:55.468 | INFO    | prefect.engine - Created flow run 'sweet-duck' for flow 'My example flow'
08:14:55.469 | INFO    | Flow run 'sweet-duck' - Using task runner 'ConcurrentTaskRunner'
08:14:55.538 | INFO    | Flow run 'sweet-duck' - Finished in state Completed()


Flow with description


In [9]:
@flow(name="My example flow")
def my_flow():
    """An example flow for a tutorial."""
    print("Flow with description in the docstring")
    return 42

In [10]:
state = my_flow()

08:16:45.760 | INFO    | prefect.engine - Created flow run 'demonic-chicken' for flow 'My example flow'
08:16:45.761 | INFO    | Flow run 'demonic-chicken' - Using task runner 'ConcurrentTaskRunner'
08:16:45.865 | INFO    | Flow run 'demonic-chicken' - Finished in state Completed()


Flow with description in the docstring


In [11]:
@flow(name="My example flow",
      description="An example flow for a tutorial.",
      version="tutorial_02")
def my_flow():
    print("Flow with description and version")
    return 42

In [12]:
state = my_flow()

08:17:52.348 | INFO    | prefect.engine - Created flow run 'sloppy-millipede' for flow 'My example flow'
08:17:52.357 | INFO    | Flow run 'sloppy-millipede' - Using task runner 'ConcurrentTaskRunner'
08:17:52.433 | INFO    | Flow run 'sloppy-millipede' - Finished in state Completed()


Flow with description and version


In [13]:
import os

@flow(name="My example flow",
      description="An example flow for a tutorial.",
      version=os.getenv("GIT_COMMIT_SHA"))
def my_flow():
    print("Flow with description and Git commit hash ID as version")
    return 42

In [14]:
state = my_flow()

08:19:27.502 | INFO    | prefect.engine - Created flow run 'amaranth-chinchilla' for flow 'My example flow'
08:19:27.503 | INFO    | Flow run 'amaranth-chinchilla' - Using task runner 'ConcurrentTaskRunner'
08:19:27.581 | INFO    | Flow run 'amaranth-chinchilla' - Finished in state Completed()


Flow with description and Git commit hash ID as version


### Parameter type conversion

In [15]:
from prefect import task, flow

@task
def printer(obj):
    print(f"Received a {type(obj)} with value {obj}")

# note that we define the flow with type hints
@flow
def validation_flow(x: int, y: str):
    printer(x)
    printer(y)


In [16]:
validation_flow(x="42", y=100)

08:21:17.083 | INFO    | prefect.engine - Created flow run 'outgoing-kittiwake' for flow 'validation-flow'
08:21:17.084 | INFO    | Flow run 'outgoing-kittiwake' - Using task runner 'ConcurrentTaskRunner'
08:21:17.155 | INFO    | Flow run 'outgoing-kittiwake' - Created task run 'printer-da44fb11-0' for task 'printer'
08:21:17.197 | INFO    | Flow run 'outgoing-kittiwake' - Created task run 'printer-da44fb11-1' for task 'printer'


Received a <class 'int'> with value 42
Received a <class 'str'> with value 100


08:21:18.061 | INFO    | Task run 'printer-da44fb11-0' - Finished in state Completed()
08:21:18.305 | INFO    | Task run 'printer-da44fb11-1' - Finished in state Completed()
08:21:18.336 | INFO    | Flow run 'outgoing-kittiwake' - Finished in state Completed('All states completed.')


Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=None, task_run_id=08df5462-2b74-4845-804d-448c82ec261f), Completed(message=None, type=COMPLETED, result=None, task_run_id=a82385a5-6ec1-4fa7-b892-dde93e252023)], flow_run_id=32ffe024-1696-44b9-86f5-f33209470cf6)

In [17]:
from prefect import flow
from pydantic import BaseModel

class Model(BaseModel):
    a: int
    b: float
    c: str

@flow
def model_validator(model: Model):
    printer(model)


In [18]:
model_validator({"a":42, "b":0, "c": 55})

08:22:26.699 | INFO    | prefect.engine - Created flow run 'violet-goose' for flow 'model-validator'
08:22:26.699 | INFO    | Flow run 'violet-goose' - Using task runner 'ConcurrentTaskRunner'
08:22:26.788 | INFO    | Flow run 'violet-goose' - Created task run 'printer-da44fb11-2' for task 'printer'


Received a <class '__main__.Model'> with value a=42 b=0.0 c='55'


08:22:27.509 | INFO    | Task run 'printer-da44fb11-2' - Finished in state Completed()
08:22:27.533 | INFO    | Flow run 'violet-goose' - Finished in state Completed('All states completed.')


Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=None, task_run_id=9da027af-2615-4625-a207-7b093f14ffc8)], flow_run_id=89db6443-2a73-4dda-a11b-e333dfcc9a06)

### Configuring task runners

In [19]:
from prefect.task_runners import DaskTaskRunner

@flow(name="My Example Flow", 
      task_runner=DaskTaskRunner())
def my_flow(*args, **kwargs):
    # run parallel tasks and subflows with Dask
    pass

## Basic task configuration

In [20]:
import time

@task(name="My Example Task", 
      description="An example task for a tutorial.")
def my_task():
    # do some work
    for i in range(5):
        time.sleep(5)


In [21]:
@task(name="My Example Task", 
      description="An example task for a tutorial.",
      tags=["tutorial", "tag-test"])
def my_task():
    # do some work
    for i in range(5):
        time.sleep(5)

## Task retries

In [22]:
from prefect import task, flow

@task(retries=2, retry_delay_seconds=0)
def failure():
    print('running')
    raise ValueError("bad code")

@flow
def test_retries():
    return failure()


In [23]:
test_retries()

08:27:49.118 | INFO    | prefect.engine - Created flow run 'hallowed-flounder' for flow 'test-retries'
08:27:49.118 | INFO    | Flow run 'hallowed-flounder' - Using task runner 'ConcurrentTaskRunner'
08:27:49.185 | INFO    | Flow run 'hallowed-flounder' - Created task run 'failure-acc38180-0' for task 'failure'
08:27:49.662 | ERROR   | Task run 'failure-acc38180-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/prefect/engine.py", line 798, in orchestrate_task_run
    result = await run_sync_in_worker_thread(task.fn, *args, **kwargs)
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_a

running


08:27:49.966 | INFO    | Task run 'failure-acc38180-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
08:27:50.176 | ERROR   | Task run 'failure-acc38180-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/prefect/engine.py", line 798, in orchestrate_task_run
    result = await run_sync_in_worker_thread(task.fn, *args, **kwargs)
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", li

running


08:27:50.614 | INFO    | Task run 'failure-acc38180-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
08:27:51.232 | ERROR   | Task run 'failure-acc38180-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/prefect/engine.py", line 798, in orchestrate_task_run
    result = await run_sync_in_worker_thread(task.fn, *args, **kwargs)
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/lenovo/miniconda3/envs/prefect-env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", li

running


08:27:51.506 | ERROR   | Task run 'failure-acc38180-0' - Finished in state Failed('Task run encountered an exception.')
08:27:51.547 | ERROR   | Flow run 'hallowed-flounder' - Finished in state Failed('1/1 states failed.')


Failed(message='1/1 states failed.', type=FAILED, result=Failed(message='Task run encountered an exception.', type=FAILED, result=ValueError('bad code'), task_run_id=23086027-61f7-4e8f-aa15-856daefe37a1), flow_run_id=aad45de9-de40-4cf5-bbc9-678be9f5879b)

## Task caching

### Task input hash

In [24]:
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def hello_task(name_input):
    print (f"Saying hello {name_input}")
    return "hello " + name_input

@flow
def hello_flow(name_input):
    hello_task(name_input)

In [25]:
for name in ["ABC", "ABC", "ABC", "DEF", "DEF", "GHI"]:
    print(hello_flow(name))

08:34:22.211 | INFO    | prefect.engine - Created flow run 'aquatic-bear' for flow 'hello-flow'
08:34:22.212 | INFO    | Flow run 'aquatic-bear' - Using task runner 'ConcurrentTaskRunner'
08:34:22.292 | INFO    | Flow run 'aquatic-bear' - Created task run 'hello_task-e97fb216-0' for task 'hello_task'


Saying hello ABC


08:34:23.126 | INFO    | Task run 'hello_task-e97fb216-0' - Finished in state Completed()
08:34:23.203 | INFO    | Flow run 'aquatic-bear' - Finished in state Completed('All states completed.')
08:34:23.334 | INFO    | prefect.engine - Created flow run 'archetypal-mackerel' for flow 'hello-flow'
08:34:23.335 | INFO    | Flow run 'archetypal-mackerel' - Using task runner 'ConcurrentTaskRunner'
08:34:23.420 | INFO    | Flow run 'archetypal-mackerel' - Created task run 'hello_task-e97fb216-1' for task 'hello_task'


Completed('All states completed.')


08:34:23.521 | INFO    | Task run 'hello_task-e97fb216-1' - Finished in state Cached(, type=COMPLETED)
08:34:23.579 | INFO    | Flow run 'archetypal-mackerel' - Finished in state Completed('All states completed.')
08:34:23.726 | INFO    | prefect.engine - Created flow run 'pygmy-gecko' for flow 'hello-flow'
08:34:23.727 | INFO    | Flow run 'pygmy-gecko' - Using task runner 'ConcurrentTaskRunner'
08:34:23.800 | INFO    | Flow run 'pygmy-gecko' - Created task run 'hello_task-e97fb216-2' for task 'hello_task'


Completed('All states completed.')


08:34:24.094 | INFO    | Task run 'hello_task-e97fb216-2' - Finished in state Cached(, type=COMPLETED)
08:34:24.131 | INFO    | Flow run 'pygmy-gecko' - Finished in state Completed('All states completed.')
08:34:24.245 | INFO    | prefect.engine - Created flow run 'dazzling-scorpion' for flow 'hello-flow'
08:34:24.246 | INFO    | Flow run 'dazzling-scorpion' - Using task runner 'ConcurrentTaskRunner'
08:34:24.332 | INFO    | Flow run 'dazzling-scorpion' - Created task run 'hello_task-e97fb216-3' for task 'hello_task'


Completed('All states completed.')
Saying hello DEF


08:34:25.067 | INFO    | Task run 'hello_task-e97fb216-3' - Finished in state Completed()
08:34:25.095 | INFO    | Flow run 'dazzling-scorpion' - Finished in state Completed('All states completed.')
08:34:25.225 | INFO    | prefect.engine - Created flow run 'lumpy-scallop' for flow 'hello-flow'
08:34:25.226 | INFO    | Flow run 'lumpy-scallop' - Using task runner 'ConcurrentTaskRunner'
08:34:25.314 | INFO    | Flow run 'lumpy-scallop' - Created task run 'hello_task-e97fb216-4' for task 'hello_task'


Completed('All states completed.')


08:34:25.858 | INFO    | Task run 'hello_task-e97fb216-4' - Finished in state Cached(, type=COMPLETED)
08:34:25.881 | INFO    | Flow run 'lumpy-scallop' - Finished in state Completed('All states completed.')
08:34:25.984 | INFO    | prefect.engine - Created flow run 'ivory-mosquito' for flow 'hello-flow'
08:34:25.985 | INFO    | Flow run 'ivory-mosquito' - Using task runner 'ConcurrentTaskRunner'
08:34:26.049 | INFO    | Flow run 'ivory-mosquito' - Created task run 'hello_task-e97fb216-5' for task 'hello_task'


Completed('All states completed.')
Saying hello GHI


08:34:26.743 | INFO    | Task run 'hello_task-e97fb216-5' - Finished in state Completed()
08:34:26.778 | INFO    | Flow run 'ivory-mosquito' - Finished in state Completed('All states completed.')


Completed('All states completed.')


### Cache key function

In [26]:
from prefect import flow, task
from datetime import timedelta
import time

def cache_key_from_sum(context, parameters):
    print(parameters)
    return sum(parameters["nums"])

@task(cache_key_fn=cache_key_from_sum, cache_expiration=timedelta(minutes=1))
def cached_task(nums):
    print('running an expensive operation')
    time.sleep(3)
    return sum(nums)

@flow
def test_caching(nums):
    cached_task(nums)

In [27]:
test_caching([2, 2])

08:41:31.995 | INFO    | prefect.engine - Created flow run 'flat-roadrunner' for flow 'test-caching'
08:41:31.996 | INFO    | Flow run 'flat-roadrunner' - Using task runner 'ConcurrentTaskRunner'
08:41:32.200 | INFO    | Flow run 'flat-roadrunner' - Created task run 'cached_task-64beb460-0' for task 'cached_task'


{'nums': [2, 2]}
running an expensive operation


08:41:36.043 | INFO    | Task run 'cached_task-64beb460-0' - Finished in state Completed()
08:41:36.065 | INFO    | Flow run 'flat-roadrunner' - Finished in state Completed('All states completed.')


Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=4, task_run_id=7e85e214-c744-451b-8cba-ea673f09cae0)], flow_run_id=1baf7839-ddb0-4d22-8205-f4c9e3e748ca)

In [28]:
test_caching([2, 2])

08:41:56.812 | INFO    | prefect.engine - Created flow run 'vengeful-python' for flow 'test-caching'
08:41:56.813 | INFO    | Flow run 'vengeful-python' - Using task runner 'ConcurrentTaskRunner'
08:41:56.890 | INFO    | Flow run 'vengeful-python' - Created task run 'cached_task-64beb460-1' for task 'cached_task'


{'nums': [2, 2]}


08:41:57.662 | INFO    | Task run 'cached_task-64beb460-1' - Finished in state Cached(, type=COMPLETED)
08:41:57.689 | INFO    | Flow run 'vengeful-python' - Finished in state Completed('All states completed.')


Completed(message='All states completed.', type=COMPLETED, result=[Cached(message=None, type=COMPLETED, result=4, task_run_id=21599438-4c58-4be4-834b-4a8b1e0515bb)], flow_run_id=c404fe73-204d-4fe6-9858-2941e1450ac5)

In [29]:
test_caching([1, 3])

08:42:07.626 | INFO    | prefect.engine - Created flow run 'attractive-oarfish' for flow 'test-caching'
08:42:07.626 | INFO    | Flow run 'attractive-oarfish' - Using task runner 'ConcurrentTaskRunner'
08:42:07.707 | INFO    | Flow run 'attractive-oarfish' - Created task run 'cached_task-64beb460-2' for task 'cached_task'


{'nums': [1, 3]}


08:42:08.040 | INFO    | Task run 'cached_task-64beb460-2' - Finished in state Cached(, type=COMPLETED)
08:42:08.055 | INFO    | Flow run 'attractive-oarfish' - Finished in state Completed('All states completed.')


Completed(message='All states completed.', type=COMPLETED, result=[Cached(message=None, type=COMPLETED, result=4, task_run_id=767075db-e66c-493e-ad18-dfe7851ba861)], flow_run_id=0728a8d5-60ac-4e60-bc28-d774646ad9b5)

In [30]:
test_caching([3, 4])

08:42:18.880 | INFO    | prefect.engine - Created flow run 'daffodil-goshawk' for flow 'test-caching'
08:42:18.880 | INFO    | Flow run 'daffodil-goshawk' - Using task runner 'ConcurrentTaskRunner'
08:42:18.948 | INFO    | Flow run 'daffodil-goshawk' - Created task run 'cached_task-64beb460-3' for task 'cached_task'


{'nums': [3, 4]}
running an expensive operation


08:42:22.720 | INFO    | Task run 'cached_task-64beb460-3' - Finished in state Completed()
08:42:22.748 | INFO    | Flow run 'daffodil-goshawk' - Finished in state Completed('All states completed.')


Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=7, task_run_id=f7561409-3d6c-4fff-a50f-92004de33ce4)], flow_run_id=36109ce3-9eb0-46d2-a4d6-bda467672bfc)