# Silver Layer
In this notebook, we transform the bronze data and write the transformed result to the silver folder.

## Connecting Databricks with ADLS

In [0]:
# Define values
storage_account = "adlsfornyctaxi"
storage_container = "bronze"
application_id = 'paste your application id here'
secret = 'paste your secret here'
tenant_id = 'paste your tenant id here'

In [0]:

# Configure Spark
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token")
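
The five settings above share one key pattern, so they can be generated by a single helper. The sketch below is a hypothetical refactoring, not part of the notebook or of any library: `build_oauth_conf` is an invented name, the credential values are placeholders, and the commented-out `dbutils.secrets.get` call assumes a secret scope and key that you would have to create yourself.

```python
def build_oauth_conf(storage_account, application_id, secret, tenant_id):
    """Return the Spark settings used above for OAuth access to ADLS Gen2.

    Hypothetical helper: it only assembles the same key/value pairs that the
    cell above passes to spark.conf.set one at a time.
    """
    suffix = f"{storage_account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": application_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
    }


# Placeholder credentials, for illustration only.
conf = build_oauth_conf("adlsfornyctaxi", "app-id", "app-secret", "tenant-id")
for key in conf:
    print(key)

# In a Databricks notebook the settings could then be applied like this:
# for key, value in conf.items():
#     spark.conf.set(key, value)
#
# Assumption: rather than pasting the secret into the notebook, it could be
# read from a secret scope (the scope and key names here are made up):
# secret = dbutils.secrets.get(scope="nyc-taxi", key="sp-client-secret")
```

Keeping the key pattern in one place makes it harder to mistype a setting when a second storage account is added.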



In [0]:
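# Path to the 2023 trip data in the bronze container (used by the cells below)
path = f"abfss://{storage_container}@{storage_account}.dfs.core.windows.net/2023/trip-data/"
# List the contents of the 2023 folder to confirm that the connection works
dbutils.fs.ls(f"abfss://{storage_container}@{storage_account}.dfs.core.windows.net/2023/")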

### Importing Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# List every file in the trip-data directory
files = dbutils.fs.ls(path)
for file_info in files:
    print(file_info.path)

Let's first inspect one month of data and decide the column data types from it. We will then apply the same data types to all the other months.

In [0]:
# Read the January file on its own so we can inspect it
df_jan = spark.read.parquet(f"{path}green_tripdata_2023-01.parquet")
# Summary statistics (count, mean, stddev, min, max) for each column
df_jan.describe().display()
display(df_jan)
# Total number of rows in the January file
df_jan.count()

The output for the January file shows the rows, the column names, and their data types, and the summary also gives the minimum and maximum value of each column. Based on this, we define our own schema instead of relying on the inferred one, which is the better practice.
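
One way to record the chosen types is a single mapping from column name to Spark SQL type name. The sketch below is an assumption about how this could be organised, not the notebook's own code: `GREEN_TAXI_TYPES` and `to_ddl` are hypothetical names, and the types simply mirror the casts applied in the transformation cell later in this notebook.

```python
# Column name -> Spark SQL type name, mirroring the casts used in this notebook.
GREEN_TAXI_TYPES = {
    "VendorID": "bigint",
    "lpep_pickup_datetime": "timestamp_ntz",
    "lpep_dropoff_datetime": "timestamp_ntz",
    "store_and_fwd_flag": "string",
    "RatecodeID": "double",
    "PULocationID": "bigint",
    "DOLocationID": "bigint",
    "passenger_count": "double",
    "trip_distance": "double",
    "fare_amount": "double",
    "extra": "double",
    "mta_tax": "double",
    "tip_amount": "double",
    "tolls_amount": "double",
    "ehail_fee": "int",
    "improvement_surcharge": "double",
    "total_amount": "double",
    "payment_type": "double",
    "trip_type": "double",
    "congestion_surcharge": "double",
}


def to_ddl(types):
    """Render the mapping as a DDL-style schema string: 'name type, name type, ...'."""
    return ", ".join(f"{name} {dtype}" for name, dtype in types.items())


schema_ddl = to_ddl(GREEN_TAXI_TYPES)
print(schema_ddl)

# In a Spark session the same mapping could drive the casts, because
# Column.cast accepts a type name given as a string:
# df = df.select([col(name).cast(dtype) for name, dtype in GREEN_TAXI_TYPES.items()])
```

With the types in one dictionary, a change to a column's type is made in a single place rather than in a long chain of `withColumn` calls.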

## Reading Data

### Reading the Parquet File

In [0]:
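# List the monthly Parquet files in the bronze trip-data directory
files = dbutils.fs.ls(path)

df_main = None

for file in files:
    # Parquet files carry their own schema, so no 'header' option is needed
    df_temp = spark.read.format('parquet').load(file.path)
    if df_main is None:
        df_main = df_temp
    else:
        # union() matches columns by position, so the files must share the same column order
        df_main = df_main.union(df_temp)

df_main.display()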

# Sanity check: confirm that the rows from every file were loaded
total_rows_from_all_files = df_main.count()
print(f"Total rows from all files: {total_rows_from_all_files}")
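
The loop above reads and unions one file at a time. As an alternative sketch, the directory listing can be filtered down to Parquet paths and passed to a single read call, since `spark.read.parquet` accepts several paths. The helper name `parquet_paths` and the stand-in listing below are hypothetical; in the notebook the entries would come from `dbutils.fs.ls(path)`. This also assumes that the monthly files have compatible schemas, which is not verified here.

```python
from types import SimpleNamespace


def parquet_paths(file_infos):
    """Return the sorted paths of all entries that end in .parquet."""
    return sorted(f.path for f in file_infos if f.path.endswith(".parquet"))


# Stand-ins for the entries returned by dbutils.fs.ls (only .path is used here).
base = "abfss://bronze@adlsfornyctaxi.dfs.core.windows.net/2023/trip-data/"
listing = [
    SimpleNamespace(path=base + "green_tripdata_2023-02.parquet"),
    SimpleNamespace(path=base + "green_tripdata_2023-01.parquet"),
    SimpleNamespace(path=base + "_SUCCESS"),  # non-data file that should be skipped
]

paths = parquet_paths(listing)
for p in paths:
    print(p)

# In the notebook, the filtered paths could be read in one call:
# df_main = spark.read.parquet(*parquet_paths(dbutils.fs.ls(path)))
```

Filtering first keeps marker or log files in the directory from being handed to the Parquet reader.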

### Apply Transformations

In [0]:
%python

# Cast each column to an explicit data type. Without these casts a data type mismatch error is thrown, so we convert the columns up front.
df_main = df_main.withColumn("VendorID", col("VendorID").cast(LongType())) \
                 .withColumn("lpep_pickup_datetime", col("lpep_pickup_datetime").cast(TimestampNTZType())) \
                 .withColumn("lpep_dropoff_datetime", col("lpep_dropoff_datetime").cast(TimestampNTZType())) \
                 .withColumn("store_and_fwd_flag", col("store_and_fwd_flag").cast(StringType())) \
                 .withColumn("RatecodeID", col("RatecodeID").cast(DoubleType())) \
                 .withColumn("PULocationID", col("PULocationID").cast(LongType())) \
                 .withColumn("DOLocationID", col("DOLocationID").cast(LongType())) \
                 .withColumn("passenger_count", col("passenger_count").cast(DoubleType())) \
                 .withColumn("trip_distance", col("trip_distance").cast(DoubleType())) \
                 .withColumn("fare_amount", col("fare_amount").cast(DoubleType())) \
                 .withColumn("extra", col("extra").cast(DoubleType())) \
                 .withColumn("mta_tax", col("mta_tax").cast(DoubleType())) \
                 .withColumn("tip_amount", col("tip_amount").cast(DoubleType())) \
                 .withColumn("tolls_amount", col("tolls_amount").cast(DoubleType())) \
                 .withColumn("ehail_fee", col("ehail_fee").cast(IntegerType())) \
                 .withColumn("improvement_surcharge", col("improvement_surcharge").cast(DoubleType())) \
                 .withColumn("total_amount", col("total_amount").cast(DoubleType())) \
                 .withColumn("payment_type", col("payment_type").cast(DoubleType())) \
                 .withColumn("trip_type", col("trip_type").cast(DoubleType())) \
                 .withColumn("congestion_surcharge", col("congestion_surcharge").cast(DoubleType()))


# Drop the ehail_fee column because it contains only nulls
df_main = df_main.drop("ehail_fee")

# Add the trip duration in minutes (dropoff minus pickup), rounded to the nearest minute.
# unix_timestamp() already returns whole seconds, so the difference needs no floor().
df_main = df_main.withColumn("time_diff_pick_drop_min", round((unix_timestamp(col("lpep_dropoff_datetime")) - unix_timestamp(col("lpep_pickup_datetime"))) / 60))

# Extract the trip date and the trip year from the pickup timestamp
df_main = df_main.withColumn("trip_date", to_date(col("lpep_pickup_datetime"))) \
    .withColumn("trip_year", year(col("lpep_pickup_datetime")))

# Keep only the required columns
df_main = df_main.select(
    "VendorID", "PULocationID", "DOLocationID",
    "passenger_count", "trip_distance", "fare_amount", "total_amount",
    "payment_type", "trip_type", "time_diff_pick_drop_min",
    "trip_date", "trip_year", "lpep_pickup_datetime", "lpep_dropoff_datetime",
)

display(df_main)
df_main.printSchema()
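
To make the `time_diff_pick_drop_min` calculation concrete, the same arithmetic can be checked in plain Python for a single trip. The timestamps below are made-up values and `duration_minutes` is a hypothetical helper. It rounds halves up, which is how Spark's `round` behaves, whereas Python's built-in `round` rounds halves to the nearest even number.

```python
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP


def duration_minutes(pickup, dropoff):
    """Whole-second difference between dropoff and pickup, in minutes, rounded half up."""
    seconds = int((dropoff - pickup).total_seconds())
    minutes = Decimal(seconds) / Decimal(60)
    return int(minutes.quantize(Decimal("1"), rounding=ROUND_HALF_UP))


# 661 seconds is about 11.02 minutes.
print(duration_minutes(datetime(2023, 1, 1, 0, 26, 10),
                       datetime(2023, 1, 1, 0, 37, 11)))  # 11

# 150 seconds is exactly 2.5 minutes; half-up rounding gives 3.
print(duration_minutes(datetime(2023, 1, 1, 0, 0, 0),
                       datetime(2023, 1, 1, 0, 2, 30)))   # 3
```

Checking a couple of trips by hand like this is a quick way to confirm that the new column holds minutes rather than seconds.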

## Writing the Data

In [0]:
%python

# Write the transformed DataFrame to the silver container.
# mode("append") adds to any data already at the path, so rerunning this cell writes the same rows again.
output_path = f"abfss://silver@{storage_account}.dfs.core.windows.net/2023/"
df_main.write.format("parquet").mode("append").save(output_path)

> The next task is to prepare the data for the Gold layer; see the Gold Layer Databricks notebook.