## Step 0:  Set up <a class="anchor" id="setup"></a>

In [None]:
# Import standard open libraries
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline 

# AWS libraries and initialization
import boto3

# Load variables
# Each `%store -r <name>` restores a variable that was persisted earlier with `%store <name>`
# (presumably by a previous notebook in this series -- TODO confirm which one writes them).
# Keep comments on their own lines: text after the variable name is treated as a magic argument.
%store -r df_raw
%store -r s3_bucket_name
%store -r s3_prefix
%store -r start_time
%store -r end_time
%store -r item_id
%store -r target_value
%store -r timestamp
%store -r forecast_dims
%store -r FORECAST_FREQ

## Step 1. Prepare Related Time Series <a class="anchor" id="RTS"></a>

Make sure the Related Time Series (RTS) does not have any missing values, even if the RTS extends into the future. <br>
Trick: create a dataframe without any missing values using a cross-join, which is faster than the resample technique. <br>

In [None]:
# install dask for faster joins when df is large
!pip install "dask[dataframe]" 
import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
print('dask: {}'.format(dask.__version__))

In [None]:
# If the merges below fail with a memory allocation error, try overriding the
# overcommit setting from its default value 0 to 1.
# See https://www.kernel.org/doc/Documentation/vm/overcommit-accounting
# The next 2 commands must be run directly in a new terminal, not from this notebook:
#   sudo -i
#   echo 1 > /proc/sys/vm/overcommit_memory
# Print the current value of the overcommit setting.
!cat /proc/sys/vm/overcommit_memory

In [None]:
# Every timestamp between start_time and end_time at the forecast frequency
idx = pd.date_range(start=start_time, end=end_time, freq=FORECAST_FREQ)
print(f"Number of data points: {len(idx)}")
print(f"Start date = {idx.min()}")
print(f"End date = {idx.max()}")

# One-column frame holding those timestamps under the configured timestamp column name
all_times = pd.DataFrame({timestamp: idx})

# Sanity checks: dtype, null count and size, then a random peek
print(all_times.dtypes)
print(all_times.isnull().sum())
print(all_times.shape)
all_times.sample(5)

In [None]:
%%time
# create master template of all possible locations and items

items = pd.DataFrame(list(df_raw[item_id].unique()))
items.columns = [item_id]
# print(items.head(2))
master_records = items.copy()
print(master_records.shape, items.shape)

# check you did the right thing
num_items = len(master_records[item_id].value_counts())
print(f"num items = {num_items}")
master_records.tail()

In [None]:
%%time
# cross-join to create master template of all possible locations and items and times
all_times['key'] = "1"
master_records['key'] = "1"
all_times.set_index('key', inplace=True)
master_records.set_index('key', inplace=True)

# Do the cross-join
print("doing the merge...")
full_history = master_records.merge(all_times, how="outer", left_index=True, right_index=True)
print("done w/ merge...")
full_history.reset_index(inplace=True, drop=True)

# make sure you don't have any nulls
print(full_history.shape)
print("checking nulls...")
print(full_history.isna().sum())
full_history.tail()

In [None]:
# create small df of target_values - to merge later using dask
temp_target = df_raw[forecast_dims + [target_value]].copy()
# add a "<timestamp>-<item_id>" string key for faster join
temp_target['ts_key'] = temp_target[timestamp].astype(str) + "-" + temp_target[item_id]
# Sum only the target column per key. Summing the whole frame would also try to
# aggregate the timestamp and item_id columns, which raises TypeError on datetime
# columns in pandas >= 2.0 (older versions silently dropped such columns).
# The result is indexed by ts_key and has target_value as its single column.
temp_target = temp_target.groupby('ts_key')[[target_value]].sum()
print(temp_target.shape, df_raw.shape)
display(temp_target.head(2))

<b> Parallelization for faster merge </b><br>
Below, I used dask.  I also tried ray through the modin library, but I hit an error when adding a new column to a modin dataframe.  Maybe by the time you use this notebook the modin/ray problem will be solved.
https://github.com/modin-project/modin/issues/2442

For reference, here are dask best practices:
<ul>
    <li>Choose partitions to be #items if your time series have more dimensions than just item_id, see <a href="https://docs.dask.org/en/latest/best-practices.html" target="_blank">https://docs.dask.org/en/latest/best-practices.html</a></li>
    <li>Make sure reset_index is only done in pandas and not dask, see <a href="https://docs.dask.org/en/latest/dataframe-best-practices.html" target="_blank">https://docs.dask.org/en/latest/dataframe-best-practices.html</a></li>
    </ul>

In [None]:
%%time

# USING dask

# convert pandas to dask df
print(type(full_history))
num_partitions = 1
print(f"using num_partitions = {num_partitions}")
large_df = dd.from_pandas(full_history, npartitions=num_partitions)
print(type(large_df))

# add key for faster join with target_value
large_df['ts_key'] = large_df[timestamp].astype(str) + "-" + large_df[item_id]   
print(large_df.shape, full_history.shape)
display(large_df.head(2))

In [None]:
%%time 

# merge in original target_value
# Left-join the per-key summed target values onto the item x timestamp template:
# large_df's "ts_key" column is matched against temp_target's index, and how="left"
# keeps every template row even when no target value exists for that key.
temp = large_df.merge(temp_target, how="left", right_index=True, left_on="ts_key")
# Print the merged shape next to the template's shape for comparison
print(temp.shape, full_history.shape)
display(temp.head(3))

In [None]:
%%time 

# convert dask df back to pandas df
# # Below is too small !?  rows got dropped, why?
print(type(temp))
temp3 = temp.compute()
print(type(temp3))
temp3.drop('ts_key', axis=1, inplace=True)
print(temp3.shape, full_history.shape)

# check nulls
print(temp3.isna().sum())
display(temp3.sample(3))
temp3[target_value].describe()

In [None]:
# Check you did the right thing
# Check original target_values: summary statistics of the target column in df_raw,
# to compare by eye against the describe() output of the merged frame.
df_raw[target_value].describe()

In [None]:
# Careful!!
# Really replace full_history with merged values
print(full_history.shape)
# Rebind instead of copying: temp3 is deleted right below, so a .copy() would only
# hold two full-size frames in memory at the same time for no benefit.
full_history = temp3
print(full_history.shape)
print(type(full_history))
# Drop the intermediate names. Re-running this cell without re-running the merge
# cells first will fail because these names no longer exist.
del temp, temp_target, temp3
full_history.head(2)

In [None]:
# Candidate variables for hourly data, all derived from the timestamp column
# day name and hour as strings, plus their "<day>_<hour>" combination
full_history['day_of_week'] = full_history[timestamp].dt.day_name().astype(str)
full_history['hour_of_day'] = full_history[timestamp].dt.hour.astype(str)
full_history['day_hour_name'] = full_history['day_of_week'].str.cat(
    full_history['hour_of_day'], sep="_"
)
# 1 when dayofweek is 5 or 6, else 0
full_history['weekend_flag'] = full_history[timestamp].dt.dayofweek.ge(5).astype(int)
# 1 on Sunday or Monday, else 0
full_history['is_sun_mon'] = (
    full_history['day_of_week'].isin(["Sunday", "Monday"]).astype("int64")
)

print(full_history.sample(5))

In [None]:
# zoom-in time slice so you can see patterns
# Both values below are specific to the example data; adjust them for your own data.
location_picked = "231"
plot_start_time = "2020-01-10"
# Keep only rows for the picked item strictly between plot_start_time and end_time.
# (A previous unfiltered-by-time slice was computed and immediately overwritten; it is gone.)
df_plot = full_history.loc[((full_history[timestamp] > plot_start_time)
                            & (full_history[timestamp] < end_time)
                            & (full_history[item_id] == location_picked)), :].copy()
print(df_plot.shape, full_history.shape)
# Aggregate per timestamp and turn the timestamp back into a regular column for plotting
df_plot = df_plot.groupby([timestamp]).sum().reset_index()
# sample() raises if asked for more rows than exist, so cap it for small or empty slices
df_plot.sample(min(3, len(df_plot)))

In [None]:
# check: histogram of target_value in df_plot, i.e. the zoomed-in slice for
# location_picked only (not the whole full_history frame); compare its shape
# by eye with the original data
df_plot[target_value].hist(bins=100)

## Step 2. Visualize Related Time Series <a class="anchor" id="visualize_rts"></a>

In [None]:
# Visualize candidate RTS variables: target_value on the left axis,
# weekend_flag overlaid in translucent red on a second y-axis sharing the same x-axis
ax = plt.figure(figsize=(15, 8)).gca()
df_plot.plot(x=timestamp, y=target_value, ax=ax)
ax2 = ax.twinx()
df_plot.plot(x=timestamp, y='weekend_flag', color='red', alpha=0.3, ax=ax2);

In [None]:
# EXAMPLE HOURLY RTS

# Visualize candidate RTS variable is_sun_mon: target_value on the left axis,
# is_sun_mon overlaid in translucent red on a second y-axis sharing the same x-axis
ax = plt.figure(figsize=(15, 8)).gca()
df_plot.plot(x=timestamp, y=target_value, ax=ax)
ax2 = ax.twinx()
df_plot.plot(x=timestamp, y='is_sun_mon', color='red', alpha=0.3, ax=ax2);

It looks like the lowest numbers of taxi rides depend on a combination of day and hour, not just on the day of week.

In [None]:
# EXAMPLE HOURLY RTS

# Assemble RTS: the forecast dimension columns plus whichever candidate
# columns you finally decide to keep (here only day_hour_name)
rts = full_history.loc[:, forecast_dims + ['day_hour_name']].copy()

# Size, null counts and covered time range of the assembled RTS
print(rts.shape)
print(rts.isna().sum())
print(f"rts start: {rts[timestamp].min()}")
print(f"rts end: {rts[timestamp].max()}")
rts.sample(5)

## Step 3. Save Related Time Series <a class="anchor" id="save_rts"></a>

In [None]:
# Save rts to S3
local_file = "rts.csv"
# Write the RTS locally first, without a header row and without the index column
rts.to_csv(local_file, index=False, header=False)

# Upload the local file to <s3_prefix>/<local_file> in the configured bucket
key = "{}/{}".format(s3_prefix, local_file)
(
    boto3.Session()
    .resource('s3')
    .Bucket(s3_bucket_name)
    .Object(key)
    .upload_file(local_file)
)

In [None]:
# Persist these variables with %store so they can be restored later with `%store -r`
# (e.g. after a kernel restart or from another notebook).
# Keep comments on their own lines: text after the variable name is treated as a magic argument.
%store df_raw
%store s3_bucket_name
%store s3_prefix
%store start_time
%store end_time
%store item_id
%store target_value
%store timestamp
%store FORECAST_FREQ
%store forecast_dims