In [None]:
# Display the SparkSession handle. No cell in this notebook creates `spark`,
# so it is presumably pre-defined by the notebook kernel — TODO confirm.
spark

In [None]:
# Use the %pip magic (instead of a `!pip` shell call) so the package is
# installed into the environment of the running kernel.
%pip install holidays

# Set Up

In [None]:
import pandas as pd
from google.cloud import storage
from io import BytesIO
from datetime import datetime, date
import matplotlib.pyplot as plt
import holidays

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count, udf, to_date, year, month, date_format, size, split, dayofweek
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml import Pipeline




In [None]:
# Bucket configuration: every path used in this notebook is built from these.
bucket_name = 'my-bigdataproject-jg'
gs_path = f'gs://{bucket_name}/'

# Folder read from (cleaned inputs) and folder written to (features and model).
cleaned_folder, destination_folder = 'cleaned/', 'code_and_models/'

# Google Cloud Storage client and a handle on the project bucket.
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)

# Weather DF

In [None]:
# Glob matching every cleaned weather parquet file in the bucket.
weather_files = ''.join([gs_path, cleaned_folder, 'weather_data_*.parquet'])

# Author's note on which weather columns are dropped and which are kept
# (a bare string in the middle of a cell; it has no runtime effect).
"""
Drop columns:
tempmax
tempmin
feelslikemax
feelslikemin
preciptype
cloudcover

visibility
humidity

Keep columns:
datetime
borough

temp
feelslike
precip
snow
snowdepth
windspeed
uvindex
conditions
"""

# Read, prune the unused columns, then rename 'borough' -> 'weather_borough'
# (used as a join key further down) and 'snow' -> 'snow_precip'.
weather_df = (
    spark.read.parquet(weather_files)
    .drop(
        'tempmax', 'tempmin', 'feelslikemax', 'feelslikemin',
        'preciptype', 'cloudcover', 'visibility', 'humidity',
    )
    .withColumnRenamed('borough', 'weather_borough')
    .withColumnRenamed('snow', 'snow_precip')
)

weather_df.printSchema()

# Taxi Trips DF

In [None]:
# Read every cleaned taxi-trip parquet file and show the resulting schema.
taxi_files = ''.join([gs_path, cleaned_folder, "taxi_data/*.parquet"])
taxi_df = spark.read.parquet(taxi_files)
taxi_df.printSchema()

In [None]:
# Show the number of records for each unique RatecodeID, then for each
# unique passenger_count.
for count_column in ('RatecodeID', 'passenger_count'):
    taxi_df.groupBy(count_column).count().show()


In [None]:
# Hour of the pickup timestamp, reused for the time-of-day bucketing below.
pickup_hour = F.hour(F.col('pickup_datetime'))

taxi_df = (
    taxi_df
    # pickup_date: calendar date of the pickup timestamp
    .withColumn('pickup_date', to_date(col('pickup_datetime')))
    # time_of_day: 05-11h -> morning, 12-20h -> afternoon, anything else -> night
    .withColumn(
        'time_of_day',
        when(pickup_hour.between(5, 11), 'morning')
        .when(pickup_hour.between(12, 20), 'afternoon')
        .otherwise('night'),
    )
    # Remove the records with RatecodeID = 99
    .filter(col('RatecodeID') != 99)
    # Remove the records with passenger_count = 0
    .filter(col('passenger_count') != 0)
    # Keep fares of at least 3.70 (minimum fare amount)
    .filter(col('fare_amount') >= 3.70)
    # Keep totals above 4.20 (minimum total amount)
    .filter(col('total_amount') > 4.20)
    # Keep trips of at least 1/5 mile
    .filter(col('trip_distance') >= 0.2)
    # Columns that are no longer needed after the steps above
    .drop(
        'dropoff_datetime', 'RatecodeID', 'payment_type',
        'total_amount', 'pickup_datetime', 'tip_amount',
    )
)


# Taxi Zone DF

In [None]:
# Load the taxi-zone lookup table; the 'zone' column is not used, so drop it.
taxi_zone_file = ''.join([gs_path, cleaned_folder, 'taxi_zones_data.parquet'])
taxi_zone_df = spark.read.parquet(taxi_zone_file).drop('zone')
taxi_zone_df.printSchema()


In [None]:
# List each borough that appears in the taxi-zone lookup table once.
taxi_zone_df.select(col('borough')).dropDuplicates().show()

## Taxi data frames combined

In [None]:
# Attach the pickup and drop-off borough to every trip by joining the zone
# lookup table twice: once on PULocationID and once on DOLocationID.

# PU Location join
taxi_df = taxi_df.join(taxi_zone_df, taxi_df.PULocationID == taxi_zone_df.LocationID, how='left')


# NOTE(review): an earlier cell selects this column as lowercase 'borough', so
# the rename by 'Borough' presumably relies on case-insensitive column
# resolution — confirm.
taxi_df = taxi_df.withColumnRenamed('Borough', 'PUBorough')
# Drop the lookup key so the second join does not produce a duplicate column.
taxi_df = taxi_df.drop('LocationID')

# DO Location join
taxi_df = taxi_df.join(taxi_zone_df, taxi_df.DOLocationID == taxi_zone_df.LocationID, how='left')

taxi_df = taxi_df.withColumnRenamed('Borough', 'DOBorough')
taxi_df = taxi_df.drop('LocationID')

# Drop the PULocationID and DOLocationID columns
taxi_df = taxi_df.drop('PULocationID', 'DOLocationID')

# Drop the records where the PUBorough or DOBorough is 'EWR'
# NOTE(review): rows whose borough is null after the left joins presumably
# fail these comparisons as well and are dropped too — confirm this is intended.
taxi_df = taxi_df.filter((taxi_df.PUBorough != 'EWR'))
taxi_df = taxi_df.filter((taxi_df.DOBorough != 'EWR'))

# Preview the first 15 rows of the joined result.
taxi_df.show(15)

# Combined Data Frame

In [None]:
# Match each trip with the weather observed on its pickup date in its pickup
# borough, then discard the weather-side join keys.
combined_df = (
    taxi_df
    .join(
        weather_df,
        [
            taxi_df.pickup_date == weather_df.datetime,
            taxi_df.PUBorough == weather_df.weather_borough,
        ],
    )
    .drop('datetime', 'weather_borough')
)

combined_df.printSchema()


In [None]:
# Summary statistics for the main numeric trip columns.
summary_columns = ['tip_percentage', 'trip_distance', 'fare_amount', 'passenger_count']
combined_df.select(*summary_columns).summary().show()

# Datetime features

In [None]:
# Calendar features derived from pickup_date: month, dayofweek, weekend, holiday.

# month
combined_df = combined_df.withColumn('month', month(col('pickup_date')))
# dayofweek: Spark numbers the days 1 (Sunday) through 7 (Saturday)
combined_df = combined_df.withColumn('dayofweek', dayofweek(col('pickup_date')))

# weekend: 1.0 for Sunday (1) or Saturday (7), 0.0 otherwise
combined_df = combined_df.withColumn('weekend', when(col('dayofweek').isin(1, 7), 1.0).otherwise(0.0))

# holiday
combined_df = combined_df.withColumn('pickup_date', to_date(col('pickup_date')))

# First and last pickup date, computed with a single aggregation job
min_date, max_date = combined_df.agg(F.min('pickup_date'), F.max('pickup_date')).first()
if min_date is None or max_date is None:
    raise ValueError('combined_df has no pickup_date values; cannot build the holiday calendar')

# Holidays observed in New York for EVERY year covered by the data.
# Passing only [min_year, max_year] would skip the years in between whenever
# the data spans three or more calendar years.
holiday_years = list(range(min_date.year, max_date.year + 1))
us_holidays = holidays.UnitedStates(years=holiday_years, observed=True, subdiv='NY')

# Keep only the dates of the holidays
us_holidays = list(us_holidays.keys())

# holiday: 1 if the pickup date is a holiday, 0 otherwise
combined_df = combined_df.withColumn('holiday', when(col('pickup_date').isin(us_holidays), 1).otherwise(0))

In [None]:
# Planning note only (no computation): weather columns intended to go straight
# into the VectorAssembler.
# NOTE(review): 'snow' is renamed to 'snow_precip' when the weather data is
# loaded, and the assembler input list further down uses 'snow_precip'.
"""
Vector Assembler Directly
temp
feelslike
precip
snow
snowdepth
windspeed
uvindex
conditions
"""

# UDF Condition Features

In [None]:
# conditions
# Build one 0/1 indicator column per individual weather condition label.

# Distinct raw values of the 'conditions' column; each value is split on ', '
# below, so a value may carry several labels.
raw_conditions = [
    row['conditions']
    for row in combined_df.select('conditions').distinct().collect()
]

# Individual labels, de-duplicated. Null values are skipped (they have no
# labels and would otherwise crash on .split), and the result is sorted so the
# order of the generated feature columns is identical on every run instead of
# depending on set iteration order.
conditions = sorted({
    label
    for raw_value in raw_conditions
    if raw_value is not None
    for label in raw_value.split(', ')
})

print(conditions)

# Tokenise the column once and test exact label membership; a plain substring
# test would also flag a label that merely appears inside a longer label.
condition_labels = F.split(col('conditions'), ', ')

# Create a new column for each condition
for condition in conditions:
    combined_df = combined_df.withColumn(
        condition,
        when(F.array_contains(condition_labels, condition), 1).otherwise(0),
    )


# Tip Label

In [None]:
# Binary label: tip_class is 1 when tip_percentage is greater than 10,
# and 0 in every other case.
tipped_above_threshold = col('tip_percentage') > 10
combined_df = combined_df.withColumn(
    'tip_class',
    when(tipped_above_threshold, 1).otherwise(0),
)


# Pipeline

## String Indexer

In [None]:
# Categorical columns to index; each output column is '<name>_index'.
indexer_input = ['PUBorough', 'DOBorough', 'time_of_day']
indexer_output = [name + '_index' for name in indexer_input]
indexer = StringIndexer(
    inputCols=indexer_input,
    outputCols=indexer_output,
)


## One Hot Encoder

In [None]:
# One-hot encode each indexed column into '<name>_encoded'.
encoder_output = [name + '_encoded' for name in indexer_input]
encoder = OneHotEncoder(
    inputCols=indexer_output,
    outputCols=encoder_output,
)

## Vector Assembler

In [None]:
# Author's planning note (a bare string with no runtime effect). It lists
# 'snow', while the column actually assembled below is 'snow_precip'.
"""
temp
feelslike
precip
snow
snowdepth
windspeed
uvindex
conditions
 'month', 'dayofweek', 'weekend', 'holiday', 'trip_distance', 'passenger_count', 'fare_amount'
"""

# Numeric columns that go into the feature vector without any encoding.
encode_directly = [
    # weather
    'temp', 'feelslike', 'precip', 'snow_precip', 'snowdepth', 'windspeed', 'uvindex',
    # calendar
    'month', 'dayofweek', 'weekend', 'holiday',
    # trip
    'trip_distance', 'passenger_count', 'fare_amount',
]

# Final feature order: condition flags, one-hot encoded categoricals, raw numerics.
input_cols = [*conditions, *encoder_output, *encode_directly]
print(input_cols)
print(input_cols)

In [None]:
# Combine every input column into the single 'features' vector column.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=input_cols,
)

# Save the pipeline with features

In [None]:
print('Saving the transformed data...')

# Fit the feature-only pipeline (index -> one-hot -> assemble) on the full
# data set and apply it to obtain the feature vectors.
pipeline = Pipeline(stages=[indexer, encoder, assembler])
model = pipeline.fit(combined_df)
transformed_df = model.transform(combined_df)

# Persist the transformed data, replacing any previous export.
transformed_df.write.mode('overwrite').parquet(gs_path + destination_folder + 'features')
print('Transformed data saved!')

# Train / Test

In [None]:
# 70/30 train/test split with a fixed seed.
train_df, test_df = combined_df.randomSplit(weights=[0.7, 0.3], seed=42)

# Logistic Regression Pipeline

In [None]:
# Logistic regression on the assembled features, predicting tip_class.
# regParam and elasticNetParam are starting values; the cross-validation grid
# overrides both.
lr = LogisticRegression(
    labelCol='tip_class',
    featuresCol='features',
    maxIter=10,
    elasticNetParam=0.8,
    regParam=0.1,
)

In [None]:
# Full training pipeline: the three feature stages followed by the classifier.
lr_stages = [indexer, encoder, assembler, lr]
lr_pipeline = Pipeline(stages=lr_stages)

# Cross validating

In [None]:
# Hyper-parameter grid: 6 regularisation strengths x 3 elastic-net mixes.
grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

print('Number of models to be tested: ', len(grid))

# Binary classification evaluator with area under ROC as the metric
evaluator = BinaryClassificationEvaluator(labelCol='tip_class', metricName='areaUnderROC')

# 3-fold cross validation over the whole pipeline. The seed fixes the fold
# assignment so the selected model is reproducible across runs; it matches the
# seed used for the train/test split.
cv = CrossValidator(
    estimator=lr_pipeline,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42
)

all_models = cv.fit(train_df)



In [None]:
# Score the held-out test set with the best cross-validated model.
predictions = all_models.transform(test_df)

# Calculate the AUC
auc = evaluator.evaluate(predictions)

print(f"AUC: {auc}")

# Create a confusion matrix
predictions.groupby('tip_class', 'prediction').count().show()

# One row per true label, one column per predicted label.
# - The pivot values are listed explicitly so both prediction columns exist
#   even when the model predicts a single class for every row.
# - The rows are ordered by tip_class so row 0 is the negative class and row 1
#   the positive class; a collected groupBy has no guaranteed row order.
cm = (
    predictions
    .groupby('tip_class')
    .pivot('prediction', [0.0, 1.0])
    .count()
    .fillna(0)
    .orderBy('tip_class')
    .collect()
)

def calculate_metrics(cm):
    """Compute accuracy, precision, recall and F1 from a 2x2 confusion matrix.

    Args:
        cm: sequence of rows shaped ``(label, predicted_0_count,
            predicted_1_count)``, one row for label 0 and one for label 1.
            The rows may come in any order; a missing row counts as zeros.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``. A metric whose
        denominator is zero is reported as 0.0 instead of raising
        ZeroDivisionError.
    """
    # Index the rows by their label so the result does not depend on the
    # order in which the rows were collected.
    rows_by_label = {int(row[0]): row for row in cm}
    negative_row = rows_by_label.get(0, (0, 0, 0))
    positive_row = rows_by_label.get(1, (1, 0, 0))

    tn = negative_row[1]         # True Negative
    fp = negative_row[2]         # False Positive
    fn = positive_row[1]         # False Negative
    tp = positive_row[2]         # True Positive

    total = tp + fp + tn + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) else 0.0
    return accuracy, precision, recall, f1

# Derive the four metrics from the confusion matrix and report them, one per line.
accuracy, precision, recall, f1 = calculate_metrics(cm)
print('\n'.join([
    f'Accuracy: {accuracy}',
    f'Precision: {precision}',
    f'Recall: {recall}',
    f'F1 Score: {f1}',
]))

In [None]:
# Best model
best_model = all_models.bestModel

print(f"Best Model Stages: \n{best_model.stages}")

# The fitted logistic regression is the last stage of the pipeline.
best_lr_model = best_model.stages[-1]

# Parameters of the best model. They are printed explicitly because a bare
# expression in the middle of a cell is evaluated but never displayed.
for param, value in best_lr_model.extractParamMap().items():
    print(f"{param.name}: {value}")

# Create a ROC curve
# The model's `summary` describes the fit on the training data, so this is the
# training ROC curve.
trainingSummary = best_lr_model.summary

# Collect the ROC points once instead of running one Spark job per axis.
roc_points = trainingSummary.roc.toPandas()

plt.figure(figsize=(5, 5))
plt.plot(roc_points['FPR'], roc_points['TPR'])
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.title('ROC Curve')
plt.show()


In [None]:
# Coefficients of the fitted logistic regression (last stage of the best pipeline).
fitted_lr = best_model.stages[-1]
coefficients = fitted_lr.coefficients
print("bestModel coefficients", coefficients)

# Save the Model

In [None]:
# Persist the best pipeline model to the bucket, replacing any earlier copy.
print('Saving the model')
model_path = ''.join([gs_path, destination_folder, 'model'])
model_writer = best_model.write().overwrite()
model_writer.save(model_path)
print('Model saved')