# Data Processing Pipeline
## Load and clean data

In [4]:
import pandas as pd
import numpy as np

# ---- KAMIS ----
kamis = pd.read_csv("../data/raw/kamis_data.csv")
 
kamis = kamis.dropna(subset=["Date", "Retail", "Commodity"])
kamis["Date"] = pd.to_datetime(kamis["Date"], errors="coerce")
kamis["Retail"] = pd.to_numeric(kamis["Retail"].str.replace("/Kg", "").str.replace(",", ""), errors="coerce")
kamis["Wholesale"] = pd.to_numeric(kamis["Wholesale"].str.replace("/Kg", "").str.replace(",", ""), errors="coerce")

# ---- World Bank ----
wb = pd.read_csv("../data/raw/worldbank_data.csv")
wb = wb[wb["ISO3"] == "KEN"].copy()
wb["price_date"] = pd.to_datetime(wb["price_date"], errors="coerce")

for col in ["maize", "beans", "oil", "potatoes", "sorghum"]:
    wb[col] = pd.to_numeric(wb[col], errors="coerce")


## Commodity Standardization

In [5]:
commodity_map = {
    "Maize": "maize",
    "Dry Maize": "maize",
    "Beans (Yellow-Green)": "beans",
    "Beans (Red)": "beans",
    "Cooking Oil": "oil",
    "Irish Potatoes": "potatoes",
    "Red Sorghum": "sorghum",
    "White Sorghum": "sorghum"
}

kamis["commodity_clean"] = kamis["Commodity"].map(commodity_map)

## Aggregating to Monthly level

In [6]:
# KAMIS monthly
kamis_monthly = kamis.groupby([
    pd.Grouper(key="Date", freq="M"),
    "commodity_clean",
    "County"
]).agg(
    retail_mean=("Retail", "mean"),
    retail_std=("Retail", "std"),
    retail_min=("Retail", "min"),
    retail_max=("Retail", "max"),
    wholesale_mean=("Wholesale", "mean"),
    supply_volume=("Supply Volume", "sum")
).reset_index()

# WB monthly
wb_monthly = wb.groupby([
    pd.Grouper(key="price_date", freq="M"),
    "adm1_name"
]).agg({
    "maize": "mean",
    "beans": "mean",
    "oil": "mean",
    "potatoes": "mean",
    "sorghum": "mean"
}).reset_index()

## Feature Engineering

In [7]:
def add_time_features(df, date_col):
    df["year"] = df[date_col].dt.year
    df["month"] = df[date_col].dt.month
    df["quarter"] = df[date_col].dt.quarter
    df["is_dry_season"] = df["month"].isin([12, 1, 2, 3]).astype(int)
    df["is_harvest_season"] = df["month"].isin([7, 8, 9, 10]).astype(int)
    return df

kamis_monthly = add_time_features(kamis_monthly, "Date")

In [8]:
# price volatility and trends
kamis_monthly["price_volatility"] = kamis_monthly["retail_max"] - kamis_monthly["retail_min"]
kamis_monthly["price_trend_3m"] = kamis_monthly.groupby(["commodity_clean","County"])["retail_mean"].transform(lambda x: x.rolling(3).mean())
kamis_monthly["price_change_rate"] = kamis_monthly.groupby(["commodity_clean","County"])["retail_mean"].pct_change()

## Merging with world bank data

In [9]:
# reshape WB
wb_long = wb_monthly.melt(
    id_vars=["price_date", "adm1_name"],
    value_vars=["maize", "beans", "oil", "potatoes", "sorghum"],
    var_name="commodity_clean",
    value_name="wb_price"
)

# merge
unified = pd.merge(
    kamis_monthly,
    wb_long,
    left_on=["Date", "commodity_clean", "County"],
    right_on=["price_date", "commodity_clean", "adm1_name"],
    how="outer"
)

## Adding Lag Features

In [10]:
for lag in [1, 3, 12]:
    unified[f"retail_price_lag{lag}"] = unified.groupby(["commodity_clean", "County"])["retail_mean"].shift(lag)

## Validating Dataset

In [11]:
summary = {
    "rows": len(unified),
    "date_range": (unified["Date"].min(), unified["Date"].max()),
    "missing_by_col": unified.isnull().sum().to_dict()
}
print(summary)

{'rows': 10197, 'date_range': (Timestamp('2022-02-28 00:00:00'), Timestamp('2025-09-30 00:00:00')), 'missing_by_col': {'Date': 8939, 'commodity_clean': 0, 'County': 8939, 'retail_mean': 9291, 'retail_std': 9413, 'retail_min': 9291, 'retail_max': 9291, 'wholesale_mean': 9350, 'supply_volume': 8939, 'year': 8939, 'month': 8939, 'quarter': 8939, 'is_dry_season': 8939, 'is_harvest_season': 8939, 'price_volatility': 9291, 'price_trend_3m': 9472, 'price_change_rate': 9384, 'price_date': 1197, 'adm1_name': 1197, 'wb_price': 7672, 'retail_price_lag1': 9386, 'retail_price_lag3': 9544, 'retail_price_lag12': 10063}}
