# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `01_materials/labs/2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [1]:
# Write your code below.
%load_ext dotenv
%dotenv

In [2]:
import dask.dataframe as dd

+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1 pt)

In [3]:
import os
from glob import glob

# Write your code below.
PRICE_DATA = os.getenv('PRICE_DATA')

# Create a list of paths to all parquet files within the PRICE_DATA directory
parquet_files = glob(os.path.join(PRICE_DATA, '**/*.parquet'), recursive=True)
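A minimal sketch of how the recursive pattern behaves, using a temporary directory in place of `PRICE_DATA`. The directory layout and file names here are hypothetical and exist only for illustration. Note that `os.getenv` returns `None` when the variable is not set, in which case `os.path.join` raises a `TypeError`.

```python
import os
import tempfile
from glob import glob

with tempfile.TemporaryDirectory() as root:
    # Hypothetical layout: <root>/<ticker>/part.0.parquet plus one unrelated file.
    for ticker in ("A", "ZEUS"):
        os.makedirs(os.path.join(root, ticker))
        open(os.path.join(root, ticker, "part.0.parquet"), "w").close()
    open(os.path.join(root, "notes.txt"), "w").close()

    # "**" matches any number of nested directories when recursive=True.
    found = sorted(glob(os.path.join(root, "**", "*.parquet"), recursive=True))

    # Paths relative to the root, so they are easy to inspect.
    relative = [os.path.relpath(path, root) for path in found]

print(relative)
```

Only the two `.parquet` files are returned; `notes.txt` is skipped because it does not match the pattern.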

For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Close:
    
    - `returns`: (Close / Close_lag_1) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)
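As a quick check of the two formulas above, the sketch below applies them by hand to ticker A's 1999-11-19 row from the output table shown later in this notebook. The variable names are illustrative only.

```python
# Values for ticker A taken from the 1999-11-18 and 1999-11-19 rows of the table.
close_lag_1 = 31.473534   # Close on 1999-11-18
close = 28.880543         # Close on 1999-11-19
high = 30.758226          # High on 1999-11-19
low = 28.478184           # Low on 1999-11-19

# returns: (Close / Close_lag_1) - 1
returns = close / close_lag_1 - 1

# hi_lo_range: the day's High minus Low
hi_lo_range = high - low

print(round(returns, 4), round(hi_lo_range, 4))
```

Both values agree with the `returns` and `hi_lo_range` columns of that row, up to the rounding of the displayed inputs.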

In [4]:
# Write your code below.

# Read the parquet files as a dask dataframe
dd_px = dd.read_parquet(parquet_files).set_index("ticker")

In [5]:
# Ideally, each ticker's rows should be sorted by date before calculating lagged columns.
# This cell does not sort, so the lags follow whatever row order Dask produces after reading the files.
# Add lags for Close and Adj Close by applying a function to each ticker group.
dd_feat = dd_px.groupby('ticker',group_keys=False).apply(
    lambda x: x.assign(Close_lag_1 = x['Close'].shift(1),
                        Adj_Close_lag_1 = x['Adj Close'].shift(1))
)

# Calculate returns and hi_lo_range using lambda functions within assign.
dd_feat = dd_feat.assign(
    returns = lambda x: x['Close']/x["Close_lag_1"] - 1,
    hi_lo_range = lambda x: x['High'] - x['Low']
)

Please provide `meta` if the result is unexpected.
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result

  dd_feat = dd_px.groupby('ticker',group_keys=False).apply(
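One way to sort inside each group, sketched here in plain pandas on toy data: put the sort and the shift in a single function that receives one ticker's rows. `add_lags` and the toy frame are hypothetical names, not part of the assignment. The assumption is that the same function could be passed to the Dask `groupby('ticker').apply(...)` call above, together with the `meta` argument the warning asks for; that has not been verified against this data set.

```python
import pandas as pd


def add_lags(group: pd.DataFrame) -> pd.DataFrame:
    """Sort one ticker's rows by date, then add one-day lags."""
    group = group.sort_values("Date")
    return group.assign(
        Close_lag_1=group["Close"].shift(1),
        Adj_Close_lag_1=group["Adj Close"].shift(1),
    )


# Toy data, deliberately out of date order within each ticker.
toy = pd.DataFrame({
    "ticker": ["X", "X", "X", "Y", "Y"],
    "Date": pd.to_datetime(
        ["2020-01-03", "2020-01-01", "2020-01-02", "2020-01-02", "2020-01-01"]
    ),
    "Close": [3.0, 1.0, 2.0, 20.0, 10.0],
    "Adj Close": [3.0, 1.0, 2.0, 20.0, 10.0],
})

# Apply the function group by group and stitch the pieces back together.
lagged = pd.concat([add_lags(group) for _, group in toy.groupby("ticker")])
print(lagged)
```

Because the sort happens inside the function, each lag refers to the previous trading day regardless of the order in which the rows arrive.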


+ Convert the Dask data frame to a pandas data frame. 
+ Add a new feature containing the moving average of `returns` using a window of 10 days. There are several ways to solve this task, a simple one uses `.rolling(10).mean()`.

(3 pt)

In [6]:
# Write your code below.

# Convert the Dask data frame to a pandas dataframe
df = dd_feat.compute().reset_index()
df

Unnamed: 0,ticker,Date,Open,High,Low,Close,Adj Close,Volume,source,Year,Close_lag_1,Adj_Close_lag_1,returns,hi_lo_range
0,A,1999-11-18,32.546494,35.765381,28.612303,31.473534,27.068665,62546300.0,A.csv,1999,,,,7.153078
1,A,1999-11-19,30.713520,30.758226,28.478184,28.880543,24.838577,15234100.0,A.csv,1999,31.473534,27.068665,-0.082386,2.280043
2,A,1999-11-22,29.551144,31.473534,28.657009,31.473534,27.068665,6577800.0,A.csv,1999,28.880543,24.838577,0.089783,2.816525
3,A,1999-11-23,30.400572,31.205294,28.612303,28.612303,24.607880,5975600.0,A.csv,1999,31.473534,27.068665,-0.090909,2.592991
4,A,1999-11-24,28.701717,29.998211,28.612303,29.372318,25.261524,4843200.0,A.csv,1999,28.612303,24.607880,0.026563,1.385908
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
324306,ZEUS,2004-12-27,28.000000,29.230000,27.690001,28.670000,26.258089,651600.0,ZEUS.csv,2004,27.670000,25.342215,0.036140,1.539999
324307,ZEUS,2004-12-28,28.900000,30.200001,28.700001,29.900000,27.384602,529400.0,ZEUS.csv,2004,28.670000,26.258089,0.042902,1.500000
324308,ZEUS,2004-12-29,30.280001,30.299999,29.030001,29.070000,26.624434,355300.0,ZEUS.csv,2004,29.900000,27.384602,-0.027759,1.269999
324309,ZEUS,2004-12-30,28.450001,28.450001,25.330000,26.170000,23.968391,1361800.0,ZEUS.csv,2004,29.070000,26.624434,-0.099759,3.120001


In [7]:
# I noticed that when we convert our dask dataframe to pandas, the order of rows may not be preserved.
# Therefore, before calculating rolling average, we should make sure that the dataframe is sorted by dates within each ticker.
df_sorted = df.sort_values(by = ['ticker', 'Date'])
df_sorted

Unnamed: 0,ticker,Date,Open,High,Low,Close,Adj Close,Volume,source,Year,Close_lag_1,Adj_Close_lag_1,returns,hi_lo_range
0,A,1999-11-18,32.546494,35.765381,28.612303,31.473534,27.068665,62546300.0,A.csv,1999,,,,7.153078
1,A,1999-11-19,30.713520,30.758226,28.478184,28.880543,24.838577,15234100.0,A.csv,1999,31.473534,27.068665,-0.082386,2.280043
2,A,1999-11-22,29.551144,31.473534,28.657009,31.473534,27.068665,6577800.0,A.csv,1999,28.880543,24.838577,0.089783,2.816525
3,A,1999-11-23,30.400572,31.205294,28.612303,28.612303,24.607880,5975600.0,A.csv,1999,31.473534,27.068665,-0.090909,2.592991
4,A,1999-11-24,28.701717,29.998211,28.612303,29.372318,25.261524,4843200.0,A.csv,1999,28.612303,24.607880,0.026563,1.385908
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
321582,ZEUS,2020-03-26,9.610000,9.940000,9.260000,9.590000,9.590000,60500.0,ZEUS.csv,2020,9.670000,9.670000,-0.008273,0.679999
321583,ZEUS,2020-03-27,9.330000,9.330000,8.700000,8.700000,8.700000,52900.0,ZEUS.csv,2020,9.590000,9.590000,-0.092805,0.630000
321584,ZEUS,2020-03-30,8.810000,9.760000,8.700000,9.680000,9.680000,73700.0,ZEUS.csv,2020,8.700000,8.700000,0.112644,1.060000
321585,ZEUS,2020-03-31,9.640000,10.470000,9.590000,10.350000,10.350000,68900.0,ZEUS.csv,2020,9.680000,9.680000,0.069215,0.880000


In [8]:
# Check if the calculations in dask have happened correctly by looking at first two rows of each ticker
# for dataframes obtained from dask and dataframe sorted by Date within each ticker.
for ticker in df['ticker'].unique():
    print("Dataframe obtained from dask")
    print(df[df['ticker']==ticker][['ticker','Date','Close', 'Close_lag_1']].head(2))
    print("Dataframe after sorting by dates")
    print(df_sorted[df_sorted['ticker']==ticker][['ticker','Date','Close', 'Close_lag_1']].head(2))
    print("\n")

Dataframe obtained from dask
  ticker       Date      Close  Close_lag_1
0      A 1999-11-18  31.473534          NaN
1      A 1999-11-19  28.880543    31.473534
Dataframe after sorting by dates
  ticker       Date      Close  Close_lag_1
0      A 1999-11-18  31.473534          NaN
1      A 1999-11-19  28.880543    31.473534


Dataframe obtained from dask
     ticker       Date    Close  Close_lag_1
5124    ACB 2009-07-06  0.36008          NaN
5125    ACB 2009-07-07  0.33089      0.36008
Dataframe after sorting by dates
     ticker       Date      Close  Close_lag_1
7827    ACB 2000-01-03  40.900398     0.809000
7828    ACB 2000-01-04  40.889801    40.900398


Dataframe obtained from dask
      ticker       Date      Close  Close_lag_1
10179   AFYA 2019-07-19  23.844999          NaN
10180   AFYA 2019-07-22  23.559999    23.844999
Dataframe after sorting by dates
      ticker       Date      Close  Close_lag_1
10179   AFYA 2019-07-19  23.844999          NaN
10180   AFYA 2019-07-22  23.55

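We can see that for many of the tickers, the Close_lag_1 calculation is incorrect because the rows of the Dask dataframe are not in date order within each ticker. For ACB, for example, the first row in the Dask output is 2009-07-06, while the earliest date is 2000-01-03. As a result, `shift(1)` takes the value from the preceding row rather than from the preceding trading day.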
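A more compact check than printing the first rows of every ticker is to test whether each ticker's dates are already ascending. This is a sketch on toy data; the frame and variable names are hypothetical, and the same loop could be run on `df` from the cells above.

```python
import pandas as pd

# Toy data: ticker X is in date order, ticker Y is not.
toy = pd.DataFrame({
    "ticker": ["X", "X", "Y", "Y"],
    "Date": pd.to_datetime(
        ["2020-01-01", "2020-01-02", "2020-01-02", "2020-01-01"]
    ),
})

# For each ticker, record whether its rows appear in ascending date order.
in_order = {
    ticker: group["Date"].is_monotonic_increasing
    for ticker, group in toy.groupby("ticker")
}

# Tickers whose lags would be wrong if shift(1) were applied without sorting.
out_of_order = [ticker for ticker, ok in in_order.items() if not ok]
print(out_of_order)
```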

In [9]:
# Calculate the rolling average on the dataframe sorted by date within each ticker
df_result = df_sorted.groupby("ticker", group_keys=False).apply(
    lambda x: x.assign(moving_10_day_return_average = x['returns'].rolling(10).mean()))

# For ticker A, the first 10 rows of the new column are NaN: rolling(10) needs 10 non-missing returns,
# and the first return is itself NaN. The column is populated from the 11th row onward.
df_result.head(20)



Unnamed: 0,ticker,Date,Open,High,Low,Close,Adj Close,Volume,source,Year,Close_lag_1,Adj_Close_lag_1,returns,hi_lo_range,moving_10_day_return_average
0,A,1999-11-18,32.546494,35.765381,28.612303,31.473534,27.068665,62546300.0,A.csv,1999,,,,7.153078,
1,A,1999-11-19,30.71352,30.758226,28.478184,28.880543,24.838577,15234100.0,A.csv,1999,31.473534,27.068665,-0.082386,2.280043,
2,A,1999-11-22,29.551144,31.473534,28.657009,31.473534,27.068665,6577800.0,A.csv,1999,28.880543,24.838577,0.089783,2.816525,
3,A,1999-11-23,30.400572,31.205294,28.612303,28.612303,24.60788,5975600.0,A.csv,1999,31.473534,27.068665,-0.090909,2.592991,
4,A,1999-11-24,28.701717,29.998211,28.612303,29.372318,25.261524,4843200.0,A.csv,1999,28.612303,24.60788,0.026563,1.385908,
5,A,1999-11-26,29.238197,29.685265,29.148785,29.461731,25.338428,1729400.0,A.csv,1999,29.372318,25.261524,0.003044,0.53648,
6,A,1999-11-29,29.32761,30.355865,29.014664,30.132332,25.915169,4074700.0,A.csv,1999,29.461731,25.338428,0.022762,1.341202,
7,A,1999-11-30,30.042919,30.71352,29.282904,30.177038,25.953619,4310000.0,A.csv,1999,30.132332,25.915169,0.001484,1.430616,
8,A,1999-12-01,30.177038,31.071173,29.953505,30.71352,26.415012,2957300.0,A.csv,1999,30.177038,25.953619,0.017778,1.117668,
9,A,1999-12-02,31.294706,32.188843,30.892345,31.562946,27.145563,3069800.0,A.csv,1999,30.71352,26.415012,0.027656,1.296497,
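An alternative to `apply` with `assign` is to compute the moving average directly on the `returns` column with `groupby(...).transform(...)`. The sketch below uses toy data and a window of 3 instead of 10 so the result is easy to verify by hand; the frame and column names are hypothetical.

```python
import pandas as pd

# Toy returns for two tickers, already in date order within each ticker.
toy = pd.DataFrame({
    "ticker": ["X"] * 4 + ["Y"] * 4,
    "returns": [0.01, 0.02, 0.03, 0.04, 0.10, 0.20, 0.30, 0.40],
})

# The rolling window restarts for each ticker, so values never mix across tickers.
toy["moving_avg"] = (
    toy.groupby("ticker")["returns"]
    .transform(lambda s: s.rolling(3).mean())
)
print(toy)
```

The first two rows of each ticker are NaN because the window is not yet full, and the third row is the mean of that ticker's first three returns.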


Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
+ Would it have been better to do it in Dask? Why?

(1 pt)

- In theory, it is not necessary to convert to pandas to calculate the moving average return. In practice, however, the row-based operations I performed in Dask, even the lagged columns based on the previous day's prices (Close_lag_1 and Adj_Close_lag_1), produced incorrect values because Dask does not seem to preserve the order of rows. Row-based operations performed in Dask are therefore likely to be incorrect, and pandas is the safer choice whenever the order of rows must be preserved.

My conclusion is that operations that depend on the ordering of rows (e.g. anything that assumes rows are sorted by date) should not be performed in Dask. If I had to redo this notebook, I would use pandas to add the columns Close_lag_1 and Adj_Close_lag_1 and to calculate the rolling averages, because both need strictly ordered rows and Dask seems to make errors in these calculations. Dask is suitable for calculations such as hi_lo_range, and returns once the lag column is correct, because these are strictly column-wise operations that do not depend on the ordering of rows.

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.