## 1. Preprocessing

In [1]:
# Importing pyspark dependencies
from pyspark.sql.types import IntegerType,BooleanType,DateType,NumericType,TimestampType
from pyspark.ml.feature import Binarizer, Bucketizer, QuantileDiscretizer
from pyspark.sql.functions import col
import pyspark.sql.functions as F

# Importing flaml for AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
import flaml

# Importing mlflow libraries
from mlflow.models import infer_signature, set_signature
from mlflow.models.model import get_model_info
import mlflow

# Importing general libraries
from sklearn.metrics import accuracy_score, f1_score, precision_score, confusion_matrix, recall_score, roc_auc_score, classification_report
import flaml.visualization as fviz
import pandas as pd

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 3, Finished, Available)

#### 1.1 Read raw data from CSV

##### 1.1.1 Reading green taxi data

In [2]:
green_raw = spark.read.format("csv").option("header","true").load("Files/greenTaxiData.csv")
display(green_raw.head(5))

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, ce2d504e-2426-4cb8-b9f8-c1b241f14504)

In [3]:
green_columns_remap =    {
        "vendorID": "vendor",
        "lpepPickupDatetime": "pickup_datetime",
        "lpepDropoffDatetime": "dropoff_datetime",
        "storeAndFwdFlag": "store_forward",
        "pickupLongitude": "pickup_longitude",
        "pickupLatitude": "pickup_latitude",
        "dropoffLongitude": "dropoff_longitude",
        "dropoffLatitude": "dropoff_latitude",
        "passengerCount": "passengers",
        "fareAmount": "cost",
        "tripDistance": "distance",
    }

select_expr = [f"{col} as {new_name}" for col, new_name in green_columns_remap.items()]
green_remapped = green_raw.selectExpr(*select_expr)
display(green_remapped.head(5))

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, d594f0bb-7bb5-40f7-b580-0a8c951623d7)

##### 1.1.2 Reading yellow taxi data

In [4]:
yellow_raw = spark.read.format("csv").option("header","true").load("Files/yellowTaxiData.csv")
display(yellow_raw.head(5))

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 6, Finished, Available)

SynapseWidget(Synapse.DataFrame, a1d2c344-6653-4586-9841-457bb539dc94)

In [5]:
yellow_columns_remap = {
        "vendorID": "vendor",
        "tpepPickupDateTime": "pickup_datetime",
        "tpepDropoffDateTime": "dropoff_datetime",
        "storeAndFwdFlag": "store_forward",
        "startLon": "pickup_longitude",
        "startLat": "pickup_latitude",
        "endLon": "dropoff_longitude",
        "endLat": "dropoff_latitude",
        "passengerCount": "passengers",
        "fareAmount": "cost",
        "tripDistance": "distance",
    }

select_expr = [f"{col} as {new_name}" for col, new_name in yellow_columns_remap.items()]
yellow_remapped = yellow_raw.selectExpr(*select_expr)
display(yellow_remapped.head(5))

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 7, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1a43a73d-53c5-460b-9540-3dbbbe561240)

#### 1.2 Combining data from raw data sources

In [6]:
df_combined_taxi = green_remapped.union(yellow_remapped)

print("Length of Green DataFrame :" , green_remapped.count())
print("Length of Yellow DataFrame :" , yellow_remapped.count())
print("Length of combined dataframes :" , df_combined_taxi.count())

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 8, Finished, Available)

Length of Green DataFrame : 5000
Length of Yellow DataFrame : 5000
Length of combined dataframes : 10000


#### 1.3 Saving merged dataset as table

In [7]:
df_combined_taxi.write.saveAsTable("combined_taxi_data", mode ='overwrite')

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 9, Finished, Available)

#### 1.4 Data Transformation

##### 1.4.1 Transforming float64 fields

In [8]:
# Convert String to Integer Type
df_combined_taxi = df_combined_taxi.withColumn("pickup_longitude",df_combined_taxi.pickup_longitude.cast('double')).withColumn("pickup_latitude",df_combined_taxi.pickup_latitude.cast('double')).withColumn("dropoff_longitude",df_combined_taxi.dropoff_longitude.cast('double')).withColumn("dropoff_latitude",df_combined_taxi.dropoff_latitude.cast('double')).withColumn("distance",df_combined_taxi.distance.cast('double')).withColumn("cost",df_combined_taxi.cost.cast('double')).withColumn("passengers",df_combined_taxi.passengers.cast('int'))
display(df_combined_taxi.head(5))

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 10, Finished, Available)

SynapseWidget(Synapse.DataFrame, bcbaa09b-5523-417f-bd8c-9927b7721db4)

In [9]:
latlong_filtered_df = df_combined_taxi.filter( (df_combined_taxi.pickup_longitude <= -73.72) & \
(df_combined_taxi.pickup_longitude >= -74.09) & \
(df_combined_taxi.pickup_latitude  <= 40.88) \
& (df_combined_taxi.pickup_latitude >= 40.53)\
& (df_combined_taxi.dropoff_longitude <= -73.72)\
& (df_combined_taxi.dropoff_longitude >= -74.72)\
& (df_combined_taxi.dropoff_latitude <= 40.88)\
& (df_combined_taxi.dropoff_latitude >= 40.53)\
)

latlong_filtered_df.count()

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 11, Finished, Available)

9851

##### 1.4.2 Transforming pickup_datetime

In [10]:
latlong_filtered_df = latlong_filtered_df.withColumn("pickup_datetime",df_combined_taxi.pickup_datetime.cast(TimestampType()))
display(latlong_filtered_df.head(5))

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 12, Finished, Available)

SynapseWidget(Synapse.DataFrame, 7aa8d921-55d1-463b-bbc2-7a4fe72dab39)

In [11]:
normalized_df = latlong_filtered_df.withColumn('pickup_weekday',(F.dayofweek('pickup_datetime')))\
    .withColumn('pickup_month',(F.month('pickup_datetime')))\
    .withColumn('pickup_monthday',(F.dayofmonth('pickup_datetime')))\
    .withColumn('pickup_hour',(F.hour('pickup_datetime')))\
    .withColumn('pickup_minute',(F.minute('pickup_datetime')))\
    .withColumn('pickup_second',(F.second('pickup_datetime'))).drop("pickup_datetime")

display(normalized_df.head(5))

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, 564b4046-ebe7-4071-b589-26bbbfd06737)

##### 1.4.3 Transforming dropoff_datetime

In [12]:
normalized_df = normalized_df.withColumn("dropoff_datetime",normalized_df.dropoff_datetime.cast(TimestampType()))
display(normalized_df.head(5))

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 14, Finished, Available)

SynapseWidget(Synapse.DataFrame, be782340-e970-41d2-a1af-adc27d20a765)

In [13]:
normalized_df = normalized_df.withColumn('dropoff_weekday',(F.dayofweek('dropoff_datetime')))\
    .withColumn('dropoff_month',(F.month('dropoff_datetime')))\
    .withColumn('dropoff_monthday',(F.dayofmonth('dropoff_datetime')))\
    .withColumn('dropoff_hour',(F.hour('dropoff_datetime')))\
    .withColumn('dropoff_minute',(F.minute('dropoff_datetime')))\
    .withColumn('dropoff_second',(F.second('dropoff_datetime'))).drop("dropoff_datetime")

display(normalized_df.head(5))

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4ccdbba7-dbe8-49f9-9144-0edb21916e22)

##### 1.4.4 Change the store_forward column to binary values

In [14]:
print("--------------- Count before transforming into binary values ---------------")
latlong_filtered_df.groupBy("store_forward").count().show()

normalized_df = normalized_df.withColumn('store_forward', F.when(F.col('store_forward') == "Y", F.lit(1)).otherwise(F.lit(0)))

print("--------------- Count after transforming into binary values ---------------")
normalized_df.groupBy("store_forward").count().show()

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 16, Finished, Available)

--------------- Count before transforming into binary values ---------------
+-------------+-----+
|store_forward|count|
+-------------+-----+
|            Y|   53|
|            N| 9798|
+-------------+-----+

--------------- Count after transforming into binary values ---------------
+-------------+-----+
|store_forward|count|
+-------------+-----+
|            1|   53|
|            0| 9798|
+-------------+-----+



##### 1.4.5 Filtering by distance and cost

In [15]:
print("--------------- Count before filter ---------------")
print(normalized_df.count())

final_df = normalized_df.filter( (normalized_df.distance > 0) & \
(normalized_df.cost > 0))

final_df.count()

print("--------------- Count after filter ---------------")
print(final_df.count())

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 17, Finished, Available)

--------------- Count before filter ---------------
9851
--------------- Count after filter ---------------
9773


#### 1.5 Save Transformed regression Data

In [16]:
final_df.write.saveAsTable("reg_transformed_taxi_data", mode ='overwrite')

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 18, Finished, Available)

#### 1.6 Transform target into classes and saving table

In [17]:
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="cost", outputCol="cost_class")
bucketed_df = discretizer.fit(final_df).transform(final_df)
bucketed_df.groupBy("cost_class").count().show()

StatementMeta(, a3e720ab-d811-46ae-8622-f3b8d5aff177, 19, Finished, Available)

+----------+-----+
|cost_class|count|
+----------+-----+
|       0.0| 1844|
|       1.0| 1965|
|       4.0| 1986|
|       3.0| 2100|
|       2.0| 1878|
+----------+-----+

