### FHV Taxi Trip Data
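Loading, cleaning and transforming FHV (For-Hire Vehicle) trip data for the month selected in the `ProcessMonth` widget.

The cleaned data is also registered as a global temporary view (`FactFhvTaxiTripData`) and saved as an unmanaged table (`TaxiServiceWarehouse.FactFhvTaxiTripData`).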

In [0]:
# Print usage help for the dbutils.widgets API (exploratory only; nothing later in the notebook uses its output).
dbutils.widgets.help()

In [0]:
import re

from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Text widget selecting which month of FHV files this run processes (yyyymm).
dbutils.widgets.text(
    "ProcessMonth",            # widget name, read back below with dbutils.widgets.get
    "201812",                  # default value
    "Process Month (yyyymm)",  # label shown in the notebook UI
)

In [0]:
# Read the month to process from the ProcessMonth widget, expected as a yyyymm string (e.g. "201812").
# Surrounding whitespace is stripped so a stray space typed into the widget does not break the file glob.
processMonth = dbutils.widgets.get("ProcessMonth").strip()

# processMonth is interpolated directly into the input file glob when the data is loaded, so
# reject anything that is not a 4-digit year followed by a 01-12 month. Otherwise a typo
# (or a value such as "2018*") would silently read the wrong files or none at all.
if not re.fullmatch(r"\d{4}(0[1-9]|1[0-2])", processMonth):
    raise ValueError(
        f"ProcessMonth must be in yyyymm format (e.g. 201812), got {processMonth!r}"
    )

processMonth

In [0]:
# Sanity check: list the contents of the /mnt/datablob mount, which the FHV CSV files are read from below.
dbutils.fs.ls("/mnt/datablob")

### Defining Schema and Loading Data

In [0]:
# Explicit schema for the FHV trip CSV files. Every field is nullable. The order matters:
# with a user-supplied schema, Spark's CSV reader maps fields to file columns by position.
# Both base-number columns are declared (presumably the files carry it under either name;
# verify), but only Dispatching_base_number is kept by the transformation step.
fhvTaxiTripSchema = StructType([
    StructField(fieldName, fieldType, True)
    for fieldName, fieldType in (
        ("Pickup_DateTime",         TimestampType()),
        ("DropOff_datetime",        TimestampType()),
        ("PUlocationID",            IntegerType()),
        ("DOlocationID",            IntegerType()),
        ("SR_Flag",                 IntegerType()),
        ("Dispatching_base_number", StringType()),
        ("Dispatching_base_num",    StringType()),
    )
])

print("Defined schema for FHV Taxi data")

In [0]:
print("Starting to extract FHV Taxi data from multiple files")

# Read every CSV in the mount named FHVTaxiTripData_<processMonth>_*.csv into one DataFrame.
# The first line of each file is treated as a header; column names and types come from
# fhvTaxiTripSchema instead of being inferred.
fhvTaxiTripDataDF = spark.read.csv(
    f"/mnt/datablob/FHVTaxiTripData_{processMonth}_*.csv",
    schema=fhvTaxiTripSchema,
    header=True,
)

print("Extracted FHV Taxi data")

In [0]:
# Quick peek at the first two raw rows to confirm the schema lines up with the files.
fhvTaxiTripDataDF.show(n=2)

In [0]:
print("Starting cleanup and transformation on FHV Taxi data")

# Clean and reshape the raw FHV trips into the fact-table layout:
#   - drop rows where BOTH the pickup and drop-off location ids are null (how="all"),
#   - drop exact duplicate rows,
#   - rename columns to the warehouse naming convention,
#   - derive TripYear / TripMonth / TripDay from the pickup time,
#   - derive TripTimeInMinutes (drop-off minus pickup, rounded to whole minutes),
#   - map SR_Flag to a TripType label, then drop the raw flag.
# Column names are spelled exactly as in fhvTaxiTripSchema, so they still resolve if
# spark.sql.caseSensitive is turned on.
# NOTE: this reassigns fhvTaxiTripDataDF, so re-running this cell by itself fails
# (Pickup_DateTime no longer exists); re-run the load cell first.
fhvTaxiTripDataDF = (
    fhvTaxiTripDataDF
    .na.drop(how="all", subset=["PUlocationID", "DOlocationID"])
    .dropDuplicates()
    .select(
        col("Pickup_DateTime").alias("PickupTime"),
        col("DropOff_datetime").alias("DropTime"),
        col("PUlocationID").alias("PickupLocationId"),
        col("DOlocationID").alias("DropLocationId"),
        "SR_Flag",
        col("Dispatching_base_number").alias("BaseLicenseNumber"),
    )
    .withColumn("TripYear", year("PickupTime"))
    .withColumn("TripMonth", month("PickupTime"))
    .withColumn("TripDay", dayofmonth("PickupTime"))
    # `round` here is pyspark.sql.functions.round (brought in by the wildcard import),
    # not Python's builtin round.
    .withColumn(
        "TripTimeInMinutes",
        round((unix_timestamp("DropTime") - unix_timestamp("PickupTime")) / 60),
    )
    # Any SR_Flag other than 1 (including null) is classified as a solo trip.
    .withColumn(
        "TripType",
        when(col("SR_Flag") == 1, "SharedTrip").otherwise("SoloTrip"),
    )
    .drop("SR_Flag")
)

print("Cleaned up and applied transformations on FHV Taxi data")

In [0]:
# Render the cleaned and transformed trips as a table for inspection.
display(fhvTaxiTripDataDF)

PickupTime,DropTime,PickupLocationId,DropLocationId,BaseLicenseNumber,TripYear,TripMonth,TripDay,TripTimeInMinutes,TripType
2018-12-02T18:21:36.000+0000,2018-12-02T18:35:12.000+0000,7.0,229.0,B02510,2018,12,2,14.0,SharedTrip
2018-12-02T23:39:42.000+0000,2018-12-02T23:42:36.000+0000,112.0,112.0,B02617,2018,12,2,3.0,SoloTrip
2018-12-09T16:25:11.000+0000,2018-12-09T16:32:43.000+0000,208.0,183.0,B02617,2018,12,9,8.0,SharedTrip
2018-12-03T02:41:23.000+0000,2018-12-03T02:48:06.000+0000,57.0,82.0,B02395,2018,12,3,7.0,SoloTrip
2018-12-08T06:37:59.000+0000,2018-12-08T06:50:29.000+0000,37.0,157.0,B02395,2018,12,8,13.0,SoloTrip
2018-12-06T15:35:41.000+0000,2018-12-06T16:02:17.000+0000,56.0,9.0,B02395,2018,12,6,27.0,SharedTrip
2018-12-02T07:57:35.000+0000,2018-12-02T08:13:36.000+0000,69.0,41.0,B02395,2018,12,2,16.0,SoloTrip
2018-12-03T05:11:15.000+0000,2018-12-03T05:32:36.000+0000,42.0,138.0,B02395,2018,12,3,21.0,SoloTrip
2018-12-06T21:06:10.000+0000,2018-12-06T21:15:09.000+0000,82.0,8.0,B02395,2018,12,6,9.0,SoloTrip
2018-12-01T11:24:59.000+0000,2018-12-01T11:38:43.000+0000,32.0,60.0,B02395,2018,12,1,14.0,SoloTrip


In [0]:
# Register the cleaned trips as the global temporary view FactFhvTaxiTripData,
# replacing any existing view with that name.
fhvTaxiTripDataDF.createOrReplaceGlobalTempView(name="FactFhvTaxiTripData")

print("Saved FHV Taxi fact as a global temp view")

In [0]:
%sql

-- Database that holds the dimensional-model tables written below.
-- IF NOT EXISTS keeps this cell re-runnable: a plain CREATE DATABASE fails once the
-- database already exists (e.g. on the second run of this notebook).
CREATE DATABASE IF NOT EXISTS TaxiServiceWarehouse;

In [0]:
print("Starting to save FHV Taxi dataframe as a fact and unmanaged table")

# Persist the fact as an unmanaged table. The explicit `path` puts the data files at that
# location; the TaxiServiceWarehouse.FactFhvTaxiTripData table refers to them.
# mode="overwrite" replaces whatever the table previously held.
# NOTE(review): no format is given, so the session's default data source is used; the
# `.parquet` suffix in the path does not by itself select Parquet. Confirm this is intended.
fhvTaxiTripDataDF.write.saveAsTable(
    "TaxiServiceWarehouse.FactFhvTaxiTripData",
    mode="overwrite",
    path="/mnt/datalake/DimensionalModel/Facts/FhvTaxiFact.parquet",
)

print("Saved FHV Taxi dataframe as a fact and unmanaged table")

In [0]:
# Stop the notebook here and hand back "Success" as its exit value to whatever invoked it.
dbutils.notebook.exit("Success")