# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `01_materials/labs/2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [1]:
# Write your code below.

# Option 1: Jupyter Notebook magic commands

#     %load_ext dotenv
#     %dotenv


# Option 2: Plain Python code

# Load environment variable using dotenv
from dotenv import load_dotenv
import os

# Load .env file
load_dotenv()

True

In [2]:
# Turn the query-planning option on to prevent Dask from printing a message about it
import dask
dask.config.set({'dataframe.query-planning': True})
    
import dask.dataframe as dd

+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1 pt)

In [3]:
import os
from glob import glob

# Write your code below.

# Retrieve the PRICE_DATA environment variable
PRICE_DATA = os.getenv('PRICE_DATA')
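# Guard against an unset variable first: os.path.isdir(None) raises a TypeError
assert PRICE_DATA and os.path.isdir(PRICE_DATA), f"PRICE_DATA={PRICE_DATA!r} is not a valid directory"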

# Get all *.parquet files and directories recursively
parquet_paths = glob(os.path.join(PRICE_DATA, "**", "*.parquet"), recursive=True)

# Filter to keep only files (exclude directories)
parquet_files = sorted([path for path in parquet_paths if os.path.isfile(path)])
assert len(parquet_files) == 11207, f"Expected 11207 files, but found {len(parquet_files)}"
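
The recursive pattern used above can be tried on a small throwaway directory. The sketch below is only an illustration: the helper name `find_parquet_files` and the directory layout are hypothetical and are not part of the assignment data. It shows that `**` also matches zero directory levels, and why the `os.path.isfile` filter matters when a directory name itself ends in `.parquet`.

```python
import os
import tempfile
from glob import glob


def find_parquet_files(root):
    """Return the sorted paths of all *.parquet files under root (hypothetical helper)."""
    pattern = os.path.join(root, "**", "*.parquet")
    # "**" spans zero or more directory levels only when recursive=True
    return sorted(p for p in glob(pattern, recursive=True) if os.path.isfile(p))


# Hypothetical layout, built in a temporary directory purely for illustration
with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "AAA", "year=2017"))
    os.makedirs(os.path.join(root, "BBB.parquet"))  # a directory that also matches *.parquet
    for rel in [
        "AAA/year=2017/part.0.parquet",
        "BBB.parquet/part.0.parquet",
        "top.parquet",
        "notes.txt",
    ]:
        open(os.path.join(root, *rel.split("/")), "w").close()

    found = [os.path.relpath(p, root).replace(os.sep, "/") for p in find_parquet_files(root)]

print(found)
```

The directory `BBB.parquet` is matched by the pattern but dropped by the file filter, while the file inside it is kept.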

For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Adjusted Close:
    
    - `returns`: (Adj Close / Adj Close_lag) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)

In [4]:
# Write your code below.

import pandas as pd
import numpy as np

# Read all parquet files into a single Dask DataFrame
ddf = dd.read_parquet(parquet_files).set_index('ticker')

# Give Dask a template (meta) of the structure that apply() returns,
# so it knows the column names and dtypes without computing the
# whole operation up front.
# Not strictly necessary, but nice to have.
column_types = {
    'Date': 'datetime64[ns, UTC]',
    'Adj Close': float,
    'Close': float,
    'High': float,
    'Low': float,
    'Open': float,
    'Volume': np.int64,
    'sector': 'string[pyarrow]',
    'subsector': 'string[pyarrow]',
    'year': 'int32',
    'Close_lag': float,
    'Adj_Close_lag': float,
    # 'hi_lo_range' and 'returns' are added after apply(), so they are not part of this template
}
meta_df = pd.DataFrame({col: pd.Series(dtype=dt) for col, dt in column_types.items()})

# Option 1: Add features using chain of apply(), lambda, and assign()
dd_feat = (
    ddf.groupby('ticker', group_keys=False)
        .apply(
            lambda x: x.assign(
                # Add one-day lags of 'Close' and 'Adj Close'
                Close_lag = x['Close'].shift(1),
                Adj_Close_lag = x['Adj Close'].shift(1),
            )
            , meta = meta_df
        )
        .assign(
            # Calculate the daily high-low range
            hi_lo_range = lambda x: x['High'] - x['Low']
        )
        .assign(
            # Calculate returns based on Adjusted Close
            returns = lambda x: x['Adj Close'] / x['Adj_Close_lag'] - 1
        )
)

# Option 2: Add features with apply() and externally defined function.
# (See my Student Notes at the bottom of this notebook for details.)
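
To make the feature definitions concrete, here is a small pandas-only sketch on made-up numbers. The tickers `AAA` and `BBB` and their prices are hypothetical, and `ticker` is kept as an ordinary column here rather than as the index. It assumes the rows are already in date order within each ticker.

```python
import pandas as pd

# Hypothetical toy prices for two tickers, already in date order within each ticker
toy = pd.DataFrame(
    {
        "ticker": ["AAA", "AAA", "AAA", "BBB", "BBB", "BBB"],
        "Adj Close": [100.0, 110.0, 99.0, 50.0, 55.0, 66.0],
        "High": [101.0, 112.0, 105.0, 51.0, 56.0, 67.0],
        "Low": [99.0, 108.0, 98.0, 49.0, 54.0, 60.0],
    }
)

toy_feat = toy.assign(
    # shift(1) runs inside each group, so a ticker never sees another ticker's price
    Adj_Close_lag=lambda x: x.groupby("ticker")["Adj Close"].shift(1),
    # The day's High minus Low needs no grouping
    hi_lo_range=lambda x: x["High"] - x["Low"],
).assign(
    # (Adj Close / Adj Close lag) - 1
    returns=lambda x: x["Adj Close"] / x["Adj_Close_lag"] - 1
)

print(toy_feat[["ticker", "Adj_Close_lag", "hi_lo_range", "returns"]].round(4))
```

The first row of each ticker has no previous price, so its lag and return are NaN; this is the same pattern visible in the first `HUM` row of the output further down.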

+ Convert the Dask data frame to a pandas data frame. 
+ Add a rolling average return calculation with a window of 10 days.
+ *Tip*: Consider using `.rolling(10).mean()`.

(3 pt)

In [5]:
# Write your code below.

# Convert the Dask DataFrame to a Pandas DataFrame (takes 2m36s on my machine)
pd_feat = dd_feat.compute()

# Option 1 [OK] (takes 1s)
# 
# Calculate the 10-day rolling average return using the Pandas dataframe with 'groupby' and 'transform'
#
pd_feat['avg_return_10d'] = pd_feat.groupby('ticker')['returns'].transform(lambda s: s.rolling(10).mean())

# Option 2 [OK] (takes 1s)
# 
# Calculate the 10-day rolling average return using the Pandas dataframe with 'groupby' and 'apply'
# Beware of duplication of index if executed multiple times.
#
#pd_feat['avg_return_10d'] = pd_feat.groupby('ticker')['returns'].apply(lambda s: s.rolling(10).mean())

# Option 3 [Doesn't work: NaNs show up in the wrong places]
#
# Calculate the 10-day rolling average return using the Pandas dataframe with 'groupby' and 'rolling' directly, without apply
# 
# The cause is index alignment: the result of groupby().rolling() is indexed differently from pd_feat,
# and reset_index(drop=True) throws the original labels away, so the values cannot be matched back to the right rows.
# This StackOverflow answer helped me to understand the problem: https://stackoverflow.com/a/13998600
# Many thanks to Jesús Calderón and Vakiloroayaei for their help and insights.
#
# pd_feat['avg_return_10d'] = pd_feat.groupby('ticker', group_keys=False)['returns'].rolling(window=10).mean().reset_index(drop=True)
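
The alignment problem behind Option 3 can be reproduced on a tiny frame. This is a simplified sketch under stated assumptions: the data are hypothetical, the frame uses a default integer index (unlike `pd_feat`, whose index is `ticker`), and the window is 2 instead of 10. The tickers are interleaved on purpose so that row order and group order differ.

```python
import pandas as pd

# Hypothetical toy frame: tickers interleaved, so row order != group order
toy = pd.DataFrame(
    {
        "ticker": ["AAA", "BBB", "AAA", "BBB", "AAA", "BBB"],
        "returns": [1.0, 10.0, 2.0, 20.0, 3.0, 30.0],
    }
)

# transform: the result carries the same index as `toy`, so rows line up
aligned = toy.groupby("ticker")["returns"].transform(lambda s: s.rolling(2).mean())

# groupby + rolling: the result is indexed by (ticker, original label), ordered by group
grouped = toy.groupby("ticker")["returns"].rolling(2).mean()

# reset_index(drop=True) discards those labels, so values get attached by position
misaligned = grouped.reset_index(drop=True)

# Dropping only the group level keeps the original labels, so alignment works again
realigned = grouped.reset_index(level=0, drop=True).sort_index()

print(pd.DataFrame({"aligned": aligned, "misaligned": misaligned, "realigned": realigned}))
```

In this toy case, `misaligned` places the second `AAA` average on a `BBB` row, while `realigned` matches the `transform` result.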

In [6]:
pd_feat

ticker,Date,Adj Close,Close,High,Low,Open,Volume,sector,subsector,year,Close_lag,Adj_Close_lag,hi_lo_range,returns,avg_return_10d
HUM,2017-01-03 00:00:00+00:00,186.429810,197.610001,204.000000,195.000000,202.869995,2784600,Health Care,Managed Health Care,2017,,,9.000000,,
HUM,2017-01-04 00:00:00+00:00,186.590164,197.779999,199.440002,195.910004,198.410004,1512400,Health Care,Managed Health Care,2017,197.610001,186.429810,3.529999,0.000860,
HUM,2017-01-05 00:00:00+00:00,189.373276,200.729996,201.589996,197.190002,198.449997,1826200,Health Care,Managed Health Care,2017,197.779999,186.590164,4.399994,0.014916,
HUM,2017-01-06 00:00:00+00:00,190.316696,201.729996,202.889999,199.619995,200.500000,1139700,Health Care,Managed Health Care,2017,200.729996,189.373276,3.270004,0.004982,
HUM,2017-01-09 00:00:00+00:00,191.505386,202.990005,203.470001,199.750000,201.050003,1063800,Health Care,Managed Health Care,2017,201.729996,190.316696,3.720001,0.006246,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
JNPR,2017-12-22 00:00:00+00:00,23.743158,28.860001,28.910000,28.629999,28.700001,1904000,Information Technology,Communications Equipment,2017,28.680000,23.595066,0.280001,0.006276,0.001720
JNPR,2017-12-26 00:00:00+00:00,23.743158,28.860001,29.090000,28.780001,28.780001,2329700,Information Technology,Communications Equipment,2017,28.860001,23.743158,0.309999,0.000000,0.001685
JNPR,2017-12-27 00:00:00+00:00,23.759605,28.879999,28.990000,28.680000,28.799999,1326200,Information Technology,Communications Equipment,2017,28.860001,23.743158,0.309999,0.000693,-0.000113
JNPR,2017-12-28 00:00:00+00:00,23.751383,28.870001,29.280001,28.740000,29.080000,1676000,Information Technology,Communications Equipment,2017,28.879999,23.759605,0.540001,-0.000346,-0.000804


In [7]:
def verify_nan_pattern(df):
    """
    Verifies that avg_return_10d is NaN in the first 10 rows of each ticker
    group and non-NaN in every later row.
    
    Raises an AssertionError otherwise.
    """
    # Group by index (ticker)
    grouped = df.groupby(level=0)
    
    # Position of each row within its ticker group (0, 1, 2, ...)
    cumcount = grouped.cumcount()
    
    # True for the first 10 rows of each ticker, False for the rest
    mask_first_10 = cumcount < 10
    
    # Check if the first 10 rows have NaN values
    first_10_nan = df.loc[mask_first_10, 'avg_return_10d'].isna()

    # Check if the remaining rows have valid values
    rest_not_nan = df.loc[~mask_first_10, 'avg_return_10d'].notna()

    assert first_10_nan.all(), "First 10 rows do not contain all NaN values"
    assert rest_not_nan.all(), "Rows after the first 10 contain NaN values"
    

In [8]:
verify_nan_pattern(pd_feat)
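
Why 10 leading NaN values and not 9? A minimal sketch on a hypothetical single-ticker price series shows the arithmetic: the first return is NaN because it has no previous price, and `rolling(10)` by default only produces a value once its window holds 10 non-NaN observations.

```python
import pandas as pd

# Hypothetical single-ticker price series with 12 observations
prices = pd.Series([100.0 + i for i in range(12)])

# Row 0 has no previous price, so its return is NaN
returns = prices / prices.shift(1) - 1

# The window ending at row 9 still contains the NaN from row 0
avg_return_10d = returns.rolling(10).mean()

n_leading_nan = int(avg_return_10d.isna().sum())
first_valid_position = int(avg_return_10d.notna().to_numpy().argmax())
print(n_leading_nan, first_valid_position)  # 10 10
```

The first complete window therefore ends at the 11th row (position 10), which is what `verify_nan_pattern` checks for every ticker.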

In [9]:
pd_feat.head(15)

ticker,Date,Adj Close,Close,High,Low,Open,Volume,sector,subsector,year,Close_lag,Adj_Close_lag,hi_lo_range,returns,avg_return_10d
HUM,2017-01-03 00:00:00+00:00,186.42981,197.610001,204.0,195.0,202.869995,2784600,Health Care,Managed Health Care,2017,,,9.0,,
HUM,2017-01-04 00:00:00+00:00,186.590164,197.779999,199.440002,195.910004,198.410004,1512400,Health Care,Managed Health Care,2017,197.610001,186.42981,3.529999,0.00086,
HUM,2017-01-05 00:00:00+00:00,189.373276,200.729996,201.589996,197.190002,198.449997,1826200,Health Care,Managed Health Care,2017,197.779999,186.590164,4.399994,0.014916,
HUM,2017-01-06 00:00:00+00:00,190.316696,201.729996,202.889999,199.619995,200.5,1139700,Health Care,Managed Health Care,2017,200.729996,189.373276,3.270004,0.004982,
HUM,2017-01-09 00:00:00+00:00,191.505386,202.990005,203.470001,199.75,201.050003,1063800,Health Care,Managed Health Care,2017,201.729996,190.316696,3.720001,0.006246,
HUM,2017-01-10 00:00:00+00:00,189.852066,200.949997,203.270004,200.669998,202.0,1352700,Health Care,Managed Health Care,2017,202.990005,191.505386,2.600006,-0.008633,
HUM,2017-01-11 00:00:00+00:00,192.24231,203.479996,203.809998,199.809998,200.5,1809300,Health Care,Managed Health Care,2017,200.949997,189.852066,4.0,0.01259,
HUM,2017-01-12 00:00:00+00:00,193.91452,205.25,205.899994,203.0,203.770004,1470400,Health Care,Managed Health Care,2017,203.479996,192.24231,2.899994,0.008698,
HUM,2017-01-13 00:00:00+00:00,191.713257,202.919998,206.589996,202.429993,205.0,1297800,Health Care,Managed Health Care,2017,205.25,193.91452,4.160004,-0.011352,
HUM,2017-01-17 00:00:00+00:00,192.771393,204.039993,204.710007,201.460007,203.110001,902900,Health Care,Managed Health Care,2017,202.919998,191.713257,3.25,0.005519,


Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
+ Would it have been better to do it in Dask? Why?

(1 pt)

### Was it necessary to convert to Pandas to calculate the moving average return?

No, it wasn't strictly necessary. Dask DataFrames support rolling operations, so the moving average return could have been calculated directly in Dask, without converting to pandas first.

### Would it have been better to do it in Dask? Why?

No, in this particular case it would not, because the data fits in memory and the computation is fast [1].

In addition, rolling-window operations are less convenient in Dask than in pandas. With Dask, the partitions you choose must be large enough to avoid boundary issues, where a window reaches across the edge of a partition, yet larger partitions can begin to slow down your computations. The data should also be index-aligned so that it is sorted in the correct order: Dask uses the index to determine which rows are adjacent to one another, so the correct sort order is critical for any window calculation to return the right result. [2]
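
The boundary issue can be illustrated without Dask. The following pandas-only sketch is an assumption-laden simplification, not Dask's actual implementation: it splits a hypothetical series into two "partitions" by hand, rolls each one independently, and then repeats the calculation after borrowing the last `window - 1` rows from the previous partition.

```python
import pandas as pd

# Hypothetical data: 1.0 .. 8.0, split by hand into two "partitions"
values = pd.Series(range(1, 9), dtype="float64")
window = 3
partitions = [values.iloc[:4], values.iloc[4:]]

# Reference result on the unsplit series
expected = values.rolling(window).mean()

# Naive approach: roll each partition on its own and concatenate
naive = pd.concat([p.rolling(window).mean() for p in partitions])

# Overlap approach: prepend the last (window - 1) rows of the previous partition,
# roll, then drop the borrowed rows again
borrowed = partitions[0].iloc[-(window - 1):]
second = pd.concat([borrowed, partitions[1]]).rolling(window).mean().iloc[window - 1:]
with_overlap = pd.concat([partitions[0].rolling(window).mean(), second])

print(pd.DataFrame({"expected": expected, "naive": naive, "with_overlap": with_overlap}))
```

The naive version loses the first `window - 1` values of the second partition, while the overlap version reproduces the reference result. Borrowing only works if the previous partition holds at least `window - 1` rows, which is one way to read the advice about partition size above.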

References:
- [1] Dask. (n.d.). *Dask DataFrame*. Retrieved October 27, 2024, from [https://docs.dask.org/en/stable/dataframe.html#when-not-to-use-dask-dataframes](https://docs.dask.org/en/stable/dataframe.html#when-not-to-use-dask-dataframes)
- [2] Daniel, J. C. (2019). *Data science with Python and Dask* (p. 161). Manning Publications.

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [x] Created a branch with the correct naming convention.
- [x] Ensured that the repository is public.
- [x] Reviewed the PR description guidelines and adhered to them.
- [x] Verify that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.

## Student Notes

*Option 2: Add features with apply() and externally defined function*

This option only worked for me when the function was defined outside the Jupyter notebook; otherwise, Dask raised an error:
```
Function ... may not be deterministically hashed by cloudpickle
```
Here are the steps to use this approach:
1. Define the function in its own source file, outside the notebook. For example, in `${SRC_DIR}/feature_engineering.py`:
    ```python
    # For each ticker, add lags, returns, and high-low range
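    def add_features(df):
        # Work on a copy so the caller's frame is not modified in place
        df = df.copy()

        # Rows are assumed to be in date order within each ticker.
        # The index holds the ticker, so sort_index() would not sort by date.
        
        # Add lags for 'Close' and 'Adj Close'
        df['Close_lag'] = df['Close'].shift(1)
        df['Adj_Close_lag'] = df['Adj Close'].shift(1)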
        
        # Calculate returns based on Adjusted Close
        df['returns'] = (df['Adj Close'] / df['Adj_Close_lag']) - 1
        
        # Calculate the daily high-low range
        df['hi_lo_range'] = df['High'] - df['Low']
        
        return df
    ```

2. In the notebook, import the externally defined function and apply it to each group of the Dask dataframe:
    ```python
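    import os
    import sys
    sys.path.append(os.getenv('SRC_DIR'))

    from feature_engineering import add_features

    # meta must describe the output of add_features, including the new columns.
    # Here it is derived by running the function on the empty template meta_df
    # defined in cell In [4]; passing ddf itself would leave the new columns out.
    dd_feat = ddf.groupby('ticker', group_keys=False).apply(
        add_features, meta=add_features(meta_df.copy())
    )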
    ```