In [1]:
%load_ext dotenv
%dotenv 

# What are we doing?

## Objectives 


* Build a data pipeline that downloads price data from the internet, stores it locally, transforms it into return data, and stores the feature set.
    - Getting the data.
    - Schemas and index in Dask.

* Explore the parquet format.
    - Reading and writing parquet files.
    - Read datasets that are stored in distributed files.
    - Discuss Dask vs. Pandas as a small example of big vs small data.
    
* Discuss the use of environment variables for settings.
* Discuss how to use Jupyter notebooks and source code concurrently. 
* Logging and using a standard logger.

## About the Data

+ We will download the prices for a list of stocks.
+ The source is Yahoo Finance, and the data, along with its description, is available via [Kaggle](https://www.kaggle.com/datasets/jacksoncrow/stock-market-dataset).


## Medallion Architecture

+ The architecture that we are thinking about is called Medallion by [DataBricks](https://www.databricks.com/glossary/medallion-architecture). It is an ELT type of thinking, although our data is well-structured.

<div>
<image src="./images/02_medallion_architecture.png" height=300>
</div>

+ In our case, we would like to optimize the number of times that we download data from the internet. 
+ Ultimately, we will build a pipeline manager class to control the process of obtaining and transforming our data.

<div>
<image src="./images/02_target_pipeline_manager.png" height=250>
</div>


# Download Data

Download the [Stock Market Dataset from Kaggle](https://www.kaggle.com/datasets/jacksoncrow/stock-market-dataset). Note that you may be required to register for a free account. Alternatively, download the file from [this location](https://drive.google.com/drive/folders/1AA4gapDLpI194TGce1bY25sd91Km-tU3?usp=drive_link).

+ Extract stock prices (not ETFs) into the directory: `./05_src/data/prices_csv/`. 
+ To be clear, your folder structure should include the path `05_src/data/prices_csv/stocks`.

The command `%run update_path.py` runs a local script that adds the repository's `./05_src/` directory to the Notebook's kernel path. This way, we can use our modules within the notebook.

In [2]:
%run update_path.py

To load the historical price data for stocks and ETFs, use the code below. Notice the following:

+ Libraries are ordered from high-level to low-level libraries from the package manager. Local modules are imported at the end. 
+ The function `get_logger()` is called with `__name__` as recommended by [Python's documentation](https://docs.python.org/2/howto/logging.html#logging-advanced-tutorial).



In [3]:
from glob import glob
import os
import pandas as pd
import random

from utils.logger import get_logger
_logs = get_logger(__name__)


+ The [`glob` module](https://docs.python.org/3/library/glob.html) is used for finding path names that match specified patterns using Unix shell-style rules.

+ Notice that the module `glob` contains a function called `glob`; therefore, we used `from glob import glob` above.

+ The path in which we are searching for our csv files is produced by joining two strings:



    - The value of the environment variable 'SRC_DIR' that we obtain with `os.getenv('SRC_DIR')` (this variable points to ./05_src).

    - Another string given by "data/prices_csv/stocks/*.csv".

    - Both strings are combined into an OS-consistent path using `os.path.join(...)`.

+ After we know the location of all our files, we sample a subset of them.

In [4]:
stock_files = glob(os.path.join(os.getenv('SRC_DIR'), "data/prices_csv/stocks/*.csv"))
_logs.info(f'Found {len(stock_files)} stock price files.')

random.seed(42)
n_sample = 60
stock_files = random.sample(stock_files, n_sample)
_logs.info(f'Sampled {n_sample} stock price files for processing. The files are: {stock_files   }')

2026-01-19 12:57:16,860, 3619788285.py, 2, INFO, Found 5884 stock price files.
2026-01-19 12:57:16,861, 3619788285.py, 7, INFO, Sampled 60 stock price files for processing. The files are: ['../../05_src/data/prices_csv/stocks/HCM.csv', '../../05_src/data/prices_csv/stocks/ATHM.csv', '../../05_src/data/prices_csv/stocks/RILYP.csv', '../../05_src/data/prices_csv/stocks/ALDX.csv', '../../05_src/data/prices_csv/stocks/JHG.csv', '../../05_src/data/prices_csv/stocks/CHRS.csv', '../../05_src/data/prices_csv/stocks/BKTI.csv', '../../05_src/data/prices_csv/stocks/REFR.csv', '../../05_src/data/prices_csv/stocks/KEN.csv', '../../05_src/data/prices_csv/stocks/MDRRP.csv', '../../05_src/data/prices_csv/stocks/GRF.csv', '../../05_src/data/prices_csv/stocks/KRC.csv', '../../05_src/data/prices_csv/stocks/MICT.csv', '../../05_src/data/prices_csv/stocks/AMPY.csv', '../../05_src/data/prices_csv/stocks/A.csv', '../../05_src/data/prices_csv/stocks/WRLSU.csv', '../../05_src/data/prices_csv/stocks/PNR.csv', '

We load the sampled files into dataframes and concatenate them:

+ Start with an empty list.
+ Read each file into a dataframe and [`append()` it to the list](https://docs.python.org/3/tutorial/datastructures.html#more-on-lists). Notice that `append()` is an in-place operation (it does not return a list, it modifies the list in place).
+ Finally, we concatenate all dataframes along the vertical axis (`axis=0`) using [`pd.concat()`](https://pandas.pydata.org/docs/user_guide/merging.html#concat). 
+ Notice that we do not concatenate each time that we load a dataframe. According to [Panda's documentation](https://pandas.pydata.org/docs/user_guide/merging.html#concat): 

> "`concat()` makes a full copy of the data, and iteratively reusing `concat()` can create unnecessary copies. Collect all DataFrame or Series objects in a list before using `concat()`."

In [5]:
dt_list = []
for s_file in stock_files:
    _logs.info(f"Reading file: {s_file}")
    dt = pd.read_csv(s_file).assign(
        source = os.path.basename(s_file),
        ticker = os.path.basename(s_file).replace('.csv', ''),
        Date = lambda x: pd.to_datetime(x['Date'])
    )
    dt_list.append(dt)
stock_prices = pd.concat(dt_list, axis = 0, ignore_index = True)

2026-01-19 12:57:21,668, 2577063914.py, 3, INFO, Reading file: ../../05_src/data/prices_csv/stocks/HCM.csv
2026-01-19 12:57:21,695, 2577063914.py, 3, INFO, Reading file: ../../05_src/data/prices_csv/stocks/ATHM.csv
2026-01-19 12:57:21,700, 2577063914.py, 3, INFO, Reading file: ../../05_src/data/prices_csv/stocks/RILYP.csv
2026-01-19 12:57:21,703, 2577063914.py, 3, INFO, Reading file: ../../05_src/data/prices_csv/stocks/ALDX.csv
2026-01-19 12:57:21,706, 2577063914.py, 3, INFO, Reading file: ../../05_src/data/prices_csv/stocks/JHG.csv
2026-01-19 12:57:21,709, 2577063914.py, 3, INFO, Reading file: ../../05_src/data/prices_csv/stocks/CHRS.csv
2026-01-19 12:57:21,714, 2577063914.py, 3, INFO, Reading file: ../../05_src/data/prices_csv/stocks/BKTI.csv
2026-01-19 12:57:21,726, 2577063914.py, 3, INFO, Reading file: ../../05_src/data/prices_csv/stocks/REFR.csv
2026-01-19 12:57:21,736, 2577063914.py, 3, INFO, Reading file: ../../05_src/data/prices_csv/stocks/KEN.csv
2026-01-19 12:57:21,739, 25770

Verify the structure of the `stock_prices` data using the [`info()` dataframe method](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.info.html):

In [6]:
stock_prices.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 230273 entries, 0 to 230272
Data columns (total 9 columns):
 #   Column     Non-Null Count   Dtype         
---  ------     --------------   -----         
 0   Date       230273 non-null  datetime64[ns]
 1   Open       230260 non-null  float64       
 2   High       230260 non-null  float64       
 3   Low        230260 non-null  float64       
 4   Close      230260 non-null  float64       
 5   Adj Close  230260 non-null  float64       
 6   Volume     230260 non-null  float64       
 7   source     230273 non-null  object        
 8   ticker     230273 non-null  object        
dtypes: datetime64[ns](1), float64(6), object(2)
memory usage: 15.8+ MB


We can subset our ticker data set using standard indexing techniques. Good references for this type of data manipulation are:

+ [Panda's Documentation](https://pandas.pydata.org/docs/user_guide/indexing.html#indexing-and-selecting-data). 
+ [Panda's Cookbook](https://pandas.pydata.org/docs/user_guide/cookbook.html#cookbook-selection).

From the subset data frame, select one column and convert to list.

In [7]:
select_tickers = stock_prices['ticker'].unique().tolist()
select_tickers

['HCM',
 'ATHM',
 'RILYP',
 'ALDX',
 'JHG',
 'CHRS',
 'BKTI',
 'REFR',
 'KEN',
 'MDRRP',
 'GRF',
 'KRC',
 'MICT',
 'AMPY',
 'A',
 'WRLSU',
 'PNR',
 'TERP',
 'TMSR',
 'TDJ',
 'CIK',
 'SMBC',
 'CNSP',
 'BLK',
 'CLNC',
 'AOSL',
 'LYV',
 'IX',
 'AWRE',
 'KNX',
 'TRXC',
 'CVCY',
 'UVE',
 'HCKT',
 'TFSL',
 'ACB',
 'AXP',
 'GLG',
 'MFO',
 'PCB',
 'IPDN',
 'DTW',
 'IQV',
 'EVRI',
 'LIVE',
 'KZR',
 'SO',
 'INFY',
 'BSE',
 'PSN',
 'TNAV',
 'APTX',
 'LAZY',
 'DK',
 'ZEUS',
 'IRBT',
 'AMAT',
 'PSB',
 'CTSO',
 'UEC']

# Storing Data in CSV

+ We have some data. How do we store it?
+ We can compare two options, CSV and Parquet, by measuring their performance:

    - Time to save: We will measure time by using the `time` library.
    - Space required on drive: We will use the custom function below.

In [8]:
def get_dir_size(path='.'):
    '''Returns the total size of files contained in path.'''
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += get_dir_size(entry.path)
    return total

Load the specification of a temporary directory from the environment  and create a subdirectory in it called "csv":

+ Use `os.getenv("TEMP_DATA")` to obtain the desired location of the temporary folder from an environment variable.
+ If the subdirectory exists, delete it using `shutil.rmtree()`; the flag `ignore_errors=True` helps us in case the subdirectory does not exist (for instance, in the first run).
+ Create a directory with path given by `csv_dir` using `os.makedirs()`; the flag `exist_ok=True` indicates that if the directory already exists, then the function will do nothing.
+ Finally, create the stock price file location, `stock_csv`, which will be used to create the csv file.

In [9]:
import shutil

temp = os.getenv("TEMP_DATA")
csv_dir = os.path.join(temp, "csv")

shutil.rmtree(csv_dir, ignore_errors=True)
os.makedirs(csv_dir, exist_ok=True)
stock_csv = os.path.join(csv_dir, "stock_px.csv")


Save the concatenated dataframe to a CSV file. We measure the time elapsed by storing the start and end times, then we calculate their difference in seconds.

In [10]:
import time

start = time.time()
stock_prices.to_csv(stock_csv, index = False)
end = time.time()

_logs.info(f'Writing data ({stock_prices.shape}) to csv took {end - start} seconds.')
_logs.info(f'CSV file size { os.path.getsize(stock_csv)*1e-6 } MB')

2026-01-19 12:57:40,277, 807204384.py, 7, INFO, Writing data ((230273, 9)) to csv took 0.9443402290344238 seconds.
2026-01-19 12:57:40,278, 807204384.py, 8, INFO, CSV file size 25.988193 MB


## Save Data to Parquet

### Notes on Dask 

We could use Pandas to save the data directly into Parquet files. However, we will use a different approach by applying the [Dask framework](https://www.dask.org/). Dask provides functionality for working with datasets that do not fit in memory and parallelization to speed up computation. A few notes on Dask and Pandas:

- Pandas, Parquet, and Arrow:

    + We can work with large datasets and Parquet files in Pandas, but we will generally be limited by the amount of data that can fit in our computer's memory.
    + Pandas can write Parquet files using a PyArrow backend. In fact, recent versions of Pandas support PyArrow data types, and future versions will require a PyArrow backend. 
    + The PyArrow library is an interface between Python and the Apache Arrow project. In particular, the [Parquet data format](https://parquet.apache.org/) and [Arrow](https://arrow.apache.org/docs/python/parquet.html) are Apache projects.

- Dask 

    + Dask is much more than an interface to Arrow: Dask provides parallel and distributed computing on Pandas-like dataframes. 
    + Dask is also relatively easy to use as it mimics Pandas' API.
    + Dask allows us to work with larger datasets than Pandas. In a sense, it is an intermediate step between Pandas and big-data frameworks like Spark (or Databricks).
    + If you are familiar with Pandas, a good introduction is [10 Minutes to Dask](https://docs.dask.org/en/stable/10-minutes-to-dask.html).

In [11]:
import dask.dataframe as dd

parquet_dir = os.path.join(temp, "parquet")
shutil.rmtree(parquet_dir, ignore_errors=True)
os.makedirs(parquet_dir, exist_ok=True)

In [12]:
px_dd = dd.from_pandas(stock_prices, npartitions = len(select_tickers))

start = time.time()
px_dd.to_parquet(parquet_dir, engine = "pyarrow")
end = time.time()

_logs.info(f'Writing dd ({stock_prices.shape}) to parquet took {end - start} seconds.')
_logs.info(f'Parquet file size { get_dir_size(parquet_dir)*1e-6 } MB')

2026-01-19 13:02:20,824, 817812245.py, 7, INFO, Writing dd ((230273, 9)) to parquet took 0.22370004653930664 seconds.
2026-01-19 13:02:20,825, 817812245.py, 8, INFO, Parquet file size 8.889431 MB


### Pandas, Dask, and Parquet

The distinction of Pandas Dataframes, Dask Dataframes, and Parquet files is important:

+ Pandas dataframes combine the functionality of [NumPy](https://numpy.org/) (efficient vector operations, especially vector algebra) with a concise data manipulation framework that allows us to create columns of different data types (NumPy only allows single-type matrices), database-like operations (such as filtering rows, subsetting columns, and joining different dataframes).
+ Dask dataframes extend the functionality of Pandas dataframes beyond the confines of available memory and implement parallelized operations, among other benefits.
+ Parquet files are a file format. Parquet files can be created by Pandas and Dask, but Dask offers a superior interface. Parquet files are immutable: once written, they cannot be modified.
+ Parquet and Dask are not the same: Parquet is a file format that can be accessed by many applications and programming languages (Python, R, Power BI, etc.), while Dask is a Python package for working with large datasets using distributed computation.

### Dask is Powerful, but not Infallible

It is tempting to think of Dask as a super-Pandas, but each package has its advantages and disadvantages. 

+ Dask is not good at everything (see [Dask DataFrames Best Practices](https://docs.dask.org/en/stable/dataframe-best-practices.html)). 
+ A useful and somewhat idiosyncratic comparison of various data manipulation frameworks is shown below (from [DataFrames at Scale Comparison: TPC-H](https://docs.coiled.io/blog/tpch.html#when-to-use-duckdb-vs-polars-vs-dask-vs-spark)) 

Concept              | Spark | Dask | DuckDB | Polars
---------------------|-------|------|--------|--------
Fast Locally         | ‚ùå    | ü§î  | ‚úÖ     | ‚úÖ
Fast on Cloud (1 TB) | ‚úÖ    | ‚úÖ  | ‚úÖ     | ‚ùå
Fast on Cloud (10 TB)| ‚ùå    | ‚úÖ  | ‚úÖ     | ‚ùå
Scales Out           | ‚úÖ    | ‚úÖ  | ‚ùå     | ‚ùå
SQL                  | ‚úÖ    | ü§î  | ‚úÖ     | ü§î
More than SQL        | ‚úÖ    | ‚úÖ  | ‚ùå     | ‚úÖ
Sensible Defaults    | ‚ùå    | ‚úÖ  | ‚úÖ     | ‚úÖ


### Dask Best Practices

Parallelism brings extra complexity and overhead. Here are a few ideas to help you decide when to use Dask (from [Dask's Best Practices](https://docs.dask.org/en/stable/best-practices.html)):

#### Small is Better

+ Start small: if possible, use Pandas. Also, try to reduce your data using aggregation, then use Pandas.
+ More generally, NumPy, Pandas, and Scikit-Learn may have faster functions for what you need. Consult the relevant documentation, experiment, and/or consult with a colleague or expert.

#### Index with Care

+ Use the index: it is beneficial to have a well-defined index in Dask DataFrames, as it may speed up searching (filtering) the data. A one-dimensional index is allowed.
+ Minimize full-data shuffling as much as possible: indexing is an expensive operation. 

### Consider the Cost of Joins

+ Consider cases such as small-to-large joins, where the small dataframe fits in memory, but the large one does not. The small dataframe can be Pandas, while the larger one is a Dask dataframe.
+ Some joins are more expensive than others. 

    * Not expensive:

        - Join a Dask DataFrame with a Pandas DataFrame.
        - Join a Dask DataFrame with another Dask DataFrame of a single partition.
        - Join Dask DataFrames along their indexes.

    * Expensive:

        - Join Dask DataFrames along columns that are not their index.

# How Do We Store Prices?

+ We can store our data as a single blob. 

  - This can be difficult to maintain, especially because parquet files are immutable.
  - Using a single file, we would need to recreate the complete file any time that we update it.

+ An alternative strategy is to organize data files by ticker and date: 

  - We can create one file per ticker and month (or any other suitable frequency). 
  - Under this approach, we would only need to recreate the latest month's file at any update. 

In [13]:
# CLean up before start
PRICE_DATA = os.getenv("PRICE_DATA")
import shutil
if os.path.exists(PRICE_DATA):
    shutil.rmtree(PRICE_DATA)

In [14]:
stock_prices.columns

Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'source',
       'ticker'],
      dtype='object')

In [15]:
for ticker in stock_prices['ticker'].unique():
    # Filter data for ticker
    # Notice that these are Pandas dataframes
    _logs.info(f'Processing ticker: {ticker}')
    ticker_dt = stock_prices[stock_prices['ticker'] == ticker]
    ticker_dt = ticker_dt.assign(Year = ticker_dt.Date.dt.year)
    for yr in ticker_dt['Year'].unique():
        _logs.info(f'Processing year {yr} for ticker {ticker}.')
        # Filter data for year and convert to Dask dataframe
        yr_dd = dd.from_pandas(ticker_dt[ticker_dt['Year'] == yr],2)
        
        # Define path and create directories if not exist
        yr_path = os.path.join(PRICE_DATA, ticker, f"{ticker}_{yr}")
        os.makedirs(os.path.dirname(yr_path), exist_ok=True)
        _logs.info(f'Writing data to path: {yr_path}')

        # Write to Parquet
        yr_dd.to_parquet(yr_path, engine = "pyarrow")
    

2026-01-19 13:02:44,343, 2113979629.py, 4, INFO, Processing ticker: HCM
2026-01-19 13:02:44,358, 2113979629.py, 8, INFO, Processing year 2016 for ticker HCM.
2026-01-19 13:02:44,363, 2113979629.py, 15, INFO, Writing data to path: ../../05_src/data/prices/HCM/HCM_2016
2026-01-19 13:02:44,372, 2113979629.py, 8, INFO, Processing year 2017 for ticker HCM.
2026-01-19 13:02:44,376, 2113979629.py, 15, INFO, Writing data to path: ../../05_src/data/prices/HCM/HCM_2017
2026-01-19 13:02:44,385, 2113979629.py, 8, INFO, Processing year 2018 for ticker HCM.
2026-01-19 13:02:44,387, 2113979629.py, 15, INFO, Writing data to path: ../../05_src/data/prices/HCM/HCM_2018
2026-01-19 13:02:44,395, 2113979629.py, 8, INFO, Processing year 2019 for ticker HCM.
2026-01-19 13:02:44,397, 2113979629.py, 15, INFO, Writing data to path: ../../05_src/data/prices/HCM/HCM_2019
2026-01-19 13:02:44,405, 2113979629.py, 8, INFO, Processing year 2020 for ticker HCM.
2026-01-19 13:02:44,408, 2113979629.py, 15, INFO, Writing 

Why would we want to store data this way?

+ Data files are easier to maintain. We do not update old data, only recent data or the most recent "delta".
+ Parquet files, as long as they maintain a consistent schema, can all be read jointly. 

# Load, Transform, and Save

In this section, we will load the Parquet files we generated, transform the data, and save the resulting dataset.

## Load

+ Parquet files can be read individually or as a collection.
+ `dd.read_parquet()` can take a list (collection) of files as input.
+ Use `glob` to obtain the collection of files.

In [16]:
from glob import glob

parquet_files = glob(os.path.join(PRICE_DATA, "**/*.parquet"), recursive = True)
_logs.info(f'Found {len(parquet_files)} parquet files for reading back into Dask.')

dd_px = dd.read_parquet(parquet_files).set_index("ticker")

2026-01-19 13:03:05,298, 174194153.py, 4, INFO, Found 2010 parquet files for reading back into Dask.


## Transform

+ This transformation step will create a *Features* data set. In our case, features will be stock returns (we obtained prices).
+ Dask dataframes work similarly to Pandas dataframes: in particular, we can perform groupby and apply operations.
+ Notice the use of [an anonymous (lambda) function](https://realpython.com/python-lambda/) in the apply statement.

In the code below, the following operation occurs:

+ Start with a Dask dataframe, `dd_px`.
+ Group the rows of this dataframe by the variable `ticker`, i.e., each group will contain the observations that pertain only to one ticker at a time. The `group_key` parameter controls whether an index entry is added with the value of the grouping variable (`ticker` in this case); if we made `group_keys=True`, we would have a duplicate `ticker` column.
+ For each group defined by a `ticker`, `apply()` the following calculation:

    - Sort the values by `Date` in ascending order.
    - Assign a new variable called `Close_lag_1` by shifting the position of the closing price (`Close`) by one position. 
    - Define the schema of the resulting dataframe. If we omit this specification, we would get a warning; however, the simplicity of the calculation ensures that Dask can determine the resulting schema.

In [17]:
dd_px

Unnamed: 0_level_0,Date,Open,High,Low,Close,Adj Close,Volume,source,Year
npartitions=60,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
A,datetime64[ns],float64,float64,float64,float64,float64,float64,string,int32
ACB,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...
ZEUS,...,...,...,...,...,...,...,...,...
ZEUS,...,...,...,...,...,...,...,...,...


In [18]:
dd_shift = (
    dd_px
        .groupby('ticker', group_keys=False)
        .apply(
            lambda x: x.sort_values('Date', ascending = True)
                       .assign(Close_lag_1 = x['Close'].shift(1)), 
            meta = pd.DataFrame(data ={'Date': 'datetime64[ns]',
                    'Open': 'f8',
                    'High': 'f8',
                    'Low': 'f8',
                    'Close': 'f8',
                    'Adj Close': 'f8',
                    'Volume': 'i8',
                    'source': 'object',
                    'Year': 'int32',
                    'Close_lag_1': 'f8'},
                    index = pd.Index([], dtype=pd.StringDtype(), name='ticker'))
    ))

Finally, using the dataframe that we created above, we can now `assign` the `Returns` variable to the entire dataset.

In [19]:
dd_rets = dd_shift.assign(
    Returns = lambda x: x['Close']/x['Close_lag_1'] - 1
)

Question: How do we know that we are not (erroneously) combining the last price of a ticker with the first price of the next ticker?

## Lazy Exection

What does `dd_rets` contain?

In [20]:
dd_rets

Unnamed: 0_level_0,Date,Open,High,Low,Close,Adj Close,Volume,source,Year,Close_lag_1,Returns
npartitions=60,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
A,object,object,object,object,object,object,object,object,object,object,object
ACB,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...
ZEUS,...,...,...,...,...,...,...,...,...,...,...
ZEUS,...,...,...,...,...,...,...,...,...,...,...


+ Dask is a lazy execution framework: commands will not execute until the computation is required. 
+ To trigger an execution in dask use `.compute()` or execute a command that requires the actual values (for example, write to Parquet or SQL).

In [21]:
dd_rets.compute()

Unnamed: 0_level_0,Date,Open,High,Low,Close,Adj Close,Volume,source,Year,Close_lag_1,Returns
ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
A,1999-11-18,32.546494,35.765381,28.612303,31.473534,27.068665,62546300.0,A.csv,1999,,
A,1999-11-19,30.713520,30.758226,28.478184,28.880543,24.838577,15234100.0,A.csv,1999,39.360001,-0.266246
A,1999-11-22,29.551144,31.473534,28.657009,31.473534,27.068665,6577800.0,A.csv,1999,39.790001,-0.209009
A,1999-11-23,30.400572,31.205294,28.612303,28.612303,24.607880,5975600.0,A.csv,1999,38.750000,-0.261618
A,1999-11-24,28.701717,29.998211,28.612303,29.372318,25.261524,4843200.0,A.csv,1999,38.919998,-0.245316
...,...,...,...,...,...,...,...,...,...,...,...
ZEUS,2020-03-26,9.610000,9.940000,9.260000,9.590000,9.590000,60500.0,ZEUS.csv,2020,6.870000,0.395924
ZEUS,2020-03-27,9.330000,9.330000,8.700000,8.700000,8.700000,52900.0,ZEUS.csv,2020,6.990000,0.244635
ZEUS,2020-03-30,8.810000,9.760000,8.700000,9.680000,9.680000,73700.0,ZEUS.csv,2020,7.630000,0.268676
ZEUS,2020-03-31,9.640000,10.470000,9.590000,10.350000,10.350000,68900.0,ZEUS.csv,2020,8.200000,0.262195


## Save

With our transformed data, we can now save the new feature to a Parquet file. We will need to answer the following questions depending on our context, setup, and available resources:

+ Should we keep the same namespace? 
+ Should we save all columns?

In [22]:
dd_rets

Unnamed: 0_level_0,Date,Open,High,Low,Close,Adj Close,Volume,source,Year,Close_lag_1,Returns
npartitions=60,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
A,object,object,object,object,object,object,object,object,object,object,object
ACB,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...
ZEUS,...,...,...,...,...,...,...,...,...,...,...
ZEUS,...,...,...,...,...,...,...,...,...,...,...


In [23]:
# CLean up before save
FEATURES_DATA = os.getenv("FEATURES_DATA")
if os.path.exists(FEATURES_DATA):
    shutil.rmtree(FEATURES_DATA)
dd_rets.to_parquet(FEATURES_DATA, 
                   overwrite = True, 
                   schema={
                       'Date': 'timestamp[ns]',
                       'Open': 'float64',
                       'High': 'float64',
                       'Low': 'float64',
                       'Close': 'float64',
                       'Adj Close': 'float64',
                       'Volume': 'int64',
                       'source': 'string',
                       'Year': 'int32',
                       'Close_lag_1': 'float64',
                       'Returns': 'float64',
                       'ticker': 'large_string'
                   })

# Optional: from Jupyter to Command Line

+ We have drafted our code in a Jupyter Notebook. 
+ Finalized code should be written in Python modules.

## Object Oriented vs Functional Programming

+ We can use classes to keep parameters and functions together.
+ We *could* use Object Oriented Programming, but parallelization of data manipulation and modelling tasks benefit from *Functional Programming*.
+ An Idea: 

    - [Data Oriented Programming](https://blog.klipse.tech/dop/2022/06/22/principles-of-dop.html).
    - Use the class to bundle together parameters and functions.
    - Use stateless operations and treat all data objects as immutable (we do not modify them, we overwrite them).
    - Take advantage of [`@staticmethod`](https://realpython.com/instance-class-and-static-methods-demystified/).

The code is in `./05_src/stock_prices/data_manager.py`.

Our original design was:

![](./images/02_target_pipeline_manager.png)



In [None]:
from stock_prices.data_manager import DataManager
dm = DataManager()

Download all prices.

In [None]:
dm.process_sample_files()

Finally, add features to the data set and save to a *feature store*.

In [None]:
dm.featurize()