#### **Importing required libraries (installed with Poetry for dependency management)**

In [20]:
import os
import warnings

from pyspark.sql import SparkSession
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import mean_squared_error,root_mean_squared_error
from sklearn.linear_model import LinearRegression
import seaborn as sns
import matplotlib.pyplot as plt
warnings.filterwarnings('ignore')
from xgboost import XGBRegressor

#### **Creating a Spark Session**

In [21]:
# Build the notebook-wide SparkSession named "duration-prediction" against the
# local[*] master; getOrCreate() hands back an existing session if one is
# already running in this kernel.
spark = (
    SparkSession.builder
    .appName("duration-prediction")
    .master("local[*]")
    .getOrCreate()
)

#### **Required Variables**

In [22]:
# Directory that holds the parquet files. Set the DATA_DIR environment variable
# to point somewhere else; the default keeps the original location.
# os.path.join(..., "") guarantees a trailing separator because later cells
# build file paths by plain concatenation: f"{data_dir}{train_file}".
data_dir = os.path.join(
    os.environ.get("DATA_DIR", "/home/manasa/Manasa-mlops/module_1_introduction/"),
    "",
)
# Month used to fit the models.
train_file = "yellow_tripdata_2024-01.parquet"
# Month used to validate the fitted models.
validation_file = "yellow_tripdata_2024-02.parquet"

#### **Download required files**

In [23]:
!wget -N https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet -P $data_dir
!wget  -N https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet -P $data_dir

--2025-03-20 12:17:07--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 3.164.82.112, 3.164.82.40, 3.164.82.197, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|3.164.82.112|:443... connected.
HTTP request sent, awaiting response... 304 Not Modified
File ‘/home/manasa/Manasa-mlops/module_1_introduction/yellow_tripdata_2024-01.parquet’ not modified on server. Omitting download.

--2025-03-20 12:17:08--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 3.164.82.112, 3.164.82.160, 3.164.82.40, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|3.164.82.112|:443... connected.
HTTP request sent, awaiting response... 304 Not Modified
File ‘/home/manasa/Manasa-mlops/module_1_introduction/yellow_tripdata_2024-02.parquet’ no

#### **Required functions**

In [24]:
def read_data(filename):
    """Load a parquet file into a Spark DataFrame.

    Relies on the notebook-level ``spark`` session.

    Args:
        filename: Path of the parquet data to load.

    Returns:
        The DataFrame produced by Spark's parquet reader.
    """
    parquet_reader = spark.read.format("parquet")
    return parquet_reader.load(filename)

In [25]:
def preprocessing(df):
    """Add the trip duration and make the location ids categorical.

    Args:
        df: DataFrame exposing ``tpep_pickup_datetime``,
            ``tpep_dropoff_datetime``, ``PULocationID`` and ``DOLocationID``.

    Returns:
        A new DataFrame with an extra ``Duration_in_minutes`` column and with
        both location id columns cast to string.
    """
    # Drop-off minus pick-up, cast to long and divided by 60.
    # NOTE(review): the cast is presumably whole seconds - confirm for the
    # Spark version in use.
    elapsed = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).cast("long")
    with_duration = df.withColumn('Duration_in_minutes', elapsed / 60)

    # String ids let the vectorizer treat locations as categories, not numbers.
    pickup_as_text = with_duration["PULocationID"].cast("string")
    dropoff_as_text = with_duration["DOLocationID"].cast("string")
    return (
        with_duration
        .withColumn("PULocationID", pickup_as_text)
        .withColumn("DOLocationID", dropoff_as_text)
    )

In [26]:
def feature_eng(preprocessed_df, dv, fit_dv=False, min_duration=1, max_duration=60):
    """Turn a preprocessed trips DataFrame into a feature matrix and a target.

    Rows whose ``Duration_in_minutes`` lies outside
    ``[min_duration, max_duration]`` are dropped and the rest is ordered by
    duration, longest first. Features and target are then collected to pandas
    in a single pass, so both are guaranteed to describe the same rows in the
    same order and the Spark filter/sort runs only once.

    Args:
        preprocessed_df: Spark DataFrame with ``PULocationID``,
            ``DOLocationID``, ``trip_distance`` and ``Duration_in_minutes``.
        dv: Vectorizer exposing ``fit_transform`` and ``transform`` on a list
            of record dicts (a ``DictVectorizer`` in this notebook).
        fit_dv: When True, fit ``dv`` on these records before transforming;
            otherwise reuse the vocabulary ``dv`` already learned.
        min_duration: Inclusive lower bound on the kept durations.
        max_duration: Inclusive upper bound on the kept durations.

    Returns:
        ``(X, y)`` where ``X`` is whatever ``dv`` returns for the records and
        ``y`` is a single-column pandas DataFrame of ``Duration_in_minutes``.
    """
    categorical = ["PULocationID", "DOLocationID"]
    numerical = ["trip_distance"]
    target = "Duration_in_minutes"

    duration = preprocessed_df[target]
    features = preprocessed_df.filter(
        (duration >= min_duration) & (duration <= max_duration)
    ).orderBy(target, ascending=False)

    # One collection for features and target together: two separate
    # toPandas() calls would execute the filter and sort twice.
    collected = features.select(*categorical, *numerical, target).toPandas()

    # One dict per row, which is the input shape the vectorizer expects.
    records = collected[categorical + numerical].to_dict(orient='records')
    if fit_dv:
        X = dv.fit_transform(records)
    else:
        X = dv.transform(records)

    # Kept as a one-column DataFrame to match what callers already receive.
    y = collected[[target]]
    return X, y

In [27]:
# Single DictVectorizer shared by the whole notebook: it is fitted on the
# training records (fit_dv=True) and then only applied to the validation
# records (fit_dv=False), so both matrices share the same columns.
dv = DictVectorizer()

#### **Reading the parquet file into pyspark Dataframe**

In [28]:
# Load the training month, preview it and report its dimensions.
df = read_data(data_dir + train_file)
df.show(5)
columns = len(df.columns)
print("The number of columns in the dataset are: " , columns)
print("The number of rows in the dataset are: " , df.count())

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|  1.0|    0.5|       0.

#### **Calculating the Ride Duration, Mean and Standard Deviation of all ride durations**

In [29]:
# Add the duration column, then preview a few untruncated rows.
preprocessed_df = preprocessing(df)
preprocessed_df.show(5, truncate=0)

# Mean of the ride durations, shown under a friendlier column name.
(
    preprocessed_df
    .agg({'Duration_in_minutes': 'mean'})
    .withColumnRenamed("avg(Duration_in_minutes)", "Mean_of_Duration_in_minutes")
    .show(truncate=0)
)

# Standard deviation of the ride durations, renamed the same way.
(
    preprocessed_df
    .agg({'Duration_in_minutes': 'std'})
    .withColumnRenamed("stddev(Duration_in_minutes)", "Standard_Deviation_of_Duration_in_minutes")
    .show(truncate=0)
)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|Duration_in_minutes|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+
|2       |2024-01-01 00:57:55 |2024-01-01 01:17:43  |1              |1.72         |1         |N                 |186         |7

#### **Dropping Outliers**

In [30]:
# Keep only rides lasting between 1 and 60 minutes (inclusive), longest first.
df_subset = preprocessed_df.filter( (preprocessed_df["Duration_in_minutes"] >= 1) & ( preprocessed_df["Duration_in_minutes"] <= 60)).orderBy("Duration_in_minutes", ascending=False)

# Each count() is a full Spark job, so evaluate each one exactly once and
# derive both reported figures from the cached numbers.
total_rows = df.count()
kept_rows = df_subset.count()

# Despite the name, this value is a percentage (0-100), not a 0-1 fraction.
fraction_of_records_left = (kept_rows/total_rows)*100
dropped_rows = (total_rows - kept_rows)
print(f"The number of filtered rows are {dropped_rows}")
print(f"The Percentage of records left is {fraction_of_records_left}")

The number of filtered rows are 65718
The Percentage of records left is 97.78326020432945


#### **One Hot Encoding**

Categorical Features selected
 1. *('PULocationID', 'bigint')*
 2. *('DOLocationID', 'bigint')*
 
 Since the categorical features we are selecting are stored as numbers, the dictionary vectorizer would treat them as numerical values and would not one-hot encode them. So we convert them to string type first and then perform one-hot encoding.

In [31]:
# Build the training matrix. fit_dv=True fits the shared DictVectorizer on
# these records so the validation data can later reuse the same columns.
X_train,y_train = feature_eng(preprocessed_df,dv,fit_dv=True)
# X_train.shape is (rows, columns) of the vectorized feature matrix.
rows,columns = X_train.shape
print(f'Number of rows of the matrix: {rows}')
print(f'Number of columns of the matrix: {columns}')

                                                                                

Number of rows of the matrix: 2898906
Number of columns of the matrix: 519


#### **Training the Model with Linear Regression**

In [32]:
# Fit a linear regression on the vectorized training features; y_train is the
# single-column duration frame returned by feature_eng.
lr_model = LinearRegression()
lr_model.fit(X_train, y_train)

#### *Optionally Training the Model with XGBoost Regressor*

In [33]:
# Fit an XGBoost regressor on the same training data for comparison with the
# linear model; random_state is pinned to a fixed value.
xgb_model = XGBRegressor(random_state=2)
xgb_model.fit(X_train, y_train)

#### **Testing Predictions on the Training Set**

In [34]:
# Score the linear model on the rows it was trained on (training RMSE).
y_pred = lr_model.predict(X_train)
rmse = root_mean_squared_error(y_true=y_train, y_pred=y_pred)
print("RMSE with Linear Regression Model: %0.2f" % rmse)

RMSE with Linear Regression Model: 7.95


##### *Optionally Testing with XGBoost Model*

In [35]:
# Score the XGBoost model on the rows it was trained on (training RMSE).
# Note: this rebinds y_pred and rmse from the linear-regression cell.
y_pred = xgb_model.predict(X_train)
rmse = root_mean_squared_error(y_true=y_train, y_pred=y_pred)
print("RMSE with XGBoost Regression Model: %0.2f" % rmse)

RMSE with XGBoost Regression Model: 4.97


#### **Evaluating the Model**

In [36]:
# Load and preprocess the validation month.
# NOTE: this rebinds df and preprocessed_df, so re-running any earlier cell
# after this one will operate on the validation data, not the training data.
df = read_data(data_dir + validation_file)
preprocessed_df = preprocessing(df)

# fit_dv=False: reuse the vocabulary learned from the training records so the
# validation matrix has the same columns as the training matrix.
X_val, y_val = feature_eng(preprocessed_df, dv, fit_dv=False)
rows, columns = X_val.shape
print(f'Number of rows of the matrix: {rows}')
print(f'Number of columns of the matrix: {columns}')

                                                                                

Number of rows of the matrix: 2938060
Number of columns of the matrix: 519


#### **Validating Predictions**

In [37]:
# Score the linear model on the held-out validation month.
y_pred = lr_model.predict(X_val)
rmse = root_mean_squared_error(y_true=y_val, y_pred=y_pred)
print("RMSE on Validation Set with Linear Regression Model: %0.2f" % rmse)

RMSE on Validation Set with Linear Regression Model: 8.13
