## 06: Latent‐Demand Imputation & Daily Aggregation

In this notebook we:

1. **Load** flattened hourly sales & stockout data (Parquet chunks).  
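2. **Impute** demand lost during stockout hours with a simple per‐day mean strategy: the day's mean in‐stock hourly sale is added to the raw daily total once for every stockout hour.  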
3. **Aggregate** imputed hourly sales to daily totals along with contextual features.  
4. **Save** the resulting daily DataFrame for downstream modeling.  

In [3]:
import os, glob
import pandas as pd

# 🔧 Paths
# Root folder holding the notebook's data. Set the FRESHRETAIL_DATA_DIR
# environment variable to run on another machine; when it is unset the
# original location is used, so existing runs are unaffected.
DATA_DIR = os.environ.get(
    "FRESHRETAIL_DATA_DIR",
    "/Users/jhilmitasri/Repositories/MyRepositories/freshretail-demand-forecasting/notebooks/data",
)
# Directory of flattened hourly Parquet chunks (input).
HOURLY_CHUNKS_DIR = os.path.join(DATA_DIR, "freshretail_flattened_chunks")
# Destination Parquet file for the imputed daily dataset (output).
OUTPUT_DAILY_PATH = os.path.join(DATA_DIR, "daily_dataset", "daily_df_imputed.parquet")

# 🆔 Identifier columns for grouping: one output row per unique combination
ID_COLS = [
    "city_id","store_id","management_group_id",
    "first_category_id","second_category_id","third_category_id",
    "product_id","dt"
]

In [4]:
def aggregate_chunk_to_daily(hourly, id_cols):
    """Collapse one chunk of hourly rows into one row per ``id_cols`` group.

    For every group the function computes:

    * ``raw_sale``: sum of ``hourly_sale`` over all rows of the group.
    * ``oos_hours_total``: sum of ``hourly_stockout`` (assumes the column is
      a 0/1 flag per hour — TODO confirm against the flattening notebook).
    * mean of ``discount``, ``precpt``, ``avg_temperature``, ``avg_humidity``
      and ``avg_wind_level``; max of ``holiday_flag`` and ``activity_flag``.
    * ``instock_mean``: mean ``hourly_sale`` over rows with
      ``hourly_stockout == 0`` (0 when the group has no such rows).
    * ``daily_sale_imputed = raw_sale + instock_mean * oos_hours_total``.

    NOTE(review): ``raw_sale`` also includes whatever was sold during
    stockout hours, so those sales are counted in addition to the imputed
    mean — confirm this is the intended definition of latent demand.

    Parameters
    ----------
    hourly : pandas.DataFrame
        Hourly rows containing ``id_cols`` plus the columns listed above.
    id_cols : list of str
        Columns that identify one output row.

    Returns
    -------
    pandas.DataFrame
        One row per group with the aggregates described above.
    """
    # 1️⃣ Raw daily sums & stats
    raw = hourly.groupby(id_cols, as_index=False).agg(
        raw_sale        = ("hourly_sale",     "sum"),
        oos_hours_total = ("hourly_stockout", "sum"),
        discount        = ("discount",        "mean"),
        holiday_flag    = ("holiday_flag",    "max"),
        activity_flag   = ("activity_flag",   "max"),
        precpt          = ("precpt",          "mean"),
        avg_temperature = ("avg_temperature", "mean"),
        avg_humidity    = ("avg_humidity",    "mean"),
        avg_wind_level  = ("avg_wind_level",  "mean"),
    )

    # 2️⃣ In-stock sums & counts (only rows where the stockout flag is 0)
    instock = (
        hourly[hourly["hourly_stockout"] == 0]
        .groupby(id_cols, as_index=False)
        .agg(
            instock_sum   = ("hourly_sale", "sum"),
            instock_count = ("hourly_sale", "count")
        )
    )

    # 3️⃣ Merge & compute imputed daily sale
    # Left merge: groups with no in-stock rows get NaN, turned into count 0.
    agg = raw.merge(instock, on=id_cols, how="left")
    agg["instock_count"] = agg["instock_count"].fillna(0)

    # Vectorised replacement for a row-wise ``apply``: divide everywhere, then
    # force 0 wherever the count is not positive (this also masks the NaN/inf
    # produced by dividing by a zero count).
    has_instock = agg["instock_count"] > 0
    agg["instock_mean"] = (
        agg["instock_sum"] / agg["instock_count"]
    ).where(has_instock, 0)

    agg["daily_sale_imputed"] = (
        agg["raw_sale"] + agg["instock_mean"] * agg["oos_hours_total"]
    )
    return agg


# Columns read from every hourly chunk in addition to the identifiers
HOURLY_VALUE_COLS = [
    "hourly_sale","hourly_stockout",
    "discount","holiday_flag","activity_flag",
    "precpt","avg_temperature","avg_humidity","avg_wind_level"
]

# Fail early with a clear message instead of an opaque
# "No objects to concatenate" from pd.concat when the directory is empty/wrong.
chunk_paths = sorted(glob.glob(f"{HOURLY_CHUNKS_DIR}/*.parquet"))
if not chunk_paths:
    raise FileNotFoundError(
        f"No Parquet chunks found in {HOURLY_CHUNKS_DIR!r}"
    )

# Prepare to accumulate per-day aggregates
daily_chunks = []

for fp in chunk_paths:
    df = pd.read_parquet(fp, columns=ID_COLS + HOURLY_VALUE_COLS)
    daily_chunks.append(aggregate_chunk_to_daily(df, ID_COLS))

    # Progress log every 50 chunks
    if len(daily_chunks) % 50 == 0:
        print(f"  🔄 Processed {len(daily_chunks)} chunks…")

# Combine all chunks. Each chunk is aggregated independently, so this assumes
# no (ID_COLS) group is split across two chunk files — TODO confirm.
daily_df = pd.concat(daily_chunks, ignore_index=True)

# Keep only the columns needed downstream (drops raw_sale and instock_* helpers)
daily_df = daily_df[
    ID_COLS + [
        "daily_sale_imputed","oos_hours_total",
        "discount","holiday_flag","activity_flag",
        "precpt","avg_temperature","avg_humidity","avg_wind_level"
    ]
]

print(f"✅ Aggregation complete: {daily_df.shape[0]:,} daily rows")

  🔄 Processed 50 chunks…
  🔄 Processed 100 chunks…
  🔄 Processed 150 chunks…
  🔄 Processed 200 chunks…
  🔄 Processed 250 chunks…
  🔄 Processed 300 chunks…
  🔄 Processed 350 chunks…
  🔄 Processed 400 chunks…
  🔄 Processed 450 chunks…
✅ Aggregation complete: 4,500,000 daily rows


In [5]:
# Create the destination folder first (a no-op when it already exists)
output_dir = os.path.dirname(OUTPUT_DAILY_PATH)
os.makedirs(output_dir, exist_ok=True)

# Write the daily-level frame to Parquet without the row index
daily_df.to_parquet(OUTPUT_DAILY_PATH, index=False)
print(f"✅ Saved daily-level dataset → `{OUTPUT_DAILY_PATH}`")

✅ Saved daily-level dataset → `/Users/jhilmitasri/Repositories/MyRepositories/freshretail-demand-forecasting/notebooks/data/daily_dataset/daily_df_imputed.parquet`


In [6]:
# Reload the saved dataset and actually verify the round trip: the file on
# disk must have the same shape and column order as the in-memory frame.
df_test = pd.read_parquet(OUTPUT_DAILY_PATH)
assert df_test.shape == daily_df.shape, (
    f"Reloaded shape {df_test.shape} != in-memory shape {daily_df.shape}"
)
assert list(df_test.columns) == list(daily_df.columns), "Column mismatch after reload"
print("✅ Reload check:", df_test.shape)

# Fixed seed so the displayed sample is the same on every run
df_test.sample(5, random_state=42)

✅ Reload check: (4500000, 17)


Unnamed: 0,city_id,store_id,management_group_id,first_category_id,second_category_id,third_category_id,product_id,dt,daily_sale_imputed,oos_hours_total,discount,holiday_flag,activity_flag,precpt,avg_temperature,avg_humidity,avg_wind_level
934190,0,217,5,16,25,103,296,2024-06-16,0.0,0,0.851,1,1,8.0507,26.98,80.56,1.52
3287424,12,90,0,28,72,154,835,2024-06-20,1.2,0,0.987,0,0,8.8239,28.16,86.7,1.75
2358848,1,125,6,20,50,229,864,2024-05-05,1.466667,6,1.0,1,0,9.7693,26.22,80.26,1.27
3984651,14,286,2,31,79,128,386,2024-06-17,1.292308,11,1.0,0,0,8.4199,27.34,88.74,1.75
3509075,12,357,6,20,47,140,119,2024-06-01,4.0,0,0.991,1,0,3.8521,23.2,77.49,1.39


In [7]:
# NOTE(review): this cell repeats the save and reload already done earlier in
# this notebook with the same `daily_df` and `OUTPUT_DAILY_PATH`; the write is
# an overwrite with identical content. Consider removing one of the two.

# Ensure output directory exists
os.makedirs(os.path.dirname(OUTPUT_DAILY_PATH), exist_ok=True)

# Save
daily_df.to_parquet(OUTPUT_DAILY_PATH, index=False)
print(f"✅ Saved daily-level dataset with imputed demand → `{OUTPUT_DAILY_PATH}`")

# Verify load: the file on disk must match the in-memory frame's shape and
# column order, rather than only printing a shape for manual inspection.
df_test = pd.read_parquet(OUTPUT_DAILY_PATH)
assert df_test.shape == daily_df.shape, (
    f"Reloaded shape {df_test.shape} != in-memory shape {daily_df.shape}"
)
assert list(df_test.columns) == list(daily_df.columns), "Column mismatch after reload"
print("Reload check:", df_test.shape)

# Fixed seed so the displayed sample is the same on every run
df_test.sample(5, random_state=42)

✅ Saved daily-level dataset with imputed demand → `/Users/jhilmitasri/Repositories/MyRepositories/freshretail-demand-forecasting/notebooks/data/daily_dataset/daily_df_imputed.parquet`
Reload check: (4500000, 17)


Unnamed: 0,city_id,store_id,management_group_id,first_category_id,second_category_id,third_category_id,product_id,dt,daily_sale_imputed,oos_hours_total,discount,holiday_flag,activity_flag,precpt,avg_temperature,avg_humidity,avg_wind_level
1865878,0,586,6,4,28,1,796,2024-06-24,4.457143,10,1.0,0,0,11.2253,26.2,82.34,1.39
3134230,11,361,6,4,53,77,41,2024-06-06,0.8,9,0.857,0,1,0.8144,26.52,69.59,1.98
3337516,12,223,6,21,64,123,489,2024-05-13,1.4,16,0.771,0,1,1.8434,22.39,70.64,2.2
4312637,16,543,2,29,76,60,740,2024-04-14,3.76,14,1.0,1,0,3.3001,18.0,78.13,2.21
2963544,6,538,0,28,3,7,381,2024-04-21,0.0,24,1.0,1,0,5.4678,19.35,83.68,2.75


In [8]:
# Check for any all-zero-days (no sales, no stockouts)
zero_days = (daily_df["daily_sale_imputed"] == 0) & (daily_df["oos_hours_total"] == 0)
print(f"Days with zero sales & zero stockouts: {zero_days.sum()}")

# Verify no missing values in key columns
print("Missing values per column:")
print(daily_df[[
    "daily_sale_imputed","oos_hours_total","discount","holiday_flag"
]].isna().sum())

# Show basic summary statistics. This must be the cell's LAST expression:
# Jupyter only renders the final expression, so a bare `describe()` placed
# before other statements is evaluated and silently discarded.
daily_df.describe()

Days with zero sales & zero stockouts: 33516
Missing values per column:
daily_sale_imputed    0
oos_hours_total       0
discount              0
holiday_flag          0
dtype: int64
