# Enrich NYC Yellow Taxi Data with Holiday and Weather

In this session, we enrich NYC Yellow Taxi data by combining it with Holiday and Weather datasets.

We use three separate notebooks, following the best practices for a Fabric Lakehouse. In this first notebook, we read the datasets from a public Azure Storage account and save the data in the BronzeLakehouse.

This first notebook also provides examples of how to load and clean the NYC Yellow Taxi data.

In the other notebooks, we focus on reading Open Datasets, manipulating the data to prepare it for further analysis (including column projection, filtering, grouping, and joins), creating a Spark table for model training, and building a Lakehouse table to generate Power BI reports.


<font size="2" color="red" face="sans-serif" bold>

<b><i><u>Make sure that the right Lakehouse is pinned for every notebook. For this notebook, the BronzeLakehouse must appear under Lakehouses in the left pane.</u></i></b>
</font>





Import the data types and functions we need for this notebook.

In [1]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType
import pyspark.sql.functions as f


## Data Ingestion New York Yellow Taxi Data Set
We are going to load the New York Yellow Taxi data from a Public Azure Storage Account.

<font size="2" color="red" face="sans-serif" bold>

<b><i><u>No changes are required to this cell. It has all the necessary credentials to ingest data from the storage account.</u></i></b>
</font>
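
The next cell composes two strings from the container and storage account names: the Spark configuration key that carries the SAS token, and the `wasbs://` URI of the folder to read. As a minimal sketch in plain Python (the helper names `sas_config_key` and `wasbs_uri` are hypothetical and not part of any library), the composition looks like this:

```python
def sas_config_key(container: str, storage_account: str) -> str:
    """Spark config key under which the SAS token for a container is stored."""
    return f"fs.azure.sas.{container}.{storage_account}.blob.core.windows.net"


def wasbs_uri(container: str, storage_account: str, path: str = "") -> str:
    """wasbs:// URI for a path inside a blob container."""
    return f"wasbs://{container}@{storage_account}.blob.core.windows.net/{path.lstrip('/')}"


# Values taken from this notebook
print(sas_config_key("sampledata", "azuresynapsestorage"))
print(wasbs_uri("sampledata", "azuresynapsestorage", "Fabric/NYCTaxiDataset/yellow/"))
```

Keeping the composition in one place makes it easier to reuse the same account and container for the Holiday and Weather datasets later in the notebook.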

In [2]:
# Azure storage access info for open datasets yellow cab
storage_account = "azuresynapsestorage"
container = "sampledata"

sas_token = r"" # Blank since container is Anonymous access

# Set Spark config to access  blob storage
spark.conf.set(f"fs.azure.sas.{container}.{storage_account}.blob.core.windows.net", sas_token)

# NOTE TO FELLOW AUTHORS:
# We can now easily read all of the parquet files in the directory just by pointing Spark at the directory
# But if we need to restrict the years that we read (maybe to match the weather data), then we
# can use the "pathGlobFilter" parameter.  So line 18 (below) would be something like this:
#                .parquet(f"wasbs://{container}@{storage_account}.blob.core.windows.net/Fabric/NYCTaxiDataset/yellow/", pathGlobFilter="20{18,19,21}.parquet") \

# Read the parquet files
nyc_tlc_df = spark.read \
                .parquet(f"wasbs://{container}@{storage_account}.blob.core.windows.net/Fabric/NYCTaxiDataset/yellow/") \
                .cache()


In [None]:
# Look at the data that we read
display(nyc_tlc_df)

Now that we have read the data from the external storage account, we can save the files in the Bronze layer. Whenever we need to go back to the original files, we can simply read the raw Parquet files from the Bronze layer.

<font size="2" color="red" face="sans-serif" bold>

<b><i><u>Make sure that the BronzeLakehouse is pinned for this notebook.</u></i></b>
</font>

In [4]:
# Save the DataFrame nyc_tlc_df as raw files in the Bronze layer
# You can also use the ABFSS path from the BronzeLakehouse Files section. Sample ABFSS path: "abfss://d239837d-5508-4a0c-acf9-8699feb71c5a@msit-onelake.dfs.fabric.microsoft.com/22c420ed-63ea-4c95-9d01-302573d1d5db/Files/NYCTaxiRawFiles"
# Since the BronzeLakehouse is pinned for this notebook, the relative path Files/NYCTaxiRawFiles is enough

nyc_tlc_df.write.mode("overwrite").parquet("Files/NYCTaxiRawFiles")
print("Raw files saved on the Bronze layer")



Raw files saved on the Bronze layer


Now that the data has been loaded into the DataFrame, let's explore it.
The `nyc_tlc_df` dataframe contains several years of data.  Let's see which years we have and how many taxi trips were recorded in each of those years.
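
To make the aggregation in the next cell concrete, here is a plain-Python sketch of the same idea: take the year of each pickup timestamp, count the rows per year, and sort by year. The timestamps are made up for illustration and do not come from the dataset.

```python
from collections import Counter
from datetime import datetime

# Made-up pickup timestamps, for illustration only
pickups = [
    datetime(2018, 3, 1, 8, 15),
    datetime(2019, 7, 4, 22, 5),
    datetime(2019, 12, 31, 23, 59),
    datetime(2021, 1, 1, 0, 10),
]

# Equivalent in spirit to groupBy(year).count().orderBy("year")
trips_per_year = sorted(Counter(ts.year for ts in pickups).items())

for year, count in trips_per_year:
    print(year, count)
```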

In [None]:
nyc_tlc_df.groupBy(f.year("tpep_pickup_datetime").alias("year")).count().orderBy("year").show()

Now that the initial data is loaded, let's do some projection on the data to:
- create new columns for the month number, day of month, day of week, and hour of day. This information is used in the training model to factor in time-based seasonality.
- add a static feature for the country code to join holiday data.

In [None]:
# Rename the columns of the raw dataset to make queries easier (in the end the dataset will look like the one that can be imported from Azure Open Datasets)
renamedcolumns_df = nyc_tlc_df\
                    .withColumnRenamed("VendorID", "vendorID")\
                    .withColumnRenamed("tpep_pickup_datetime", "tpepPickupDateTime")\
                    .withColumnRenamed("tpep_dropoff_datetime", "tpepDropoffDatetime")\
                    .withColumnRenamed("passenger_count", "passengerCount")\
                    .withColumnRenamed("RatecodeID", "ratecodeID")\
                    .withColumnRenamed("trip_distance", "tripDistance")\
                    .withColumnRenamed("store_and_fwd_flag", "storeAndFwdFlag")\
                    .withColumnRenamed("PULocationID", "pULocationID")\
                    .withColumnRenamed("DOLocationID", "doLocationID")\
                    .withColumnRenamed("payment_type", "paymentType")\
                    .withColumnRenamed("fare_amount", "fareAmount")\
                    .withColumnRenamed("extra", "extra")\
                    .withColumnRenamed("mta_tax", "mtaTax")\
                    .withColumnRenamed("tip_amount", "tipAmount")\
                    .withColumnRenamed("tolls_amount", "tollsAmount")\
                    .withColumnRenamed("improvement_surcharge", "improvementSurcharge")\
                    .withColumnRenamed("total_amount", "totalAmount")\
                    .withColumnRenamed("congestion_surcharge", "congestionSurcharge")\
                    .withColumnRenamed("airport_fee", "airportFee")

display(renamedcolumns_df)
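
The long chain of `withColumnRenamed` calls can also be driven by a dictionary. The following is a sketch, not the notebook's own method: `rename_columns` is a hypothetical helper, and the mapping lists only a few of the pairs used above. It relies only on `withColumnRenamed`, so it should work with a Spark DataFrame.

```python
from functools import reduce

# A few of the (old name -> new name) pairs from the cell above
RENAMES = {
    "passenger_count": "passengerCount",
    "trip_distance": "tripDistance",
    "tip_amount": "tipAmount",
    "total_amount": "totalAmount",
}


def rename_columns(df, renames):
    """Apply df.withColumnRenamed(old, new) once per mapping entry (hypothetical helper)."""
    return reduce(
        lambda acc, pair: acc.withColumnRenamed(pair[0], pair[1]),
        renames.items(),
        df,
    )


# Usage in the notebook (assumes nyc_tlc_df exists):
# renamedcolumns_df = rename_columns(nyc_tlc_df, RENAMES)
```

A dictionary keeps the old and new names side by side, which makes it easier to spot entries that rename a column to itself.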

After the data is read, we prepare it for cleaning and joining. We extract the year, month, day of the month, day of the week, and hour of the day from the pickup timestamp; these columns are used later in the joins with the Holiday and Weather datasets.
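
One detail worth knowing before running the next cell: Spark's `dayofweek` numbers the days from 1 (Sunday) to 7 (Saturday), which differs from Python's `datetime.isoweekday()` (1 = Monday). The plain-Python sketch below mirrors the derived columns for a single timestamp; `date_parts` is a hypothetical helper and the timestamp is made up.

```python
from datetime import datetime


def date_parts(ts: datetime) -> dict:
    """Derive the same date parts as the Spark cell, for one timestamp."""
    return {
        "year": ts.year,
        "month_num": ts.month,
        "day_of_month": ts.day,
        # Spark dayofweek convention: 1 = Sunday ... 7 = Saturday
        "day_of_week": ts.isoweekday() % 7 + 1,
        "hour_of_day": ts.hour,
        "country_code": "US",
    }


# 1 January 2019 was a Tuesday, so day_of_week is 3
print(date_parts(datetime(2019, 1, 1, 14, 30)))
```

This matters when the taxi data is later joined with other datasets: both sides of a join on day of week must use the same numbering.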


In [6]:
# Extract year, month, day of month, day of week, and hour of day from the pickup datetime, and add a static column for the country code to join holiday data.
renamedcolumns_df_expand = renamedcolumns_df.withColumn('datetime',f.to_timestamp('tpepPickupDateTime'))\
                            .withColumn('year',f.year(renamedcolumns_df.tpepPickupDateTime))\
                            .withColumn('month_num',f.month(renamedcolumns_df.tpepPickupDateTime))\
                            .withColumn('day_of_month',f.dayofmonth(renamedcolumns_df.tpepPickupDateTime))\
                            .withColumn('day_of_week',f.dayofweek(renamedcolumns_df.tpepPickupDateTime))\
                            .withColumn('hour_of_day',f.hour(renamedcolumns_df.tpepPickupDateTime))\
                            .withColumn('country_code',f.lit('US'))


Next, we store the DataFrame, with its columns renamed and the date-part columns extracted, as a table in the Bronze layer. Going forward, there is no need to retrieve the data from the storage account, extract the dates, and rename the columns again. Instead, we can create a new DataFrame by reading directly from the Bronze layer.

<font size="2" color="red" face="sans-serif" bold>

<b><i><u>Make sure that the BronzeLakehouse is pinned for this notebook.</u></i></b>
</font>

In [7]:
# Save the DataFrame renamedcolumns_df_expand as a table in the Bronze layer
# Example for the last line in this cell: save("abfss://d239837d-5508-4a0c-acf9-8699feb71c5a@msit-onelake.dfs.fabric.microsoft.com/22c420ed-63ea-4c95-9d01-302573d1d5db" + table_name)
# Use the ABFSS path from the BronzeLakehouse Tables section

from pyspark.sql.functions import col, year, month, quarter

table_name = 'NYCTaxi_Raw'



renamedcolumns_df_expand \
    .write \
    .mode("overwrite") \
    .format("delta") \
    .save("Tables/" + table_name)


#### Reading the Data from Files vs Reading the Data from Table

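In the next two cells, we compare reading the data from files with reading it from a table.
In this lab, reading from files is expected to be faster than reading from the table; compare the run times of the two cells. Keep in mind that the files hold the raw data, while the table holds the renamed columns and the extracted date parts.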
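
To compare the two reads yourself, you can time each one. The sketch below uses a hypothetical `timed` helper built on `time.perf_counter`. Because Spark evaluates reads lazily, the timed function should include an action such as `count()`; otherwise only the planning step is measured.

```python
import time


def timed(fn):
    """Run fn() and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start


# Stand-in workload so the sketch runs anywhere
total, seconds = timed(lambda: sum(range(1_000_000)))
print(f"sum={total} took {seconds:.4f}s")

# In the notebook (assumes the path and table from this lab exist):
# _, t_files = timed(lambda: spark.read.parquet("Files/NYCTaxiRawFiles").count())
# _, t_table = timed(lambda: spark.read.table("BronzeLakehouse.NYCTaxi_Raw").count())
```

A single measurement can be skewed by caching and cluster warm-up, so it is worth running each read a few times before drawing a conclusion.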



In [None]:
# You can also run the command below as follows: raw_files_df = spark.read.parquet("abfss://d239837d-5508-4a0c-acf9-8699feb71c5a@msit-onelake.dfs.fabric.microsoft.com/22c420ed-63ea-4c95-9d01-302573d1d5db/Files/NYCTaxiRawFiles")
# The abfss path is needed when you read data from a lakehouse other than the one pinned to the current notebook.
# Note: these files hold the raw data as saved earlier, with the original column names.
raw_files_df = spark.read.parquet("Files/NYCTaxiRawFiles")
display(raw_files_df)

Now we read the data from the table. Make sure that you replace `BronzeLakehouse` with the name of your own Bronze lakehouse.

In [None]:
# You can read tables from different lakehouses in the same workspace by creating a DataFrame:
# df = spark.read.table("LakehouseName.Tablename")
# In this example, we read the NYCTaxi_Raw table from the BronzeLakehouse
renamedcolumns_df_expand = spark.read.table("BronzeLakehouse.NYCTaxi_Raw")
display(renamedcolumns_df_expand)

Next, we remove the columns we do not need from the DataFrame. This reduces the overall size of the dataset and leaves us with fewer columns to deal with.
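
The removal list in the next cell contains names that differ in letter case from the renamed columns (for example `puLocationId` versus `pULocationID`) and names that do not exist in the yellow taxi data at all (for example `ehailFee`). With Spark's default settings, column names are resolved case-insensitively and `drop` ignores names that are not present, so the cell still works. The plain-Python sketch below imitates that behavior on a list of names; `remaining_columns` is a hypothetical helper, not a Spark API.

```python
def remaining_columns(columns, to_remove):
    """Names left after a case-insensitive drop that ignores unknown names."""
    remove = {name.lower() for name in to_remove}
    return [c for c in columns if c.lower() not in remove]


columns = ["vendorID", "pULocationID", "ratecodeID", "tipAmount"]
to_remove = ["puLocationId", "rateCodeID", "ehailFee"]

print(remaining_columns(columns, to_remove))
```

Because a misspelled name is silently ignored, printing the schema after the drop is a useful check that the intended columns are really gone.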

In [22]:
# Remove unused columns from the NYC yellow taxi data; drop() ignores names that are not present in the DataFrame

columns_to_remove = [
                     "lpepDropoffDatetime", "puLocationId", "doLocationId", "pickupLongitude", 
                     "pickupLatitude", "dropoffLongitude","dropoffLatitude" ,"rateCodeID", 
                     "storeAndFwdFlag","paymentType", "fareAmount", "extra", "mtaTax",
                     "improvementSurcharge", "tollsAmount", "ehailFee", "tripType"  
                    ]

nyc_tlc_df_clean = renamedcolumns_df_expand.drop(*columns_to_remove)


Let's display the new dataframe to check on the columns we have selected

In [None]:
display(nyc_tlc_df_clean)

Printing the schema of the DataFrame shows which columns remain and the data type of each one.

In [33]:
nyc_tlc_df_clean.printSchema()


root
 |-- vendorID: long (nullable = true)
 |-- tpepPickupDateTime: timestamp (nullable = true)
 |-- tpepDropoffDatetime: timestamp (nullable = true)
 |-- passengerCount: long (nullable = true)
 |-- tripDistance: double (nullable = true)
 |-- tipAmount: double (nullable = true)
 |-- totalAmount: double (nullable = true)
 |-- congestionSurcharge: double (nullable = true)
 |-- airportFee: double (nullable = true)
 |-- source_file: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month_num: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- hour_of_day: integer (nullable = true)
 |-- country_code: string (nullable = false)



Let's explore the DataFrame further by filtering it by year.

In [None]:
display(nyc_tlc_df_clean.filter("year == 2019"))

We can also list the distinct years to check which years were loaded into the DataFrame.

In [27]:
display(nyc_tlc_df_clean.select("year").distinct().orderBy("year"))


## Writing a Delta Table
Before writing, check the existing tables in the SilverLakehouse to see whether the target table already exists. If it does and you want to recreate it, uncomment the command below and run it.

In [16]:
%%sql
--DROP TABLE IF EXISTS Silverlakehouse.NYCTaxi_Clean



As the final step in our data processing pipeline, we can save the cleaned and filtered DataFrame, `nyc_tlc_df_clean`, as a table in the Silver layer. However, since we are saving the table in a different Lakehouse, namely Silver, we need to provide the ABFSS path. 

<font size="2" color="red" face="sans-serif"> 
In the cell below, replace the placeholder in angle brackets with the ABFSS path of your SilverLakehouse.
</font>

Please note that the ABFSS path will be unique to your specific environment and should look similar to the following:

`abfss://d239837d-5508-4a0c-acf9-8699feb71c5a@msit-onelake.dfs.fabric.microsoft.com/a6114b59-17f1-45b7-a5f1-9d4fd93a92d8/Tables/`

To retrieve the ABFSS path, please follow these steps:
 1. Go to the Workspace and click on the SilverLakehouse.
 1. Navigate to the Tables section and select the desired table.
 1. Click on Properties and copy the ABFSS path provided.

By following these steps, you can ensure that your DataFrame is saved in the appropriate location and can be easily accessed for any future data transformations.
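
Judging from the sample path above, a table path has the shape `abfss://<workspace id>@<OneLake host>/<lakehouse id>/Tables/<table name>`. The sketch below assembles such a path from its parts; `onelake_table_path` is a hypothetical helper, and the IDs and host are the sample values from this notebook, which will differ in your environment.

```python
def onelake_table_path(workspace_id: str, lakehouse_id: str, table_name: str, host: str) -> str:
    """Assemble an abfss:// path to a table folder in a lakehouse (hypothetical helper)."""
    return f"abfss://{workspace_id}@{host}/{lakehouse_id}/Tables/{table_name}"


# Sample values copied from this notebook; replace them with your own
path = onelake_table_path(
    workspace_id="d239837d-5508-4a0c-acf9-8699feb71c5a",
    lakehouse_id="a6114b59-17f1-45b7-a5f1-9d4fd93a92d8",
    table_name="NYCTaxi_Clean",
    host="msit-onelake.dfs.fabric.microsoft.com",
)
print(path)

# In the notebook (assumes nyc_tlc_df_clean exists):
# nyc_tlc_df_clean.write.mode("overwrite").format("delta").save(path)
```

Building the path from named parts avoids a doubled `/Tables/` segment, which is easy to introduce when a copied path already ends with it.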

In [21]:
# Save the cleaned DataFrame as a table in the SilverLakehouse
# Example of abfss path: .save("abfss://d239837d-5508-4a0c-acf9-8699feb71c5a@msit-onelake.dfs.fabric.microsoft.com/a6114b59-17f1-45b7-a5f1-9d4fd93a92d8/Tables/" + table_name)
# The placeholder below stands for the lakehouse root path (everything before /Tables/); the code appends /Tables/ and the table name
from pyspark.sql.functions import col, year, month, quarter

table_name = 'NYCTaxi_Clean'


nyc_tlc_df_clean \
    .write \
    .mode("overwrite") \
    .format("delta") \
    .save("<Replace with your SilverLakehouse abfss path>/Tables/" + table_name)


Now we read the table we created in the SilverLakehouse into a DataFrame and display it.

In [None]:
df = spark.read.table("Silverlakehouse.NYCTaxi_Clean")
display(df)


## Data Ingestion Holiday data set
We are going to load the Holiday data from a public Azure Storage account. This is the second dataset used in this lab, and it will be joined with the New York Yellow Taxi data.

<font size="2" color="red" face="sans-serif" bold>

<b><i><u>No changes are required to this cell. It has all the necessary credentials to ingest data from the storage account.</u></i></b>
</font>
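
Note that the next cell reads the CSV files with only the `header` option. Without schema inference or an explicit schema, Spark loads every CSV column as a string, so dates and flags may need to be cast before they are used in joins. The plain-Python sketch below shows the same effect with the standard `csv` module; the column names and the row are hypothetical and not taken from the Holiday dataset.

```python
import csv
import io
from datetime import date

# Hypothetical CSV content, for illustration only
raw = "countryRegionCode,holidayName,date\nUS,New Year's Day,2019-01-01\n"

rows = list(csv.DictReader(io.StringIO(raw)))
print(rows[0])  # every value is a plain string

# Cast the date column explicitly before using it as a join key
holiday_date = date.fromisoformat(rows[0]["date"])
print(holiday_date.year, holiday_date.month, holiday_date.day)
```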



In [None]:
# Azure storage access info for the open Holiday dataset
storage_account = "azuresynapsestorage"
container = "sampledata"

sas_token = r"" # Blank since container is Anonymous access

# Set Spark config to access blob storage
spark.conf.set(f"fs.azure.sas.{container}.{storage_account}.blob.core.windows.net", sas_token)

wasbs_path = f"wasbs://{container}@{storage_account}.blob.core.windows.net/Fabric/Holiday/"

hol_df = spark.read.format("csv").option("header","true").load(wasbs_path)

Now that we have read the data from the external storage account, we can save it in the Bronze layer as Parquet files. Whenever we need to go back to the original data, we can simply read the raw Parquet files from the Bronze layer.

In [None]:
# Save the data Frame hol_df as raw file in the Bronze Layer
# Example of abfss path:abfss://d239837d-5508-4a0c-acf9-8699feb71c5a@msit-onelake.dfs.fabric.microsoft.com/22c420ed-63ea-4c95-9d01-302573d1d5db

hol_df.write.mode("overwrite").parquet("Files/HolidayRawFiles")
print("Holiday Raw files saved on the Bronze layer")


Holiday Raw files saved on the Bronze layer


<font size="2" color="red" face="sans-serif" bold>

<b><i><u>Since we still have the BronzeLakehouse attached to this notebook, we can read the files without specifying the abfss path.</u></i></b>
</font>


In [None]:
# Here we read the data directly from the Bronze layer
# You can also use the abfss path to point to the exact location of the files: abfss://d239837d-5508-4a0c-acf9-8699feb71c5a@msit-onelake.dfs.fabric.microsoft.com/22c420ed-63ea-4c95-9d01-302573d1d5db/

# Let's display the hol_df DataFrame read from the Bronze files

hol_df = spark.read.format("parquet").load("Files/HolidayRawFiles/")
display(hol_df)

## Data Ingestion Weather data set
We are going to load the Weather data from a public Azure Storage account. This is the third and last dataset used in this lab, and it will be joined with the New York Yellow Taxi data and the Holiday dataset.

<font size="2" color="red" face="sans-serif" bold>

<b><i><u>No changes are required to this cell. It has all the necessary credentials to ingest data from the storage account.</u></i></b>
</font>

In [None]:
# Reading the data from the Azure Public Synapse Storage Account
storage_account = "azuresynapsestorage"
container = "sampledata"

sas_token = r"" # Blank since container is Anonymous access

# Set Spark config to access blob storage
spark.conf.set(f"fs.azure.sas.{container}.{storage_account}.blob.core.windows.net", sas_token)

isd_df  = spark.read.parquet(f"wasbs://{container}@{storage_account}.blob.core.windows.net/Fabric/Weather") \
                .withColumn("source_file", f.input_file_name()) \
                .cache()


In [None]:
# Save the Data Frame isd_df as raw file in the BronzeLakehouse

isd_df.write.mode("overwrite").parquet("Files/WeatherRawFiles")
print("Weather Raw files saved on the Bronze layer")


Weather Raw files saved on the Bronze layer
