# Prefect and Basic Orchestration Features

![img](https://miro.medium.com/max/1400/1*RlKOgfcbj3QXVBiGYOSZeg.gif)

[Prefect](https://www.prefect.io/) is an open source workflow orchestration tool designed to eliminate negative engineering. 
Prefect provides dataflow automation: engineers can add retries, workflow parameterization, caching, and concurrent execution to code written with the PyData stack, then schedule and monitor the resulting data pipelines at scale.


## Basic Orchestration

The fundamental building blocks of Prefect are `flows` and `tasks`. 

A [flow](https://orion-docs.prefect.io/concepts/flows/) is a container for workflow logic and allows users to interact with and reason about the state of their workflow. 

Flows are denoted by using the `@flow` decorator.  

For example, let's create a flow and run it. Calling the flow produces a `FlowRun`, which is an instance of the flow:

In [None]:
from prefect import flow

@flow(name="My Awesome Flow")
def my_flow():
    return

my_flow()  # calling the flow creates a flow run

A [task](https://orion-docs.prefect.io/concepts/tasks/) is a function that represents a discrete unit of work in a Prefect workflow. Tasks are functions; they can take inputs, perform work, and return an output. 

Tasks take advantage of automatic Prefect logging to capture details about task runs such as runtime, tags, and final states.

Tasks are denoted by using the `@task` decorator.

For example, let's add a task and run the flow (causing a `FlowRun` and a subsequent `TaskRun` which are instances of the flow and task):

In [None]:
from prefect import flow, task

@task
def my_task():
    print("Hello, I'm a task")

@flow(name="My Awesome Flow")
def my_flow():
    my_task()

my_flow()  # creates a flow run, which in turn creates a task run
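
To make the idea of runs and states more concrete, here is a minimal plain-Python sketch of a decorator that records one run per call along with its final state. It is only an illustration of the concept, not how Prefect is implemented; `tracked`, `Run`, and `RUNS` are hypothetical names that do not come from Prefect.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, List


@dataclass
class Run:
    """One execution of a decorated function (hypothetical stand-in for a FlowRun or TaskRun)."""
    name: str
    state: str = "RUNNING"
    result: Any = None


RUNS: List[Run] = []  # every call appends one record here


def tracked(fn: Callable) -> Callable:
    """Wrap fn so that each call is recorded as a Run with a final state."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        run = Run(name=fn.__name__)
        RUNS.append(run)
        try:
            run.result = fn(*args, **kwargs)
            run.state = "COMPLETED"
            return run.result
        except Exception:
            run.state = "FAILED"
            raise
    return wrapper


@tracked
def say_hello():
    return "Hello, I'm a task"


say_hello()
say_hello()
```

Each call produces its own record, which mirrors how calling the same flow twice gives two separate flow runs.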

## Retries

![img](https://miro.medium.com/max/1200/1*p-97ezRLuof6kO6hMALaSw.jpeg)


Retries provide reliability to your workflow.

Prefect tasks can automatically retry on failure when you set the `retries` and `retry_delay_seconds` parameters on a task.

For example, we might want to add retries to the `find_nike_price()` function because the URL response might be unreliable.

In [None]:

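import re

import requests
from bs4 import BeautifulSoup
from prefect import task

@task(retries=3, retry_delay_seconds=10)
def find_nike_price(url):
    # Download the product page; any exception raised here triggers a retry
    html = requests.get(url).text
    soup = BeautifulSoup(html, 'html.parser')
    price_string = soup.find('div', {"class": "product-price"}).text
    price_string = price_string.replace(' ', '')
    # Keep only the first run of digits in the price text
    price = int(re.search('[0-9]+', price_string).group(0))
    return price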
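
If it helps to see what "retry on failure" means mechanically, the sketch below imitates the behavior in plain Python. It is a simplified illustration under our own assumptions, not Prefect's implementation; `with_retries` and `flaky_fetch` are hypothetical names, and the delay is set to 0 so the example runs instantly.

```python
import time
from functools import wraps


def with_retries(retries: int, retry_delay_seconds: float):
    """Re-run the wrapped function up to `retries` extra times if it raises."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: surface the last error
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(retries=3, retry_delay_seconds=0)
def flaky_fetch():
    """Fail twice, then succeed, to imitate an unreliable URL."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return 160


price = flaky_fetch()
```

Here the function is attempted three times in total: two failures followed by one success, all within the limit of three retries.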


## Caching

Caching is the ability of a task run to reflect a finished state without actually running the code that defines the task. You can then efficiently reuse the results of tasks that would be expensive to run with every flow run, or reuse cached results if the inputs to a task have not changed.

Cache keys determine whether a task run should retrieve a cached state or not. A cache key is a string value that indicates whether one run should be considered identical to another. Cache keys are attached to the state of task runs, and before a task run starts, Prefect looks for states with a matching cache key.

To enable caching, specify a `cache_key_fn` (a function that returns the cache key) on your task. You can optionally provide a `cache_expiration` timedelta to the task.

Prefect's `task_input_hash` is a task cache key that hashes all inputs to the task using a JSON or cloudpickle serializer. 

Here's an example of caching:

In [None]:
from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def compare_price(price, budget):
    if price <= budget:
        return "Buy the shoes! Good deal!"
    else:
        return "Don't buy the shoes. They're too expensive"
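
To see how a cache key built from the inputs can decide whether a function runs at all, here is a small plain-Python sketch. It is an illustration under simplifying assumptions (an in-memory dictionary, JSON plus SHA-256 for the key), not Prefect's `task_input_hash`; the names `input_hash`, `cached`, and `compare` are hypothetical.

```python
import hashlib
import json
from datetime import datetime, timedelta

_cache = {}  # cache key -> (expires_at, result)


def input_hash(fn_name, args, kwargs):
    """Build a cache key by hashing the function name and its JSON-serialized inputs."""
    payload = json.dumps([fn_name, args, kwargs], sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


def cached(expiration: timedelta):
    """Reuse a stored result when the same inputs were seen and the entry has not expired."""
    def decorator(fn):
        def wrapper(*args, **kwargs):
            key = input_hash(fn.__name__, args, kwargs)
            hit = _cache.get(key)
            if hit is not None and hit[0] > datetime.now():
                return hit[1]  # unexpired entry: skip running fn
            result = fn(*args, **kwargs)
            _cache[key] = (datetime.now() + expiration, result)
            return result
        return wrapper
    return decorator


executions = []  # records every time the function body actually runs


@cached(expiration=timedelta(days=1))
def compare(price, budget):
    executions.append((price, budget))
    return price <= budget


compare(160, 120)
compare(160, 120)  # same inputs: served from the cache
compare(100, 120)  # new inputs: the body runs again
```

The function body runs only twice for three calls, because the second call has the same key as the first.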

## Parameters - Typing Validation

Flows can take parameters.

Type hints provide an easy way to enforce typing via [pydantic](https://pydantic-docs.helpmanual.io/).

Parameters are validated before a flow is run.

Let's add the URL and budget as parameters to our `nike_flow()`:

In [None]:
from prefect import flow

@flow(name="Shoe Price Notification")
def nike_flow(url: str, budget: int):
    price = find_nike_price(url)
    message = compare_price(price, budget)

Invalid parameters will cause the flow run to fail. Let's test it out.

In [None]:
nike_flow(50, 'Shoe')
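
The general idea of checking arguments against type hints before the function body runs can be sketched with the standard library alone. This is a deliberately strict, simplified illustration and not how pydantic or Prefect validate parameters (pydantic may coerce some values instead of rejecting them, and this sketch only handles plain classes such as `str` and `int`); `validate_params` and `check_budget` are hypothetical names.

```python
import inspect
from functools import wraps


def validate_params(fn):
    """Check each argument against its type hint before calling fn."""
    sig = inspect.signature(fn)

    @wraps(fn)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            expected = sig.parameters[name].annotation
            if expected is inspect.Parameter.empty:
                continue  # no hint, nothing to check
            if not isinstance(value, expected):
                raise TypeError(
                    f"{name} must be {expected.__name__}, got {type(value).__name__}"
                )
        return fn(*args, **kwargs)
    return wrapper


@validate_params
def check_budget(url: str, budget: int):
    return f"checking {url} against {budget}"


try:
    check_budget(50, "Shoe")
    error = None
except TypeError as exc:
    error = str(exc)
```

The body of `check_budget` never runs for the invalid call, which is the same fail-early behavior described above.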

## Concurrent Execution

Task runners are responsible for running Prefect tasks. Each flow has a task runner associated with it. By default, the task runner is the `ConcurrentTaskRunner`.

Let's change `nike_flow` to take a list of URLs and loop over them to watch the tasks run concurrently.

In [8]:
from prefect import flow, task
from prefect.tasks import task_input_hash
from typing import List
from datetime import timedelta
import requests
import re
from bs4 import BeautifulSoup

@task(retries=3, retry_delay_seconds=10)
def find_nike_price(url):
    k = requests.get(url).text
    soup = BeautifulSoup(k,'html.parser')
    price_string = soup.find('div', {"class":"product-price"}).text
    price_string = price_string.replace(' ','')
    price = int(re.search('[0-9]+',price_string).group(0))
    return price

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def compare_price(price, budget):
    if price <= budget:
        return "Buy the shoes! Good deal!"
    else:
        return "Don't buy the shoes. They're too expensive"

@flow(name="Shoe Price Notification")
def nike_flow(urls: List[str], budget: int):
    for url in urls:
        price = find_nike_price(url)
        message = compare_price(price, budget)


urls = [
    "https://www.nike.com/t/air-max-270-womens-shoes-Pgb94t/AH6789-601",
    "https://www.nike.com/t/air-max-terrascape-90-mens-shoes-R6r8hB/DH2973-100",
    "https://www.nike.com/t/pegasus-trail-3-gore-tex-mens-running-shoes-HG005k/DR0137-200"
]

budget = 120
slack_token = 'https://hooks.slack.com/services/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'
    
nike_flow(urls, budget)


00:01:17.313 | INFO    | prefect.engine - Created flow run 'peculiar-gorilla' for flow 'Shoe Price Notification'
00:01:17.315 | INFO    | Flow run 'peculiar-gorilla' - Using task runner 'ConcurrentTaskRunner'
00:01:17.716 | INFO    | Flow run 'peculiar-gorilla' - Created task run 'find_nike_price-3ad0ad07-0' for task 'find_nike_price'
00:01:17.988 | INFO    | Flow run 'peculiar-gorilla' - Created task run 'compare_price-69979684-0' for task 'compare_price'
00:01:18.258 | INFO    | Flow run 'peculiar-gorilla' - Created task run 'find_nike_price-3ad0ad07-1' for task 'find_nike_price'
00:01:18.482 | INFO    | Flow run 'peculiar-gorilla' - Created task run 'compare_price-69979684-1' for task 'compare_price'
00:01:18.641 | INFO    | Flow run 'peculiar-gorilla' - Created task run 'find_nike_price-3ad0ad07-2' for task 'find_nike_price'
00:01:18.876 | INFO    | Flow run 'peculiar-gorilla' - Created task run 'compare_price-69979684-2' for task 'compare_price'
00:01:19.129 | INFO    | Task run '

Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=160, task_run_id=17dc2f4b-b81a-4e60-8bda-992e712fcfa4), Completed(message=None, type=COMPLETED, result="Don't buy the shoes. They're too expensive", task_run_id=32518f9d-2c70-4a60-8239-5b702147372e), Completed(message=None, type=COMPLETED, result=140, task_run_id=828a2658-25df-47e3-9a30-c37ad8b352f7), Completed(message=None, type=COMPLETED, result="Don't buy the shoes. They're too expensive", task_run_id=9a600792-bba4-4589-93b2-e483f9138814), Completed(message=None, type=COMPLETED, result=160, task_run_id=f71a80d3-36fe-46e7-801b-0a9057e4e853), Completed(message=None, type=COMPLETED, result="Don't buy the shoes. They're too expensive", task_run_id=bed2a380-9ffc-4fa1-909c-eb1cabf59424)], flow_run_id=adede156-d547-4a7d-a85f-e97cbb5bed49)
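
For intuition about what concurrent task execution looks like, the sketch below uses Python's standard `concurrent.futures` thread pool: work is submitted without waiting, and results are collected afterwards. It is an analogy only, not the `ConcurrentTaskRunner` itself; `slow_lookup` and `demo_urls` are hypothetical, and the "price" is just the length of the URL.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_lookup(url):
    """Stand-in for a network call: wait briefly, then return a fake price."""
    time.sleep(0.2)
    return len(url)


demo_urls = [
    "https://example.com/a",
    "https://example.com/bb",
    "https://example.com/ccc",
]

with ThreadPoolExecutor(max_workers=3) as pool:
    # Submit all lookups first so they can overlap while waiting
    futures = [pool.submit(slow_lookup, url) for url in demo_urls]
    # Then collect the results in submission order
    prices = [f.result() for f in futures]
```

Because the three lookups wait at the same time rather than one after another, the total time is close to a single lookup rather than the sum of all three.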

## Collections 

Prefect has prebuilt tasks and flows, called [collections](https://orion-docs.prefect.io/collections/overview/), that you can install and use.

Let's add a [Slack notification](https://prefecthq.github.io/prefect-slack/) to our `nike_flow` so that the decision to buy or not buy the shoes is sent to Slack instead of being printed.

In [9]:
!pip install prefect-slack

Collecting prefect-slack
  Using cached prefect_slack-0.1.0-py3-none-any.whl (9.6 kB)
Collecting slack-sdk>=3.15.1
  Using cached slack_sdk-3.15.2-py2.py3-none-any.whl (261 kB)




Installing collected packages: slack-sdk, prefect-slack
Successfully installed prefect-slack-0.1.0 slack-sdk-3.15.2


In [None]:
from prefect import flow
from prefect_slack import SlackWebhook
from prefect_slack.messages import send_incoming_webhook_message

@flow(name="Shoe Price Notification")
def nike_flow(url: str, budget: int, slack_token: str):
    price = find_nike_price(url)
    message = compare_price(price, budget)
    # slack_token is an incoming-webhook URL, so use the webhook message task
    send_incoming_webhook_message(
        slack_webhook=SlackWebhook(slack_token),
        text=message
    )

## Exercise: Putting Together a Simple Data Pipeline 
(Full code example in `flow_code/nike_flow.py`)

## Deployments 
A deployment is the mechanism that brings workflows to the execution environment.

We have now turned our Python script into a Prefect flow. Next, we need to add a schedule and an execution environment that can run the flow.

A [deployment](https://orion-docs.prefect.io/concepts/deployments/) is a concept that encapsulates a flow, allowing it to be scheduled and triggered via API. It stores metadata about where your flow's code is stored and how your flow should be run: the schedule it should run on and the parameters it will take.

Here is an example of a deployment: 

In [None]:
from prefect.deployments import DeploymentSpec

DeploymentSpec(
    flow=nike_flow,
    name="Nike Shoe Flow",
    tags=["demo"]
)

Next let's add a [schedule](https://orion-docs.prefect.io/concepts/schedules/) to our deployment. 

You can add an `IntervalSchedule`, an `RRuleSchedule`, or a `CronSchedule`.

In [None]:
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule
from datetime import timedelta

DeploymentSpec(
    flow=nike_flow,
    name="Nike Shoe Flow",
    schedule=IntervalSchedule(interval=timedelta(days=1)),
    tags=["demo"]
)
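
To illustrate what an interval schedule boils down to, here is a small standard-library sketch that computes upcoming run times on a fixed interval. It is an assumption-laden illustration, not Prefect's `IntervalSchedule`; the function `next_runs` and its `anchor` and `after` parameters are hypothetical names chosen for this example.

```python
from datetime import datetime, timedelta


def next_runs(anchor: datetime, interval: timedelta, after: datetime, count: int):
    """Return the next `count` run times on the interval grid strictly after `after`."""
    if after < anchor:
        first = anchor
    else:
        # Number of whole intervals already elapsed, plus one to move past `after`
        steps = (after - anchor) // interval + 1
        first = anchor + steps * interval
    return [first + i * interval for i in range(count)]


runs = next_runs(
    anchor=datetime(2022, 1, 1, 9, 0),
    interval=timedelta(days=1),
    after=datetime(2022, 1, 3, 12, 0),
    count=3,
)
```

With a one-day interval anchored at 09:00 on 1 January 2022, the next three runs after midday on 3 January fall at 09:00 on 4, 5, and 6 January.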

Next, we create the deployment using the Prefect CLI:

In [None]:
!prefect deployment create ./flow_code/nike_flow.py

Now that the deployment is created, we can start our [Orion UI](http://127.0.0.1:4200) to see the deployment spec.

In [None]:
!prefect orion start

Starting...

 ___ ___ ___ ___ ___ ___ _____    ___  ___ ___ ___  _  _
| _ \ _ \ __| __| __/ __|_   _|  / _ \| _ \_ _/ _ \| \| |
|  _/   / _|| _|| _| (__  | |   | (_) |   /| | (_) | .` |
|_| |_|_\___|_| |___\___| |_|    \___/|_|_\___\___/|_|\_|

Configure Prefect to communicate with the server with:

    prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

Check out the dashboard at http://127.0.0.1:4200





Lastly, we need [a work queue and an agent](https://orion-docs.prefect.io/concepts/work-queues/) to pick up the deployment we have created to run our Flow.

Work queues and agents bridge the gap between the server's orchestration environment and the user's execution environment. A work queue defines the work to be done, and an agent polls a specific work queue for scheduled work.

In [None]:
!prefect work-queue create demo

Next, start an agent that corresponds to that specific work queue.

In [None]:
!prefect agent start WORK_QUEUE_ID

In the Orion UI, we can navigate to the Deployments tab and click Quick Run. This triggers a run of `nike_flow`. Keeping the agent running also lets it pick up the runs created by the schedule we set up for the flow.
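
The division of labor between a work queue and an agent can be pictured with a toy example: one side puts scheduled work on a queue, and the other side polls the queue and executes whatever it finds. This is a conceptual sketch in plain Python, not Prefect's agent or work-queue implementation; `work_queue`, `agent_poll_once`, and `hello_flow` are hypothetical names.

```python
import queue

work_queue = queue.Queue()  # holds scheduled runs waiting to be executed
completed = []              # results of runs the agent has executed


def hello_flow(name):
    return f"ran for {name}"


def agent_poll_once(q):
    """Pull every run currently in the queue and execute it; return how many ran."""
    ran = 0
    while True:
        try:
            flow_fn, params = q.get_nowait()
        except queue.Empty:
            return ran
        completed.append(flow_fn(**params))
        ran += 1


# The "server" side schedules work; the "agent" side picks it up.
work_queue.put((hello_flow, {"name": "run-1"}))
work_queue.put((hello_flow, {"name": "run-2"}))
first_poll = agent_poll_once(work_queue)
second_poll = agent_poll_once(work_queue)  # nothing left to do
```

A real agent repeats this polling step on a timer, which is why it needs to keep running for scheduled work to be executed.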

## Q&A