#### Authentication

In [0]:
# Configure Spark to reach the ADLS Gen2 account through a service principal
# (OAuth 2.0 client-credentials flow, via ClientCredsTokenProvider).
#
# The storage-account name must appear bare inside each config key and must match the
# account used in this notebook's abfss:// URIs (nyctaxivamsi). A quoted placeholder
# such as 'storage-account' configures a non-existent account and leaves the real one
# unauthenticated.
STORAGE_ACCOUNT = "nyctaxivamsi"

# TODO: replace these placeholders with real values.
# NOTE(review): do not hard-code the client secret in the notebook; read it from a
# Databricks secret scope, e.g. dbutils.secrets.get(scope="<scope>", key="<key>").
APPLICATION_ID = "application-id"
SERVICE_CREDENTIAL = "service_credential"
DIRECTORY_ID = "directory-id"

# Per-account Hadoop ABFS settings; each key gets the account host appended below.
_adls_oauth_conf = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": APPLICATION_ID,
    "fs.azure.account.oauth2.client.secret": SERVICE_CREDENTIAL,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{DIRECTORY_ID}/oauth2/token",
}
for _conf_key, _conf_value in _adls_oauth_conf.items():
    spark.conf.set(f"{_conf_key}.{STORAGE_ACCOUNT}.dfs.core.windows.net", _conf_value)

In [0]:
# Sanity check: list the root of the bronze container. If the OAuth settings above
# do not grant access this call is expected to fail; otherwise it shows the entries.
dbutils.fs.ls(
    "abfss://bronze@nyctaxivamsi.dfs.core.windows.net/"
)

### Importing necessary functions

In [0]:
from pyspark.sql.functions import *

In [0]:
# Trip-type lookup from the bronze container: CSV with a header row,
# column types inferred by Spark.
trip_type_df = (
    spark.read
    .option("header", "true")
    .option("inferschema", True)
    .format("csv")
    .load("abfss://bronze@nyctaxivamsi.dfs.core.windows.net/trip_type")
)

In [0]:
# Trip-zone lookup from the bronze container: CSV with a header row,
# column types inferred by Spark.
trip_zone_df = (
    spark.read
    .option("header", "true")
    .option("inferschema", True)
    .format("csv")
    .load("abfss://bronze@nyctaxivamsi.dfs.core.windows.net/trip_zone")
)

In [0]:
# Explicit DDL schema applied (via `.schema(...)`) when reading the trip-data parquet files.
# Kept as (column, type) pairs so each column is easy to audit, then joined into the
# one-column-per-line DDL text that Spark's `.schema()` accepts.
_trip_columns = [
    ("VendorID", "BIGINT"),
    ("lpep_pickup_datetime", "TIMESTAMP"),
    ("lpep_dropoff_datetime", "TIMESTAMP"),
    ("store_and_fwd_flag", "STRING"),
    ("RatecodeID", "BIGINT"),
    ("PULocationID", "BIGINT"),
    ("DOLocationID", "BIGINT"),
    ("passenger_count", "BIGINT"),
    ("trip_distance", "DOUBLE"),
    ("fare_amount", "DOUBLE"),
    ("extra", "DOUBLE"),
    ("mta_tax", "DOUBLE"),
    ("tip_amount", "DOUBLE"),
    ("tolls_amount", "DOUBLE"),
    ("ehail_fee", "DOUBLE"),
    ("improvement_surcharge", "DOUBLE"),
    ("total_amount", "DOUBLE"),
    ("payment_type", "BIGINT"),
    ("trip_type", "BIGINT"),
    ("congestion_surcharge", "DOUBLE"),
]
myschema = "\n" + ",\n".join(f"        {name} {dtype}" for name, dtype in _trip_columns) + "\n"


In [0]:
# Raw trip records: every parquet file under bronze/trip-data, read with the
# explicit `myschema` DDL so all files share the same column types.
# - `recursiveFileLookup` picks up files in nested sub-folders; per the Spark docs it
#   also disables partition inference, so folder names do not become columns.
# - No `header` option: it is a CSV-reader option and the parquet reader ignores it.
trip_data_df = (
    spark.read.format("parquet")
    .schema(myschema)
    .option("recursiveFileLookup", "true")
    .load("abfss://bronze@nyctaxivamsi.dfs.core.windows.net/trip-data")
)

# Transformations

## trip_type

In [0]:
# Rename the generic `description` column to `trip_description`, then preview the result.
trip_type_df = trip_type_df.withColumnRenamed(
    existing="description",
    new="trip_description",
)
display(trip_type_df)

In [0]:
# Write the trip-type lookup to the silver container as parquet.
# Mode is "overwrite" (matching the trip_zone write): every run re-reads the whole
# bronze CSV, so "append" would add another full copy of the lookup on each re-run.
(
    trip_type_df.write
    .format("parquet")
    .mode("overwrite")
    .save("abfss://silver@nyctaxivamsi.dfs.core.windows.net/trip_type")
)

## trip_zone

In [0]:
# Split `Zone` on "/" into up to two trimmed parts. `get` returns NULL for an index
# past the end of the array, so zones without a "/" get zone2 = NULL. Any third or
# later part is not kept.
split_col = split(col("Zone"), "/")

trip_zone_df = trip_zone_df.withColumns({
    "zone1": trim(get(split_col, 0)),
    "zone2": trim(get(split_col, 1)),
})

display(trip_zone_df)

In [0]:
# Write the trip-zone lookup (with zone1/zone2) to the silver container as parquet,
# replacing whatever a previous run wrote there.
(
    trip_zone_df.write
    .mode("overwrite")
    .format("parquet")
    .save("abfss://silver@nyctaxivamsi.dfs.core.windows.net/trip_zone")
)

## trip_data

In [0]:
# Keep a subset of the trip columns and preview up to 100 rows.
# NOTE(review): only the drop-off location (DOLocationID) is kept, not PULocationID —
# confirm that is intended.
trip_data_df = trip_data_df.select(
    "VendorID",
    "lpep_pickup_datetime",
    "DOLocationID",
    "trip_distance",
    "fare_amount",
    "total_amount",
)

display(trip_data_df.limit(100))

In [0]:
# Write the trimmed trip data to the silver container as parquet.
# Mode is "overwrite": the bronze read scans *all* of trip-data on every run, so
# "append" would re-insert every previously written trip each time the notebook runs.
# NOTE(review): if appending only newly arrived files is the goal, switch to an
# incremental-ingest pattern (tracking processed files) rather than a full re-read.
(
    trip_data_df.write
    .format("parquet")
    .mode("overwrite")
    .save("abfss://silver@nyctaxivamsi.dfs.core.windows.net/trip_data")
)