# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `01_materials/labs/2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [1]:
%load_ext dotenv
%dotenv 

+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1 pt)

In [2]:
import os
from glob import glob

In [None]:

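# Directory that holds the price data; the value comes from the environment loaded above.
Price_data_file = os.getenv("PRICE_DATA")
# Files are stored as <ticker>/<ticker>_<year>.parquet/part.N.parquet, hence three path levels.
parquet_files = glob(os.path.join(Price_data_file, "*/*/*.parquet"))
parquet_files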


['../../05_src/data/prices\\A\\A_2000.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2001.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2002.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2003.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2004.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2005.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2006.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2007.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2008.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2009.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2010.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2011.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2012.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2013.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2014.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2015.pa

In [90]:
Price_data_file

'../05_src/data/prices/'
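
To illustrate how the three-level pattern `*/*/*.parquet` matches the directory layout seen in the output above, here is a minimal sketch that builds a throwaway directory tree and globs it. The helper name `find_parquet_parts` and the tickers `A` and `B` with their years are made up for illustration; only the layout `<ticker>/<ticker>_<year>.parquet/part.0.parquet` is taken from the listing above.

```python
import os
import tempfile
from glob import glob


def find_parquet_parts(root):
    """Return sorted paths matching <root>/<dir>/<dir>/<file>.parquet (hypothetical helper)."""
    return sorted(glob(os.path.join(root, "*/*/*.parquet")))


# Build a tiny temporary tree that mimics the layout of the price data.
with tempfile.TemporaryDirectory() as root:
    for ticker, year in [("A", 2000), ("A", 2001), ("B", 2000)]:
        part_dir = os.path.join(root, ticker, f"{ticker}_{year}.parquet")
        os.makedirs(part_dir)
        # Create an empty placeholder file; no real Parquet data is needed here.
        open(os.path.join(part_dir, "part.0.parquet"), "w").close()

    found = find_parquet_parts(root)
    # Express the matches relative to the root, with forward slashes on every OS.
    relative = [os.path.relpath(p, root).replace(os.sep, "/") for p in found]

print(relative)
```

Each `*` matches exactly one path component, so only files three levels below the root are returned. Wrapping the result in `sorted` is a design choice of this sketch: `glob` does not guarantee any particular order.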

In [8]:
import dask.dataframe as dd
import numpy as np

For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Adjusted Close:
    
    - `returns`: (Adj Close / Adj Close_lag) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pts)
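
The feature definitions above can be sketched on a small pandas data frame before moving to Dask. This is an illustration under stated assumptions, not the reference solution: the tickers `X` and `Y`, all prices, and the function name `add_features` are hypothetical, and the column is assumed to be named `Adj_Close`.

```python
import pandas as pd

# Toy prices for two hypothetical tickers; all values are made up.
px = pd.DataFrame({
    "ticker": ["X", "X", "X", "Y", "Y", "Y"],
    "Date": pd.to_datetime(["2000-01-03", "2000-01-04", "2000-01-05"] * 2),
    "Adj_Close": [10.0, 11.0, 9.9, 50.0, 50.0, 55.0],
    "Close": [12.0, 13.0, 11.5, 60.0, 60.0, 66.0],
    "High": [12.5, 13.4, 12.0, 61.0, 60.5, 67.0],
    "Low": [11.5, 12.6, 11.0, 59.0, 59.5, 64.0],
})


def add_features(df):
    """Add per-ticker lags, returns, and the daily high-low range."""
    # Sort so that shift(1) refers to the previous trading day of the same ticker.
    df = df.sort_values(["ticker", "Date"]).copy()
    df["Close_lag_1"] = df.groupby("ticker")["Close"].shift(1)
    df["Adj_Close_lag_1"] = df.groupby("ticker")["Adj_Close"].shift(1)
    # returns = (Adj Close / lagged Adj Close) - 1
    df["returns"] = df["Adj_Close"] / df["Adj_Close_lag_1"] - 1
    df["hi_lo_range"] = df["High"] - df["Low"]
    return df


feat = add_features(px)
print(feat[["ticker", "Date", "returns", "hi_lo_range"]])
```

Because the shift is applied within each ticker group, the first row of every ticker has no lag and therefore a missing return, rather than a return computed against the last price of a different ticker.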

In [12]:
dd_px = dd.read_parquet(parquet_files).set_index("ticker")
dd_px.head()


| ticker | Date | Adj Close | Close | High | Low | Open | Volume | sector | subsector | year |
|---|---|---|---|---|---|---|---|---|---|---|
| A | 2000-01-03 00:00:00+00:00 | 43.532211 | 51.502148 | 56.464592 | 48.193848 | 56.330471 | 4674353 | Health Care | Life Sciences Tools & Services | 2000 |
| A | 2000-01-04 00:00:00+00:00 | 40.206848 | 47.567955 | 49.266811 | 46.316166 | 48.730328 | 4765083 | Health Care | Life Sciences Tools & Services | 2000 |
| A | 2000-01-05 00:00:00+00:00 | 37.712811 | 44.61731 | 47.567955 | 43.141991 | 47.389126 | 5758642 | Health Care | Life Sciences Tools & Services | 2000 |
| A | 2000-01-06 00:00:00+00:00 | 36.276848 | 42.918453 | 44.349072 | 41.577251 | 44.08083 | 2534434 | Health Care | Life Sciences Tools & Services | 2000 |
| A | 2000-01-07 00:00:00+00:00 | 39.299919 | 46.494991 | 47.165592 | 42.203148 | 42.247852 | 2819626 | Health Care | Life Sciences Tools & Services | 2000 |


In [34]:
# The price column is named 'Adj Close' (with a space). Copy it to 'Adj_Close'
# so that the features below use real prices instead of a NaN placeholder.
if 'Adj_Close' not in dd_px.columns:
    dd_px['Adj_Close'] = dd_px['Adj Close']

# Create the feature DataFrame with lags, hi_lo_range, returns, and positive_return.
# Each ticker's rows are sorted by Date so that shift(1) is the previous trading day.
dd_feat = (dd_px.groupby('ticker')
           .apply(lambda x: x.sort_values('Date').assign(
               Close_lag_1=lambda d: d['Close'].shift(1),
               Adj_Close_lag_1=lambda d: d['Adj_Close'].shift(1),
               hi_lo_range=lambda d: d['High'] - d['Low']
           ))
           .assign(
               returns=lambda x: (x['Adj_Close'] / x['Adj_Close_lag_1']) - 1,
               positive_return=lambda x: (x['returns'] > 0).astype(int)
           )
)

dd_feat



  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  .apply(lambda x: x.assign(


Dask DataFrame structure (npartitions=11207). The values are not computed yet, so only the column dtypes are shown:

| Column | dtype |
|---|---|
| Date | datetime64[ns, UTC] |
| Adj Close | float64 |
| Close | float64 |
| High | float64 |
| Low | float64 |
| Open | float64 |
| Volume | int64 |
| sector | object |
| subsector | object |
| year | int32 |
| Adj_Close | float64 |
| Close_lag_1 | float64 |
| Adj_Close_lag_1 | float64 |
| hi_lo_range | float64 |
| returns | float64 |
| positive_return | int32 |


In [35]:
dd_feat = dd_feat.compute()

+ Convert the Dask data frame to a pandas data frame. 
+ Add a rolling average return calculation with a window of 10 days.
+ *Tip*: Consider using `.rolling(10).mean()`.

(3 pts)

In [33]:
dd_feat['rolling_avg_return_10'] = dd_feat.groupby('ticker')['returns'].transform(lambda x: x.rolling(10).mean())
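
The grouped rolling mean used above can be checked on a small example. The sketch below uses a window of 3 instead of 10 to keep the data short; the tickers `X` and `Y` and the return values are hypothetical.

```python
import pandas as pd

# Toy returns for two hypothetical tickers, already in date order.
returns = pd.DataFrame({
    "ticker": ["X"] * 4 + ["Y"] * 4,
    "returns": [0.01, 0.02, 0.03, 0.04, 0.10, 0.20, 0.30, 0.40],
})

window = 3  # the assignment uses 10; 3 keeps the example readable

# transform returns a Series aligned to the original rows, one rolling mean per ticker.
returns["rolling_avg_return"] = (
    returns.groupby("ticker")["returns"]
    .transform(lambda s: s.rolling(window).mean())
)

print(returns)
```

With the default `min_periods`, the first `window - 1` rows of each ticker are missing because a full window is not yet available, and the window never spans two tickers.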

Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
+ Would it have been better to do it in Dask? Why?

(1 pt)

1. Was it necessary to convert to pandas to calculate the moving average return?

No, converting to pandas was not strictly necessary. Dask data frames support rolling-window calculations, and a 10-day rolling average within each ticker can be computed with a grouped `apply`, provided that each ticker's rows are in date order.

2. Would it have been better to do it in Dask? Why?

Yes, it would generally be better to perform this rolling calculation in Dask if the data set is large. Dask splits the data into partitions and processes them in parallel, so the full data set never has to fit into memory at once. Evaluation is also lazy: nothing is computed until `.compute()` is called, so the rolling average can be added to the task graph and evaluated together with the other features. For a data set that fits comfortably in memory, pandas is simpler and the benefit of staying in Dask is small.

Here is a sketch of how the 10-day rolling average return could be calculated directly in Dask, while `dd_feat` is still a Dask data frame (that is, before `.compute()` is called):

    dd_feat['rolling_avg_return_10'] = dd_feat.groupby('ticker')['returns'].apply(lambda x: x.rolling(10).mean(), meta=('returns', 'float64'))

The `meta` argument tells Dask the name and dtype of the result so that it does not have to infer them. This way, the computation remains lazy, and Dask handles the memory management until the final `.compute()` call.

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.