## Transformation on the Address table

In [None]:
#level 1 transformation: connect to the bronze container to get the data
#list all the available data in SalesLT folder in the bronze container

# dbutils.fs.ls('mnt/bronze/SalesLT/')

In [None]:
#show silver container currently. it should be empty since no files have been moved here.

# dbutils.fs.ls('mnt/silver')

In [None]:
#test transformation on only one table, the address table. take address.parquet file. input path is the path of the bronze container.

# input_path = '/mnt/bronze/SalesLT/Address/Address.parquet'

In [None]:
#create a PySpark dataframe for the input file. pass input path as parameter to the pyspark dataframe function
#dataframe is like a temporary view with a table structure which has all the schema specific to the source file

# df = spark.read.format('parquet').load(input_path)

In [None]:
#display table structure, shows all the different columns in the address table.
#when a dataframe is created in PySpark it is stored in a temporary location which is easy to modify later.

# display(df)

In [None]:
#transform the ModifiedDate column. Convert to a date format structure without the time details

# from pyspark.sql.functions import from_utc_timestamp, date_format
# from pyspark.sql.types import TimestampType

# df = df.withColumn("ModifiedDate", date_format(from_utc_timestamp(df["ModifiedDate"].cast(TimestampType()), "UTC"), "yyyy-MM-dd"))

In [None]:
#display the address table after conducting transformations to ModifiedDate column. ModifiedDate has been converted to date format.

# display(df)

In [None]:
%sql --magic command to switch to sql cell. this is just for example purposes

# -- SELECT 1 AS column1

## Transformations for all tables

In [None]:
# for each loop to iterate through the bronze container to get directory name and append it to the table_name array
table_name = []

for i in dbutils.fs.ls('mnt/bronze/SalesLT'):
    table_name.append(i.name.split('/')[0])

In [None]:
#display the table_name array
table_name

In [None]:
#write all the transformed data into the silver container

from pyspark.sql.functions import from_utc_timestamp, date_format
from pyspark.sql.types import TimestampType

#for each loop iterates through the table_name array and generates the input_path for all the tables using the mount location for the bronze container
for i in table_name:
    path = '/mnt/bronze/SalesLT/' + i + '/' + i + '.parquet'
    #after generating the path, load it as a data frame
    df = spark.read.format('parquet').load(path)
    #get all the columns from the data frame as a list
    column = df.columns

    #for each loop to iterate through the column names to check if any of the columns have a date value in it
    for col in column:
        if "Date" in col or "date" in col:
            df = df.withColumn(col, date_format(from_utc_timestamp(df[col].cast(TimestampType()), "UTC"), "yyyy-MM-dd"))

    #output_path points to the silver container
    output_path = '/mnt/silver/SalesLT/' + i + '/'
    #write the transformed data to the data lake using the output_path in delta format
    #we write the data to the silver and gold container using delta format. currently only writing data into one schema structure, but in the future if the input data schema changes then the Delta format can easily handle that.
    df.write.format('delta').mode("overwrite").save(output_path)

In [None]:
#display df of last item in the list
display(df)