## NYC Yellow Taxi 2019

In [0]:
# To view the root 'databricks-datasets' folder, run: `display(dbutils.fs.ls("/databricks-datasets/"))`
# display(dbutils.fs.ls("/databricks-datasets/nyctaxi/tripdata/yellow/"))

### `NYCTaxiData` Class

Loads and cleans NYC Yellow Taxi trip data for a given month and year. Provides trip counts and pickup aggregations by date or hour using Spark DataFrames.



In [0]:
from pyspark.sql.functions import min, max, col, unix_timestamp, date_format, hour

class NYCTaxiData:
    def __init__(self, month_year_list):
        self.month_year_list = month_year_list
        self.clean_df = self.load_and_process_data()
    
    def read_data(self, month, year):
        return spark.read \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .csv(f"dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_{year}-{month}.csv.gz")
    
    def clean_data(self, df, month, year):
        next_month = (int(month) + 1) % 13
        next_month = f"0{next_month}" if next_month < 10 else str(next_month)
        year_of_next_month = year if month != "12" else f"{int(year) + 1}"
        # print(f"Cleaning data: The next_month is: {next_month}, year of the next month is: {year_of_next_month}\n")

        # Drop rows if missing pickup or dropoff datetime
        # Filter rows to include only trips in the year-month
        # Exclude trips with invalid durations: negative or exceeding 24 hours (86400 seconds)
        # Extract day and hour (military) from pickup and dropoff datetime
        clean_df = df\
            .na.drop(subset=["tpep_pickup_datetime", "tpep_dropoff_datetime"])\
            .filter(
                (col("tpep_pickup_datetime") >= f"{year}-{month}-01") & 
                (col("tpep_pickup_datetime") < f"{year_of_next_month}-{next_month}-01") &
                (col("tpep_dropoff_datetime") >= f"{year}-{month}-01") & 
                (col("tpep_dropoff_datetime") < f"{year_of_next_month}-{next_month}-01")
            )\
            .filter(
                (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")).between(0, 86400)
            )\
            .withColumn("pickup_date", date_format(col("tpep_pickup_datetime"), "yyyy-MM-dd"))\
            .withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))\
            .withColumn("dropoff_date", date_format(col("tpep_dropoff_datetime"), "yyyy-MM-dd"))\
            .withColumn("dropoff_hour", hour(col("tpep_dropoff_datetime")))

        return clean_df
    
    def load_and_process_data(self):
        if not self.month_year_list:
            raise ValueError("Empty list")

        dfs = []
        for month, year in self.month_year_list:
            data_df = self.read_data(month, year)
            clean_df = self.clean_data(data_df, month, year)
            dfs.append(clean_df)

        union_df = dfs[0]
        for df in dfs[1:]:
            union_df = union_df.unionByName(df)
        return union_df

    def len(self):
        return self.clean_df.count()
    
    # Return the top n hours with the most pickups
    def pickups_per_time_interval(self, time_interval, top_n):
        # time_interval either based on date or hour
        pickups_per_time_interval = self.clean_df\
            .groupby(f"pickup_{time_interval}")\
            .count()\
            .withColumnRenamed("count", "num_pickups") \
            .orderBy("num_pickups", ascending=False)\
            .limit(top_n)
        return pickups_per_time_interval

In [0]:
month_year_list = [("08", "2019")]
nyc_2019_08_df = NYCTaxiData(month_year_list)
print(f"Number of rows: {nyc_2019_08_df.len()}")
nyc_2019_08_df.pickups_per_time_interval("hour", 5).show()
nyc_2019_08_df.pickups_per_time_interval("date", 5).show()

## Forecast Taxi Demands
Forecast taxi demand (number of rides) on a daily/hourly basis using 1 month of data.

In [0]:
class ForecastTaxiDemands(NYCTaxiData):
    def __init__(self, month_year_list):
        super().__init__(month_year_list)
        self.aggregate_df = self.aggregate_data()
    
    def aggregate_data(self):
        """Aggregate pickups by date and time"""
        aggregate_df = self.clean_df \
                        .groupBy("pickup_date", "pickup_hour") \
                        .count() \
                        .withColumnRenamed("count", "num_pickups") \
                        .orderBy("pickup_date", "pickup_hour")
        return aggregate_df
    
    def convert_to_pandas(self):
        return self.aggregate_df.toPandas()
    
    def len(self):
        return self.aggregate_df.count()

### Baseline: ARIMA
- **I:** Integration (Differencing)
    - Makes the data stationary by modeling differences between consecutive observations. If \( d = 1 \), the model predicts the difference between current and previous data points.
- **AR:** AutoRegressive
    - Models the current value as a weighted sum of past values (lags).
- **MA:** Moving Average
    - Models the current value as a weighted sum of past forecast errors (shocks).


In [0]:
!pip install pmdarima

In [0]:
# %restart_python

In [0]:
import numpy as np
import pmdarima as pm
import pandas as pd
import matplotlib.pyplot as plt

class ARIMAForecaster:
    def __init__(self, data: pd.DataFrame):
        """
        Parameters:
        - data (pd.DataFrame): Must include a 'num_pickups' column.
        """
        self.original_data = data.reset_index(drop=True)
        self.train, self.test = self._split_data()
        self.model = None

    def _split_data(self, split_ratio: float = 0.8):
        idx = int(split_ratio * len(self.original_data))
        # Only log-transform the training data
        train_log = np.log(self.original_data["num_pickups"][:-48] + 1)
        test_actual = self.original_data["num_pickups"][-48:]
        return train_log, test_actual

    def fit(self):
        """Fits auto ARIMA model to log-transformed training data."""
        self.model = pm.auto_arima(
            self.train,
            start_p=0, max_p=3,
            start_q=0, max_q=3,
            d=None,
            start_P=0, max_P=1,
            start_Q=0, max_Q=1,
            D=None,
            seasonal=True,
            m=24,
            stepwise=True,
            suppress_warnings=True,
            error_action="ignore"
        )
        return self

    def forecast(self, steps: int = None) -> np.ndarray:
        """Forecasts and returns predictions in the original scale."""
        if self.model is None:
            raise ValueError("Model not fit yet.")
        if steps is None:
            steps = len(self.test)
        log_preds = self.model.predict(n_periods=steps)
        return np.exp(log_preds) - 1  # inverse log-transform

    def evaluate(self) -> float:
        """Returns RMSE between predicted and actual values (original scale)."""
        y_pred = self.forecast()
        y_true = self.test.values
        rmse = np.sqrt(np.mean((y_true - y_pred) ** 2))
        mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
        return rmse, mape
    
    def plot_forecast_error(self):
        y_pred = self.forecast()
        y_true = self.test.values

        plt.figure(figsize=(12, 5))
        plt.plot(np.abs(y_true - y_pred), label='Actual', marker='o')
        plt.title('ARIMA Forecast Error')
        plt.xlabel('Time (hour index)')
        plt.ylabel('Error (absolute)')
        plt.grid(True)
        plt.tight_layout()
        plt.show()

In [0]:
month_year_list = []
for i in range(4, 11):
  month = f"0{i}" if i < 10 else f"{i}"
  month_year_list.append((month, "2019"))

data = ForecastTaxiDemands(month_year_list).convert_to_pandas()
data.shape

**Forecast the next 48 hours**

In [0]:
forecaster = ARIMAForecaster(data).fit()

rmse, mape = forecaster.evaluate()
print(f"RMSE: {rmse:.4f}")
print(f"MAPE: {mape:.4f}")
# RMSE: 6146.1763, 3111
# MAPE: 80.4993

# RMSE: 4433.4665
# MAPE: 105.2355

## Prophet

In [0]:
!pip install prophet

In [0]:
data.describe()

In [0]:
import numpy as np
import pandas as pd
from prophet import Prophet

def split_train_test(data):
    data['pickup_date'] = pd.to_datetime(data['pickup_date'])  # convert date column to datetime
    data['ds'] = data['pickup_date'] + pd.to_timedelta(data['pickup_hour'], unit='h')

    df = data[['ds', 'num_pickups']].copy()
    df.rename(columns={'num_pickups': 'y'}, inplace=True)

    # log-transform the training set
    df['y_orig'] = df['y']  # keep original
    df['y'] = np.log(df['y'] + 1)

    train = df.iloc[:-48]
    test = df.iloc[-48:]
    return train, test

def evaluate(y_true, y_pred):
    # Reverse log transform to original scale
    y_pred_exp = np.exp(y_pred) - 1
    
    rmse = np.sqrt(np.mean((y_true - y_pred_exp) ** 2))
    mape = np.mean(np.abs((y_true - y_pred_exp) / y_true)) * 100
    return rmse, mape

# Assuming 'data' is your full DataFrame with pickup_date and num_pickups columns
train, test = split_train_test(data)

best_rmse = float('inf')
best_mape = float('inf')
best_params = {}

for sps in [4, 6, 8, 10]:
    for cps in [0.01, 0.05, 0.1]:
        for fourier_order in [3, 5, 7]:
            model = Prophet(
                daily_seasonality=True, 
                weekly_seasonality=True, 
                changepoint_prior_scale=cps,
                seasonality_prior_scale=sps
            )
            model.add_seasonality(name='hourly', period=24, fourier_order=7)  # capture hourly pattern
            model.fit(train)

            future = model.make_future_dataframe(periods=48, freq='H')
            forecast = model.predict(future)

            # Align since prophet forecasts both TRAINING period and future
            pred_df = forecast[['ds', 'yhat']].merge(test[['ds', 'y_orig']], on='ds', how='inner')
            rmse, mape = evaluate(pred_df['y_orig'].values, pred_df['yhat'].values)
            print(f"RMSE: {rmse:.4f}")
            print(f"MAPE: {mape:.4f}")
            
            if rmse < best_rmse:
                best_rmse = rmse
                best_mape = mape
                best_params = {'changepoint_prior_scale': cps, 
                               'fourier_order': fourier_order, 
                               'seasonality_prior_scale': sps}

In [0]:
print("Best params:", best_params)
print("Best RMSE:", best_rmse)
print("Best MAPE:", best_mape)