In [1]:
%load_ext dotenv
%dotenv 

# What are we doing?

## Objectives 


* Build a data pipeline that downloads price data from the internet, stores it locally, transforms it into return data, and stores the feature set.
    - Getting the data.
    - Schemas and index in dask.

* Explore the parquet format.
    - Reading and writing parquet files.
    - Read datasets that are stored in distributed files.
    - Discuss dask vs pandas as a small example of big vs small data.
    
* Discuss the use of environment variables for settings.
* Discuss how to use Jupyter notebooks and source code concurrently. 
* Logging and using a standard logger.

## About the Data

+ We will download the prices for a list of stocks.
+ The source is Yahoo Finance and we will use the API provided by the library yfinance.


## Medallion Architecture

+ The architecture that we are thinking about is called Medallion by [Databricks](https://www.databricks.com/glossary/medallion-architecture). It follows an ELT (extract, load, transform) way of thinking, although our data is well-structured.

![Medallion Architecture (Databricks)](./images/02_medallion_architecture.png)

+ In our case, we would like to minimize the number of times that we download data from the internet.
+ Ultimately, we will build a pipeline manager class that will help us control the process of obtaining and transforming our data.

![](./images/02_target_pipeline_manager.png)

# Download Data

Download the [Stock Market Dataset from Kaggle](https://www.kaggle.com/datasets/jacksoncrow/stock-market-dataset). Note that you may be required to register for a free account.

Extract all files into the directory: `./05_src/data/prices_csv/`

Your folder structure should include the following paths:

+ `05_src/data/prices_csv/etfs`
+ `05_src/data/prices_csv/stocks`


In [2]:
import pandas as pd
import os
import sys
from glob import glob

sys.path.append(os.getenv('SRC_DIR'))

from utils.logger import get_logger
_logs = get_logger(__name__)

A few things to notice in the code chunk above:

+ Libraries are ordered from high-level to low-level libraries from the package manager (pip in this case, but could be conda, poetry, etc.)
+ The command `sys.path.append(os.getenv('SRC_DIR'))` adds the directory stored in the `SRC_DIR` environment variable (for example, `../05_src/`) to the path in the Notebook's kernel. This way, we can use our modules as part of the notebook.
+ Local modules are imported at the end. 
+ The function `get_logger()` is called with `__name__` as recommended by the documentation.
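
The `utils.logger` module itself is not shown in this notebook. As a rough sketch, a `get_logger()` helper that emits comma-separated lines similar to the log output in this notebook might look like the following. The format string and the handler setup are assumptions made for illustration, not the actual implementation in `05_src`.

```python
import logging
import sys


def get_logger(name, level=logging.INFO):
    """Return a logger that writes comma-separated records to stderr.

    Hypothetical stand-in for utils.logger.get_logger; the real module may differ.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Attach a handler only once, so re-running a notebook cell
    # does not duplicate every log line.
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s, %(filename)s, %(lineno)d, %(levelname)s, %(message)s"
        ))
        logger.addHandler(handler)
    return logger


_logs = get_logger(__name__)
_logs.info("Logger is ready")
```

Passing `__name__` means each module gets its own named logger, so a log line can be traced back to the file that produced it.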

Now, to load the historical price data for stocks and ETFs, we could use:

In [3]:
import random

stock_files = glob(os.path.join(os.getenv('SRC_DIR'), "stock_prices/stocks/*.csv"))

random.seed(42)
stock_files = random.sample(stock_files, 60)

dt_list = []
for s_file in stock_files:
    _logs.info(f"Reading file: {s_file}")
    dt = pd.read_csv(s_file).assign(
        source = os.path.basename(s_file),
        ticker = os.path.basename(s_file).replace('.csv', ''),
        Date = lambda x: pd.to_datetime(x['Date'])
    )
    dt_list.append(dt)
stock_prices = pd.concat(dt_list, axis = 0, ignore_index = True)



2025-06-06 17:15:26,068, 4046701248.py, 10, INFO, Reading file: ../../05_src/stock_prices/stocks\TNC.csv
2025-06-06 17:15:26,115, 4046701248.py, 10, INFO, Reading file: ../../05_src/stock_prices/stocks\CBB.csv
2025-06-06 17:15:26,134, 4046701248.py, 10, INFO, Reading file: ../../05_src/stock_prices/stocks\ALDX.csv
2025-06-06 17:15:26,151, 4046701248.py, 10, INFO, Reading file: ../../05_src/stock_prices/stocks\GLADD.csv
2025-06-06 17:15:26,161, 4046701248.py, 10, INFO, Reading file: ../../05_src/stock_prices/stocks\FIXX.csv
2025-06-06 17:15:26,170, 4046701248.py, 10, INFO, Reading file: ../../05_src/stock_prices/stocks\ETJ.csv
2025-06-06 17:15:26,185, 4046701248.py, 10, INFO, Reading file: ../../05_src/stock_prices/stocks\CMCTP.csv
2025-06-06 17:15:26,192, 4046701248.py, 10, INFO, Reading file: ../../05_src/stock_prices/stocks\BWG.csv
2025-06-06 17:15:26,218, 4046701248.py, 10, INFO, Reading file: ../../05_src/stock_prices/stocks\VIAC.csv
2025-06-06 17:15:26,250, 4046701248.py, 10, INFO

Verify the structure of the `stock_prices` data:

In [4]:
stock_prices.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 239659 entries, 0 to 239658
Data columns (total 9 columns):
 #   Column     Non-Null Count   Dtype         
---  ------     --------------   -----         
 0   Date       239659 non-null  datetime64[ns]
 1   Open       239656 non-null  float64       
 2   High       239656 non-null  float64       
 3   Low        239656 non-null  float64       
 4   Close      239656 non-null  float64       
 5   Adj Close  239656 non-null  float64       
 6   Volume     239656 non-null  float64       
 7   source     239659 non-null  object        
 8   ticker     239659 non-null  object        
dtypes: datetime64[ns](1), float64(6), object(2)
memory usage: 16.5+ MB


We can subset our ticker data set using standard indexing techniques. A good reference for this type of data manipulation is the pandas [Documentation](https://pandas.pydata.org/docs/user_guide/indexing.html#indexing-and-selecting-data) and [Cookbook](https://pandas.pydata.org/docs/user_guide/cookbook.html#cookbook-selection).

From the subset data frame, select one column and convert to list.

In [5]:
select_tickers = stock_prices['ticker'].unique().tolist()
select_tickers

['TNC',
 'CBB',
 'ALDX',
 'GLADD',
 'FIXX',
 'ETJ',
 'CMCTP',
 'BWG',
 'VIAC',
 'REI',
 'BLPH',
 'SMG',
 'MOH',
 'AMH',
 'AMAL',
 'BPYPN',
 'ERH',
 'FAMI',
 'PFG',
 'SPXC',
 'ALL',
 'RTTR',
 'EARN',
 'ZIXI',
 'TSN',
 'WST',
 'REG',
 'MNK',
 'ESGR',
 'NGD',
 'SLRX',
 'GLW',
 'ACN',
 'CSSE',
 'WORK',
 'MOS',
 'IPWR',
 'GLUU',
 'CRMT',
 'EOLS',
 'INSU',
 'BWEN',
 'BPMX',
 'LH',
 'BRQS',
 'KALU',
 'ITCB',
 'SRE',
 'GAZ',
 'AQMS',
 'NPK',
 'QRHC',
 'CGEN',
 'LEVL',
 'BGS',
 'RIV',
 'GURE',
 'TEF',
 'SYNH',
 'KEY']

# Storing Data in CSV



+ We have some data. How do we store it?
+ We can compare two options, CSV and Parquet, by measuring their performance:

    - Time to save.
    - Space required.
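
Since the same two measurements are taken for each format, a small helper can keep the timing code in one place. The sketch below is only an illustration: `timed` is a hypothetical name that does not appear in the course code, and it uses `time.perf_counter()`, which is designed for measuring short intervals.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(results, label):
    """Record the wall-clock seconds spent inside the `with` block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}
with timed(timings, "demo"):
    sum(range(100_000))  # placeholder for a call such as DataFrame.to_csv

print(f"demo took {timings['demo']:.4f} seconds")
```

Collecting the results in a dictionary makes it easy to compare formats side by side after all writes have finished.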

In [6]:
def get_dir_size(path='.'):
    '''Returns the total size of files contained in path.'''
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += get_dir_size(entry.path)
    return total

In [7]:
import time
import shutil

In [8]:
temp = os.getenv("TEMP_DATA")
csv_dir = os.path.join(temp, "csv")
shutil.rmtree(csv_dir, ignore_errors=True)
stock_csv = os.path.join(csv_dir, "stock_px.csv")
os.makedirs(csv_dir, exist_ok=True)

In [9]:

start = time.time()
stock_prices.to_csv(stock_csv, index = False)
end = time.time()

_logs.info(f'Writing data ({stock_prices.shape}) to csv took {end - start} seconds.')
_logs.info(f'CSV file size { os.path.getsize(stock_csv)*1e-6 } MB')

2025-06-06 17:15:55,074, 473192941.py, 5, INFO, Writing data ((239659, 9)) to csv took 1.431408166885376 seconds.
2025-06-06 17:15:55,075, 473192941.py, 6, INFO, CSV file size 26.618403999999998 MB


## Save Data to Parquet

### Dask 

We can work with large data sets and Parquet files. In fact, recent versions of pandas support pyarrow data types, and pandas is moving toward closer integration with pyarrow. The pyarrow library is the Python interface to the Apache Arrow project. The [Parquet data format](https://parquet.apache.org/) and [Arrow](https://arrow.apache.org/docs/python/parquet.html) are projects of the Apache Software Foundation.

However, Dask is much more than an interface to Arrow: Dask provides parallel and distributed computing on pandas-like dataframes. It is also relatively easy to use, bridging a gap between pandas and Spark. 

In [10]:
import dask.dataframe as dd

parquet_dir = os.path.join(temp, "parquet")
shutil.rmtree(parquet_dir, ignore_errors=True)
os.makedirs(parquet_dir, exist_ok=True)



In [11]:
px_dd = dd.from_pandas(stock_prices, npartitions = len(select_tickers))

start = time.time()
px_dd.to_parquet(parquet_dir, engine = "pyarrow")
end = time.time()

_logs.info(f'Writing dd ({stock_prices.shape}) to parquet took {end - start} seconds.')
_logs.info(f'Parquet file size { get_dir_size(parquet_dir)*1e-6 } MB')

2025-06-06 17:16:12,688, 817812245.py, 7, INFO, Writing dd ((239659, 9)) to parquet took 0.6877317428588867 seconds.
2025-06-06 17:16:12,690, 817812245.py, 8, INFO, Parquet file size 9.695952 MB


### Parquet files and Dask Dataframes

+ Parquet files are immutable: once written, they cannot be modified.
+ Dask DataFrames are a useful way to manipulate data stored in Parquet files.
+ Parquet and Dask are not the same: parquet is a file format that can be accessed by many applications and programming languages (Python, R, PowerBI, etc.), while Dask is a package in Python to work with large datasets using distributed computation.
+ **Dask is not for everything** (see [Dask DataFrames Best Practices](https://docs.dask.org/en/stable/dataframe-best-practices.html)). 

    - Consider cases such as small-to-large joins, where the small dataframe fits in memory, but the large one does not. 
    - If possible, use pandas: reduce, then use pandas.
    - Pandas performance tips apply to Dask.
    - Use the index: it is beneficial to have a well-defined index in Dask DataFrames, as it may speed up searching (filtering) the data. Only a one-dimensional index is allowed.
    - Avoid (or minimize) full-data shuffling: setting an index is an expensive operation. 
    - Some joins are more expensive than others. 

        * Not expensive:

            - Join a Dask DataFrame with a pandas DataFrame.
            - Join a Dask DataFrame with another Dask DataFrame of a single partition.
            - Join Dask DataFrames along their indexes.

        * Expensive:

            - Join Dask DataFrames along columns that are not their index.


# How do we store prices?

+ We can store our data as a single blob. This can be difficult to maintain, especially because parquet files are immutable.
+ Strategy: organize data files by ticker and date. Update only latest month.



In [12]:
# Clean up before start: remove any previously written price data
PRICE_DATA = os.getenv("PRICE_DATA")
if os.path.exists(PRICE_DATA):
    shutil.rmtree(PRICE_DATA)
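
`os.getenv()` returns `None` when a variable is missing, and later path operations can then fail with an error that does not name the missing setting. One possible guard is sketched below. `require_env` and `DEMO_PRICE_DATA` are hypothetical names used only for this illustration.

```python
import os


def require_env(name):
    """Return the value of an environment variable or raise a clear error."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name!r} is not set; check your .env file."
        )
    return value


# DEMO_PRICE_DATA is a made-up variable, set here only so the example runs.
os.environ["DEMO_PRICE_DATA"] = "./data/prices"
print(require_env("DEMO_PRICE_DATA"))
```

Failing early with the variable name in the message tends to be easier to debug than a `TypeError` raised several cells later.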

In [13]:
stock_prices.columns

Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'source',
       'ticker'],
      dtype='object')

In [15]:
stock_prices = stock_prices.rename(columns={'Adj Close': 'Adj_Close'})
stock_prices.columns

Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj_Close', 'Volume', 'source',
       'ticker'],
      dtype='object')

In [16]:
for ticker in stock_prices['ticker'].unique():
    ticker_dt = stock_prices[stock_prices['ticker'] == ticker]
    ticker_dt = ticker_dt.assign(Year = ticker_dt.Date.dt.year)
    for yr in ticker_dt['Year'].unique():
        yr_dd = dd.from_pandas(ticker_dt[ticker_dt['Year'] == yr], npartitions=2)
        yr_path = os.path.join(PRICE_DATA, ticker, f"{ticker}_{yr}")
        os.makedirs(os.path.dirname(yr_path), exist_ok=True)
        yr_dd.to_parquet(yr_path, engine = "pyarrow")
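
The loop above writes one directory per ticker and year. The sketch below isolates that naming rule and shows how it could be used to pick out only the most recent partition for a refresh. Both helper names are hypothetical, and the refresh rule (rewrite only the current year) is an assumption about how updates might work, not something implemented in this notebook.

```python
import os


def partition_path(root, ticker, year):
    """Build the directory for one ticker-year, e.g. <root>/ACN/ACN_2001."""
    return os.path.join(root, ticker, f"{ticker}_{year}")


def stale_partitions(root, ticker, years, current_year):
    """Return the paths that would be rewritten: only the current year."""
    return [partition_path(root, ticker, y) for y in years if y == current_year]


print(partition_path("prices", "ACN", 2001))
print(stale_partitions("prices", "ACN", [2018, 2019, 2020], current_year=2020))
```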
    

Why would we want to store data this way?

+ Easier to maintain. We do not update old data, only recent data.
+ We can also access all files as follows.

# Load, Transform and Save 

## Load

+ Parquet files can be read individually or as a collection.
+ `dd.read_parquet()` can take a list (collection) of files as input.
+ Use `glob` to get the collection of files.
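
To see what a recursive `glob` pattern returns, the following sketch builds a tiny directory tree in a temporary folder and searches it. The files are empty placeholders rather than real Parquet data, and the `part.0.parquet` name is assumed here to mimic how Dask typically names its output files.

```python
import os
import tempfile
from glob import glob

with tempfile.TemporaryDirectory() as root:
    # Recreate a tiny <ticker>/<ticker>_<year>/part.0.parquet layout
    # with empty placeholder files (not real Parquet data).
    for ticker, year in [("ACN", 2001), ("ACN", 2002), ("ZIXI", 2003)]:
        part_dir = os.path.join(root, ticker, f"{ticker}_{year}")
        os.makedirs(part_dir)
        open(os.path.join(part_dir, "part.0.parquet"), "w").close()

    # "**" matches any number of nested directories when recursive=True.
    found = sorted(glob(os.path.join(root, "**", "*.parquet"), recursive=True))
    relative = [os.path.relpath(p, root) for p in found]

print(relative)
```

Without `recursive=True`, the `**` component behaves like a single `*` and would miss files nested more than one level deep.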

In [17]:
from glob import glob

parquet_files = glob(os.path.join(PRICE_DATA, "**/*.parquet"), recursive = True)
dd_px = dd.read_parquet(parquet_files).set_index("ticker")

## Transform

+ This transformation step will create a *Features* data set. In our case, features will be stock returns (we obtained prices).
+ Dask dataframes work like pandas dataframes: in particular, we can perform groupby and apply operations.
+ Notice the use of [an anonymous (lambda) function](https://realpython.com/python-lambda/) in the apply statement.

In [19]:
dd_shift = dd_px.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(Close_lag_1 = x['Close'].shift(1))
)

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_shift = dd_px.groupby('ticker', group_keys=False).apply(


In [20]:
dd_rets = dd_shift.assign(
    Returns = lambda x: x['Close']/x['Close_lag_1'] - 1
)
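
The lag-then-divide logic can be checked on a toy frame with plain pandas before running it on the full Dask DataFrame. The tickers and prices below are made up, and the sketch assumes rows are already sorted by date within each ticker.

```python
import pandas as pd

prices = pd.DataFrame({
    "ticker": ["A", "A", "A", "B", "B"],
    "Close": [10.0, 11.0, 12.1, 20.0, 19.0],
})

# Shift within each ticker so the first row of every ticker has no lag.
prices["Close_lag_1"] = prices.groupby("ticker")["Close"].shift(1)
prices["Returns"] = prices["Close"] / prices["Close_lag_1"] - 1

print(prices)
```

The first observation of each ticker has a missing lag and therefore a missing return, which matches the `NaN` values that appear at the start of each ticker in the computed output.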

## Lazy Execution

What does `dd_rets` contain?

In [21]:
dd_rets

| npartitions=60 | Date | Open | High | Low | Close | Adj_Close | Volume | source | Year | Close_lag_1 | Returns |
|---|---|---|---|---|---|---|---|---|---|---|---|
| ACN | datetime64[ns] | float64 | float64 | float64 | float64 | float64 | float64 | object | int32 | float64 | float64 |
| ALDX | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ZIXI | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ZIXI | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


+ Dask is a lazy execution framework: commands will not execute until they are required. 
+ To trigger an execution in dask use `.compute()`.
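
The idea of deferring work until a result is requested can be illustrated without Dask. The `Lazy` class below is a toy analogy written for this note. It is not how Dask is implemented, but it shows that building a pipeline and running it are separate steps.

```python
class Lazy:
    """Toy deferred computation: stores a function and its inputs until asked."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Resolve any lazy inputs first, then run this step.
        resolved = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*resolved)


calls = []


def double(x):
    calls.append("double")
    return 2 * x


def add_one(x):
    calls.append("add_one")
    return x + 1


pipeline = Lazy(add_one, Lazy(double, 20))  # nothing has run yet
print(calls)                 # []
result = pipeline.compute()  # both steps run now
print(result, calls)         # 41 ['double', 'add_one']
```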

In [22]:
dd_rets.compute()

| ticker | Date | Open | High | Low | Close | Adj_Close | Volume | source | Year | Close_lag_1 | Returns |
|---|---|---|---|---|---|---|---|---|---|---|---|
| ACN | 2001-07-19 | 15.10 | 15.29 | 15.00 | 15.17 | 11.404394 | 34994300.0 | ACN.csv | 2001 | NaN | NaN |
| ACN | 2001-07-20 | 15.05 | 15.05 | 14.80 | 15.01 | 11.284108 | 9238500.0 | ACN.csv | 2001 | 15.17 | -0.010547 |
| ACN | 2001-07-23 | 15.00 | 15.01 | 14.55 | 15.00 | 11.276587 | 7501000.0 | ACN.csv | 2001 | 15.01 | -0.000666 |
| ACN | 2001-07-24 | 14.95 | 14.97 | 14.70 | 14.86 | 11.171341 | 3537300.0 | ACN.csv | 2001 | 15.00 | -0.009333 |
| ACN | 2001-07-25 | 14.70 | 14.95 | 14.65 | 14.95 | 11.238999 | 4208100.0 | ACN.csv | 2001 | 14.86 | 0.006057 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ZIXI | 2003-06-26 | 4.04 | 4.19 | 3.86 | 4.00 | 4.000000 | 515300.0 | ZIXI.csv | 2003 | 4.04 | -0.009901 |
| ZIXI | 2003-06-27 | 4.00 | 4.05 | 3.79 | 3.85 | 3.850000 | 162400.0 | ZIXI.csv | 2003 | 4.00 | -0.037500 |
| ZIXI | 2003-06-30 | 3.84 | 4.00 | 3.72 | 3.77 | 3.770000 | 119900.0 | ZIXI.csv | 2003 | 3.85 | -0.020779 |
| ZIXI | 2003-07-01 | 3.72 | 3.85 | 3.65 | 3.78 | 3.780000 | 202100.0 | ZIXI.csv | 2003 | 3.77 | 0.002653 |


## Save

+ Apply transformations to calculate daily returns.
+ Store the enriched data, the silver dataset, in a new directory.
+ Should we keep the same namespace? All columns?

In [23]:
# Clean up before save
FEATURES_DATA = os.getenv("FEATURES_DATA")
if os.path.exists(FEATURES_DATA):
    shutil.rmtree(FEATURES_DATA)
dd_rets.to_parquet(FEATURES_DATA, overwrite = True)

# Optional: from Jupyter to Command Line

+ We have drafted our code in a Jupyter Notebook. 
+ Finalized code should be written in Python modules.

## Object Oriented vs Functional Programming

+ We can use classes to keep parameters and functions together.
+ We *could* use Object Oriented Programming, but parallelization of data manipulation and modelling tasks benefits from *Functional Programming*.
+ An idea: 

    - [Data Oriented Programming](https://blog.klipse.tech/dop/2022/06/22/principles-of-dop.html).
    - Use the class to bundle together parameters and functions.
    - Use stateless operations and treat all data objects as immutable (we do not modify them, we overwrite them).
    - Take advantage of [`@staticmethod`](https://realpython.com/instance-class-and-static-methods-demystified/).
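
As a minimal sketch of this idea, the class below keeps a setting on the instance while the actual work is done by stateless static methods that return new objects. `ReturnsPipeline` is a hypothetical example written for this note. It is not the `DataManager` class and it works on plain lists rather than DataFrames.

```python
class ReturnsPipeline:
    """Hypothetical example: settings live on the instance, logic is stateless."""

    def __init__(self, lag=1):
        self.lag = lag

    @staticmethod
    def lagged(values, lag):
        """Return a new list shifted by `lag`, padded with None."""
        return [None] * lag + list(values[:len(values) - lag])

    @staticmethod
    def returns(values, lagged_values):
        """Return simple returns; None where no lagged price exists."""
        return [None if prev is None else cur / prev - 1
                for cur, prev in zip(values, lagged_values)]

    def run(self, values):
        # Inputs are never modified; each step produces a new object.
        return self.returns(values, self.lagged(values, self.lag))


closes = [10.0, 11.0, 12.1]
rets = ReturnsPipeline(lag=1).run(closes)
print(rets)
```

Because the static methods depend only on their arguments, they can be tested in isolation and reused outside the class.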

The code is in `./05_src/stock_prices/data_manager.py`.

Our original design was:

![](./images/02_target_pipeline_manager.png)



In [24]:
from stock_prices.data_manager import DataManager
dm = DataManager()

Download all prices.

In [25]:
dm.process_sample_files()

2025-06-06 17:19:58,556, data_manager.py, 53, INFO, Processing sample of tickers
2025-06-06 17:19:58,558, data_manager.py, 64, INFO, Getting file list from ../../05_src/data/prices_csv/
2025-06-06 17:19:58,559, data_manager.py, 66, INFO, Found 0 files in ../../05_src/data/prices/
2025-06-06 17:19:58,560, data_manager.py, 74, INFO, Selecting sample of files
2025-06-06 17:19:58,560, data_manager.py, 80, INFO, Selected 0 files


Finally, add features to the data set and save to a *feature store*.

In [26]:
dm.featurize()

2025-06-06 17:20:02,892, data_manager.py, 131, INFO, Creating features data.
2025-06-06 17:20:02,893, data_manager.py, 141, INFO, Loading price data from ../../05_src/data/prices/
2025-06-06 17:20:03,509, data_manager.py, 150, INFO, Creating features
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  features = (price_dd.groupby('ticker', group_keys=False)
2025-06-06 17:20:03,516, data_manager.py, 175, INFO, Saving features to ../../05_src/data/features/stock_features
