In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
%sql
-- Make adventureworks.silver the session's default catalog and schema.
-- The spark.table(...) and saveAsTable(...) calls in this notebook use fully
-- qualified names, so they do not depend on these defaults.
USE CATALOG adventureworks;
USE SCHEMA silver;

In [0]:
# Load every bronze table as a DataFrame. Names are fully qualified
# (catalog.schema.table); spark.read.table is the reader-API spelling of
# spark.table and returns the same table DataFrame.
df_product           = spark.read.table("adventureworks.bronze.bronze_product")
df_region            = spark.read.table("adventureworks.bronze.bronze_region")
df_reseller          = spark.read.table("adventureworks.bronze.bronze_reseller")
df_sales             = spark.read.table("adventureworks.bronze.bronze_sales")
df_salesperson       = spark.read.table("adventureworks.bronze.bronze_salesperson")
df_salespersonregion = spark.read.table("adventureworks.bronze.bronze_salespersonregion")
df_targets           = spark.read.table("adventureworks.bronze.bronze_targets")

### Cleaning bronze_product

In [0]:
# Turn standard_cost into a numeric column: remove the "$" sign, then the ","
# separators, and cast the remaining text to double.
_standard_cost_no_dollar = regexp_replace(col("standard_cost"), "\\$", "")
_standard_cost_digits = regexp_replace(_standard_cost_no_dollar, ",", "")
df_product = df_product.withColumn("standard_cost", _standard_cost_digits.cast("double"))

In [0]:
# Remove the two *_format columns from the product DataFrame.
_product_columns_to_drop = ["background_color_format", "font_color_format"]
df_product = df_product.drop(*_product_columns_to_drop)

### Cleaning bronze_sales

In [0]:
# Extract the "<Month> <day>, <year>" portion of order_date (capture group 1)
# and store it, whitespace-trimmed, in the helper column order_date_clean.
_order_date_text = regexp_extract(col("order_date"), r"([A-Za-z]+ \d{1,2}, \d{4})", 1)
df_sales = df_sales.withColumn("order_date_clean", trim(_order_date_text))

In [0]:
# Parse the extracted text with the 'MMMM d, yyyy' pattern and overwrite
# order_date_clean with the result of try_to_date.
_order_date_parsed = try_to_date(col("order_date_clean"), 'MMMM d, yyyy')
df_sales = df_sales.withColumn("order_date_clean", _order_date_parsed)

In [0]:
# Drop the original order_date column; the next cell renames order_date_clean
# to take over that name.
# NOTE(review): re-running this cell after that rename would drop the cleaned
# column instead, because it then carries the name "order_date".
df_sales = df_sales.drop("order_date")

In [0]:
# Give the parsed column the original name, order_date.
df_sales = df_sales.withColumnRenamed("order_date_clean", "order_date")

In [0]:
# Convert the currency-formatted sales columns to doubles.
# One expression is generated per column instead of three copy-pasted chains:
# the character class [$,] removes every "$" and "," in a single regex pass,
# and the remaining text is cast to double. withColumns replaces the existing
# columns in place, so the column order is unchanged.
_sales_currency_columns = ["unit_price", "sales", "cost"]
df_sales = df_sales.withColumns(
    {
        _column: regexp_replace(col(_column), "[$,]", "").cast("double")
        for _column in _sales_currency_columns
    }
)

### Cleaning bronze_targets

In [0]:
# Turn target into a numeric column: remove the "$" sign, then the ","
# separators, and cast the remaining text to double.
_target_no_dollar = regexp_replace(col("target"), "\\$", "")
_target_digits = regexp_replace(_target_no_dollar, ",", "")
df_targets = df_targets.withColumn("target", _target_digits.cast("double"))

In [0]:
# Extract the "<Month> <day>, <year>" portion of target_month (capture group 1)
# and store it, whitespace-trimmed, in the helper column target_month_cleaned.
_target_month_text = regexp_extract(col("target_month"), r"([A-Za-z]+ \d{1,2}, \d{4})", 1)
df_targets = df_targets.withColumn("target_month_cleaned", trim(_target_month_text))

In [0]:
# Parse the extracted text with the 'MMMM d, yyyy' pattern and overwrite
# target_month_cleaned with the result of try_to_date.
_target_month_parsed = try_to_date(col("target_month_cleaned"), 'MMMM d, yyyy')
df_targets = df_targets.withColumn("target_month_cleaned", _target_month_parsed)

In [0]:
# Drop the original target_month column; the next cell renames
# target_month_cleaned to take over that name.
# NOTE(review): re-running this cell after that rename would drop the cleaned
# column instead, because it then carries the name "target_month".
df_targets = df_targets.drop("target_month")

In [0]:
# Give the parsed column the original name, target_month.
df_targets = df_targets.withColumnRenamed("target_month_cleaned", "target_month")

### Writing all the DataFrames to Silver layer tables

In [0]:
def write_to_silver_tables(df, table_name: str, *, catalog: str = "adventureworks", schema: str = "silver") -> None:
    """Overwrite a Delta table with the contents of ``df``.

    The target is ``<catalog>.<schema>.<table_name>``. With the defaults this
    is ``adventureworks.silver.<table_name>``, the location that used to be
    hard-coded, so existing two-argument calls behave exactly as before.

    Args:
        df: DataFrame to persist.
        table_name: Unqualified name of the target table.
        catalog: Catalog that holds the target table.
        schema: Schema (database) that holds the target table.

    Returns:
        None. The function is called for its side effect of writing the table.
    """
    target_table = f"{catalog}.{schema}.{table_name}"
    writer = (
        df.write.format('delta')
        .mode('overwrite')                  # replace whatever the table currently holds
        .option('overwriteSchema', 'true')  # let the overwrite replace the table schema too
    )
    writer.saveAsTable(target_table)

In [0]:
# Persist each DataFrame to its silver table, in the listed order.
_silver_table_targets = [
    (df_product, 'silver_product'),
    (df_region, 'silver_region'),
    (df_reseller, 'silver_reseller'),
    (df_sales, 'silver_sales'),
    (df_salesperson, 'silver_salesperson'),
    (df_salespersonregion, 'silver_salespersonregion'),
    (df_targets, 'silver_targets'),
]
for _frame, _silver_table in _silver_table_targets:
    write_to_silver_tables(_frame, _silver_table)

### Writing all the DataFrames in Delta format to the silver container in ADLS

In [0]:
# Root of the silver container; each DataFrame is saved under its own folder.
silver_path = 'abfss://silver@adventureworksdls001.dfs.core.windows.net/'

# (DataFrame, folder name appended to silver_path) pairs, written in this order.
# One loop replaces seven copy-pasted writer chains, so the format, mode and
# overwriteSchema settings are stated once and cannot drift between datasets.
_silver_exports = [
    (df_product, "product"),
    (df_region, "region"),
    (df_reseller, "reseller"),
    (df_sales, "sales"),
    (df_salesperson, "salesperson"),
    (df_salespersonregion, "salespersonregion"),
    (df_targets, "targets"),
]
for _frame, _folder in _silver_exports:
    (
        _frame.write.format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .save(f"{silver_path}{_folder}")
    )