### Import Required Libraries and Spark SQL Functions

In [0]:
import os
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Define Bronze and Silver Layer Base Paths

In [0]:
bronze_base = "/Volumes/adventure_works_lakehouse/adventure_works/lakehouse/bronze"
silver_base = "/Volumes/adventure_works_lakehouse/adventure_works/lakehouse/silver"

### Retrieve Source Table Names from Bronze Layer

In [0]:
file_names = [f.name.replace(".csv", "") for f in dbutils.fs.ls(bronze_base)]
file_names

['bronze_adventureworks_calendar/',
 'bronze_adventureworks_customers/',
 'bronze_adventureworks_product_categories/',
 'bronze_adventureworks_product_subcategories/',
 'bronze_adventureworks_products/',
 'bronze_adventureworks_returns/',
 'bronze_adventureworks_sales_2015/',
 'bronze_adventureworks_sales_2016/',
 'bronze_adventureworks_sales_2017/',
 'bronze_adventureworks_territories/']

# Transformations

## adventureworks_calendar

Load Data from Bronze Layer and Derive/Update Attributes

In [0]:
df_adv_cal = spark.read.format('delta').load(f"{bronze_base}/bronze_adventureworks_calendar")

In [0]:
df_adv_cal.limit(7).display()

Date,ing_ts
2015-01-01,2026-01-13T11:05:47.548Z
2015-01-02,2026-01-13T11:05:47.548Z
2015-01-03,2026-01-13T11:05:47.548Z
2015-01-04,2026-01-13T11:05:47.548Z
2015-01-05,2026-01-13T11:05:47.548Z
2015-01-06,2026-01-13T11:05:47.548Z
2015-01-07,2026-01-13T11:05:47.548Z


In [0]:
df_adv_cal = df_adv_cal.withColumn("Month", month(col('Date')))\
        .withColumn("Year", year(col('Date')))

df_adv_cal.limit(7).display()

Date,ing_ts,Month,Year
2015-01-01,2026-01-13T11:05:47.548Z,1,2015
2015-01-02,2026-01-13T11:05:47.548Z,1,2015
2015-01-03,2026-01-13T11:05:47.548Z,1,2015
2015-01-04,2026-01-13T11:05:47.548Z,1,2015
2015-01-05,2026-01-13T11:05:47.548Z,1,2015
2015-01-06,2026-01-13T11:05:47.548Z,1,2015
2015-01-07,2026-01-13T11:05:47.548Z,1,2015


Remove Duplicate and Validate Data Quality

In [0]:
df_adv_cal = df_adv_cal.dropDuplicates(['Date'])

In [0]:
print("Null Counts : ", df_adv_cal.filter(col("Date").isNull()).count())
print("Duplicate rows: ", df_adv_cal.count() - df_adv_cal.dropDuplicates(["Date"]).count())

Null Counts :  0
Duplicate rows:  0


Write Transformed Data to Silver Layer

In [0]:
df_adv_cal.write.format('delta')\
                .mode('overwrite')\
                .option('overWriteSchema', 'true')\
                .save(f"{silver_base}/silver_adventureworks_calendar")

## adventureworks_customers

Load Data from Bronze Layer 

In [0]:
df_adv_cust = spark.read.format('delta').load(f"{bronze_base}/bronze_adventureworks_customers")

In [0]:
df_adv_cust.limit(7).display()

CustomerKey,Prefix,FirstName,LastName,BirthDate,MaritalStatus,Gender,EmailAddress,AnnualIncome,TotalChildren,EducationLevel,Occupation,HomeOwner,ing_ts
11000,MR.,JON,YANG,1966-04-08,M,M,jon24@adventure-works.com,"$90,000",2,Bachelors,Professional,Y,2026-01-13T11:05:49.968Z
11001,MR.,EUGENE,HUANG,1965-05-14,S,M,eugene10@adventure-works.com,"$60,000",3,Bachelors,Professional,N,2026-01-13T11:05:49.968Z
11002,MR.,RUBEN,TORRES,1965-08-12,M,M,ruben35@adventure-works.com,"$60,000",3,Bachelors,Professional,Y,2026-01-13T11:05:49.968Z
11003,MS.,CHRISTY,ZHU,1968-02-15,S,F,christy12@adventure-works.com,"$70,000",0,Bachelors,Professional,N,2026-01-13T11:05:49.968Z
11004,MRS.,ELIZABETH,JOHNSON,1968-08-08,S,F,elizabeth5@adventure-works.com,"$80,000",5,Bachelors,Professional,Y,2026-01-13T11:05:49.968Z
11005,MR.,JULIO,RUIZ,1965-08-05,S,M,julio1@adventure-works.com,"$70,000",0,Bachelors,Professional,Y,2026-01-13T11:05:49.968Z
11007,MR.,MARCO,MEHTA,1964-05-09,M,M,marco14@adventure-works.com,"$60,000",3,Bachelors,Professional,Y,2026-01-13T11:05:49.968Z


Remove Duplicate and Validate Data Quality

In [0]:
df_adv_cust = df_adv_cust.dropDuplicates(['CustomerKey'])

In [0]:
print("Null counts : ", df_adv_cust.filter(col("CustomerKey").isNull()).count())
print("Duplicate CustomerKeys: ",
      df_adv_cust.count() - df_adv_cust.dropDuplicates(["CustomerKey"]).count())

Null counts :  0
Duplicate CustomerKeys:  0


Derive/Update Attributes

In [0]:
df_adv_cust = df_adv_cust.withColumn("AnnualIncome",
    expr("try_cast(regexp_replace(trim(AnnualIncome), '[$,]', '') as double)")
)

In [0]:
# df_adv_cust.withColumn("FullName", initcap(concat(col('Prefix'), lit(' '), col('FirstName'), lit(' '), col('LastName')))).display()

df_adv_cust = df_adv_cust.withColumn("FullName", initcap(concat_ws(' ', col('Prefix'), col('FirstName'), col('LastName'))))

df_adv_cust.limit(7).display()

CustomerKey,Prefix,FirstName,LastName,BirthDate,MaritalStatus,Gender,EmailAddress,AnnualIncome,TotalChildren,EducationLevel,Occupation,HomeOwner,ing_ts,FullName
11000,MR.,JON,YANG,1966-04-08,M,M,jon24@adventure-works.com,90000.0,2,Bachelors,Professional,Y,2026-01-13T11:05:49.968Z,Mr. Jon Yang
11001,MR.,EUGENE,HUANG,1965-05-14,S,M,eugene10@adventure-works.com,60000.0,3,Bachelors,Professional,N,2026-01-13T11:05:49.968Z,Mr. Eugene Huang
11002,MR.,RUBEN,TORRES,1965-08-12,M,M,ruben35@adventure-works.com,60000.0,3,Bachelors,Professional,Y,2026-01-13T11:05:49.968Z,Mr. Ruben Torres
11003,MS.,CHRISTY,ZHU,1968-02-15,S,F,christy12@adventure-works.com,70000.0,0,Bachelors,Professional,N,2026-01-13T11:05:49.968Z,Ms. Christy Zhu
11004,MRS.,ELIZABETH,JOHNSON,1968-08-08,S,F,elizabeth5@adventure-works.com,80000.0,5,Bachelors,Professional,Y,2026-01-13T11:05:49.968Z,Mrs. Elizabeth Johnson
11005,MR.,JULIO,RUIZ,1965-08-05,S,M,julio1@adventure-works.com,70000.0,0,Bachelors,Professional,Y,2026-01-13T11:05:49.968Z,Mr. Julio Ruiz
11007,MR.,MARCO,MEHTA,1964-05-09,M,M,marco14@adventure-works.com,60000.0,3,Bachelors,Professional,Y,2026-01-13T11:05:49.968Z,Mr. Marco Mehta


Write Transformed Data to Silver Layer

In [0]:
df_adv_cust.write.format('delta')\
    .mode('overwrite')\
    .option('overwriteSchema', 'true')\
    .save(f"{silver_base}/silver_adventureworks_customers")

## adventureworks_product_categories

Load Data from Bronze Layer

In [0]:
df_adv_prod_cat = spark.read.format('delta').load(f"{bronze_base}/bronze_adventureworks_product_categories")
df_adv_prod_cat.display()

ProductCategoryKey,CategoryName,ing_ts
1,Bikes,2026-01-13T11:05:52.113Z
2,Components,2026-01-13T11:05:52.113Z
3,Clothing,2026-01-13T11:05:52.113Z
4,Accessories,2026-01-13T11:05:52.113Z


Remove Duplicates

In [0]:
df_adv_prod_cat = df_adv_prod_cat.dropDuplicates(['ProductCategoryKey', 'CategoryName'])

Write Transformed Data to Silver Layer

In [0]:
df_adv_prod_cat.write.format('delta')\
    .mode('overwrite')\
    .option('overwriteSchema', 'true')\
    .save(f"{silver_base}/silver_adventureworks_product_categories")

## adventureworks_product_subcategories

Load Data from Bronze Layer

In [0]:
df_adv_prod_sub_cat = spark.read.format('delta').load(f"{bronze_base}/bronze_adventureworks_product_subcategories")
df_adv_prod_sub_cat.limit(7).display()

ProductSubcategoryKey,SubcategoryName,ProductCategoryKey,ing_ts
1,Mountain Bikes,1,2026-01-13T11:05:54.255Z
2,Road Bikes,1,2026-01-13T11:05:54.255Z
3,Touring Bikes,1,2026-01-13T11:05:54.255Z
4,Handlebars,2,2026-01-13T11:05:54.255Z
5,Bottom Brackets,2,2026-01-13T11:05:54.255Z
6,Brakes,2,2026-01-13T11:05:54.255Z
7,Chains,2,2026-01-13T11:05:54.255Z


Remove Duplicate and Validate Data Quality

In [0]:
df_adv_prod_sub_cat = df_adv_prod_sub_cat.dropDuplicates(['ProductSubcategoryKey', 'SubcategoryName'])

In [0]:
print("Null counts : ", df_adv_prod_sub_cat.filter(col("ProductSubcategoryKey").isNull()).count())
print("Duplicate ProductSubcategoryKey: ",
      df_adv_prod_sub_cat.count() - df_adv_prod_sub_cat.dropDuplicates(["ProductSubcategoryKey"]).count())

Null counts :  0
Duplicate ProductSubcategoryKey:  0


Write Transformed Data to Silver Layer

In [0]:
df_adv_prod_sub_cat.write.format('delta')\
    .option('overwiteSchema', 'true')\
    .mode('overwrite')\
    .save(f"{silver_base}/silver_adventureworks_product_subcategories")

## adventureworks_products

Load Data from Bronze Layer and Derive/Update Attributes

In [0]:
df_adv_prod = spark.read.format('delta').load(f"{bronze_base}/bronze_adventureworks_products")
df_adv_prod.limit(7).display()

ProductKey,ProductSubcategoryKey,ProductSKU,ProductName,ModelName,ProductDescription,ProductColor,ProductSize,ProductStyle,ProductCost,ProductPrice,ing_ts
214,31,HL-U509-R,"Sport-100 Helmet, Red",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Red,0,0,13.0863,34.99,2026-01-13T11:05:56.126Z
215,31,HL-U509,"Sport-100 Helmet, Black",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Black,0,0,12.0278,33.6442,2026-01-13T11:05:56.126Z
218,23,SO-B909-M,"Mountain Bike Socks, M",Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,M,U,3.3963,9.5,2026-01-13T11:05:56.126Z
219,23,SO-B909-L,"Mountain Bike Socks, L",Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,L,U,3.3963,9.5,2026-01-13T11:05:56.126Z
220,31,HL-U509-B,"Sport-100 Helmet, Blue",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Blue,0,0,12.0278,33.6442,2026-01-13T11:05:56.126Z
223,19,CA-1098,AWC Logo Cap,Cycling Cap,Traditional style with a flip-up brim; one-size fits all.,Multi,0,U,5.7052,8.6442,2026-01-13T11:05:56.126Z
226,21,LJ-0192-S,"Long-Sleeve Logo Jersey, S",Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,S,U,31.7244,48.0673,2026-01-13T11:05:56.126Z


In [0]:
df_adv_prod = df_adv_prod.withColumn("ProductSKU", split(col('ProductSKU'), '-')[0])\
                .withColumn("ProductName", split(col('ProductName'), ' ')[0])



Remove Duplicate and Validate Data Quality

In [0]:
df_adv_prod = df_adv_prod.dropDuplicates(['ProductKey'])

In [0]:
print("Null counts : ", df_adv_prod.filter(col("ProductKey").isNull()).count())
print("Duplicate ProductKey: ",
      df_adv_prod.count() - df_adv_prod.dropDuplicates(["ProductKey"]).count())

Null counts :  0
Duplicate ProductKey:  0


Write Transformed Data to Silver Layer

In [0]:
df_adv_prod.write.format('delta')\
    .mode('overwrite')\
    .option('overwriteSchema', 'true')\
    .save(f"{silver_base}/silver_adventureworks_products")

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:141)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:486)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:768)
	at com.data

## adventureworks_returns

Load Data from Bronze Layer

In [0]:
df_adv_return = spark.read.format('delta').load(f"{bronze_base}/bronze_adventureworks_returns")
df_adv_return.limit(7).display()

ReturnDate,TerritoryKey,ProductKey,ReturnQuantity,ing_ts
2015-01-18,9,312,1,2026-01-13T11:05:58.179Z
2015-01-18,10,310,1,2026-01-13T11:05:58.179Z
2015-01-21,8,346,1,2026-01-13T11:05:58.179Z
2015-01-22,4,311,1,2026-01-13T11:05:58.179Z
2015-02-02,6,312,1,2026-01-13T11:05:58.179Z
2015-02-15,1,312,1,2026-01-13T11:05:58.179Z
2015-02-19,9,311,1,2026-01-13T11:05:58.179Z


Write Transformed Data to Silver Layer

In [0]:
df_adv_return.write.format('delta')\
    .mode('overwrite')\
    .option('overwriteSchema', 'true')\
    .save(f"{silver_base}/silver_adventureworks_returns")

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:141)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:486)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:768)
	at com.data

## adventureworks_territories

Load Data from Bronze Layer

In [0]:
df_adv_territory = spark.read.format('delta').load(f"{bronze_base}/bronze_adventureworks_territories")

df_adv_territory.limit(7).display()

SalesTerritoryKey,Region,Country,Continent,ing_ts
1,Northwest,United States,North America,2026-01-13T11:06:06.738Z
2,Northeast,United States,North America,2026-01-13T11:06:06.738Z
3,Central,United States,North America,2026-01-13T11:06:06.738Z
4,Southwest,United States,North America,2026-01-13T11:06:06.738Z
5,Southeast,United States,North America,2026-01-13T11:06:06.738Z
6,Canada,Canada,North America,2026-01-13T11:06:06.738Z
7,France,France,Europe,2026-01-13T11:06:06.738Z


Remove Duplicate and Validate Data Quality

In [0]:
df_adv_territory = df_adv_territory.dropDuplicates(['SalesTerritoryKey'])

In [0]:
print("Null counts : ", df_adv_territory.filter(col("Region").isNull()).count())
print("Duplicate Region: ",
      df_adv_territory.count() - df_adv_territory.dropDuplicates(["Region"]).count())

Null counts :  0
Duplicate Region:  0


Write Transformed Data to Silver Layer

In [0]:
df_adv_territory.write.format('delta')\
    .mode('overwrite')\
    .option('overwriteSchema', 'true')\
    .save(f"{silver_base}/silver_adventureworks_territories")

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:141)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:486)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:768)
	at com.data

## adventureworks_sales_combined

Load Multi-Year Sales Data from Bronze Layer

In [0]:
sales_dfs = [
    "bronze_adventureworks_sales_2015",
    "bronze_adventureworks_sales_2016",
    "bronze_adventureworks_sales_2017"
]

dfs = [
    spark.read.format("delta").load(f"{bronze_base}/{df}")
    for df in sales_dfs
]

In [0]:
# df_adv_sales = dfs[0]
# for df in dfs[1:]:
#     df_adv_sales = df_adv_sales.unionByName(df)


com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:141)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:486)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:768)
	at com.data

Combine Multi-Year Sales Data into a Unified DataFrame

In [0]:
from functools import reduce
df_adv_sales = reduce(
    lambda df1, df2: df1.unionByName(df2),
    dfs
)

Derive/Update Attributes

In [0]:
df_adv_sales = df_adv_sales.withColumn("StockDate", to_timestamp(col('StockDate')))

In [0]:
df_adv_sales = df_adv_sales.withColumn("OrderNumber",  regexp_replace(col('OrderNumber'), 'S' ,'T'))

In [0]:
df_adv_sales = df_adv_sales.withColumn("OrderLineTotal",  col('OrderLineItem')*col('OrderQuantity'))

In [0]:
df_adv_sales.limit(10).display()

Write Transformed Data to Silver Layer

In [0]:
df_adv_sales.write.format('delta')\
    .mode('overwrite')\
    .option('overwriteSchema', 'true')\
    .save(f"{silver_base}/silver_adventureworks_sales")