# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available at the path stored in the environment variable `PRICE_DATA`. If it is not, run the notebook `production_2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [2]:
# Write your code below.
%load_ext dotenv
%dotenv ../src/.env


In [3]:
import dask
dask.config.set({'dataframe.query-planning': True})
import dask.dataframe as dd

+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the paths of all parquet files in the directory `PRICE_DATA`.

(1 pt)

In [4]:
import os
from glob import glob

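# Write your code below.
# Directory that holds the price data; the value comes from the .env file loaded above.
PRICE_DATA = os.getenv("PRICE_DATA")
# Each ticker directory holds one *.parquet directory per year, which holds the part files.
parquet_files = glob(PRICE_DATA+"/**/*.parquet/*.parquet")
# Read all part files into one Dask data frame and index it by ticker.
dd_px = dd.read_parquet(parquet_files).set_index("ticker")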


for file_path in parquet_files:
    print(file_path)

../data/prices\MMM\MMM_2000.parquet\part.0.parquet
../data/prices\MMM\MMM_2001.parquet\part.0.parquet
../data/prices\MMM\MMM_2002.parquet\part.0.parquet
../data/prices\MMM\MMM_2003.parquet\part.0.parquet
../data/prices\MMM\MMM_2004.parquet\part.0.parquet
../data/prices\MMM\MMM_2005.parquet\part.0.parquet
../data/prices\MMM\MMM_2006.parquet\part.0.parquet
../data/prices\MMM\MMM_2007.parquet\part.0.parquet
../data/prices\MMM\MMM_2008.parquet\part.0.parquet
../data/prices\MMM\MMM_2009.parquet\part.0.parquet
../data/prices\MMM\MMM_2010.parquet\part.0.parquet
../data/prices\MMM\MMM_2011.parquet\part.0.parquet
../data/prices\MMM\MMM_2012.parquet\part.0.parquet
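
The listing above shows a three-level layout: a ticker directory, a `<TICKER>_<year>.parquet` directory, and the `part.N.parquet` files inside it. A minimal sketch of matching that layout with `glob` follows. It uses a temporary directory with empty placeholder files rather than real price data, and the tickers `AAA` and `BBB` are made up for illustration.

```python
import os
import tempfile
from glob import glob

# Build a throwaway directory tree that mimics the layout in the listing:
# <root>/<ticker>/<ticker>_<year>.parquet/part.0.parquet
root = tempfile.mkdtemp()
for ticker in ("AAA", "BBB"):  # hypothetical tickers
    for year in (2000, 2001):
        part_dir = os.path.join(root, ticker, f"{ticker}_{year}.parquet")
        os.makedirs(part_dir)
        # Empty placeholder; a real file would contain parquet data.
        open(os.path.join(part_dir, "part.0.parquet"), "w").close()

# Stand-in for the value that dotenv would load from the .env file.
os.environ["PRICE_DATA"] = root
price_data = os.getenv("PRICE_DATA")

# One "*" per directory level: the ticker, then the year-level .parquet directory.
pattern = os.path.join(price_data, "*", "*.parquet", "*.parquet")
parquet_files = sorted(glob(pattern))

for file_path in parquet_files:
    print(os.path.relpath(file_path, root))
```

Unless `recursive=True` is passed, `**` in a glob pattern behaves like a single `*`, so a pattern of the form `/**/*.parquet/*.parquet` also matches exactly one directory level at the front.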


For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Adjusted Close:
    
    - `returns`: (Adj Close / Adj Close_lag) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)

In [5]:
# Write your code below.

# Within each ticker, lag Close and Adj Close by one row, then derive
# the return and the day's high-low range from the lagged frame.
dd_rets = (dd_px.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(Close_lag_1 = x['Close'].shift(1),
                       Adj_Close_lag_1 = x['Adj Close'].shift(1))
).assign(
    returns = lambda x: x['Adj Close']/x['Adj_Close_lag_1'] - 1
).assign(
    hi_lo_range = lambda x: (x['High'] - x['Low'])
))

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_rets = (dd_px.groupby('ticker', group_keys=False).apply(


In [6]:
print(dd_rets.columns)

Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'sector',
       'subsector', 'year', 'Close_lag_1', 'Adj_Close_lag_1', 'returns',
       'hi_lo_range'],
      dtype='object')
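
To make the feature definitions concrete, here is a small pandas sketch on made-up prices for two hypothetical tickers. It is not the Dask pipeline from the cell above; it only illustrates that the lag is taken within each ticker and in date order, so the first row of every ticker has no lag and therefore no return.

```python
import pandas as pd

# Made-up prices for two hypothetical tickers.
prices = pd.DataFrame({
    "ticker": ["AAA", "AAA", "AAA", "BBB", "BBB", "BBB"],
    "Date": pd.to_datetime(["2000-01-03", "2000-01-04", "2000-01-05"] * 2),
    "High": [10.5, 11.5, 12.5, 20.5, 20.0, 19.5],
    "Low": [9.5, 10.5, 11.0, 19.5, 18.5, 18.0],
    "Close": [10.0, 11.0, 12.0, 20.0, 19.0, 19.0],
    "Adj Close": [5.0, 5.5, 6.05, 10.0, 9.5, 9.5],
})

# Lags only make sense in chronological order within each ticker.
prices = prices.sort_values(["ticker", "Date"])
by_ticker = prices.groupby("ticker")

features = prices.assign(
    # shift(1) on the grouped column never crosses a ticker boundary.
    Close_lag_1=by_ticker["Close"].shift(1),
    Adj_Close_lag_1=by_ticker["Adj Close"].shift(1),
).assign(
    returns=lambda x: x["Adj Close"] / x["Adj_Close_lag_1"] - 1,
    hi_lo_range=lambda x: x["High"] - x["Low"],
)

print(features[["ticker", "Date", "returns", "hi_lo_range"]])
```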


In [7]:
dd_rets

<dask_expr.expr.DataFrame: expr=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=GroupByApply(frame=ReadParquet(79bbe12), observed=False, group_keys=False, func=<function <lambda> at 0x0000013670D9AFC0>, meta=_NoDefault.no_default, args=(), kwargs={})))))))>

+ Convert the Dask data frame to a pandas data frame. 
+ Add a rolling average return calculation with a window of 10 days.
+ *Tip*: Consider using `.rolling(10).mean()`.

(3 pt)

In [16]:
print(dd_rets.divisions)

(Timestamp('2000-01-03 00:00:00'), Timestamp('2000-09-22 00:27:41.538461568'), Timestamp('2001-07-16 01:50:46.153846144'), Timestamp('2002-05-17 14:27:21.938658432'), Timestamp('2003-03-31 12:27:49.940266880'), Timestamp('2004-03-15 04:19:16.004001536'), Timestamp('2005-06-04 18:14:47.322534912'), Timestamp('2006-08-23 19:47:16.211342080'), Timestamp('2007-09-15 11:06:33.461851904'), Timestamp('2008-08-09 22:45:25.259837696'), Timestamp('2009-09-06 02:09:36.940674816'), Timestamp('2010-08-14 04:31:29.748885760'), Timestamp('2011-08-05 06:37:52.097180160'), Timestamp('2012-12-31 00:00:00'))


In [21]:
# Write your code below.
dd_rets = dd_rets.map_partitions(lambda df: df.sort_values('returns'))
dd_rets['rolling_avg_return'] = dd_rets['returns'].rolling(window=10).mean().compute()
dd_rets.compute()


| Date | Open | High | Low | Close | Adj Close | Volume | sector | subsector | year | Close_lag_1 | Adj_Close_lag_1 | returns | hi_lo_range | rolling_avg_return |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2000-01-03 | 48.031250 | 48.250000 | 47.031250 | 47.187500 | 23.990576 | 2173400 | Industrials | Industrial Conglomerates | 2000 | 77.930000 | 45.981716 | -0.478258 | 1.218750 | NaN |
| 2000-02-25 | 44.062500 | 44.656250 | 42.593750 | 42.875000 | 21.939238 | 3627200 | Industrials | Industrial Conglomerates | 2000 | 45.500000 | 23.282455 | -0.057692 | 2.062500 | NaN |
| 2000-04-26 | 46.750000 | 46.750000 | 43.687500 | 44.468750 | 22.754761 | 6956600 | Industrials | Industrial Conglomerates | 2000 | 46.906250 | 24.002037 | -0.051965 | 3.062500 | NaN |
| 2000-03-07 | 44.125000 | 44.437500 | 41.000000 | 41.375000 | 21.171682 | 5195600 | Industrials | Industrial Conglomerates | 2000 | 43.625000 | 22.323019 | -0.051576 | 3.437500 | NaN |
| 2000-03-22 | 44.500000 | 44.687500 | 42.500000 | 43.000000 | 22.003197 | 4404800 | Industrials | Industrial Conglomerates | 2000 | 45.218750 | 23.138535 | -0.049067 | 2.187500 | NaN |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2011-10-10 | 75.120003 | 76.739998 | 75.089996 | 76.720001 | 51.462395 | 3855600 | Industrials | Industrial Conglomerates | 2011 | 73.820000 | 49.517147 | 0.039284 | 1.650002 | 0.031962 |
| 2011-11-30 | 79.709999 | 81.050003 | 79.430000 | 81.040001 | 54.744278 | 8721000 | Industrials | Industrial Conglomerates | 2011 | 77.239998 | 52.177303 | 0.049197 | 1.620003 | 0.034270 |
| 2011-08-09 | 80.230003 | 82.860001 | 78.059998 | 82.690002 | 55.095543 | 12008400 | Industrials | Industrial Conglomerates | 2011 | 78.589996 | 52.363750 | 0.052170 | 4.800003 | 0.036801 |
| 2011-10-27 | 79.510002 | 82.330002 | 78.919998 | 81.410004 | 54.608379 | 7413700 | Industrials | Industrial Conglomerates | 2011 | 77.019997 | 51.663635 | 0.056998 | 3.410004 | 0.039702 |
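
A moving average depends on row order, so it can help to check the calculation on a small example. The pandas sketch below uses made-up returns for two hypothetical tickers and a window of 3 instead of 10 to keep the output short. Rows are sorted by ticker and date before the rolling mean is taken within each ticker.

```python
import pandas as pd

dates = pd.date_range("2000-01-03", periods=5, freq="B")

# Made-up daily returns for two hypothetical tickers.
returns = pd.DataFrame({
    "ticker": ["AAA"] * 5 + ["BBB"] * 5,
    "Date": list(dates) * 2,
    "returns": [0.01, 0.02, 0.03, 0.04, 0.05,
                -0.01, 0.00, 0.01, 0.02, 0.03],
})

# Shuffle the rows on purpose to show that the sort below restores the order.
shuffled = returns.sample(frac=1, random_state=0)

# Chronological order within each ticker is what gives the window its meaning.
ordered = shuffled.sort_values(["ticker", "Date"]).reset_index(drop=True)

# transform keeps the original row alignment and never mixes tickers.
ordered["rolling_avg_return"] = (
    ordered.groupby("ticker")["returns"]
    .transform(lambda s: s.rolling(3).mean())
)

print(ordered)
```

With a window of 3, the first two rows of each ticker have no rolling average, and the third row holds the mean of that ticker's first three returns.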


Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return? 
**No. Dask supports rolling-window calculations, so the moving average return can be calculated directly in Dask. Because Dask evaluates lazily, the result only materializes when `.compute()` is called.**
+ Would it have been better to do it in Dask? Why? 
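**It depends on the size of the data relative to the available memory. If the data does not fit comfortably in memory, then yes, Dask is the better option, because it can process the data in partitions rather than loading all of it at once. If the data fits in memory, pandas is the better option, since it typically runs faster on data of that size.**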

(1 pt)