In [2]:
import datetime
import io
import os

import duckdb
import pandas as pd
import requests
import yfinance as yf
from prefect import flow, task

## Preliminaries
### Prefect Tasks

A task is a function that represents a discrete unit of work in a Prefect workflow. Tasks are not required — you may define Prefect workflows that consist only of flows, using regular Python statements and functions. Tasks enable you to encapsulate elements of your workflow logic in observable units that can be reused across flows and subflows.

https://docs.prefect.io/latest/concepts/tasks/

## 1. Data Extraction

We extract the list of tickers supported by the `polygon API`, then use that list to download historical stock data with `yfinance`.

`yfinance` is an open-source Python package that scrapes market data from Yahoo Finance and lets you download it in a threaded, Pythonic way.

Useful links:
*   https://pypi.org/project/yfinance/
*   https://polygon.io/docs/stocks/get_v3_reference_tickers

In [8]:
@task(name="extract data", log_prints=True, retries=1)
def extract_all_tickers():
  """Download the Polygon reference list of tickers into a dataframe.

  The API key is read from the ``POLYGON_API_KEY`` environment variable
  rather than being stored in the notebook. It is sent in the
  ``Authorization`` header so that it never appears in the request URL,
  and therefore never in a logged error message.

  Returns:
    A dataframe built from the ``results`` list of the JSON response, or an
    empty dataframe when the key is missing or the request/parsing fails.
  """
  BASE_URL = "https://api.polygon.io/v3/reference/tickers?"

  # NOTE(review): the key that used to be hard-coded here is in the notebook's
  # history and should be rotated.
  api_key = os.environ.get("POLYGON_API_KEY")
  if not api_key:
    print("POLYGON_API_KEY environment variable is not set; skipping extraction")
    return pd.DataFrame()

  headers = {"Authorization": f"Bearer {api_key}"}

  print(f"Beginning data extraction from {BASE_URL}")
  try:
    res = requests.get(BASE_URL, headers=headers, timeout=30)
    # Turn HTTP failures (bad key, rate limit) into an explicit error instead
    # of a confusing KeyError on the missing "results" field.
    res.raise_for_status()
    data = res.json()["results"]
    df = pd.DataFrame(data)
  except Exception as e:
    print(f"Error {e} while ingesting data from {BASE_URL}")
    df = pd.DataFrame()   # return empty dataframe if exception is raised

  return df

# test_data = extract_all_tickers()
# test_data.head(10)


 `@task(name='my_unique_name', ...)`


In [9]:
@task(name="transform data",log_prints=True, retries=1)
def transform_data(test_data_df : pd.DataFrame) -> pd.DataFrame:
  """
  Clean the raw Polygon ticker dataframe into the dim_companies layout.

  The caller's dataframe is not modified; all changes are made on a copy.

  Parameters:
    test_data_df : dataframe containing the raw extracted data

  Returns:
    A transformed dataframe with the columns company_key, ticker,
    company_name, market, country, primary_exchange, cik, composite_figi and
    last_updated_utc, or an empty dataframe if any step raises (for example
    because an expected column is missing).

  """
  print("Beginning transformations ..........")

  try:
    # rename some columns; rename() returns a new frame, so the input is untouched
    df = test_data_df.rename(columns={'name' : 'company_name', 'locale' : 'country'})

    # convert the currency columns and country column to uppercase
    df["currency_name"] = df["currency_name"].str.upper()
    df["country"] = df["country"].str.upper()
    print("Currency and country successfully converted to uppercase...")

    # Replace the missing values. Fields absent from the JSON arrive as
    # NaN/None, not as the string "nan" (that string only appears after
    # astype(str)), so fillna must run before the type conversion below.
    for col in ["primary_exchange", "composite_figi", "share_class_figi", "cik"]:
      df[col] = df[col].fillna("Not Listed")
    print("Null values successfully replaced...")

    # change the data types; each column keeps its own values
    for col in ["ticker", "company_name", "market", "country",
                "primary_exchange", "cik", "composite_figi"]:
      df[col] = df[col].astype(str)

    # The timestamps carry a time/zone suffix (assumed ISO-8601 such as
    # "2021-04-25T00:00:00Z" -- confirm). A strict "%Y-%m-%d" format rejects
    # that suffix, so parse them as UTC and store naive UTC datetimes.
    df["last_updated_utc"] = pd.to_datetime(df["last_updated_utc"], utc=True).dt.tz_localize(None)

    # create a primary key column
    df["company_key"] = range(1, len(df) + 1)

    cols = ["company_key","ticker","company_name","market","country","primary_exchange",
            "cik","composite_figi","last_updated_utc"]

    # Maintain the relative order of the columns
    result = df[cols]

    print("Transformation complete...")

  except Exception as e:
    print(f"Exception {e} while transforming data")
    result = pd.DataFrame()

  return result


#transformed_df = transform_data(test_data)




 `@task(name='my_unique_name', ...)`


In [10]:
@task(name="load data", log_prints=True, retries=3)
def load_duckdb(dataframe, db_name, table_name):
  """Load the cleaned dataframe into a duckdb database

  Creates (or replaces) ``table_name`` from the dataframe. The dataframe
  index is stored as an extra column, as ``to_sql(index=True)`` did. The
  resulting table schema is then printed.

  Parameters:
    dataframe : input dataframe to be loaded to database
    db_name : database to load data in
    table_name : table in database where data is loaded

  Raises:
    Any error raised while loading is printed and then re-raised, so that the
    task's ``retries=3`` can actually take effect.
  """
  # An upstream failure produces an empty dataframe. Loading it would replace
  # the existing table with nothing, so skip the load instead.
  if dataframe.empty:
    print(f"Received an empty dataframe; leaving {table_name} in {db_name} untouched")
    return

  quoted_table = '"' + table_name.replace('"', '""') + '"'

  try:
    # connect to duckdb
    con = duckdb.connect(db_name)
    try:
      # pandas.DataFrame.to_sql only supports SQLAlchemy/sqlite3 connections,
      # so let duckdb scan the dataframe directly instead.
      con.register("_df_to_load", dataframe.reset_index())
      con.execute(f"CREATE OR REPLACE TABLE {quoted_table} AS SELECT * FROM _df_to_load")
      con.unregister("_df_to_load")
      print(f"Data succesfully written to {db_name}")

      # read the table schema back from the database
      loaded_df = con.execute(f"DESCRIBE {quoted_table}").fetchdf()
      print(loaded_df)
    finally:
      # close the connection even when the load fails
      con.close()

  except Exception as e:
    print(f"Exception {e} while loading data into {db_name}")
    raise


 `@task(name='my_unique_name', ...)`


In [11]:
@flow(name="Orchestrate pipeline", retries=3)
def run_pipeline():
  """Build the dim_companies table: Polygon tickers -> cleaning -> DuckDB."""
  companies = transform_data(extract_all_tickers())
  load_duckdb(companies, "finance-DWH.db", "dim_companies")

run_pipeline()


 `@flow(name='my_unique_name', ...)`


ImportError: DLL load failed while importing _rust: The specified procedure could not be found.

### Extraction yfinance
After extracting data from the Polygon API, we only need the `ticker` column of the resulting dataframe. This list of tickers is then used to download each company's historical price data from `yfinance` over the maximum available period.

**Note:** Polygon's free tier only provides 2 years of historical data, which is why the historical prices are downloaded from `yfinance`.

In [4]:


@task(name="extract-historical-data-yfinance", log_prints=True, retries=3)
def extract_yfinance(polygon_df, period="max", interval="1d"):
  """Download historical prices from Yahoo Finance for every Polygon ticker.

  Parameters:
    polygon_df : dataframe with a ``ticker`` column, such as the output of
      ``extract_all_tickers``
    period : how much history to request from ``yf.Tickers.history``
      (default "max", the full available history)
    interval : bar size requested from ``yf.Tickers.history`` (default "1d")

  Returns:
    The history returned by ``yf.Tickers.history``, as a dataframe.
    ``transform`` stacks column level 1 as the ticker, so it assumes
    yfinance's default (field, ticker) column layout -- confirm if that changes.

  Raises:
    ValueError: if ``polygon_df`` has no ``ticker`` column or no rows. This is
      what ``extract_all_tickers`` returns when its request fails.
  """
  # Fail with a clear message instead of an AttributeError deep inside yfinance
  if polygon_df.empty or "ticker" not in polygon_df.columns:
    raise ValueError(
        "No tickers to download: the Polygon dataframe is empty or has no 'ticker' column"
    )

  # Extract the ticker column from your polygon finance data
  tickers_arr = polygon_df["ticker"].to_list()

  # set ticker symbols for yahoo finance
  tickers = yf.Tickers(tickers_arr)

  # extract historical information of the tickers for the requested period/interval
  tickers_hist = tickers.history(period=period, interval=interval)

  # Load the data in a dataframe
  return pd.DataFrame(tickers_hist)


## 2. Data Transformation

In [5]:
@task(name="transform-yfinance", log_prints=True, retries=3)
def transform(yfinance_df, polygon_df=None):
  """Reshape yfinance history into long format and enrich it with Polygon metadata.

  Parameters:
    yfinance_df : output of ``extract_yfinance``. Column level 1 is expected
      to hold the ticker symbol, because that is the level stacked into rows.
    polygon_df : raw Polygon ticker dataframe with ``ticker``, ``name`` and
      ``market`` columns. It is required; the ``None`` default is kept only so
      the signature stays backward compatible.

  Returns:
    One row per (date, ticker), indexed by ``date``, with the columns ticker,
    name, market, Open, Close, High, Low, Volume and Stock Splits. Only
    tickers present in both inputs are kept (inner merge).

  Raises:
    ValueError: if ``polygon_df`` is None.
  """
  if polygon_df is None:
    raise ValueError("transform() needs the Polygon ticker dataframe to merge with")

  # move the ticker level of the columns into the rows: one row per (Date, ticker)
  long_df = (
      yfinance_df.stack(level=1)
      .rename_axis(['Date', 'ticker'])
      .reset_index(level=1)
  )

  # keep only the price fields we need; any other yfinance columns are dropped
  long_df = long_df.loc[:, ['ticker', 'Open','Close','High','Low','Stock Splits','Volume']].copy()

  # keep the date as a column, because merging on columns discards the index
  long_df["date"] = pd.to_datetime(long_df.index)

  # merge the polygon finance dataframe and the yfinance
  merged_df = pd.merge(left=long_df, right=polygon_df, on="ticker")

  # select the final columns and set the date as the index (no inplace on a slice)
  return (
      merged_df
      .loc[:, ['date','ticker','name','market', 'Open','Close','High','Low','Volume','Stock Splits']]
      .set_index('date')
  )



In [6]:
@task(name="load facts", log_prints=True, retries=3)
def load_duckdb(dataframe, db_name, table_name):
  """Load the cleaned dataframe into a duckdb database

  NOTE(review): this redefines the earlier ``load_duckdb`` task, and any call
  made after this cell runs will use this version.

  Creates (or replaces) ``table_name`` from the dataframe. The index (e.g.
  ``date``) is stored as a regular column, as ``to_sql(index=True)`` did.
  A ``DataFrame.info()`` summary of the stored table is then printed.

  Parameters:
    dataframe : input dataframe to be loaded to database
    db_name : database to load data in
    table_name : table in database where data is loaded
  """
  # Never replace an existing table with an empty result
  if dataframe.empty:
    print(f"Received an empty dataframe; leaving {table_name} in {db_name} untouched")
    return

  quoted_table = '"' + table_name.replace('"', '""') + '"'

  # connect to duckdb
  con = duckdb.connect(db_name)
  try:
    # pandas.DataFrame.to_sql only supports SQLAlchemy/sqlite3 connections,
    # so let duckdb scan the dataframe directly instead.
    con.register("_df_to_load", dataframe.reset_index())
    con.execute(f"CREATE OR REPLACE TABLE {quoted_table} AS SELECT * FROM _df_to_load")
    con.unregister("_df_to_load")
    print(f"Data succesfully written to {db_name}")

    # Read the data back and summarise it. info() writes its report to a
    # buffer and returns None, so capture the text and print it explicitly.
    loaded_df = con.execute(f"SELECT * FROM {quoted_table}").fetchdf()
    buf = io.StringIO()
    loaded_df.info(buf=buf)
    print(buf.getvalue())
  finally:
    # close the connection even when the load fails
    con.close()


## Running the data pipeline

In [7]:
@flow(name="run-facts-pipeline", log_prints=True, retries=3)
def run_pipeline():
  """Build the historical fact table.

  Polygon ticker list -> yfinance price history -> reshape/merge -> DuckDB.
  NOTE(review): this redefines the earlier ``run_pipeline`` flow of the same name.
  """
  tickers_df = extract_all_tickers()
  history_df = extract_yfinance(tickers_df)
  facts_df = transform(history_df, tickers_df)
  load_duckdb(facts_df, "historical-DWH.db", "fact_table")



In [9]:
if __name__ == "__main__":
  # one immediate run, then serve the flow on a schedule
  run_pipeline()
  # Each run re-downloads the full ("max") daily price history for every
  # ticker and rewrites the fact table. Running it every minute
  # ("* * * * *") would only hammer the Polygon/Yahoo endpoints, so run it
  # once a day instead.
  # NOTE(review): serve() is a long-running process, so in a notebook this
  # cell keeps running until it is interrupted.
  run_pipeline.serve(
      name = "historical-facts-pipeline",
      cron = "0 0 * * *",
  )

ImportError: DLL load failed while importing _rust: The specified procedure could not be found.