# Setting up Prefect

Prefect is an orchestration tool: it manages the flow of everything that runs in a database load, from pulling and validating the data to loading and transforming it. Think of it as an alternative to SSIS and SQL Agent Jobs that looks and works a lot nicer!

It also means we aren't tied to a single toolset. If a step is better done in Python and then needs to kick off some SQL, the orchestration handles that hand-off for us.

Prefect is a Python library that uses _decorators_ to orchestrate the data flow:

```python
from prefect import flow

@flow
def my_first_flow():
    print("This function doesn't do too much")
    return 42
```

Here we define a basic function and decorate it with `@flow`.

```python
state = my_first_flow()
```

```
11:23:57.820 | INFO    | prefect.engine - Created flow run 'hypnotic-boa' for flow 'my-first-flow'
11:23:57.823 | INFO    | Flow run 'hypnotic-boa' - Using task runner 'ConcurrentTaskRunner'
11:23:57.928 | INFO    | Flow run 'hypnotic-boa' - Finished in state Completed()

This function doesn't do too much
```

```python
print(state)
print(state.result())
```

```
Completed()
42
```


## So what happened here?

1. We created a basic function that prints something and returns something.
2. We decorated it with `@flow`.
3. We assigned the output of calling the function to the variable `state`, which we can then query. In the Prefect 2 beta used here, calling the flow returns a Prefect `State` object rather than the raw value, so to see what the function returned we access it via `.result()`.
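To make step 3 more concrete, here is a toy, standard-library-only imitation of the idea. It is *not* how Prefect is implemented; `toy_flow` and `ToyState` are hypothetical names used only to show how a decorator can run a function and hand back a state object whose `.result()` returns the original value.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable


@dataclass
class ToyState:
    """Hypothetical stand-in for a Prefect State: a state name plus the wrapped return value."""
    name: str
    value: Any = None

    def result(self) -> Any:
        # Mirrors state.result() in the notebook: hand back the function's return value
        return self.value


def toy_flow(fn: Callable[..., Any]) -> Callable[..., ToyState]:
    """Hypothetical decorator: run fn, then wrap its return value in a ToyState."""
    @wraps(fn)  # keep the original function's name and docstring
    def wrapper(*args: Any, **kwargs: Any) -> ToyState:
        value = fn(*args, **kwargs)
        return ToyState("Completed", value)
    return wrapper


@toy_flow
def my_first_flow():
    print("This function doesn't do too much")
    return 42


state = my_first_flow()
print(state.name)
print(state.result())
```

The real `@flow` does much more (run names, logging, task runners, failure handling), but the shape is the same: the decorator sits between the caller and the function and returns a state instead of the bare value.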

```python
print(type(state))
```

```
<class 'prefect.orion.schemas.states.State'>
```


## Flows and tasks

Flows and tasks are the basic building blocks of Prefect: they are containers for the workflow logic. Flows can run other flows (subflows) or tasks. Tasks are optional, but they wrap logic in observable units that can be reused across flows and subflows.

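```python
import os
import requests
from prefect import flow, task

# Each @task becomes an observable unit with its own task run and state
@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

@task
def parse_fact(response):
    # The catfact.ninja response is JSON with a "fact" key
    print(response["fact"])

# name, description and version are metadata recorded for the flow;
# version is None if GIT_COMMIT_SHA isn't set in the environment
@flow(name="Example API call flow",
      description="An example flow for this tutorial",
      version=os.getenv("GIT_COMMIT_SHA"))
def api_flow(url):
    fact_json = call_api(url)
    parse_fact(fact_json)

state = api_flow("https://catfact.ninja/fact")
```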

```
12:10:48.352 | INFO    | prefect.engine - Created flow run 'adorable-millipede' for flow 'Example API call flow'
12:10:48.354 | INFO    | Flow run 'adorable-millipede' - Using task runner 'ConcurrentTaskRunner'
12:10:48.468 | INFO    | Flow run 'adorable-millipede' - Created task run 'call_api-190c7484-0' for task 'call_api'
12:10:48.529 | INFO    | Flow run 'adorable-millipede' - Created task run 'parse_fact-b0346046-0' for task 'parse_fact'
12:10:50.167 | INFO    | Task run 'call_api-190c7484-0' - Finished in state Completed()
12:10:50.268 | INFO    | Task run 'parse_fact-b0346046-0' - Finished in state Completed()

200
Cats' hearing is much more sensitive than humans and dogs.

12:10:50.325 | INFO    | Flow run 'adorable-millipede' - Finished in state Completed('All states completed.')
```


## Applying this to NHS Numbers

Now that we have the basics, let's apply them to our OptOuts CSV.

### The way we'll do it

1. Simulate the CSV delivery: just copy a file from one folder to the source folder.
2. Validate the data with Great Expectations.
3. Run a checksum against the data.
4. Load the CSV into a dataframe, save it as a timestamped CSV, and load it into a database.
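The plan doesn't spell out what the checksum in step 3 is run against, so here are two plausible readings, sketched with the standard library only: a SHA-256 hash of the delivered file (useful for confirming the copy in step 1 arrived intact) and the NHS number Modulus 11 check digit, applied value by value. The function names are hypothetical, and the Great Expectations validation in step 2 is not shown.

```python
import hashlib
from pathlib import Path
from typing import Union


def file_sha256(path: Union[str, Path], chunk_size: int = 65536) -> str:
    """Hypothetical helper: hex SHA-256 of a file, read in chunks so large CSVs don't fill memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def is_valid_nhs_number(value: str) -> bool:
    """Hypothetical helper: check a 10-digit NHS number against its Modulus 11 check digit.

    Spaces are ignored, so '943 476 5919' and '9434765919' are treated the same.
    """
    digits = value.replace(" ", "")
    if len(digits) != 10 or not all(c in "0123456789" for c in digits):
        return False
    # Weight the first nine digits 10, 9, ..., 2 and sum the products
    total = sum(int(d) * w for d, w in zip(digits[:9], range(10, 1, -1)))
    check = 11 - (total % 11)
    if check == 11:
        check = 0      # a result of 11 means the check digit is 0
    if check == 10:
        return False   # 10 can never be a valid check digit
    return check == int(digits[9])


print(is_valid_nhs_number("943 476 5919"))  # True
print(is_valid_nhs_number("9434765918"))    # False: check digit should be 9
```

Either function could later be wrapped in a `@task` so its result shows up as its own task run in the flow.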

```python
import os
import shutil
import pandas as pd  # not used yet; needed for step 4 (load the CSV into a dataframe)
from prefect import flow, task

@task(name='Copy file to dropzone', description='A simulated drop of the OptOuts csv to the dropzone')
def simulate_datadrop(sourcePath: str, destinationPath: str, file: str) -> None:
    # os.path.join builds the paths instead of concatenating '\\' by hand
    sourceFile = os.path.join(sourcePath, file)
    destFile = os.path.join(destinationPath, file)
    shutil.copyfile(sourceFile, destFile)


@flow(name='OptOuts Prefect Flow')
def optout_flow():
    # Raw strings (r'...') stop sequences such as '\r' in '\raw' being read as escape characters
    state = simulate_datadrop(r'D:\git\dePOC\data\raw', r'D:\git\dePOC\data\src', 'OptOuts.csv')
```
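The copy step, and the "timestamped CSV" from step 4, can also be sketched without Prefect using `pathlib` and `shutil`. This is a hypothetical stand-in: `drop_file` and `timestamped_name` are illustrative names, and the `YYYYmmdd_HHMMSS` timestamp format is an assumption. Building paths with `Path` objects also avoids the backslash-escape trap in plain Windows path strings (for example, `'\raw'` contains a carriage return).

```python
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional


def timestamped_name(file_name: str, when: Optional[datetime] = None) -> str:
    """Insert a timestamp before the extension, e.g. OptOuts.csv -> OptOuts_20220701_120000.csv."""
    if when is None:
        when = datetime.now()
    p = Path(file_name)
    return f"{p.stem}_{when:%Y%m%d_%H%M%S}{p.suffix}"


def drop_file(source_dir: Path, dest_dir: Path, file_name: str) -> Path:
    """Copy file_name from source_dir into dest_dir (creating it if needed); return the new path."""
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / file_name
    shutil.copyfile(source_dir / file_name, dest)
    return dest


# Usage against throwaway folders rather than the real D:\ paths
with tempfile.TemporaryDirectory() as tmp:
    raw, src = Path(tmp, "raw"), Path(tmp, "src")
    raw.mkdir()
    (raw / "OptOuts.csv").write_text("placeholder row\n")  # dummy content, not real data
    copied = drop_file(raw, src, "OptOuts.csv")
    print(copied.name, copied.read_text() == (raw / "OptOuts.csv").read_text())
    print(timestamped_name(copied.name))
```

Keeping the filename logic in a pure function like `timestamped_name` makes it easy to test with a fixed `datetime`, while the Prefect task only has to do the file I/O.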