# AHEAD ETL

## Integrated Pipeline

# ---------------------------------------------------------------
### AHEAD INTEGRATED ANALYTICS PIPELINE  –  COUNTRY: KENYA
# ---------------------------------------------------------------
What this cell does:
1. Connects to the AHEAD DB (using DB_CONN in .env)
2. Retrieves ALL indicators for a given country + unit level
3. Joins a CSV mapping file to add SHORT indicator names (indicator_type) using indicator_code as the key
4. Cleans data PER INDICATOR:
    - flags missing values
    - flags negatives and sets them to NaN
    - de-duplicates (country, unit, indicator, date)
    - flags outliers by indicator_type (IQR rule)
5. Pivots to wide format and computes derived indicators
ONLY when both numerator & denominator short names
actually exist in the data (no errors)
6. Creates data-quality tables and exports an Excel file that countries can review
# ---------------------------------------------------------------

### Libraries

In [1]:
import os
from pathlib import Path

import numpy as np
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

import ahead_etl as ah   # UNICEF AHEAD ETL package

In [None]:
# ------------------------------------------------------------
# 0. USER SETTINGS
# ------------------------------------------------------------

# Country + level you want to analyze
COUNTRY_CONFIG = "kenya"   # name used in config.yaml
COUNTRY_CODE   = "KEN"     # as it appears in observation.country_code
UNIT_LEVEL     = 4         # lowest admin level (change if needed)

# Optional date filters (set to None to use full period)
DATE_MIN = None            # e.g. "2018-01-01"
DATE_MAX = None            # e.g. "2025-07-31"

# Path to indicator mapping CSV (your file)
MAP_PATH = Path(r"C:\Users\Amobi Andrew\ahead-etl\Data_id_12_10_25.csv")

# Output Excel path for Deliverable 2
DQ_EXCEL_PATH = f"dq_review_{COUNTRY_CODE.lower()}_level{UNIT_LEVEL}.xlsx"


In [3]:
# ------------------------------------------------------------
# 1. CONNECT TO THE AHEAD DATABASE
# ------------------------------------------------------------

# Load environment variables (including DB_CONN) from .env
load_dotenv()

# Example DB_CONN in .env:
#   DB_CONN=postgresql+psycopg2://user:pass@host:port/ahead
engine = create_engine(os.environ["DB_CONN"])

# Create AHEAD DB helper (not strictly required for this pipeline,
# but useful if you later want to use other ahead_etl utilities)
db = ah.Database(engine=engine, schema="public")

# Optional quick test (can be commented out later)
_ = pd.read_sql("SELECT * FROM information_schema.tables LIMIT 3;", con=engine)

# Optional: read config for metadata
config = ah.dhis2.Config.from_file("config.yaml", COUNTRY_CONFIG)
country_id = db.get_country_id(config.iso)
config


Config(iso='ken', username='tashrik', password='Unicef123!', url='https://hiskenya.org/api/', ou='LEVEL-5', dxs=['F6NOaTioUje', 'ETX9cUWF43c'], timeout=300, _Config__session=<requests.sessions.Session object at 0x0000020DD3D6E7E0>)

In [4]:
# ------------------------------------------------------------
# 2. LOAD INDICATOR MAPPING (CSV) AND STANDARDIZE COLUMNS
# ------------------------------------------------------------
# The CSV should have at least:
#   - "indicator_code/ data_id"
#   - "indicator_name"
#   - "data_elements/ indicator_type"     (short analytic name)

ind_map = pd.read_csv(MAP_PATH)

# Rename key columns to simpler names
rename_map = {
    "indicator_code/ data_id": "indicator_code",
    "indicator_name": "indicator_name",
    "data_elements/ indicator_type": "indicator_type",
}
ind_map = ind_map.rename(
    columns={k: v for k, v in rename_map.items() if k in ind_map.columns}
)

# Ensure indicator_code is string so it matches DB
ind_map["indicator_code"] = ind_map["indicator_code"].astype(str)

print("Indicator mapping – sample rows:")
display(ind_map[["indicator_code", "indicator_name", "indicator_type"]].head())

Indicator mapping – sample rows:


Unnamed: 0,indicator_code,indicator_name,indicator_type
0,pFHAXocl50Q,OPD attendance - total male (717),opd_m
1,xNZOF242UaV,OPD attendance - total female (717),opd_f
2,f9vesk5d4IY,MOH 711 New ANC clients,anc1
3,ku80YRejJPg,MOH710 DPT/Hep+HiB1 doses Administered,penta1
4,OyddF1qflzP,MOH710 DPT/Hep+HiB3 doses Administered,penta3


In [5]:
# ------------------------------------------------------------
# 3. RETRIEVE COUNTRY DATA FROM AHEAD (ALL INDICATORS)
# ------------------------------------------------------------
# NOTE: All indicators for this country/level are included.

DATE_MIN = "2018-01-01"
DATE_MAX = "2024-12-31"

base_query = """
SELECT
    country_code,
    country_name,
    unit_code,
    unit_name,
    unit_level,
    indicator_code,
    indicator_name,
    date,
    value
FROM observation
WHERE unit_level   = %(unit_level)s
  AND country_code = %(country_code)s
  AND date >= %(date_min)s
  AND date <= %(date_max)s
"""

params = {
    "unit_level": UNIT_LEVEL,
    "country_code": COUNTRY_CODE,
    "date_min": DATE_MIN,
    "date_max": DATE_MAX,
}

df_raw = pd.read_sql(base_query, con=engine, params=params)

df_raw["date"]  = pd.to_datetime(df_raw["date"], errors="coerce")
df_raw["value"] = pd.to_numeric(df_raw["value"], errors="coerce")
df_raw["indicator_code"] = df_raw["indicator_code"].astype(str)

print(f"Shape (raw {COUNTRY_CODE} level-{UNIT_LEVEL}):", df_raw.shape)
print("Date range:", df_raw["date"].min(), "→", df_raw["date"].max())
df_raw.sample(5, random_state=1)


Shape (raw KEN level-4): (3574011, 9)
Date range: 2018-01-01 00:00:00 → 2024-12-01 00:00:00


Unnamed: 0,country_code,country_name,unit_code,unit_name,unit_level,indicator_code,indicator_name,date,value
2724505,KEN,Kenya,m9cV82YWKBf,North Maragoli Ward,4,pct05jgGcLt,Estimated Number of Pregnant Women,2019-09-01,515.0
2458397,KEN,Kenya,ZlcAxf7QN8c,Muthwani Ward,4,Jw3alL29ZEi,MOH710 Measles-Rubella 2 Dose Adm (at 1 1/2 - ...,2024-05-01,31.0
1676953,KEN,Kenya,kQXcKGM8chb,Kolwa East Ward,4,hNyiGOxeAE0,Essential - Hypertension cases - total,2018-11-01,103.0
1590736,KEN,Kenya,gccbvaIlS0w,Kisa Central Ward,4,Fz0LzxMT1vV,MOH 711 Pregnant women completing 4 ANC visits,2021-12-01,75.0
3074007,KEN,Kenya,iKmwecDBqgX,Siboti Ward,4,etbuJ3chcZL,MOH 711 Rev 2020_No. clients with 1st ANC con...,2022-12-01,10.0


In [6]:
# ------------------------------------------------------------
# 4. JOIN DB DATA TO MAPPING TO ADD SHORT NAMES
# ------------------------------------------------------------
# Join ONLY on indicator_code (stable key).
# We keep all rows; unmatched codes will have NaN indicator_type.

df = df_raw.merge(
    ind_map[["indicator_code", "indicator_type"]],
    on="indicator_code",
    how="left",
)

# Ensure short names are strings (even if some are missing)
df["indicator_type"] = df["indicator_type"].astype(str)

print("After mapping – sample with short names:")
display(df[["indicator_code", "indicator_name", "indicator_type"]].head())

After mapping – sample with short names:


Unnamed: 0,indicator_code,indicator_name,indicator_type
0,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,anc_hiv_test
1,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,anc_hiv_test
2,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,anc_hiv_test
3,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,anc_hiv_test
4,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,anc_hiv_test


In [7]:
# ------------------------------------------------------------
# 5. DATA CLEANING (PER INDICATOR)
# ------------------------------------------------------------
#   - missing flags
#   - negative value flags
#   - duplicate flags

df_clean = df.copy()  # start from joined data (df)

# 5.1 Flag missing values
df_clean["is_missing"] = df_clean["value"].isna().astype(int)

# 5.2 Flag negative values and set them to NaN in value_clean
df_clean["flag_negative"] = (df_clean["value"] < 0).astype(int)
df_clean["value_clean"] = df_clean["value"].copy()
df_clean.loc[df_clean["flag_negative"] == 1, "value_clean"] = np.nan

# 5.3 Flag potential duplicates (but do NOT aggregate them)
dup_keys = ["country_code", "unit_code", "indicator_code", "date"]

df_clean["is_duplicate"] = df_clean.duplicated(dup_keys, keep=False).astype(int)

print("Clean data shape (fast mode):", df_clean.shape)
df_clean.head()


Clean data shape (fast mode): (3574011, 14)


Unnamed: 0,country_code,country_name,unit_code,unit_name,unit_level,indicator_code,indicator_name,date,value,indicator_type,is_missing,flag_negative,value_clean,is_duplicate
0,KEN,Kenya,X5bxxQN8nP5,Elburgon Ward,4,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,2018-06-01,12.0,anc_hiv_test,0,0,12.0,0
1,KEN,Kenya,X5bxxQN8nP5,Elburgon Ward,4,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,2018-07-01,16.0,anc_hiv_test,0,0,16.0,0
2,KEN,Kenya,X5bxxQN8nP5,Elburgon Ward,4,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,2018-08-01,22.0,anc_hiv_test,0,0,22.0,0
3,KEN,Kenya,X5bxxQN8nP5,Elburgon Ward,4,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,2018-09-01,23.0,anc_hiv_test,0,0,23.0,0
4,KEN,Kenya,X5bxxQN8nP5,Elburgon Ward,4,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,2018-10-01,147.0,anc_hiv_test,0,0,147.0,0


In [8]:
# ------------------------------------------------------------
# 6. OUTLIER DETECTION (Z-SCORE BY INDICATOR TYPE)
# ------------------------------------------------------------

# Compute mean and std for each indicator_type
stats = (
    df_clean.groupby("indicator_type")["value_clean"]
    .agg(["mean", "std"])
    .rename(columns={"mean": "mean_val", "std": "std_val"})
)

# Merge stats back to df_clean
df_clean = df_clean.merge(stats, on="indicator_type", how="left")

# Compute z-score
df_clean["zscore"] = (df_clean["value_clean"] - df_clean["mean_val"]) / df_clean["std_val"]

# Flag outliers
# Threshold: |z| > 3 (commonly used)
df_clean["flag_outlier"] = (df_clean["zscore"].abs() > 3).astype(int)

# Clean up thresholds for reporting
df_clean["outlier_threshold_hi"] = df_clean["mean_val"] + 3 * df_clean["std_val"]
df_clean["outlier_threshold_lo"] = df_clean["mean_val"] - 3 * df_clean["std_val"]

print("Outlier detection completed (fast mode).")

Outlier detection completed (fast mode).


In [9]:
# ------------------------------------------------------------
# 6.2 DERIVED PERCENTAGE INDICATORS (USING indicator_type SHORT NAMES)
# ------------------------------------------------------------
# Uses actual short names from Data_id_12.10.25.csv:
#   anc1, anc4, anc8, anc1_12w, anc1_ado, anc_hiv_test,
#   delivery, deliveries_estimated, delivery_hiv,
#   bcg, penta1, penta3, measles1, skilled_del,
#   syphilistest_ancqoc, etc.
#
# Logic:
#   1. Pivot df_clean (long) -> df_wide (wide) by indicator_type.
#   2. Define derived indicators in terms of these short names.
#   3. Split into:
#        - available_specs   : numerators & denominators present now
#        - placeholder_specs : at least one component missing
#   4. Compute only available_specs to avoid runtime errors.
#   5. Keep placeholder_specs as documentation for future expansion.
# ------------------------------------------------------------

id_cols = [
    "country_code",
    "country_name",
    "unit_code",
    "unit_name",
    "unit_level",
    "date",
]

# 6.3 Pivot to wide format by indicator_type
df_wide = df_clean.pivot_table(
    index=id_cols,
    columns="indicator_type",
    values="value_clean",
    aggfunc="sum",
).reset_index()

available_types = set(df_wide.columns) - set(id_cols)

print("Number of short-name indicators available:", len(available_types))
print("Sample of available indicator_type values:")
print(sorted(list(available_types))[:40])  # adjust slice if needed


# 6.4 Define derived indicator specs using real short names
# ---------------------------------------------------------
# NOTE: These names come from your CSV (currently available or expected):
#   anc1                  -> New ANC clients
#   anc4                  -> Pregnant women completing 4 ANC visits
#   anc1_12w              -> ANC1 before 12 weeks
#   anc1_ado              -> Adolescents (ANC1)
#   anc_hiv_test          -> Initial HIV test at ANC
#   syphilistest_ancqoc   -> Syphilis tests at ANC (QOC)
#   delivery              -> Total deliveries in facilities
#   deliveries_estimated  -> Estimated deliveries
#   skilled_del           -> Skilled deliveries
#   delivery_hiv          -> Deliveries from HIV+ women
#   bcg                   -> BCG doses administered
#   penta3                -> DPT/Hep+HiB3 doses
#   measles1              -> Measles/Rubella 1 doses
#
# Some HIV/EID/adolescent indicators will remain placeholders for now.
#
# ----------------------------------------------------------------------
# IMPORTANT – INDICATORS NOT YET AVAILABLE / NOT FULLY MAPPED IN AHEAD
# ----------------------------------------------------------------------
# The following indicators are part of the proposed integrated analytics
# but are (as of this pipeline version) missing or incomplete in the AHEAD
# database for this country. Once UNICEF adds them (using the short names
# below), the same pipeline will automatically compute the corresponding
# derived indicators without any code changes.
#
#   MOH731B Number screened_HBV                          -> anc_hbv_test
#   MOH 731 Positive Results _L&D HV02-12               -> anc_hiv_test_pos_L&D
#   MOH 731 Positive Results_ANC HV02-11                -> anc_hiv_test_pos_ANC
#   MOH 731 Positive Results_PNC<=6wks HV02-13          -> anc_hiv_test_pos_PNC
#   MOH 731 Start HAART_L&D HV02-18                     -> maternal_arvs_newly_initiated_L&D
#   MOH 731 Start HAART_ANC HV02-17                     -> maternal_arvs_newly_initiated_ANC
#   MOH 731 Start HAART_PNC<=6wks HV02-19               -> maternal_arvs_newly_initiated_PNC
#   MOH 731 On HAART at 1st ANC HV02-16                 -> maternal_arvs_currently on ART
#   MOH 731 Known Positive at 1st ANC HV02-03           -> pregnant_hiv_pos_known_pos
#   MOH 731 Syphilis Screened_Positive HV02-25          -> syphilis_test_pos
#   MOH 731 Syphilis Screened_1st ANC HV02-24           -> syphilis_test
#   MOH731B Positive_HBV                                -> anc_hbv_test_pos
#   MOH 711 Deliveries from HIV+ve Women                -> delivery_hiv
#   MOH 731 On ART_1-9 HV03-029                         -> art_old_9
#   MOH 731 On ART_10-14 (F) HV03-031                   -> art_old_14_f
#   MOH 731 On ART_10-14 (M) HV03-030                   -> art_old_14_m
#   Proportion of under 1 year receiving DPT/Hep+HiB3   -> under1_DPT/Hep+HiB3
#   Proportion of children under one year fully immunized-> under1_fully_immunized
#   MOH 731 On ART_<1 HV03-028                          -> art_old_0
#   MOH 731 On ART_15-19 (F) HV03-033                   -> art_old_19_f
#   MOH 731 On ART_15-19 (M) HV03-032                   -> art_old_19_m
#   MOH 731 On ART_20-24 (F) HV03-035                   -> art_old_24_f
#   MOH 731 On ART_20-24 (M) HV03-034                   -> art_old_24_m
#   MOH 731 On ART_25+ (F) HV03-037                     -> art_old_25+_f
#   MOH 731 On ART_25+ (M) HV03-036                     -> art_old_25+_m
#   MOH 731_HIV_TB_OnART_<1 (F) HV03-16                 -> art_new_0_f
#   MOH 731_HIV_TB_OnART_<1 (M) HV03-15                 -> art_new_0_m
#   MOH 731_HIV_TB_OnART_1-4 (F) HV03-18                -> art_new_4_f
#   MOH 731_HIV_TB_OnART_1-4 (M) HV03-17                -> art_new_4_m
#   MOH 731_HIV_TB_OnART_5-9 (F) HV03-20                -> art_new_9_f
#   MOH 731_HIV_TB_OnART_5-9 (M) HV03-19                -> art_new_9_m
#   MOH 731_HIV_TB_OnART_10-14 (F) HV03-22              -> art_new_14_f
#   MOH 731_HIV_TB_OnART_10-14 (M) HV03-21              -> art_new_14_m
#   MOH 731_HIV_TB_OnART_15-19 (F) HV03-24              -> art_new_19_f
#   MOH 731_HIV_TB_OnART_15-19 (M) HV03-23              -> art_new_19_m
#   MOH 731_HIV_TB_OnART_20-24 (F) HV03-26              -> art_new_24_f
#   MOH 731_HIV_TB_OnART_20-24 (M) HV03-25              -> art_new_24_m
#   MOH 731_HIV_TB_OnART_25+ (F) HV03-28                -> art_new_25+_f
#   MOH 731_HIV_TB_OnART_25+ (M) HV03-27                -> art_new_25+_m
#
# Once these short names are populated in AHEAD, they will automatically
# be picked up in `available_types` and can be used to compute additional
# PMTCT, ART, and paediatric/adolescent HIV indicators in this pipeline.
# ---------------------------------------------------------

DERIVED_INDICATORS = {
    # ------------------------------------------------------------------
    # ANC COVERAGE & COMPLETION
    # ------------------------------------------------------------------
    "pct_anc1_coverage_est": {
        "numerator":   "anc1",
        "denominator": "deliveries_estimated",
        "description": "% ANC1 coverage among estimated deliveries (ANC1 / deliveries_estimated)",
    },
    "pct_anc4_coverage_est": {
        "numerator":   "anc4",
        "denominator": "deliveries_estimated",
        "description": "% ANC4 coverage among estimated deliveries (ANC4 / deliveries_estimated)",
    },
    "pct_anc4_completion": {
        "numerator":   "anc4",
        "denominator": "anc1",
        "description": "% ANC4 completion among ANC1 clients (ANC4 / ANC1)",
    },
    "pct_anc1_early_booking": {
        "numerator":   "anc1_12w",
        "denominator": "anc1",
        "description": "% ANC1 before 12 weeks among all ANC1 (ANC1<12w / ANC1)",
    },
    "pct_anc1_adolescent": {
        "numerator":   "anc1_ado",
        "denominator": "anc1",
        "description": "% ANC1 among adolescents (ANC1_ado / ANC1)",
    },

    # ------------------------------------------------------------------
    # HIV & SYPHILIS TESTING COVERAGE IN ANC / L&D / PNC
    # ------------------------------------------------------------------
    "pct_hiv_testing_anc": {
        "numerator":   "anc_hiv_test",
        "denominator": "anc1",
        "description": "% HIV testing coverage at ANC (ANC HIV tests / ANC1)",
    },
    "pct_syphilis_testing_anc": {
        "numerator":   "syphilistest_ancqoc",
        "denominator": "anc1",
        "description": "% Syphilis testing coverage at ANC (syphilistest_ancqoc / ANC1)",
    },
    "pct_hiv_testing_ld": {
        "numerator":   "HIV testing at L&D",
        "denominator": "delivery",
        "description": "% HIV testing coverage at L&D (HIV tests at L&D / facility deliveries)",
    },
    "pct_hiv_testing_pnc": {
        "numerator":   "HIV testing at PNC",
        "denominator": "delivery",
        "description": "% HIV testing coverage at PNC (HIV tests at PNC / facility deliveries)",
    },
    "pct_delivery_hiv_among_deliveries": {
        "numerator":   "delivery_hiv",
        "denominator": "delivery",
        "description": "% deliveries from HIV+ women (delivery_hiv / delivery)",
    },

    # ------------------------------------------------------------------
    # DELIVERY & SKILLED BIRTH ATTENDANCE
    # ------------------------------------------------------------------
    "pct_facility_delivery_est": {
        "numerator":   "delivery",
        "denominator": "deliveries_estimated",
        "description": "% facility deliveries among estimated deliveries (delivery / deliveries_estimated)",
    },
    "pct_skilled_delivery_fac": {
        "numerator":   "skilled_del",
        "denominator": "delivery",
        "description": "% skilled deliveries among facility deliveries (skilled_del / delivery)",
    },
    "pct_skilled_delivery_est": {
        "numerator":   "skilled_del",
        "denominator": "deliveries_estimated",
        "description": "% skilled deliveries among estimated deliveries (skilled_del / deliveries_estimated)",
    },

    # ------------------------------------------------------------------
    # IMMUNIZATION COVERAGE – USING ESTIMATED DELIVERIES AS DENOMINATOR
    # ------------------------------------------------------------------
    "pct_bcg_coverage": {
        "numerator":   "bcg",
        "denominator": "deliveries_estimated",
        "description": "% BCG coverage (BCG doses / deliveries_estimated)",
    },
    "pct_penta3_coverage": {
        "numerator":   "penta3",
        "denominator": "deliveries_estimated",
        "description": "% Penta3 coverage (Penta3 doses / deliveries_estimated)",
    },
    "pct_measles1_coverage": {
        "numerator":   "measles1",
        "denominator": "deliveries_estimated",
        "description": "% Measles1 coverage (Measles1 doses / deliveries_estimated)",
    },

    # ------------------------------------------------------------------
    # ADOLESCENT HIV TESTING – PLACEHOLDERS (MAY OR MAY NOT BE AVAILABLE)
    # ------------------------------------------------------------------
    "pct_hiv_testing_10_19": {
        "numerator":   "HIV testing among 10-19",
        "denominator": "Population 10-19",
        "description": "% HIV testing among adolescents 10–19 years",
    },
    "pct_hiv_testing_15_24": {
        "numerator":   "HIV testing among 15-24",
        "denominator": "Population 15-24",
        "description": "% HIV testing among young people 15–24 years",
    },
    "pct_hiv_positivity_10_19": {
        "numerator":   "Positive HIV tests among 10-19",
        "denominator": "HIV testing among 10-19",
        "description": "% HIV positivity among adolescents 10–19 years",
    },
    "pct_hiv_positivity_15_24": {
        "numerator":   "Positive HIV tests among 15-24",
        "denominator": "HIV testing among 15-24",
        "description": "% HIV positivity among young people 15–24 years",
    },

    # ------------------------------------------------------------------
    # EID (EARLY INFANT DIAGNOSIS) – EXAMPLE PLACEHOLDER
    # ------------------------------------------------------------------
    "pct_eid_8wks_coverage": {
        "numerator":   "eid_8wks",       # infants with EID at 8 weeks
        "denominator": "delivery_hiv",   # deliveries from HIV+ mothers
        "description": "% EID 8 weeks coverage (EID 8wks / deliveries from HIV+ women)",
    },
}


# 6.5 Split into AVAILABLE vs PLACEHOLDER based on what actually exists
# ---------------------------------------------------------------------
available_specs   = {}
placeholder_specs = {}

for new_col, spec in DERIVED_INDICATORS.items():
    num = spec["numerator"]
    den = spec["denominator"]
    if (num in available_types) and (den in available_types):
        available_specs[new_col] = spec
    else:
        placeholder_specs[new_col] = spec

print("\nDerived indicators that CAN be computed with CURRENT data:")
for k, v in available_specs.items():
    print(f"  - {k}: {v['numerator']} / {v['denominator']}  --> AVAILABLE")

print("\nDerived indicators that are PLACEHOLDERS (components missing now):")
for k, v in placeholder_specs.items():
    print(f"  - {k}: {v['numerator']} / {v['denominator']}  --> PLACEHOLDER")


# ------------------------------------------------------------
# 6A. AVAILABLE DERIVED INDICATORS – RUN THIS NOW
# ------------------------------------------------------------
df_derived = df_wide.copy()

for new_col, spec in available_specs.items():
    num = spec["numerator"]
    den = spec["denominator"]

    with np.errstate(divide="ignore", invalid="ignore"):
        df_derived[new_col] = df_derived[num] / df_derived[den]

print("\nColumns in df_derived (sample):")
print(df_derived.columns.tolist()[:40])


# ------------------------------------------------------------
# 6B. PLACEHOLDER DERIVED INDICATORS – FOR FUTURE USE ONLY
# ------------------------------------------------------------

placeholder_table = (
    pd.DataFrame.from_dict(placeholder_specs, orient="index")
    .reset_index()
    .rename(columns={"index": "derived_indicator_name"})
)

print("\nPlaceholder derived indicators (for documentation only):")
display(placeholder_table)


Number of short-name indicators available: 43
Sample of available indicator_type values:
['HIV testing at L&D', 'HIV testing at PNC', 'anc1', 'anc1_12w', 'anc1_ado', 'anc4', 'anc8', 'anc_hbv_test', 'anc_hiv_test', 'bcg', 'cervical_cancer', 'csection', 'deliveries_assisted_vag', 'deliveries_breach', 'deliveries_estimated', 'deliveries_normal', 'delivery', 'dia_ors_zinc', 'eid_8wks', 'fp_new', 'fp_revisits', 'hpv1', 'hypertension', 'malaria_rdt', 'measles1', 'measles2', 'mmr_deaths', 'mnps', 'nan', 'opd_f', 'opd_m', 'penta1', 'penta3', 'pnc1', 'pneumonia', 'population_total', 'population_under1', 'pregnancies', 'rutf', 'skilled_del']

Derived indicators that CAN be computed with CURRENT data:
  - pct_anc1_coverage_est: anc1 / deliveries_estimated  --> AVAILABLE
  - pct_anc4_coverage_est: anc4 / deliveries_estimated  --> AVAILABLE
  - pct_anc4_completion: anc4 / anc1  --> AVAILABLE
  - pct_anc1_early_booking: anc1_12w / anc1  --> AVAILABLE
  - pct_anc1_adolescent: anc1_ado / anc1  --> AVA

Unnamed: 0,derived_indicator_name,numerator,denominator,description
0,pct_delivery_hiv_among_deliveries,delivery_hiv,delivery,% deliveries from HIV+ women (delivery_hiv / d...
1,pct_hiv_testing_10_19,HIV testing among 10-19,Population 10-19,% HIV testing among adolescents 10–19 years
2,pct_hiv_testing_15_24,HIV testing among 15-24,Population 15-24,% HIV testing among young people 15–24 years
3,pct_hiv_positivity_10_19,Positive HIV tests among 10-19,HIV testing among 10-19,% HIV positivity among adolescents 10–19 years
4,pct_hiv_positivity_15_24,Positive HIV tests among 15-24,HIV testing among 15-24,% HIV positivity among young people 15–24 years
5,pct_eid_8wks_coverage,eid_8wks,delivery_hiv,% EID 8 weeks coverage (EID 8wks / deliveries ...


In [10]:
# ------------------------------------------------------------
# 7. DATA-QUALITY TABLES (FAST MODE WITH OUTLIERS)
# ------------------------------------------------------------
# Assumes df_clean already contains:
#   - value_clean       : cleaned numeric value
#   - is_missing        : 1 if value is missing, else 0
#   - flag_negative     : 1 if original value < 0, else 0
#   - is_duplicate      : 1 if row is duplicate by (country, unit, indicator, date)
#   - flag_outlier      : 1 if row flagged as outlier (z-score or other rule), else 0
#   - zscore            : standardized value within indicator_type
#   - outlier_threshold_lo / outlier_threshold_hi : bounds used to flag outlier
#
# This section produces:
#   1) Indicator-level DQ summary (completeness, negatives, duplicates, outliers)
#   2) Unit-level DQ summary (reporting completeness)
#   3) Detail table of duplicates
#   4) Detail table of outliers

# 7.1 Indicator-level completeness & quality summary
# --------------------------------------------------
# For each indicator, we compute:
#   - n_expected   : total number of records (time x units)
#   - n_missing    : number of missing (NaN) values
#   - n_negative   : number of negative values flagged
#   - n_duplicates : number of records flagged as duplicates
#   - n_outliers   : number of records flagged as outliers
#   - pct_*        : percentages for quick prioritization
#
# NOTE: We now group by indicator_code + indicator_name + indicator_type
#       so the summary has both the short analytic name and full label.

dq_indicator = (
    df_clean
    .groupby(
        [
            "country_code",
            "indicator_code",
            "indicator_name",
            "indicator_type",   # short name from mapping
        ],
        dropna=False,
    )
    .agg(
        n_expected   = ("value_clean", "size"),
        n_missing    = ("value_clean", lambda x: x.isna().sum()),
        n_negative   = ("flag_negative", "sum"),
        n_duplicates = ("is_duplicate", "sum"),
        n_outliers   = ("flag_outlier", "sum"),
    )
    .reset_index()
)

# Convert proportions to percentage values (e.g., 0.021821 → 2.1821%)
dq_indicator["pct_missing"]    = (dq_indicator["n_missing"]    / dq_indicator["n_expected"]) * 100
dq_indicator["pct_negative"]   = (dq_indicator["n_negative"]   / dq_indicator["n_expected"]) * 100
dq_indicator["pct_duplicates"] = (dq_indicator["n_duplicates"] / dq_indicator["n_expected"]) * 100
dq_indicator["pct_outliers"]   = (dq_indicator["n_outliers"]   / dq_indicator["n_expected"]) * 100

print("Indicator-level DQ summary (first 10 rows):")
display(dq_indicator.head(10))


# 7.2 Unit-level completeness summary
# --------------------------------------------------
# For each unit (facility/district/etc.), we compute:
#   - n_indicators_reported : total non-missing values
#   - n_indicators_missing  : total missing values
#   - n_duplicates          : how many rows flagged as duplicates
#   - pct_reported          : proportion of non-missing values

dq_unit = (
    df_clean
    .groupby(["country_code", "unit_code", "unit_name", "unit_level"], dropna=False)
    .agg(
        n_indicators_reported = ("value_clean", lambda x: x.notna().sum()),
        n_indicators_missing  = ("value_clean", lambda x: x.isna().sum()),
        n_duplicates          = ("is_duplicate", "sum"),
    )
    .reset_index()
)

dq_unit["pct_reported"] = dq_unit["n_indicators_reported"] / (
    dq_unit["n_indicators_reported"] + dq_unit["n_indicators_missing"]
)

print("Unit-level DQ summary (first 10 rows):")
display(dq_unit.head(10))


# 7.3 Duplicate detail table
# --------------------------------------------------
# This table lists all rows that are part of a duplicate cluster.
# It allows country teams to inspect which units/indicators/dates
# have multiple records and decide which one to keep.

dq_duplicates = df_clean[df_clean["is_duplicate"] == 1][
    [
        "country_code",
        "unit_code",
        "unit_name",
        "indicator_code",
        "indicator_name",
        "indicator_type",
        "date",
        "value_clean",
    ]
].copy()

print("Number of duplicate rows flagged:", dq_duplicates.shape[0])


# 7.4 Outlier detail table
# --------------------------------------------------
# This table lists all rows flagged as outliers with:
#   - zscore: how extreme the value is
#   - outlier_threshold_lo/hi: calculated bounds
# This is useful for country validation and triage.

dq_outliers = df_clean[df_clean["flag_outlier"] == 1][
    [
        "country_code",
        "unit_code",
        "unit_name",
        "indicator_code",
        "indicator_name",
        "indicator_type",
        "date",
        "value_clean",
        "zscore",
        "outlier_threshold_lo",
        "outlier_threshold_hi",
    ]
].copy()

print("Number of outlier rows flagged:", dq_outliers.shape[0])


# Quick shapes summary
print("\nDQ table shapes (fast mode with outliers):")
print("  - dq_indicator (indicator-level):", dq_indicator.shape)
print("  - dq_unit      (unit-level)     :", dq_unit.shape)
print("  - dq_duplicates                :", dq_duplicates.shape)
print("  - dq_outliers                  :", dq_outliers.shape)


Indicator-level DQ summary (first 10 rows):


Unnamed: 0,country_code,indicator_code,indicator_name,indicator_type,n_expected,n_missing,n_negative,n_duplicates,n_outliers,pct_missing,pct_negative,pct_duplicates,pct_outliers
0,KEN,C6fThjYiDZA,Essential - Women aged 25-49 screened for cerv...,cervical_cancer,56737,0,0,0,488,0.0,0.0,0.0,0.860109
1,KEN,ETX9cUWF43c,MOH 731 Initial test at ANC HV02-04,anc_hiv_test,104072,0,0,0,2271,0.0,0.0,0.0,2.182143
2,KEN,F6NOaTioUje,MOH731B Number screened_HBV,anc_hbv_test,203,0,0,0,5,0.0,0.0,0.0,2.463054
3,KEN,FXeDYp2ATE9,Total deliveries in facilities,delivery,111575,0,0,0,2334,0.0,0.0,0.0,2.091866
4,KEN,Fz0LzxMT1vV,MOH 711 Pregnant women completing 4 ANC visits,anc4,120115,0,0,0,2425,0.0,0.0,0.0,2.018899
5,KEN,Jw3alL29ZEi,MOH710 Measles-Rubella 2 Dose Adm (at 1 1/2 - ...,measles2,117000,0,0,0,236,0.0,0.0,0.0,0.201709
6,KEN,Kx64gGqaFVq,MOH 711 Assisted vaginal delivery,deliveries_assisted_vag,6142,0,0,0,62,0.0,0.0,0.0,1.009443
7,KEN,LQpQQP3KnU1,MOH 731 Initial test at PNC_PNC<=6wks HV02-06,HIV testing at PNC,21095,0,0,0,351,0.0,0.0,0.0,1.663901
8,KEN,MVIKFGNvNJp,Family Planning Revisits,fp_revisits,113847,0,0,0,1093,0.0,0.0,0.0,0.96006
9,KEN,Mh01TRA38Ng,OP: Beneficiaries who Received RUTF Sachet,rutf,41294,0,0,0,22,0.0,0.0,0.0,0.053277


Unit-level DQ summary (first 10 rows):


Unnamed: 0,country_code,unit_code,unit_name,unit_level,n_indicators_reported,n_indicators_missing,n_duplicates,pct_reported
0,KEN,A1sIHVzJM77,Kware Ward,4,2208,0,0,1.0
1,KEN,A20qmlqFi5D,Naivasha East Ward,4,2236,0,0,1.0
2,KEN,A4Ko5vFs83F,Guticha Ward,4,1797,0,0,1.0
3,KEN,A7U8KMUSpj1,Kanuni Ward,4,2356,0,0,1.0
4,KEN,A7a1GZJvTGD,Kamenu Ward,4,2840,0,0,1.0
5,KEN,A7kJTCLxqNq,Loiyamorok Ward,4,2191,0,0,1.0
6,KEN,A7zboeNsKAz,Sango Ward,4,2494,0,0,1.0
7,KEN,AAVwWFH70cF,Magwagwa Ward,4,2648,0,0,1.0
8,KEN,AEMY7F8XyZT,Elwak South Ward,4,1231,0,0,1.0
9,KEN,AIL7D1wnOIl,Ganga Ward,4,2340,0,0,1.0


Number of duplicate rows flagged: 0
Number of outlier rows flagged: 53819

DQ table shapes (fast mode with outliers):
  - dq_indicator (indicator-level): (45, 13)
  - dq_unit      (unit-level)     : (1453, 8)
  - dq_duplicates                : (0, 8)
  - dq_outliers                  : (53819, 11)


In [11]:
# ------------------------------------------------------------
# 8. EXPORT TO EXCEL – DQ DATAPACK WITH BASIC FORMATTING
# ------------------------------------------------------------

def export_dq_datapack_formatted(path: str):
    """
    Exports a multi-sheet Excel 'datapack' with light but useful formatting:
      - Header row bold & shaded
      - Freeze header row + autofilter
      - Wider columns for text
      - Percentage-style display for pct_* columns (0–100 stored, e.g. 23.5 → "23.5%")
    """
    with pd.ExcelWriter(path, engine="xlsxwriter") as writer:
        # Write sheets
        dq_indicator.to_excel(writer, sheet_name="completeness_indicator", index=False)
        dq_unit.to_excel(writer, sheet_name="completeness_unit", index=False)
        dq_duplicates.to_excel(writer, sheet_name="duplicates", index=False)
        dq_outliers.to_excel(writer, sheet_name="outliers", index=False)
        df_derived.to_excel(writer, sheet_name="derived_indicators", index=False)

        workbook = writer.book

        # Common formats
        header_fmt = workbook.add_format({
            "bold": True,
            "bg_color": "#D9E1F2",
            "border": 1
        })
        percent_fmt = workbook.add_format({
            # Our pct_* values are 0–100, so we use a literal % sign (no Excel scaling)
            "num_format": '0.0"%"'
        })

        def format_sheet(df: pd.DataFrame, sheet_name: str):
            """Apply basic formatting to a given sheet."""
            ws = writer.sheets[sheet_name]

            # Header row format + autofilter
            ws.autofilter(0, 0, 0, len(df.columns) - 1)
            for col_num, col_name in enumerate(df.columns):
                ws.write(0, col_num, col_name, header_fmt)

            # Set a reasonable default column width
            ws.set_column(0, len(df.columns) - 1, 16)

            # Freeze header row
            ws.freeze_panes(1, 0)

            # Apply % style to any pct_* columns (if present)
            pct_cols = [i for i, c in enumerate(df.columns) if c.startswith("pct_")]
            for col_idx in pct_cols:
                ws.set_column(col_idx, col_idx, 14, percent_fmt)

        # Apply formatting to each sheet
        format_sheet(dq_indicator, "completeness_indicator")
        format_sheet(dq_unit, "completeness_unit")
        format_sheet(dq_duplicates, "duplicates")
        format_sheet(dq_outliers, "outliers")
        format_sheet(df_derived, "derived_indicators")

    print("Formatted data-quality review Excel datapack exported to:", path)


# Call instead of the previous export_dq_datapack:
export_dq_datapack_formatted(DQ_EXCEL_PATH)


Formatted data-quality review Excel datapack exported to: dq_review_ken_level4.xlsx


In [12]:
# ------------------------------------------------------------
# 9. GEOMETRY & UNIT-LEVEL OUTLIER SUMMARY FOR MAPS (STREAMLIT)
# ------------------------------------------------------------
# Assumes:
#   - engine          : SQLAlchemy engine already created earlier
#   - COUNTRY_CODE    : e.g. "KEN"
#   - UNIT_LEVEL      : e.g. 4
#   - dq_unit         : unit-level DQ summary (Section 7)
#   - df_clean        : cleaned long-format data with flag_outlier (Section 5/6)
#
# Produces:
#   - dq_unit_with_outliers : dq_unit + outlier counts
#   - dq_unit_geo           : dq_unit_with_outliers + lat/lon
#   - dq_unit_with_outliers.parquet : file for the Streamlit map
# ------------------------------------------------------------

import geopandas as gpd
from sqlalchemy import create_engine

# 9.0 Rebuild DB connection string from existing engine
DB_CONN = engine.url.render_as_string(hide_password=False)
print("Using DB_CONN:", DB_CONN)

# Use a *fresh* engine for this query to avoid any stale transactions
engine_geo = create_engine(DB_CONN)

# 9.1 Pull unique units + geometry from the DB
unit_query = """
SELECT DISTINCT
    country_code,
    country_name,
    unit_code,
    unit_name,
    unit_level,
    unit_geometry
FROM observation
WHERE country_code = %(country_code)s
  AND unit_level   = %(unit_level)s
  AND unit_geometry IS NOT NULL;
"""

df_units_geo = pd.read_sql(
    unit_query,
    con=engine_geo,
    params={"country_code": COUNTRY_CODE, "unit_level": UNIT_LEVEL},
)

print("Units returned from DB (with geometry column):", df_units_geo.shape)

# 9.2 Convert WKT -> geometry and compute centroids
gdf_units = gpd.GeoDataFrame(
    df_units_geo.copy(),
    geometry=gpd.GeoSeries.from_wkt(df_units_geo["unit_geometry"]),
    crs="EPSG:4326",  # WGS84
)

# Centroid coordinates for each unit (used by Streamlit map)
gdf_units["lon"] = gdf_units.geometry.centroid.x
gdf_units["lat"] = gdf_units.geometry.centroid.y

df_geo = (
    gdf_units[
        [
            "country_code",
            "unit_code",
            "unit_name",
            "unit_level",
            "lat",
            "lon",
        ]
    ]
    .dropna(subset=["lat", "lon"])
    .copy()
)

print("Units with valid centroid coordinates:", df_geo.shape)

# 9.3 Summarize outliers at unit level (for anomaly maps)
df_outliers_unit = (
    df_clean
    .groupby(
        ["country_code", "unit_code", "unit_name", "unit_level"],
        dropna=False,
    )["flag_outlier"]
    .sum()
    .reset_index()
    .rename(columns={"flag_outlier": "n_outliers_unit"})
)

# 9.4 Merge outlier counts into dq_unit to create dq_unit_with_outliers
dq_unit_with_outliers = dq_unit.merge(
    df_outliers_unit,
    on=["country_code", "unit_code", "unit_name", "unit_level"],
    how="left",
)

dq_unit_with_outliers["n_outliers_unit"] = dq_unit_with_outliers[
    "n_outliers_unit"
].fillna(0)

print("dq_unit_with_outliers columns:")
print(dq_unit_with_outliers.columns.tolist())

# 9.5 Attach lat/lon to unit-level summary for mapping
dq_unit_geo = dq_unit_with_outliers.merge(
    df_geo[
        [
            "country_code",
            "unit_code",
            "lat",
            "lon",
        ]
    ],
    on=["country_code", "unit_code"],
    how="left",
)

# Ensure lat/lon are numeric and drop any remaining invalid rows
dq_unit_geo["lat"] = pd.to_numeric(dq_unit_geo["lat"], errors="coerce")
dq_unit_geo["lon"] = pd.to_numeric(dq_unit_geo["lon"], errors="coerce")
dq_unit_geo = dq_unit_geo.dropna(subset=["lat", "lon"]).copy()

print("dq_unit_geo shape after dropping NaN lat/lon:", dq_unit_geo.shape)

# 9.6 Save parquet for the Streamlit app
DQ_UNIT_GEO_PATH = "dq_unit_with_outliers.parquet"
dq_unit_geo.to_parquet(DQ_UNIT_GEO_PATH, index=False)

print("Saved unit-level geo DQ file for Streamlit map:", DQ_UNIT_GEO_PATH)

Using DB_CONN: postgresql+psycopg://dapmhealthprdusr:xVW4T4G156Y%25D@pgsql-17-reserved-dapm-health-prd.postgres.database.azure.com:5432/dapm_health_prd


Units returned from DB (with geometry column): (1446, 6)



  gdf_units["lon"] = gdf_units.geometry.centroid.x

  gdf_units["lat"] = gdf_units.geometry.centroid.y


Units with valid centroid coordinates: (1446, 6)
dq_unit_with_outliers columns:
['country_code', 'unit_code', 'unit_name', 'unit_level', 'n_indicators_reported', 'n_indicators_missing', 'n_duplicates', 'pct_reported', 'n_outliers_unit']
dq_unit_geo shape after dropping NaN lat/lon: (1446, 11)
Saved unit-level geo DQ file for Streamlit map: dq_unit_with_outliers.parquet
