# Forecasting Food Prices in Ethiopia

Principal data sources:
1. Ethiopia Food Prices Dataset from the World Food Programme Price Database, offering extensive food price data across Ethiopia.
2. Ethiopia Food Prices Dataset from the Famine Early Warning Systems Network,

Supplementary data sources:
1. Ethiopia Food Category Prices from the FAO [https://www.fao.org/worldfoodsituation/foodpricesindex/en/]
2. WFP Global Market Monitor, updated weekly [https://data.humdata.org/dataset/global-market-monitor]
3. Ethiopia Monthly Rainfall Data from WFP | CHIRPS [https://data.humdata.org/dataset/eth-rainfall-subnational]
4. Ethiopia Population Dataset from OCHA [https://data.humdata.org/dataset/cod-ps-eth]
5. Conflict in Ethiopia Dataset from the Armed Conflict Location & Event Data Project (ACLED), containing detailed records of conflict events [https://data.humdata.org/dataset/ethiopia-acled-conflict-data]

Other datasets to add:
1. Ethiopia Crop Production Statistics from Data Kimetrica, providing detailed statistics on crop production []
2. US Dollar — Ethiopian Birr Historical Data []
3. Anommoly hotspots [https://data.humdata.org/dataset/asap-hotspots-monthly]


In [43]:
import io
import re
import os
import numpy as np
import pandas as pd
import requests
from datetime import date, timedelta
from hdx.data.dataset import Dataset

  import pkg_resources


In [44]:
# ------------------------------------------------------------------------------
# Configuration
# ------------------------------------------------------------------------------
REQUEST_TIMEOUT = 90
USER_AGENT = {"User-Agent": "Mozilla/5.0 (foodsec-notebook/0.2)"}

# Lookback horizon for price data (approx 5y + 1mo padding for leap years)
FIVE_YEARS_AGO_ISO = (date.today() - timedelta(days=5*365 + 30)).isoformat()
FIVE_YEARS_AGO_TS  = (pd.Timestamp.today().normalize() - pd.offsets.DateOffset(years=5))

In [45]:
# ------------------------------------------------------------------------------
# Helpers (single source of truth)
# ------------------------------------------------------------------------------
def norm(s: str, *, lower=True, keep_internal_space=True) -> str:
    """Trim + collapse spaces; lowercase by default."""
    if s is None:
        return ""
    s = re.sub(r"\s+", " ", str(s).strip())
    return s.lower() if lower else s

def month_start(x):
    """Coerce to month-start Timestamp(s) for scalars, Series, or arrays."""
    obj = pd.to_datetime(x, errors="coerce")

    # Pandas Series → use .dt accessors
    if isinstance(obj, pd.Series):
        return obj.dt.to_period("M").dt.to_timestamp()

    # Numpy array / list → convert to Series then back
    if isinstance(obj, (np.ndarray, list, tuple)):
        ser = pd.to_datetime(pd.Series(obj), errors="coerce")
        return ser.dt.to_period("M").dt.to_timestamp().to_numpy()

    # Scalar (Timestamp/NaT) → use Timestamp methods
    if pd.isna(obj):
        return pd.NaT
    return pd.Timestamp(obj).to_period("M").to_timestamp()


def safe_numeric(sr: pd.Series) -> pd.Series:
    return pd.to_numeric(sr, errors="coerce")


# --- Product mappings (WFP -> FEWS canonical) ---------------------------------
FEWS_CANON = {
    "Maize Grain (White)", "Wheat Grain", "Goats (Local Quality)",
    "Sheep (Local Quality)", "Mixed Teff", "Oxen (Local Quality)",
    "Rice (Milled)", "Horse beans", "Refined Vegetable Oil",
    "Wheat Flour", "Sorghum (White)", "Refined sugar", "Sorghum (Red)",
    "Beans (Haricot)", "Camels (Local Quality)", "Diesel", "Gasoline",
    "Firewood", "Sorghum (Yellow)"
}

DIRECT_MAP = {
    "maize (white)": "Maize Grain (White)",
    "maize": "Maize Grain (White)",
    "wheat flour": "Wheat Flour",
    "wheat (white)": "Wheat Grain",
    "wheat (mixed)": "Wheat Grain",
    "wheat": "Wheat Grain",
    "rice": "Rice (Milled)",
    "sorghum (white)": "Sorghum (White)",
    "sorghum (red)": "Sorghum (Red)",
    "sorghum (yellow)": "Sorghum (Yellow)",
    "oil (vegetable)": "Refined Vegetable Oil",
    "sugar": "Refined sugar",
    "beans (haricot)": "Beans (Haricot)",
    "beans (fava)": "Horse beans",
    "horse beans": "Horse beans",
    "teff (mixed)": "Mixed Teff",
    "teff (white)": "Mixed Teff",
    "teff (red)": "Mixed Teff",
    "teff (sergegna)": "Mixed Teff",
    "livestock (goat)": "Goats (Local Quality)",
    "livestock (sheep)": "Sheep (Local Quality)",
    "livestock (camel)": "Camels (Local Quality)",
    "livestock (ox)": "Oxen (Local Quality)",
    "livestock (bull)": "Oxen (Local Quality)",
    "livestock (cattle)": "Oxen (Local Quality)",
    "diesel": "Diesel",
    "gasoline": "Gasoline",
}

REGEX_RULES = [
    (r"^maize.*white", "Maize Grain (White)"),
    (r"^wheat.*flour", "Wheat Flour"),
    (r"^wheat\b", "Wheat Grain"),
    (r"^rice\b", "Rice (Milled)"),
    (r"^sorghum.*white", "Sorghum (White)"),
    (r"^sorghum.*red", "Sorghum (Red)"),
    (r"^sorghum.*yellow", "Sorghum (Yellow)"),
    (r"^oil.*vegetable", "Refined Vegetable Oil"),
    (r"^beans.*haricot", "Beans (Haricot)"),
    (r"^beans.*fava", "Horse beans"),
    (r"^teff", "Mixed Teff"),
    (r"^livestock.*goat", "Goats (Local Quality)"),
    (r"^livestock.*sheep", "Sheep (Local Quality)"),
    (r"^livestock.*camel", "Camels (Local Quality)"),
    (r"^livestock.*(ox|bull|cattle)", "Oxen (Local Quality)"),
    (r"^diesel$", "Diesel"),
    (r"^gasoline$", "Gasoline"),
]

def map_wfp_product_to_fews(name: str) -> str | None:
    s = norm(name)
    for canon in FEWS_CANON:
        if norm(canon) == s:
            return canon
    if s in DIRECT_MAP:
        return DIRECT_MAP[s]
    for pat, target in REGEX_RULES:
        if re.search(pat, s):
            return target
    return None


# --- Unit normalization --------------------------------------------------------
EXPECTED_UNIT = {
    "Maize Grain (White)": "kg",
    "Wheat Grain": "kg",
    "Wheat Flour": "kg",
    "Rice (Milled)": "kg",
    "Mixed Teff": "kg",
    "Sorghum (White)": "kg",
    "Sorghum (Red)": "kg",
    "Sorghum (Yellow)": "kg",
    "Refined sugar": "kg",
    "Refined Vegetable Oil": "l",
    "Diesel": "l",
    "Gasoline": "l",
    "Goats (Local Quality)": "head",
    "Sheep (Local Quality)": "head",
    "Oxen (Local Quality)": "head",
    "Camels (Local Quality)": "head",
}

def normalize_unit_and_factor(u: str) -> tuple[str, float]:
    s = norm(u).replace(".", "")
    if s in {"kg","kgs","kilogram","kilograms"}: return "kg", 1.0
    if s in {"l","lt","ltr","litre","liter","litres","liters"}: return "l", 1.0
    if s in {"head","heads"}: return "head", 1.0
    if s in {"ea","each"}: return "ea", 1.0
    m = re.match(r"^(\d+)\s*kg$", s)
    if m:
        qty = float(m.group(1))
        if qty > 0:
            return "kg", 1.0 / qty
    return s, 1.0

# --- Admin-1 harmonization (used across datasets) ------------------------------
MAP_ADMIN_TO_PANEL = {
    # Variants → panel names
    "benishangul-gumuz": "B. Gumuz",
    "benshangul/gumuz": "B. Gumuz",
    "benishangul / gumuz": "B. Gumuz",
    "benishangul gumuz": "B. Gumuz",
    "snnpr": "SNNPR",
    "snnp": "SNNPR",
    "snnp": "SNNPR",
    "south west": "South Ethiopia",
    "sidama": "South Ethiopia",
}

def harmonize_admin1(name: str) -> str:
    n = norm(name)
    return MAP_ADMIN_TO_PANEL.get(n, n.title() if n else n)


# HDX resolver
def _parse_ts(x):
    try:
        return pd.to_datetime(x, utc=True)
    except Exception:
        return pd.NaT

def resolve_hdx_resource(dataset_slug: str,
                         *,
                         fmt: str | None = "xlsx",
                         name_regex: str | None = None):
    """
    Returns (resource_dict, url_string) for the most recently updated resource
    in the given HDX dataset that matches fmt and/or name_regex.
    """
    ds = Dataset.read_from_hdx(dataset_slug)
    resources = ds.get_resources()
    if not resources:
        raise RuntimeError(f"No resources found in HDX dataset: {dataset_slug}")

    # Filter by format and optional name regex
    filt = []
    for r in resources:
        rfmt = str(r.get("format", "")).lower()
        rname = str(r.get("name", "")).strip()
        if fmt and rfmt != fmt.lower():
            continue
        if name_regex and not re.search(name_regex, rname, flags=re.I):
            continue
        filt.append(r)

    cand = filt or resources  # fallback: anything in the dataset
    # Sort by last modified-ish fields
    cand.sort(key=lambda r: (
        _parse_ts(r.get("last_modified") or r.get("updated") or r.get("created"))
    ), reverse=True)

    res = cand[0]
    url = res.get("download_url") or res.get("url")
    if not url:
        raise RuntimeError("HDX resource has no downloadable URL.")
    return res, url



In [46]:
# ------------------------------------------------------------------------------
# FEWS NET prices
# ------------------------------------------------------------------------------
def load_fewsnet_prices() -> pd.DataFrame:
    url = "https://fdw.fews.net/api/marketpricefacts.csv"
    params = {
        "dataset": "FEWS_NET_Staple_Food_Price_Data",
        "country_code": "ET",
        "start_date": FIVE_YEARS_AGO_ISO,
        "schedule": "Daily",
        "price_type": "Retail",
        "fields": ",".join([
            "period_date","admin_1","market","product","unit_type","unit",
            "price_type","value"
        ]),
        "format": "csv",
    }
    r = requests.get(url, params=params, headers=USER_AGENT, timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    df = pd.read_csv(
        io.StringIO(r.text),
        parse_dates=["period_date"],
        dtype={
            "admin_1": "string", "market": "string", "product": "string",
            "unit_type": "string", "unit": "string", "price_type": "string",
            "value": "float64"
        },
    )
    df = df.dropna(subset=["period_date","admin_1","market","product","value"])
    return df.assign(source="FEWSNET")

In [47]:
# ------------------------------------------------------------------------------
# WFP prices → FEWS-like schema
# ------------------------------------------------------------------------------
def wfp_to_fewsnet_schema(df_wfp_raw: pd.DataFrame) -> pd.DataFrame:
    df = df_wfp_raw.copy()

    # Drop repeated header row if present
    if (df.columns == df.iloc[0]).all():
        df = df.drop(index=0).reset_index(drop=True)

    # Parse & clean
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    for col in ["price","usdprice","latitude","longitude"]:
        if col in df.columns:
            df[col] = safe_numeric(df[col])

    # Keep recent horizon
    df = df[df["date"] >= pd.Timestamp(FIVE_YEARS_AGO_TS)]

    # Standardize text columns
    for c in ["admin1","admin2","market","commodity","category","unit","pricetype","currency"]:
        if c in df.columns:
            df[c] = df[c].astype(str).str.strip()

    # Filter to Retail (drop wholesale)
    price_type = df["pricetype"].str.casefold().str.strip()
    df = df.loc[~price_type.str.contains("wholesale", na=False)].copy()

    out = pd.DataFrame({
        "period_date": df["date"],
        "admin_1": df["admin1"].replace({"nan": np.nan}),
        "market": df["market"],
        "product": df["commodity"],
        "unit": df["unit"],
        "price_type": "Retail",
        "value": df["price"],
    })
    out = out.dropna(subset=["period_date","admin_1","market","product","value"])
    out["value"] = safe_numeric(out["value"])
    out = out[out["value"] > 0]
    return out.assign(source="WFP").reset_index(drop=True)

def load_wfp_prices() -> pd.DataFrame:
    url = ("https://data.humdata.org/dataset/2e4f1922-e446-4b57-a98a-d0e2d5e34afa/"
           "resource/87bac18e-f3aa-4b29-8cf8-76763e823dc5/download/wfp_food_prices_eth.csv")
    r = requests.get(url, headers=USER_AGENT, timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    df_wfp_raw = pd.read_csv(io.StringIO(r.text))
    df = wfp_to_fewsnet_schema(df_wfp_raw)

    # Map products to FEWS canonical
    df["product"] = df["product"].map(map_wfp_product_to_fews)
    df = df.loc[df["product"].isin(FEWS_CANON)].reset_index(drop=True)
    return df

In [48]:
# ------------------------------------------------------------------------------
# Combine, unit normalization, monthly aggregation
# ------------------------------------------------------------------------------
def combine_and_aggregate(fews: pd.DataFrame, wfp: pd.DataFrame) -> pd.DataFrame:
    cols = ["period_date","admin_1","market","product","unit","price_type","value","source"]
    df_all = (pd.concat([fews[cols], wfp[cols]], ignore_index=True)
                .drop_duplicates()
                .copy())

    # (Optional) drop non-food items for modeling
    drop_products = ["Horse beans","Beans (Haricot)","Diesel","Gasoline","Firewood"]
    df_all = df_all.loc[~df_all["product"].isin(drop_products)].copy()

    # Normalize units and make monthly key
    work = df_all.copy()
    work["month"] = work["period_date"].values.astype("datetime64[M]")

    unit_parsed = work["unit"].map(normalize_unit_and_factor)
    work["unit_std"]    = [u for (u, f) in unit_parsed]
    work["unit_factor"] = [f for (u, f) in unit_parsed]
    work["value_std"]   = work["value"] * work["unit_factor"]

    # Keep only expected units (where we have an expected unit)
    exp = work["product"].map(EXPECTED_UNIT)
    work = work.loc[exp.isna() | (work["unit_std"] == exp)].copy()

    grp = ["admin_1","month","product","unit_std","source"]
    # First average within source, then across sources (mean/median & counts)
    by_src = (work.groupby(grp, as_index=False)
                  .agg(value_src_mean=("value_std","mean"),
                       value_src_median=("value_std","median"),
                       n_obs_src=("value_std","count")))

    grp2 = ["admin_1","month","product","unit_std"]
    out = (by_src.groupby(grp2, as_index=False)
                .agg(value_mean=("value_src_mean","mean"),
                     value_median=("value_src_median","median"),
                     n_obs=("n_obs_src","sum"),
                     sources=("source", lambda x: ",".join(sorted(set(map(str,x)))))))

    return out.rename(columns={"unit_std":"unit"}).reset_index(drop=True)

In [49]:
# ------------------------------------------------------------------------------
# QC: z-score outlier removal (log-scale optional)
# ------------------------------------------------------------------------------
def remove_outliers(work: pd.DataFrame, use_log=True, max_z=2.5, max_rz=3.5) -> pd.DataFrame:
    dfq = work.copy()
    gkeys = ["admin_1","month","product","unit"]

    vals = np.log(dfq["value_median"]) if use_log else dfq["value_median"]
    dfq["_val"] = vals.replace([np.inf, -np.inf], np.nan)

    def _mad(x):
        med = np.nanmedian(x)
        return np.nanmedian(np.abs(x - med))

    stats = (dfq.groupby(gkeys, group_keys=False)["_val"]
                .agg(n="count",
                     mean="mean",
                     std=lambda x: np.nanstd(x, ddof=1),
                     median="median",
                     mad=_mad)
                .reset_index())

    dfq = dfq.merge(stats, on=gkeys, how="left")

    dfq["z"]  = (dfq["_val"] - dfq["mean"]) / dfq["std"]
    dfq.loc[(dfq["n"] < 2) | (dfq["std"] <= 0), "z"] = np.nan

    dfq["rz"] = 0.6745 * (dfq["_val"] - dfq["median"]) / dfq["mad"]
    dfq.loc[(dfq["n"] < 2) | (dfq["mad"] <= 0), "rz"] = np.nan

    clean = dfq[~(dfq["z"].abs() > max_z) & ~(dfq["rz"].abs() > max_rz)].copy()
    clean["value_for_agg"] = np.exp(clean["_val"]) if use_log else clean["_val"]

    grp = ["admin_1","month","product","unit"]
    out = (clean.groupby(grp, as_index=False)
                .agg(value_mean=("value_for_agg","mean"),
                     value_median=("value_for_agg","median"),
                     n_obs=("value_for_agg","count")))
    return out

In [50]:
# ------------------------------------------------------------------------------
# Imputation (temporal + spatial + cross-admin)
# ------------------------------------------------------------------------------
def impute_prices(df_pm: pd.DataFrame,
                  max_interp_gap: int = 2,
                  use_spatial: bool = True) -> pd.DataFrame:
    """
    Requires columns: admin_1, product, unit, month, value_median
    Returns: value_imputed, impute_method (+ value_orig)
    """
    req = {"admin_1","product","unit","month","value_median"}
    missing_cols = req - set(df_pm.columns)
    if missing_cols:
        raise KeyError(f"impute_prices missing required columns: {missing_cols}")

    df = df_pm.copy()
    df["month"] = pd.to_datetime(df["month"], errors="coerce").dt.to_period("M").dt.to_timestamp()
    gkey = ["admin_1","product","unit"]

    # complete monthly grid per group
    full = []
    for (a,p,u), g in df.groupby(gkey, dropna=False):
        if g["month"].notna().sum() == 0:
            continue
        rng = pd.period_range(g["month"].min().to_period("M"), g["month"].max().to_period("M"), freq="M").to_timestamp()
        gi = g.set_index("month").reindex(rng)
        gi.index.name = "month"
        gi["admin_1"], gi["product"], gi["unit"] = a, p, u
        full.append(gi.reset_index())
    if not full:
        raise ValueError("No valid groups to impute (all months are NaT?)")

    df = pd.concat(full, ignore_index=True)
    df["value_orig"]    = df["value_median"]
    df["value_imputed"] = df["value_median"]
    df["impute_method"] = np.where(df["value_median"].notna(), "observed", "missing")

    # 1) temporal interpolation on log-prices
    def interp_group(g):
        g = g.sort_values("month")
        v = g["value_imputed"]
        logv = np.log(v.replace(0, np.nan))

        idx_time = pd.DatetimeIndex(pd.to_datetime(g["month"].values), name="month")
        s_time = pd.Series(logv.to_numpy(), index=idx_time)
        s_interp = s_time.interpolate(method="time", limit=max_interp_gap, limit_direction="both")

        filled = v.to_numpy(copy=True)
        missing = v.isna().to_numpy()
        interp_vals = np.exp(s_interp.to_numpy())
        has_interp = ~np.isnan(interp_vals)
        use = missing & has_interp

        filled[use] = interp_vals[use]
        g["value_imputed"] = filled
        g.loc[(missing) & (has_interp) & (g["impute_method"]=="missing"), "impute_method"] = "interp_time"
        return g

    df = (df.sort_values(gkey + ["month"])
            .groupby(gkey, group_keys=False)
            .apply(interp_group)
            .reset_index(drop=True))

    # 2) spatial scaled-national fallback
    if use_spatial:
        nat = (df.groupby(["product","unit","month"], as_index=False)
                 .agg(national=("value_orig","median")))
        merged = df.merge(nat, on=["product","unit","month"], how="left")
        merged["ratio"] = merged["value_orig"] / merged["national"]
        ratios = (merged.dropna(subset=["ratio"])
                        .groupby(gkey, as_index=False)
                        .agg(admin_nat_ratio=("ratio","median")))

        df = df.merge(ratios, on=gkey, how="left")
        df = df.merge(nat.rename(columns={"national":"nat_val"}), on=["product","unit","month"], how="left")

        mask_sp = df["value_imputed"].isna() & df["admin_nat_ratio"].notna() & df["nat_val"].notna()
        df.loc[mask_sp, "value_imputed"] = df.loc[mask_sp, "admin_nat_ratio"] * df.loc[mask_sp, "nat_val"]
        df.loc[mask_sp & (df["impute_method"]=="missing"), "impute_method"] = "scaled_national"

    # 3) cross-admin same-month median
    cross = (df.groupby(["product","unit","month"], as_index=False)
               .agg(cross_admin=("value_imputed","median")))
    df = df.merge(cross, on=["product","unit","month"], how="left")
    mask_cross = df["value_imputed"].isna() & df["cross_admin"].notna()
    df.loc[mask_cross, "value_imputed"] = df.loc[mask_cross, "cross_admin"]
    df.loc[mask_cross & (df["impute_method"]=="missing"), "impute_method"] = "cross_admin"

    return df.drop(columns=["cross_admin","nat_val","admin_nat_ratio"], errors="ignore")




In [51]:
# ------------------------------------------------------------------------------
# FAO Food Price Index + category mapping
# ------------------------------------------------------------------------------
PRODUCT_TO_FAO = {
    # cereals
    "maize grain (white)": "cereals",
    "wheat grain": "cereals",
    "wheat flour": "cereals",
    "mixed teff": "cereals",
    "sorghum (white)": "cereals",
    "sorghum (red)": "cereals",
    "sorghum (yellow)": "cereals",
    "rice (milled)": "cereals",
    # meat (livestock)
    "goats (local quality)": "meat",
    "sheep (local quality)": "meat",
    "oxen (local quality)": "meat",
    "camels (local quality)": "meat",
    # oils
    "refined vegetable oil": "oils",
    # sugar
    "refined sugar": "sugar",
}

def load_fao_indices() -> tuple[pd.DataFrame, pd.DataFrame]:
    url = ("https://www.fao.org/media/docs/worldfoodsituationlibraries/"
           "default-document-library/food_price_indices_data_oct25.csv?sfvrsn=523ebd2a_54&download=true")
    r = requests.get(url, headers=USER_AGENT, timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    df = pd.read_csv(io.StringIO(r.text), header=2)
    df = df.iloc[:, :7].dropna(how="all").reset_index(drop=True)
    df.columns = [re.sub(r"\s+", " ", str(c)).strip() for c in df.columns]

    # Parse dates
    if "Date" in df.columns:
        df["Date"] = pd.to_datetime(df["Date"], format="%Y-%m", errors="coerce")
    df["month"] = month_start(df["Date"])

    rename_map = {
        "Food Price Index": "fao_food_price_index",
        "Meat": "meat",
        "Dairy": "dairy",
        "Cereals": "cereals",
        "Oils": "oils",
        "Sugar": "sugar",
    }
    df = df.rename(columns=rename_map)

    value_cols = ["meat","dairy","cereals","oils","sugar"]
    long = (df[["month","fao_food_price_index"] + value_cols]
              .melt(id_vars=["month","fao_food_price_index"],
                    value_vars=value_cols,
                    var_name="fao_category",
                    value_name="fao_category_index"))
    # one-row per month overall FPI (for fallback)
    fpi_month = df[["month","fao_food_price_index"]].drop_duplicates("month")
    return long, fpi_month

def merge_fao(df_prices: pd.DataFrame, fao_long: pd.DataFrame, fpi_month: pd.DataFrame) -> pd.DataFrame:
    df = df_prices.copy()
    df["fao_category"] = df["product"].map(lambda x: PRODUCT_TO_FAO.get(norm(x), "other"))
    out = df.merge(fao_long, on=["month","fao_category"], how="left", validate="many_to_one")
    out = out.merge(fpi_month, on="month", how="left", suffixes=("","_overall"))
    out["fao_food_price_index"] = out["fao_food_price_index"].fillna(out["fao_food_price_index_overall"])
    return out.drop(columns=["fao_food_price_index_overall"])

In [52]:
# ------------------------------------------------------------------------------
# ACLED monthly events/fatalities by admin-1
# ------------------------------------------------------------------------------
def load_acled_monthly() -> pd.DataFrame:
    """
    Fetch the latest ACLED monthly admin1 events/fatalities for Ethiopia from HDX,
    robust to changing filenames/signed URLs.
    """
    # Pick the dataset by its HDX slug (stable), not a dated file URL
    DATASET_SLUG = "ethiopia-acled-conflict-data"  # HDX dataset key

    # Look for an Excel resource that sounds like monthly violence/events
    # (regex is forgiving; adjust if HDX renames things)
    NAME_RE = r"(political|violence|conflict).*month|monthly|by\s*month"

    try:
        res, url = resolve_hdx_resource(DATASET_SLUG, fmt="xlsx", name_regex=NAME_RE)
        r = requests.get(url, headers=USER_AGENT, timeout=REQUEST_TIMEOUT, allow_redirects=True)
        r.raise_for_status()
    except Exception as e:
        # If that specific match fails, try the latest Excel resource regardless of name.
        try:
            res, url = resolve_hdx_resource(DATASET_SLUG, fmt="xlsx", name_regex=None)
            r = requests.get(url, headers=USER_AGENT, timeout=REQUEST_TIMEOUT, allow_redirects=True)
            r.raise_for_status()
        except Exception as e2:
            # Last-resort: return an empty frame so the pipeline can still run
            print(f"[WARN] Couldn’t fetch ACLED from HDX ({e2}). Returning empty monthly table.")
            return pd.DataFrame(columns=["admin_1","month","Events","Fatalities"])

    # Load the workbook; prefer a sheet literally named “Data” else first sheet
    try:
        xls = pd.ExcelFile(io.BytesIO(r.content))
        sheet = "Data" if "Data" in xls.sheet_names else xls.sheet_names[0]
        df = pd.read_excel(xls, sheet_name=sheet)
    except Exception as e:
        raise RuntimeError(f"Failed reading ACLED Excel: {e}")

    # Try to detect columns (different files sometimes rename headers)
    colmap = {}
    # admin column
    for c in df.columns:
        if re.fullmatch(r"(?i)admin\s*1|admin1|region|state", str(c).strip(), flags=0):
            colmap["Admin1"] = c; break
    # month column
    for c in df.columns:
        if re.fullmatch(r"(?i)month", str(c).strip()):
            colmap["Month"] = c; break
    # year column
    for c in df.columns:
        if re.fullmatch(r"(?i)year", str(c).strip()):
            colmap["Year"] = c; break
    # events / fatalities
    for c in df.columns:
        if re.fullmatch(r"(?i)events?", str(c).strip()):
            colmap["Events"] = c; break
    for c in df.columns:
        if re.fullmatch(r"(?i)fatalit(y|ies)", str(c).strip()):
            colmap["Fatalities"] = c; break

    required = {"Admin1","Month","Year","Events","Fatalities"}
    if not required.issubset(colmap):
        # If any are missing, try to infer a pre-aggregated monthly sheet
        # that already has a full date column like 'month' or 'date'
        alt_time_col = None
        for c in df.columns:
            if re.fullmatch(r"(?i)(month|date)", str(c).strip()):
                alt_time_col = c; break
        if alt_time_col and "Admin1" in colmap and "Events" in colmap and "Fatalities" in colmap:
            df["month"] = month_start(df[alt_time_col])
            df["admin_1"] = df[colmap["Admin1"]].map(harmonize_admin1)
            out = (df.dropna(subset=["admin_1","month"])
                     .groupby(["admin_1","month"], as_index=False)
                     .agg(Events=("Events","sum"), Fatalities=("Fatalities","sum")))
            return out
        raise KeyError(f"Expected columns not found in ACLED file. Found: {list(df.columns)}")

    # Standard path: combine year + month to a Timestamp
    df = df.rename(columns={colmap[k]: k for k in colmap})
    df["Month"] = df["Month"].astype(str).str.strip()
    m1 = pd.to_datetime(df["Year"].astype(int).astype(str) + "-" + df["Month"], format="%Y-%B", errors="coerce")
    m2 = pd.to_datetime(df["Year"].astype(int).astype(str) + "-" + df["Month"], format="%Y-%b", errors="coerce")
    m3 = pd.to_datetime(df["Year"].astype(int).astype(str) + "-" + df["Month"].str.zfill(2), format="%Y-%m", errors="coerce")
    df["month"] = m1.fillna(m2).fillna(m3)
    df["month"] = month_start(df["month"])

    df["admin_1"] = df["Admin1"].map(harmonize_admin1)
    out = (df.drop(columns=["Month","Year"])
             .dropna(subset=["admin_1","month"])
             .groupby(["admin_1","month"], as_index=False)
             .agg(Events=("Events","sum"), Fatalities=("Fatalities","sum")))
    return out

In [53]:
# ------------------------------------------------------------------------------
# CHIRPS rainfall (subnational monthly)
# ------------------------------------------------------------------------------
PCODE1_TO_NAME = {
    "ET01":"Afar","ET02":"Amhara","ET03":"Benishangul-Gumuz","ET04":"Dire Dawa",
    "ET05":"Gambela","ET06":"Harari","ET07":"Oromia","ET08":"Somali",
    "ET09":"SNNPR","ET10":"Tigray","ET11":"Addis Ababa",
    "ET12":"Sidama","ET13":"South West","ET14":"Central Ethiopia","ET15":"South Ethiopia",
}

def load_rainfall_monthly() -> pd.DataFrame:
    url = ("https://data.humdata.org/dataset/423143be-315f-48d7-9e90-ae23738da564/"
           "resource/49e3a707-d153-423e-b22b-30484d678dd7/download/eth-rainfall-subnat-full.csv")
    r = requests.get(url, headers=USER_AGENT, timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    rf = pd.read_csv(io.StringIO(r.text))

    rf.columns = [c.strip() for c in rf.columns]
    rf["date"]  = pd.to_datetime(rf["date"], errors="coerce")
    rf["PCODE"] = rf["PCODE"].astype(str).str.strip().str.upper()
    rf = rf.dropna(subset=["date","PCODE"]).copy()

    rf["admin1_pcode"]   = rf["PCODE"].str.extract(r"^(ET\d{2})")
    rf["admin1_name_raw"] = rf["admin1_pcode"].map(PCODE1_TO_NAME)
    rf["admin_1"]        = rf["admin1_name_raw"].map(harmonize_admin1)

    num_cols = [c for c in ["n_pixels","rfh","rfh_avg","r1h","r1h_avg","r3h","r3h_avg","rfq","r1q","r3q"] if c in rf.columns]
    for c in num_cols:
        rf[c] = (rf[c].astype(str)
                    .str.replace(",", "", regex=False)
                    .str.replace("%", "", regex=False)
                    .str.replace("\u2014", "", regex=False)  # em dash
                    .str.replace("-", "", regex=False)
                    .str.strip())
        rf[c] = safe_numeric(rf[c])

    rf["month"] = month_start(rf["date"])
    rain = (rf.groupby(["admin_1","month"], as_index=False)
              .agg(rfh_month=("rfh","sum"),
                   rfh_avg_month=("rfh_avg","sum"),
                   rfq_month=("rfq","mean")))
    rain["rain_anom_pct"] = np.where(
        rain["rfh_avg_month"].gt(0),
        100.0 * (rain["rfh_month"]/rain["rfh_avg_month"] - 1.0),
        np.nan
    )
    return rain

def merge_rainfall(df_panel: pd.DataFrame, rain_m: pd.DataFrame) -> pd.DataFrame:
    df = df_panel.copy()
    df["admin_1"] = df["admin_1"].map(harmonize_admin1)
    df["month"]   = month_start(df["month"])

    # Clip to rainfall coverage to avoid early NaNs (optional)
    min_r, max_r = rain_m["month"].min(), rain_m["month"].max()
    df = df[(df["month"] >= min_r) & (df["month"] <= max_r)].copy()

    return df.merge(
        rain_m[["admin_1","month","rfh_month","rfh_avg_month","rfq_month","rain_anom_pct"]],
        on=["admin_1","month"], how="left"
    )

In [54]:
# ------------------------------------------------------------------------------
# WFP Global Market Monitor (signal-only features)
# ------------------------------------------------------------------------------
def load_gmm_features() -> pd.DataFrame:
    url = ("https://data.humdata.org/dataset/67259d7e-1554-4ffd-be8d-97244577546a/"
           "resource/2caea41d-2079-44a5-a52b-8bb62a11010f/download/global-market-monitor_subnational.csv")
    r = requests.get(url, headers=USER_AGENT, timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    gmm = pd.read_csv(io.StringIO(r.text))

    gmm = gmm[
        (gmm["CountryName"]=="Ethiopia") &
        (gmm["DataLevel"].str.contains("Subnational", case=False, na=False))
    ].copy()
    gmm = gmm[gmm["PriceType"].str.contains("Retail", case=False, na=False)].copy()

    gmm["Date"]  = pd.to_datetime(gmm["Date"], errors="coerce")
    gmm["month"] = month_start(gmm["Date"])
    gmm["admin_1"] = gmm["Admin1"].map(harmonize_admin1)

    ptm = (gmm["PriceTrendMonth"].astype(str)
           .str.strip()
           .str.replace(r"^nan$", "N/A", case=False, regex=True)
           .str.title())
    sev_map = {"Negative": -1, "Normal": 0, "Moderate": 1, "High": 2, "Severe": 3}
    gmm["ptm_severity"] = ptm.map(sev_map)

    return gmm[["admin_1","month","ptm_severity"]].drop_duplicates()

def merge_gmm(df_panel: pd.DataFrame, gmm_feat: pd.DataFrame) -> pd.DataFrame:
    df = df_panel.copy()
    df["admin_1"] = df["admin_1"].map(harmonize_admin1)
    df["month"]   = month_start(df["month"])
    return df.merge(gmm_feat, on=["admin_1","month"], how="left")

In [55]:
# ------------------------------------------------------------------------------
# OCHA Population (admin1, 2023)
# ------------------------------------------------------------------------------
def load_population_admin1() -> pd.DataFrame:
    url = ("https://data.humdata.org/dataset/3d9b037f-5112-4afd-92a7-190a9082bd80/"
           "resource/f82b20f1-8a76-46e9-ba9a-29e531f7af3c/download/eth_admpop_2023.xlsx")
    r = requests.get(url, headers=USER_AGENT, timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    pop_raw = pd.read_excel(io.BytesIO(r.content), sheet_name="ETH_admpop_adm1_2023")
    pop = pop_raw.copy()
    pop.columns = [re.sub(r"\s+"," ", str(c).strip()) for c in pop.columns]

    name_col  = next((c for c in pop.columns if re.fullmatch(r"(?i)admin1name_en", c)), None)
    pcode_col = next((c for c in pop.columns if re.fullmatch(r"(?i)admin1pcode", c)), None)

    cand_pop = [c for c in pop.columns if re.search(r"(?i)(population|total)", c)]
    pop_col = None
    for c in cand_pop:
        if safe_numeric(pop[c]).notna().sum() > 0:
            pop_col = c
            break
    if pop_col is None:
        raise ValueError("Population total column not found.")

    out = (pop[[name_col, pcode_col, pop_col]]
           .rename(columns={name_col:"admin1Name_en", pcode_col:"admin1Pcode", pop_col:"population_2023"})
           .dropna(subset=["admin1Name_en"]))
    out["population_2023"] = (out["population_2023"].astype(str)
                              .str.replace(",", "", regex=False)
                              .str.replace("\u202f", "", regex=False)
                              .str.strip())
    out["population_2023"] = safe_numeric(out["population_2023"])
    out["admin_1"] = out["admin1Name_en"].map(harmonize_admin1)
    return out[["admin_1","population_2023"]]

In [56]:
# ------------------------------------------------------------------------------
# MAIN PIPELINE
# ------------------------------------------------------------------------------
# 1) Load & combine prices
df_fews = load_fewsnet_prices()
df_wfp  = load_wfp_prices()
df_monthly = combine_and_aggregate(df_fews, df_wfp)

# 2) QC outliers (log-scale)
df_monthly_qc = remove_outliers(df_monthly, use_log=True, max_z=2.5, max_rz=3.5)

# 3) Impute gaps
df_imp = impute_prices(df_monthly_qc.rename(columns={"value_median":"value_median"}), max_interp_gap=2, use_spatial=True)
# Optional quick check:
# df_imp["impute_method"].value_counts(dropna=False)

# 4) FAO indices
fao_long, fpi_month = load_fao_indices()
df_with_fao = merge_fao(df_imp, fao_long, fpi_month)

# 5) ACLED
acled_m = load_acled_monthly()
df_with_conflict = merge_acled(df_with_fao, acled_m)

# 6) Rainfall
rain_m = load_rainfall_monthly()
df_with_rain = merge_rainfall(df_with_conflict, rain_m)

# 7) Global Market Monitor signals
gmm_feat = load_gmm_features()
df_with_gmm = merge_gmm(df_with_rain, gmm_feat)

# 8) Population (admin1 static)
pop_admin1 = load_population_admin1()
df_final = df_with_gmm.merge(pop_admin1, on="admin_1", how="left")

  df_wfp_raw = pd.read_csv(io.StringIO(r.text))
  df["date"] = pd.to_datetime(df["date"], errors="coerce")
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  .apply(interp_group)


[WARN] Couldn’t fetch ACLED from HDX (There is no HDX configuration! Use Configuration.create(**kwargs)). Returning empty monthly table.


  out["Events"]     = out["Events"].fillna(0).astype(int)
  out["Fatalities"] = out["Fatalities"].fillna(0).astype(int)
  rf = pd.read_csv(io.StringIO(r.text))
  rf["date"]  = pd.to_datetime(rf["date"], errors="coerce")
  gmm = pd.read_csv(io.StringIO(r.text))


In [72]:
# ------------------------------------------------------------------------------
# SAVE TIDY OUTPUTS
# ------------------------------------------------------------------------------
df_final.to_parquet("data/processed/ethiopia_foodprices_model_panel.parquet", index=False)

OSError: Cannot save file into a non-existent directory: 'data/processed'