In [1]:
import pandas as pd
import os
import urllib
from sklearn.decomposition import PCA
from sklearn.decomposition import KernelPCA
import numpy as np
from scipy.signal import periodogram
import statsmodels.api as sm
import dask.dataframe as dd

from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf
from statsmodels.graphics.tsaplots import plot_pacf
from statsmodels.tsa.arima_model import ARIMA
from matplotlib import pyplot as plt

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
from sklearn.metrics import mean_squared_error
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import PredefinedSplit
import warnings
warnings.filterwarnings('ignore')

# Theoretic part (20 pts, 5 pts each)

Multiple choice questions: please select all that applies and explain your answer.

### Question 1 (Autocorrelation). 
The autocorrelation plot of the daily time-series has local peaks at t=7,14,21,28 etc.. How would you interpret that?

A. The time-series reaches its maximum on the days 7,14,21,28...

B. The time-series reaches its minimum on the days 7,14,21,28...

C. The time-series is likely to have a periodic pattern with a period of 7 days

D. The time-series is likely to have 7 periods per day

E. The appropriate AR model for the time-series should have at least 7 terms.

### Answer - C 
The autocorrelation plot of the daily time-series having local peaks at t=7, 14, 21, 28, etc. suggests that the time-series is likely to have a periodic pattern with a period of 7 days. This means that the values of the time-series are repeating themselves every 7 days.

### Question 2 (Stationarity).

Which of the following time-series models are stationary:

A. Linear trend

B. AR(1) model

C. White noise

D. Random walk

E. ARMA(1,2) model

F. ARIMA(1,1,1) model

### Answer - C and E
White noise is stationary because it has constant mean and variance and the autocorrelation between any two observations in the series is zero. ARMA(1,2) model is also stationary under certain conditions. For example, if the roots of the characteristic equation lie outside the unit circle, then the ARMA(1,2) model is stationary.

### Question 3 (PCA). 
Which of the following statements regarding the model dimensionality reduction through Principal Component Analysis (PCA) are true:

A. Leading principal components of the features are the most efficient for modeling the output variable.

B. Principal components of the standardized features are uncorrelated and this way less exposed to multicollinearity.

C. The model using principal components of the features can't overfit.

D. Feature selection based on the principal components of the features is often more efficient in preventing overfitting comparered the feature selection over the original features.

E. Principal components are harder to interpret compared to the original features making the PCA regresssion model less interpretable compared to the regression model using original features.

### Answer - B, D, and E
B. Standardizing the features before applying PCA ensures that the resulting principal components are not influenced by differences in the scale or units of the original features. This can improve the stability of the PCA results and make them less sensitive to multicollinearity.

D. This is because PCA reduces the dimensionality of the feature space while retaining most of the relevant information, which can help to prevent overfitting and improve the generalization of the model.

E. This is because the principal components are linear combinations of the original features, and the coefficients in the regression equation represent the contributions of each principal component rather than the original features themselves. However, this interpretability tradeoff may be acceptable if the reduction in dimensionality and the resulting increase in model efficiency and generalization are deemed more important.

### Question 4 (MapReduce). 

What is true about MapReduce:

A. MapReduce is a Python module enabling parallel computing

B. Using MapReduce approach makes the code more suitable for parallel computing.

C. MapReduce code always runs faster compared to the code using more traditional approaches, like loops or list comprehensions.

D. MapReduce code will always efficiently run on multiple cores of you CPU or multiple machines within your cluster if available.

E. Multiprocessing and PySpark efficient alternatives to MapReduce.



### Answer - BCDE
MapReduce is a programming paradigm for distributed computing on large datasets. It allows for parallel processing by dividing the input data into smaller chunks that can be processed independently by multiple nodes in a cluster, and then aggregating the results. Using the MapReduce approach can make the code more suitable for parallel computing because it provides a framework for handling the parallelization of the code and the distribution of data across the nodes in the cluster.

# Practice part: Taxi ridership from JFK to other taxi zones prediction.
This project is an example of applying PCA to predict hourly yellow taxi ridership at the taxi zone level. Modeling taxi ridership at a fine spatial and temporal granularity is challenging due to the low signal-to-noise ratio and high dimensionality. In this case, dimension reduction essential in feature engineering. This project has five steps: data downloading, data preprocessing, baseline modeling, feature engineering, and RandomForest modeling.

Let's start with data downloading. 

## 1. Data downloading (5pts)
Design a function to download yellow taxi data from 2017-01-01 to 2018-12-31 at https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page.

In [2]:
dataDir = 'taxidata'
if os.path.exists(dataDir):
    pass
else:
    os.mkdir(dataDir)
Years = [2017,2018]
Months = range(1,13)
VehicleTypes = ['yellow']

def getUrl(cabtype,year,month):
    baseUrl = 'https://d37ci6vzurychx.cloudfront.net/trip-data/'
    
    month = str(month).zfill(2)
    fileName = '%s_tripdata_%s-%s.parquet'%(cabtype,year,month)
        
    return baseUrl + fileName, fileName        

In [3]:
for year in Years:
    for month in Months:
        for cabtype in VehicleTypes:
            url, fileName = getUrl(cabtype,year,month)
            
            print("Downloading: "+str(fileName))
            
            if fileName in os.listdir(dataDir):
                print("file exists")
                continue
            
            filePath = os.path.join(dataDir, fileName)
            try:
                urllib.request.urlretrieve(url, filePath)
            except:
                # if fails remove the incomplete file
                os.remove(filePath)
                try:
                    # start again after a delay of 2 min
                    time.sleep(60*2)
                    urllib.request.urlretrieve(url, filePath)
                except:
                    print("Download this file later!")
                    pass

Downloading: yellow_tripdata_2017-01.parquet
file exists
Downloading: yellow_tripdata_2017-02.parquet
file exists
Downloading: yellow_tripdata_2017-03.parquet
file exists
Downloading: yellow_tripdata_2017-04.parquet
file exists
Downloading: yellow_tripdata_2017-05.parquet
file exists
Downloading: yellow_tripdata_2017-06.parquet
file exists
Downloading: yellow_tripdata_2017-07.parquet
file exists
Downloading: yellow_tripdata_2017-08.parquet
file exists
Downloading: yellow_tripdata_2017-09.parquet
file exists
Downloading: yellow_tripdata_2017-10.parquet
file exists
Downloading: yellow_tripdata_2017-11.parquet
file exists
Downloading: yellow_tripdata_2017-12.parquet
file exists
Downloading: yellow_tripdata_2018-01.parquet
file exists
Downloading: yellow_tripdata_2018-02.parquet
file exists
Downloading: yellow_tripdata_2018-03.parquet
file exists
Downloading: yellow_tripdata_2018-04.parquet
file exists
Downloading: yellow_tripdata_2018-05.parquet
file exists
Downloading: yellow_tripdata_20

## 2. Data Preprocessing (10 pts, 7 for dask, 3 for sanity check)
Use dask to aggregate all months' records into one dataframe, and aggregate dataset by date and hour to get the ridership from JFK to each taxi zone each hour. The expected output has columns: date, hour, drop-off location 1, drop-off location 2, etc. 

Hint: 
1. JFK taxi zone id is 132.
2. time column should be the pickup time, and ridership is passenger count.
3. Try read_csv("*.csv") to read all csv file in a folder 
4. files in 2017 and 2018 have different columns; apply argument usecols to select desired columns.
5. using .compute() function to convert processed dask dataframe to pandas dataframe for further modeling.

### 2.1 Data loading

In [4]:
# read file: 'read_csv()' works just like pandas
%time df = dd.read_parquet('yellow_tripdata_2017-01.parquet')
#df.head()

CPU times: total: 46.9 ms
Wall time: 111 ms


In [5]:
# check time with pandas 
%time df_demo= pd.read_parquet('yellow_tripdata_2017-01.parquet')
df_demo.head()

CPU times: total: 4.92 s
Wall time: 2.02 s


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2017-01-01 00:32:05,2017-01-01 00:37:48,1,1.2,1,N,140,236,2,6.5,0.5,0.5,0.0,0.0,0.3,7.8,,
1,1,2017-01-01 00:43:25,2017-01-01 00:47:42,2,0.7,1,N,237,140,2,5.0,0.5,0.5,0.0,0.0,0.3,6.3,,
2,1,2017-01-01 00:49:10,2017-01-01 00:53:53,2,0.8,1,N,140,237,2,5.5,0.5,0.5,0.0,0.0,0.3,6.8,,
3,1,2017-01-01 00:36:42,2017-01-01 00:41:09,1,1.1,1,N,41,42,2,6.0,0.5,0.5,0.0,0.0,0.3,7.3,,
4,1,2017-01-01 00:07:41,2017-01-01 00:18:16,1,3.0,1,N,48,263,2,11.0,0.5,0.5,0.0,0.0,0.3,12.3,,


In [6]:
df = dd.read_parquet('yellow_tripdata_2017-*.parquet', dtype={'trip_distance': float,
                        'total_amount': float, 'tolls_amount':float, 'RatecodeID': float, 'VendorID': float, 
                                                     'passenger_count': float, 'payment_type':float, 
                                                     'PULocationID':int, 'DOLocationID':int})
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2017-01-01 00:32:05,2017-01-01 00:37:48,1,1.2,1,N,140,236,2,6.5,0.5,0.5,0.0,0.0,0.3,7.8,,
1,1,2017-01-01 00:43:25,2017-01-01 00:47:42,2,0.7,1,N,237,140,2,5.0,0.5,0.5,0.0,0.0,0.3,6.3,,
2,1,2017-01-01 00:49:10,2017-01-01 00:53:53,2,0.8,1,N,140,237,2,5.5,0.5,0.5,0.0,0.0,0.3,6.8,,
3,1,2017-01-01 00:36:42,2017-01-01 00:41:09,1,1.1,1,N,41,42,2,6.0,0.5,0.5,0.0,0.0,0.3,7.3,,
4,1,2017-01-01 00:07:41,2017-01-01 00:18:16,1,3.0,1,N,48,263,2,11.0,0.5,0.5,0.0,0.0,0.3,12.3,,


In [7]:
df1 = dd.read_parquet('yellow_tripdata_2018-*.parquet', dtype={'trip_distance': float,
                        'total_amount': float, 'tolls_amount':float, 'RatecodeID': float, 'VendorID': float, 
                                                     'passenger_count': float, 'payment_type':float, 
                                                     'PULocationID':int, 'DOLocationID':int})
df1.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2018-01-01 00:21:05,2018-01-01 00:24:23,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,1,2018-01-01 00:44:55,2018-01-01 01:03:05,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,
2,1,2018-01-01 00:08:26,2018-01-01 00:14:21,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,
3,1,2018-01-01 00:20:22,2018-01-01 00:52:51,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,
4,1,2018-01-01 00:09:18,2018-01-01 00:27:06,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,


In [8]:
import dask.dataframe as dd
import numpy as np

# Read all parquet files for 2017 dataset
df_17 = dd.read_parquet('yellow_tripdata_2017-*.parquet', dtype={'trip_distance': float,
                        'total_amount': float, 'tolls_amount':float, 'RatecodeID': float, 'VendorID': float, 
                                                     'passenger_count': float, 'payment_type':float, 
                                                     'PULocationID':int, 'DOLocationID':int},
                     usecols=['tpep_pickup_datetime', 'passenger_count', 'PULocationID', 'DOLocationID'])

# Filter the records with pickup from JFK (zone 132)
df_jfk_17 = df_17[df_17['PULocationID'] == 132]

# Extract date and hour from pickup time
df_jfk_17['date'] = df_jfk_17['tpep_pickup_datetime'].dt.date
df_jfk_17['hour'] = df_jfk_17['tpep_pickup_datetime'].dt.hour

df_jfk_17 = df_jfk_17.reset_index()

In [9]:
# Select only the desired columns
df_jfk_17 = df_jfk_17[['date', 'hour', 'DOLocationID', 'passenger_count']]

# Convert the dask dataframe to pandas dataframe
#df_jfk_17_pandas = df_jfk_17.compute()

df_agg_17 = df_jfk_17.groupby(['date', 'hour', 'DOLocationID']).sum()[['passenger_count']].reset_index()

df_agg_17.head()

Unnamed: 0,date,hour,DOLocationID,passenger_count
0,2017-01-01,0,4,1
1,2017-01-01,0,7,2
2,2017-01-01,0,10,7
3,2017-01-01,0,12,1
4,2017-01-01,0,13,13


In [10]:
df_agg_17 = df_agg_17[(df_agg_17['date'] >= pd.to_datetime('2017-01-01')) & (df_agg_17['date'] <= pd.to_datetime('2017-12-31'))]
df_agg_17 = df_agg_17.reset_index(drop=True)
df_agg_17.head()

ValueError: Metadata inference failed in `ge`.

Original error is below:
------------------------
TypeError("'>=' not supported between instances of 'str' and 'Timestamp'")

Traceback:
---------
  File "C:\Users\Sharvari\anaconda3\lib\site-packages\dask\dataframe\utils.py", line 177, in raise_on_meta_error
    yield
  File "C:\Users\Sharvari\anaconda3\lib\site-packages\dask\dataframe\core.py", line 5756, in elemwise
    meta = partial_by_order(*parts, function=op, other=other)
  File "C:\Users\Sharvari\anaconda3\lib\site-packages\dask\utils.py", line 1225, in partial_by_order
    return function(*args2, **kwargs)
  File "C:\Users\Sharvari\anaconda3\lib\site-packages\pandas\core\ops\common.py", line 72, in new_method
    return method(self, other)
  File "C:\Users\Sharvari\anaconda3\lib\site-packages\pandas\core\arraylike.py", line 62, in __ge__
    return self._cmp_method(other, operator.ge)
  File "C:\Users\Sharvari\anaconda3\lib\site-packages\pandas\core\series.py", line 6243, in _cmp_method
    res_values = ops.comparison_op(lvalues, rvalues, op)
  File "C:\Users\Sharvari\anaconda3\lib\site-packages\pandas\core\ops\array_ops.py", line 287, in comparison_op
    res_values = comp_method_OBJECT_ARRAY(op, lvalues, rvalues)
  File "C:\Users\Sharvari\anaconda3\lib\site-packages\pandas\core\ops\array_ops.py", line 75, in comp_method_OBJECT_ARRAY
    result = libops.scalar_compare(x.ravel(), y, op)
  File "pandas\_libs\ops.pyx", line 107, in pandas._libs.ops.scalar_compare


In [None]:
# Convert the dask dataframe to pandas dataframe
df_jfk_17_pandas = df_jfk_17.compute()

### 2.2 Sanity check
Then, we need to do some basic sanity checks. It is possible that in a particular hour, completely dispatched no yellow taxis from JFK. Check does each day has 24-hour records and add missing records back to the dataframe. The final output should have 17520 rows ($365\times2\times24$)

In [None]:
# Define the number of drop-off locations
#num_locs = 263
num_locs = len(df_agg_17['DOLocationID'].unique())

# Subset the 2017 dataframe to include only records from January 1 to December 31
df_agg_17 = df_agg_17[(df_agg_17['date'] >= pd.to_datetime('2017-01-01')) & (df_agg_17['date'] <= pd.to_datetime('2017-12-31'))]

# Reset the index of the dataframe
df_agg_17 = df_agg_17.reset_index(drop=True)

# Check if each date has 24-hour records
unique_dates = df_agg_17['date'].unique()
unique_hours = df_agg_17['hour'].unique()
missing_groups = []
for date in unique_dates:
    for hour in unique_hours:
        group = df_agg_17[(df_agg_17['date'] == date) & (df_agg_17['hour'] == hour)]
        if len(group) != num_locs: 
            missing_groups.append((date, hour))

# Create a new dataframe with missing date-hour combinations
if missing_groups:
    print("Missing date-hour groups:")
    print(missing_groups)
    missing_df = pd.DataFrame(missing_groups, columns=['date', 'hour'])
    missing_df['DOLocationID'] = np.arange(num_locs)
    missing_df['passenger_count'] = 0

    # Concatenate the missing dataframe with the original dataframe
    df_agg_17 = pd.concat([df_agg_17, missing_df])
    df_agg_17 = df_agg_17.groupby(['date', 'hour', 'DOLocationID']).sum().reset_index()

# Sort the dataframe by date and hour
df_agg_17 = df_agg_17.sort_values(by=['date', 'hour']).reset_index(drop=True)

# Check the final length of the dataframe
assert len(df_agg_17) == len(unique_dates) * len(unique_hours) * num_locs

## 3. Time-series exploratory analysis
Apply exploratory analysis over the daily aggregated dataset at first.

### 3.1 aggregate the ridership from each dropoff location and sum it to get daily records. (3pts)

### 3.2 Period detection and report the strongest period length on the 2017 data. (3pts)
Hint: using periodogram or acf plot.

### 3.3 Trend, seasonality, noise decomposition (using additive model) on 2017 data, . (3 pts, 1 pts if freq is wrong)

## 4. Predict the total daily ridership from JFK using ARIMA.
ARIMA is a common method to predict taxi ridership. Before we predict taxi zone level hourly ridership, let's try to predict the aggregated daily ridership using ARIMA.

### 4.1 Using adfuller test to test the stability of the aggregated dataset. If not stable, apply differencing method until the p-value from adfuller test is smaller than 0.05. (3pts)

### 4.2 Find out proper AR and MA terms in an ARIMA model using pacf and acf plots. (4 pts, 2 for each plot)
Hint: positive autocorrelation is usually best treated by adding an AR term to the model and negative autocorrelation is usually best treated by adding an MA term. In general, differencing reduces positive autocorrelation and may even cause a switch from positive to negative autocorrelation. 

Identifying the numbers of AR and MA terms:
1. if the pacf plot shows a sharp cutoff and/or the lag-1 autocorrelation is positive then consider adding one or more AR terms to the model. The lag beyond which the PACF cuts off is the indicated number of AR terms.

2. if the acf plot displays a sharp cutoff and/or the lag-1 autocorrelation is negative then consider adding an MA term to the model. The lag beyond which the ACF cuts off is the indicated number of MA terms.

3. It is generally advisable to stick to models in which at least one of p and q is no larger than 1, i.e., do not try to fit a model such as ARIMA(2,1,2).

### 4.3 build an ARIMA model using terms from 4.2, training on the first 700 days, forecast on the last 31 days. Print ARIMA model results and plot in-sample and out-of-sample prediction in different colors. (8 pts, 3 for correct terms, 3 for training and summary, 2pts for the plot)

# Taxi zone level prediction

This project aims to predict hourly yellow taxi ridership volume from JFK to each taxi zone. The ARIMA experiment in section 3 forecasts the total ridership amount from JFK. However, based on the reported $R^2$, this model is not a good fit. ARIMA model has four main shortcomings: 1) they rely heavily on stationarity assumption which does not hold in real-world traffic systems 2) they do not consider spatial and structural dependencies that traffic networks exhibit and forecast each sensor as an individual time series 3) they are unable to model non-linear temporal dynamics 4) they suffer from the curse of dimensionality. Due to the limitation of ARIMA, we need to apply another method to predict taxi zone level ridership.

## 5. Feature engineering

Our workflow is first standardizing the dataset, then using PCA to compress the dataset. As we predict future ridership, PCA should be learned from historical data (2017) then apply to the following year (2018). Next, add lag features (PCA components) from the past 12 hours and apply a Random Forest regressor to predict each PCA component's values in the next hour. After we had the PCA component prediction, inverse PCA, and inverse standardization to retrieve taxi ridership prediction in its original scale and dimension, in other words, we are predicting the PCA components instead of taxi zone level ridership and then using the inverse PCA method to reconstruct 

### 5.1. standardization. (3 pts)
The standardscaler stores information of this standardization process, including the mean and standard deviation values required when converting the prediction back to the raw scale. Split the whole dataset into two parts: 2017 and 2018, standardize each separately.

### 5.2. PCA

#### 5.2.1 train PCA on 2017 data. Let's arbitrarily set PCA components as 5, and gamma is None, try kernel ‘linear’, ‘poly’, ‘rbf’, and ‘sigmoid’. Select the transformer which has the lowest mean squared error in data reconstruction (inverse transform). (5 pts)

#### 5.2.2 Apply the selected transformer from 4.2.1 to the standardized 2018 dataset and report the mean squared error between the standardized data and reconstructed data. Hint: fit the PCA on 2017 data and apply it to transform 2018 data.(5pts)

### 5.3 Add lag (5pts)
add 12 lags of each component from 4.2.2 (compressed 2018 data only). The expected output should have 65 dimensions. In the further modeling step, we will apply the 60 lag variables to predict the 5 components.

## 6. RandomForest modeling (23pts)

We aim at predicting compressed daily ridership (5 PCA components values) from 12-hour lag variables. Parameter tuning is required in this section, including min_samples_split, min_samples_leaf, and n_estimators. First 80% days for training, test on the rest 20%. And in the training dataset, validate the model on the bottom 20%. 


### Extra credit: 

Using grid_search function in sklearn instead of a for-loop when tuning parameters in a RandomForest. The train, validation, and test datasets should be split in the same way as described above. Hint: To fix train and validation in a grid search, you might need the PredefinedSplit function from sklearn.

### 6.1 train test split (3pts)
Please keep in mind that random train test split is not applicable in this case.

### 6.2 parameter tuning (10pts)
Please search the best parameter set in the following range:
min_samples_split: 2 to 10,
min_samples_leaf: 2 to 10,
and n_estimators equal to 50.


### 6.3 model performance measurement (10pts)
Prediction results are PCA components instead of taxi zone level ridership. To reconstruct the data back to its original size and scale, we need to inverse PCA and inverse standardization. Report the taxi zone level $R^2$ value.