In [None]:
%load_ext dotenv
%dotenv 

# What are we doing?

## Objectives 


* Build a data pipeline that downloads price data from the internet, stores it locally, transforms it into return data, and stores the feature set.
    - Getting the data.
    - Schemas and index in dask.

* Explore the parquet format.
    - Reading and writing parquet files.
    - Read datasets that are stored in distributed files.
    - Discuss dask vs pandas as a small example of big vs small data.
    
* Discuss the use of environment variables for settings.
* Discuss how to use Jupyter notebooks and source code concurrently. 
* Logging and using a standard logger.

## About the Data

+ We will download the prices for a list of stocks.
+ The source is Yahoo Finance and we will use the API provided by the library yfinance.


## Medallion Architecture

+ The architecture that we are thinking about is called Medallion by [Databricks](https://www.databricks.com/glossary/medallion-architecture). It follows an ELT (extract, load, transform) pattern, even though our data is already well structured.

![Medallion Architecture (Databricks)](./images/02_medallion_architecture.png)

+ In our case, we would like to optimize the number of times that we download data from the internet. 
+ Ultimately, we will build a pipeline manager class that will help us control the process of obtaining and transforming our data.

![](./images/02_target_pipeline_manager.png)

# Download Data from Yahoo Finance

Yahoo Finance provides information about public stocks in different markets. The library yfinance gives us access to a fair bit of the data in Yahoo Finance. 

These steps are based on the instructions in:

+ [yfinance documentation](https://pypi.org/project/yfinance/)
+ [Tutorial in geeksforgeeks.org](https://www.geeksforgeeks.org/get-financial-data-from-yahoo-finance-with-python/)


+ If required, install: `python -m pip install yfinance`.
+ To download the price history of a stock, first use the following setup:


In [None]:
import pandas as pd
import yfinance as yf
import os
import sys

sys.path.append(os.getenv('SRC_DIR'))

A few things to notice in the code chunk above:

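+ Libraries are ordered from high level to low level: third-party libraries installed with a package manager (pip in this case, but it could be conda, poetry, etc.) come first, followed by standard-library modules.
+ The command `sys.path.append(os.getenv('SRC_DIR'))` adds the directory stored in the `SRC_DIR` environment variable (for example, `../05_src/`) to the path of the notebook's kernel. This way, we can import our own modules in the notebook.
+ Local modules are imported at the end, after their directory has been added to the path.
+ If we use the logger from our local modules, the function `get_logger()` should be called with `__name__`, as recommended by the documentation.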
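
To see why the path matters, here is a small self-contained sketch. It creates a throwaway directory standing in for `SRC_DIR`, writes a hypothetical module `demo_module.py` into it, and imports that module only after the directory has been appended to `sys.path`. The directory and module names are illustrative assumptions, not part of the course code.

```python
import importlib
import os
import sys
import tempfile

# Hypothetical stand-in for SRC_DIR: a temporary directory with one module in it.
src_dir = tempfile.mkdtemp()
with open(os.path.join(src_dir, "demo_module.py"), "w") as f:
    f.write("def greet():\n    return 'hello from src'\n")

# Appending the directory to sys.path lets the kernel find modules stored there.
if src_dir not in sys.path:
    sys.path.append(src_dir)
importlib.invalidate_caches()  # make sure the import system notices the new file

demo_module = importlib.import_module("demo_module")
print(demo_module.greet())
```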

Now, to download the historical price data for a stock, we could use:

In [None]:
px = yf.download('AAPL', start = "2013-12-01", end = "2024-02-01")
px

## Parametrize the download

+ Generally, we will look to separate every parameter and setting from functions.
+ If we had a few stocks, we could cycle through them. We need a place to store the list of tickers (a db or file, for example).
+ Store a csv file with a few stock tickers. The location of the file is a setting; the contents of the file are parameters.
+ Use **environment variables** to pass settings, such as the location of the tickers file.
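
A minimal sketch, assuming settings such as `TICKERS` are loaded into the environment by `%dotenv`: a hypothetical helper `require_env()` that reads a setting and fails loudly when it is missing, instead of silently passing `None` on to functions like `pd.read_csv()`. The variable name `DEMO_TICKERS` is made up for the demo.

```python
import os

def require_env(name: str) -> str:
    """Return the value of environment variable `name`.

    Hypothetical helper: raises a KeyError with a clear message when the
    variable is unset or empty, so misconfiguration surfaces immediately.
    """
    value = os.getenv(name)
    if not value:
        raise KeyError(f"Environment variable {name!r} is not set; check your .env file.")
    return value

# Usage (the value is set here only for the demo; normally it comes from .env).
os.environ["DEMO_TICKERS"] = "./data/tickers.csv"
print(require_env("DEMO_TICKERS"))
```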

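Start by loading the full list of tickers from the csv file whose location is stored in the `TICKERS` environment variable. We will then take a sample of Information Technology tickers by subsetting the data frame and converting its "ticker" column from a pandas Series to a list.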

In [None]:
# Load all tickers
ticker_file = os.getenv("TICKERS")
tickers = pd.read_csv(ticker_file)


We can subset our ticker data set using standard indexing techniques. Good references for this type of data manipulation are the pandas [Documentation](https://pandas.pydata.org/docs/user_guide/indexing.html#indexing-and-selecting-data) and [Cookbook](https://pandas.pydata.org/docs/user_guide/cookbook.html#cookbook-selection).

In [None]:
idx_tech = tickers['GICS Sector'] == 'Information Technology'
tech_sector = tickers[idx_tech]

From the subset data frame, select one column and convert it to a list.

In [None]:
tech_tickers = tech_sector['ticker'].to_list()
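
The two steps above (boolean mask, then column selection) can also be combined with `.loc`. The sketch below uses a small made-up stand-in for the tickers file with the same column names as the notebook; the tickers `AAA`, `BBB`, `CCC` are illustrative only.

```python
import pandas as pd

# Made-up stand-in for the tickers csv (same column names as the notebook uses).
tickers = pd.DataFrame({
    "ticker": ["AAA", "BBB", "CCC"],
    "GICS Sector": ["Information Technology", "Energy", "Information Technology"],
})

# Boolean mask selects rows, the column label selects the column: .loc does both.
idx_tech = tickers["GICS Sector"] == "Information Technology"
tech_tickers = tickers.loc[idx_tech, "ticker"].to_list()
print(tech_tickers)
```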

In [None]:
tech_raw_dt = yf.download(tech_tickers, start = "2000-01-01", end = "2025-01-26")


The data that we downloaded is in wide format: each row is a date, and the columns combine a price field and a ticker. We want to reshape it so that each row contains the observations for a single stock on a given day. To do this, we can use the function [`stack()`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.stack.html) and then reset the index.

In [None]:
# First, check what tech_raw_dt.stack() looks like.
tech_raw_dt.stack(future_stack=True).reset_index()

In [None]:
tech_dt = (tech_raw_dt
           .stack(future_stack=True)
           .reset_index()
           .sort_values(['Ticker', 'Date']))
tech_dt.columns.name = None


# Storing Data in CSV



+ We have some data. How do we store it?
+ We can compare two options, CSV and Parquet, by measuring their performance:

    - Time to save.
    - Space required.
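
The timing cells in this notebook repeat the same `start`/`end` pattern. A small context manager can factor it out; this is a standard-library sketch with a hypothetical `timer()` helper. It uses `time.perf_counter()`, which is intended for measuring short intervals.

```python
import time
from contextlib import contextmanager

@contextmanager
def timer(label: str, results: dict):
    """Record the wall-clock duration of the enclosed block in results[label] (seconds)."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

# Usage: time any block, for example a write to disk.
timings = {}
with timer("sum", timings):
    total = sum(range(1_000_000))
print(f"sum took {timings['sum']:.4f} seconds")
```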

In [None]:
def get_dir_size(path='.'):
    '''Returns the total size of files contained in path.'''
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += get_dir_size(entry.path)
    return total

In [None]:
import time

In [None]:
temp = os.getenv("TEMP_DATA")
os.makedirs(temp, exist_ok=True)
stock_path = os.path.join(temp, "stock_px.csv")

In [None]:
start = time.time()
tech_dt.to_csv(stock_path, index=False)
end = time.time()

print(f'Writing dt ({tech_dt.shape}) to csv took {end - start} seconds.')
print(f'Csv file size {os.path.getsize(stock_path) * 1e-6} MB')
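
Time and space are not the only costs of CSV: it stores plain text, so column types are not preserved. The sketch below round-trips a tiny made-up frame through an in-memory CSV buffer. The `Date` column only comes back as datetimes when we request it with `parse_dates`. Parquet, by contrast, stores a schema alongside the data.

```python
import io

import pandas as pd

df = pd.DataFrame({
    "Date": pd.to_datetime(["2024-01-02", "2024-01-03"]),
    "Close": [10.0, 11.0],
})

buffer = io.StringIO()
df.to_csv(buffer, index=False)  # dates are written as plain text

buffer.seek(0)
naive = pd.read_csv(buffer)  # 'Date' comes back as text, not datetime
buffer.seek(0)
parsed = pd.read_csv(buffer, parse_dates=["Date"])  # types must be restored explicitly

print(naive.dtypes, parsed.dtypes, sep="\n\n")
```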

## Save Data to Parquet

### Dask 

We can work with large data sets and parquet files using Dask. In this notebook, Dask writes parquet through the pyarrow engine, and recent versions of pandas also support pyarrow-backed data types, so pyarrow is becoming increasingly central to the pandas ecosystem. The pyarrow library is the Python interface to the Apache Arrow project. The [parquet data format](https://parquet.apache.org/) and [Arrow](https://arrow.apache.org/docs/python/parquet.html) are projects of the Apache Software Foundation.

However, Dask is much more than an interface to Arrow: Dask provides parallel and distributed computing on pandas-like dataframes. It is also relatively easy to use, bridging a gap between pandas and Spark. 

In [None]:
import dask.dataframe as dd

In [None]:
px_dd = dd.from_pandas(tech_dt, npartitions = len(tech_tickers))
parquet_path = os.path.join(temp, "stock_px.parquet")

start = time.time()
px_dd.to_parquet(parquet_path, engine = "pyarrow")
end = time.time()

print(f'Writing dd ({tech_dt.shape}) to parquet took {end - start} seconds.')
print(f'Parquet file size { get_dir_size(parquet_path)*1e-6 } MB')

### Parquet files and Dask Dataframes

+ Parquet files are immutable: once written, they cannot be modified.
+ Dask DataFrames are a convenient way to manipulate data stored in parquet files.
+ Parquet and Dask are not the same: parquet is a file format that can be accessed by many applications and programming languages (Python, R, PowerBI, etc.), while Dask is a package in Python to work with large datasets using distributed computation.
+ **Dask is not for everything** (see [Dask DataFrames Best Practices](https://docs.dask.org/en/stable/dataframe-best-practices.html)). 

    - Dask is useful in cases such as small-to-large joins, where the small dataframe fits in memory but the large one does not.
    - Reduce, then use pandas: if the data fits in memory after filtering or aggregating, switch to pandas.
    - Pandas performance tips apply to Dask.
    - Use the index: a well-defined index in Dask DataFrames can speed up searching (filtering) the data. Only a one-dimensional index is supported.
    - Avoid (or minimize) full-data shuffling: setting an index (`set_index()`) shuffles the data and is an expensive operation.
    - Some joins are more expensive than others. 

        * Not expensive:

            - Join a Dask DataFrame with a pandas DataFrame.
            - Join a Dask DataFrame with another Dask DataFrame of a single partition.
            - Join Dask DataFrames along their indexes.

        * Expensive:

            - Join Dask DataFrames along columns that are not their index.


# How do we store prices?

+ We can store our data as a single blob. This can be difficult to maintain, especially because parquet files are immutable.
+ Strategy: organize data files by ticker and year, and update only the most recent files.
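
A minimal sketch of the bookkeeping this strategy needs, assuming the `<root>/<ticker>/<ticker>_<year>` layout that the loop in this section writes. The helpers `partition_path()` and `partitions_to_update()` are hypothetical: given newly downloaded rows, they list the partitions that must be rewritten, leaving all older partitions untouched.

```python
import os

import pandas as pd

def partition_path(root: str, ticker: str, year: int) -> str:
    """Hypothetical helper mirroring the layout <root>/<ticker>/<ticker>_<year>."""
    return os.path.join(root, ticker, f"{ticker}_{year}")

def partitions_to_update(new_rows: pd.DataFrame, root: str) -> list:
    """List the (ticker, year) partitions touched by newly downloaded rows.

    Expects a 'Ticker' column and a datetime 'Date' column.
    """
    keys = (new_rows
            .assign(Year=new_rows["Date"].dt.year)[["Ticker", "Year"]]
            .drop_duplicates()
            .sort_values(["Ticker", "Year"]))
    return [partition_path(root, t, int(y)) for t, y in keys.itertuples(index=False)]

# Usage with made-up rows spanning a year boundary.
new_rows = pd.DataFrame({
    "Ticker": ["AAA", "AAA", "BBB"],
    "Date": pd.to_datetime(["2024-12-31", "2025-01-02", "2025-01-02"]),
})
print(partitions_to_update(new_rows, "price_data"))
```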



In [None]:
# Clean up before starting
import shutil

PRICE_DATA = os.getenv("PRICE_DATA")
if os.path.exists(PRICE_DATA):
    shutil.rmtree(PRICE_DATA)

In [None]:
# Write one parquet dataset per ticker and year: PRICE_DATA/<ticker>/<ticker>_<year>
for ticker in tech_dt['Ticker'].unique():
    ticker_dt = tech_dt[tech_dt['Ticker'] == ticker]
    ticker_dt = ticker_dt.assign(Year=ticker_dt['Date'].dt.year)
    for yr in ticker_dt['Year'].unique():
        yr_dd = dd.from_pandas(ticker_dt[ticker_dt['Year'] == yr], npartitions=2)
        yr_path = os.path.join(PRICE_DATA, ticker, f"{ticker}_{yr}")
        os.makedirs(os.path.dirname(yr_path), exist_ok=True)  # create PRICE_DATA/<ticker>/
        yr_dd.to_parquet(yr_path, engine="pyarrow")

Why would we want to store data this way?

+ Easier to maintain. We do not update old data, only recent data.
+ We can still load all files at once as a single collection, as the next section shows.

# Load, Transform and Save 

## Load

+ Parquet files can be read individually or as a collection.
+ `dd.read_parquet()` can take a list (collection) of files as input.
+ Use `glob` to get the collection of files.

In [None]:
from glob import glob

parquet_files = glob(os.path.join(PRICE_DATA, "**/*.parquet"), recursive = True)
dd_px = dd.read_parquet(parquet_files).set_index("Ticker")
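
The recursive pattern can be checked without any real data. This sketch builds a throwaway directory tree mimicking the ticker/year layout. The file name `part.0.parquet` mirrors the kind of names Dask writes, but here the files are empty placeholders, not valid parquet.

```python
import os
import tempfile
from glob import glob

# Throwaway tree: <root>/<ticker>/<ticker>_<year>/part.0.parquet (empty placeholder files).
root = tempfile.mkdtemp()
for ticker, year in [("AAA", 2023), ("AAA", 2024), ("BBB", 2024)]:
    part_dir = os.path.join(root, ticker, f"{ticker}_{year}")
    os.makedirs(part_dir, exist_ok=True)
    open(os.path.join(part_dir, "part.0.parquet"), "wb").close()

# With recursive=True, "**" matches any number of nested directories.
files = sorted(glob(os.path.join(root, "**", "*.parquet"), recursive=True))
for f in files:
    print(os.path.relpath(f, root))
```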

## Transform

+ This transformation step will create a *Features* data set. In our case, the features will be stock returns computed from the prices we downloaded.
+ Dask dataframes work like pandas dataframes: in particular, we can perform groupby and apply operations.
+ Notice the use of [an anonymous (lambda) function](https://realpython.com/python-lambda/) in the apply statement.

In [None]:
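# Sort each ticker's rows by date before lagging, so shift(1) returns the previous day's close.
dd_shift = dd_px.groupby('Ticker', group_keys=False).apply(
    lambda x: x.sort_values('Date').assign(Close_lag_1=lambda d: d['Close'].shift(1))
)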

In [None]:
dd_rets = dd_shift.assign(
    Returns = lambda x: x['Close']/x['Close_lag_1'] - 1
)
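
The same logic in plain pandas, on made-up prices, shows why the lag must be computed within each ticker after sorting by date: otherwise a ticker's first day would borrow the previous ticker's last price. The sketch also checks that pandas' built-in `pct_change()` gives the same simple return, computed as Close divided by the lagged Close, minus one.

```python
import pandas as pd

# Made-up closing prices for two tickers, deliberately interleaved.
prices = pd.DataFrame({
    "Ticker": ["AAA", "BBB", "AAA", "BBB", "AAA"],
    "Date": pd.to_datetime(["2024-01-02", "2024-01-02", "2024-01-03",
                            "2024-01-03", "2024-01-04"]),
    "Close": [10.0, 20.0, 11.0, 19.0, 12.1],
})

# Sort by ticker and date, then lag inside each group.
prices = prices.sort_values(["Ticker", "Date"]).reset_index(drop=True)
prices["Close_lag_1"] = prices.groupby("Ticker")["Close"].shift(1)
prices["Returns"] = prices["Close"] / prices["Close_lag_1"] - 1

# pandas' pct_change() computes the same simple return per group.
prices["Returns_pct"] = prices.groupby("Ticker")["Close"].pct_change()
print(prices)
```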

## Lazy Execution

What does `dd_rets` contain?

In [None]:
dd_rets

+ Dask is a lazy execution framework: commands will not execute until they are required. 
+ To trigger an execution in dask use `.compute()`.

In [None]:
dd_rets.compute()

## Save

+ Apply transformations to calculate daily returns.
+ Store the enriched data, the silver dataset, in a new directory.
+ Should we keep the same namespace? All columns?

In [None]:
# Clean up before saving
FEATURES_DATA = os.getenv("FEATURES_DATA")
if os.path.exists(FEATURES_DATA):
    shutil.rmtree(FEATURES_DATA)
dd_rets.to_parquet(FEATURES_DATA, overwrite = True)

# Optional: from Jupyter to Command Line

+ We have drafted our code in a Jupyter Notebook. 
+ Finalized code should be written in Python modules.

## Object Oriented vs Functional Programming

+ We can use classes to keep parameters and functions together.
+ We *could* use Object Oriented Programming, but parallelizing data manipulation and modelling tasks benefits from *Functional Programming*.
+ One idea:

    - [Data Oriented Programming](https://blog.klipse.tech/dop/2022/06/22/principles-of-dop.html).
    - Use the class to bundle together parameters and functions.
    - Use stateless operations and treat all data objects as immutable (we do not modify them, we overwrite them).
    - Take advantage of [`@staticmethod`](https://realpython.com/instance-class-and-static-methods-demystified/).
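
A minimal sketch of these ideas, not the course's `DataManager`: a hypothetical `ReturnsFeaturizer` keeps its parameters in a frozen (immutable) dataclass, and the transformation is a stateless `@staticmethod` that returns a new frame instead of modifying its input.

```python
from dataclasses import dataclass

import pandas as pd

@dataclass(frozen=True)
class ReturnsFeaturizer:
    """Hypothetical example: immutable parameters plus a stateless transformation."""
    price_col: str = "Close"
    group_col: str = "Ticker"
    date_col: str = "Date"

    @staticmethod
    def add_returns(df: pd.DataFrame, price_col: str, group_col: str,
                    date_col: str) -> pd.DataFrame:
        # Returns a new frame; the input DataFrame is never modified in place.
        out = df.sort_values([group_col, date_col]).reset_index(drop=True)
        lag = out.groupby(group_col)[price_col].shift(1)
        return out.assign(Returns=out[price_col] / lag - 1)

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        return self.add_returns(df, self.price_col, self.group_col, self.date_col)

# Usage with made-up prices.
prices = pd.DataFrame({
    "Ticker": ["AAA", "AAA", "BBB", "BBB"],
    "Date": pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-02", "2024-01-03"]),
    "Close": [10.0, 11.0, 20.0, 19.0],
})
features = ReturnsFeaturizer().run(prices)
print(features)
```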

The code is in `./05_src/`.

Our original design was:

![](./images/02_target_pipeline_manager.png)



The `DataManager` class in `./05_src/data_manager.py` is a simple implementation of the ideas and code discussed in this notebook. The cells below download data for about 500 stocks in the S&P 500, use this data to create a few features, and store them in the features data set.

First, instantiate an object of class `DataManager`.

In [None]:
from data_manager import DataManager
dm = DataManager()

Download all prices.

In [None]:
dm.download_all()

Finally, add features to the data set and save to a *feature store*.

In [None]:
dm.featurize()