# Importing necessary libraries

In [None]:
# !pip install pyspark

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, dayofweek, date_format, hour
from pyspark.sql.types import *
from google.colab import drive
import zipfile
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression,RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Create (or reuse) the Spark session used throughout this notebook.
# getOrCreate() returns an already-running session if there is one, so re-running is safe.
spark = SparkSession.builder.appName("NYC Taxi Fare Prediction").getOrCreate()
spark

In [None]:
# spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Data Loading and Inspection

In [None]:
# Mount Google Drive at /content/drive so the dataset paths configured below resolve.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# variables
# Locations of the Kaggle archive and the CSV files extracted from it on Google Drive.
zip_path = "/content/drive/MyDrive/NYC_taxi_data/new-york-city-taxi-fare-prediction.zip"
extract_path = "/content/drive/MyDrive/NYC_taxi_data/"
train_file_path = "/content/drive/MyDrive/NYC_taxi_data/train.csv"
test_file_path = "/content/drive/MyDrive/NYC_taxi_data/test.csv"
# Maximum number of rows kept from train.csv (applied with .limit() when loading).
number_of_rows = 200000
# NOTE(review): these names are swapped relative to their values. Valid latitudes lie
# in [-90, 90] and valid longitudes in [-180, 180], but min/max_latitude hold ±180 and
# min/max_longitude hold ±90. Consider renaming them before reusing them elsewhere.
min_latitude = -180
max_latitude = 180
min_longitude = -90
max_longitude = 90


In [None]:
# function to extract zip file
def unzip_file(zip_path, extract_path):
    """Extract every member of the zip archive at ``zip_path`` into ``extract_path``.

    A confirmation message is printed once extraction has finished; nothing is returned.
    """
    with zipfile.ZipFile(zip_path, mode='r') as archive:
        archive.extractall(path=extract_path)
        print("File unzipped successfully")

# unzip_file(zip_path, extract_path)

In [None]:
!ls /content/drive/MyDrive/NYC_taxi_data/

GCP-Coupons-Instructions.rtf		sample_submission.csv  train.csv
new-york-city-taxi-fare-prediction.zip	test.csv


In [None]:
# Load train.csv from Google Drive (header row, inferred column types),
# keep at most number_of_rows rows, then preview 10 of them.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(train_file_path)
    .limit(number_of_rows)
)
df.show(10)

+-------------------+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|                key|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+-------------------+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|2009-06-15 17:26:21|        4.5|2009-06-15 17:26:21|      -73.844311|      40.721319|        -73.84161|       40.712278|              1|
|2010-01-05 16:52:16|       16.9|2010-01-05 16:52:16|      -74.016048|      40.711303|       -73.979268|       40.782004|              1|
|2011-08-18 00:35:00|        5.7|2011-08-18 00:35:00|      -73.982738|       40.76127|       -73.991242|       40.750562|              2|
|2012-04-21 04:30:42|        7.7|2012-04-21 04:30:42|       -73.98713|      40.733143|       -73.991567|       40.758092|              1|
|2010-03-09 07:51:00|        5.3|2

In [None]:
# Print column names, data types and nullability flags.
# Note: "nullable = true" means nulls are *allowed*, not that any are present.
df.printSchema()

root
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



In [None]:
# Check how many partitions the loaded DataFrame is split into.
num_partitions = df.rdd.getNumPartitions()
print(f"Number of partitions: {num_partitions}")

Number of partitions: 1


In [None]:
# Count the rows in the loaded DataFrame.
row_count = df.count()
print(f"Number of rows: {row_count}")

Number of rows: 200000


In [None]:
# checking for the statistical summary (count, mean, stddev, min, max) of the columns
summary_df =df.describe()
summary_df.show()

+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       fare_amount|  pickup_longitude|   pickup_latitude| dropoff_longitude| dropoff_latitude|   passenger_count|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            200000|            200000|            200000|            199999|           199999|            200000|
|   mean|11.342876950000601|-72.50612144955218|39.922325777255104|-72.51867346221218|39.92557945791384|          1.682445|
| stddev| 9.837854787330032|11.608096802996164|10.048946659938037|10.724225862534897|6.751120031010652|1.3067296429203594|
|    min|             -44.9|           -736.55|      -3116.285383|       -1251.19589|      -1189.61544|                 0|
|    max|             500.0|        2140.60116|       1703.092772|         40.851027|       404.616667|                 6|
+-------+-------

**The minimum** fare is negative (-44.9), which is clearly erroneous (fares should not be negative).
**The maximum** fare is $500.00, far above the mean of about $11.34, which makes it a likely outlier.

# Data Cleaning

In [None]:
# Drop every row that has a null in any column.
df_new = df.dropna()

In [None]:
# Remove rows that are exact duplicates across all columns.
df_new = df_new.drop_duplicates()

In [None]:
# Filter out erroneous records:
#   * negative fares,
#   * passenger counts outside 1..6,
#   * pickup/dropoff coordinates outside the valid geographic ranges.
# Bounds are inclusive (Column.between) and named after the coordinate they constrain.
# TODO(review): a NYC bounding box would remove far more bad coordinates (e.g. 0, 0).
LATITUDE_BOUNDS = (-90, 90)
LONGITUDE_BOUNDS = (-180, 180)

df_new = (
    df_new
    .filter(col("fare_amount") >= 0)
    .filter(col("passenger_count").between(1, 6))
    .filter(col("pickup_longitude").between(*LONGITUDE_BOUNDS))
    .filter(col("pickup_latitude").between(*LATITUDE_BOUNDS))
    .filter(col("dropoff_longitude").between(*LONGITUDE_BOUNDS))
    .filter(col("dropoff_latitude").between(*LATITUDE_BOUNDS))
)

In [None]:
# Remove fare outliers with the 1.5 * IQR rule.
# A single approxQuantile call returns both quartiles (relative error 0.01) from one
# pass over the data instead of scanning it twice.
quartiles = df_new.approxQuantile("fare_amount", [0.25, 0.75], 0.01)
if not quartiles:
    # approxQuantile returns an empty list when there are no non-null values.
    raise ValueError("No fare_amount values left to compute quartiles; is df_new empty?")
q1, q3 = quartiles
iqr = q3 - q1

# defining upper and the lower bound
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

In [None]:
# Keep only fares inside the inclusive [lower_bound, upper_bound] IQR fence.
df_new = df_new.filter(col('fare_amount').between(lower_bound, upper_bound))

In [None]:
# Extract calendar features from pickup_datetime and drop the redundant `key` column.
# day_of_week uses the ISO numbering (1 = Monday ... 7 = Sunday) that the former
# date_format(..., 'u') produced. Spark >= 3.0 rejects the 'u' pattern with a
# SparkUpgradeException, so the value is derived from dayofweek()
# (1 = Sunday ... 7 = Saturday): ((d + 5) % 7) + 1 maps Sunday -> 7 and Monday -> 1.
df_new = (
    df_new
    .withColumn("year", year(col("pickup_datetime")))
    .withColumn("month", month(col("pickup_datetime")))
    .withColumn("day", dayofmonth(col("pickup_datetime")))
    .withColumn(
        "day_of_week",
        ((dayofweek(col("pickup_datetime")) + 5) % 7 + 1).cast(IntegerType()),
    )
    .withColumn("hour_of_day", hour(col("pickup_datetime")))
    .drop("key")
)

df_new.printSchema()

root
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- hour_of_day: integer (nullable = true)



In [None]:
# Preview up to 10 rows of the cleaned DataFrame with the extracted calendar features.
df_new.show(10)

SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.DATETIME_PATTERN_RECOGNITION] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to recognize 'u' pattern in the DateTimeFormatter. 1) You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from 'https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html'.

In [None]:
# Register df_new as the temporary view "NYC_taxi_data" so it can be queried with spark.sql().
df_new.createOrReplaceTempView("NYC_taxi_data")

In [None]:
# Summary statistics: mean, approximate median and standard deviation of fare_amount,
# plus the mean of each coordinate column and of passenger_count.
summary_stats_query = """
SELECT
    AVG(fare_amount) AS avg_fare_amount,
    PERCENTILE_APPROX(fare_amount, 0.5) AS median_fare_amount,
    STDDEV(fare_amount) AS stddev_fare_amount,
    AVG(pickup_longitude) AS avg_pickup_longitude,
    AVG(pickup_latitude) AS avg_pickup_latitude,
    AVG(dropoff_longitude) AS avg_dropoff_longitude,
    AVG(dropoff_latitude) AS avg_dropoff_latitude,
    AVG(passenger_count) AS avg_passenger_count
FROM NYC_taxi_data
"""
summary_stats = spark.sql(summary_stats_query)
summary_stats.show()

In [None]:
# Busiest time of day: trip count per pickup hour, most frequent hour first.
busiest_hour = spark.sql("""
SELECT
    hour_of_day,
    COUNT(*) AS trip_count
FROM NYC_taxi_data
GROUP BY hour_of_day
ORDER BY trip_count DESC
""")
busiest_hour.show()

In [None]:
# Busiest day of the week: trip count per day_of_week, most frequent day first.
busiest_day_of_week = spark.sql("""
SELECT
    day_of_week,
    COUNT(*) AS trip_count
FROM NYC_taxi_data
GROUP BY day_of_week
ORDER BY trip_count DESC
""")
busiest_day_of_week.show()

In [None]:
# daily trend: average fare amount and trip count per calendar day,
# read from the NYC_taxi_data temp view registered from df_new.
daily_trends = spark.sql("""
    SELECT year, month, day,
           AVG(fare_amount) AS average_fare,
           COUNT(*) AS trip_count
    FROM NYC_taxi_data
    GROUP BY year, month, day
    ORDER BY year, month, day
""")
daily_trends.show()


In [None]:
# weekly pattern: average fare and trip count for each day_of_week,
# read from the NYC_taxi_data temp view registered from df_new.
weekly_trends = spark.sql("""
    SELECT day_of_week,
           AVG(fare_amount) AS average_fare,
           COUNT(*) AS trip_count
    FROM NYC_taxi_data
    GROUP BY day_of_week
    ORDER BY day_of_week
""")
weekly_trends.show()


# Exploratory Data Analysis (EDA)

# Feature Engineering

# Feature Selection

In [None]:
# Model inputs: pickup/dropoff coordinates, passenger count and the
# calendar features derived from pickup_datetime.
feature_columns = [
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "passenger_count",
    "year",
    "month",
    "day",
    "day_of_week",
    "hour_of_day",
]
# Regression target.
target_column = "fare_amount"

In [None]:
# 70% / 30% train/test split; the fixed seed makes the split repeatable
# for the same input data and partitioning.
train_data, test_data = df_new.randomSplit(weights=[0.7, 0.3], seed=123)

In [None]:
# Build the feature vector for inspection only; the modelling pipeline applies its own
# VectorAssembler stage to train_data/test_data. The result goes to a new name instead of
# overwriting df_new: a "features" column on df_new would make a re-run of the split and
# pipeline fail, because VectorAssembler refuses to overwrite an existing output column.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_features = assembler.transform(df_new)

In [None]:
# Assembler used as the first pipeline stage: packs feature_columns into one "features" vector.
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)

# Split Data

# Model Selection and Training

In [None]:
# Linear regression predicting target_column from the assembled "features" vector.
lr = LinearRegression(labelCol=target_column, featuresCol="features")


In [None]:
# Hyper-parameter grid: 3 regularisation strengths x 3 elastic-net mixes = 9 candidates.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)


In [None]:
# define the evaluator: RMSE between target_column and the pipeline's "prediction" column
evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")

# set up 3-fold cross-validation over the assembler + linear-regression pipeline.
# An explicit seed makes the random fold assignment repeatable (matching the seeded
# train/test split), so the selected hyper-parameters do not drift between sessions.
crossval = CrossValidator(
    estimator=Pipeline(stages=[assembler, lr]),  # pipeline: assembler -> linear regression
    estimatorParamMaps=paramGrid,  # parameter grid to search
    evaluator=evaluator,  # evaluation metric
    numFolds=3,  # three-fold cross-validation
    seed=123,  # reproducible fold assignment
)


In [None]:
# Run the grid search with cross-validation on the training split.
cv_model = crossval.fit(dataset=train_data)

In [None]:
# get predictions on the held-out test split from the best cross-validated pipeline
predictions = cv_model.transform(test_data)

# evaluate the best model using RMSE
rmse = evaluator.evaluate(predictions)
print(f"Best Model's RMSE on test data: {rmse}")

# Retrieve the fitted LinearRegressionModel (last pipeline stage) and report its
# hyper-parameters through the public Param getters instead of the private _java_obj.
best_model = cv_model.bestModel.stages[-1]
print("Best Model's regParam:", best_model.getRegParam())
print("Best Model's elasticNetParam:", best_model.getElasticNetParam())

# Model Evaluation

# Final Model Evaluation

# Saving the model