In [7]:
from pyspark.sql import SparkSession
from notebookutils import mssparkutils

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MedallionArchitecture") \
    .getOrCreate()

# Source (raw files)
raw_data_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/subfolder1/PowerBI_AssignmentDataset/"

# Bronze folder
bronze_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/bronze/"

# Create bronze folder if it doesn't exist
mssparkutils.fs.mkdirs(bronze_path)

# Files are moved (not copied) below, so after a successful run the raw folder no
# longer holds them. Guard the listing in case the raw folder itself has been removed,
# so re-running this cell does not fail.
files = mssparkutils.fs.ls(raw_data_path) if mssparkutils.fs.exists(raw_data_path) else []

# Move each file (sub-directories are skipped) into bronze under the same name
moved_count = 0
for file in files:
    if file.isFile:
        mssparkutils.fs.mv(file.path, bronze_path + file.name)
        moved_count += 1

# Report what actually happened instead of an unconditional success message
if moved_count:
    print(f"✅ Moved {moved_count} raw file(s) to Bronze layer.")
else:
    print(f"⚠️ No raw files found under {raw_data_path}; nothing moved.")


StatementMeta(, 01667e87-8a71-49f6-a4d6-2426af9fe1e8, 9, Finished, Available, Finished)

✅ All raw files moved to Bronze layer.


In [12]:
from notebookutils import mssparkutils

bronze_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/bronze/"

# Snapshot of the Bronze folder; the schema-preview cell below iterates `files`.
files = mssparkutils.fs.ls(bronze_path)

# One "<name> <size-in-bytes>" line per entry (nothing printed for an empty folder).
listing_lines = [f"{entry.name} {entry.size}" for entry in files]
if listing_lines:
    print("\n".join(listing_lines))


StatementMeta(, 01667e87-8a71-49f6-a4d6-2426af9fe1e8, 14, Finished, Available, Finished)

AW_Customers.csv 1975231
AW_Product_Category.csv 86
AW_Product_Subcategory.csv 640
AW_Products.csv 58122
AW_Sales_2016 - Copy.csv 1127918
AW_Sales_2016.csv 1127918
AW_Sales_2017 - Copy.csv 1389002
AW_Sales_2017.csv 1389002
AW_Territory.csv 403


In [13]:
from pyspark.sql import SparkSession
from notebookutils import mssparkutils

spark = SparkSession.builder.getOrCreate()

# Re-list Bronze here instead of relying on a `files` variable left over from an
# earlier cell. The raw-to-Bronze cell also assigns `files`, to raw paths that no
# longer exist once the files have been moved.
bronze_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/bronze/"
files = mssparkutils.fs.ls(bronze_path)

# Preview each CSV's header-derived schema. Without inferSchema every column reports
# as string; explicit typing happens in the Silver step.
for file in files:
    if file.isFile and file.name.lower().endswith(".csv"):
        print(f"\n📂 File Name: {file.name}")
        print("-" * 50)

        df = spark.read.option("header", True).csv(file.path)

        df.printSchema()


StatementMeta(, 01667e87-8a71-49f6-a4d6-2426af9fe1e8, 15, Finished, Available, Finished)


📂 File Name: AW_Customers.csv
--------------------------------------------------
root
 |-- CustomerKey: string (nullable = true)
 |-- Prefix: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- BirthDate: string (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- AnnualIncome: string (nullable = true)
 |-- TotalChildren: string (nullable = true)
 |-- EducationLevel: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- HomeOwner: string (nullable = true)


📂 File Name: AW_Product_Category.csv
--------------------------------------------------
root
 |-- CategoryName: string (nullable = true)
 |-- ProductCategoryKey: string (nullable = true)


📂 File Name: AW_Product_Subcategory.csv
--------------------------------------------------
root
 |-- ProductCategoryKey: string (nullable = true)
 |-- ProductSubcategoryKey: 

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.appName("SilverLayer").getOrCreate()

# Bronze path
bronze_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/bronze/"


def _read_bronze_csv(file_pattern):
    """Read a Bronze CSV file (or glob) using its header row; every column loads as string."""
    return spark.read.option("header", True).csv(bronze_path + file_pattern)


# -----------------------------
# 1. CUSTOMERS
# -----------------------------
customers_df = _read_bronze_csv("AW_Customers.csv")

customers_silver = (
    customers_df
    .withColumn("CustomerKey", col("CustomerKey").cast("int"))
    # NOTE(review): values not in yyyy-MM-dd form will not parse to a date — confirm the source format.
    .withColumn("BirthDate", to_date("BirthDate", "yyyy-MM-dd"))
    # NOTE(review): non-numeric text (e.g. currency symbols, thousands separators) casts to null — confirm.
    .withColumn("AnnualIncome", col("AnnualIncome").cast("double"))
    .withColumn("TotalChildren", col("TotalChildren").cast("int"))
)

# -----------------------------
# 2. PRODUCT CATEGORY
# -----------------------------
category_df = _read_bronze_csv("AW_Product_Category.csv")

category_silver = (
    category_df
    .withColumn("ProductCategoryKey", col("ProductCategoryKey").cast("int"))
)

# -----------------------------
# 3. PRODUCT SUBCATEGORY
# -----------------------------
subcategory_df = _read_bronze_csv("AW_Product_Subcategory.csv")

subcategory_silver = (
    subcategory_df
    .withColumn("ProductCategoryKey", col("ProductCategoryKey").cast("int"))
    .withColumn("ProductSubcategoryKey", col("ProductSubcategoryKey").cast("int"))
)

# -----------------------------
# 4. PRODUCTS
# -----------------------------
products_df = _read_bronze_csv("AW_Products.csv")

products_silver = (
    products_df
    .withColumn("ProductKey", col("ProductKey").cast("int"))
    .withColumn("ProductSubcategoryKey", col("ProductSubcategoryKey").cast("int"))
    .withColumn("ProductCost", col("ProductCost").cast("double"))
    .withColumn("ProductPrice", col("ProductPrice").cast("double"))
)

# -----------------------------
# 5. SALES (2016, 2017, copies)
# -----------------------------
# The glob also matches the "AW_Sales_2016 - Copy.csv" / "AW_Sales_2017 - Copy.csv"
# files in Bronze. They appear to duplicate the originals (identical byte sizes), so
# without deduplication every sales row would land in Silver twice. Exact duplicate
# rows are dropped on the raw string values, before casting, so distinct source rows
# can never be merged by a lossy cast.
sales_df = _read_bronze_csv("AW_Sales*.csv")

sales_silver = (
    sales_df
    .dropDuplicates()
    .withColumn("CustomerKey", col("CustomerKey").cast("int"))
    .withColumn("ProductKey", col("ProductKey").cast("int"))
    .withColumn("TerritoryKey", col("TerritoryKey").cast("int"))
    .withColumn("OrderLineItem", col("OrderLineItem").cast("int"))
    .withColumn("OrderQuantity", col("OrderQuantity").cast("int"))
    .withColumn("OrderDate", to_date("OrderDate", "yyyy-MM-dd"))
    .withColumn("StockDate", to_date("StockDate", "yyyy-MM-dd"))
)

# -----------------------------
# 6. TERRITORY
# -----------------------------
territory_df = _read_bronze_csv("AW_Territory.csv")

territory_silver = (
    territory_df
    .withColumn("SalesTerritoryKey", col("SalesTerritoryKey").cast("int"))
)

# -----------------------------
# SCHEMA VALIDATION (OPTIONAL)
# -----------------------------
customers_silver.printSchema()
category_silver.printSchema()
subcategory_silver.printSchema()
products_silver.printSchema()
sales_silver.printSchema()
territory_silver.printSchema()


StatementMeta(, 01667e87-8a71-49f6-a4d6-2426af9fe1e8, 16, Finished, Available, Finished)

root
 |-- CustomerKey: integer (nullable = true)
 |-- Prefix: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- BirthDate: date (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- AnnualIncome: double (nullable = true)
 |-- TotalChildren: integer (nullable = true)
 |-- EducationLevel: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- HomeOwner: string (nullable = true)

root
 |-- CategoryName: string (nullable = true)
 |-- ProductCategoryKey: integer (nullable = true)

root
 |-- ProductCategoryKey: integer (nullable = true)
 |-- ProductSubcategoryKey: integer (nullable = true)
 |-- SubcategoryName: string (nullable = true)

root
 |-- ProductKey: integer (nullable = true)
 |-- ProductSubcategoryKey: integer (nullable = true)
 |-- ProductSKU: string (nullable = true)
 |-- ProductName: string (nullable = true)


In [15]:
from notebookutils import mssparkutils

# Silver base path
silver_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/silver/"

# Make sure the Silver folder exists before writing into it
mssparkutils.fs.mkdirs(silver_path)

# (Silver sub-folder, DataFrame) pairs, written in this order as Parquet,
# replacing whatever each folder held before.
silver_outputs = [
    ("customers", customers_silver),
    ("product_category", category_silver),
    ("product_subcategory", subcategory_silver),
    ("products", products_silver),
    ("sales", sales_silver),
    ("territory", territory_silver),
]

for folder_name, silver_df in silver_outputs:
    silver_df.write.mode("overwrite").parquet(silver_path + folder_name)

print("✅ All Silver tables successfully written.")


StatementMeta(, 01667e87-8a71-49f6-a4d6-2426af9fe1e8, 17, Finished, Available, Finished)

✅ All Silver tables successfully written.


In [16]:
from pyspark.sql import SparkSession
from notebookutils import mssparkutils

spark = SparkSession.builder.getOrCreate()

silver_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/silver/"

# Silver folders to preview, in display order
tables = [
    "customers",
    "product_category",
    "product_subcategory",
    "products",
    "sales",
    "territory",
]

divider = "-" * 60
for table in tables:
    # A header per table, then the interactive DataFrame widget for its Parquet data
    print(f"\n📂 Silver Table: {table}")
    print(divider)
    display(spark.read.parquet(silver_path + table))


StatementMeta(, 01667e87-8a71-49f6-a4d6-2426af9fe1e8, 18, Finished, Available, Finished)


📂 Silver Table: customers
------------------------------------------------------------


SynapseWidget(Synapse.DataFrame, c4bf44db-8b6d-4c61-a140-26fdf3ab49e6)


📂 Silver Table: product_category
------------------------------------------------------------


SynapseWidget(Synapse.DataFrame, 1cdae921-7943-4b53-a9ba-d95024d64683)


📂 Silver Table: product_subcategory
------------------------------------------------------------


SynapseWidget(Synapse.DataFrame, e4f39468-ecee-43ab-b0f2-919b4b67b2f5)


📂 Silver Table: products
------------------------------------------------------------


SynapseWidget(Synapse.DataFrame, da280243-32f8-446f-8007-f7d3bdfbf22f)


📂 Silver Table: sales
------------------------------------------------------------


SynapseWidget(Synapse.DataFrame, 9b6f4405-2583-4d84-8bfa-c172e1b4f04b)


📂 Silver Table: territory
------------------------------------------------------------


SynapseWidget(Synapse.DataFrame, de741d1d-b2e7-4b3b-9a6f-a0324fb73d33)

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.appName("BronzeToSilver").getOrCreate()

# =============================
# PATHS
# =============================
bronze_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/bronze/"
silver_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/silver/"

# NOTE: this cell rewrites only the customers, sales and products Silver folders
# (overwrite mode). The product_category, product_subcategory and territory Silver
# folders are not touched here.

# =====================================================
# 1️⃣ CUSTOMERS — type CustomerKey, TotalChildren, BirthDate
# =====================================================
# Column types come from inferSchema; AnnualIncome is left as inferred (not cast here).
customers_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(bronze_path + "AW_Customers.csv")
)

customers_silver = (
    customers_df
    .withColumn("CustomerKey", col("CustomerKey").cast("int"))
    .withColumn("TotalChildren", col("TotalChildren").cast("int"))
    .withColumn("BirthDate", to_date(col("BirthDate"), "yyyy-MM-dd"))
)

customers_silver.write \
    .mode("overwrite") \
    .parquet(silver_path + "customers")

# =====================================================
# 2️⃣ SALES — DEDUPLICATE + CLEAN + FIX OrderDate / StockDate
# =====================================================
sales_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(bronze_path + "AW_Sales*.csv")
)

sales_silver = (
    sales_df
    # The glob also matches the "AW_Sales_20xx - Copy.csv" files in Bronze. They appear
    # to duplicate the originals (identical byte sizes), so drop exact duplicate rows to
    # keep each sales line once.
    .dropDuplicates()
    .withColumn("OrderLineItem", col("OrderLineItem").cast("int"))
    .withColumn("OrderQuantity", col("OrderQuantity").cast("int"))
    .withColumn("ProductKey", col("ProductKey").cast("int"))
    .withColumn("CustomerKey", col("CustomerKey").cast("int"))
    .withColumn("TerritoryKey", col("TerritoryKey").cast("int"))
    .withColumn("OrderDate", to_date(col("OrderDate"), "yyyy-MM-dd"))
    # Parse StockDate too, matching the other Silver sales transform
    .withColumn("StockDate", to_date(col("StockDate"), "yyyy-MM-dd"))
)

sales_silver.write \
    .mode("overwrite") \
    .parquet(silver_path + "sales")

# =====================================================
# 3️⃣ PRODUCTS
# =====================================================
products_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(bronze_path + "AW_Products.csv")
)

products_silver = (
    products_df
    .withColumn("ProductKey", col("ProductKey").cast("int"))
    .withColumn("ProductSubcategoryKey", col("ProductSubcategoryKey").cast("int"))
)

products_silver.write \
    .mode("overwrite") \
    .parquet(silver_path + "products")

print("✅ Silver layer created successfully")


StatementMeta(, 01667e87-8a71-49f6-a4d6-2426af9fe1e8, 23, Finished, Available, Finished)

✅ Silver layer created successfully


In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from notebookutils import mssparkutils

# =============================
# Initialize Spark Session
# =============================
spark = SparkSession.builder.appName("SilverToGold").getOrCreate()

# =============================
# Paths
# =============================
silver_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/silver/"
gold_path   = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/gold/"


def reset_gold(folder):
    """Recursively delete gold_path + folder when it exists; do nothing otherwise."""
    target = gold_path + folder
    if not mssparkutils.fs.exists(target):
        return
    mssparkutils.fs.rm(target, recurse=True)


def _read_silver_dedup(folder, keys):
    """Read a Silver Parquet folder, keeping one row per distinct combination of `keys`."""
    return spark.read.parquet(silver_path + folder).dropDuplicates(keys)


def _publish_gold(df, folder):
    """Clear the Gold folder, then write `df` there as Parquet in overwrite mode."""
    reset_gold(folder)
    df.write.mode("overwrite").parquet(gold_path + folder)


# =============================
# Step 1: Read Silver Tables and Clean
# =============================
customers = _read_silver_dedup("customers", ["CustomerKey"])
products  = _read_silver_dedup("products", ["ProductKey"])
territory = _read_silver_dedup("territory", ["SalesTerritoryKey"])
sales     = _read_silver_dedup("sales", ["OrderNumber", "OrderLineItem"])

# Normalise date columns to DateType
customers = customers.withColumn("BirthDate", to_date(col("BirthDate"), "yyyy-MM-dd"))
sales = (
    sales
    .withColumn("OrderDate", to_date(col("OrderDate"), "yyyy-MM-dd"))
    .withColumn("StockDate", to_date(col("StockDate"), "yyyy-MM-dd"))
)

# =============================
# Step 2: Create Dimension Tables
# =============================
dim_customers = customers.select(
    "CustomerKey", "FirstName", "LastName", "BirthDate",
    "Gender", "MaritalStatus", "EmailAddress", "AnnualIncome",
    "TotalChildren", "EducationLevel", "Occupation", "HomeOwner"
)
_publish_gold(dim_customers, "dim_customers")

dim_products = products.select(
    "ProductKey", "ProductSubcategoryKey", "ProductSKU", "ProductName",
    "ModelName", "ProductDescription", "ProductColor", "ProductSize",
    "ProductStyle", "ProductCost", "ProductPrice"
)
_publish_gold(dim_products, "dim_products")

dim_territory = territory.select(
    "SalesTerritoryKey", "Continent", "Country", "Region"
)
_publish_gold(dim_territory, "dim_territory")

# =============================
# Step 3: Create Fact Table (fact_sales), one row per (OrderNumber, OrderLineItem)
# =============================
fact_sales = sales.select(
    "OrderNumber", "OrderLineItem", "CustomerKey", "ProductKey",
    "TerritoryKey", "OrderDate", "StockDate", "OrderQuantity"
)
_publish_gold(fact_sales, "fact_sales")

print("✅ Gold layer created successfully with dimensions and fact table")


StatementMeta(, 01667e87-8a71-49f6-a4d6-2426af9fe1e8, 24, Finished, Available, Finished)

✅ Gold layer created successfully with dimensions and fact table


In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, count as spark_count, countDistinct

spark = SparkSession.builder.appName("Customer360Table").getOrCreate()

gold_path = "abfss://mayankworkspace@onelake.dfs.fabric.microsoft.com/lakehouse_ws.Lakehouse/Files/gold/"

# =============================
# Step 1: Read Gold Tables
# =============================
dim_customers = spark.read.parquet(gold_path + "dim_customers")
dim_products  = spark.read.parquet(gold_path + "dim_products")
dim_territory = spark.read.parquet(gold_path + "dim_territory")
fact_sales    = spark.read.parquet(gold_path + "fact_sales")

# =============================
# Step 2: Join Tables to Create Customer 360 (one row per sales line)
# =============================
customer_360_raw = (
    fact_sales
    .join(dim_customers, on="CustomerKey", how="left")
    .join(dim_products, on="ProductKey", how="left")
    .join(dim_territory, fact_sales.TerritoryKey == dim_territory.SalesTerritoryKey, how="left")
    .drop(dim_territory.SalesTerritoryKey)
)

# =============================
# Step 3: Aggregate Metrics per Customer
# =============================
customer_360 = (
    customer_360_raw.groupBy(
        "CustomerKey", "FirstName", "LastName", "BirthDate",
        "Gender", "MaritalStatus", "EmailAddress", "AnnualIncome",
        "TotalChildren", "EducationLevel", "Occupation", "HomeOwner"
    )
    .agg(
        # The fact grain is one row per (OrderNumber, OrderLineItem), so an order with
        # several lines repeats its OrderNumber; count each order once.
        countDistinct("OrderNumber").alias("TotalOrders"),
        spark_sum("OrderQuantity").alias("TotalQuantity"),
        # Line spend = quantity x price; summing ProductPrice alone ignores quantity.
        # NOTE(review): assumes ProductPrice is a per-unit price — confirm.
        spark_sum(col("OrderQuantity") * col("ProductPrice").cast("double")).alias("TotalSpend")
    )
)

# =============================
# Step 4: Create Table in Metastore
# =============================
# Drop table if exists
spark.sql("DROP TABLE IF EXISTS customer_360")

# Save DataFrame as a managed table
customer_360.write.saveAsTable("customer_360", mode="overwrite")

print("✅ Customer 360 table created successfully in the table section!")


StatementMeta(, 01667e87-8a71-49f6-a4d6-2426af9fe1e8, 27, Finished, Available, Finished)

✅ Customer 360 table created successfully in the table section!


In [26]:
%%sql
-- Inspect the Customer 360 summary table written by the cell above
SELECT *
FROM customer_360

StatementMeta(, 01667e87-8a71-49f6-a4d6-2426af9fe1e8, 28, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 15 fields>