In [45]:
# Importing pyspark dependencies
from pyspark.sql.types import IntegerType,BooleanType,DateType,NumericType,TimestampType
from pyspark.ml.feature import Binarizer, Bucketizer, QuantileDiscretizer
from pyspark.sql.functions import col
import pyspark.sql.functions as F

# Add required imports
import com.microsoft.spark.sqlanalytics
from com.microsoft.spark.sqlanalytics.Constants import Constants

# Importing mlflow libraries
from mlflow.models import infer_signature, set_signature
from mlflow.models.model import get_model_info
import mlflow

# Importing general libraries
from sklearn.metrics import accuracy_score, f1_score, precision_score, confusion_matrix, recall_score, roc_auc_score, classification_report
import pandas as pd

### 1.1 Read raw data from Synapse SQL tables

#### 1.1.1 Reading green taxi data

In [46]:
# Load the green taxi trips from the existing internal Synapse table.
green_table = "synapseazuremldedicates.dbo.greenTaxiData"
green_raw = spark.read.synapsesql(green_table)

# Preview the first five rows.
green_preview = green_raw.head(5)
display(green_preview)

In [47]:
# Source column name -> common column name shared with the yellow taxi data.
# Only the columns listed here are kept; the dict order defines the output
# column order.
green_columns_remap = {
    "vendorID": "vendor",
    "lpepPickupDatetime": "pickup_datetime",
    "lpepDropoffDatetime": "dropoff_datetime",
    "storeAndFwdFlag": "store_forward",
    "pickupLongitude": "pickup_longitude",
    "pickupLatitude": "pickup_latitude",
    "dropoffLongitude": "dropoff_longitude",
    "dropoffLatitude": "dropoff_latitude",
    "passengerCount": "passengers",
    "fareAmount": "cost",
    "tripDistance": "distance",
}

# Build one "<source> as <target>" SQL expression per mapping entry.
select_expr = [
    f"{source_name} as {target_name}"
    for source_name, target_name in green_columns_remap.items()
]
green_remapped = green_raw.selectExpr(*select_expr)

green_remapped_preview = green_remapped.head(5)
display(green_remapped_preview)

#### 1.1.2 Reading yellow taxi data

In [48]:
# Load the yellow taxi trips from the existing internal Synapse table.
yellow_table = "synapseazuremldedicates.dbo.yellowTaxiData"
yellow_raw = spark.read.synapsesql(yellow_table)

# Preview the first five rows.
yellow_preview = yellow_raw.head(5)
display(yellow_preview)

In [49]:
# Source column name -> common column name shared with the green taxi data.
# Only the columns listed here are kept; the dict order defines the output
# column order.
yellow_columns_remap = {
    "vendorID": "vendor",
    "tpepPickupDateTime": "pickup_datetime",
    "tpepDropoffDateTime": "dropoff_datetime",
    "storeAndFwdFlag": "store_forward",
    "startLon": "pickup_longitude",
    "startLat": "pickup_latitude",
    "endLon": "dropoff_longitude",
    "endLat": "dropoff_latitude",
    "passengerCount": "passengers",
    "fareAmount": "cost",
    "tripDistance": "distance",
}

# Build one "<source> as <target>" SQL expression per mapping entry.
select_expr = [
    f"{source_name} as {target_name}"
    for source_name, target_name in yellow_columns_remap.items()
]
yellow_remapped = yellow_raw.selectExpr(*select_expr)

yellow_remapped_preview = yellow_remapped.head(5)
display(yellow_remapped_preview)

### 1.2 Combining data from raw data sources

In [50]:
# Stack the two remapped frames into a single dataset. Both frames were built
# with the same target column names in the same order.
df_combined_taxi = green_remapped.union(yellow_remapped)

# Report the row count of each input and of the combined result.
for count_label, counted_frame in (
    ("Length of Green DataFrame :", green_remapped),
    ("Length of Yellow DataFrame :", yellow_remapped),
    ("Length of combined dataframes :", df_combined_taxi),
):
    print(count_label, counted_frame.count())

### 1.3 Saving merged dataset as table

In [51]:
# Reference: Synapse Spark connector documentation, section "Write using basic authentication":
# https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/synapse-spark-sql-pool-import-export?tabs=scala%2Cscala1%2Cscala2%2Cscala3%2Cscala4%2Cpython5#write-using-basic-authentication

In [52]:
# Persist the merged (not yet transformed) dataset, replacing any existing table.
combined_table = "synapseazuremldedicates.dbo.combined_taxi_data"
df_combined_taxi.write.mode("overwrite").synapsesql(combined_table)

### 1.4 Data Transformation

#### 1.4.1 Casting numeric fields and filtering by location

In [53]:
# Cast the coordinate, distance and cost columns to double and the passenger
# count to int. Replacing a column with withColumn keeps its position, so the
# column order is unchanged.
double_columns = [
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "distance",
    "cost",
]
for numeric_column in double_columns:
    df_combined_taxi = df_combined_taxi.withColumn(
        numeric_column, F.col(numeric_column).cast("double")
    )
df_combined_taxi = df_combined_taxi.withColumn(
    "passengers", F.col("passengers").cast("int")
)

display(df_combined_taxi.head(5))

In [54]:
# Keep only trips whose pickup and dropoff coordinates fall inside fixed
# bounding boxes (both ends inclusive).
# NOTE(review): the dropoff longitude lower bound (-74.72) differs from the
# pickup one (-74.09); this may be a typo — confirm before changing it.
pickup_in_bounds = (
    F.col("pickup_longitude").between(-74.09, -73.72)
    & F.col("pickup_latitude").between(40.53, 40.88)
)
dropoff_in_bounds = (
    F.col("dropoff_longitude").between(-74.72, -73.72)
    & F.col("dropoff_latitude").between(40.53, 40.88)
)

latlong_filtered_df = df_combined_taxi.filter(pickup_in_bounds & dropoff_in_bounds)

# Number of trips that survive the coordinate filter.
latlong_filtered_df.count()

#### 1.4.2 Transforming pickup_datetime

In [55]:
# Parse pickup_datetime into a timestamp using the "M/d/yyyy H:mm" pattern.
# NOTE(review): this overwrites latlong_filtered_df in place, so re-running the
# cell applies the pattern to an already-parsed column — presumably it should
# be run once per session; verify.
pickup_timestamp_format = "M/d/yyyy H:mm"
latlong_filtered_df = latlong_filtered_df.withColumn(
    "pickup_datetime",
    F.to_timestamp(F.col("pickup_datetime"), pickup_timestamp_format),
)

display(latlong_filtered_df.head(5))

In [56]:
# Expand pickup_datetime into separate calendar/time feature columns, then
# drop the original timestamp column.
# NOTE(review): the timestamp was parsed with a pattern that has no seconds
# field, so pickup_second is presumably constant — confirm it is useful.
pickup_part_extractors = {
    "pickup_weekday": F.dayofweek,
    "pickup_month": F.month,
    "pickup_monthday": F.dayofmonth,
    "pickup_hour": F.hour,
    "pickup_minute": F.minute,
    "pickup_second": F.second,
}

normalized_df = latlong_filtered_df
for pickup_part_name, pickup_extractor in pickup_part_extractors.items():
    normalized_df = normalized_df.withColumn(
        pickup_part_name, pickup_extractor("pickup_datetime")
    )
normalized_df = normalized_df.drop("pickup_datetime")

display(normalized_df.head(5))

#### 1.4.3 Transforming dropoff_datetime

In [57]:
# Parse dropoff_datetime into a timestamp using the "M/d/yyyy H:mm" pattern.
# NOTE(review): this overwrites normalized_df in place, so re-running the cell
# applies the pattern to an already-parsed column — presumably it should be
# run once per session; verify.
dropoff_timestamp_format = "M/d/yyyy H:mm"
normalized_df = normalized_df.withColumn(
    "dropoff_datetime",
    F.to_timestamp(F.col("dropoff_datetime"), dropoff_timestamp_format),
)

display(normalized_df.head(5))

In [58]:
# Expand dropoff_datetime into separate calendar/time feature columns, then
# drop the original timestamp column.
# NOTE(review): the timestamp was parsed with a pattern that has no seconds
# field, so dropoff_second is presumably constant — confirm it is useful.
dropoff_part_extractors = {
    "dropoff_weekday": F.dayofweek,
    "dropoff_month": F.month,
    "dropoff_monthday": F.dayofmonth,
    "dropoff_hour": F.hour,
    "dropoff_minute": F.minute,
    "dropoff_second": F.second,
}

for dropoff_part_name, dropoff_extractor in dropoff_part_extractors.items():
    normalized_df = normalized_df.withColumn(
        dropoff_part_name, dropoff_extractor("dropoff_datetime")
    )
normalized_df = normalized_df.drop("dropoff_datetime")

display(normalized_df.head(5))

#### 1.4.4 Change the store_forward column to binary values


In [59]:
# Distribution of the raw flag values before conversion.
print("--------------- Count before transforming into binary values ---------------")
raw_flag_counts = latlong_filtered_df.groupBy("store_forward").count()
raw_flag_counts.show()

# Encode the flag as 1 when it equals "Y" and 0 for every other value.
# The column is overwritten in place, so a second run of this cell would
# compare the already-encoded integers against "Y".
is_store_forward = F.col("store_forward") == "Y"
binary_flag = F.when(is_store_forward, F.lit(1)).otherwise(F.lit(0))
normalized_df = normalized_df.withColumn("store_forward", binary_flag)

# Distribution of the encoded flag after conversion.
print("--------------- Count after transforming into binary values ---------------")
binary_flag_counts = normalized_df.groupBy("store_forward").count()
binary_flag_counts.show()

#### 1.4.5 Filtering by distance and cost

In [60]:
# Keep only trips with a strictly positive distance and a strictly positive
# cost, reporting the row count before and after the filter.
print("--------------- Count before filter ---------------")
print(normalized_df.count())

final_df = normalized_df.filter(
    (normalized_df.distance > 0) & (normalized_df.cost > 0)
)

# Count exactly once after filtering: every count() launches a Spark job, and
# a bare count() in the middle of a cell is never displayed.
print("--------------- Count after filter ---------------")
print(final_df.count())

### 1.5 Saving transformed regression data

In [61]:
# Persist the transformed regression dataset, replacing any existing table.
# final_df is the frame that carries the coordinate filter, the date-part
# features, the binary store_forward flag and the distance/cost filter;
# df_combined_taxi only has the numeric casts applied.
final_df.write.mode("overwrite").synapsesql("synapseazuremldedicates.dbo.reg_transformed_taxi_data")

### 1.6 Transforming the target into classes and saving the table

In [62]:
# Bin the cost column into five quantile-based buckets stored in cost_class,
# which serves as the classification target.
discretizer = QuantileDiscretizer(
    inputCol="cost",
    outputCol="cost_class",
    numBuckets=5,
)
cost_bucket_model = discretizer.fit(final_df)
bucketed_df = cost_bucket_model.transform(final_df)

# Show how many trips fall into each bucket.
cost_class_counts = bucketed_df.groupBy("cost_class").count()
cost_class_counts.show()

In [63]:
# Preview the first ten rows, including the new cost_class column.
bucketed_preview = bucketed_df.head(10)
display(bucketed_preview)

In [64]:
# Persist the classification dataset (features plus cost_class), replacing any
# existing table.
classification_table = "synapseazuremldedicates.dbo.class_transformed_taxi_data"
bucketed_df.write.mode("overwrite").synapsesql(classification_table)