In [None]:
!pip install prophet
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import hour, when, year, col, date_format, to_timestamp, round, coalesce, sum, lit, udf,  floor, datediff, weekofyear, min, count, avg
from pyspark.sql.functions import floor, datediff, lit, min, count, date_add, col, date_format, date_trunc
from pyspark.sql.functions import hour, when, col, date_format, year, to_timestamp, round, format_string, coalesce, sum, count, StringType, concat_ws, row_number,lit
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from pyspark.ml.feature import StandardScaler
from pyspark.sql import Window
from prophet import Prophet
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from google.cloud import bigquery
from pyspark.sql.functions import year, count
from google.cloud import storage
import os

In [None]:
class suppress_stdout_stderr(object):
    '''
    A context manager for doing a "deep suppression" of stdout and stderr in
    Python, i.e. will suppress all print, even if the print originates in a
    compiled C/Fortran sub-function.
       This will not suppress raised exceptions, since exceptions are printed
    to stderr just before a script exits, and after the context manager has
    exited (at least, I think that is why it lets exceptions through).

    '''
    def __init__(self):
        # Open a pair of null files
        self.null_fds = [os.open(os.devnull, os.O_RDWR) for x in range(2)]
        # Save the actual stdout (1) and stderr (2) file descriptors.
        self.save_fds = (os.dup(1), os.dup(2))

    def __enter__(self):
        # Assign the null pointers to stdout and stderr.
        os.dup2(self.null_fds[0], 1)
        os.dup2(self.null_fds[1], 2)

    def __exit__(self, *_):
        # Re-assign the real stdout/stderr back to (1) and (2)
        os.dup2(self.save_fds[0], 1)
        os.dup2(self.save_fds[1], 2)
        # Close the null files
        os.close(self.null_fds[0])
        os.close(self.null_fds[1])
        
        
import warnings
warnings.filterwarnings("ignore")

warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=UserWarning)

In [None]:
class NYCabDataProcessor:
    """
    Class to read NYC taxi data in from big query, clean 
    the data (using Isolation Forest and basic filtration 
    steps), join it using a lookup table, aggregate 
    individual trip data into a weekly basis, and forecast 
    the future. Other class methods can be used to build 
    networks based on various critera.
    
    Attributes
    ----------
    data : pyspark.sql.DataFrame
        Contains the 2019 and 2020 trip data after being read 
        in from bigquery via read_data. Also contains cleaned
        data from data cleaning methods

    anomalous_data : pyspark.sql.DataFrame
        Contains a subset of the trip data that is anomalous.
        Used to visualize anomalous trips

    data_with_lookup: pyspark.sql.DataFrame
        Merges data class variable with externally provided 
        lookup table. Allows connection of each zone with
        geographic data.
    
    agg_data: pyspark.sql.DataFrame
        In order to forecast and visualize data on a 
        reasonable level, contains total trips, total tips,
        and other statistics aggregated on weekly basis

    forcasted_data: pyspark.sql.DataFrame
        Contains data forcasted past March 15 as provided by
        class methods using the Prophet model
        
    mape: float
        Used to store the mean absolute percentage error (MAPE)
        of the forecasst created.
    
    final_prices: pyspark.sql.DataFrame
        Used to store a dataframe of final prices to be used by the dashboard
        
    map_data_weekly_with_forecast: pyspark.sql.DataFrame
        Used to store the combined aggregated cleaned data along with taxi lookup and forecasted data
    
    map_delta_data_weekly: pyspark.sql.DataFrame
        Used in the dashboard, the final output of this notebook. Formatting and column
        names changed to make visualization easier.
    
    Methods
    -------
    read_data (filename1, filename2):
        Calls read_bigquery_data twice to read in two tables.
        For the purposes of COVID-19 forcasting, it is suggested
        to read in the 2019 and 2020 trip data.

    read_bigquery_data(table_name):
        Reads bigquery data in given by the path passed into the
        class method. Mainly used by read_data.

    read_csv_through_path(gs_path):
        Reads a CSV at a file path provided into a spark DataFrame

    report_metadata:
        Utility function to print the metadata of the data class 
        variable: Count of rows and column names

    upload_data_to_bigquery(project, bucket_name, dataset_id, table_name, df):
        Utility function to write final data back to BigQuery at the end
        of the data session. Can specify which class dataframe to write.

    clean_data:
        Base function to do some basic data cleaning. Cast columns
        to DateTime, switch columns to doubles and floats for 
        calculation accuracy/performance as necessary, 
        and filter all negative trip distances.

    anomaly_profile_flag(K=3):
        Flag data as anomalous using an isolation forest algorithm.
        K = 3 is used as a hyperparameter and can be changed by the 
        user to find different results.
    
    join_data_and_lookup(taxi_lookup):
        Joins the base data class variable on the taxi_lookup data 
        provided in a CSV in the project folder.

    aggregate_data_weekly:
        Aggregates data on a weekly basis, from individual trips to
        all trips on a weekly basis. Takes the trip count sums, but
        also takes the min of other columns in the dataframe grouped
        on a weekly basis.

    wmape(actual, forecast):
        Calculates the wmape on the forecasted data from the prophet 
        model.

    forecast_data_post_march_fifteen:
        Uses facebook prophet model and aggregated data provided 
        by weekly model to forecast future data and place into
        class variable for forecated data

    get_mape_for_forecast_data:
        Uses wmape function, base data, and forecasted data to find
        the MAPE for the forecast.
        
    create_network_data(taxi_lookup):
        Aggregate all of the data from the processed trip data and format it into a single dataframe. 
        Will be used in combination with the forecasted data to create differentials and map visuals.
    
    combined_forecasted_Data(taxi_lookup):
        Combined the forecast data with the network data to produce a CSV ready for a dashboard,
        complete with names of zones and boroughs, as well as shape files.

    update(self,filename1,filename2, taxi_lookup):
        Wrapper function for class methods that reads, cleans, 
        forecasts, and finds error for the data for 2019
        and 2020 taxi trips on Google BigQuery.


    """

    # Initialize all class variables to None (described above)
    data = None
    anomalous_data = None
    data_with_lookup = None
    agg_data = None
    forecasted_data = None
    mape = None
    final_prices = None
    nyc_network_df = None
    map_weekly_data_with_forecast = None
    map_data_delta_weekly = None
    
    


    def __init__(self, project, bucket, appname='spark-anomaly-detection'):
        self.spark = SparkSession.builder.master('yarn').appName(appname).getOrCreate()
        self.project = project
        self.bucket = bucket
        
    def read_data(self, filename1 : str , filename2: str):
        """Reads data from Google BigQuery using class methods from filenames.

        Args:
            filename1 (str): First filename to read from
            filename2 (str): Second filename to read from
        """
        data_2019 = self.read_bigquery_table(filename1)
        data_2020 = self.read_bigquery_table(filename2)
        # Join the two dataframes and store in class variable data
        self.data = data_2019.union(data_2020)
    
    def read_bigquery_table(self,table_name):
        """Uses pyspark to read data in from a location called table_name.

        Args:
            table_name (str): Location of the file to be read from BigQuery

        Returns:
            pyspark.sql.DataFrame: Data in dataframe read in from BigQuery
        """
        return self.spark.read.format('bigquery').option('table', table_name).load()
    
    def read_csv_through_path(self,gs_path):
        """Reads data in from a csv and stores in spark DataFrame

        Args:
            gs_path (str): Relative or absolute path to csv

        Returns:
            pyspark.sql.DataFrame: Data from csv
        """
        return self.spark.read.csv(gs_path, header=True, inferSchema=True)

    
    def report_metadata(self):
        """Reports some basic metadata about the data class variable

        Returns:
            dict: Dictionary containing a count of rows and list of colum names
        """
        column_names = [field.name for field in self.data.schema]
        return {'row_count': self.data.count(), 'column_names': column_names}
    

    def upload_data_to_bigquery(self,project_id, dataset_id, table_name, df):
        """Utility to upload data back to bigquery. Input a project id, 
        a dataset id, and a table name.

        Args:
            project_id (str): String containing the Google project ID for BigQuery
            dataset_id (str): The "bucket" or dataset to store the data in
            table_name (str): The specific name of the file/data to be stored
            df (pyspark.sql.DataFrame): The DataFrame containing the data you would
                like to write.
        """
        df.write.csv(f"gs://{dataset_id}/{table_name}",header=True,mode='overwrite')

    
    
    def clean_data(self):
        """Function do to some basic data cleaning and filtering. Performs the 
        cleaning on the data class variable and changes it to store the data
        """
        # Change the datetime columns to datetime data types
        data_ = self.data.withColumn("pickup_datetime", self.data["pickup_datetime"].cast("timestamp"))
        data_ = data_.withColumn("dropoff_datetime", data_["dropoff_datetime"].cast("timestamp"))
        # Cast the trip distance to a double for precision reasons
        data_ = data_.withColumn("trip_distance", data_["trip_distance"].cast("double"))
        # Find the trip duration in minutes by calculating using datetime objects
        data_ = data_.withColumn("trip_duration", (col("dropoff_datetime").cast("long") - col("pickup_datetime").cast("long")) / 60)
        # Filter the data such that the trip distance should be >= 0
        data_ = data_.filter(col("trip_distance") >= 0)
        # Filter the data such that the total amount should be >= 0
        data_ = data_.filter(col("total_amount") >= 0)
        # Filter the data for NA and NV zones (cannot be visualized on map)
        data_ = data_.filter(col("pickup_location_id") < 264)
        data_ = data_.filter(col("dropoff_location_id") < 264)
        # Filter data where dropoff date is earlier than pickup date
        data_ = data_.filter(col("dropoff_datetime") >= col("pickup_datetime"))
        self.data = data_ #cleaned data


    def anomaly_profile_flag(self,K=3):
        """Use isolation forest algorithm, as well as Chebyshev's theorem
        to identify anomalous trips with regards to trip distance and trip 
        duration. Store the anomalous trip data in a separate DataFrame 
        for visualization reasons. Also creates some information describing 
        the reasons that data can be anomalous.

        Args:
            K (int=3): The K to use for Chebyshev's theorem (the number of 
            standard deviations to allow in trip distance and duration)
        """
        # Select the columns to find if anomalous
        selected_columns = ["trip_distance", "trip_duration"]
        # Select the other columns to train the anomaly detection on
        data_sub = self.data.select(*selected_columns)
        # Case to double for precision reason
        data_sub = data_sub.withColumn("trip_distance", col("trip_distance").cast("double"))
        data_sub = data_sub.withColumn("trip_duration", col("trip_duration").cast("double"))

        # Initialize lists to store the results
        IF_anomaly_distance_results = list()
        IF_anomaly_duration_results = list()
        IF_normal_distance_results = list()
        IF_normal_duration_results = list()

        # Bootstrap the data to train the isolation forest on
        for i in range(1, 15):
            # Define a fraction of the data to sample for bootstrapping (set to small for computational reasons)
            sample_fraction = 0.0005  
            sampled_data = data_sub.sample(fraction=sample_fraction)
            sampled_pandas_df = sampled_data.toPandas()
            # Create the isolation forest trip ditances and durations
            isolation_forest_trip_distance = IsolationForest(contamination=0.03, random_state=42, n_jobs=-1)
            isolation_forest_trip_duration = IsolationForest(contamination=0.03, random_state=42, n_jobs=-1)
            # Fit the models
            isolation_forest_trip_distance.fit(sampled_pandas_df[["trip_distance"]])
            isolation_forest_trip_duration.fit(sampled_pandas_df[["trip_duration"]])

            anomaly_labels_trip_distance = isolation_forest_trip_distance.predict(sampled_pandas_df[["trip_distance"]])
            anomaly_labels_trip_duration = isolation_forest_trip_duration.predict(sampled_pandas_df[["trip_duration"]])

            sampled_pandas_df['distance_anomaly'] = anomaly_labels_trip_distance
            sampled_pandas_df['duration_anomaly'] = anomaly_labels_trip_duration

            # Find what makes trips anomalous in terms of distance and duration in terms of the other variables
            distance_anomaly_description = sampled_pandas_df[sampled_pandas_df['distance_anomaly'] == -1]['trip_distance'].describe()
            distance_normal_description = sampled_pandas_df[sampled_pandas_df['distance_anomaly'] == 1]['trip_distance'].describe()
            duration_anomaly_description = sampled_pandas_df[sampled_pandas_df['duration_anomaly'] == -1]['trip_duration'].describe()
            duration_normal_description = sampled_pandas_df[sampled_pandas_df['duration_anomaly'] == 1]['trip_duration'].describe()

            # Store the results
            IF_anomaly_distance_results.append(distance_anomaly_description)
            IF_anomaly_duration_results.append(duration_anomaly_description)
            IF_normal_distance_results.append(distance_normal_description)
            IF_normal_duration_results.append(duration_normal_description)

            # Find the mean and standard deviation of each characteristic
            distance_normal_mean = pd.DataFrame(IF_normal_distance_results).mean()['mean']
            distance_normal_std = pd.DataFrame(IF_normal_distance_results).mean()['std']
            duration_normal_mean = pd.DataFrame(IF_normal_duration_results).mean()['mean']
            duration_normal_std = pd.DataFrame(IF_normal_duration_results).mean()['std']

            # Find the normal bounds as defined by K standard deviations from the mean
            lower_bound_distance_normal = distance_normal_mean - K * distance_normal_std
            upper_bound_distance_normal = distance_normal_mean + K * distance_normal_std

            lower_bound_duration_normal = duration_normal_mean - K * duration_normal_std
            upper_bound_duration_normal = duration_normal_mean + K * duration_normal_std
            
        # Finally aggregate all of the anomalous data into a separate class variable
        data_sub_anomalous = self.data.withColumn("is_anomalous_distance",
                                      when((col("trip_distance") < lower_bound_distance_normal) | 
                                           (col("trip_distance") > upper_bound_distance_normal), lit(True))
                                      .otherwise(lit(False)))
        # Store column for anomalous duration
        data_sub_anomalous = data_sub_anomalous.withColumn("is_anomalous_duration",
                                                          when((col("trip_duration") < lower_bound_duration_normal) | 
                                                               (col("trip_duration") > upper_bound_duration_normal), lit(True))
                                                          .otherwise(lit(False)))
        # Store column for anomalous in any category
        data_sub_anomalous = data_sub_anomalous.withColumn("is_anomalous",
                                                          (col("is_anomalous_distance") | col("is_anomalous_duration")))
        # Assign to class variable
        self.anomalous_data = data_sub_anomalous
        
        
    
    #Aggregation
    def join_data_and_lookup(self,taxi_lookup):
        """Join the data on the zone lookup containign things like path declarations 
        and other metadata (like zone name and borough)

        Args:
            taxi_lookup (pyspark.sql.DataFrame): Table containing taxi lookup data
        """
        #filter to relevant columns
        processed_df = self.anomalous_data.select("vendor_id", "pickup_datetime", "dropoff_datetime", "trip_distance", "payment_type", "tip_amount","total_amount", "pickup_location_id", "dropoff_location_id", "trip_duration", "is_anomalous", "is_anomalous_distance", "is_anomalous_duration")
        processed_df = processed_df.withColumn("year", year("pickup_datetime"))
        processed_df = processed_df.filter((col("year") == 2019) | (col("year") == 2020))
        processed_df = processed_df.withColumnRenamed("pickup_location_id", "LocationID")
        joined_df = processed_df.join(taxi_lookup, taxi_lookup.zone_id == processed_df.LocationID, 'left')
        filtered_df = joined_df.filter(col("pickup_datetime") >= "2019-01-01")
        sorted_df = filtered_df.orderBy(col("pickup_datetime"))
    
        self.data_with_lookup = sorted_df
        
        
        
    
    def aggregate_data_weekly(self):
        """Aggregate the data with lookup class variable to a weekly
        basis in order to forecast into the future. Assumes
        2019 and 2020 data, as the first sunday is hardcoded for now.
        Takes the count of the trips in each week, but takes the min
        of other columns. Could be updated in future for other 
        visualizations.

        """
        first_sunday = "2019-01-06"
        filtered_df = self.data_with_lookup.filter(col("pickup_datetime") >= first_sunday)
    
        agg_df = filtered_df.withColumn(
           "days_since_first_sunday",
           datediff(col("pickup_datetime"), lit(first_sunday))
        )

        # Calculate the number of weeks since the first Sunday in 2019
        agg_df = agg_df.withColumn(
           "weeks_since_first_sunday",
           floor((col("days_since_first_sunday") + 1) / 7).cast(IntegerType())
        )

        # Calculate the start and end dates of each week
        agg_df = agg_df.withColumn(
            "week_start_date",
            date_add(lit(first_sunday), (col("weeks_since_first_sunday") * 7))
        ).withColumn(
            "week_end_date",
            date_add(lit(first_sunday), (col("weeks_since_first_sunday") + 1) * 7 - 1)
        )

        # Calculate the correct "week" column representing the start of each week (Sunday)
        agg_df = agg_df.withColumn(
            "week",
            date_format(date_trunc("week", col("week_start_date")), "yyyy-MM-dd HH:mm:ss")
        )

        nyc_aggregated_df = agg_df.groupBy("week_start_date", "LocationID").agg(
            count("*").alias("trip_count"),
            *[sum(col(col_name).cast("int")).alias(col_name) for col_name in filtered_df.columns if col_name in ["is_anomalous", "is_anomalous_distance", "is_anomalous_duration"]],
            *[sum(col_name).alias(col_name) for col_name in filtered_df.columns if col_name not in [ "week_start_date", "week_end_date", "zone_id", "is_anomalous", "is_anomalous_distance", "is_anomalous_duration","zone_name", "borough", "zone_geom", "LocationID", "trip_count"]]
        )

        nyc_aggregated_df = nyc_aggregated_df.select(
            "week_start_date",
            "trip_count",
            *[col_name for col_name in nyc_aggregated_df.columns if col_name not in ["week_start_date", "week_end_date", "Zone", "trip_count"]]
        )

        #rename week_start_date to week and verify all columns are in dataset 
        nyc_aggregated_df = nyc_aggregated_df.withColumnRenamed("week_start_date", "week")
        self.agg_data = nyc_aggregated_df
        
    def find_mean_prices(self):
        """Finds the mean price to be used for economic forecasts
        """
        merged_df = self.data_with_lookup
        merged_df = merged_df.withColumn("LocationID", col("LocationID").cast(IntegerType()))

        before_date = '2020-03-15'
        filtered_df = merged_df.filter(col("is_anomalous") == False)

        before_df = filtered_df[filtered_df['pickup_datetime'] < before_date]
        after_df = filtered_df[filtered_df['pickup_datetime'] >= before_date]
        before_totals = before_df.groupby('Borough').agg(avg('total_amount').alias('Mean_Total_Amount_Before'))
        after_totals = after_df.groupby('Borough').agg(avg('total_amount').alias('Mean_Total_Amount_After'))

        final_prices_df = before_totals.join(after_totals, on="Borough")
        self.final_prices = final_prices_df
        

    def wmape(self,actual, forecast):
        """Calculate the MAPE of two pandas DataFrames (use
        Series for better robust code).

        Args:
            actual (pd.DataFrame): DataFrame/Series containing actual values
            forecast (pd.DataFrame): DataFrame/Series containing the forecast values

        Returns: 
            float: WMAPE calculated by the method
        """
        actual_np = np.array(actual)
        forecast_np = np.array(forecast)

        if len(actual_np) != len(forecast_np):
            return None  

        absolute_percentage_errors = np.abs((actual_np - forecast_np) / actual_np)

        return np.sum(absolute_percentage_errors) / len(actual_np) * 100

    
    def forecast_data_post_march_fifteen(self):
        """Forecast the trip count for all NYC yellow taxis post March 15th 
        on a weekly granularity. Uses Meta PROPHET model to perform this 
        forecast. Creates the forecasted_data class variable at the end
        using the agg_data class variable.
        """
        # Make the agg data into a pandas dataframe (easier for Prophet to work with)
        agg_df = self.agg_data.toPandas()
        agg_df['week'] = pd.to_datetime(agg_df['week'])
        unique_zones = agg_df.LocationID.drop_duplicates().to_list()

        # Place for the results to go from the prediction
        results_df = pd.DataFrame(columns=['week', 'LocationID', 'forecast_qty', 'trip_count'])

        # For each of the zones in the list of zones, train the model on 2019 data
        # and forecast into 2020.
        for zone in unique_zones:

            # Find the data only for the given zone
            zone_data = agg_df[agg_df['LocationID'] == zone]
            # Adjust the columns names for the sake of the prophet model
            zone_data.rename(columns={'week': 'ds', 'trip_count': 'y'}, inplace=True)
                        
            # Check for and handle NaN values in the target variable
            zone_data = zone_data.dropna(subset=['y'])

            # Check for and handle missing values in the 'ds' column
            zone_data = zone_data.dropna(subset=['ds'])

            # Remove duplicates in the 'ds' column
            zone_data = zone_data.drop_duplicates(subset=['ds'])

            # Define training period/forecast period
            train_start_date = '2019-01-01'
            train_end_date = '2020-03-08'
            forecast_start_date = '2020-03-09'
            forecast_end_date = '2020-12-31'

            # Filter down the to the necessary data
            train_df = zone_data[(zone_data['ds'] >= train_start_date) & (zone_data['ds'] <= train_end_date)]
            train_df = train_df.sort_values(by='ds')

            # Forecast the future data
            forecast_df = zone_data[(zone_data['ds'] >= forecast_start_date) & (zone_data['ds'] <= forecast_end_date)]
            forecast_df = forecast_df.sort_values(by='ds')

            # Create a prophet model instance
            model = Prophet()
            
            if len(train_df) > 1:
                # Suppress model output
                with suppress_stdout_stderr():
                    model.fit(train_df)

                # Create an empty dataframe for the forecast of the future
                future = pd.DataFrame(forecast_df['ds']).reset_index(drop=True)


                # Use the trained model to create the forecast results
                if len(future) > 1:
                    forecast = model.predict(future)
                    forecast['yhat'] = forecast['yhat'].round(0).astype(int)

                    observed = zone_data[['ds', 'y']]
                    predicted = forecast[['ds', 'yhat']]

                    result_df = observed.merge(predicted, on='ds')
                    result_df['LocationID'] = zone
                    result_df.rename(columns={'ds': 'week', 'y': 'trip_count', 'yhat': 'forecast_qty'}, inplace=True)
                    result_df = result_df.sort_values(by='week')
                    results_df = pd.concat([results_df, result_df], ignore_index=True)
                
        # Merge the final results into a the existing agg_df class variable and assign to forecasted data 
        results_df = results_df.rename(columns={"LocationID": "zone_id"})
        agg_df = agg_df.rename(columns={"LocationID": "zone_id"})
        final_df = agg_df.merge(results_df, on=['week', 'zone_id'], how='left', suffixes=('_original', '_forecast'))
        self.forecasted_data = final_df
        
    
    
    def get_mape_for_forecast_data(self):
        """Use the forecast data to get the MAPE for each part of the forecast.
        Also fits the model very similar to March 15 method, but generates
        the MAPE to figure out how well the model is fit to the training
        data.
        """
        # Make agg_df spark dataframe into pandas dataframe for processing reasons
        agg_df = self.agg_data.toPandas()
        # Initialize variables to be used
        fbp_wmape = list()
        zones_ = list()

        agg_df['week'] = pd.to_datetime(agg_df['week'])
        unique_zones = agg_df.LocationID.drop_duplicates().to_list()
        counter = 0

        for zone in unique_zones:

            # Some logging such that the user can verify progress is occurring.
            counter += 1
            if counter % 10 == 0:
                print(counter/len(unique_zones))
            zone_data = agg_df[agg_df['LocationID'] == zone]
            zone_data.rename(columns={'week': 'ds', 'trip_count': 'y'}, inplace=True)
            
            # Check for and handle NaN values in the target variable
            zone_data = zone_data.dropna(subset=['y'])

            # Check for and handle missing values in the 'ds' column
            zone_data = zone_data.dropna(subset=['ds'])

            # Remove duplicates in the 'ds' column
            zone_data = zone_data.drop_duplicates(subset=['ds'])

            end_date = '2019-11-30'

            train_df = zone_data[zone_data['ds'] <= end_date]
            test_df = zone_data[(zone_data['ds'] >= end_date) & (zone_data['ds'] <= '2020-01-01')]

            model = Prophet()

            if len(train_df) > 1:
                with suppress_stdout_stderr():
                            model.fit(train_df)

                future = model.make_future_dataframe(periods=5, freq='W', include_history=True)
                future = future[future['ds'].dt.month == 12]

                if len(future) > 1:
                    forecast = model.predict(future)

                    observed = pd.DataFrame(test_df[test_df['ds'].dt.month == 12]['y']).reset_index(drop=True)
                    predicted = pd.DataFrame(forecast[forecast['ds'].dt.month == 12]['yhat']).reset_index(drop=True)
                    zones_.append(predicted)
                    wmape_ = self.wmape(observed, predicted)
                    fbp_wmape.append(wmape_)
        lst = [wmape for wmape in fbp_wmape if wmape is not None]

        # Store the MAPE into the class variable
        self.mape = pd.DataFrame(lst).describe()[0]
        
    def create_network_data(self, taxi_lookup):
        processed_df = self.anomalous_data.select("vendor_id", "pickup_datetime", "dropoff_datetime", "trip_distance", "payment_type", "tip_amount","total_amount", "pickup_location_id", "dropoff_location_id", "trip_duration", "is_anomalous", "is_anomalous_distance", "is_anomalous_duration")
        processed_df = processed_df.withColumn("year", year("pickup_datetime"))
        processed_df = processed_df.filter((col("year") == 2019) | (col("year") == 2020))
        processed_df.groupBy("year").agg(count("*").alias("row_count")).show()
        processed_df = processed_df.withColumnRenamed("pickup_location_id", "LocationID")
        joined_df = processed_df.join(taxi_lookup, taxi_lookup.zone_id == processed_df.LocationID, 'left')
        filtered_df = joined_df.filter(col("pickup_datetime") >= "2019-01-01")
        sorted_df = filtered_df.orderBy(col("pickup_datetime"))
        first_sunday = "2019-01-06"
        filtered_df = sorted_df.filter(col("pickup_datetime") >= first_sunday)
        agg_df = filtered_df.withColumn(
           "days_since_first_sunday",
           datediff(col("pickup_datetime"), lit(first_sunday))
        )

        agg_df = agg_df.withColumn(
           "weeks_since_first_sunday",
           floor((col("days_since_first_sunday") + 1) / 7).cast(IntegerType())
        )

        agg_df = agg_df.withColumn(
            "week_start_date",
            date_add(lit(first_sunday), (col("weeks_since_first_sunday") * 7))
        ).withColumn(
            "week_end_date",
            date_add(lit(first_sunday), (col("weeks_since_first_sunday") + 1) * 7 - 1)
        )
        agg_df = agg_df.withColumn(
            "week",
            date_format(date_trunc("week", col("week_start_date")), "yyyy-MM-dd HH:mm:ss")
        )
        agg_df_false = agg_df.where(agg_df.is_anomalous == False)
        nyc_aggregated_df = agg_df_false.groupBy("week_start_date", agg_df["LocationID"].alias("origin"), agg_df["dropoff_location_id"].alias("destination")).agg(sum("total_amount").alias("sum_total_amount"), sum("tip_amount").alias("sum_tip_amount"), sum("trip_duration").alias("sum_trip_duration"), sum("trip_distance").alias("sum_trip_distance"), count("*").alias("trip_count"))
        nyc_aggregated_df = nyc_aggregated_df.join(taxi_lookup, nyc_aggregated_df.origin == taxi_lookup.zone_id, 'left')
        nyc_aggregated_df = nyc_aggregated_df.withColumnRenamed("zone_id","zone_name_origin")
        nyc_aggregated_df = nyc_aggregated_df.withColumnRenamed("Borough","borough_origin")
        nyc_aggregated_df = nyc_aggregated_df.withColumnRenamed("service_zone","service_zone_origin")

        # join aggregated data set again with taxi_lookup based on destination/location ID this time
        nyc_aggregated_df = nyc_aggregated_df.join(taxi_lookup, nyc_aggregated_df.destination == taxi_lookup.zone_id, 'left')
        nyc_aggregated_df = nyc_aggregated_df.withColumnRenamed("zone_id","zone_name_destination")
        nyc_aggregated_df = nyc_aggregated_df.withColumnRenamed("Borough","borough_destination")
        nyc_aggregated_df = nyc_aggregated_df.withColumnRenamed("service_zone","service_zone_destination")
        self.nyc_network_df = nyc_aggregated_df
        
    def combined_forecasted_data(self, taxi_lookup):
        # make the starting data frame for map table w/ necessary columns, join network data with forecast data, rename cols
        forecasted_data_local = self.forecasted_data
        forecasted_data_local['forecast_qty'] = forecasted_data_local['forecast_qty'].fillna(0).astype(int)
        forecasted_data_local['trip_count_forecast'] = forecasted_data_local['trip_count_forecast'].fillna(0).astype(int)
        forecasted_data_local['zone_id'] = forecasted_data_local['zone_id'].fillna(0).astype(int)
        forecasted_data_local = self.spark.createDataFrame(self.forecasted_data)
        forecasted_data_local = forecasted_data_local.select("week", "zone_id","trip_count_original","forecast_qty")
        self.nyc_network_df = self.nyc_network_df.join(forecasted_data_local, (self.nyc_network_df.week_start_date == forecasted_data_local.week) & (self.nyc_network_df.origin == forecasted_data_local.zone_id.cast(StringType())),"left")
        self.nyc_network_df = self.nyc_network_df.withColumnRenamed("forecast_qty", "forecasted_zone_origin_weight")
        self.nyc_network_df = self.nyc_network_df.withColumnRenamed("trip_count_original", "actual_zone_origin_weight")
        self.nyc_network_df = self.nyc_network_df.withColumnRenamed("trip_count", "weight")
        
        # removing entries that have no origin or destination (won't work well with maps if you don't have two points to generate the path/line)
        self.nyc_network_df = self.nyc_network_df.filter(self.nyc_network_df.zone_name_origin.isNotNull())
        self.nyc_network_df = self.nyc_network_df.filter(self.nyc_network_df.zone_name_destination.isNotNull())

        # creating path col by concating origin zone number with destination zone number
        self.nyc_network_df = self.nyc_network_df.withColumn("path", concat_ws("_",self.nyc_network_df["origin"].cast(StringType()),self.nyc_network_df["destination"].cast(StringType())))
        self.nyc_network_df = self.nyc_network_df.withColumn("year", year("week_start_date"))
        
        # creating origin and destination datasets and then will concat them (union wise) to make our dataset for map viz

        data_all_o = self.nyc_network_df.select("week_start_date","origin","path","weight","actual_zone_origin_weight","zone_name_origin","borough_origin","year","sum_tip_amount","sum_total_amount","sum_trip_duration","sum_trip_distance","forecasted_zone_origin_weight")
        data_all_o = data_all_o.withColumnRenamed("origin","origin_destination")
        data_all_o = data_all_o.withColumnRenamed("zone_name_origin","zone_name")
        data_all_o = data_all_o.withColumnRenamed("borough_origin","borough")
        data_all_o = data_all_o.withColumnRenamed("zone_geom_origin","zone_geom")
        data_all_o = data_all_o.withColumn("node_name",format_string("origin"))

        data_all_d = self.nyc_network_df.select("week_start_date","destination","path","weight","actual_zone_origin_weight","zone_name_destination","borough_destination","year","sum_tip_amount","sum_total_amount","sum_trip_duration","sum_trip_distance")
        data_all_d = data_all_d.withColumnRenamed("destination","origin_destination")
        data_all_d = data_all_d.withColumnRenamed("zone_name_destination","zone_name")
        data_all_d = data_all_d.withColumnRenamed("borough_destination","borough")
        data_all_d = data_all_d.withColumnRenamed("zone_geom_destination","zone_geom")
        data_all_d = data_all_d.withColumn("node_name",format_string("destination"))
        
        union_edge_all = data_all_o.unionByName(data_all_d, allowMissingColumns=True)
        
        # create 2019 and 2020 datasets 
        union_edge_2019 = union_edge_all.where(col("year") == 2019)
        union_edge_2020 = union_edge_all.where(col("year") == 2020)

        # rename fields in 2020 data set so we can merge the datasets later on without issue
        union_2020_delta = union_edge_2020.select("week_start_date","path","weight","forecasted_zone_origin_weight","actual_zone_origin_weight","node_name")
        union_2020_delta = union_2020_delta.withColumnRenamed("week_start_date","week_start_date_2020")
        union_2020_delta = union_2020_delta.withColumnRenamed("path","path_2020")
        union_2020_delta = union_2020_delta.withColumnRenamed("forecasted_zone_origin_weight","forecasted_zone_origin_weight_2020")
        union_2020_delta = union_2020_delta.withColumnRenamed("actual_zone_origin_weight","actual_zone_origin_weight_2020")
        union_2020_delta = union_2020_delta.withColumnRenamed("node_name","node_name_2020")
        union_2020_delta = union_2020_delta.withColumnRenamed("weight","weight_2020")
        union_2019_delta = union_edge_2019
        
        year_2019_week_list = union_2019_delta.select("week_start_date").distinct()
        year_2020_week_list = union_2020_delta.select("week_start_date_2020").distinct()
        year_2020_week_list = year_2020_week_list.where(year_2020_week_list.week_start_date_2020 != "2019-12-29")
        
        w = Window().orderBy("week_start_date")
        year_2019_week_list_key = year_2019_week_list.withColumn("week_number", row_number().over(w))
        w = Window().orderBy("week_start_date_2020")
        year_2020_week_list_key = year_2020_week_list.withColumn("week_number_2020", row_number().over(w))

        year_2019_week_list_key = year_2019_week_list_key.withColumnRenamed("week_start_date","week_key_1")
        year_2020_week_list_key = year_2020_week_list_key.withColumnRenamed("week_start_date_2020","week_key_2")
        
        union_2019_delta = union_2019_delta.join(year_2019_week_list_key, union_2019_delta.week_start_date == year_2019_week_list_key.week_key_1, "left")
        union_2020_delta = union_2020_delta.join(year_2020_week_list_key, union_2020_delta.week_start_date_2020 == year_2020_week_list_key.week_key_2, "left")

        union_edge_data_delta = union_2019_delta.join(union_2020_delta, (union_2019_delta.week_number == union_2020_delta.week_number_2020) & (union_2019_delta.path == union_2020_delta.path_2020) & (union_2019_delta.node_name == union_2020_delta.node_name_2020) , "left")
        union_edge_data_delta = union_edge_data_delta.withColumn("delta_actual_zone_origin_weight", (union_edge_data_delta.actual_zone_origin_weight - union_edge_data_delta.actual_zone_origin_weight_2020)) # the original origin zone weight in 2019 - 2020 (this came from the forecast table)
        union_edge_data_delta = union_edge_data_delta.withColumn("trip_weight_delta", (union_edge_data_delta.weight - union_edge_data_delta.weight_2020)) # trip count in 2019 - trip count in 2020
        union_edge_data_delta = union_edge_data_delta.select("origin_destination","path","weight","weight_2020","trip_weight_delta","actual_zone_origin_weight","actual_zone_origin_weight_2020","delta_actual_zone_origin_weight","zone_name","borough","node_name","week_number","week_start_date","week_start_date_2020")
        
        self.map_weekly_data_with_forecast = union_edge_all
        self.map_data_delta_weekly = union_edge_data_delta
    
    def write_to_BQ(self, BQ_suffix):
        self.upload_data_to_bigquery(self.project, self.bucket, f"final_aggregated_data{BQ_suffix}.csv", self.agg_data)
        # Upload pandas dataframe to GCS
        bucket_name = self.bucket
        file_name = f"final_forecasted_data{BQ_suffix}.csv"
        self.forecasted_data.to_csv(file_name, index=False)
        client = storage.Client()
        bucket = client.get_bucket(bucket_name)
        blob = bucket.blob(file_name)
        blob.upload_from_filename(file_name)
        self.upload_data_to_bigquery(self.project, self.bucket, f"final_prices{BQ_suffix}.csv", self.final_prices)
        self.upload_data_to_bigquery(self.project, self.bucket, f"map_weekly_data_with_forecast{BQ_suffix}.csv", self.map_weekly_data_with_forecast)
        self.upload_data_to_bigquery(self.project, self.bucket, f"map_data_delta_weekly{BQ_suffix}.csv", self.map_data_delta_weekly)

    def update(self,filename1,filename2, taxi_lookup,write_to_BQ=False, BQ_suffix=""):
        """Wrapper function or running the core methods of the class.
        Reads the data in, cleans it, finds anomalies, aggregates the data,
        trains a Prophet model and finds the quality of the fit, then uses
        the trained model to forecast model for 2020 once the COVID-19
        pandemic began.

        Args:
            filename1 (str): Location of the first file on BigQuery (2019)
            filename2 (str): Location of the second file on BigQuery (2020)
            taxi_lookup (pyspark.sql.DataFrame): PySpark DataFrame containing the lookup data

        Returns:
            pyspark.sql.DataFrame: The forecasted data
            float: The MAPE of the model used to forecast the data
        """
        print("Reading Data")
        self.read_data(filename1, filename2) #gives self.data
        print("Cleaning Data")
        self.clean_data()
        print("Filtering anomalous data")
        self.anomaly_profile_flag(K=3) #changes self.data
        print("Joining with taxi lookup data")
        self.join_data_and_lookup(taxi_lookup) #changes self.data
        print("Finding mean prices")
        self.find_mean_prices() # changes self.final_prices
        print("Aggregating to weekly level")
        self.aggregate_data_weekly() #gives self.agg_data
        print("Beginning forecast")
        self.get_mape_for_forecast_data()#get mape
        self.forecast_data_post_march_fifteen()
        print("Creating network data")
        self.create_network_data(taxi_lookup)
        print("Designing network differential")
        self.combined_forecasted_data(taxi_lookup)
        if write_to_BQ == True:
            print("Writing data")
            self.write_to_BQ(BQ_suffix) # Write the data to the project
        return self.forecasted_data, self.mape

In [None]:
# Some code to use the class defined above
if __name__ == "__main__":
    # Set the options for this run (users change)
    google_project_name = 'test-project-406500'
    google_storage_bucket = 'example_bucket_143234'
    bigquery_suffix = "_test"
    # Create class object
    nyc_processor = NYCabDataProcessor(google_project_name,google_storage_bucket)

    # Read in taxi lookup data
    bigquery_path = 'bigquery-public-data.new_york_taxi_trips.taxi_zone_geom'
    taxi_lookup = nyc_processor.read_bigquery_table(bigquery_path)
    
    # Location of the bigquery data
    filename1 = 'bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2019'
    filename2 = 'bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2020'
    # Run all of the data and store the results
    #final_df,mape = nyc_processor.update(filename1,filename2, taxi_lookup)
    final_df,mape = nyc_processor.update(filename1,filename2, taxi_lookup, write_to_BQ=True, BQ_suffix=bigquery_suffix)
    # Store the resulting data
    print('MAPE : ',mape)
