In [1]:
%load_ext dotenv
%dotenv 

In [2]:
import os
os.getenv("SOMETHING_ELSE")

# What are we doing?

## Objectives 


* Build a data pipeline that downloads price data from the internet, stores it locally, transforms it into return data, and stores the feature set.
    - Getting the data.
    - Schemas and index in dask.

* Explore the parquet format.
    - Reading and writing parquet files.
    - Read datasets that are stored in distributed files.
    - Discuss dask vs pandas as a small example of big vs small data.
    
* Discuss the use of environment variables for settings.
* Discuss how to use Jupyter notebooks and source code concurrently. 
* Logging and using a standard logger.

## About the Data

+ We will download the prices for a list of stocks.
+ The source is Yahoo Finance and we will use the API provided by the library yfinance.


## Medallion Architecture

+ The architecture we have in mind is the Medallion architecture described by [Databricks](https://www.databricks.com/glossary/medallion-architecture). It follows an ELT (extract, load, transform) way of thinking, although our data is already well-structured.

![Medallion Architecture (Databricks)](./images/02_medallion_architecture.png)

+ In our case, we would like to optimize the number of times that we download data from the internet. 
+ Ultimately, we will build a pipeline manager class that will help us control the process of obtaining and transforming our data.

![](./images/02_target_pipeline_manager.png)

# Download Data from Yahoo Finance

Yahoo Finance provides information about public stocks in different markets. The library yfinance gives us access to a fair bit of the data in Yahoo Finance. 

These steps are based on the instructions in:

+ [yfinance documentation](https://pypi.org/project/yfinance/)
+ [Tutorial in geeksforgeeks.org](https://www.geeksforgeeks.org/get-financial-data-from-yahoo-finance-with-python/)


+ If required, install: `python -m pip install yfinance`.
+ To download the price history of a stock, first use the following setup:


In [3]:
import pandas as pd
import yfinance as yf
import os
import sys

sys.path.append(os.getenv('SRC_DIR'))

A few things to notice in the code chunk above:

+ Libraries from the package manager (pip in this case, but it could be conda, poetry, etc.) are ordered from high-level to low-level.
+ The command `sys.path.append(os.getenv('SRC_DIR'))` adds the directory stored in the `SRC_DIR` environment variable (the `05_src` source directory) to the path in the notebook's kernel. This way, we can use our modules as part of the notebook.
+ Local modules are imported at the end. 
+ The function `get_logger()` is called with `__name__` as recommended by the documentation.
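The `get_logger()` helper is mentioned here but not shown. As a rough sketch only, a helper along these lines could produce log lines shaped like the ones printed later by `data_manager.py`. The function body, the format string, and the default level are assumptions, not the actual implementation in the source directory.

```python
import logging

# Assumed format, chosen to resemble the log lines shown later in this notebook:
# timestamp, file name, line number, level, message.
LOG_FORMAT = "%(asctime)s, %(filename)s, %(lineno)d, %(levelname)s, %(message)s"


def get_logger(name, level=logging.INFO):
    """Return a named logger with one stream handler (hypothetical helper)."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Attach the handler only once, so re-running a notebook cell
    # does not duplicate every log line.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(LOG_FORMAT))
        logger.addHandler(handler)
    return logger


_logs = get_logger(__name__)
_logs.info("Logger is ready.")
```

Passing `__name__` means each module gets its own named logger, which makes it easier to tell where a message came from.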

Now, to download the historical price data for a stock, we could use:

In [4]:
px = yf.download('AAPL', start = "2013-12-01", end = "2024-02-01")
px

[*********************100%***********************]  1 of 1 completed


Price,Adj Close,Close,High,Low,Open,Volume
Ticker,AAPL,AAPL,AAPL,AAPL,AAPL,AAPL
Date,,,,,,
2013-12-02,17.175100,19.686787,20.154642,19.672144,19.928572,472544800
2013-12-03,17.645267,20.225714,20.227858,19.917143,19.939285,450968000
2013-12-04,17.604137,20.178572,20.328215,20.029285,20.196428,377809600
2013-12-05,17.694490,20.282143,20.540714,20.228930,20.451786,447580000
2013-12-06,17.448975,20.000713,20.241072,19.984644,20.206785,344352400
...,...,...,...,...,...,...
2024-01-25,193.223389,194.169998,196.270004,193.110001,195.220001,54822100
2024-01-26,191.481918,192.419998,194.759995,191.940002,194.270004,44594000
2024-01-29,190.795288,191.729996,192.199997,189.580002,192.009995,47145600
2024-01-30,187.123260,188.039993,191.800003,187.470001,190.940002,55859400


In [5]:
yf.download('AAPL',start = '2025-01-01', end = '2025-01-15')

[*********************100%***********************]  1 of 1 completed


Price,Adj Close,Close,High,Low,Open,Volume
Ticker,AAPL,AAPL,AAPL,AAPL,AAPL,AAPL
Date,,,,,,
2025-01-02,243.850006,243.850006,249.100006,241.820007,248.929993,55740700
2025-01-03,243.360001,243.360001,244.179993,241.889999,243.360001,40244100
2025-01-06,245.0,245.0,247.330002,243.199997,244.309998,45045600
2025-01-07,242.210007,242.210007,245.550003,241.350006,242.979996,40856000
2025-01-08,242.699997,242.699997,243.710007,240.050003,241.919998,37628900
2025-01-10,236.850006,236.850006,240.160004,233.0,240.009995,61710900
2025-01-13,234.399994,234.399994,234.669998,229.720001,233.529999,49630700
2025-01-14,233.279999,233.279999,236.119995,232.470001,234.75,39435300


## Parametrize the download

+ Generally, we will look to separate every parameter and setting from functions.
+ If we had a few stocks, we could cycle through them. We need a place to store the list of tickers (a db or file, for example).
+ Store a csv file with a few stock tickers. The location of the file is a setting, the contents of this file are parameters.
+ Use **environment variables** to pass parameters.
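The bullets above can be sketched as a small settings reader. This is a minimal illustration, assuming a hypothetical `get_setting` helper and a placeholder file path; in the notebook, the variables are loaded from a `.env` file by the `dotenv` extension.

```python
import os


def get_setting(name, default=None):
    """Read a setting from the environment; fail loudly if it is missing."""
    value = os.getenv(name, default)
    if value is None:
        raise KeyError(f"Environment variable {name!r} is not set.")
    return value


# Illustrative only: the path below is a placeholder, not the course's location.
# setdefault leaves the variable untouched if it is already defined.
os.environ.setdefault("TICKERS", "./data/tickers.csv")

ticker_file = get_setting("TICKERS")
print(ticker_file)
```

Variable names are case-sensitive on Linux and macOS, so `TICKERS` and `tickers` are different variables there. Failing early on a missing setting is usually easier to debug than passing `None` to `pd.read_csv()`.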

Start by getting a sample of Information Technology stock tickers by applying subindexing and converting the "ticker" column from a pandas object to a list.

In [6]:
# Load all tickers
ticker_file = os.getenv("TICKERS")
tickers = pd.read_csv(ticker_file)


In [7]:
os.getenv("TICKERS")
pd.read_csv(ticker_file)

Unnamed: 0,ticker,Security,GICS Sector,GICS Sub-Industry,Headquarters Location,Date added,CIK,Founded
0,MMM,3M,Industrials,Industrial Conglomerates,"Saint Paul, Minnesota",20883,66740,1902
1,AOS,A. O. Smith,Industrials,Building Products,"Milwaukee, Wisconsin",42942,91142,1916
2,ABT,Abbott,Health Care,Health Care Equipment,"North Chicago, Illinois",20883,1800,1888
3,ABBV,AbbVie,Health Care,Biotechnology,"North Chicago, Illinois",41274,1551152,2013 (1888)
4,ACN,Accenture,Information Technology,IT Consulting & Other Services,"Dublin, Ireland",40730,1467373,1989
...,...,...,...,...,...,...,...,...
498,YUM,Yum! Brands,Consumer Discretionary,Restaurants,"Louisville, Kentucky",35709,1041061,1997
499,ZBRA,Zebra Technologies,Information Technology,Electronic Equipment & Instruments,"Lincolnshire, Illinois",43822,877212,1969
500,ZBH,Zimmer Biomet,Health Care,Health Care Equipment,"Warsaw, Indiana",37110,1136869,1927
501,ZION,Zions Bancorporation,Financials,Regional Banks,"Salt Lake City, Utah",37064,109380,1873


We can subset our ticker data set using standard indexing techniques. Good references for this type of data manipulation are the pandas [documentation](https://pandas.pydata.org/docs/user_guide/indexing.html#indexing-and-selecting-data) and [cookbook](https://pandas.pydata.org/docs/user_guide/cookbook.html#cookbook-selection).

In [8]:
idx_tech = tickers['GICS Sector'] == 'Information Technology'
tech_sector = tickers[idx_tech]
tech_sector['ticker'].to_list()

['ACN',
 'ADBE',
 'AMD',
 'AKAM',
 'APH',
 'ADI',
 'ANSS',
 'AAPL',
 'AMAT',
 'ANET',
 'ADSK',
 'AVGO',
 'CDNS',
 'CDW',
 'CSCO',
 'CTSH',
 'GLW',
 'ENPH',
 'EPAM',
 'FFIV',
 'FICO',
 'FSLR',
 'FTNT',
 'IT',
 'GEN',
 'HPE',
 'HPQ',
 'IBM',
 'INTC',
 'INTU',
 'JBL',
 'JNPR',
 'KEYS',
 'KLAC',
 'LRCX',
 'MCHP',
 'MU',
 'MSFT',
 'MPWR',
 'MSI',
 'NTAP',
 'NVDA',
 'NXPI',
 'ON',
 'ORCL',
 'PANW',
 'PTC',
 'QRVO',
 'QCOM',
 'ROP',
 'CRM',
 'STX',
 'NOW',
 'SWKS',
 'SNPS',
 'TEL',
 'TDY',
 'TER',
 'TXN',
 'TRMB',
 'TYL',
 'VRSN',
 'WDC',
 'ZBRA']

From the subset data frame, select one column and convert to list.

In [9]:
tech_tickers = tech_sector['ticker'].to_list()

In [10]:
tech_raw_dt = yf.download(tech_tickers, start = "2000-01-01", end = "2025-01-26")


[*********************100%***********************]  64 of 64 completed


The data that we downloaded combines several stocks and prices into a single row. We want to parse this arrangement into a dataframe that contains observations about a single stock on a given day per row. To do this, we can use the function [`stack()`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.stack.html) and re-arrange the indices.
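Before applying this to the real download, a toy example may help to see what `stack()` does to a frame with two column levels. The tickers `AAA` and `BBB` and all values are made up. The sketch calls `stack(level="Ticker")` without the `future_stack=True` argument used in the notebook's cells so that it also runs on older pandas releases; depending on the pandas version, this may emit a `FutureWarning`.

```python
import pandas as pd

# Toy wide frame with two column levels, mimicking the layout returned by
# yf.download for several tickers. All values are made up.
dates = pd.to_datetime(["2025-01-02", "2025-01-03"])
columns = pd.MultiIndex.from_product(
    [["Close", "Volume"], ["AAA", "BBB"]], names=["Price", "Ticker"]
)
wide_df = pd.DataFrame(
    [[10.0, 20.0, 100, 200],
     [11.0, 19.0, 110, 190]],
    index=pd.Index(dates, name="Date"),
    columns=columns,
)

# Move the "Ticker" column level into the row index, then flatten the index
# so that each row holds one ticker on one date.
long_df = (wide_df
           .stack(level="Ticker")
           .reset_index()
           .sort_values(["Ticker", "Date"]))
long_df.columns.name = None

print(long_df)
```

The wide frame has one row per date and one column per price-ticker pair; the long frame has one row per ticker and date, with `Close` and `Volume` as ordinary columns.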

In [11]:
tech_raw_dt

Price,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,...,Volume,Volume,Volume,Volume,Volume,Volume,Volume,Volume,Volume,Volume
Ticker,AAPL,ACN,ADBE,ADI,ADSK,AKAM,AMAT,AMD,ANET,ANSS,...,SWKS,TDY,TEL,TER,TRMB,TXN,TYL,VRSN,WDC,ZBRA
Date,,,,,,,,,,,...,,,,,,,,,,
2000-01-03,0.843077,,16.274673,28.095675,8.052906,321.250000,23.138018,15.500000,,2.765625,...,512000,315400,,1353500,8025000,10815600,138300,2270100,2461900,1055700
2000-01-04,0.771997,,14.909400,26.674360,7.660816,300.000000,21.994831,14.625000,,2.687500,...,292600,444300,,1611800,4963200,7952400,135100,3002200,7660300,522450
2000-01-05,0.783294,,15.204171,27.063759,7.178246,283.500000,21.171743,15.000000,,2.703125,...,411800,109100,,1855800,1930200,12142400,187200,6886600,3944600,612225
2000-01-06,0.715508,,15.328291,26.323891,6.740914,236.125000,21.206036,16.000000,,2.703125,...,385400,120700,,964700,2230800,11758400,107200,4003200,2468400,263925
2000-01-07,0.749401,,16.072985,27.063759,7.540174,248.375000,21.388952,16.250000,,2.703125,...,536000,222100,,809200,2166600,12938800,78900,3803200,9783000,333900
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025-01-17,229.979996,352.589996,429.989990,219.160004,291.450012,91.010002,192.050003,121.459999,119.949997,349.929993,...,2150600,264400,4102600.0,2253700,1010700,5407500,308000,503300,7160500,270600
2025-01-21,222.639999,356.420013,436.359985,221.449997,296.910004,94.709999,192.869995,122.279999,121.500000,352.070007,...,2389300,377400,2568300.0,4083000,1341800,9621700,235400,781800,7045600,446000
2025-01-22,223.830002,359.109985,437.320007,224.080002,301.450012,95.110001,195.509995,123.750000,129.820007,356.630005,...,2774100,668700,3162200.0,2262100,1098600,8222000,177300,674500,9340500,497500
2025-01-23,223.660004,363.260010,437.279999,228.350006,301.079987,95.889999,190.699997,123.040001,129.119995,360.359985,...,1777300,384700,1417800.0,2214300,1210400,12694700,214000,630100,7059500,377100


In [12]:
# First, check what tech_raw_dt.stack() looks like.
tech_raw_dt.stack(future_stack=True).reset_index()

Price,Date,Ticker,Adj Close,Close,High,Low,Open,Volume
0,2000-01-03,AAPL,0.843077,0.999442,1.004464,0.907924,0.936384,535796800.0
1,2000-01-03,ACN,,,,,,
2,2000-01-03,ADBE,16.274673,16.390625,16.875000,16.062500,16.812500,7384400.0
3,2000-01-03,ADI,28.095675,45.093750,46.937500,44.000000,46.750000,3655600.0
4,2000-01-03,ADSK,8.052906,8.343750,8.656250,8.031250,8.500000,2845600.0
...,...,...,...,...,...,...,...,...
403451,2025-01-24,TXN,184.158173,185.520004,191.500000,185.029999,190.000000,15856600.0
403452,2025-01-24,TYL,591.929993,591.929993,594.969971,590.210022,591.400024,155300.0
403453,2025-01-24,VRSN,210.729996,210.729996,210.880005,206.009995,206.020004,563300.0
403454,2025-01-24,WDC,67.410004,67.410004,69.440002,67.360001,68.910004,6046500.0


In [13]:
tech_dt = (tech_raw_dt
        .stack(future_stack=True)
        .reset_index()
        .sort_values(['Ticker','Date']))
tech_dt.columns

Index(['Date', 'Ticker', 'Adj Close', 'Close', 'High', 'Low', 'Open',
       'Volume'],
      dtype='object', name='Price')

In [14]:
tech_dt = (tech_raw_dt
           .stack(future_stack=True)
           .reset_index()
           .sort_values(['Ticker', 'Date']))
tech_dt.columns.name = None
tech_dt.columns


Index(['Date', 'Ticker', 'Adj Close', 'Close', 'High', 'Low', 'Open',
       'Volume'],
      dtype='object')

# Storing Data in CSV



+ We have some data. How do we store it?
+ We can compare two options, CSV and Parquet, by measuring their performance:

    - Time to save.
    - Space required.

In [15]:
def get_dir_size(path='.'):
    '''Returns the total size of files contained in path.'''
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += get_dir_size(entry.path)
    return total

In [16]:
import time

In [17]:
temp = os.getenv("TEMP_DATA")
# Create the temp directory (under 05_src) if it does not exist yet
os.makedirs(temp, exist_ok=True)
stock_path = os.path.join(temp, "stock_px.csv")

In [18]:
stock_path

'../../05_src/data/temp/stock_px.csv'

In [19]:
tech_dt.shape

(403456, 8)

In [20]:
start = time.time()
tech_dt.to_csv(stock_path, index = False)
end = time.time()

print(f'Writing to dt ({tech_dt.shape})csv took {end - start} seconds.')
print(f'Csv file size { os.path.getsize(stock_path)*1e-6 } MB')

Writing to dt ((403456, 8))csv took 5.075342416763306 seconds.
Csv file size 41.766062999999995 MB
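The timing pattern above can be wrapped in a small helper so that the same measurement is applied to every format. This is a minimal sketch, assuming a hypothetical `timed_write` function and synthetic data. It uses `time.perf_counter()`, which is generally better suited to measuring elapsed time than `time.time()`.

```python
import os
import tempfile
import time

import pandas as pd


def timed_write(write_fn, path):
    """Call write_fn(path); return (elapsed seconds, file size in MB).

    Hypothetical helper: write_fn is any callable that writes one file to path.
    """
    start = time.perf_counter()
    write_fn(path)
    elapsed = time.perf_counter() - start
    size_mb = os.path.getsize(path) * 1e-6
    return elapsed, size_mb


# Synthetic data, only to exercise the helper.
demo = pd.DataFrame({"Ticker": ["AAA"] * 1000, "Close": range(1000)})

with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = os.path.join(tmp_dir, "demo.csv")
    elapsed, size_mb = timed_write(lambda p: demo.to_csv(p, index=False), csv_path)

print(f"Writing CSV took {elapsed:.4f} seconds; file size {size_mb:.3f} MB.")
```

A single run is a noisy measurement; repeating the write a few times and comparing the results gives a fairer picture.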


## Save Data to Parquet

### Dask 

We can work with large data sets and Parquet files. Recent versions of pandas support pyarrow data types, and pyarrow is expected to play a larger role in future versions. The pyarrow library is an interface between Python and the Apache Arrow project. The [parquet data format](https://parquet.apache.org/) and [Arrow](https://arrow.apache.org/docs/python/parquet.html) are projects of the Apache Software Foundation.

However, Dask is much more than an interface to Arrow: Dask provides parallel and distributed computing on pandas-like dataframes. It is also relatively easy to use, bridging a gap between pandas and Spark. 

In [21]:
import dask.dataframe as dd



In [22]:
px_dd = dd.from_pandas(tech_dt, npartitions = len(tech_tickers))
parquet_path = os.path.join(temp, "stock_px.parquet")

start = time.time()
px_dd.to_parquet(parquet_path, engine = "pyarrow")
end = time.time()

print(f'Writing dd ({tech_dt.shape}) to parquet took {end - start} seconds.')
print(f'Parquet file size { get_dir_size(parquet_path)*1e-6 } MB')

Writing dd ((403456, 8)) to parquet took 1.3962295055389404 seconds.
Parquet file size 16.590623 MB


### Parquet files and Dask Dataframes

+ Parquet files are immutable: once written, they cannot be modified.
+ Dask DataFrames are a useful way to manipulate data stored in Parquet files.
+ Parquet and Dask are not the same: parquet is a file format that can be accessed by many applications and programming languages (Python, R, PowerBI, etc.), while Dask is a package in Python to work with large datasets using distributed computation.
+ **Dask is not for everything** (see [Dask DataFrames Best Practices](https://docs.dask.org/en/stable/dataframe-best-practices.html)). 

    - Consider cases such as small-to-large joins, where the small dataframe fits in memory, but the large one does not. 
    - If possible, use pandas: reduce, then use pandas.
    - Pandas performance tips apply to Dask.
    - Use the index: it is beneficial to have a well-defined index in Dask DataFrames, as it may speed up searching (filtering) the data. Only a one-dimensional index is allowed.
    - Avoid (or minimize) full-data shuffling: setting an index is an expensive operation. 
    - Some joins are more expensive than others. 

        * Not expensive:

            - Join a Dask DataFrame with a pandas DataFrame.
            - Join a Dask DataFrame with another Dask DataFrame of a single partition.
            - Join Dask DataFrames along their indexes.

        * Expensive:

            - Join Dask DataFrames along columns that are not their index.


# How do we store prices?

+ We can store our data as a single blob. This can be difficult to maintain, especially because parquet files are immutable.
+ Strategy: organize data files by ticker and date. Update only the most recent period (the code below partitions the data by year).
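The strategy can be sketched with two small path helpers. The function names, the `prices` root, and the dates are hypothetical; the directory layout (`<root>/<ticker>/<ticker>_<year>`) follows the one used by the notebook's code.

```python
import os
from datetime import date


def partition_path(root, ticker, year):
    """Build the directory for one ticker-year, e.g. root/AAPL/AAPL_2025."""
    return os.path.join(root, ticker, f"{ticker}_{year}")


def partitions_to_refresh(root, tickers, last_update, today):
    """List ticker-year directories that may contain new rows.

    Hypothetical helper: only the years from last_update through today are
    rewritten; older directories are left untouched.
    """
    years = range(last_update.year, today.year + 1)
    return [partition_path(root, t, y) for t in tickers for y in years]


paths = partitions_to_refresh(
    root="prices",  # placeholder; the notebook reads PRICE_DATA from the environment
    tickers=["AAPL", "MSFT"],
    last_update=date(2024, 12, 20),
    today=date(2025, 1, 24),
)
for p in paths:
    print(p)
```

Because Parquet files are immutable, an update replaces whole partitions. Keeping partitions small limits how much data has to be rewritten.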



In [23]:
# Clean up before start
import shutil

PRICE_DATA = os.getenv("PRICE_DATA")
if os.path.exists(PRICE_DATA):
    shutil.rmtree(PRICE_DATA)

In [24]:
for ticker in tech_dt['Ticker'].unique():
    ticker_dt = tech_dt[tech_dt['Ticker'] == ticker]
    ticker_dt = ticker_dt.assign(Year = ticker_dt.Date.dt.year)
    for yr in ticker_dt['Year'].unique():
        yr_dd = dd.from_pandas(ticker_dt[ticker_dt['Year'] == yr], npartitions=2)
        yr_path = os.path.join(PRICE_DATA, ticker, f"{ticker}_{yr}")
        os.makedirs(os.path.dirname(yr_path), exist_ok=True)
        yr_dd.to_parquet(yr_path, engine = "pyarrow")
    

Why would we want to store data this way?

+ Easier to maintain. We do not update old data, only recent data.
+ We can also access all files as follows.

# Load, Transform and Save 

## Load

+ Parquet files can be read individually or as a collection.
+ `dd.read_parquet()` can take a list (collection) of files as input.
+ Use `glob` to get the collection of files.
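To see how the recursive pattern behaves, the sketch below builds a throwaway directory tree with empty placeholder files and collects them with `glob`. The `part.N.parquet` file names are an assumption about how each partition is named on disk, and the files are not real Parquet data.

```python
import os
import tempfile
from glob import glob

with tempfile.TemporaryDirectory() as root:
    # Mimic the layout <root>/<ticker>/<ticker>_<year>/part.N.parquet
    # with empty placeholder files (not real Parquet data).
    for ticker, year in [("AAA", 2024), ("AAA", 2025), ("BBB", 2025)]:
        part_dir = os.path.join(root, ticker, f"{ticker}_{year}")
        os.makedirs(part_dir, exist_ok=True)
        for i in range(2):
            open(os.path.join(part_dir, f"part.{i}.parquet"), "w").close()

    # "**" matches any number of nested directories when recursive=True.
    found = sorted(glob(os.path.join(root, "**", "*.parquet"), recursive=True))
    relative = [os.path.relpath(f, root) for f in found]

for r in relative:
    print(r)
```

Without `recursive=True`, `**` behaves like a single `*` and matches only one directory level, so files two levels below the root would be missed.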

In [25]:
from glob import glob

parquet_files = glob(os.path.join(PRICE_DATA, "**/*.parquet"), recursive = True)
dd_px = dd.read_parquet(parquet_files).set_index("Ticker")

In [26]:
len(parquet_files)

3328

In [27]:
dd_px

,Date,Adj Close,Close,High,Low,Open,Volume,Year
npartitions=64,,,,,,,,
AAPL,datetime64[ns],float64,float64,float64,float64,float64,float64,int32
ACN,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...
ZBRA,...,...,...,...,...,...,...,...
ZBRA,...,...,...,...,...,...,...,...


## Transform

+ This transformation step will create a *Features* data set. In our case, features will be stock returns (we obtained prices).
+ Dask dataframes work like pandas dataframes: in particular, we can perform groupby and apply operations.
+ Notice the use of [an anonymous (lambda) function](https://realpython.com/python-lambda/) in the apply statement.

In [28]:
dd_shift = dd_px.groupby('Ticker', group_keys=False).apply(
    lambda x: x.assign(Close_lag_1 = x['Close'].shift(1))
)

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_shift = dd_px.groupby('Ticker', group_keys=False).apply(


In [29]:
dd_rets = dd_shift.assign(
    Returns = lambda x: x['Close']/x['Close_lag_1'] - 1
)
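The two cells above compute simple returns per ticker, that is, each close divided by the previous close of the same ticker, minus one. The same logic can be checked on a small pandas frame. In this sketch the tickers `AAA` and `BBB` are made up; the three `AAA` closes mirror AAPL's first three closes in the data downloaded above, and the `BBB` values are invented.

```python
import pandas as pd

# Two tickers, three days each (see the lead-in for where the values come from).
prices = pd.DataFrame({
    "Ticker": ["AAA", "AAA", "AAA", "BBB", "BBB", "BBB"],
    "Close": [0.999442, 0.915179, 0.928571, 50.0, 55.0, 44.0],
})

# Lag the close within each ticker so that returns never span two tickers.
prices["Close_lag_1"] = prices.groupby("Ticker")["Close"].shift(1)
prices["Returns"] = prices["Close"] / prices["Close_lag_1"] - 1

print(prices)
```

The first row of each ticker has no previous close, so its return is missing. In pandas, `prices.groupby("Ticker")["Close"].pct_change()` is a shorter way to obtain the same column.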

## Lazy Execution

What does `dd_rets` contain?

In [30]:
dd_rets

,Date,Adj Close,Close,High,Low,Open,Volume,Year,Close_lag_1,Returns
npartitions=64,,,,,,,,,,
AAPL,datetime64[ns],float64,float64,float64,float64,float64,float64,int32,float64,float64
ACN,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...
ZBRA,...,...,...,...,...,...,...,...,...,...
ZBRA,...,...,...,...,...,...,...,...,...,...


+ Dask is a lazy execution framework: commands will not execute until they are required. 
+ To trigger an execution in dask use `.compute()`.

In [31]:
dd_rets.compute()

,Date,Adj Close,Close,High,Low,Open,Volume,Year,Close_lag_1,Returns
Ticker,,,,,,,,,,
AAPL,2000-01-03,0.843077,0.999442,1.004464,0.907924,0.936384,535796800.0,2000,,
AAPL,2000-01-04,0.771997,0.915179,0.987723,0.903460,0.966518,512377600.0,2000,0.999442,-0.084310
AAPL,2000-01-05,0.783294,0.928571,0.987165,0.919643,0.926339,778321600.0,2000,0.915179,0.014633
AAPL,2000-01-06,0.715508,0.848214,0.955357,0.848214,0.947545,767972800.0,2000,0.928571,-0.086538
AAPL,2000-01-07,0.749401,0.888393,0.901786,0.852679,0.861607,460734400.0,2000,0.848214,0.047369
...,...,...,...,...,...,...,...,...,...,...
ZBRA,2025-01-17,405.709991,405.709991,407.290009,402.290009,406.040009,270600.0,2025,402.720001,0.007424
ZBRA,2025-01-21,418.070007,418.070007,419.850006,407.619995,407.619995,446000.0,2025,405.709991,0.030465
ZBRA,2025-01-22,420.570007,420.570007,427.760010,419.589996,425.239990,497500.0,2025,418.070007,0.005980
ZBRA,2025-01-23,421.109985,421.109985,422.290009,414.450012,417.619995,377100.0,2025,420.570007,0.001284


## Save

+ Apply transformations to calculate daily returns.
+ Store the enriched data, the silver dataset, in a new directory.
+ Should we keep the same namespace? All columns?

In [32]:
# Clean up before save
FEATURES_DATA = os.getenv("FEATURES_DATA")
if os.path.exists(FEATURES_DATA):
    shutil.rmtree(FEATURES_DATA)
dd_rets.to_parquet(FEATURES_DATA, overwrite = True)

# Optional: from Jupyter to Command Line

+ We have drafted our code in a Jupyter Notebook. 
+ Finalized code should be written in Python modules.

## Object Oriented vs Functional Programming

+ We can use classes to keep parameters and functions together.
+ We *could* use Object Oriented Programming, but parallelization of data manipulation and modelling tasks benefits from *Functional Programming*.
+ An Idea: 

    - [Data Oriented Programming](https://blog.klipse.tech/dop/2022/06/22/principles-of-dop.html).
    - Use the class to bundle together parameters and functions.
    - Use stateless operations and treat all data objects as immutable (we do not modify them, we overwrite them).
    - Take advantage of [`@staticmethod`](https://realpython.com/instance-class-and-static-methods-demystified/).
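As an illustration of these ideas, the sketch below bundles parameters in a class while keeping the computation stateless. The class `PipelineSettings` and its methods are hypothetical and are not the `DataManager` implementation.

```python
import os


class PipelineSettings:
    """Hypothetical sketch: parameters live on the instance, logic is stateless."""

    def __init__(self, price_dir, features_dir):
        self.price_dir = price_dir
        self.features_dir = features_dir

    @staticmethod
    def simple_returns(closes):
        """Return a new list of simple returns; the input list is not modified."""
        if not closes:
            return []
        return [None] + [
            curr / prev - 1 for prev, curr in zip(closes[:-1], closes[1:])
        ]

    def features_path(self, name):
        """Combine a stored parameter with an argument; no state is changed."""
        return os.path.join(self.features_dir, name)


settings = PipelineSettings(price_dir="prices", features_dir="features")
closes = [100.0, 110.0, 99.0]
returns = PipelineSettings.simple_returns(closes)

print(returns)
print(settings.features_path("stock_features.parquet"))
```

Because `simple_returns` reads nothing from `self` and does not modify its input, it can be called on different chunks of data in any order, which is the property that makes parallel execution straightforward.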

The code is in `./05_src/`.

Our original design was:

![](./images/02_target_pipeline_manager.png)



The `DataManager` class in `./05_src/data_manager.py` is a simple implementation of the ideas and code discussed in this notebook. The lines below download data for about 500 stocks from the S&P 500. Using this data, a few features are created and stored in the features data set.

First, instantiate an object of class `DataManager`.

In [33]:
from data_manager import DataManager
dm = DataManager()

Download all prices.

In [34]:
dm.download_all()

2025-02-01 11:51:35,833, data_manager.py, 42, INFO, Getting price data for all tickers.
2025-02-01 11:51:35,835, data_manager.py, 51, INFO, Getting tickers from ../../05_src/data/tickers/sp500_wiki.csv
2025-02-01 11:51:35,852, data_manager.py, 57, INFO, Processing all tickers
2025-02-01 11:51:35,854, data_manager.py, 70, INFO, Processing ticker ['MMM', 'AOS', 'ABT', 'ABBV', 'ACN', 'ADBE', 'AMD', 'AES', 'AFL', 'A', 'APD', 'ABNB', 'AKAM', 'ALB', 'ARE', 'ALGN', 'ALLE', 'LNT', 'ALL', 'GOOGL', 'GOOG', 'MO', 'AMZN', 'AMCR', 'AEE', 'AAL', 'AEP', 'AXP', 'AIG', 'AMT', 'AWK', 'AMP', 'AME', 'AMGN', 'APH', 'ADI', 'ANSS', 'AON', 'APA', 'AAPL', 'AMAT', 'APTV', 'ACGL', 'ADM', 'ANET', 'AJG', 'AIZ', 'T', 'ATO', 'ADSK', 'ADP', 'AZO', 'AVB', 'AVY', 'AXON', 'BKR', 'BALL', 'BAC', 'BK', 'BBWI', 'BAX', 'BDX', 'BRK.B', 'BBY', 'BIO', 'TECH', 'BIIB', 'BLK', 'BX', 'BA', 'BKNG', 'BWA', 'BXP', 'BSX', 'BMY', 'AVGO', 'BR', 'BRO', 'BF.B', 'BLDR', 'BG', 'CDNS', 'CZR', 'CPT', 'CPB', 'COF', 'CAH', 'KMX', 'CCL', 'CARR', 

Finally, add features to the data set and save to a *feature store*.

In [35]:
dm.featurize()

2025-02-01 12:02:23,545, data_manager.py, 114, INFO, Creating features data.
2025-02-01 12:02:23,546, data_manager.py, 124, INFO, Loading price data from ../../05_src/data/prices/
2025-02-01 12:02:33,282, data_manager.py, 133, INFO, Creating features
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  features = (price_dd.groupby('Ticker', group_keys=False)
2025-02-01 12:02:33,311, data_manager.py, 158, INFO, Saving features to ../../05_src/data/features/stock_features.parquet


In [40]:
import pandas as pd


data = pd.DataFrame({
    "ticker": ["AAPL", "GOOG", "MSFT"],
    "price": [150, 2800, 300],
    "date": ["2025-01-01", "2025-01-02", "2025-01-03"]
})

print(data.head())


  ticker  price        date
0   AAPL    150  2025-01-01
1   GOOG   2800  2025-01-02
2   MSFT    300  2025-01-03
