# Import Dependencies

**Note:** PySpark has its own [implementation of the pandas API](https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.pandas/frame.html), which provides the same functionality using distributed computing and clusters under the hood. There are some differences, but the methods used here follow the PySpark documentation.

In [1]:
# import pyspark.pandas as pd
import pandas as pd
import os

# Load data

In [20]:
# Folder containing the input CSV files, resolved relative to the working directory.
path_to_data = os.path.join('.', "data")

# Ride counts: the first CSV column is parsed as dates and used as the index.
rides_df = pd.read_csv(
    os.path.join(path_to_data, 'rides_by_date.csv'),
    index_col=0,
    parse_dates=[0],
)

# Gas prices: same date-index handling, but only the first 52 data rows are read
# (presumably one row per week of 2014 -- confirm against the file).
gas_df = pd.read_csv(
    os.path.join(path_to_data, 'motor-gas-prices-2014.csv'),
    index_col=0,
    nrows=52,
    parse_dates=[0],
)

In [9]:
# Show the parsed date index, then preview the first five rows of the rides table.
print(rides_df.index)
rides_df.head(n=5)

DatetimeIndex(['2014-04-01', '2014-04-02', '2014-04-03', '2014-04-04',
               '2014-04-05', '2014-04-06', '2014-04-07', '2014-04-08',
               '2014-04-09', '2014-04-10',
               ...
               '2014-09-21', '2014-09-22', '2014-09-23', '2014-09-24',
               '2014-09-25', '2014-09-26', '2014-09-27', '2014-09-28',
               '2014-09-29', '2014-09-30'],
              dtype='datetime64[ns]', length=183, freq=None)


Unnamed: 0,Total Rides,7-Day Average Rides
2014-04-01,14546,14546.0
2014-04-02,17474,16010.0
2014-04-03,20701,17573.666667
2014-04-04,26714,19858.75
2014-04-05,19521,19791.2


In [10]:
# Show the parsed date index, then preview the first five rows of the gas price table.
print(gas_df.index)
gas_df.head(n=5)

DatetimeIndex(['2014-01-06', '2014-01-13', '2014-01-20', '2014-01-27',
               '2014-02-03', '2014-02-10', '2014-02-17', '2014-02-24',
               '2014-03-03', '2014-03-10', '2014-03-17', '2014-03-24',
               '2014-03-31', '2014-04-07', '2014-04-14', '2014-04-21',
               '2014-04-28', '2014-05-05', '2014-05-12', '2014-05-19',
               '2014-05-26', '2014-06-02', '2014-06-09', '2014-06-16',
               '2014-06-23', '2014-06-30', '2014-07-07', '2014-07-14',
               '2014-07-21', '2014-07-28', '2014-08-04', '2014-08-11',
               '2014-08-18', '2014-08-25', '2014-09-01', '2014-09-08',
               '2014-09-15', '2014-09-22', '2014-09-29', '2014-10-06',
               '2014-10-13', '2014-10-20', '2014-10-27', '2014-11-03',
               '2014-11-10', '2014-11-17', '2014-11-24', '2014-12-01',
               '2014-12-08', '2014-12-15', '2014-12-22', '2014-12-29'],
              dtype='datetime64[ns]', freq=None)


Unnamed: 0,Statewide,Upstate,Downstate,NYC
2014-01-06,369.0,368.3,369.7,353.9
2014-01-13,364.4,366.1,362.9,349.6
2014-01-20,361.4,364.3,358.8,344.5
2014-01-27,359.6,362.5,356.9,342.8
2014-02-03,358.8,361.9,356.1,341.5


# Combine Data

If we were to join the datasets over just the matching dates, then there would only be 16 rows. This is a very small sample. By shifting each ride date forward to the next week start (the following Monday, which is how the weekly gas prices are dated; Mondays stay unchanged), we can compare a single day's rides to the average gas price of that week.

In [21]:
# Move every ride date forward to the next Monday so it lines up with the weekly
# gas price dates. `dayofweek` is 0 for Monday, so (7 - dayofweek) % 7 is the
# number of days until the next Monday and 0 for dates that already are Mondays.
# The offset is computed with vectorized index arithmetic rather than a
# per-element Python lambda, and uses the documented 'D' (days) unit.
shifted_dates = rides_df.index + pd.to_timedelta((7 - rides_df.index.dayofweek) % 7, unit='D')

In [37]:
# Add the shifted date as a second index level next to the original date, then
# join the gas prices on that "Shifted Date" level (the gas index is renamed so
# the level names match).
df = (
    rides_df
    .set_index(shifted_dates, append=True)
    .rename_axis(['Date', 'Shifted Date'])
    .join(gas_df.rename_axis("Shifted Date"))
)
df.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Total Rides,7-Day Average Rides,Statewide,Upstate,Downstate,NYC
Date,Shifted Date,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2014-04-01,2014-04-07,14546,14546.0,374.3,373.2,375.4,359.4
2014-04-02,2014-04-07,17474,16010.0,374.3,373.2,375.4,359.4
2014-04-03,2014-04-07,20701,17573.666667,374.3,373.2,375.4,359.4
2014-04-04,2014-04-07,26714,19858.75,374.3,373.2,375.4,359.4
2014-04-05,2014-04-07,19521,19791.2,374.3,373.2,375.4,359.4


In [39]:
# Save the joined table (indexed by Date and Shifted Date) as-is.
df.to_csv(os.path.join(path_to_data, 'rides_to_gas.csv'))

# Save a long-format copy: the gas price columns are kept as identifiers and the
# remaining (ride) columns are stacked into 'Metric' / 'Rides' pairs.
# NOTE(review): melt discards the (Date, Shifted Date) index by default, so the
# melted file carries no date columns -- confirm that this is intended.
(
    df
    .melt(id_vars=gas_df.columns, var_name='Metric', value_name='Rides')
    .to_csv(os.path.join(path_to_data, 'rides_to_gas_melted.csv'))
)