In [1]:
import numpy as np
from pyspark.sql.functions import col, stddev, unix_timestamp
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

s3EndPointUrl = "http://localhost:9000"
s3AccessKeyAws = "minio"
s3SecretKeyAws = "minioadmin"
sourceBucket = "mlopscamp"

sparkApp = SparkSession\
                .builder\
                .appName("homework-week-1")\
                .config("spark.hadoop.fs.s3a.endpoint", s3EndPointUrl)\
                .config("spark.hadoop.fs.s3a.access.key", s3AccessKeyAws)\
                .config("spark.hadoop.fs.s3a.secret.key", s3SecretKeyAws)\
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
                .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")\
                .config("spark.hadoop.fs.s3a.path.style.access", "true")\
                .config("spark.hadoop.fs.s3a.fast.upload", "true")\
                .config("spark.sql.execution.pyarrow.enabled", "true")\
                .config("spark.sql.inMemoryColumnarStorage.compressed", "true")\
                .config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)\
                .config("spark.sql.shuffle.partitions", 100)\
                .config("spark.sql.debug.maxToStringFields", 1000)\
                .getOrCreate()

sparkApp.sparkContext.setLogLevel("OFF")

yellow_files_1 = f"s3a://{sourceBucket}/yellow_tripdata_2022-01.parquet"
yellow_files_2 = f"s3a://{sourceBucket}/yellow_tripdata_2022-02.parquet"


23/05/24 11:08:29 WARN Utils: Your hostname, allmey-linux resolves to a loopback address: 127.0.1.1; using 192.168.0.98 instead (on interface wlp2s0)
23/05/24 11:08:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/24 11:08:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df_yellow_1 = sparkApp.read\
                    .format("parquet")\
                    .option("inferSchema", True)\
                    .option("header", True)\
                    .load(yellow_files_1)

df_yellow_2 = sparkApp.read\
                      .format("parquet")\
                      .option("inferSchema", True)\
                      .option("header", True)\
                      .load(yellow_files_2)


                                                                                

In [3]:
# Q1. Count columns
# Read the data for January. How many columns are there ?
#
print('Total of Columns in the Dataset: ', len(df_yellow_1.columns))

Total of Columns in the Dataset:  19


In [4]:
# Q2 - Computing Duration
# What's the standard deviation of the trips duration in January ?
#
# Create new column to calculate the duration trip
df_yellow_1 = df_yellow_1.withColumn('Duration', (unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime')))
df_yellow_1 = df_yellow_1.withColumn('DurationInMinutes', col('Duration')/60)

result = df_yellow_1.select(stddev('DurationInMinutes')).collect()

print('Standard Deviation of the Trips: ', result)



Standard Deviation of the Trips:  [Row(stddev_samp(DurationInMinutes)=46.44530513776847)]


                                                                                

In [5]:
# Q3 - Dropping Outliers
# What fraction of the records left after you dropped the outliers ?
#
raw_count = df_yellow_1.count()
df_yellow_1 = df_yellow_1.filter((col('DurationInMinutes') >= 1) & (col('DurationInMinutes') <= 60))

print('Fraction of the records left after dropped: ', (df_yellow_1.count()/raw_count)*100)

Fraction of the records left after dropped:  98.27547930522405


                                                                                

In [6]:
df_yellow_1.selectExpr('cast(PULocationID as string) PULocationID')
df_yellow_1.selectExpr('cast(DOLocationID as string) DOLocationID')
df_yellow_1.selectExpr('cast(DurationInMinutes as double) DurationInMinutes')
categorical_features = ['PULocationID', 'DOLocationID']

In [7]:
# Q4 - One-Hot Encoding
# Let's apply one-hot encoding to the pickup and dropoff location IDs. We'll use only these two features for our model.
#
from sklearn.feature_extraction import DictVectorizer # Machine Learning

# Turn the dataframe into a list of dictionaries
dict_yellow_1 = df_yellow_1[categorical_features].toPandas().to_dict(orient='records')

# Instantiate a dictionary vectorizer
dv = DictVectorizer()

# Fit a dictionary vectorizer - transform the data into a feature matrix
x_train = dv.fit_transform(dict_yellow_1)

print('Dimensionality of the matrix: ', x_train.shape[1])

                                                                                

Dimensionality of the matrix:  2


In [8]:
# Q5 - Training Model
# Now let's use the feature matrix from the previous step to train a model. 
# * Train a plain linear regression model with default parameters 
# * Calculate the RMSE of the model on the training data
# What's the RMSE on train ?
#
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=categorical_features, outputCol='features')
x_train = assembler.transform(df_yellow_1)

target='DurationInMinutes'
lr = LinearRegression(featuresCol='features', labelCol=target)
lr_model = lr.fit(x_train)

trainSummary = lr_model.summary

print('RMSE on train: ', trainSummary.rootMeanSquaredError)



RMSE on train:  8.920327827581282


                                                                                

In [9]:
# Q6 - Evaluating the model 
# Now let's apply this model to the validation dataset (February 2022).
# What's the RMSE on validation ?
#
# Create new column to calculate the time trip
df_yellow_2 = df_yellow_2.withColumn('Duration', (unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime')))
df_yellow_2 = df_yellow_2.withColumn('DurationInMinutes', col('Duration')/60)
df_yellow_2 = df_yellow_2.filter((col('DurationInMinutes') >= 1) & (col('DurationInMinutes') <= 60))

df_yellow_2.selectExpr('cast(PULocationID as string) PULocationID')
df_yellow_2.selectExpr('cast(DOLocationID as string) DOLocationID')
df_yellow_2.selectExpr('cast(DurationInMinutes as double) DurationInMinutes')

assembler = VectorAssembler(inputCols=categorical_features, outputCol='features')

x_train_2 = assembler.transform(df_yellow_2)
lr_model_2 = lr.fit(x_train_2)

trainSummary = lr_model_2.summary

print('RMSE on train - February 2022: ', trainSummary.rootMeanSquaredError)



RMSE on train - February 2022:  9.573242692015372


                                                                                