# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `01_materials/labs/2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [1]:
#### >>> Helper Section <<< ####

# help(os)

In [2]:
## Write your code below.

# load the dotenv IPython extension ("%" marks an IPython magic command)
%load_ext dotenv
# load the environment variables
%dotenv 

In [3]:
import os, sys
sys.path.append(os.getenv('SRC_DIR'))
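If `SRC_DIR` is not set, `os.getenv` returns `None`. That `None` is appended to `sys.path` without complaint, and the problem only surfaces later as a `ModuleNotFoundError` on `from utils.logger import ...`. A fail-fast check can make a misconfigured `.env` easier to diagnose. This is a minimal sketch that uses only the standard library. The helper name `require_env` is hypothetical and is not part of the course utilities.

```python
import os


def require_env(*names: str) -> dict:
    """Hypothetical helper: return the requested environment variables or fail fast.

    The notebook itself only calls os.getenv(), which silently returns None.
    """
    missing = [name for name in names if not os.getenv(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}; check your .env file")
    return {name: os.environ[name] for name in names}


# Possible use in the notebook, after %dotenv:
# env = require_env("SRC_DIR", "PRICE_DATA")
# sys.path.append(env["SRC_DIR"])
```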


In [4]:
## start logger 
from utils.logger import get_logger
_logs = get_logger(__name__)

from datetime import datetime
now = datetime.now()
formatted_date_weekday = now.strftime("%A, %B %d, %Y")
_logs.info(f"Hello! Welcome to Python's World 🐍🐍~! Today is {formatted_date_weekday}")

2025-09-30 11:36:37,021, 1150614155.py, 8, INFO, Hello! Welcome to Python's World 🐍🐍~! Today is Tuesday, September 30, 2025
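`utils.logger.get_logger` is a course-provided helper, and its source is not shown here. The log line above shows a timestamp, file name, line number, level and message. Based only on that format, a standard-library `logging` equivalent might look like the sketch below. The format string is an assumption reconstructed from that one log line, not the helper's actual code.

```python
import logging
import sys


def get_logger(name: str) -> logging.Logger:
    """Hypothetical stand-in for utils.logger.get_logger (format inferred from the log output)."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid stacking duplicate handlers when a notebook cell is re-run
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(
            logging.Formatter("%(asctime)s, %(filename)s, %(lineno)d, %(levelname)s, %(message)s")
        )
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger
```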


+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1 pt)
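The `**` pattern only descends into subdirectories when `recursive=True` is passed. Without that flag it behaves like a single `*`. The self-contained sketch below builds a throwaway directory tree that mimics the `TICKER/TICKER_YEAR/part.N.parquet` layout shown in the cell output. The files are empty placeholders, not real parquet data, and the tickers and years are only illustrative. Sorting is optional, but it makes the order deterministic, because `glob` returns paths in an arbitrary, file-system-dependent order.

```python
import os
import tempfile
from glob import glob

with tempfile.TemporaryDirectory() as price_data:
    # Mimic PRICE_DATA/<ticker>/<ticker>_<year>/part.<n>.parquet with empty placeholder files
    for ticker, year in [("PLG", 2012), ("PLG", 2015), ("MET", 2002)]:
        part_dir = os.path.join(price_data, ticker, f"{ticker}_{year}")
        os.makedirs(part_dir)
        for n in range(2):
            open(os.path.join(part_dir, f"part.{n}.parquet"), "w").close()

    pattern = os.path.join(price_data, "**", "*.parquet")
    flat = glob(pattern)                            # "**" acts like "*": one directory level only
    nested = sorted(glob(pattern, recursive=True))  # "**" spans any number of directories

    print(len(flat), len(nested))  # 0 6
    print([os.path.relpath(p, price_data) for p in nested[:2]])
```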

In [5]:
## Write your code below.

Path_Price_Data = os.getenv('PRICE_DATA')

from glob import glob

# "**" together with recursive=True matches .parquet files at any depth below PRICE_DATA
parquet_files = glob(os.path.join(Path_Price_Data, "**/*.parquet"), recursive=True)
parquet_files

['../../05_src/data/prices/PLG/PLG_2012/part.1.parquet',
 '../../05_src/data/prices/PLG/PLG_2012/part.0.parquet',
 '../../05_src/data/prices/PLG/PLG_2015/part.1.parquet',
 '../../05_src/data/prices/PLG/PLG_2015/part.0.parquet',
 '../../05_src/data/prices/PLG/PLG_2017/part.1.parquet',
 '../../05_src/data/prices/PLG/PLG_2017/part.0.parquet',
 '../../05_src/data/prices/PLG/PLG_2019/part.1.parquet',
 '../../05_src/data/prices/PLG/PLG_2019/part.0.parquet',
 '../../05_src/data/prices/PLG/PLG_2007/part.1.parquet',
 '../../05_src/data/prices/PLG/PLG_2007/part.0.parquet',
 '../../05_src/data/prices/PLG/PLG_2009/part.1.parquet',
 '../../05_src/data/prices/PLG/PLG_2009/part.0.parquet',
 '../../05_src/data/prices/PLG/PLG_2006/part.1.parquet',
 '../../05_src/data/prices/PLG/PLG_2006/part.0.parquet',
 '../../05_src/data/prices/PLG/PLG_2018/part.1.parquet',
 '../../05_src/data/prices/PLG/PLG_2018/part.0.parquet',
 '../../05_src/data/prices/PLG/PLG_2011/part.1.parquet',
 '../../05_src/data/prices/PLG/

For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Close:
    
    - `returns`: (Close / Close_lag_1) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)
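You can sketch the steps above on a tiny in-memory pandas frame first. This makes the per-ticker logic easy to check before running it through Dask. The toy prices and the ticker names `AAA` and `BBB` are made up. The column names follow the price data (`Close`, `Adj Close`, `High`, `Low`), and the `_lag_1` suffix mirrors the notebook's `Close_lag_1`. As in the formula above, `returns` is kept as a fraction.

```python
import pandas as pd

# Made-up prices for two hypothetical tickers, indexed by ticker like dd_px
px = pd.DataFrame(
    {
        "Date": pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-04"] * 2),
        "High": [11.0, 12.0, 13.0, 21.0, 22.0, 23.0],
        "Low": [9.0, 10.5, 11.0, 19.0, 20.0, 22.5],
        "Close": [10.0, 11.0, 12.1, 20.0, 21.0, 23.1],
        "Adj Close": [9.0, 9.9, 10.89, 18.0, 18.9, 20.79],
    },
    index=pd.Index(["AAA"] * 3 + ["BBB"] * 3, name="ticker"),
)


def add_features(g: pd.DataFrame) -> pd.DataFrame:
    """Add lags, returns and hi_lo_range to the rows of ONE ticker, in date order."""
    g = g.sort_values("Date")
    return g.assign(
        Close_lag_1=g["Close"].shift(1),
        Adj_Close_lag_1=g["Adj Close"].shift(1),
        returns=g["Close"] / g["Close"].shift(1) - 1,
        hi_lo_range=g["High"] - g["Low"],
    )


# Apply per ticker so that lags never cross from one ticker into the next
feat = px.groupby("ticker", group_keys=False).apply(add_features)
print(feat[["Close", "Close_lag_1", "returns", "hi_lo_range"]])
```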

In [6]:
## Write your code below.
import dask.dataframe as dd
dd_px = dd.read_parquet(parquet_files).set_index("ticker")

In [7]:
dd_px


| npartitions=60 | Date | Open | High | Low | Close | Adj Close | Volume | source | Year |
|---|---|---|---|---|---|---|---|---|---|
| ACBI | datetime64[ns] | float64 | float64 | float64 | float64 | float64 | float64 | string | int32 |
| ACM | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ZN | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ZN | ... | ... | ... | ... | ... | ... | ... | ... | ... |


In [8]:
# add new column Close_lag_1: the previous day's Close, shifted within each ticker
dd_shift = dd_px.groupby('ticker',group_keys=False).apply(
    lambda x:x.assign(Close_lag_1 =x['Close'].shift(1))
)

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_shift = dd_px.groupby('ticker',group_keys=False).apply(
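The warning above is Dask asking for `meta`, the expected output schema. Without it, Dask runs the function on a small sample to guess the output columns and dtypes. Grouping before `shift` matters because the frame is stacked one ticker after another. A plain `shift(1)` would hand the last close of one ticker to the first row of the next. The toy comparison below uses hypothetical tickers and prices and pandas only. The same reasoning applies to the Dask `groupby('ticker').apply` above.

```python
import pandas as pd

# Two hypothetical tickers stacked one after the other, as in the ticker-indexed frame
px = pd.DataFrame(
    {"Close": [10.0, 11.0, 12.0, 50.0, 51.0]},
    index=pd.Index(["AAA", "AAA", "AAA", "BBB", "BBB"], name="ticker"),
)

out = px.assign(
    naive_lag=px["Close"].shift(1),                      # ignores ticker boundaries
    grouped_lag=px.groupby("ticker")["Close"].shift(1),  # restarts at each ticker
)
print(out)
```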


In [9]:
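# calculate the daily return in percent: ROI_pct = (Close / Close_lag_1 - 1) * 100
dd_roi = dd_shift.assign(
    ROI_pct = lambda x: (x['Close']/x['Close_lag_1'] - 1)*100
)

# calculate hi_lo_range: the day's High minus Low
dd_roi_HL = dd_roi.assign(
    hi_lo_range = lambda x: x['High']-x['Low']
)
dd_feat = dd_roi_HL  # the assignment asks for the feature frame under the name dd_feat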

In [10]:
# Check the output by looking at the first 10 rows from MET
met = dd_roi_HL.loc['MET']
met.compute().head(10)

| ticker | Date | Open | High | Low | Close | Adj Close | Volume | source | Year | Close_lag_1 | ROI_pct | hi_lo_range |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| MET | 2002-07-03 | 26.158646 | 26.158646 | 25.499109 | 25.757576 | 16.826885 | 3516600.0 | MET.csv | 2002 | NaN | NaN | 0.659536 |
| MET | 2002-07-05 | 26.069519 | 26.327986 | 25.935829 | 26.203209 | 17.11801 | 1525100.0 | MET.csv | 2002 | 25.757576 | 1.730104 | 0.392157 |
| MET | 2002-07-08 | 26.292336 | 26.488413 | 25.935829 | 26.363636 | 17.222818 | 1270300.0 | MET.csv | 2002 | 26.203209 | 0.612242 | 0.552584 |
| MET | 2002-07-09 | 26.559715 | 26.648842 | 25.623886 | 25.64171 | 16.751184 | 1526700.0 | MET.csv | 2002 | 26.363636 | -2.738339 | 1.024956 |
| MET | 2002-07-10 | 25.891266 | 26.024956 | 24.937611 | 24.937611 | 16.291216 | 2284200.0 | MET.csv | 2002 | 25.64171 | -2.745915 | 1.087345 |
| MET | 2002-07-11 | 24.955437 | 25.525846 | 24.59893 | 24.910873 | 16.273756 | 1920900.0 | MET.csv | 2002 | 24.937611 | -0.107216 | 0.926916 |
| MET | 2002-07-12 | 25.222816 | 25.222816 | 24.242424 | 24.705883 | 16.139833 | 1266600.0 | MET.csv | 2002 | 24.910873 | -0.822895 | 0.980392 |
| MET | 2002-07-15 | 24.643494 | 24.643494 | 23.55615 | 24.59893 | 16.069962 | 2030200.0 | MET.csv | 2002 | 24.705883 | -0.432904 | 1.087343 |
| MET | 2002-07-16 | 24.197861 | 24.86631 | 23.110518 | 23.333334 | 15.243179 | 2811200.0 | MET.csv | 2002 | 24.59893 | -5.144924 | 1.755793 |
| MET | 2002-07-17 | 23.796791 | 24.046347 | 22.103386 | 22.584671 | 14.754088 | 3880500.0 | MET.csv | 2002 | 23.333334 | -3.208555 | 1.942961 |
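As a worked check of the formulas, take the second MET row in the table above. Let $C_t$ be the close on day $t$ and $H_t$, $L_t$ the day's high and low. The notebook's `ROI_pct` is then the assignment's `returns` expressed in percent:

```latex
r_t = \frac{C_t}{C_{t-1}} - 1, \qquad \mathrm{ROI\_pct}_t = 100\, r_t

r_{\text{2002-07-05}} = \frac{26.203209}{25.757576} - 1 \approx 0.017301
\quad\Longrightarrow\quad \mathrm{ROI\_pct}_{\text{2002-07-05}} \approx 1.7301

\mathrm{hi\_lo\_range}_{\text{2002-07-05}} = H_t - L_t = 26.327986 - 25.935829 = 0.392157
```

These values agree with the 1.730104 and 0.392157 shown for 2002-07-05. The small difference in the last digits comes from the closes being displayed rounded to six decimals.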


+ Convert the Dask data frame to a pandas data frame. 
+ Add a new feature containing the moving average of `returns` using a window of 10 days. There are several ways to solve this task, a simple one uses `.rolling(10).mean()`.

(3 pt)
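Here is a minimal pandas sketch of the per-ticker 10-day moving average. It uses `groupby(...).transform`, so the result stays aligned with the original rows. The two tickers and their returns are made up. By default `min_periods` equals the window size, so the first 9 rows of each ticker are `NaN`. The leading `NaN` return left by the first lag pushes the first valid average to the 11th row.

```python
import numpy as np
import pandas as pd

# Made-up daily returns (in percent) for two hypothetical tickers, 12 days each;
# the first return of each ticker is NaN, as it is after the first lag
returns = np.r_[np.nan, np.arange(1.0, 12.0)]
toy = pd.DataFrame(
    {"ROI_pct": np.r_[returns, returns * 10]},
    index=pd.Index(["AAA"] * 12 + ["BBB"] * 12, name="ticker"),
)

# Rolling mean computed separately inside each ticker, aligned back to the original rows
toy["ROI_pct_SMA10"] = toy.groupby("ticker")["ROI_pct"].transform(lambda s: s.rolling(10).mean())

print(toy.loc["AAA", "ROI_pct_SMA10"].tolist())
```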

In [11]:
## Write your code below.
## Convert the Dask data frame to a pandas data frame. 

import time
start = time.time() # start time

import tracemalloc 
tracemalloc.start() # start tracing memory allocated by Python from here on

df = dd_roi_HL.compute()  # run the whole task graph and collect the result as a pandas DataFrame
df.head(20)

end = time.time()
_logs.info(f'Performing operation and reading the first 20 rows of df takes {end - start} seconds.')

# shallow size of the Dask collection object only (neither the task graph nor the data is counted)
_logs.info(f"The object 'dd_ROI_HL' when in Dask data frame, occupies {sys.getsizeof(dd_roi_HL)} byte of memory")

# memory traced since tracemalloc.start(): current and peak
current, peak = tracemalloc.get_traced_memory()
_logs.info(f"Runtime Memory Profile - Current: {current / 1024**2:.2f} MB; Peak: {peak / 1024**2:.2f} MB")

tracemalloc.stop()


2025-09-30 11:37:26,164, 1550072122.py, 14, INFO, Performing operation and reading the first 20 rows of df takes 21.4671733379364 seconds.
2025-09-30 11:37:26,165, 1550072122.py, 17, INFO, The object 'dd_ROI_HL' when in Dask data frame, occupies 48 byte of memory
2025-09-30 11:37:26,166, 1550072122.py, 21, INFO, Runtime Memory Profile - Current: 21.04 MB; Peak: 92.66 MB
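The 48 bytes reported above is only the shallow size of the Dask collection object. Neither the task graph it references nor the data is counted. For a pandas data frame, `sys.getsizeof` is more informative, because pandas implements `__sizeof__` using `memory_usage(deep=True)`. The small sketch below compares the two measurements on a made-up frame.

```python
import sys

import numpy as np
import pandas as pd

# Made-up frame: 100,000 rows with one float column and one object (Python string) column
n = 100_000
frame = pd.DataFrame(
    {
        "Close": np.random.default_rng(0).random(n),
        "source": pd.Series(["MET.csv"] * n, dtype=object),
    }
)

shallow = frame.memory_usage().sum()        # object column counted as 8-byte pointers only
deep = frame.memory_usage(deep=True).sum()  # also counts the Python string objects
print(f"shallow: {shallow / 1024**2:.2f} MB, deep: {deep / 1024**2:.2f} MB, "
      f"sys.getsizeof: {sys.getsizeof(frame) / 1024**2:.2f} MB")
```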


In [12]:
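## add a new feature column: 10-day simple moving average (SMA) of ROI_pct

# start time and runtime ram tracking
start = time.time()
tracemalloc.start()  

# Lazy: this only extends the task graph; nothing is computed yet.
# No groupby is needed here only because each ticker's first ROI_pct is NaN: any 10-row
# window that reaches back into the previous ticker contains that NaN and, with the
# default min_periods=10, yields NaN, so no average mixes two tickers.
dd_roi_HL_rol10 = dd_roi_HL.assign(
    ROI_pct_SMA_10 = lambda x: x['ROI_pct'].rolling(10).mean()
)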
end = time.time()
_logs.info(f'Performing SMA calculation takes {end - start} seconds.')

# object ram 
_logs.info(f"The object 'dd_roi_HL_rol10' when in Dask data frame, occupies {sys.getsizeof(dd_roi_HL_rol10)} byte of memory")

# runtime memory checked
current, peak = tracemalloc.get_traced_memory()
_logs.info(f"Runtime Memory Profile - Current: {current / 1024**2:.2f} MB; Peak: {peak / 1024**2:.2f} MB")

tracemalloc.stop()

2025-09-30 11:37:26,213, 427478100.py, 13, INFO, Performing SMA calculation takes 0.03384709358215332 seconds.
2025-09-30 11:37:26,214, 427478100.py, 16, INFO, The object 'dd_roi_HL_rol10' when in Dask data frame, occupies 48 byte of memory
2025-09-30 11:37:26,215, 427478100.py, 20, INFO, Runtime Memory Profile - Current: 0.10 MB; Peak: 0.11 MB
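The same timing and `tracemalloc` boilerplate is repeated in several cells. A small context manager is one way to factor it out. The helper `profiled` below is hypothetical and is not part of the course utilities. It uses `time.perf_counter`, which is designed for measuring elapsed intervals, instead of `time.time`.

```python
import time
import tracemalloc
from contextlib import contextmanager


@contextmanager
def profiled():
    """Hypothetical helper: record wall time and traced Python memory for the enclosed block.

    Not re-entrant: tracemalloc.stop() ends tracing for the whole process.
    """
    stats = {}
    tracemalloc.start()
    start = time.perf_counter()
    try:
        yield stats
    finally:
        stats["seconds"] = time.perf_counter() - start
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        stats["current_mb"] = current / 1024**2
        stats["peak_mb"] = peak / 1024**2


with profiled() as stats:
    data = [i * 2 for i in range(200_000)]  # stand-in for the cell's real work
print(stats)
```

In the notebook, the body of a cell would go inside the `with` block, followed by something like `_logs.info(stats)`.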


In [13]:
# Check the output by looking at the first 20 rows from MET
start = time.time()
met = dd_roi_HL_rol10.loc['MET']
met.compute().head(20)
end = time.time()
_logs.info(f'Performing operation and reading the first 20 rows for ticker "MET" takes {end - start} seconds.')

2025-09-30 11:37:38,462, 1928167749.py, 6, INFO, Performing operation and reading the first 20 rows for ticker "MET" takes 12.216049909591675 seconds.


### Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return? 

    **Ans**: It depends on the ultimate goal and the downstream application. It is not strictly required, since Dask supports `.rolling(10).mean()` directly. The main difference is when each approach pays its cost.

    Defining the moving average in Dask took about 0.03 seconds, but that step is lazy: it only extends the task graph. The 48 bytes reported by `sys.getsizeof` is the size of the Dask wrapper object, not of the data. The real work happens at `.compute()`. Materializing the full feature frame took about 21 seconds, and computing a single ticker (`MET` above) and converting it to a pandas data frame took about 12 seconds.

    In pandas, once the data is in memory, the grouped moving average took about 0.5 seconds. The resulting data frame occupies about 29 MB, and tracemalloc reported about 37 MB current and 74 MB peak during the calculation. If memory is a concern, stay in Dask and trigger the actual computation only at the end; this is the idea of **lazy execution**.

+ Would it have been better to do it in Dask? Why? 

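    **Ans**: As mentioned above, if memory is a concern, Dask is the better choice. Lazy execution defers the actual computation until a result is requested. Dask then processes the data partition by partition, so the input does not have to fit in memory all at once, as long as the final result does. This is commonly used to defer heavy computation, particularly when training larger ML models.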

(1 pt)
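To make the lazy-execution argument concrete without Dask, the toy class below records operations and only runs them when `.compute()` is called. This is roughly why defining the SMA in Dask took milliseconds while `.compute()` took seconds. `LazyFrame` is a hypothetical teaching sketch and is far simpler than Dask's task graphs: it has no partitions, no scheduler and no graph optimization.

```python
from typing import Callable, List, Optional


class LazyFrame:
    """Toy illustration of lazy execution: operations are recorded, not run."""

    def __init__(self, load: Callable[[], list], ops: Optional[List[Callable[[list], list]]] = None):
        self._load = load
        self._ops = ops or []

    def map(self, fn: Callable[[list], list]) -> "LazyFrame":
        # Return a new object with one more recorded step; nothing is computed yet
        return LazyFrame(self._load, self._ops + [fn])

    def compute(self) -> list:
        data = self._load()  # the (possibly expensive) load only happens here
        for fn in self._ops:
            data = fn(data)
        return data


calls = []


def load_prices() -> list:
    calls.append("load")  # record when the data is actually loaded
    return [10.0, 11.0, 12.1]


# Build the pipeline: daily returns from consecutive closes
lazy = LazyFrame(load_prices).map(lambda xs: [b / a - 1 for a, b in zip(xs, xs[1:])])
print(calls)  # [] : nothing has been loaded yet
result = lazy.compute()
print(calls)  # ['load']
```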

In [None]:
## BONUS: time the same SMA calculation in pandas
start = time.time()  
tracemalloc.start()

# calculation in pandas: rolling mean computed separately within each ticker
df_new = df.groupby('ticker', group_keys=False).apply( 
    lambda x: x.assign(ROI_pct_SMA10 = x['ROI_pct'].rolling(10).mean())
)
end = time.time() # end time
_logs.info(f'Performing SMA calculation takes {end - start} seconds.')

# size of the pandas data frame (pandas' __sizeof__ reports deep memory usage)
_logs.info(f"The object 'df_new' converted to pandas data frame, occupies {sys.getsizeof(df_new)/1024**2:.2f} MB of memory")

# memory traced since tracemalloc.start(): current and peak
current, peak = tracemalloc.get_traced_memory()
_logs.info(f"Runtime Memory Profile - Current: {current / 1024**2:.2f} MB; Peak: {peak / 1024**2:.2f} MB")
tracemalloc.stop()

display(df_new.loc['MET'].head(20))


2025-09-30 11:37:38,984, 3205398830.py, 10, INFO, Performing SMA calculation takes 0.5118987560272217 seconds.
2025-09-30 11:37:39,147, 3205398830.py, 13, INFO, The object 'df_new' converted to pandas data frame, occupies 29.20 MB of memory
2025-09-30 11:37:39,148, 3205398830.py, 17, INFO, Runtime Memory Profile - Current: 37.11 MB; Peak: 74.30 MB


| ticker | Date | Open | High | Low | Close | Adj Close | Volume | source | Year | Close_lag_1 | ROI_pct | hi_lo_range | ROI_pct_SMA10 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| MET | 2002-07-03 | 26.158646 | 26.158646 | 25.499109 | 25.757576 | 16.826885 | 3516600.0 | MET.csv | 2002 | NaN | NaN | 0.659536 | NaN |
| MET | 2002-07-05 | 26.069519 | 26.327986 | 25.935829 | 26.203209 | 17.11801 | 1525100.0 | MET.csv | 2002 | 25.757576 | 1.730104 | 0.392157 | NaN |
| MET | 2002-07-08 | 26.292336 | 26.488413 | 25.935829 | 26.363636 | 17.222818 | 1270300.0 | MET.csv | 2002 | 26.203209 | 0.612242 | 0.552584 | NaN |
| MET | 2002-07-09 | 26.559715 | 26.648842 | 25.623886 | 25.64171 | 16.751184 | 1526700.0 | MET.csv | 2002 | 26.363636 | -2.738339 | 1.024956 | NaN |
| MET | 2002-07-10 | 25.891266 | 26.024956 | 24.937611 | 24.937611 | 16.291216 | 2284200.0 | MET.csv | 2002 | 25.64171 | -2.745915 | 1.087345 | NaN |
| MET | 2002-07-11 | 24.955437 | 25.525846 | 24.59893 | 24.910873 | 16.273756 | 1920900.0 | MET.csv | 2002 | 24.937611 | -0.107216 | 0.926916 | NaN |
| MET | 2002-07-12 | 25.222816 | 25.222816 | 24.242424 | 24.705883 | 16.139833 | 1266600.0 | MET.csv | 2002 | 24.910873 | -0.822895 | 0.980392 | NaN |
| MET | 2002-07-15 | 24.643494 | 24.643494 | 23.55615 | 24.59893 | 16.069962 | 2030200.0 | MET.csv | 2002 | 24.705883 | -0.432904 | 1.087343 | NaN |
| MET | 2002-07-16 | 24.197861 | 24.86631 | 23.110518 | 23.333334 | 15.243179 | 2811200.0 | MET.csv | 2002 | 24.59893 | -5.144924 | 1.755793 | NaN |
| MET | 2002-07-17 | 23.796791 | 24.046347 | 22.103386 | 22.584671 | 14.754088 | 3880500.0 | MET.csv | 2002 | 23.333334 | -3.208555 | 1.942961 | NaN |


## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.