In [0]:
%fs ls /mnt/bronze

path,name,size,modificationTime
dbfs:/mnt/bronze/Product/,Product/,0,1747416805000
dbfs:/mnt/bronze/Region/,Region/,0,1747416819000
dbfs:/mnt/bronze/Reseller/,Reseller/,0,1747416822000
dbfs:/mnt/bronze/Sales/,Sales/,0,1747416826000
dbfs:/mnt/bronze/Salesperson/,Salesperson/,0,1747416829000
dbfs:/mnt/bronze/SalespersonRegion/,SalespersonRegion/,0,1747416831000
dbfs:/mnt/bronze/Targets/,Targets/,0,1747416833000


## Schema overview

In [0]:
files = [
    "Product",
    "Region",
    "Reseller",
    "Sales",
    "Salesperson",
    "SalespersonRegion",
    "Targets"
]

for file in files:
    df = spark.read.format("delta").load("/mnt/bronze/" + file)
    print(file)
    df.printSchema()


Product
root
 |-- productkey: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- standard_cost: string (nullable = true)
 |-- color: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- category: string (nullable = true)
 |-- background_color_format: string (nullable = true)
 |-- font_color_format: string (nullable = true)

Region
root
 |-- salesterritorykey: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- group: string (nullable = true)

Reseller
root
 |-- resellerkey: integer (nullable = true)
 |-- business_type: string (nullable = true)
 |-- reseller: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state-province: string (nullable = true)
 |-- country-region: string (nullable = true)

Sales
root
 |-- salesordernumber: string (nullable = true)
 |-- orderdate: string (nullable = true)
 |-- productkey: integer (nullable = true)
 |-- resellerkey: integer (nullable = true)

## Statistics

In [0]:
files = [
    "Product",
    "Region",
    "Reseller",
    "Sales",
    "Salesperson",
    "SalespersonRegion",
    "Targets"
]

for file in files:
    df = spark.read.format("delta").load("/mnt/bronze/" + file)
    print(file)
    df.describe().show()


Product
+-------+------------------+-----------------+-------------+------+-----------+-----------+-----------------------+-----------------+
|summary|        productkey|          product|standard_cost| color|subcategory|   category|background_color_format|font_color_format|
+-------+------------------+-----------------+-------------+------+-----------+-----------+-----------------------+-----------------+
|  count|               397|              397|          397|   397|        397|        397|                    397|              397|
|   mean|             408.0|             NULL|         NULL|  NULL|       NULL|       NULL|                   NULL|             NULL|
| stddev|114.74827522305806|             NULL|         NULL|  NULL|       NULL|       NULL|                   NULL|             NULL|
|    min|               210|     AWC Logo Cap|        $0.86| Black| Bib-Shorts|Accessories|                #000000|          #000000|
|    max|               606|Women's Tights, S|        

## Product

standard cost -> string to float

In [0]:
from pyspark.sql.functions import regexp_replace

# Read Delta from Bronze
df = spark.read.format("delta").load("/mnt/bronze/Product")

# Clean standard_cost column: remove $, commas → cast to double
df = df.withColumn("standard_cost", regexp_replace("standard_cost", "[$,]", "").cast("double"))

# Write to Silver layer
df.write.format("delta").mode("overwrite").save("/mnt/silver/Product")



## Region

In [0]:
# Write to Silver layer

df = spark.read.format("delta").load("/mnt/bronze/Region")

df.write.format("delta").mode("overwrite").save("/mnt/silver/Region")


# Reseller

Change column name

In [0]:

# Read Delta from Bronze
df = spark.read.format("delta").load("/mnt/bronze/Reseller")

# Clean standard_cost column: remove $, commas → cast to double
df = df.withColumnRenamed("state-province","state_province").withColumnRenamed("country-region","country_region")

# Write to Silver layer
df.write.format("delta").mode("overwrite").save("/mnt/silver/Reseller")

## Sales
Remove duplicates
Change data type -> orderdate, unit_price, sales and cost

In [0]:
from pyspark.sql.functions import regexp_replace, to_date, trim

df = spark.read.format("delta").load("/mnt/bronze/Sales")

# Clean $ fields
df = df.withColumn("unit_price", regexp_replace("unit_price", "[$,]", "").cast("double")) \
       .withColumn("sales", regexp_replace("sales", "[$,]", "").cast("double")) \
       .withColumn("cost", regexp_replace("cost", "[$,]", "").cast("double"))


# Clean and store raw date parsing result

df = df.withColumn("orderdate", trim(regexp_replace("orderdate", "^[A-Za-z]+,\\s*", "")))

df = df.withColumn("orderdate", to_date("orderdate", "MMMM d, yyyy"))


# Remove duplicate rows
df = df.dropDuplicates()

# Save to Silver
df.write.format("delta").mode("overwrite").save("/mnt/silver/Sales")



## Sales Person



In [0]:
# Write to Silver layer

df = spark.read.format("delta").load("/mnt/bronze/Salesperson")

df.write.format("delta").mode("overwrite").save("/mnt/silver/Salesperson")

## Sales Person Region

In [0]:
df = spark.read.format("delta").load("/mnt/bronze/SalespersonRegion")

df.write.format("delta").mode("overwrite").save("/mnt/silver/SalespersonRegion")

## Targets

In [0]:
from pyspark.sql.functions import col, month, year, sum, round

# 1. Load data
df_sales = spark.read.format("delta").load("/mnt/silver/Sales") \
    .withColumn("month", month("orderdate")) \
    .withColumn("year", year("orderdate"))

df_targets = spark.read.format("delta").load("/mnt/silver/Targets")
df_salesperson = spark.read.format("delta").load("/mnt/silver/Salesperson")

# 2. Join with aliases
df_joined = df_sales.alias("s") \
    .join(df_salesperson.alias("p"), col("s.EmployeeKey") == col("p.EmployeeKey"), "left") \
    .join(df_targets.alias("t"),
          (col("s.EmployeeKey") == col("t.EmployeeID")) &
          (col("s.month") == col("t.month")) &
          (col("s.year") == col("t.year")),
          "left"
    )

# 3. Aggregate
df_gold = df_joined.groupBy(
    col("s.EmployeeKey").alias("employee_key"),
    col("t.EmployeeID").alias("employee_id"),
    col("p.salesperson"),
    col("s.month"),
    col("s.year"),
    col("t.target")
).agg(
    sum("s.sales").alias("total_sales")
).withColumn(
    "target_achievement_pct",
    round((col("total_sales") / col("target")) * 100, 2)
)

display(df_gold)



employee_key,employee_id,salesperson,month,year,target,total_sales,target_achievement_pct
292,,Ranjit Varkey Chudukatil,8,2018,,265288.35,
289,,David Campbell,12,2018,,10236.31,
281,,Michael Blythe,7,2019,,249177.96,
288,,José Saraiva,7,2019,,155252.69000000003,
285,,Tsvi Reiter,3,2018,,167507.91999999998,
288,,José Saraiva,10,2018,,63443.87000000002,
288,,José Saraiva,3,2019,,115840.99,
296,,Lynn Tsoflias,8,2019,,145995.97999999998,
293,,Tete Mensa-Annan,2,2020,,162756.22000000003,
286,,Pamela Ansman-Wolfe,9,2018,,97559.83000000005,


## 