In [0]:
df = spark.table("default.grocery_chain_data")

df.printSchema()
df.show(10)
df.count()

root
 |-- customer_id: long (nullable = true)
 |-- store_name: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- aisle: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- final_amount: double (nullable = true)
 |-- loyalty_points: long (nullable = true)

+-----------+-----------------+----------------+-----------------+------------+--------+----------+------------+---------------+------------+--------------+
|customer_id|       store_name|transaction_date|            aisle|product_name|quantity|unit_price|total_amount|discount_amount|final_amount|loyalty_points|
+-----------+-----------------+----------------+-----------------+------------+--------+----------+------------+---------------+------------+--------------+
|       2824|GreenGrocer Plaza|      2023-08-26|    

1980

In [0]:
df.printSchema()
df.show(5, truncate=False)

root
 |-- customer_id: long (nullable = true)
 |-- store_name: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- aisle: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- final_amount: double (nullable = true)
 |-- loyalty_points: long (nullable = true)

+-----------+-----------------+----------------+--------------+------------+--------+----------+------------+---------------+------------+--------------+
|customer_id|store_name       |transaction_date|aisle         |product_name|quantity|unit_price|total_amount|discount_amount|final_amount|loyalty_points|
+-----------+-----------------+----------------+--------------+------------+--------+----------+------------+---------------+------------+--------------+
|2824       |GreenGrocer Plaza|2023-08-26      |Produce      

In [0]:
#Null inspection
from pyspark.sql.functions import col

df.select([
    col(c).isNull().alias(c) for c in df.columns
]).show()

+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+
|customer_id|store_name|transaction_date|aisle|product_name|quantity|unit_price|total_amount|discount_amount|final_amount|loyalty_points|
+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+
|      false|     false|           false|false|       false|   false|     false|       false|          false|       false|         false|
|      false|     false|           false|false|       false|   false|     false|       false|          false|       false|         false|
|      false|     false|           false|false|       false|   false|     false|       false|          false|       false|         false|
|      false|     false|           false|false|       false|   false|     false|       false|          false|       false|         false|
|      false|     false|          

In [0]:
#Computes statitistics
df.describe().show()

+-------+-----------------+----------------+--------------+------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+
|summary|      customer_id|      store_name|         aisle|product_name|          quantity|        unit_price|     total_amount|  discount_amount|     final_amount|   loyalty_points|
+-------+-----------------+----------------+--------------+------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+
|  count|             1980|            1955|          1980|        1980|              1980|              1980|             1980|             1980|             1980|             1980|
|   mean|5542.958080808081|            NULL|          NULL|        NULL|2.9681818181818183|15.488045454545459|45.90257575757571| 4.46959090909091| 41.4329848484848|255.1479797979798|
| stddev|2575.771855895511|            NULL|          NULL|        NULL|1.41902817354

In [0]:
#Standardize column names, column cleaning function
def clean_column_name(c):
    return (
        c.lower()
         .replace(" ", "_")
         .replace("-", "_")
         .replace("/", "_")
    )

In [0]:
#Apply cleaned names and create new dataframe
df_clean = df.select([
    col(c).alias(clean_column_name(c)) for c in df.columns
])

In [0]:
#Fixing data types
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.functions import col

df_clean = (
    df_clean
    .withColumn("quantity", col("quantity").cast(IntegerType()))
    .withColumn("loyalty_points", col("loyalty_points").cast(IntegerType()))
    .withColumn("unit_price", col("unit_price").cast(DoubleType()))
    .withColumn("total_amount", col("total_amount").cast(DoubleType()))
    .withColumn("discount_amount", col("discount_amount").cast(DoubleType()))
    .withColumn("final_amount", col("final_amount").cast(DoubleType()))
)

In [0]:
#Date casting
from pyspark.sql.functions import to_date

df_clean = df_clean.withColumn(
    "transaction_date",
    to_date(col("transaction_date"), "yyyy-MM-dd")
)

In [0]:
# Drop rows missing critical business dimensions
df_clean = df_clean.dropna(
    subset=[
        "customer_id",
        "store_name",
        "transaction_date",
        "product_name"
    ]
)

# Fill safe defaults for non-critical numeric fields
df_clean = df_clean.fillna({
    "quantity": 0,
    "discount_amount": 0.0,
    "loyalty_points": 0
})

In [0]:
from pyspark.sql.functions import col, sum

df_clean.select([
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df_clean.columns
]).show()

+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+
|customer_id|store_name|transaction_date|aisle|product_name|quantity|unit_price|total_amount|discount_amount|final_amount|loyalty_points|
+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+
|          0|         0|               0|    0|           0|       0|         0|           0|              0|           0|             0|
+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+



In [0]:
df_clean.count()
df_clean.show(10)

+-----------+-----------------+----------------+-----------------+------------+--------+----------+------------+---------------+------------+--------------+
|customer_id|       store_name|transaction_date|            aisle|product_name|quantity|unit_price|total_amount|discount_amount|final_amount|loyalty_points|
+-----------+-----------------+----------------+-----------------+------------+--------+----------+------------+---------------+------------+--------------+
|       2824|GreenGrocer Plaza|      2023-08-26|          Produce|       Pasta|       2|      7.46|       14.92|            0.0|       14.92|           377|
|       5506| ValuePlus Market|      2024-02-13|            Dairy|      Cheese|       1|      1.85|        1.85|           3.41|       -1.56|           111|
|       4657| ValuePlus Market|      2023-11-23|           Bakery|      Onions|       4|      7.38|       29.52|           4.04|       25.48|           301|
|       2679|SuperSave Central|      2025-01-13|   Snacks 

In [0]:
from pyspark.sql.functions import col, length, trim

df_clean.filter(
    trim(col("store_name")) == ""
).show(10, truncate=False)

+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+
|customer_id|store_name|transaction_date|aisle|product_name|quantity|unit_price|total_amount|discount_amount|final_amount|loyalty_points|
+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+
+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+



In [0]:
from pyspark.sql.functions import when, trim

df_clean = df_clean.withColumn(
    "store_name",
    when(trim(col("store_name")) == "", None)
    .otherwise(col("store_name"))
)

In [0]:
df_clean = df_clean.dropna(subset=["store_name"])

In [0]:
df_clean.select(
    col("store_name"),
).count()

df_clean.select([
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df_clean.columns
]).show()

+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+
|customer_id|store_name|transaction_date|aisle|product_name|quantity|unit_price|total_amount|discount_amount|final_amount|loyalty_points|
+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+
|          0|         0|               0|    0|           0|       0|         0|           0|              0|           0|             0|
+-----------+----------+----------------+-----+------------+--------+----------+------------+---------------+------------+--------------+



In [0]:
from pyspark.sql.functions import trim, when, col

string_cols = [
    "customer_id",
    "aisle",
    "product_name"
]

for c in string_cols:
    df_clean = df_clean.withColumn(
        c,
        when(trim(col(c)) == "", None).otherwise(col(c))
    )

In [0]:
df_clean = df_clean.dropna(subset=["customer_id",
    "aisle",
    "product_name"])

In [0]:
#Save as new cleaned table
df_clean.write.mode("overwrite").saveAsTable(
    "default.grocery_store_sales_clean"
)

In [0]:
df = spark.table("default.grocery_store_sales_clean")


In [0]:
import base64
from IPython.display import HTML
df = spark.table("default.grocery_store_sales_clean")
df.printSchema()
# Convert to pandas DataFrame
pandas_df = df.toPandas()


csv_data = pandas_df.to_csv(index=False)
b64 = base64.b64encode(csv_data.encode()).decode()

HTML(f'<a download="cleaned_fmcg_data.csv" href="data:text/csv;base64,{b64}">Click here to download cleaned_fmcg_data.csv</a>')

root
 |-- customer_id: long (nullable = true)
 |-- store_name: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- aisle: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- final_amount: double (nullable = true)
 |-- loyalty_points: integer (nullable = true)



In [0]:
spark.sql("SHOW TABLES").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|         daily_sales|      false|
| default|  grocery_chain_data|      false|
| default|grocery_store_sal...|      false|
| default|grocery_store_sal...|      false|
| default|      sales_by_aisle|      false|
| default|    sales_by_product|      false|
| default|      sales_by_store|      false|
| default|        weekly_sales|      false|
+--------+--------------------+-----------+



In [0]:
from pyspark.sql.functions import year, month, weekofyear

df_transformed = (
    df
    .withColumn("year", year("transaction_date"))
    .withColumn("month", month("transaction_date"))
    .withColumn("week", weekofyear("transaction_date"))
)

#df_transformed.show(10)

In [0]:
from pyspark.sql.functions import when, col

df_transformed = df_transformed.withColumn(
    "has_discount",
    when(col("discount_amount") > 0, 1).otherwise(0)
)

#df_transformed.show(10)

In [0]:
df_transformed = df_transformed.filter(
    (col("quantity") >= 0) &
    (col("final_amount") >= 0) &
    (col("unit_price") >= 0)
)


In [0]:
df_transformed.printSchema()
df_transformed.show(5, truncate=False)
df_transformed.count()

root
 |-- customer_id: long (nullable = true)
 |-- store_name: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- aisle: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- final_amount: double (nullable = true)
 |-- loyalty_points: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- has_discount: integer (nullable = false)

+-----------+-----------------+----------------+--------------+------------+--------+----------+------------+---------------+------------+--------------+----+-----+----+------------+
|customer_id|store_name       |transaction_date|aisle         |product_name|quantity|unit_price|total_amount|discount_amount|final_amount|loyalty_points|year|month|week|has_disco

1942

In [0]:
df_transformed.write.mode("overwrite").saveAsTable(
    "default.grocery_store_sales_transformed"
)

In [0]:
df = spark.table("default.grocery_store_sales_transformed")
df.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- store_name: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- aisle: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- final_amount: double (nullable = true)
 |-- loyalty_points: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- has_discount: integer (nullable = true)



In [0]:
import base64
from IPython.display import HTML
df = spark.table("default.grocery_store_sales_transformed")
df.printSchema()
# Convert to pandas DataFrame
pandas_df = df.toPandas()


csv_data = pandas_df.to_csv(index=False)
b64 = base64.b64encode(csv_data.encode()).decode()

HTML(f'<a download="cleaned&transformed_fmcg_data.csv" href="data:text/csv;base64,{b64}">Click here to download cleaned&transformed_fmcg_data.csv</a>')

root
 |-- customer_id: long (nullable = true)
 |-- store_name: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- aisle: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- final_amount: double (nullable = true)
 |-- loyalty_points: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- has_discount: integer (nullable = true)



In [0]:
#Sales by product
from pyspark.sql.functions import sum

sales_by_product = (
    df
    .groupBy("product_name")
    .agg(
        sum("final_amount").alias("total_revenue"),
        sum("quantity").alias("total_units")
    )
)

sales_by_product.orderBy("total_revenue", ascending=False).show(10)

+--------------+------------------+-----------+
|  product_name|     total_revenue|total_units|
+--------------+------------------+-----------+
|      Tomatoes|           5381.98|        362|
|      Potatoes|           5306.25|        353|
|Chicken Breast| 5138.790000000001|        377|
|         Bread| 5066.549999999999|        345|
|          Eggs| 4825.589999999999|        333|
|       Bananas|           4697.32|        315|
|        Cereal|           4689.24|        334|
|        Onions| 4611.350000000002|        338|
|        Apples|           4555.64|        322|
|  Orange Juice|4509.3600000000015|        325|
+--------------+------------------+-----------+
only showing top 10 rows


In [0]:
#Sales by store for store performance
sales_by_store = (
    df
    .groupBy("store_name")
    .agg(
        sum("final_amount").alias("total_revenue"),
        sum("quantity").alias("total_units"),
        sum("has_discount").alias("discounted_transactions")
    )
)

sales_by_store.orderBy("total_revenue", ascending=False).show(10)

+------------------+-----------------+-----------+-----------------------+
|        store_name|    total_revenue|total_units|discounted_transactions|
+------------------+-----------------+-----------+-----------------------+
| GreenGrocer Plaza|9883.009999999998|        673|                    181|
| SuperSave Central|9861.490000000002|        696|                    194|
|  City Fresh Store|9788.700000000003|        689|                    200|
| MegaMart Westside|9297.530000000002|        657|                    175|
|    Corner Grocery|9286.060000000003|        660|                    183|
|  ValuePlus Market|          8663.57|        638|                    180|
|FreshMart Downtown|8212.530000000002|        543|                    155|
|  QuickStop Market|8184.160000000005|        614|                    167|
|FamilyFood Express|7719.719999999999|        609|                    173|
+------------------+-----------------+-----------+-----------------------+



In [0]:
#Daily sales
daily_sales = (
    df
    .groupBy("transaction_date")
    .agg(
        sum("final_amount").alias("daily_revenue"),
        sum("quantity").alias("units_sold")
    )
)

daily_sales.orderBy("transaction_date").show(10)

+----------------+------------------+----------+
|transaction_date|     daily_revenue|units_sold|
+----------------+------------------+----------+
|      2023-08-07|             40.52|         2|
|      2023-08-08|            228.57|        15|
|      2023-08-11|             95.99|         6|
|      2023-08-12|            129.94|        11|
|      2023-08-13|             77.53|         9|
|      2023-08-14|            322.79|        16|
|      2023-08-16|211.70999999999998|        11|
|      2023-08-17|186.41000000000003|         9|
|      2023-08-18|             99.12|         5|
|      2023-08-19|            532.52|        26|
+----------------+------------------+----------+
only showing top 10 rows


In [0]:
#Weekly sales (using derived columns)
weekly_sales = (
    df
    .groupBy("year", "week")
    .agg(
        sum("final_amount").alias("weekly_revenue"),
        sum("quantity").alias("weekly_units")
    )
)

weekly_sales.orderBy("year", "week").show(10)

+----+----+------------------+------------+
|year|week|    weekly_revenue|weekly_units|
+----+----+------------------+------------+
|2023|  32| 572.5500000000001|          43|
|2023|  33|           1409.11|          71|
|2023|  34|            630.05|          45|
|2023|  35|416.34000000000003|          37|
|2023|  36|            611.91|          42|
|2023|  37| 884.6800000000001|          63|
|2023|  38|             839.7|          52|
|2023|  39| 730.7000000000002|          60|
|2023|  40| 947.3600000000001|          75|
|2023|  41| 665.4999999999999|          66|
+----+----+------------------+------------+
only showing top 10 rows


In [0]:
#Aisle performance
sales_by_aisle = (
    df
    .groupBy("aisle")
    .agg(
        sum("final_amount").alias("total_revenue"),
        sum("quantity").alias("total_units")
    )
)

sales_by_aisle.orderBy("total_revenue", ascending=False).show(10)

+-----------------+------------------+-----------+
|            aisle|     total_revenue|total_units|
+-----------------+------------------+-----------+
|    Personal Care|           8289.12|        576|
|   Snacks & Candy| 8032.960000000002|        544|
|Health & Wellness| 7903.280000000002|        532|
|     Canned Goods|7808.7000000000035|        561|
|        Beverages|7704.7900000000045|        588|
|            Dairy| 7375.500000000002|        501|
|           Bakery|           7230.69|        506|
|     Frozen Foods| 7090.459999999998|        516|
|  Household Items| 6773.709999999999|        503|
|   Meat & Seafood|           6644.24|        484|
+-----------------+------------------+-----------+
only showing top 10 rows


In [0]:
#Save aggregated tables as gold layer
sales_by_product.write.mode("overwrite").saveAsTable(
    "default.sales_by_product"
)

sales_by_store.write.mode("overwrite").saveAsTable(
    "default.sales_by_store"
)

daily_sales.write.mode("overwrite").saveAsTable(
    "default.daily_sales"
)

weekly_sales.write.mode("overwrite").saveAsTable(
    "default.weekly_sales"
)

sales_by_aisle.write.mode("overwrite").saveAsTable(
    "default.sales_by_aisle"
)

#To check
spark.table("default.sales_by_product").show(5)
spark.table("default.sales_by_store").show(5)
spark.table("default.daily_sales").show(5)

spark.table("default.sales_by_product").count()

+------------+------------------+-----------+
|product_name|     total_revenue|total_units|
+------------+------------------+-----------+
|    Tomatoes|           5381.98|        362|
|      Salmon|4397.9000000000015|        333|
|      Apples|           4555.64|        322|
|        Rice|           4205.35|        304|
|        Milk|3632.3699999999994|        278|
+------------+------------------+-----------+
only showing top 5 rows
+------------------+-----------------+-----------+-----------------------+
|        store_name|    total_revenue|total_units|discounted_transactions|
+------------------+-----------------+-----------+-----------------------+
| GreenGrocer Plaza|9883.009999999998|        673|                    181|
|FreshMart Downtown|8212.530000000002|        543|                    155|
|FamilyFood Express|7719.719999999999|        609|                    173|
| MegaMart Westside|9297.530000000002|        657|                    175|
|  QuickStop Market|8184.160000000005

18