# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that the path to the price data is stored in the environment variable `PRICE_DATA`. If you have not created this data set yet, execute the notebook `01_materials/labs/2_data_engineering.ipynb` first.


+ Load the environment variables using dotenv. (1 pt)

In [1]:
# Write your code below.
%load_ext dotenv
%dotenv ../../01_materials/labs/.env

In [2]:
import dask.dataframe as dd



+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1 pt)

In [3]:
import os
from glob import glob

# Write your code below.

# Load the environment variable PRICE_DATA
price_data_dir = os.getenv('PRICE_DATA')

# Use glob to find all parquet files in the directory PRICE_DATA
parquet_files = glob(os.path.join(price_data_dir, '*/*.parquet', 'part.*.parquet'))
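
To see what this glob pattern matches, the following sketch builds a throwaway directory tree and runs an equivalent pattern against it. The layout (`<ticker>/<name>.parquet/part.N.parquet`), the ticker names, and the helper `find_parquet_parts` are assumptions chosen for illustration, not a description of the actual `PRICE_DATA` directory.

```python
import os
import tempfile
from glob import glob


def find_parquet_parts(root):
    """Return sorted paths matching <root>/*/*.parquet/part.*.parquet."""
    pattern = os.path.join(root, "*", "*.parquet", "part.*.parquet")
    return sorted(glob(pattern))


# Build a hypothetical directory tree to try the pattern on.
root = tempfile.mkdtemp()
for ticker in ("AAA", "BBB"):
    part_dir = os.path.join(root, ticker, f"{ticker}_2008.parquet")
    os.makedirs(part_dir)
    # Empty placeholder files: only the names matter to glob.
    open(os.path.join(part_dir, "part.0.parquet"), "w").close()

# A file that should not match the pattern.
open(os.path.join(root, "AAA", "notes.txt"), "w").close()

matches = find_parquet_parts(root)
print(len(matches), "matching files")
```

If the list comes back empty on real data, a likely cause is that `PRICE_DATA` is unset or the directory nesting differs from what the pattern expects.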

In [4]:
# Optional: inspect a single file to see the columns and dtypes (useful for defining Dask metadata later)

import pandas as pd
# Load the first parquet file in the list to inspect columns
single_file = parquet_files[0]
df_obs = pd.read_parquet(single_file)

# Display column names and the first few rows of the file
print("Columns in the file:", df_obs.columns)
print("First few rows of the data:\n", df_obs.head())

Columns in the file: Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'sector',
       'subsector', 'year'],
      dtype='object')
First few rows of the data:
              Date    Open    High     Low   Close  Adj Close   Volume  \
ticker                                                                  
CTAS   2008-01-02  8.3725  8.4050  8.0650  8.0875   4.451986  5793600   
CTAS   2008-01-03  8.1525  8.2875  8.1275  8.2425   4.537309  5339200   
CTAS   2008-01-04  8.1625  8.1850  7.9025  7.9025   4.350146  5692400   
CTAS   2008-01-07  7.9050  8.2175  7.9050  8.1175   4.468500  5946000   
CTAS   2008-01-08  8.1350  8.1475  7.7750  7.7750   4.279962  6229600   

             sector                     subsector  year  
ticker                                                   
CTAS    Industrials  Diversified Support Services  2008  
CTAS    Industrials  Diversified Support Services  2008  
CTAS    Industrials  Diversified Support Services  2008  
CTAS    Industria

For each ticker and using Dask, do the following:

+ Add lags for the variables `Close` and `Adj Close`.
+ Add returns based on Adjusted Close:
    
    - `returns`: (Adj Close / Adj Close_lag) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)

In [56]:

# Read all parquet files into a single Dask DataFrame; 'ticker' is the index at this point
ddf = dd.read_parquet(parquet_files)

# Reset the index to make 'ticker' a column
ddf = ddf.reset_index()

# Verify the structure
print("Columns after reset_index:", ddf.columns)

# Define metadata for the DataFrame explicitly
initial_meta = {
    'ticker': 'object',
    'Date': 'datetime64[ns]',
    'Open': 'float64',
    'High': 'float64',
    'Low': 'float64',
    'Close': 'float64',
    'Adj Close': 'float64',
    'Volume': 'int64',
    'sector': 'object',
    'subsector': 'object',
    'year': 'int64'
}

# Sort data by 'ticker' and 'Date' within each partition
ddf = ddf.map_partitions(lambda df: df.sort_values(['ticker', 'Date']), meta=initial_meta)

# Create lagged variables for Close and Adj Close with explicit meta for .shift()
ddf['Close_lag'] = ddf.groupby('ticker')['Close'].shift(1, meta=('Close_lag', 'float64'))
ddf['Adj_Close_lag'] = ddf.groupby('ticker')['Adj Close'].shift(1, meta=('Adj_Close_lag', 'float64'))

# Calculate returns based on Adj Close
ddf['returns'] = (ddf['Adj Close'] / ddf['Adj_Close_lag']) - 1

# Calculate daily range: High - Low
ddf['hi_lo_range'] = ddf['High'] - ddf['Low']

# Update meta to include new calculated columns
updated_meta = {
    'ticker': 'object',
    'Date': 'datetime64[ns]',
    'Open': 'float64',
    'High': 'float64',
    'Low': 'float64',
    'Close': 'float64',
    'Adj Close': 'float64',
    'Volume': 'int64',
    'sector': 'object',
    'subsector': 'object',
    'year': 'int64',
    'Close_lag': 'float64',
    'Adj_Close_lag': 'float64',
    'returns': 'float64',
    'hi_lo_range': 'float64'
}

# ddf already tracks the new columns in its metadata (see the printout below),
# so it can be assigned to dd_feat directly
dd_feat = ddf

actual_meta = ddf._meta
print("ddf._meta: ", actual_meta)

Columns after reset_index: Index(['ticker', 'Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume',
       'sector', 'subsector', 'year'],
      dtype='object')
ddf._meta:  Empty DataFrame
Columns: [ticker, Date, Open, High, Low, Close, Adj Close, Volume, sector, subsector, year, Close_lag, Adj_Close_lag, returns, hi_lo_range]
Index: []
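
As a cross-check on the feature definitions, here is a minimal pandas sketch that computes the same columns one ticker at a time, so that a lag can never pick up a value from a different ticker. The helper `add_features` and the toy prices are made up for illustration. This is one possible formulation, not the required solution.

```python
import pandas as pd


def add_features(group):
    """Add lag, return, and range columns to the rows of one ticker."""
    group = group.sort_values("Date")
    adj_lag = group["Adj Close"].shift(1)
    return group.assign(
        Close_lag=group["Close"].shift(1),
        Adj_Close_lag=adj_lag,
        returns=group["Adj Close"] / adj_lag - 1,
        hi_lo_range=group["High"] - group["Low"],
    )


# Made-up prices for two hypothetical tickers.
toy = pd.DataFrame({
    "ticker": ["AAA", "AAA", "AAA", "BBB", "BBB"],
    "Date": pd.to_datetime(
        ["2020-01-01", "2020-01-02", "2020-01-03", "2020-01-01", "2020-01-02"]
    ),
    "High": [11.0, 12.0, 13.0, 21.0, 22.0],
    "Low": [9.0, 10.0, 11.0, 19.0, 18.0],
    "Close": [10.0, 11.0, 12.0, 20.0, 21.0],
    "Adj Close": [10.0, 11.0, 12.1, 20.0, 22.0],
})

# Apply the helper to each ticker separately and stitch the pieces together.
toy_feat = pd.concat([add_features(g) for _, g in toy.groupby("ticker")])
print(toy_feat[["ticker", "Date", "returns", "hi_lo_range"]])
```

The first row of each ticker has a missing `returns` value because there is no previous day to compare against. In Dask, a function like this could be passed to `ddf.groupby('ticker', group_keys=False).apply(add_features, meta=updated_meta)`. Treat that call as a sketch to verify against the installed Dask version.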


+ Convert the Dask data frame to a pandas data frame. 
+ Add a rolling average return calculation with a window of 10 days.
+ *Tip*: Consider using `.rolling(10).mean()`.

(3 pt)

In [None]:
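# Write your code below.

# Compute to convert the Dask DataFrame to a pandas DataFrame.
# Dask's reset_index restarts at 0 in every partition, so reset the index
# again in pandas to get unique row labels.
df = dd_feat.compute().reset_index(drop=True)

# Make sure each ticker's rows are in date order before rolling
df = df.sort_values(['ticker', 'Date'])

# Add a 10-day rolling average of returns, computed separately for each ticker
df['rolling_avg_return'] = df.groupby('ticker')['returns'].transform(lambda s: s.rolling(10).mean())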

Please comment:

### Was it necessary to convert to pandas to calculate the moving average return?
+ Not strictly, but it was the practical choice. Dask’s support for `rolling` combined with `groupby` is limited, so a per-ticker moving average in Dask needs a custom per-group function and care with partitioning and indexing. Because the data fits into memory, converting to pandas makes the calculation straightforward.

### Would it have been better to do it in Dask? Why?
+ For larger-than-memory data, using Dask would be essential. However, for datasets that comfortably fit into memory, pandas is preferable because it avoids Dask’s overhead and partition management requirements.
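
To make the trade-off concrete, the sketch below shows the kind of per-ticker helper a Dask version would need. The function `add_rolling_mean` and the toy data are hypothetical, and the example runs in plain pandas with a window of 2 so the result is easy to check by hand.

```python
import pandas as pd


def add_rolling_mean(group, window=10):
    """Add a rolling mean of `returns` to the rows of one ticker."""
    group = group.sort_values("Date")
    return group.assign(
        rolling_avg_return=group["returns"].rolling(window).mean()
    )


# Made-up returns for a single hypothetical ticker.
toy = pd.DataFrame({
    "ticker": ["AAA"] * 4,
    "Date": pd.date_range("2020-01-01", periods=4),
    "returns": [0.01, 0.03, 0.05, 0.07],
})

out = add_rolling_mean(toy, window=2)
print(out[["Date", "returns", "rolling_avg_return"]])
```

The first `window - 1` rows have no rolling value because the window is not yet full. With Dask, a helper like this would typically be applied per group through `groupby('ticker').apply(...)` with an explicit `meta`, which triggers a shuffle. That is the extra overhead the answer above refers to.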

(1 pt)

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.