# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `01_materials/labs/2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [9]:
# Load variables using dotenv
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv() 

# Verify that PRICE_DATA is updated
price_data_path = os.getenv("PRICE_DATA")
print("Updated PRICE_DATA path:", price_data_path)
print("Directory exists:", os.path.isdir(price_data_path))


Updated PRICE_DATA path: C:/Users/Test/production/05_src/data/price_data/price_data
Directory exists: True


+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1 pt)
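As a minimal sketch of how the glob patterns behave, the snippet below builds a throwaway directory tree and searches it. It assumes a layout of one subdirectory per ticker, which matches the paths printed further down. The tickers, years, and empty placeholder files are hypothetical and stand in for the real `PRICE_DATA` contents.

```python
import os
import tempfile
from glob import glob

# Build a temporary tree shaped like <root>/<ticker>/<ticker>_<year>.parquet.
# Tickers, years, and the empty files are hypothetical placeholders.
with tempfile.TemporaryDirectory() as root:
    for ticker, year in [("A", 2000), ("A", 2001), ("B", 2000)]:
        os.makedirs(os.path.join(root, ticker), exist_ok=True)
        open(os.path.join(root, ticker, f"{ticker}_{year}.parquet"), "w").close()

    # "*" matches exactly one directory level (the ticker folder).
    one_level = sorted(glob(os.path.join(root, "*", "*.parquet")))

    # "**" only means "any depth" when recursive=True is also passed.
    any_depth = sorted(glob(os.path.join(root, "**", "*.parquet"), recursive=True))

    found_names = [os.path.basename(p) for p in one_level]
    same_result = one_level == any_depth

print(found_names)
print(same_result)
```

With files exactly one level deep, both patterns return the same list. `recursive=True` has no effect unless the pattern contains `**`.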

In [10]:
import os
from glob import glob

# Parquet files sit one level below PRICE_DATA, in one subdirectory per ticker.
# recursive=True only matters when the pattern contains "**", so it is omitted here.
price_data_path = os.getenv("PRICE_DATA")
parquet_files = glob(os.path.join(price_data_path, "*", "*.parquet"))

print("Parquet files found:", parquet_files)



Parquet files found: ['C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2000.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2001.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2002.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2003.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2004.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2005.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2006.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2007.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2008.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2009.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2010.parquet', 'C:/Users/Test/production/05_src/data/price_data/price_data\\A\\A_2011.parquet', 'C:/Us

For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Adjusted Close:
    
    - `returns`: (Adj Close / Adj Close_lag) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)
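The feature definitions above can be checked on a tiny example first. The sketch below uses pandas rather than Dask so that it runs without a cluster or the real data. The two tickers and all prices are made up for illustration, and the lowercase column names follow the task description.

```python
import pandas as pd

# Toy prices for two hypothetical tickers; all values are invented.
toy = pd.DataFrame({
    "ticker": ["A", "A", "A", "B", "B"],
    "Date": pd.to_datetime(
        ["2000-01-03", "2000-01-04", "2000-01-05", "2000-01-03", "2000-01-04"]
    ),
    "High": [11.0, 12.0, 13.0, 21.0, 22.0],
    "Low": [9.0, 10.0, 12.0, 19.0, 21.0],
    "Adj Close": [10.0, 11.0, 12.1, 20.0, 22.0],
})

# Lags only make sense when rows are in date order within each ticker.
toy = toy.sort_values(["ticker", "Date"])

# shift(1) inside groupby never carries a value across tickers,
# so the first row of every ticker has a missing lag.
toy["Adj Close Lag"] = toy.groupby("ticker")["Adj Close"].shift(1)

# returns = (Adj Close / Adj Close_lag) - 1
toy["returns"] = toy["Adj Close"] / toy["Adj Close Lag"] - 1

# hi_lo_range = the day's High minus Low
toy["hi_lo_range"] = toy["High"] - toy["Low"]

print(toy[["ticker", "Date", "returns", "hi_lo_range"]])
```

Each ticker's first row has a missing return because there is no previous day to compare against. Every later row should hold a number.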

In [34]:
import dask.dataframe as dd

# Load every Parquet file under PRICE_DATA into one Dask DataFrame
price_data_path = os.getenv("PRICE_DATA")
dd_feat = dd.read_parquet(price_data_path, index=False)

# Build a unique ticker/date key and drop duplicate rows
dd_feat['ticker_date_key'] = dd_feat['ticker'].astype(str) + "_" + dd_feat['Date'].astype(str)
dd_feat = dd_feat.drop_duplicates(subset=['ticker_date_key'])

# Index by the key and repartition.
# sorted=True tells Dask the rows are already ordered by this key, which skips a shuffle;
# remove it if that ordering is not guaranteed.
dd_feat = dd_feat.set_index("ticker_date_key", sorted=True)
dd_feat = dd_feat.repartition(partition_size="100MB")

print("Dask DataFrame loaded successfully.")

Dask DataFrame loaded successfully.


In [35]:
# Display columns (had to verify column names to add lags)
print("Columns in dd_feat:", dd_feat.columns)


Columns in dd_feat: Index(['ticker', 'Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume',
       'sector', 'subsector', 'year'],
      dtype='object')


In [36]:
# Add one-day lags for 'Close' and 'Adj Close' within each ticker.
# Passing meta gives Dask the output name and dtype up front,
# which avoids the meta-inference warning on groupby shift.
dd_feat['Close Lag'] = dd_feat.groupby('ticker')['Close'].shift(1, meta=('Close', 'f8'))
dd_feat['Adj Close Lag'] = dd_feat.groupby('ticker')['Adj Close'].shift(1, meta=('Adj Close', 'f8'))

# Returns and high-low range
dd_feat['Returns'] = (dd_feat['Adj Close'] / dd_feat['Adj Close Lag']) - 1
dd_feat['Hi_Lo_Range'] = dd_feat['High'] - dd_feat['Low']

# Display first few rows to verify
print(dd_feat.head())



                ticker       Date       Open       High        Low      Close  \
ticker_date_key                                                                 
A_2000-01-03         A 2000-01-03  56.330471  56.464592  48.193848  51.502148   
A_2000-01-04         A 2000-01-04  48.730328  49.266811  46.316166  47.567955   
A_2000-01-05         A 2000-01-05  47.389126  47.567955  43.141991  44.617310   
A_2000-01-06         A 2000-01-06  44.080830  44.349072  41.577251  42.918453   
A_2000-01-07         A 2000-01-07  42.247852  47.165592  42.203148  46.494991   

                 Adj Close   Volume       sector  \
ticker_date_key                                    
A_2000-01-03     43.532223  4674353  Health Care   
A_2000-01-04     40.206841  4765083  Health Care   
A_2000-01-05     37.712814  5758642  Health Care   
A_2000-01-06     36.276836  2534434  Health Care   
A_2000-01-07     39.299919  2819626  Health Care   

                                      subsector  year  Close Lag  \

+ Convert the Dask data frame to a pandas data frame. 
+ Add a rolling average return calculation with a window of 10 days.
+ *Tip*: Consider using `.rolling(10).mean()`.

(3 pt)
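One detail is easy to miss with a grouped rolling mean: the result must line up with the original rows. The sketch below is a minimal illustration on invented data, using a window of 3 instead of 10 so the toy frame can stay short. The column names `returns` and `rolling_avg_return` are placeholders.

```python
import pandas as pd

# Invented returns for two hypothetical tickers, already in date order.
toy = pd.DataFrame({
    "ticker": ["A"] * 4 + ["B"] * 4,
    "returns": [0.01, 0.03, 0.05, 0.07, 0.10, 0.20, 0.30, 0.40],
})

window = 3  # the assignment asks for 10; 3 keeps the example small

# transform returns a Series on the same index as `toy`,
# so the assignment aligns row by row and windows never span two tickers.
toy["rolling_avg_return"] = (
    toy.groupby("ticker")["returns"]
       .transform(lambda s: s.rolling(window).mean())
)

# For comparison: groupby(...).rolling(...) returns a MultiIndex
# (ticker, original index), which does not match `toy`'s index as-is.
grouped_rolling = toy.groupby("ticker")["returns"].rolling(window).mean()
index_levels = grouped_rolling.index.nlevels

print(toy)
print(index_levels)
```

The first `window - 1` rows of each ticker are missing by design. A column that is missing on every row usually points to an index-alignment problem or to missing inputs.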

In [42]:
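# Convert Dask to pandas
df = dd_feat.compute()
print('Converted to Pandas')

# 10-day rolling average of Returns within each ticker.
# transform keeps the result on df's own index, so the new column aligns row by row;
# groupby(...).rolling(...) would return a (ticker, key) MultiIndex instead.
df['Rolling_Avg_Return'] = df.groupby('ticker')['Returns'].transform(lambda s: s.rolling(10).mean())

# Display first few rows to verify
print(df[['Returns', 'Rolling_Avg_Return']].head(15))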



Converted to Pandas
                 Returns  Rolling_Avg_Return
ticker_date_key                             
A_2000-01-03         NaN                 NaN
A_2000-01-04         NaN                 NaN
A_2000-01-05         NaN                 NaN
A_2000-01-06         NaN                 NaN
A_2000-01-07         NaN                 NaN
A_2000-01-10         NaN                 NaN
A_2000-01-11         NaN                 NaN
A_2000-01-12         NaN                 NaN
A_2000-01-13         NaN                 NaN
A_2000-01-14         NaN                 NaN
A_2000-01-18         NaN                 NaN
A_2000-01-19         NaN                 NaN
A_2000-01-20         NaN                 NaN
A_2000-01-21         NaN                 NaN
A_2000-01-24         NaN                 NaN


Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?

It wasn't necessary. However, my understanding is that pandas has efficient, built-in rolling functions that work well for simple calculations on smaller data sets, so pandas is preferable in this case.

+ Would it have been better to do it in Dask? Why?

For a small data set and a simple function, doing the calculation in Dask would add complexity without much benefit, so the extra effort isn't worth it. Dask would be the better choice if the data did not fit in memory.

(1 pt)

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.