In [3]:
from pyspark.sql.types import *

# Create the schema for the table
orderSchema = StructType([
    StructField("Transaction_ID", StringType()),
    StructField("Customer_ID", StringType()),
    StructField("Name", StringType()),
    StructField("Email", StringType()),
    StructField("Phone", StringType()),
    StructField("Address", StringType()),
    StructField("City", StringType()),
    StructField("State", StringType()),
    StructField("Zipcode", StringType()),
    StructField("Country", StringType()),
    StructField("Age", IntegerType()),
    StructField("Gender", StringType()),
    StructField("Income", StringType()),
    StructField("Customer_Segment", StringType()),
    StructField("Date", StringType()),
    StructField("Year", IntegerType()),
    StructField("Month", StringType()),
    StructField("Time", StringType()),
    StructField("Total_Purchases", IntegerType()),
    StructField("Amount", FloatType()),
    StructField("Total_Amount", FloatType()),
    StructField("Product_Category", StringType()),
    StructField("Product_Brand", StringType()),
    StructField("Product_Type", StringType()),
    StructField("Feedback", StringType()),
    StructField("Shipping_Method", StringType()),
    StructField("Payment_Method", StringType()),
    StructField("Order_Status", StringType()),
    StructField("Ratings", FloatType()),
    StructField("products", StringType())
])


StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 5, Finished, Available, Finished)

In [4]:
# Load the CSV file using Spark DataFrame
retail_data_path = "Files/rawdata/retail_data.csv"  # Path within the Fabric Lakehouse
df = spark.read.option("header", True).csv(retail_data_path)




StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 6, Finished, Available, Finished)

In [5]:
# Set Spark to use the legacy time parser
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Convert the Date column to a DateType using the format "MM/dd/yyyy"
from pyspark.sql import functions as F

# Convert Date column and extract Year, Month, and Day
df = df.withColumn("Date", F.to_date("Date", "MM/dd/yyyy")) \
       .withColumn("Year", F.year("Date")) \
       .withColumn("Month", F.month("Date")) \
       .withColumn("Day", F.dayofmonth("Date"))

# Display transformed columns to verify
df.select("Date", "Year", "Month", "Day").show(10)



StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 7, Finished, Available, Finished)

+----------+----+-----+----+
|      Date|Year|Month| Day|
+----------+----+-----+----+
|2023-09-18|2023|    9|  18|
|2023-12-31|2023|   12|  31|
|2023-04-26|2023|    4|  26|
|      NULL|NULL| NULL|NULL|
|      NULL|NULL| NULL|NULL|
|2023-09-21|2023|    9|  21|
|2023-06-26|2023|    6|  26|
|2023-03-24|2023|    3|  24|
|      NULL|NULL| NULL|NULL|
|      NULL|NULL| NULL|NULL|
+----------+----+-----+----+
only showing top 10 rows



In [6]:
# List of numeric columns to replace nulls with 0
numeric_columns = ["Total_Purchases", "Amount", "Total_Amount", "Ratings"]

# Replace nulls in specific numeric columns with 0
df = df.fillna({col: 0 for col in numeric_columns})

# List of string columns to replace nulls with "NA"
string_columns = ["Transaction_ID", "Customer_ID", "Name", "Email", "Phone", "Address", "City", "State", 
                  "Zipcode", "Country", "Gender", "Income", "Customer_Segment", "Product_Category", 
                  "Product_Brand", "Product_Type", "Feedback", "Shipping_Method", "Payment_Method", 
                  "Order_Status", "products"]

# Replace nulls in specific string columns with "NA"
df = df.fillna("NA", subset=string_columns)




StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 8, Finished, Available, Finished)

In [7]:
from pyspark.sql import functions as F

# Select the necessary columns to form the fact table
fact_columns = [
    "Transaction_ID",  # Fact key (primary key)
    "Customer_ID",  # Foreign key to customer dimension
    "Date",  # Foreign key to date dimension
    "Year",  # Year of transaction
    "Month",  # Month of transaction
    "Time",  # Timestamp of transaction
    "Total_Purchases",  # Quantity of items purchased
    "Amount",  # Price of each item purchased
    "Total_Amount",  # Total monetary value of the transaction
    "Ratings",  # Customer rating
    "Order_Status",  # Status of the order
    "Shipping_Method"  # Method of shipping
]

# Create the fact table DataFrame
fact_table_df = df.select(fact_columns)

# Show the fact table
fact_table_df.show(10)

StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 9, Finished, Available, Finished)

+--------------+-----------+----------+----+-----+--------+---------------+-----------+------------+-------+------------+---------------+
|Transaction_ID|Customer_ID|      Date|Year|Month|    Time|Total_Purchases|     Amount|Total_Amount|Ratings|Order_Status|Shipping_Method|
+--------------+-----------+----------+----+-----+--------+---------------+-----------+------------+-------+------------+---------------+
|       8691788|      37249|2023-09-18|2023|    9|22:03:55|              3|108.0287567|   324.08627|      5|     Shipped|       Same-Day|
|       2174773|      69749|2023-12-31|2023|   12| 8:42:04|              2|403.3539073| 806.7078147|      4|  Processing|       Standard|
|       6679610|      30192|2023-04-26|2023|    4| 4:06:29|              3|354.4775997| 1063.432799|      2|  Processing|       Same-Day|
|       7232460|      62101|      NULL|NULL| NULL|14:55:17|              7|352.4077173| 2466.854021|      4|  Processing|       Standard|
|       4983775|      27901|      

In [8]:
# Select columns for the customer dimension table
customer_dimension_columns = [
    "Customer_ID",  # Primary Key for Customer
    "Name",  # Customer Name
    "Email",  # Customer Email
    "Phone",  # Customer Phone
    "Address",  # Customer Address
    "City",  # Customer City
    "State",  # Customer State
    "Zipcode",  # Customer Zip Code
    "Country",  # Customer Country
    "Age",  # Customer Age
    "Gender",  # Customer Gender
    "Income",  # Customer Income
    "Customer_Segment"  # Customer Segment
]

# Create the customer dimension DataFrame
customer_dimension_df = df.select(customer_dimension_columns).distinct()

# Show the customer dimension table
customer_dimension_df.show(10)


StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 10, Finished, Available, Finished)

+-----------+-----------------+--------------------+----------+--------------------+----------+-------+-------+-------+---+------+------+----------------+
|Customer_ID|             Name|               Email|     Phone|             Address|      City|  State|Zipcode|Country|Age|Gender|Income|Customer_Segment|
+-----------+-----------------+--------------------+----------+--------------------+----------+-------+-------+-------+---+------+------+----------------+
|      55159|    Ellen Simpson| Tiffany30@gmail.com|3133334228|0145 Aaron Juncti...|Portsmouth|England|  10430|     UK| 50|  Male|Medium|         Regular|
|      56970|  Christine Lewis|    John32@gmail.com|1861269747|9296 Nicholas Spr...|Portsmouth|England|  47035|     UK| 51|  Male|   Low|         Regular|
|      18983|Melissa Oneill MD| William57@gmail.com|7670084825|3502 Cox Spurs Su...|Portsmouth|England|  78475|     UK| 35|Female|  High|         Regular|
|      20512| Cynthia Sullivan|  Ashley38@gmail.com|5192414007|  8519 

In [9]:
# Select columns for the product dimension table
product_dimension_columns = [
    "Product_Category",  # Product Category
    "Product_Brand",     # Product Brand
    "Product_Type"       # Product Type
]

# Create the product dimension DataFrame
product_dimension_df = df.select(product_dimension_columns).distinct()

# Show the product dimension table
product_dimension_df.show(10)


StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 11, Finished, Available, Finished)

+----------------+-------------+------------+
|Product_Category|Product_Brand|Product_Type|
+----------------+-------------+------------+
|           Books|           NA|     Fiction|
|           Books|           NA|  Literature|
|      Home Decor|   Home Depot|   Furniture|
|         Grocery|       Nestle|      Coffee|
|           Books|Penguin Books|  Children's|
|              NA|HarperCollins|     Fiction|
|              NA| Random House| Non-Fiction|
|         Grocery|           NA|      Snacks|
|              NA|         Nike|       Shoes|
|           Books|           NA|    Thriller|
+----------------+-------------+------------+
only showing top 10 rows



In [10]:
# Select the Date column, ensuring only unique values
date_dimension_df = df.select("Date").distinct()

# Add Year, Month, and Day columns by extracting them from the Date column
date_dimension_df = (
    date_dimension_df
    .withColumn("Year", F.year("Date"))  # Extract Year
    .withColumn("Month", F.month("Date"))  # Extract Month
    .withColumn("Day", F.dayofmonth("Date"))  # Extract Day
)

# Show the Date Dimension table
date_dimension_df.show(10)


StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 12, Finished, Available, Finished)

+----------+----+-----+---+
|      Date|Year|Month|Day|
+----------+----+-----+---+
|2023-07-15|2023|    7| 15|
|2023-06-22|2023|    6| 22|
|2023-05-22|2023|    5| 22|
|2023-09-14|2023|    9| 14|
|2023-11-22|2023|   11| 22|
|2023-06-18|2023|    6| 18|
|2023-09-19|2023|    9| 19|
|2023-06-23|2023|    6| 23|
|2023-11-29|2023|   11| 29|
|2023-03-24|2023|    3| 24|
+----------+----+-----+---+
only showing top 10 rows



In [11]:
# Calculate total sales per city
total_sales_per_city = df.groupBy("City").agg(F.sum("Total_Amount").alias("Total_Sales"))
total_sales_per_city.show(10)
# Extract Quarter from the Date column and calculate total sales per quarter per year
total_sales_per_quarter_year = df.withColumn("Quarter", F.quarter("Date")) \
                                 .groupBy("Year", "Quarter") \
                                 .agg(F.sum("Total_Amount").alias("Total_Sales"))
total_sales_per_quarter_year.show(10)
# Calculate total products sold per city
total_products_sold_per_city = df.groupBy("City").agg(F.sum("Total_Purchases").alias("Total_Products_Sold"))
total_products_sold_per_city.show(10)
# Identify top-selling products based on total sales amount
top_selling_products = df.groupBy("Product_Type") \
                         .agg(F.sum("Total_Amount").alias("Total_Sales")) \
                         .orderBy(F.desc("Total_Sales"))
top_selling_products.show(10)




StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 13, Finished, Available, Finished)

+--------+------------------+
|    City|       Total_Sales|
+--------+------------------+
| Hanover|3217664.2687996104|
|Winnipeg|3237655.1906558997|
| Phoenix|    1283073.605356|
|  Cairns|2999906.1519890605|
| Kelowna|2966322.9487095205|
|Brighton|  3119716.25959924|
|   Omaha|1204551.6702961903|
| Bendigo|3158468.0051768506|
|Canberra|3019667.7421780503|
|  Ottawa|3066374.6119559095|
+--------+------------------+
only showing top 10 rows

+----+-------+--------------------+
|Year|Quarter|         Total_Sales|
+----+-------+--------------------+
|NULL|   NULL|1.6272645120248568E8|
|2023|      3| 6.279246719382084E7|
|2023|      2| 6.190404570670599E7|
|2023|      4| 6.269561396010586E7|
|2024|      1| 4.085001372979897E7|
|2023|      1| 2.159705581262804E7|
+----+-------+--------------------+

+--------+-------------------+
|    City|Total_Products_Sold|
+--------+-------------------+
| Hanover|            12387.0|
|Winnipeg|            12800.0|
| Phoenix|             4830.0|
|  Cair

In [12]:
# Count total customers in each segment
customer_segmentation_df = (
    customer_dimension_df
    .groupBy("Customer_Segment")
    .agg(F.count("Customer_ID").alias("Total_Customers"))
    .orderBy(F.desc("Total_Customers"))
)
customer_segmentation_df.show()


StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 14, Finished, Available, Finished)

+----------------+---------------+
|Customer_Segment|Total_Customers|
+----------------+---------------+
|         Regular|         145864|
|             New|          90879|
|         Premium|          64070|
|              NA|            214|
+----------------+---------------+



In [13]:
# Calculate total sales by shipping method
total_sales_by_sales_channel = df.groupBy("Shipping_Method") \
                                 .agg(F.sum("Total_Amount").alias("Total_Sales"))
total_sales_by_sales_channel.show()


StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 15, Finished, Available, Finished)

+---------------+--------------------+
|Shipping_Method|         Total_Sales|
+---------------+--------------------+
|             NA|  507516.72776054003|
|        Express| 1.399264616638967E8|
|       Standard|1.2989226147541288E8|
|       Same-Day|1.4223940773847538E8|
+---------------+--------------------+



In [14]:
# Calculate Average Order Value (AOV)
average_order_value_df = (
    fact_table_df
    .agg(
        (F.sum("Total_Amount") / F.countDistinct("Transaction_ID")).alias("Average_Order_Value")
    )
)
average_order_value_df.show()


StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 16, Finished, Available, Finished)

+-------------------+
|Average_Order_Value|
+-------------------+
| 1401.0828141001018|
+-------------------+



In [15]:
# Fact Table with relationships enabled
fact_table_df = df.select(
    "Transaction_ID", "Customer_ID", "Date", "Year", "Month", "Time", 
    "Total_Purchases", "Amount", "Total_Amount", "Ratings", 
    "Order_Status", "Shipping_Method"
)
fact_table_df.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.fact_transactions")

# Customer Dimension Table with primary key Customer_ID
customer_dimension_df = df.select(
    "Customer_ID", "Name", "Email", "Phone", "Address", "City", 
    "State", "Zipcode", "Country", "Age", "Gender", "Income", "Customer_Segment"
).distinct()
customer_dimension_df.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.dim_customer")

# Product Dimension Table with Product_Type as a key (ensure uniqueness)
product_dimension_df = df.select(
    "Product_Category", "Product_Brand", "Product_Type"
).distinct()
product_dimension_df.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.dim_product")

# Date Dimension Table with Date as primary key
date_dimension_df = df.select("Date").distinct()
date_dimension_df = date_dimension_df.withColumn("Year", F.year("Date")).withColumn("Month", F.month("Date")).withColumn("Day", F.dayofmonth("Date"))
date_dimension_df.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.dim_date")

# Aggregated tables (optional relationships)
total_sales_per_city.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.total_sales_per_city")
total_sales_per_quarter_year.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.total_sales_per_quarter_year")
total_products_sold_per_city.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.total_products_sold_per_city")
top_selling_products.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.top_selling_products")
customer_segmentation_df.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.customer_segmentation")
total_sales_by_sales_channel.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.total_sales_by_sales_channel")
average_order_value_df.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.average_order_value")

print("All tables with key structures saved to Lakehouse in Delta format for Power BI relationships.")


StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 17, Finished, Available, Finished)

All tables with key structures saved to Lakehouse in Delta format for Power BI relationships.


In [21]:
from pyspark.sql import functions as F

# Calculate Product-Level Performance: Assess sales and revenue metrics for each individual product
product_performance_df = df.groupBy("Product_Type", "Product_Brand", "Product_Category") \
                           .agg(F.sum("Total_Amount").alias("Total_Sales"), 
                                F.avg("Total_Amount").alias("Average_Sales"),
                                F.count("Transaction_ID").alias("Total_Transactions"))

# Identify Top and Low Performers: Identify best-selling and underperforming products
# Top Performers: Products with the highest total sales
top_performers_df = product_performance_df.orderBy(F.desc("Total_Sales")).limit(10)

top_performers_df.show()

# Low Performers: Products with the lowest total sales
low_performers_df = product_performance_df.orderBy("Total_Sales").limit(10)

low_performers_df.show()

# Save the DataFrames as delta tables in Fabric
product_performance_df.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.product_performance")
top_performers_df.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.top_performers")
low_performers_df.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.low_performers")

StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 23, Finished, Available, Finished)

+--------------------+-----------------+----------------+--------------------+------------------+------------------+
|        Product_Type|    Product_Brand|Product_Category|         Total_Sales|     Average_Sales|Total_Transactions|
+--------------------+-----------------+----------------+--------------------+------------------+------------------+
|               Water|            Pepsi|         Grocery|2.5041924743803088E7|1376.2323996374525|             18196|
|              Fridge|        Whirepool|     Electronics|1.0078253856460562E7|1353.6942721908076|              7445|
|Mitsubishi 1.5 To...|       Mitsubhisi|     Electronics|    9188886.29989399|1366.5803539402127|              6724|
|          Smartphone|             Sony|     Electronics|   8554898.355652299|  1381.15892083505|              6194|
|             Kitchen|Bed Bath & Beyond|      Home Decor|   8518760.318454972|1370.4569366883802|              6216|
|          Television|          Samsung|     Electronics|   8514

In [22]:
from pyspark.sql import functions as F

# 1. Customer Demographics
customer_demographics = customer_dimension_df.groupBy("Age", "Gender", "Income") \
    .agg(F.count("Customer_ID").alias("Customer_Count"))

# Save the Customer Demographics table in Delta format
customer_demographics.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.customer_demographics")

# 2. Valuable Segments
# Define high-value segment as customers with high total purchases or high total amount spent
valuable_segments = df.groupBy("Customer_ID", "Customer_Segment") \
    .agg(
        F.sum("Total_Purchases").alias("Total_Purchases"),
        F.sum("Total_Amount").alias("Total_Spent")
    ) \
    .filter((F.col("Total_Purchases") > 5) | (F.col("Total_Spent") > 1000)) \
    .orderBy(F.desc("Total_Spent"))

# Save the Valuable Segments table in Delta format
valuable_segments.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.valuable_segments")

# 3. Customer Metrics
# Calculate metrics like AOV (Average Order Value) and repeat purchase rate

# Average Order Value (AOV) per customer
customer_metrics = df.groupBy("Customer_ID") \
    .agg(
        (F.sum("Total_Amount") / F.countDistinct("Transaction_ID")).alias("Average_Order_Value"),
        F.countDistinct("Transaction_ID").alias("Order_Count")
    ) \
    .withColumn("Repeat_Purchase_Rate", F.col("Order_Count") / F.lit(12))  # Assuming a 12-month repeat rate

customer_metrics.show()
# Save the Customer Metrics table in Delta format
customer_metrics.write.format("delta").mode("overwrite").saveAsTable("casestudtlakeshouse.customer_metrics")


StatementMeta(, 53cfe0b0-7257-49a5-949a-330e3d754108, 24, Finished, Available, Finished)

+-----------+-------------------+-----------+--------------------+
|Customer_ID|Average_Order_Value|Order_Count|Repeat_Purchase_Rate|
+-----------+-------------------+-----------+--------------------+
|      66762|      1728.88879485|          2| 0.16666666666666666|
|      22254|      1209.70260044|          5|  0.4166666666666667|
|      17427|      416.009644625|          4|  0.3333333333333333|
|      73463|        785.7597944|          4|  0.3333333333333333|
|      20158|      1747.93736494|          5|  0.4166666666666667|
|      90022|      1095.16751995|          4|  0.3333333333333333|
|      56527| 1902.4520112500002|          4|  0.3333333333333333|
|      80305| 2828.1666306333336|          6|                 0.5|
|      70170|      2295.52931944|          5|  0.4166666666666667|
|      89669| 2985.1704357500003|          4|  0.3333333333333333|
|      78888| 1464.2913076250002|          4|  0.3333333333333333|
|      13772| 152.60330021666667|          3|                0