#### This notebook demonstrates how to load data from Azure Open Datasets into Delta Lake
#### and how to load data using comparison method incrementally month by month.
 
#### The particular dataset being used in this example is the New York City Green Taxi dataset.

In [5]:
# Loading the necessary libraries
from pyspark.sql import SparkSession
from azureml.opendatasets import NycTlcGreen
from datetime import datetime
from dateutil import parser,relativedelta
import pyspark.sql.functions as f
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, to_date, col, quarter, explode, sequence, expr,current_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType, StringType
from delta.tables import DeltaTable


StatementMeta(, 29fb1d8f-d200-4a2f-b7fb-8dad3aa89d5f, 9, Finished, Available)

##### We set the Parquet Vorder to be enabled. This is a performance setting that can help speed up reading Parquet files in one lake

In [4]:
%%sql 
SET spark.sql.parquet.vorder.enabled=TRUE

StatementMeta(, 29fb1d8f-d200-4a2f-b7fb-8dad3aa89d5f, 8, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>

In [30]:
delta_load = 1 # Set to 1 for incremental load, 0 for full load
start_date_param = '2014-05-01' # Set your desired start date
end_date_param = '2018-06-06' # Set your desired end date

StatementMeta(, , , Waiting, )

##### We check if the Delta Table already exists. If it does, we fetch the max date to use as the start date for our data fetching.

In [31]:
# If the table exists, find the max date for incremental load
if (delta_load == 1 or delta_load is None) and spark._jsparkSession.catalog().tableExists("nycgreentaxi"):
    max_date_df = spark.sql("SELECT MAX(date_key) as max_date FROM nycgreentaxi")
    max_date = max_date_df.collect()[0]['max_date']
    start_date = datetime.combine(max_date, datetime.min.time()) if max_date else parser.parse('2014-05-01')
    end_date =  start_date + relativedelta.relativedelta(months=1)
elif (delta_load == 1 or delta_load is None) and not spark._jsparkSession.catalog().tableExists("nycgreentaxi"):
    start_date = parser.parse(start_date_param)
    end_date =  start_date + relativedelta.relativedelta(months=1)
else: # delta_load == 0
    # For full load, we use the parameter start and end dates
    start_date = parser.parse(start_date_param)
    end_date = parser.parse(end_date_param)

StatementMeta(, , , Waiting, )

In [32]:
print(start_date)
print(end_date)

StatementMeta(, , , Waiting, )

2014-06-01 00:00:00
2014-07-01 00:00:00


##### We define the schema for our data. This helps ensure that the data types are correct when we load the data into the DataFrame.

In [33]:
# Define the schema for the NYC Green Taxi data
schema = StructType([
    StructField('vendorID', IntegerType(), True),
    StructField('lpepPickupDatetime', TimestampType(), True),
    StructField('lpepDropoffDatetime', TimestampType(), True),
    StructField('passengerCount', IntegerType(), True),
    StructField('tripDistance', DoubleType(), True),
    StructField('puLocationId', StringType(), True),
    StructField('doLocationId', StringType(), True),
    StructField('pickupLongitude', DoubleType(), True),
    StructField('pickupLatitude', DoubleType(), True),
    StructField('dropoffLongitude', DoubleType(), True),
    StructField('dropoffLatitude', DoubleType(), True),
    StructField('rateCodeID', IntegerType(), True),
    StructField('storeAndFwdFlag', StringType(), True),
    StructField('paymentType', IntegerType(), True),
    StructField('fareAmount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mtaTax', DoubleType(), True),
    StructField('improvementSurcharge', StringType(), True),
    StructField('tipAmount', DoubleType(), True),
    StructField('tollsAmount', DoubleType(), True),
    StructField('ehailFee', DoubleType(), True),
    StructField('totalAmount', DoubleType(), True),
    StructField('tripType', DoubleType(), True)
   
])

StatementMeta(, , , Waiting, )

##### We fetch the data from the Azure Open Dataset for Nyc green taxi and load it into a DataFrame  We then add new columns for year, month, day of month, day of week, hour, and date key and then write the same in to Delta Table.

In [34]:
# Loading data chunk by chunk (1 month at a time)
while start_date <= end_date:
    chunk_end_date = min(start_date + relativedelta.relativedelta(months=1), end_date)
    # Load the data for this chunk
    nyc_tlc = NycTlcGreen(start_date=start_date, end_date=chunk_end_date)
    nyc_tlc_df = nyc_tlc.to_s_dataframe()
    nyc_tlc_df_spark = spark.createDataFrame(nyc_tlc_df, schema)

    # Transform the DataFrame
    nyc_tlc_df_transformed = nyc_tlc_df_spark.selectExpr(
        "*", 
        "year(lpepPickupDatetime) as year",
        "month(lpepPickupDatetime) as month",
        "dayofmonth(lpepPickupDatetime) as day_of_month",
        "dayofweek(lpepPickupDatetime) as day_of_week",
        "hour(lpepPickupDatetime) as hour",
        "to_date(lpepPickupDatetime) as date_key"
    ).withColumn("bronze_loaded_timestamp", current_timestamp())

    # Writing the transformed data to the Delta table
    nyc_tlc_df_transformed.write.format('delta').mode("append").saveAsTable("nycgreentaxi")

    # Move the start_date to the next month
    start_date = chunk_end_date + relativedelta.relativedelta(days=1)

StatementMeta(, , , Waiting, )

[Info] read from /tmp/tmp3ds6bbtw/https%3A/%2Fazureopendatastorage.azurefd.net/nyctlc/green/puYear=2014/puMonth=6/part-00122-tid-4753095944193949832-fee7e113-666d-4114-9fcb-bcd3046479f3-2692-1.c000.snappy.parquet


[Info] read from /tmp/tmp3ds6bbtw/https%3A/%2Fazureopendatastorage.azurefd.net/nyctlc/green/puYear=2014/puMonth=7/part-00194-tid-4753095944193949832-fee7e113-666d-4114-9fcb-bcd3046479f3-2764-1.c000.snappy.parquet




##### Check the Max date which has been loaded to far

In [35]:
df = spark.sql("SELECT MAX(date_key) as max_date FROM NYCGreenTaxi")
display(df)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 0d0805a2-d7af-4fc7-947a-5a1549677625)

##### Maintaince of File Size

In [None]:
spark.sql("VACUUM bronze_nyc_tlc_green.nycgreentaxi RETAIN 168 HOURS") 
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 134217728)  # 128MB
spark.sql("OPTIMIZE bronze_nyc_tlc_green.nycgreentaxi")

StatementMeta(, , , Waiting, )

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:bigint>>]

##### Another method of comparision using delta logs to find the last written date in bronze's delta table

In [None]:
Describe_df= spark.sql("DESCRIBE history nycgreentaxi")
max_write_date = Describe_df.filter("operation = 'WRITE'").agg({"Timestamp": "max"}).collect()[0][0]
print(max_write_date)


StatementMeta(, , , Waiting, )

2023-06-18 19:44:43.750000


#### Conclusion

##### This notebook demonstrated how to load data from Azure Open Datasets into a Delta Table and perform some basic data transformations. and load data month by month using comparision incrementalload. This provides a scalable and efficient way to handle large amounts of data. From here, we will use the data to move in Silver layer for further enrichment