In [1]:
import time

In [2]:
import os
import pyspark

In [3]:
# Build the Spark configuration. On JupyterHub the Spark UI is served through
# the per-user proxy, so its base path has to be set accordingly. When
# JUPYTERHUB_USER is not defined (e.g. a local run) the setting is skipped
# instead of failing with a KeyError.
conf = pyspark.SparkConf()
jupyterhub_user = os.environ.get('JUPYTERHUB_USER')
if jupyterhub_user:
    conf.set('spark.ui.proxyBase', '/user/' + jupyterhub_user + '/proxy/4040')

# getOrCreate() reuses an already running context/session, so this cell can be
# re-run without hitting the "only one SparkContext per JVM" error.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/09 15:17:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


From the schema, the following features could be relevant for predicting the taxi fare:

- `trip_distance`: The distance of the trip is likely to be a strong predictor of the fare.
- `RatecodeID`: Different rate codes could correspond to different fare structures.
- `PULocationID` and `DOLocationID`: The pickup and dropoff locations could affect the fare.
- `pickup_hour`, `pickup_day`, `pickup_day_of_week`, `pickup_month`: The time of pickup could affect the fare, due to factors like traffic and surge pricing.
- `passenger_count`: It's unlikely that the number of passengers would directly affect the fare for a standard taxi ride, but it's possible that rides with many passengers are more likely to be longer trips, for example.
- `extra`, `mta_tax`, `tolls_amount`, `improvement_surcharge`, `congestion_surcharge`, `airport_fee`: These are surcharges and fees added on top of the metered fare (components of the total amount charged rather than of `fare_amount` itself); they may still be useful predictors.

- `fare_amount` is our target variable.



We will compare the run time and accuracy (MSE) of the same machine learning model on the NYC taxi dataset using three implementations: scikit-learn (plain, single-machine Python), PySpark, and Dask.

We will build a simple linear regression model that predicts `fare_amount` from `trip_distance` alone; the other candidate features listed above are not used in this comparison.

# Scikit-learn (Python) Implementation

In [4]:
# Start the wall-clock timer for the scikit-learn section; the timing cell at
# the end of this section subtracts this value from its own time.time() reading.
start_time = time.time()

In [5]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error


# Read the pre-processed trip records into an in-memory pandas DataFrame
# and preview the first rows
parquet_path = '../data/yellow_tripdata_processed.parquet'
df_python = pd.read_parquet(parquet_path)
df_python.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,...,pickup_year,pickup_month,pickup_day,pickup_hour,pickup_day_of_week,dropoff_year,dropoff_month,dropoff_day,dropoff_hour,dropoff_day_of_week
0,2,2022-12-03 19:14:47,2022-12-03 19:31:51,1.0,3.37,1.0,N,143,152,1,...,2022,12,3,19,7,2022,12,3,19,7
1,2,2022-12-01 11:55:12,2022-12-01 12:18:01,6.0,1.0,1.0,N,100,230,2,...,2022,12,1,11,5,2022,12,1,12,5
2,1,2022-12-12 15:54:24,2022-12-12 15:59:52,1.0,1.9,1.0,N,236,74,1,...,2022,12,12,15,2,2022,12,12,15,2
3,1,2022-12-20 05:34:23,2022-12-20 06:14:12,1.0,0.1,99.0,N,242,242,1,...,2022,12,20,5,3,2022,12,20,6,3
4,2,2022-12-10 03:59:40,2022-12-10 04:07:27,1.0,1.99,1.0,N,24,236,1,...,2022,12,10,3,7,2022,12,10,4,7


In [6]:
# Single-predictor design matrix (the list selector keeps it two-dimensional)
# and the regression target
feature_cols = ['trip_distance']
X = df_python[feature_cols]
y = df_python['fare_amount']

# Hold out 20% of the rows for evaluation; the fixed seed makes the split repeatable
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Fit the linear regression on the training portion
model_python = LinearRegression()
model_python.fit(X_train, y_train)

# Predict the held-out rows and report the mean squared error
predictions_python = model_python.predict(X_test)
mse_python = mean_squared_error(y_test, predictions_python)
print(f'MSE: {mse_python}')

MSE: 64.60138133038457


In [7]:
# Stop the timer and report the wall-clock time elapsed since start_time was set
end_time = time.time()
elapsed_python = end_time - start_time

print('Time taken by Python implementation: ', elapsed_python, 'seconds')

Time taken by Python implementation:  2.3027284145355225 seconds


# PySpark Implementation

In [8]:
# Restart the wall-clock timer for the PySpark section (overwrites the value
# used by the scikit-learn section).
start_time = time.time()

In [9]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create a Spark DataFrame over the same pre-processed parquet file
parquet_path = '../data/yellow_tripdata_processed.parquet'
df_spark = spark.read.parquet(parquet_path)

                                                                                

In [10]:
# Spark ML estimators read their predictors from a single vector column, so
# pack trip_distance into "features". VectorAssembler raises if its output
# column already exists, so only assemble when it is missing; this keeps the
# cell safe to re-run without reloading df_spark.
assembler = VectorAssembler(inputCols=["trip_distance"], outputCol="features")
if "features" not in df_spark.columns:
    df_spark = assembler.transform(df_spark)

# Split the data into training and testing sets (80/20, fixed seed)
train_data, test_data = df_spark.randomSplit([0.8, 0.2], seed=42)

# Keep the unfitted estimator under its own name so that model_spark only ever
# refers to the fitted model.
lr_spark = LinearRegression(featuresCol='features', labelCol='fare_amount')
model_spark = lr_spark.fit(train_data)

# Evaluate the fitted model on the held-out rows using mean squared error
predictions_spark = model_spark.transform(test_data)
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="fare_amount", metricName="mse")
mse_spark = evaluator.evaluate(predictions_spark)
print(f'MSE: {mse_spark}')

23/05/09 15:18:51 WARN Instrumentation: [694d59e2] regParam is zero, which might cause numerical instability and overfitting.


[Stage 3:>                                                          (0 + 1) / 1]

MSE: 59.44400984943917


                                                                                

In [11]:
# Stop the timer and report the wall-clock time elapsed since start_time was set
end_time = time.time()
elapsed_spark = end_time - start_time

print('Time taken by PySpark implementation: ', elapsed_spark, 'seconds')

Time taken by PySpark implementation:  25.76043128967285 seconds


# Dask Implementation

In [13]:
# Restart the wall-clock timer for the Dask section.
# NOTE(review): the saved execution counters show this cell as In [13] and the
# following import/load cell as In [12], i.e. the timer was started after the
# data was loaded. The recorded Dask time therefore likely excludes imports and
# loading, unlike the other two sections; re-run top to bottom before comparing.
start_time = time.time()

In [12]:
import dask
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression

# Open the same pre-processed parquet file as a Dask DataFrame and preview
# the first rows
parquet_path = '../data/yellow_tripdata_processed.parquet'
df_dask = dd.read_parquet(parquet_path)
df_dask.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,...,pickup_year,pickup_month,pickup_day,pickup_hour,pickup_day_of_week,dropoff_year,dropoff_month,dropoff_day,dropoff_hour,dropoff_day_of_week
0,2,2022-12-03 19:14:47,2022-12-03 19:31:51,1.0,3.37,1.0,N,143,152,1,...,2022,12,3,19,7,2022,12,3,19,7
1,2,2022-12-01 11:55:12,2022-12-01 12:18:01,6.0,1.0,1.0,N,100,230,2,...,2022,12,1,11,5,2022,12,1,12,5
2,1,2022-12-12 15:54:24,2022-12-12 15:59:52,1.0,1.9,1.0,N,236,74,1,...,2022,12,12,15,2,2022,12,12,15,2
3,1,2022-12-20 05:34:23,2022-12-20 06:14:12,1.0,0.1,99.0,N,242,242,1,...,2022,12,20,5,3,2022,12,20,6,3
4,2,2022-12-10 03:59:40,2022-12-10 04:07:27,1.0,1.99,1.0,N,24,236,1,...,2022,12,10,3,7,2022,12,10,4,7


In [14]:
# Select the single predictor (as a one-column frame) and the target from the
# Dask DataFrame
feature_cols = ['trip_distance']
X = df_dask[feature_cols]
y = df_dask['fare_amount']

# 80/20 train/test split with a fixed seed
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Instantiate the linear model and fit it on the underlying arrays
lr = LinearRegression()
lr.fit(X_train.values, y_train.values)

# Predictions for the held-out rows
y_pred = lr.predict(X_test.values)

# Materialise the held-out targets and the predictions in a single compute call
y_test, y_pred = dask.compute(y_test, y_pred)

# Report the mean squared error; mean_squared_error is the function imported
# in the scikit-learn section of this notebook
mse_dask = mean_squared_error(y_test, y_pred)
print(f'MSE: {mse_dask}')



MSE: 52.71844897649518


In [15]:
# Stop the timer and report the wall-clock time elapsed since start_time was set
end_time = time.time()
elapsed_dask = end_time - start_time

print('Time taken by Dask implementation: ', elapsed_dask, 'seconds')

Time taken by Dask implementation:  1.6753323078155518 seconds
