In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
import pyspark.sql.functions as F

In [0]:
spark.conf.set("fs.azure.account.auth.type.<yourstorage>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<yourstorage>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<yourstorage>.dfs.core.windows.net", "<yourclientid>")
spark.conf.set("fs.azure.account.oauth2.client.secret.<yourstorage>.dfs.core.windows.net", "<yoursecret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<yourstorage>.dfs.core.windows.net", "https://login.microsoftonline.com/<yourtenantid>/oauth2/token")

In [0]:
df_date = spark.read.option('header','true').csv('abfss://bronze@datalakeboya.dfs.core.windows.net/bronze/Calendar.csv')
df_cust = spark.read.csv('abfss://bronze@datalakeboya.dfs.core.windows.net/bronze/Customers.csv', header=True, inferSchema=True)
df_prod = spark.read.option('header','true').csv('abfss://bronze@datalakeboya.dfs.core.windows.net/bronze/Products.csv')
df_prod_category = spark.read.option('header','true').csv('abfss://bronze@datalakeboya.dfs.core.windows.net/bronze/Product_Categories.csv')
df_sales7 = spark.read.option('header','true').csv('abfss://bronze@datalakeboya.dfs.core.windows.net/bronze/Sales_2017.csv')
df_sales5 = spark.read.option('header','true').csv('abfss://bronze@datalakeboya.dfs.core.windows.net/bronze/Sales_2015.csv')
df_sales6 = spark.read.option('header','true').csv('abfss://bronze@datalakeboya.dfs.core.windows.net/bronze/Sales_2016.csv')
returns = spark.read.option('header','true').csv('abfss://bronze@datalakeboya.dfs.core.windows.net/bronze/Returns.csv')

In [0]:
df_date.printSchema()
df_prod.printSchema()
df_cust.printSchema()
df_prod_category.printSchema()
df_sales5.printSchema()
df_sales6.printSchema()
df_sales7.printSchema()
returns.printSchema()

root
 |-- Date: string (nullable = true)

root
 |-- ProductKey: string (nullable = true)
 |-- ProductSubcategoryKey: string (nullable = true)
 |-- ProductSKU: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- ModelName: string (nullable = true)
 |-- ProductDescription: string (nullable = true)
 |-- ProductColor: string (nullable = true)
 |-- ProductSize: string (nullable = true)
 |-- ProductStyle: string (nullable = true)
 |-- ProductCost: string (nullable = true)
 |-- ProductPrice: string (nullable = true)

root
 |-- CustomerKey: integer (nullable = true)
 |-- Prefix: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- BirthDate: date (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- AnnualIncome: string (nullable = true)
 |-- TotalChildren: integer (nullable = true)
 |-- EducationLevel: string (nullable =

In [0]:
# Convert Products columns to proper types
df_prod_clean = df_prod \
    .withColumn("ProductKey", col("ProductKey").cast(IntegerType())) \
    .withColumn("ProductSubcategoryKey", col("ProductSubcategoryKey").cast(IntegerType())) \
    .withColumn("ProductCost", col("ProductCost").cast(DoubleType())) \
    .withColumn("ProductPrice", col("ProductPrice").cast(DoubleType()))
df_prod_clean.printSchema()

root
 |-- ProductKey: integer (nullable = true)
 |-- ProductSubcategoryKey: integer (nullable = true)
 |-- ProductSKU: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- ModelName: string (nullable = true)
 |-- ProductDescription: string (nullable = true)
 |-- ProductColor: string (nullable = true)
 |-- ProductSize: string (nullable = true)
 |-- ProductStyle: string (nullable = true)
 |-- ProductCost: double (nullable = true)
 |-- ProductPrice: double (nullable = true)



In [0]:
# Convert Product Categories
df_prod_category_clean = df_prod_category \
    .withColumn("ProductCategoryKey", col("ProductCategoryKey").cast(IntegerType()))
df_prod_category.printSchema()


root
 |-- ProductCategoryKey: string (nullable = true)
 |-- CategoryName: string (nullable = true)



In [0]:
df_sales_all.dtypes

[('OrderDate', 'string'),
 ('StockDate', 'string'),
 ('OrderNumber', 'string'),
 ('ProductKey', 'string'),
 ('CustomerKey', 'string'),
 ('TerritoryKey', 'string'),
 ('OrderLineItem', 'string'),
 ('OrderQuantity', 'string')]

In [0]:
df_sales_all = df_sales5.unionByName(df_sales6).unionByName(df_sales7)

# Convert to proper data types
df_sales_clean = df_sales_all \
    .withColumn("OrderDate", to_timestamp("OrderDate", "M/d/yyyy")) \
    .withColumn("StockDate", to_timestamp("StockDate", "M/d/yyyy")) \
    .withColumn("ProductKey", col("ProductKey").cast(IntegerType())) \
    .withColumn("CustomerKey", col("CustomerKey").cast(IntegerType())) \
    .withColumn("TerritoryKey", col("TerritoryKey").cast(IntegerType())) \
    .withColumn("OrderLineItem", col("OrderLineItem").cast(IntegerType())) \
    .withColumn("OrderQuantity", col("OrderQuantity").cast(IntegerType()))

total_sales = df_sales_clean.count()

In [0]:
returns.printSchema()

root
 |-- ReturnDate: string (nullable = true)
 |-- TerritoryKey: string (nullable = true)
 |-- ProductKey: string (nullable = true)
 |-- ReturnQuantity: string (nullable = true)



In [0]:
# Convert Returns to proper types
df_returns_clean = returns \
    .withColumn("ReturnDate", to_timestamp("ReturnDate", "M/d/yyyy")) \
    .withColumn("TerritoryKey", col("TerritoryKey").cast(IntegerType())) \
    .withColumn("ProductKey", col("ProductKey").cast(IntegerType())) \
    .withColumn("ReturnQuantity", col("ReturnQuantity").cast(IntegerType()))


### Create Sales Fact Table with Metrics

In [0]:
# Join sales with products to get pricing
sales_with_products = df_sales_clean.join(
    df_prod_clean.select("ProductKey", "ProductName", "ProductCost", "ProductPrice", 
                         "ProductSubcategoryKey", "ProductColor", "ProductSize"),
    on="ProductKey",
    how="left"
)

# Calculate revenue, cost, and profit
sales_fact = sales_with_products \
    .withColumn("Revenue", F.round(col("OrderQuantity") * col("ProductPrice"), 2)) \
    .withColumn("Cost", F.round(col("OrderQuantity") * col("ProductCost"), 2)) \
    .withColumn("Profit", F.round(col("Revenue") - col("Cost"), 2)) \
    .withColumn("ProfitMargin", 
                F.round((col("Profit") / col("Revenue")) * 100, 2))

In [0]:
# Join with product categories
sales_fact = sales_fact.join(
    df_prod_category_clean.select("ProductCategoryKey", "CategoryName"),
    sales_fact.ProductSubcategoryKey == df_prod_category_clean.ProductCategoryKey,
    how="left"
).drop(df_prod_category_clean.ProductCategoryKey)

In [0]:
# Join with customers for demographic analysis
sales_fact = sales_fact.join(
    df_cust.select("CustomerKey", "Gender", "MaritalStatus", "AnnualIncome", 
                   "TotalChildren", "EducationLevel", "Occupation", "HomeOwner"),
    on="CustomerKey",
    how="left"
)

# Add date components for analysis
sales_fact = sales_fact \
    .withColumn("Year", year(col("OrderDate"))) \
    .withColumn("Month", month(col("OrderDate"))) \
    .withColumn("Quarter", quarter(col("OrderDate")))

In [0]:
display(sales_fact.select(
    "OrderDate", "OrderNumber", "ProductName", "CategoryName",
    "OrderQuantity", "Revenue", "Profit", "ProfitMargin",
    "Gender", "AnnualIncome"
).limit(10))

OrderDate,OrderNumber,ProductName,CategoryName,OrderQuantity,Revenue,Profit,ProfitMargin,Gender,AnnualIncome
2015-01-01T00:00:00Z,SO45080,"Road-650 Black, 58",Components,1,699.1,285.95,40.9,M,"$80,000"
2015-01-01T00:00:00Z,SO45079,"Road-150 Red, 48",Components,1,3578.27,1406.98,39.32,M,"$80,000"
2015-01-01T00:00:00Z,SO45082,"Mountain-100 Black, 44",Bikes,1,3374.99,1476.9,43.76,M,"$100,000"
2015-01-01T00:00:00Z,SO45081,"Road-650 Black, 44",Components,1,699.1,285.95,40.9,M,"$30,000"
2015-01-02T00:00:00Z,SO45083,"Road-150 Red, 48",Components,1,3578.27,1406.98,39.32,M,"$20,000"
2015-01-02T00:00:00Z,SO45084,"Road-150 Red, 62",Components,1,3578.27,1406.98,39.32,F,"$60,000"
2015-01-02T00:00:00Z,SO45086,"Road-150 Red, 56",Components,1,3578.27,1406.98,39.32,F,"$70,000"
2015-01-02T00:00:00Z,SO45085,"Road-150 Red, 48",Components,1,3578.27,1406.98,39.32,M,"$70,000"
2015-01-03T00:00:00Z,SO45093,"Road-150 Red, 48",Components,1,3578.27,1406.98,39.32,F,"$30,000"
2015-01-03T00:00:00Z,SO45090,"Road-150 Red, 62",Components,1,3578.27,1406.98,39.32,F,"$60,000"


### Creaye aggregated tables for PowerBI

In [0]:
sales_by_date = sales_fact.groupBy("OrderDate", "Year", "Month", "Quarter").agg(
    count("OrderNumber").alias("TotalOrders"),
    F.sum("OrderQuantity").alias("TotalUnitsSold"),
    F.round(F.sum("Revenue"), 2).alias("TotalRevenue"),
    F.round(F.sum("Cost"), 2).alias("TotalCost"),
    F.round(F.sum("Profit"), 2).alias("TotalProfit"),
    F.round(avg("Revenue"), 2).alias("AvgOrderValue"),
    F.round((F.sum("Profit") / F.sum("Revenue")) * 100, 2).alias("ProfitMarginPct")
).orderBy("OrderDate")

display(sales_by_date.limit(10))

OrderDate,Year,Month,Quarter,TotalOrders,TotalUnitsSold,TotalRevenue,TotalCost,TotalProfit,AvgOrderValue,ProfitMarginPct
2015-01-01T00:00:00Z,2015,1,1,4,4,8351.46,4895.68,3455.78,2087.87,41.38
2015-01-02T00:00:00Z,2015,1,1,4,4,14313.08,8685.16,5627.92,3578.27,39.32
2015-01-03T00:00:00Z,2015,1,1,8,8,28041.32,16564.78,11476.54,3505.17,40.93
2015-01-04T00:00:00Z,2015,1,1,5,5,17713.07,10597.31,7115.76,3542.61,40.17
2015-01-05T00:00:00Z,2015,1,1,3,3,7855.64,4755.73,3099.91,2618.55,39.46
2015-01-06T00:00:00Z,2015,1,1,6,6,21266.34,12754.54,8511.8,3544.39,40.02
2015-01-07T00:00:00Z,2015,1,1,4,4,8554.74,5168.88,3385.86,2138.69,39.58
2015-01-08T00:00:00Z,2015,1,1,8,8,25365.43,15079.84,10285.59,3170.68,40.55
2015-01-09T00:00:00Z,2015,1,1,4,4,14313.08,8685.16,5627.92,3578.27,39.32
2015-01-10T00:00:00Z,2015,1,1,4,4,14109.8,8411.96,5697.84,3527.45,40.38


In [0]:
# Monthly aggregations
sales_by_month = sales_fact.groupBy("Year", "Month").agg(
    count("OrderNumber").alias("TotalOrders"),
    F.sum("OrderQuantity").alias("TotalUnitsSold"),
    F.round(F.sum("Revenue"), 2).alias("TotalRevenue"),
    F.round(F.sum("Profit"), 2).alias("TotalProfit"),
    F.round(avg("Revenue"), 2).alias("AvgOrderValue"),
    countDistinct("CustomerKey").alias("UniqueCustomers")
).orderBy("Year", "Month")

# Add period label
sales_by_month = sales_by_month.withColumn(
    "Period",
    concat(col("Year"), lit("-"), 
           when(col("Month") < 10, concat(lit("0"), col("Month"))).otherwise(col("Month")))
)
display(sales_by_month)

Year,Month,TotalOrders,TotalUnitsSold,TotalRevenue,TotalProfit,AvgOrderValue,UniqueCustomers,Period
2015,1,184,184,585312.69,235814.67,3181.05,184,2015-1
2015,2,165,165,532226.28,212187.27,3225.61,165,2015-2
2015,3,198,198,643436.14,259085.24,3249.68,198,2015-3
2015,4,204,204,653364.08,263032.06,3202.77,204,2015-4
2015,5,206,206,659325.94,266276.48,3200.61,206,2015-5
2015,6,212,212,669988.72,270068.24,3160.32,212,2015-6
2015,7,247,247,486114.93,196682.65,1968.08,247,2015-7
2015,8,278,278,536452.77,218355.33,1929.69,278,2015-8
2015,9,196,196,344062.89,140516.01,1755.42,196,2015-9
2015,10,223,223,404276.65,168581.63,1812.9,223,2015-10


In [0]:
# Category performance
sales_by_category = sales_fact.groupBy("CategoryName").agg(
    count("OrderNumber").alias("TotalOrders"),
    F.sum("OrderQuantity").alias("TotalUnitsSold"),
    F.round(F.sum("Revenue"), 2).alias("TotalRevenue"),
    F.round(F.sum("Profit"), 2).alias("TotalProfit"),
    F.round((F.sum("Profit") / F.sum("Revenue")) * 100, 2).alias("ProfitMarginPct"),
    countDistinct("ProductKey").alias("UniqueProducts"),
    countDistinct("CustomerKey").alias("UniqueCustomers")
).orderBy(col("TotalRevenue").desc())
display(sales_by_category)

CategoryName,TotalOrders,TotalUnitsSold,TotalRevenue,TotalProfit,ProfitMarginPct,UniqueProducts,UniqueCustomers
Components,7099,7099,11287183.77,4368350.76,38.7,38,5728
Bikes,4706,4706,8583751.63,3930658.03,45.79,28,3950
Clothing,2124,2124,3771564.66,1427160.29,37.84,22,2100
,42117,70245,1272083.6,731567.45,57.51,42,16158


In [0]:
# Customer demographic analysis
sales_by_customer_segment = sales_fact.groupBy("Gender", "MaritalStatus", "EducationLevel").agg(
    countDistinct("CustomerKey").alias("CustomerCount"),
    count("OrderNumber").alias("TotalOrders"),
    F.round(F.sum("Revenue"), 2).alias("TotalRevenue"),
    F.round(F.sum("Profit"), 2).alias("TotalProfit"),
    F.round(avg("Revenue"), 2).alias("AvgOrderValue"),
    F.round(F.sum("Revenue") / countDistinct("CustomerKey"), 2).alias("RevenuePerCustomer")
).orderBy(col("TotalRevenue").desc())
display(sales_by_customer_segment.limit(10))

Gender,MaritalStatus,EducationLevel,CustomerCount,TotalOrders,TotalRevenue,TotalProfit,AvgOrderValue,RevenuePerCustomer
M,M,Bachelors,1556,5189,2320284.52,974895.55,447.15,1491.19
F,M,Bachelors,1383,4700,2258597.92,946483.84,480.55,1633.11
F,S,Bachelors,1106,3634,2061880.36,860206.77,567.39,1864.27
M,S,Bachelors,1026,3330,1738201.81,724504.63,521.98,1694.15
M,M,Partial College,1287,4286,1674636.87,707568.43,390.72,1301.19
F,S,Partial College,1168,3730,1656237.9,698025.16,444.03,1418.01
F,M,Partial College,1189,3905,1609097.75,678343.79,412.06,1353.32
M,S,Partial College,1064,3308,1534220.27,639574.58,463.79,1441.94
M,M,Graduate Degree,952,3154,1309773.13,555845.46,415.27,1375.81
F,M,Graduate Degree,847,2750,1252436.5,529161.2,455.43,1478.67


In [0]:
# Territory performance
sales_by_territory = sales_fact.groupBy("TerritoryKey").agg(
    count("OrderNumber").alias("TotalOrders"),
    F.sum("OrderQuantity").alias("TotalUnitsSold"),
    F.round(F.sum("Revenue"), 2).alias("TotalRevenue"),
    F.round(F.sum("Profit"), 2).alias("TotalProfit"),
    F.round((F.sum("Profit") / F.sum("Revenue")) * 100, 2).alias("ProfitMarginPct"),
    countDistinct("CustomerKey").alias("UniqueCustomers")
).orderBy(col("TotalRevenue").desc())
display(sales_by_territory)

TerritoryKey,TotalOrders,TotalUnitsSold,TotalRevenue,TotalProfit,ProfitMarginPct,UniqueCustomers
9,12409,17951,7416455.01,3077027.17,41.49,3480
4,11463,17191,4822794.68,2040348.96,42.31,4134
1,8267,12513,3095074.3,1314754.28,42.48,3075
10,6423,9694,2902561.58,1214777.16,41.85,1822
8,5289,7950,2524679.6,1054188.26,41.76,1675
7,5239,7862,2362642.77,989348.33,41.87,1705
6,6875,10894,1769245.47,757847.04,42.83,1499
5,34,49,11585.63,5133.32,44.31,10
2,27,40,6401.56,2874.57,44.9,8
3,20,30,3143.06,1437.44,45.73,8


In [0]:
# Top performing products
top_products = sales_fact.groupBy("ProductKey", "ProductName", "CategoryName").agg(
    F.sum("OrderQuantity").alias("TotalUnitsSold"),
    F.round(F.sum("Revenue"), 2).alias("TotalRevenue"),
    F.round(F.sum("Profit"), 2).alias("TotalProfit"),
    F.round((F.sum("Profit") / F.sum("Revenue")) * 100, 2).alias("ProfitMarginPct"),
    count("OrderNumber").alias("OrderCount")
).orderBy(col("TotalRevenue").desc())
display(top_products.limit(20))

ProductKey,ProductName,CategoryName,TotalUnitsSold,TotalRevenue,TotalProfit,ProfitMarginPct,OrderCount
362,"Mountain-200 Black, 46",Bikes,606,1241754.6,571633.74,46.03,606
360,"Mountain-200 Black, 42",Bikes,602,1233558.2,567860.58,46.03,602
352,"Mountain-200 Silver, 38",Bikes,586,1213852.12,558786.16,46.03,586
356,"Mountain-200 Silver, 46",Bikes,571,1182780.82,544482.76,46.03,571
358,"Mountain-200 Black, 38",Bikes,569,1165937.9,536732.01,46.03,569
354,"Mountain-200 Silver, 42",Bikes,547,1133066.74,521597.32,46.03,547
377,"Road-250 Black, 52",Components,316,689372.96,272038.08,39.46,316
371,"Road-250 Red, 58",Components,303,661012.68,260846.64,39.46,303
375,"Road-250 Black, 48",Components,294,641378.64,253098.72,39.46,294
312,"Road-150 Red, 48",Components,179,640510.33,251849.42,39.32,179


### Calculate KPI

In [0]:
# Overall business KPIs
overall_kpis = sales_fact.agg(
    count("OrderNumber").alias("TotalOrders"),
    countDistinct("CustomerKey").alias("TotalCustomers"),
    countDistinct("ProductKey").alias("TotalProducts"),
    F.sum("OrderQuantity").alias("TotalUnitsSold"),
    F.round(F.sum("Revenue"), 2).alias("TotalRevenue"),
    F.round(F.sum("Cost"), 2).alias("TotalCost"),
    F.round(F.sum("Profit"), 2).alias("TotalProfit"),
    F.round((F.sum("Profit") / F.sum("Revenue")) * 100, 2).alias("OverallProfitMargin"),
    F.round(avg("Revenue"), 2).alias("AvgOrderValue")
)
display(overall_kpis)

TotalOrders,TotalCustomers,TotalProducts,TotalUnitsSold,TotalRevenue,TotalCost,TotalProfit,OverallProfitMargin,AvgOrderValue
56046,17416,130,84174,24914583.66,14456847.13,10457736.53,41.97,444.54


### Save

In [0]:
sales_fact.write.format('parquet') \
    .mode('overwrite') \
    .partitionBy('Year', 'Month') \
    .option('path', 'abfss://silver@datalakeboya.dfs.core.windows.net/sales_fact') \
    .save()
# Save daily sales
sales_by_date.write.format('parquet') \
    .mode('overwrite') \
    .option('path', 'abfss://silver@datalakeboya.dfs.core.windows.net/sales_by_date') \
    .save()

# Save monthly sales
sales_by_month.write.format('parquet') \
    .mode('overwrite') \
    .option('path', 'abfss://silver@datalakeboya.dfs.core.windows.net/sales_by_month') \
    .save()

# Save category performance
sales_by_category.write.format('parquet') \
    .mode('overwrite') \
    .option('path', 'abfss://silver@datalakeboya.dfs.core.windows.net/sales_by_category') \
    .save()

# Save customer segments
sales_by_customer_segment.write.format('parquet') \
    .mode('overwrite') \
    .option('path', 'abfss://silver@datalakeboya.dfs.core.windows.net/sales_by_customer_segment') \
    .save()

# Save territory performance
sales_by_territory.write.format('parquet') \
    .mode('overwrite') \
    .option('path', 'abfss://silver@datalakeboya.dfs.core.windows.net/sales_by_territory') \
    .save()

# Save top products
top_products.write.format('parquet') \
    .mode('overwrite') \
    .option('path', 'abfss://silver@datalakeboya.dfs.core.windows.net/top_products') \
    .save()

# Save clean products dimension
df_prod_clean.write.format('parquet') \
    .mode('overwrite') \
    .option('path', 'abfss://silver@datalakeboya.dfs.core.windows.net/dim_products') \
    .save()

# Save customers dimension
df_cust.write.format('parquet') \
    .mode('overwrite') \
    .option('path', 'abfss://silver@datalakeboya.dfs.core.windows.net/dim_customers') \
    .save()

# Save categories dimension
df_prod_category_clean.write.format('parquet') \
    .mode('overwrite') \
    .option('path', 'abfss://silver@datalakeboya.dfs.core.windows.net/dim_categories') \
    .save()