In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import datetime
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Initialize Spark session
spark = SparkSession.builder \
        .appName("test") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.jars.packages",
                "org.postgresql:postgresql:42.7.4,"
                "org.apache.hadoop:hadoop-azure:3.3.6,"
                "org.apache.hadoop:hadoop-azure-datalake:3.3.6,"
                "io.delta:delta-spark_2.13:4.0.0") \
        .getOrCreate()


# ADLS Gen2 configuration
storage_account_name = "firstprojectde"
storage_account_key = os.getenv("AZURE_STORAGE_KEY")
container_name = "steps"

# Set Hadoop configuration for ADLS Gen2
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key
)

# Base ADLS path
adls_base_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"


In [3]:
# Import everything
from pyspark.sql import functions as F
from src.utils.config import get_config, get_path
from src.modeling.io_utils import write_delta_full, write_delta_daily_fact

# Initialize
config = get_config()
date = "2025-10-20"  # Your test date

In [4]:


# 1. READ MODELED DATA
print("Reading modeled data...")
fact_sales = spark.read.format("delta").load(
    get_path(config, "curated", "analytics", "fact_sales")
)
dim_store = spark.read.format("delta").load(
    get_path(config, "curated", "analytics", "dim_store")
)
dim_product = spark.read.format("delta").load(
    get_path(config, "curated", "analytics", "dim_product")
)



Reading modeled data...


In [5]:
dim_date = spark.read.format("delta").load(
    get_path(config, "curated", "analytics", "dim_date")
)

In [13]:
dim_date.count()

31

In [4]:
print(f"Fact sales count: {fact_sales.count()}")
print(f"Dim store count: {dim_store.count()}")
print(f"Dim product count: {dim_product.count()}")

Fact sales count: 10307
Dim store count: 22
Dim product count: 153


In [5]:
# 2. CREATE AGGREGATED TABLES
print("Creating aggregated tables...")

# Daily store sales
daily_store_sales = fact_sales.alias("f") \
    .join(dim_store.alias("s"), "store_id") \
    .groupBy("f.date", "s.store_id", "s.store_name", "s.town") \
    .agg(
        F.sum("f.line_amount").alias("daily_sales"),
        F.sum("f.quantity").alias("total_items_sold"),
        F.count("f.sale_id").alias("transaction_count"),
        F.avg("f.line_amount").alias("avg_transaction_value")
    )

# Daily product sales  
daily_product_sales = fact_sales.alias("f") \
    .join(dim_product.alias("p"), "product_id") \
    .groupBy("f.date", "p.product_id", "p.product_name", "p.category") \
    .agg(
        F.sum("f.line_amount").alias("daily_revenue"),
        F.sum("f.quantity").alias("total_quantity_sold"),
        F.count("f.sale_id").alias("times_sold")
    )


Creating aggregated tables...


In [7]:
print(f"Daily store sales count: {daily_store_sales.count()}")
print(f"Daily product sales count: {daily_product_sales.count()}")

Daily store sales count: 546
Daily product sales count: 2656


In [8]:
# Show some data
print("Daily store sales sample:")
daily_store_sales.show(5)

print("Daily product sales sample:")
daily_product_sales.show(5)

Daily store sales sample:
+----------+--------+--------------------+------+-----------------+----------------+-----------------+---------------------+
|      date|store_id|          store_name|  town|      daily_sales|total_items_sold|transaction_count|avg_transaction_value|
+----------+--------+--------------------+------+-----------------+----------------+-----------------+---------------------+
|2025-09-02|    S006|      Store Page PLC| Gabes|          1316.21|              23|               10|              131.621|
|2025-09-03|    S019|Store Cherry, Lee...|Sousse|49529.04999999998|             731|              237|   208.98333333333326|
|2025-08-13|    S014|   Store Vasquez LLC|Sousse|           608.72|              16|                8|                76.09|
|2025-09-02|    S012|Store Carlson-Horton|  Sfax|9708.839999999998|             168|               55|    176.5243636363636|
|2025-09-03|    S005|Store Villanueva-...|  Sfax|38878.75000000001|             586|              1

In [9]:


# 3. WRITE TO SERVING LAYER
print("Writing to serving layer...")
write_delta_full(spark, daily_store_sales, config, "serving", "aggregated", "daily_store_sales", "date")
write_delta_full(spark, daily_product_sales, config, "serving", "aggregated", "daily_product_sales", "date")

print("✅ Aggregation completed successfully!")
print("Check ADLS: steps/serving/aggregated/")

Writing to serving layer...
✅ Aggregation completed successfully!
Check ADLS: steps/serving/aggregated/


In [10]:
# Daily aggregation test
print("Testing DAILY aggregation...")

# Read only specific date
fact_sales_daily = spark.read.format("delta").load(
    get_path(config, "curated", "analytics", "fact_sales")
).filter(F.col("date") == date)

print(f"Fact sales for {date}: {fact_sales_daily.count()} rows")

Testing DAILY aggregation...
Fact sales for 2025-10-20: 188 rows


In [None]:
# Create daily aggregates
daily_store_sales_single_day = fact_sales_daily.alias("f") \
    .join(dim_store.alias("s"), "store_id") \
    .groupBy("f.date", "s.store_id", "s.store_name", "s.town") \
    .agg(
        F.sum("f.line_amount").alias("daily_sales"),
        F.sum("f.quantity").alias("total_items_sold"),
        F.count("f.sale_id").alias("transaction_count")
    )

print(f"Daily store sales for {date}: {daily_store_sales_single_day.count()} rows")
daily_store_sales_single_day.show()

# Append to existing table
write_delta_daily_fact(spark, daily_store_sales_single_day, config, "serving", "aggregated", "daily_store_sales", "date")

print("✅ Daily aggregation completed!")

In [4]:
from pyspark.sql import DataFrame
def create_store_performance_dashboard(fact_sales_df: DataFrame, dim_store_df: DataFrame, dim_date_df: DataFrame) -> DataFrame:
    """Create comprehensive store performance metrics for dashboards."""
    return fact_sales_df.alias("f") \
        .join(dim_store_df.alias("s"), "store_id") \
        .join(dim_date_df.alias("d"), "date_id") \
        .groupBy("s.store_id", "s.store_name", "s.town", "d.year", "d.month") \
        .agg(
            F.sum("f.line_amount").alias("monthly_revenue"),
            F.sum("f.quantity").alias("total_items_sold"),
            F.count_distinct("f.sale_id").alias("transaction_count"),
            F.avg("f.line_amount").alias("avg_transaction_value"),
            F.count_distinct("f.date").alias("trading_days"),
            (F.sum("f.line_amount") / F.count_distinct("f.date")).alias("avg_daily_revenue")
        ) \
        .orderBy("s.store_name", "d.year", "d.month")

In [22]:
def create_store_performance_dashboard(fact_sales_df: DataFrame, dim_store_df: DataFrame, dim_date_df: DataFrame) -> DataFrame:
    """Create comprehensive store performance metrics for dashboards."""
    joined_df = fact_sales_df.alias("f") \
        .join(dim_store_df.alias("s"), "store_id") \
        .join(dim_date_df.alias("d"), "date_id")
    
    result = joined_df.groupBy("s.store_id", "s.store_name", "s.town", "d.year", "d.month") \
        .agg(
            F.sum("f.line_amount").alias("monthly_revenue"),
            F.sum("f.quantity").alias("total_items_sold"),
            F.count_distinct("f.sale_id").alias("transaction_count"),
            F.avg("f.line_amount").alias("avg_transaction_value"),
            F.count_distinct("f.date").alias("trading_days")
        ) \
        .withColumn("month_year", F.concat(F.col("year"), F.lit("-"), F.lpad(F.col("month"), 2, "0")))  # Create clean month partition
    
    return result

In [23]:
store_performance = create_store_performance_dashboard(fact_sales, dim_store, dim_date)

In [25]:
store_performance.count()

42

In [24]:
store_performance.show()

+--------+--------------------+--------+----+-----+------------------+----------------+-----------------+---------------------+------------+----------+
|store_id|          store_name|    town|year|month|   monthly_revenue|total_items_sold|transaction_count|avg_transaction_value|trading_days|month_year|
+--------+--------------------+--------+----+-----+------------------+----------------+-----------------+---------------------+------------+----------+
|    S012|Store Carlson-Horton|    Sfax|2025|    9| 69780.48000000007|            1082|               63|   199.37280000000018|           3|   2025-09|
|    S016|    Store Thomas PLC| Bizerte|2025|    8|29426.960000000003|             507|               63|   115.39984313725492|          25|   2025-08|
|    S016|    Store Thomas PLC| Bizerte|2025|    9| 52376.54999999998|             876|               57|   172.85990099009894|           3|   2025-09|
|    S004|   Store Lopez-Chang|Monastir|2025|    8|29412.389999999996|             548| 

In [24]:
def create_product_sales_trends(fact_sales_df: DataFrame, dim_product_df: DataFrame, dim_date_df: DataFrame) -> DataFrame:
    """Create product performance with trends for analysis."""
    return fact_sales_df.alias("f") \
        .join(dim_product_df.alias("p"), "product_id") \
        .join(dim_date_df.alias("d"), "date_id") \
        .groupBy("p.product_id", "p.product_name", "p.category", "d.year", "d.month", "d.date") \
        .agg(
            F.sum("f.line_amount").alias("daily_revenue"),
            F.sum("f.quantity").alias("daily_quantity_sold"),
            F.count("f.sale_id").alias("times_sold_today")
        )

In [25]:
product_trends = create_product_sales_trends(fact_sales, dim_product, dim_date)

In [26]:
product_trends.count()

2656

In [27]:
product_trends.show()

+----------+------------+-----------+----+-----+----------+------------------+-------------------+----------------+
|product_id|product_name|   category|year|month|      date|     daily_revenue|daily_quantity_sold|times_sold_today|
+----------+------------+-----------+----+-----+----------+------------------+-------------------+----------------+
|      P116|         Hat|Accessories|2025|    9|2025-09-04|             951.3|                 15|               5|
|      P016|     Loafers|   Footwear|2025|    9|2025-09-03|           5380.78|                 55|              20|
|      P077|     Loafers|   Footwear|2025|    9|2025-09-03|3158.4000000000005|                 66|              22|
|      P104|       Shirt|   Clothing|2025|    9|2025-09-04|             936.4|                 10|               7|
|      P007|       Shirt|   Clothing|2025|    9|2025-09-04|           4118.38|                 46|              16|
|      P072|      Gloves|Accessories|2025|    9|2025-09-02|           11

In [6]:
from datetime import datetime, timedelta
month = (datetime.now() - timedelta(days=1)).strftime("%Y-%m")

In [None]:
def create_monthly_customer_metrics(fact_sales_df: DataFrame) -> DataFrame:
    """Create monthly customer performance metrics."""
    return fact_sales_df.withColumn("month", F.date_trunc("month", F.col("date"))) \
        .groupBy("month") \
        .agg(
            F.countDistinct("sale_id").alias("total_transactions"),
            F.sum("line_amount").alias("monthly_revenue"),
            F.avg("line_amount").alias("avg_sale_amount")
        )


In [8]:
from pyspark.sql import DataFrame

In [30]:
def create_monthly_customer_metrics(fact_sales_df: DataFrame) -> DataFrame:
    """Create monthly customer performance metrics."""
    return fact_sales_df.withColumn("month", F.date_trunc("month", F.col("date"))) \
        .withColumn("month_partition", F.date_format("month", "yyyy-MM")) \
        .groupBy("month_partition") \
        .agg(
            F.countDistinct("sale_id").alias("total_transactions"),
            F.sum("line_amount").alias("monthly_revenue"),
            F.avg("line_amount").alias("avg_sale_amount")
        ) \
        .withColumnRenamed("month_partition", "month_year")  # Rename back to month

In [8]:
month

'2025-10'

In [6]:
fact_sales = spark.read.format("delta").load(
    get_path(config, "curated", "analytics", "fact_sales")
).filter(F.date_format("date", "yyyy-MM") == month)

In [9]:
fact_sales.count()

188

In [10]:

dim_store = spark.read.format("delta").load(
    get_path(config, "curated", "analytics", "dim_store")
)
dim_date = spark.read.format("delta").load(
    get_path(config, "curated", "analytics", "dim_date")
)
monthly_customer_metrics = create_monthly_customer_metrics(fact_sales)
store_performance = create_store_performance_dashboard(fact_sales, dim_store, dim_date)


In [20]:
store_performance = create_store_performance_dashboard(fact_sales, dim_store, dim_date)


In [21]:
store_performance.show()

+--------+--------------------+--------+----+-----+------------------+----------------+-----------------+---------------------+------------+------------------+----------+
|store_id|          store_name|    town|year|month|   monthly_revenue|total_items_sold|transaction_count|avg_transaction_value|trading_days| avg_daily_revenue|month_year|
+--------+--------------------+--------+----+-----+------------------+----------------+-----------------+---------------------+------------+------------------+----------+
|    S012|Store Carlson-Horton|    Sfax|2025|    9| 69780.48000000007|            1082|               63|   199.37280000000018|           3| 23260.16000000002|   2025-09|
|    S016|    Store Thomas PLC| Bizerte|2025|    8|29426.960000000003|             507|               63|   115.39984313725492|          25|         1177.0784|   2025-08|
|    S016|    Store Thomas PLC| Bizerte|2025|    9| 52376.54999999998|             876|               57|   172.85990099009894|           3|17458

In [18]:
store_performance.count()

42

In [31]:
monthly_customer_metrics = create_monthly_customer_metrics(fact_sales)


In [32]:
monthly_customer_metrics.show()

+----------+------------------+------------------+------------------+
|month_year|total_transactions|   monthly_revenue|   avg_sale_amount|
+----------+------------------+------------------+------------------+
|   2025-08|              1050|511224.30999999924|112.87796643850723|
|   2025-09|              1040|        1096543.15|196.16156529516994|
+----------+------------------+------------------+------------------+



In [13]:
store_performance.show()

+--------+--------------------+--------+----+-----+------------------+----------------+-----------------+---------------------+------------+------------------+
|store_id|          store_name|    town|year|month|   monthly_revenue|total_items_sold|transaction_count|avg_transaction_value|trading_days| avg_daily_revenue|
+--------+--------------------+--------+----+-----+------------------+----------------+-----------------+---------------------+------------+------------------+
|    S012|Store Carlson-Horton|    Sfax|2025|   10|278.78000000000003|               4|                1|    69.69500000000001|           1|278.78000000000003|
|    S019|Store Cherry, Lee...|  Sousse|2025|   10|            259.78|               4|                2|               129.89|           1|            259.78|
|    S007|Store Cunningham-...|Monastir|2025|   10|1404.1599999999999|              26|                4|   108.01230769230769|           1|1404.1599999999999|
|    S001|Store Elliott and...|   Tunis|

In [None]:
# Write to serving layer
write_delta_full(spark, monthly_customer_metrics, config, "serving", "aggregated", "monthly_customer_metrics", "month")
write_delta_full(spark, store_performance, config, "serving", "aggregated", "store_performance", "month")