In [1]:
%fs ls /mnt/storage

path,name,size
dbfs:/mnt/storage/FhvBases.json,FhvBases.json,464836
dbfs:/mnt/storage/fhv_tripdata_2018-12.csv,fhv_tripdata_2018-12.csv,1694806758


### Reading and transforming FHV trips data

In [3]:
# With no schema supplied, Spark has to infer one from the data, which makes this read slow.

# The wildcard matches every 2018 FHV trip file under the mount.
fhv_trips_df = (
    spark.read
         .option('header', 'true')
         .option('inferSchema', 'true')
         .csv('/mnt/storage/fhv_tripdata_2018*.csv')
)

In [4]:
# Create schema for FHV taxi data

from pyspark.sql.types import *

# Column layout of the FHV trips CSV, declared one field at a time.
# Every column is nullable (third argument True).

fhv_taxi_trips_schema = (
    StructType()
    .add("Pickup_DateTime", TimestampType(), True)
    .add("DropOff_datetime", TimestampType(), True)
    .add("PUlocationID", IntegerType(), True)
    .add("DOlocationID", IntegerType(), True)
    .add("SR_Flag", IntegerType(), True)
    .add("Dispatching_base_number", StringType(), True)
    .add("Dispatching_base_num", StringType(), True)
)

In [5]:
# Apply the explicit schema to the FHV taxi data.
# The CSV starts with a header row (the inferSchema read earlier in this notebook
# relies on it), so 'header' must be enabled here as well; otherwise the header
# line is parsed as a data row.

fhv_trips_df = (
    spark.read
         .schema(fhv_taxi_trips_schema)
         .option('header', 'true')
         .csv('/mnt/storage/fhv_tripdata_2018-12.csv')
)

### Writing the DataFrame to Delta Lake to optimize processing time

In [7]:
# Persist the trips as a Delta table, replacing whatever is already stored at that path.
fhv_trips_df.write.save(
    '/mnt/storage/fhv_trips',
    format='delta',
    mode='overwrite',
)

In [8]:
# Load the trips back from the Delta location.
fhv_trips_df = spark.read.load('/mnt/storage/fhv_trips', format='delta')

In [9]:
# Basic cleaning: drop rows without a pickup or drop-off location, remove exact
# duplicates, and keep only trips that fall inside December 2018.
# The upper bound is exclusive at 2019-01-01. A bare '2018-12-31' literal compares as
# midnight at the start of that day, which would discard every trip on 31 December.
# Column names in dropna use the same casing as the schema, so the cell also works
# when Spark is configured to be case sensitive.
fhv_trips_df = (
    fhv_trips_df
    .dropna(subset=["PUlocationID", "DOlocationID"])
    .drop_duplicates()
    .where("Pickup_DateTime >= '2018-12-01' AND DropOff_datetime < '2019-01-01'")
)

In [10]:
# fhv_trips_df.count()

In [11]:
# Print the column names and types of the trips DataFrame.
fhv_trips_df.printSchema()

In [12]:
# Keep only the columns that are needed; "Dispatching_base_num" is redundant and left out.

kept_columns = [
    "Pickup_DateTime",
    "DropOff_datetime",
    "PULocationID",
    "DOLocationID",
    "SR_Flag",
    "Dispatching_base_number",
]
fhv_trips_df = fhv_trips_df.select(*kept_columns)

# Confirm the redundant column is gone.
fhv_trips_df.printSchema()

Alternatively, the cell above could have dropped the redundant column directly with `fhv_trips_df.drop("Dispatching_base_num")`.

In [14]:
from pyspark.sql.functions import col

# Rename the pickup timestamp while selecting; the other columns pass through by name.
pickup_time = col("Pickup_DateTime").alias("PickupTime")

fhv_trips_df = fhv_trips_df.select(
    pickup_time,
    "DropOff_DateTime",
    "PUlocationID",
    "DOlocationID",
    "SR_Flag",
    "Dispatching_base_number",
)

fhv_trips_df.printSchema()

In [15]:
# Give the remaining raw columns friendlier names (old name -> new name).
column_renames = {
    "DropOff_DateTime": "DropTime",
    "PUlocationID": "PickupLocationId",
    "DOlocationID": "DropLocationId",
    "Dispatching_base_number": "BaseLicenseNumber",
}

for old_name, new_name in column_renames.items():
    fhv_trips_df = fhv_trips_df.withColumnRenamed(old_name, new_name)

In [16]:
# Print the schema again to check the renamed columns.
fhv_trips_df.printSchema()

In [18]:
from pyspark.sql.functions import year, month, dayofmonth

# Derive the calendar parts of the pickup timestamp as extra columns.
# TripYear and TripMonth are added with withColumn; TripDay is appended through a
# select('*', ...) to show the alternative way of adding a column.
pickup_ts = col("PickupTime")

fhv_trips_df = (
    fhv_trips_df
    .withColumn('TripYear', year(pickup_ts))
    .withColumn('TripMonth', month(pickup_ts))
    .select('*', dayofmonth(pickup_ts).alias('TripDay'))
)

In [19]:
from pyspark.sql.functions import unix_timestamp, round

# Trip duration: difference of the two timestamps in epoch seconds, divided by 60
# and rounded to give minutes.
# NOTE: `round` here is pyspark.sql.functions.round (imported above), not the Python builtin.
trip_seconds = unix_timestamp("DropTime") - unix_timestamp("PickupTime")

fhv_trips_df = fhv_trips_df.withColumn("TripTimeInMinutes", round(trip_seconds / 60))


In [20]:
from pyspark.sql.functions import when

# SR_Flag == 1 is labelled "SharedTrip"; every other value falls through to "SoloTrip".
# The flag column itself is dropped once the label has been derived.
trip_type = when(col("SR_Flag") == 1, "SharedTrip").otherwise("SoloTrip")

fhv_trips_df = (
    fhv_trips_df
    .withColumn('TripType', trip_type)
    .drop("SR_Flag")
)

### Reading and transforming FHV bases data

In [22]:
%fs head /mnt/storage/FhvBases.json

In [23]:
from pyspark.sql.types import *

# Schema for the FHV bases JSON. "Address" and "GeoLocation" are nested structures,
# so each one gets its own StructType. Every field is nullable.

address_schema = StructType([
    StructField("Building", StringType(), True),
    StructField("Street", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Postcode", StringType(), True),
])

geolocation_schema = StructType([
    StructField("Latitude", StringType(), True),
    StructField("Longitude", StringType(), True),
    StructField("Location", StringType(), True),
])

fhv_bases_schema = StructType([
    StructField("License Number", StringType(), True),
    StructField("Entity Name", StringType(), True),
    StructField("Telephone Number", LongType(), True),
    StructField("SHL Endorsed", StringType(), True),
    StructField("Type of Base", StringType(), True),
    StructField("Address", address_schema, True),
    StructField("GeoLocation", geolocation_schema, True),
])

In [24]:
# Read the bases file with the explicit schema defined above.
# Supplying a schema does not raise an error when the file disagrees with it:
#   - a schema field that is missing from the JSON comes back as null
#   - a JSON attribute that is not listed in the schema is ignored

fhv_bases_df = (
    spark.read
         .schema(fhv_bases_schema)
         .option('multiline', 'true')
         .json('/mnt/storage/FhvBases.json')
)

# display(fhv_bases_df)

In [25]:
# Flatten the bases data: keep the license number and base type, and lift the
# nested Address fields to top-level columns (source path -> output name).
base_columns = {
    "License Number": "BaseLicenseNumber",
    "Type of Base": "BaseType",
    "Address.Building": "AddressBuilding",
    "Address.Street": "AddressStreet",
    "Address.City": "AddressCity",
    "Address.State": "AddressState",
    "Address.Postcode": "AddressPostCode",
}

fhv_bases_df = fhv_bases_df.select(
    [col(source).alias(target) for source, target in base_columns.items()]
)

### Merging the two DataFrames

In [27]:
# Inner join on the shared key: only trips whose BaseLicenseNumber also appears
# in the bases data are kept.
fhv_trips_data_with_bases_df = fhv_trips_df.join(fhv_bases_df, "BaseLicenseNumber", "inner")

In [28]:
# Preview the first 10 joined rows. The limit is applied to the DataFrame itself
# rather than passed to display() as a keyword, matching the other preview cells
# in this notebook.
display(fhv_trips_data_with_bases_df.limit(10))

In [29]:
# Print the schema of the joined trips + bases DataFrame.
fhv_trips_data_with_bases_df.printSchema()

### Generating the report

In [31]:
from pyspark.sql.functions import sum

# Python's built-in sum function will not work as expected here.
# The aggregation needs pyspark.sql.functions.sum (imported above), which takes a
# column as its argument and sums the values in it.

# Total trip minutes per base city and base type, sorted by the grouping keys.
report_keys = ["AddressCity", "BaseType"]

fhv_trips_report = (
    fhv_trips_data_with_bases_df
    .groupBy(report_keys)
    .agg(sum("TripTimeInMinutes").alias("TotalTripTime"))
    .orderBy(report_keys)
)

### All operations chained together

In [33]:
# Create schema for FHV taxi data

from pyspark.sql.functions import col, year, month, dayofmonth, unix_timestamp, round, when

# ----------------------------- COMMENT OUT THE BELOW SECTION IF DATA IS ALREADY STORED IN DELTA LAKE (saves time) ------------------------------------------------


# Column layout of the FHV trips CSV, declared one field at a time.
# Every column is nullable (third argument True).

fhv_taxi_trips_schema = (
    StructType()
    .add("Pickup_DateTime", TimestampType(), True)
    .add("DropOff_datetime", TimestampType(), True)
    .add("PUlocationID", IntegerType(), True)
    .add("DOlocationID", IntegerType(), True)
    .add("SR_Flag", IntegerType(), True)
    .add("Dispatching_base_number", StringType(), True)
    .add("Dispatching_base_num", StringType(), True)
)


# Apply the explicit schema to the FHV taxi data.
# The CSV starts with a header row (the inferSchema read earlier in this notebook
# relies on it), so 'header' must be enabled here as well; otherwise the header
# line is parsed as a data row and ends up in the Delta table.

fhv_trips_df = (
    spark.read
         .schema(fhv_taxi_trips_schema)
         .option('header', 'true')
         .csv('/mnt/storage/fhv_tripdata_2018-12.csv')
)


# Persist the trips as a Delta table to speed up later processing;
# anything already stored at that path is overwritten.

fhv_trips_df.write.save(
    '/mnt/storage/fhv_trips',
    format='delta',
    mode='overwrite',
)

# ----------------------------- COMMENT OUT THE ABOVE SECTION IF DATA IS ALREADY STORED IN DELTA LAKE ------------------------------------------------

# Load the trips from the Delta location (this is the starting point when the write above is skipped).
fhv_trips_df = spark.read.load('/mnt/storage/fhv_trips', format='delta')

# Full trips transformation as a single chain: clean, select and rename columns,
# derive the date parts, compute the trip duration and label the trip type.
fhv_trips_df = (
    fhv_trips_df
    # --- Cleaning ---
    # Column names use the same casing as the schema so this also works when
    # Spark is configured to be case sensitive.
    .dropna(subset=["PUlocationID", "DOlocationID"])
    .drop_duplicates()
    # Keep December 2018 only. The upper bound is exclusive at 2019-01-01: a bare
    # '2018-12-31' literal compares as midnight at the start of that day and would
    # discard every trip on 31 December.
    .where("Pickup_DateTime >= '2018-12-01' AND DropOff_datetime < '2019-01-01'")
    # --- Column selection (leaves out the redundant Dispatching_base_num) ---
    .select(
        col("Pickup_DateTime").alias("PickupTime"),
        "DropOff_DateTime",
        "PUlocationID",
        "DOlocationID",
        "SR_Flag",
        "Dispatching_base_number",
    )
    # --- Friendlier column names ---
    .withColumnRenamed("DropOff_DateTime", "DropTime")
    .withColumnRenamed("PUlocationID", "PickupLocationId")
    .withColumnRenamed("DOlocationID", "DropLocationId")
    .withColumnRenamed("Dispatching_base_number", "BaseLicenseNumber")
    # --- Calendar parts of the pickup timestamp ---
    .withColumn('TripYear', year(col("PickupTime")))
    .withColumn('TripMonth', month(col("PickupTime")))
    .select(
        '*',
        dayofmonth(col("PickupTime")).alias('TripDay'),
    )
    # --- Trip duration: difference in epoch seconds, divided by 60 and rounded ---
    # `round` is pyspark.sql.functions.round (imported at the top of this cell).
    .withColumn(
        "TripTimeInMinutes",
        round((unix_timestamp("DropTime") - unix_timestamp("PickupTime")) / 60),
    )
    # --- Trip type: SR_Flag == 1 is "SharedTrip", anything else "SoloTrip" ---
    .withColumn(
        'TripType',
        when(col("SR_Flag") == 1, "SharedTrip").otherwise("SoloTrip"),
    )
    .drop("SR_Flag")
)



In [34]:
from pyspark.sql.types import *

# Schema for the FHV bases JSON. "Address" and "GeoLocation" are nested structures,
# so each one gets its own StructType. Every field is nullable.

address_schema = StructType([
    StructField("Building", StringType(), True),
    StructField("Street", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Postcode", StringType(), True),
])

geolocation_schema = StructType([
    StructField("Latitude", StringType(), True),
    StructField("Longitude", StringType(), True),
    StructField("Location", StringType(), True),
])

fhv_bases_schema = StructType([
    StructField("License Number", StringType(), True),
    StructField("Entity Name", StringType(), True),
    StructField("Telephone Number", LongType(), True),
    StructField("SHL Endorsed", StringType(), True),
    StructField("Type of Base", StringType(), True),
    StructField("Address", address_schema, True),
    StructField("GeoLocation", geolocation_schema, True),
])


# Read the bases file with the explicit schema defined above.
# Supplying a schema does not raise an error when the file disagrees with it:
#   - a schema field that is missing from the JSON comes back as null
#   - a JSON attribute that is not listed in the schema is ignored

fhv_bases_df = (
    spark.read
         .schema(fhv_bases_schema)
         .option('multiline', 'true')
         .json('/mnt/storage/FhvBases.json')
)

# display(fhv_bases_df)

# Flatten the bases data: keep the license number and base type, and lift the
# nested Address fields to top-level columns (source path -> output name).
base_columns = {
    "License Number": "BaseLicenseNumber",
    "Type of Base": "BaseType",
    "Address.Building": "AddressBuilding",
    "Address.Street": "AddressStreet",
    "Address.City": "AddressCity",
    "Address.State": "AddressState",
    "Address.Postcode": "AddressPostCode",
}

fhv_bases_df = fhv_bases_df.select(
    [col(source).alias(target) for source, target in base_columns.items()]
)

In [35]:
# Inner join on the shared key: only trips whose BaseLicenseNumber also appears
# in the bases data are kept.
fhv_trips_data_with_bases_df = fhv_trips_df.join(fhv_bases_df, "BaseLicenseNumber", "inner")

### Creating temp views and using Spark SQL

In [37]:
# Register the transformed trips DataFrame as a temp view named "LocalFhvTaxiTripsData"
# so that it can be queried with Spark SQL in the cells below.
fhv_trips_df.createOrReplaceTempView("LocalFhvTaxiTripsData")

In [38]:
# Query the temp view through spark.sql(), filtering on a single base license number.
base_trips_query = "SELECT * FROM LocalFhvTaxiTripsData WHERE BaseLicenseNumber = 'B02510'"
sql_based_dF = spark.sql(base_trips_query)

# Show at most 10 of the matching rows.
display(sql_based_dF.limit(10))

PickupTime,DropTime,PickupLocationId,DropLocationId,BaseLicenseNumber,TripYear,TripMonth,TripDay,TripTimeInMinutes,TripType
2018-12-20T11:09:58.000+0000,2018-12-20T12:11:16.000+0000,255,186,B02510,2018,12,20,61.0,SharedTrip
2018-12-20T11:14:41.000+0000,2018-12-20T11:29:07.000+0000,40,65,B02510,2018,12,20,14.0,SoloTrip
2018-12-20T11:16:43.000+0000,2018-12-20T12:21:38.000+0000,244,227,B02510,2018,12,20,65.0,SoloTrip
2018-12-20T11:11:29.000+0000,2018-12-20T11:40:53.000+0000,155,52,B02510,2018,12,20,29.0,SoloTrip
2018-12-20T11:11:44.000+0000,2018-12-20T11:21:24.000+0000,32,94,B02510,2018,12,20,10.0,SoloTrip
2018-12-20T11:14:51.000+0000,2018-12-20T11:30:12.000+0000,225,61,B02510,2018,12,20,15.0,SoloTrip
2018-12-20T11:09:31.000+0000,2018-12-20T11:38:23.000+0000,35,97,B02510,2018,12,20,29.0,SoloTrip
2018-12-20T11:14:44.000+0000,2018-12-20T11:40:35.000+0000,216,197,B02510,2018,12,20,26.0,SoloTrip
2018-12-20T11:11:18.000+0000,2018-12-20T11:29:25.000+0000,76,28,B02510,2018,12,20,18.0,SoloTrip
2018-12-20T11:12:02.000+0000,2018-12-20T12:40:14.000+0000,148,265,B02510,2018,12,20,88.0,SoloTrip


In [39]:
%sql

-- Same lookup as the spark.sql() cell above, written directly as a SQL cell:
-- up to 10 trips from the LocalFhvTaxiTripsData temp view for base license B02510.
SELECT * 
FROM LocalFhvTaxiTripsData
WHERE BaseLicenseNumber = 'B02510'
LIMIT 10

PickupTime,DropTime,PickupLocationId,DropLocationId,BaseLicenseNumber,TripYear,TripMonth,TripDay,TripTimeInMinutes,TripType
2018-12-20T11:09:58.000+0000,2018-12-20T12:11:16.000+0000,255,186,B02510,2018,12,20,61.0,SharedTrip
2018-12-20T11:14:41.000+0000,2018-12-20T11:29:07.000+0000,40,65,B02510,2018,12,20,14.0,SoloTrip
2018-12-20T11:16:43.000+0000,2018-12-20T12:21:38.000+0000,244,227,B02510,2018,12,20,65.0,SoloTrip
2018-12-20T11:11:29.000+0000,2018-12-20T11:40:53.000+0000,155,52,B02510,2018,12,20,29.0,SoloTrip
2018-12-20T11:11:44.000+0000,2018-12-20T11:21:24.000+0000,32,94,B02510,2018,12,20,10.0,SoloTrip
2018-12-20T11:14:51.000+0000,2018-12-20T11:30:12.000+0000,225,61,B02510,2018,12,20,15.0,SoloTrip
2018-12-20T11:09:31.000+0000,2018-12-20T11:38:23.000+0000,35,97,B02510,2018,12,20,29.0,SoloTrip
2018-12-20T11:14:44.000+0000,2018-12-20T11:40:35.000+0000,216,197,B02510,2018,12,20,26.0,SoloTrip
2018-12-20T11:11:18.000+0000,2018-12-20T11:29:25.000+0000,76,28,B02510,2018,12,20,18.0,SoloTrip
2018-12-20T11:12:02.000+0000,2018-12-20T12:40:14.000+0000,148,265,B02510,2018,12,20,88.0,SoloTrip


In [40]:
# Register the joined trips + bases DataFrame as a *global* temp view named "FactFhvTaxiTripData".
# NOTE(review): Spark exposes global temp views through the `global_temp` database, so
# queries should reference it as global_temp.FactFhvTaxiTripData — confirm against the consumers.
fhv_trips_data_with_bases_df.createOrReplaceGlobalTempView("FactFhvTaxiTripData")