# Flow and task configuration

## Basic flow config
Basic flow configuration includes the ability to provide a name, description, and version and other options for the flow via flow arguments.

A flow description enables you to provide documentation right alongside your flow object.

A flow version enables you to associate a given run of your workflow with the version of code or configuration that was used.

In [1]:
from prefect import flow, task


In [None]:

@flow(name="My Example Flow", 
      description="An example flow for a tutorial.",
      version="tutorial_02")
def my_flow():
    # run tasks and subflows


## Basic task configuration

you can independently assign tasks their own name and description.

In [2]:
@task(name="My example task",
description= "An example task for a tutorial")
def print_hello():
    print("Hello Thanh Tu")

@flow(name="My Example Flow", 
      description="An example flow for a tutorial.",
      version="tutorial_02")
def make_flow():
    print_hello()

make_flow()

21:55:04.283 | INFO    | prefect.engine - Created flow run 'attractive-stoat' for flow 'My Example Flow'
21:55:04.399 | INFO    | Flow run 'attractive-stoat' - Created task run 'My example task-1fa5169b-0' for task 'My example task'
21:55:04.400 | INFO    | Flow run 'attractive-stoat' - Executing 'My example task-1fa5169b-0' immediately...
21:55:04.435 | INFO    | Task run 'My example task-1fa5169b-0' - Finished in state Completed()
21:55:04.449 | INFO    | Flow run 'attractive-stoat' - Finished in state Completed('All states completed.')


Hello Thanh Tu


[Completed(message=None, type=COMPLETED, result=LiteralResult(type='literal', value=None))]

## Flow and task retries
This enables flows and tasks to automatically retry on failure

In [None]:
@task(retries=2, retry_delay_seconds = 5)
def failure():
    print('running')
    raise ValueError("bad code")

@flow 
def test_retries():
    return failure()



In [None]:
state = test_retries()

## Task caching
Caching refers to the ability of a task run to reflect a finished state without actually running the code that defines the task.

This allows you to efficiently reuse results of tasks that may be particularly "expensive" to run with every flow run. 

Moreover, Prefect makes it easy to share these states across flows and flow runs using the concept of a "cache key function".

You can define a task that is cached based on its inputs by using the Prefect task_input_hash. This is a task cache key implementation that hashes all inputs to the task using a JSON or cloudpickle serializer. If the task inputs do not change, the cached results are used rather than running the task until the cache expires

until the cache_expiration time ends, as long as the input to hello_task() remains the same when it is called, the cached return value is returned. In this situation the task is not rerun. However, if the input argument value changes, hello_task() runs using the new input

One way to use cache_key_fn is to cache based on inputs by specifying task_input_hash. If the input parameters to the task are the same, Prefect returns the cached results rather than running the task again.

### Task input hash

In [None]:
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def hello_task(name_input):
    # Doing some work
    print("Saying hello")
    return "hello " + name_input

@flow
def hello_flow(name_input):
    hello_task(name_input)


### Cache key function

In [None]:
from prefect import flow, task
from datetime import timedelta
import time

def cache_key_from_sum(context, parameters):
    print(parameters)
    return sum(parameters["nums"])

@task(cache_key_fn=cache_key_from_sum, cache_expiration=timedelta(minutes=1))
def cached_task(nums):
    print('running an expensive operation')  
    time.sleep(3)
    return sum(nums)

@flow
def test_caching(nums):
    cached_task(nums)


## Task results

## Map