In [9]:
import os

import pandas as pd
import requests
from prefect import task, flows


In [2]:
# help(prefect)

Help on package prefect:

NAME
    prefect - # isort: skip_file

PACKAGE CONTENTS
    _internal (package)
    _vendor (package)
    _version
    agent
    artifacts
    blocks (package)
    cli (package)
    client (package)
    concurrency (package)
    context
    deployments (package)
    deprecated (package)
    engine
    events (package)
    exceptions
    filesystems
    flows
    futures
    infrastructure (package)
    logging (package)
    manifests
    packaging (package)
    plugins
    results
    runner (package)
    runtime (package)
    serializers
    server (package)
    settings
    software (package)
    states
    task_runners
    tasks
    testing (package)
    utilities (package)
    variables
    workers (package)

CLASSES
    builtins.object
        prefect.runner.runner.Runner
    prefect._internal.schemas.bases.ObjectBaseModel(prefect._internal.schemas.bases.IDBaseModel)
        prefect.client.schemas.objects.State(prefect._internal.schemas.bases.ObjectBaseMo

### Prefect 

Prefect is an orchestration and observability platform that enables developers to build and scale resilient code quickly, turning Python scripts into resilient, recurring workflows.

What does that mean? Ideally, we want our pipelines to be robust and to recover gracefully from errors. For instance, the data source may be unavailable when we attempt to ingest data from it; in that case, we want to make our pipeline *resilient* by retrying the ingestion phase.

We will use an example from our previous pipelines to demonstrate how to use Prefect for workflow orchestration.


In [None]:

def extract_all_tickers(api_key=None, timeout=30):
  """Fetch ticker reference data from the Polygon API as a DataFrame.

  This is a best-effort helper: any failure (missing key, network error,
  non-2xx response, unexpected payload) is reported with print() and an
  empty DataFrame is returned instead of raising.

  NOTE(review): a single request is made, so only the records contained in
  that one response are returned -- confirm whether pagination is needed.

  Args:
    api_key: Polygon API key. When omitted, the POLYGON_API_KEY environment
      variable is used. Never hard-code the key in the notebook.
    timeout: Seconds to wait for the HTTP response before giving up.

  Returns:
    A DataFrame built from the "results" list of the response, or an empty
    DataFrame if the extraction failed.
  """
  BASE_URL = "https://api.polygon.io/v3/reference/tickers?"

  api_key = api_key or os.environ.get("POLYGON_API_KEY")
  if not api_key:
    print("Polygon API key missing: pass api_key or set POLYGON_API_KEY")
    return pd.DataFrame()

  # Send the key as a Bearer token rather than a query parameter so that it
  # does not end up inside URLs quoted by exception messages.
  headers = {"Authorization": f"Bearer {api_key}"}

  print(f"Beginning data extraction from {BASE_URL}")
  try:
    res = requests.get(BASE_URL, headers=headers, timeout=timeout)
    res.raise_for_status()   # treat 4xx/5xx as failures instead of parsing them
    data = res.json()["results"]
    df = pd.DataFrame(data)
  except Exception as e:
    print(f"Error {e} while ingesting data from {BASE_URL}")
    df = pd.DataFrame()   # return empty dataframe if exception is raised

  return df

We now have a function that retrieves information on the companies supported by the Polygon API. This data is a good candidate for a dimension table. Before loading the data, we must transform it, and then use Prefect to orchestrate the entire process.

In [4]:

def extract_all_tickers(api_key=None, timeout=30):
  """Fetch ticker reference data from the Polygon API as a DataFrame.

  This is a best-effort helper: any failure (missing key, network error,
  non-2xx response, unexpected payload) is reported with print() and an
  empty DataFrame is returned instead of raising.

  NOTE(review): a single request is made, so only the records contained in
  that one response are returned -- confirm whether pagination is needed.

  Args:
    api_key: Polygon API key. When omitted, the POLYGON_API_KEY environment
      variable is used. Never hard-code the key in the notebook.
    timeout: Seconds to wait for the HTTP response before giving up.

  Returns:
    A DataFrame built from the "results" list of the response, or an empty
    DataFrame if the extraction failed.
  """
  BASE_URL = "https://api.polygon.io/v3/reference/tickers?"

  api_key = api_key or os.environ.get("POLYGON_API_KEY")
  if not api_key:
    print("Polygon API key missing: pass api_key or set POLYGON_API_KEY")
    return pd.DataFrame()

  # Send the key as a Bearer token rather than a query parameter so that it
  # does not end up inside URLs quoted by exception messages.
  headers = {"Authorization": f"Bearer {api_key}"}

  print(f"Beginning data extraction from {BASE_URL}")
  try:
    res = requests.get(BASE_URL, headers=headers, timeout=timeout)
    res.raise_for_status()   # treat 4xx/5xx as failures instead of parsing them
    data = res.json()["results"]
    df = pd.DataFrame(data)
  except Exception as e:
    print(f"Error {e} while ingesting data from {BASE_URL}")
    df = pd.DataFrame()   # return empty dataframe if exception is raised

  return df

dim_companies = extract_all_tickers()
# extract_all_tickers() returns an empty DataFrame on failure, and sample()
# raises ValueError when asked for more rows than exist, so cap the preview
# size at the number of rows actually available.
dim_companies.sample(min(10, len(dim_companies)))

Beginning data extraction from https://api.polygon.io/v3/reference/tickers?


Unnamed: 0,ticker,name,market,locale,primary_exchange,type,active,currency_name,cik,composite_figi,share_class_figi,last_updated_utc
61,AAT,"AMERICAN ASSETS TRUST, INC.",stocks,us,XNYS,CS,True,usd,1500217.0,BBG00161BCR0,BBG001TCBJS5,2024-02-28T00:00:00Z
92,ABCFF,ABACUS MINING & EXPL ORD,otc,us,,OS,True,USD,,BBG000JR8L77,BBG001S5XSV3,2023-05-10T05:00:38.805Z
49,AAPI,APPLE ISPORTS GROUP INC,otc,us,,CS,True,USD,,BBG000CQN9X7,BBG001SGHPN2,2024-02-21T09:45:13.158Z
84,ABBNY,ABB LTD SPONS ADR,otc,us,,ADRC,True,USD,,BBG000DK5Q25,BBG001SDDMX9,2023-06-30T05:00:38.266Z
28,AAGRW,African Agriculture Holdings Inc. Warrant,stocks,us,XNAS,WARRANT,True,usd,1848898.0,BBG00ZKXGR82,,2024-02-28T00:00:00Z
23,AAGC,ALL AMERICAN GOLD CORP,otc,us,,CS,True,USD,,BBG000R4MM71,BBG001T948W9,2024-02-23T09:45:11.986Z
12,AACIU,Armada Acquisition Corp. I Unit,stocks,us,XNAS,UNIT,True,usd,1844817.0,BBG011PFP1D1,BBG011PFP285,2024-02-28T00:00:00Z
43,AANNF,AROUNDTOWN SA ORD,otc,us,,OS,True,USD,,,,2021-09-28T08:45:23.537Z
34,AAKAY,AAK AB UNSP/ADR,otc,us,,ADRC,True,USD,,BBG01L562BD2,BBG01L562C77,2024-01-31T18:59:08.681Z
2,AAA,AXS First Priority CLO Bond ETF,stocks,us,ARCX,ETF,True,usd,1776878.0,BBG01B0JRCS6,BBG01B0JRCT5,2024-02-28T00:00:00Z


### Prefect Concepts - Tasks and Flows

**Tasks** are functions: they can take inputs, perform work, and return an output. In the context of a data pipeline, the *extract*, *transform* and *load* functions would be Prefect tasks in a Prefect workflow.

Think of a task as an enhanced function. In our vanilla functions, we had to implement logging manually. A Prefect task has automatic logging to capture details such as runtime, tags, and final state.

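**Flows** are the containers for workflow logic: a flow is a Python function decorated with `@flow` that calls tasks (and other flows) and defines the order in which they run.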


In [10]:
# Making our extract function into a task

@task(name="Polygon finance ingestion", log_prints=True, retries=3, retry_delay_seconds=5)
def extract_all_tickers(api_key=None, timeout=30):
  """Prefect task: fetch ticker reference data from the Polygon API.

  Unlike a best-effort helper, this task lets failures propagate. Prefect
  only retries a task whose run raised, so swallowing the exception and
  returning an empty DataFrame would make `retries=3` a no-op. The error is
  still printed (and captured by `log_prints=True`) before being re-raised.

  NOTE(review): a single request is made, so only the records contained in
  that one response are returned -- confirm whether pagination is needed.

  Args:
    api_key: Polygon API key. When omitted, the POLYGON_API_KEY environment
      variable is used. Never hard-code the key in the notebook.
    timeout: Seconds to wait for the HTTP response before giving up.

  Returns:
    A DataFrame built from the "results" list of the response.

  Raises:
    ValueError: no API key was supplied or found in the environment.
    Exception: whatever the request, status check, or payload parsing
      raised (e.g. a requests exception, or KeyError when the response has
      no "results" field); re-raised unchanged so Prefect can retry.
  """
  BASE_URL = "https://api.polygon.io/v3/reference/tickers?"

  api_key = api_key or os.environ.get("POLYGON_API_KEY")
  if not api_key:
    raise ValueError("Polygon API key missing: pass api_key or set POLYGON_API_KEY")

  # Send the key as a Bearer token rather than a query parameter so that it
  # does not end up inside URLs quoted by exception messages and task logs.
  headers = {"Authorization": f"Bearer {api_key}"}

  print(f"Beginning data extraction from {BASE_URL}")
  try:
    res = requests.get(BASE_URL, headers=headers, timeout=timeout)
    res.raise_for_status()   # treat 4xx/5xx as failures instead of parsing them
    data = res.json()["results"]
  except Exception as e:
    print(f"Error {e} while ingesting data from {BASE_URL}")
    raise   # fail the task run so the retry policy applies

  return pd.DataFrame(data)