In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Declare the notebook's input widgets (each defaults to an empty string) and read
# their current values.
# NOTE(review): presumably these are filled in by the upstream bronze_to_silver
# task when the notebook runs as a job - confirm against the job definition.
dbutils.widgets.text('bronze_adls','')
dbutils.widgets.text('silver_adls','')
dbutils.widgets.text('gold_adls','')

bronze_adls = dbutils.widgets.get('bronze_adls')
silver_adls = dbutils.widgets.get('silver_adls')
gold_adls = dbutils.widgets.get('gold_adls')

# Fail fast when a path used by this notebook was not supplied: with an empty value
# the f-strings built from it resolve to "/SalesLT" instead of a location inside the
# intended container. bronze_adls is not checked because the code in this section
# only reads from silver_adls and writes to gold_adls.
for _widget_name, _widget_value in (('silver_adls', silver_adls), ('gold_adls', gold_adls)):
    if not _widget_value.strip():
        raise ValueError(
            f"Widget '{_widget_name}' is empty; pass the ADLS path as a notebook parameter."
        )

# list the files in the silver ADLS
dbutils.fs.ls(f"{silver_adls}/SalesLT")

In [0]:
# Disabled alternative to the widget-based setup: everything below is wrapped in a
# bare triple-quoted string, so none of the statements inside it are executed.
# It builds the bronze/silver/gold abfss:// paths directly instead of reading them
# from widgets.
# NOTE(review): the last statement inside the string lists the *silver* container,
# although its inline comment says "bronze".
"""
# comment this block out after creating jobs to retrieve the task value pairs from the previous notebook
# mount ADLS
tiers = ['bronze', 'silver', 'gold']
adls_path = {tier: f"abfss://{tier}@salesadls2.dfs.core.windows.net/" for tier in tiers}
# assign the reference path to each ADLS
bronze_adls = adls_path['bronze']
silver_adls = adls_path['silver']
gold_adls = adls_path['gold']
# list the files in the bronze ADLS
dbutils.fs.ls(f"{silver_adls}/SalesLT")
"""

[FileInfo(path='abfss://silver@salesadls2.dfs.core.windows.net/SalesLT/Address/', name='Address/', size=0, modificationTime=1747025623000),
 FileInfo(path='abfss://silver@salesadls2.dfs.core.windows.net/SalesLT/Customer/', name='Customer/', size=0, modificationTime=1747025626000),
 FileInfo(path='abfss://silver@salesadls2.dfs.core.windows.net/SalesLT/CustomerAddress/', name='CustomerAddress/', size=0, modificationTime=1747025628000),
 FileInfo(path='abfss://silver@salesadls2.dfs.core.windows.net/SalesLT/Product/', name='Product/', size=0, modificationTime=1747025631000),
 FileInfo(path='abfss://silver@salesadls2.dfs.core.windows.net/SalesLT/ProductCategory/', name='ProductCategory/', size=0, modificationTime=1747025633000),
 FileInfo(path='abfss://silver@salesadls2.dfs.core.windows.net/SalesLT/ProductDescription/', name='ProductDescription/', size=0, modificationTime=1747025635000),
 FileInfo(path='abfss://silver@salesadls2.dfs.core.windows.net/SalesLT/ProductModel/', name='ProductMode

In [0]:
# Build the list of table names from the entries under the silver SalesLT folder.
# Each entry name ends with a slash (e.g. 'Address/'), so only the part before the
# first '/' is kept.
table_names = [
    entry.name.split('/')[0]
    for entry in dbutils.fs.ls(f"{silver_adls}/SalesLT")
]
table_names

['Address',
 'Customer',
 'CustomerAddress',
 'Product',
 'ProductCategory',
 'ProductDescription',
 'ProductModel',
 'ProductModelProductDescription',
 'SalesOrderDetail',
 'SalesOrderHeader']

In [0]:
%python
# iterate through the table names, read, perform transformations, and write to the gold ADLS
for table in table_names:
    df = spark.read\
        .format("delta")\
        .load(f"{silver_adls}/SalesLT/{table}")
    if table == 'Address':
        # rename the column
        df = df.withColumnRenamed('country_region', 'country')
    if table == 'SalesOrderDetail':
        # rename the column
        df = df.withColumnRenamed('line_total', 'total_price')
    if table == 'ProductModelProductDescription':
        # expand the abbreviations
        df = df.withColumn('culture', 
                           when(col('culture') == 'en', 'English')
                           .when(col('culture') == 'fr', 'French')
                           .when(col('culture') == 'ar', 'Arabic')
                           .when(col('culture') == 'th', 'Thai')
                           .when(col('culture') == 'he', 'Hebrew')
                           .when(col('culture') == 'zh-cht', 'Traditional Chinese')
                           .otherwise(col('culture')))
    if table == 'Customer':
        # create a new column 'gender'
        df = df.withColumn('gender',
                           when((col('title') == 'Mr.') | (col('title') == 'Sr.'), 'Male')
                           .when((col('title') == 'Ms.') | (col('title') == 'Sra.'), 'Female')
                           .otherwise('Unknown')) 
        # rename the column
        df = df.withColumnRenamed('sales_person', 'sales_person_id')
        # extract sales person name
        df = df.withColumn('sales_person', initcap(substring(col('sales_person_id'), 17, length(col('sales_person_id')) - 17)))
        # format the phone number
        df = df.withColumn('phone',
                           substring(
                               regexp_replace(col('phone'), r"[^0-9]", ""),
                               length(regexp_replace(col('phone'), r"[^0-9]", "")) - 9,
                               10))
    # write the transformed data to the gold ADLS
    df.write\
        .format("delta")\
        .mode('overwrite')\
        .option('mergeSchema', True)\
        .save(f"{gold_adls}/SalesLT/{table}")

In [0]:
# List what now exists under the gold SalesLT folder so the result of the write
# can be inspected.
gold_listing = dbutils.fs.ls(f"{gold_adls}/SalesLT")
gold_listing

[FileInfo(path='abfss://gold@salesadls2.dfs.core.windows.net/SalesLT/Address/', name='Address/', size=0, modificationTime=1747025681000),
 FileInfo(path='abfss://gold@salesadls2.dfs.core.windows.net/SalesLT/Customer/', name='Customer/', size=0, modificationTime=1747025684000),
 FileInfo(path='abfss://gold@salesadls2.dfs.core.windows.net/SalesLT/CustomerAddress/', name='CustomerAddress/', size=0, modificationTime=1747025686000),
 FileInfo(path='abfss://gold@salesadls2.dfs.core.windows.net/SalesLT/Product/', name='Product/', size=0, modificationTime=1747025689000),
 FileInfo(path='abfss://gold@salesadls2.dfs.core.windows.net/SalesLT/ProductCategory/', name='ProductCategory/', size=0, modificationTime=1747025691000),
 FileInfo(path='abfss://gold@salesadls2.dfs.core.windows.net/SalesLT/ProductDescription/', name='ProductDescription/', size=0, modificationTime=1747025694000),
 FileInfo(path='abfss://gold@salesadls2.dfs.core.windows.net/SalesLT/ProductModel/', name='ProductModel/', size=0, m