## Tuberculosis data from the WHO Global Health Observatory (GHO)

In [48]:
# import libraries (installed in epi-core environment)
import pandas as pd
import requests
import sqlalchemy
from sqlalchemy import create_engine, text, URL
import json
from pathlib import Path
from dotenv import load_dotenv
import os

In [49]:
# Look up every WHO GHO indicator whose name contains "Tuberculosis" and keep its code.
url = "https://ghoapi.azureedge.net/api/Indicator?$filter=contains(IndicatorName,%20%27Tuberculosis%27)"

response = requests.get(url, timeout=60)  # fetch api response; don't hang forever on a stalled connection
response.raise_for_status()  # surface HTTP errors here instead of a confusing KeyError below
data = response.json()  # json to python dict
indicator_codes = [item["IndicatorCode"] for item in data["value"]]  # code of each matching indicator
indicator_codes

['WHS3_522',
 'WHS3_54',
 'UHC_TB_DT',
 'TB_1',
 'TB_c_lab_cul_5m',
 'TB_c_new_snep_tsr',
 'TB_c_newinc',
 'TB_e_inc_num_014',
 'TB_c_lab_sm_100k',
 'TB_e_inc_tbhiv_100k',
 'TB_e_prev_num',
 'TB_effective_treatment_coverage',
 'TB_tot_newrel',
 'TB_e_inc_num',
 'TB_e_inc_tbhiv_num',
 'TB_e_mort_exc_tbhiv_num',
 'MDG_0000000018',
 'MDG_0000000020',
 'MDG_0000000017',
 'MDG_0000000031',
 'MDG_0000000022',
 'MDG_0000000023',
 'MDG_0000000030',
 'TB_Notification_agesex_num',
 'MDG_0000000024']

In [51]:
# Keep only the indicators whose observation endpoint actually returns data.
# base_url is the GHO OData root (with trailing slash). This cell and the load cells
# append either an indicator code or an `Indicator?$filter=...` query to it, so it is
# defined here, in the first cell that uses it.
base_url = "https://ghoapi.azureedge.net/api/"

valid_indicator_codes = []

for code in indicator_codes:
    resp = requests.get(f"{base_url}{code}", timeout=60)
    resp.raise_for_status()  # fail loudly rather than silently misjudging an indicator
    # A present, non-empty "value" list means the indicator has observations
    if resp.json().get("value"):
        valid_indicator_codes.append(code)  # keep only if data exists

# replace indicator codes
indicator_codes = valid_indicator_codes

In [52]:
# check for .env
# Load the Postgres connection settings from ../env/.env (relative to the working directory).

env_path = Path.cwd().parent / "env" / ".env"  # up one folder

if not env_path.exists():
    # Raise rather than sys.exit(): `sys` is not imported in this notebook, and an
    # exception stops "Run All" with a clear message instead of a SystemExit in the kernel.
    raise FileNotFoundError(f"Missing {env_path} file. Please create one with your DB credentials.")

load_dotenv(dotenv_path=env_path)

PG_USER = os.getenv("PGUSER")
PG_PASSWORD = os.getenv("PGPASSWORD")
PG_HOST = os.getenv("PGHOST")
PG_PORT = os.getenv("PGPORT")
PG_DATABASE = os.getenv("PGDATABASE")

# Report any unset variables (a warning only; execution continues)
missing_vars = [
    name
    for name, value in {
        "PG_USER": PG_USER,
        "PG_PASSWORD": PG_PASSWORD,
        "PG_HOST": PG_HOST,
        "PG_PORT": PG_PORT,
        "PG_DATABASE": PG_DATABASE,
    }.items()
    if value is None
]

if missing_vars:
    print(f"Missing required environment variables: {', '.join(missing_vars)} "
          f"\nMake sure they are defined in {env_path}")

In [53]:
# configuration
# SQLAlchemy URL for Postgres via the psycopg driver. Settings are read from the
# environment; host and port fall back to localhost:5432 when unset.
_pg_settings = {
    "username": os.getenv("PGUSER"),
    "password": os.getenv("PGPASSWORD"),
    "host": os.getenv("PGHOST", "localhost"),
    "port": int(os.getenv("PGPORT", 5432)),
    "database": os.getenv("PGDATABASE"),
}
pg_url = URL.create(drivername="postgresql+psycopg", **_pg_settings)

# NOTE(review): neither constant below is read by later cells in this notebook;
# the fetch cells build URLs from `base_url` (which has a trailing slash) instead.
INDICATORS = indicator_codes
BASE_URL = "https://ghoapi.azureedge.net/api"  # WHO GHO OData endpoint

In [54]:
# connect to db
# pool_pre_ping tests pooled connections for liveness on checkout.
engine = create_engine(pg_url, pool_pre_ping=True)

# When True, EVERY table in the `public` schema is dropped (CASCADE) so the schema
# script in the next cell starts from a clean slate. Set to False to keep existing data.
RESET_DB = True

if RESET_DB:
    # engine.begin() commits on success and rolls back if the block raises
    with engine.begin() as conn:
        conn.execute(text("""
    DO $$ DECLARE r RECORD;
    BEGIN
        FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public') LOOP
            EXECUTE 'DROP TABLE IF EXISTS public.' || quote_ident(r.tablename) || ' CASCADE';
        END LOOP;
    END $$;
    """))

In [55]:
# create schema by running sql file

def run_sql_file(engine, path):
    """Execute the whole contents of a .sql file in a single transaction.

    Args:
        engine: SQLAlchemy engine used to open the transaction.
        path: Location of the SQL script (read as UTF-8).

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    if os.path.exists(path):
        with open(path, encoding="utf-8") as sql_file:
            script = sql_file.read()
        # exec_driver_sql hands the raw text to the DBAPI driver, which allows
        # several ;-separated statements; engine.begin() wraps them in one transaction.
        with engine.begin() as conn:
            conn.exec_driver_sql(script)
        return
    raise FileNotFoundError(f"SQL file not found: {path}")


# Locate schema.sql: from the notebooks/ folder it is ../src/db/schema.sql,
# otherwise scripts/schema.sql relative to the working directory.
# Path parts are passed separately so os.path.join uses the correct separator on
# every OS (a literal "src\db" only works on Windows, and "\d" is an invalid escape).
if os.path.basename(os.getcwd()) == "notebooks":
    schema_path = os.path.join("..", "src", "db", "schema.sql")
else:
    schema_path = "scripts/schema.sql"

try:
    run_sql_file(engine, schema_path)
    print("Schema created/verified.")
except Exception as e:
    print("Failed to apply SQL files:", e)
    raise

Schema created/verified.


### Extract

In [56]:
import time, re
from datetime import date
from sqlalchemy import text

In [57]:
# populate a range of years (backfill) one time fix

# with engine.begin() as conn:
#     conn.execute(text("""
#     INSERT INTO dim_time (year, start_date, end_date)
#     SELECT gs, make_date(gs,1,1), make_date(gs,12,31)
#     FROM generate_series(1950, 2050) AS gs
#     ON CONFLICT (year) DO NOTHING;
#     """))

In [58]:
# extract functions

def get_json(url, retries=3):
    """GET ``url`` and return its decoded JSON body, retrying on failure.

    Both HTTP error statuses and network-level failures (timeouts, connection
    resets) are retried, with a linear back-off of 1s, 2s, ... between attempts.

    Args:
        url: Full request URL.
        retries: Total number of attempts; values below 1 are treated as 1.

    Returns:
        The parsed JSON payload of the first successful response.

    Raises:
        requests.HTTPError: if the final attempt returns an error status.
        requests.RequestException: if the final attempt fails at the network level.
    """
    attempts = max(1, retries)
    for i in range(attempts):
        last_attempt = i == attempts - 1
        try:
            r = requests.get(url, timeout=60)
        except requests.RequestException:
            if last_attempt:
                raise
        else:
            if r.ok:
                return r.json()
            if last_attempt:
                r.raise_for_status()  # r.ok is False, so this raises HTTPError
        time.sleep(1 + i)  # only reached when another attempt follows

def pick_dim(rec, target_type):
    """Return the value of the first of Dim1..Dim3 whose ``DimNType`` equals ``target_type``.

    Slots are checked in order 1, 2, 3; None is returned when none matches.
    """
    return next(
        (rec.get(f"Dim{n}") for n in range(1, 4) if rec.get(f"Dim{n}Type") == target_type),
        None,
    )

# GHO SEX dimension codes -> display labels (both spelling variants are accepted)
_SEX_LABELS = {
    "SEX_FMLE": "Female",
    "SEX_FEMALE": "Female",
    "SEX_MALE": "Male",
    "SEX_MLE": "Male",
    "SEX_BTSX": "Both sexes",
    "SEX_UNKNOWN": "Unknown",
}


def sex_label_from_code(code):
    """Return a readable label for a SEX code.

    Unknown codes are returned unchanged; empty/None input gives None.
    """
    if code:
        return _SEX_LABELS.get(code, code)
    return None

def agegroup_label_from_code(code):
    """Turn an ``AGEGROUP_...`` code into a readable label.

    Examples:
        "AGEGROUP_YEARS15-24"  -> "15–24 years"
        "AGEGROUP_YEARS65PLUS" -> "65+ years"
        "AGEGROUP_YEARSALL"    -> "All ages"

    Codes without the ``AGEGROUP_`` prefix are returned unchanged; empty/None gives None.
    """
    if not code:
        return None
    prefix = "AGEGROUP_"
    if not code.startswith(prefix):
        return code

    if "YEARS" in code:
        unit = "years"
    elif "MONTHS" in code:
        unit = "months"
    else:
        unit = ""

    body = code[len(prefix):]
    numbers = re.findall(r"\d+", body)

    # Closed range, e.g. YEARS15-24
    if len(numbers) == 2 and "-" in body:
        return f"{numbers[0]}–{numbers[1]} {unit}".strip()
    # Open-ended range, e.g. YEARS65PLUS
    if len(numbers) == 1 and ("PLUS" in body or "MORE" in body or body.endswith("+")):
        return f"{numbers[0]}+ {unit}".strip()
    if body in ("ALL", "YEARSALL", "MONTHSALL"):
        return "All ages"

    # Generic fallback: tidy separators and strip the unit words, then re-append the unit
    cleaned = body.replace("_", " ").replace("-", "–")
    for token in ("YEARS", "MONTHS"):
        cleaned = cleaned.replace(token, "")
    cleaned = cleaned.strip()
    return f"{cleaned} {unit}" if unit else cleaned

In [59]:
# sql
# Parameterised statements (SQLAlchemy text() with :named binds) used by the load cell.
# The dimension statements are upserts keyed on each table's code column, so the load can
# be re-run. NOTE(review): ON CONFLICT (col) requires a UNIQUE/PK constraint on that column,
# presumably defined in schema.sql (not visible here) — confirm.

# One dim_time row per year, spanning Jan 1 to Dec 31; years already present are left untouched.
insert_time_sql = text("""
INSERT INTO dim_time (year, start_date, end_date)
VALUES (:y, make_date(:y,1,1), make_date(:y,12,31))
ON CONFLICT (year) DO NOTHING;
""")

# Indicator metadata; re-running refreshes name, language and source URL.
upsert_indicator_sql = text("""
INSERT INTO dim_indicator (indicator_code, indicator_name, language, source_url)
VALUES (:indicator_code, :indicator_name, :language, :source_url)
ON CONFLICT (indicator_code) DO UPDATE
  SET indicator_name = EXCLUDED.indicator_name,
      language = EXCLUDED.language,
      source_url = EXCLUDED.source_url;
""")

# Locations keyed by GHO SpatialDim, with their type and parent region.
upsert_location_sql = text("""
INSERT INTO dim_location (spatial_dim, location_type, region_code, region_name)
VALUES (:spatial_dim, :location_type, :region_code, :region_name)
ON CONFLICT (spatial_dim) DO UPDATE
  SET location_type = EXCLUDED.location_type,
      region_code = EXCLUDED.region_code,
      region_name = EXCLUDED.region_name;
""")

# Sex dimension: code -> display label.
upsert_sex_sql = text("""
INSERT INTO dim_sex (sex_code, sex_label)
VALUES (:sex_code, :sex_label)
ON CONFLICT (sex_code) DO UPDATE
  SET sex_label = EXCLUDED.sex_label;
""")

# Age-group dimension: code -> display label.
upsert_age_sql = text("""
INSERT INTO dim_agegroup (agegroup_code, agegroup_label)
VALUES (:agegroup_code, :agegroup_label)
ON CONFLICT (agegroup_code) DO UPDATE
  SET agegroup_label = EXCLUDED.agegroup_label;
""")

# Fact rows. Bind names mirror the keys of a GHO observation record (Id, IndicatorCode,
# SpatialDim, TimeDim, ...), plus sex_code / agegroup_code, which the loader derives
# from the record's Dim1..Dim3 fields. On conflict only the values, low/high bounds,
# comments and API record date are refreshed; the key columns (indicator, location,
# year, sex, age group) are not updated.
upsert_fact_sql = text("""
INSERT INTO fact_gho_observation
(id, indicator_code, spatial_dim, time_year,
 sex_code, agegroup_code,
 value_raw, numeric_value, low, high, comments,
 data_source_dim_type, data_source_dim,
 api_record_date, time_dim_type, time_dimension_value, time_begin, time_end)
VALUES
(:Id, :IndicatorCode, :SpatialDim, :TimeDim,
 :sex_code, :agegroup_code,
 :Value, :NumericValue, :Low, :High, :Comments,
 :DataSourceDimType, :DataSourceDim,
 :Date, :TimeDimType, :TimeDimensionValue, :TimeDimensionBegin, :TimeDimensionEnd)
ON CONFLICT (id) DO UPDATE SET
  value_raw = EXCLUDED.value_raw,
  numeric_value = EXCLUDED.numeric_value,
  low = EXCLUDED.low,
  high = EXCLUDED.high,
  comments = EXCLUDED.comments,
  api_record_date = EXCLUDED.api_record_date;
""")

In [60]:
# Helper used by the load cell: make sure dim_time has a row for every year a batch references.
# Uses insert_time_sql from the SQL cell above (not redefined here).

def ensure_time_years(conn, years):
    """Insert any missing years into dim_time.

    Args:
        conn: An open SQLAlchemy connection.
        years: Iterable of candidate years. Only genuine ``int`` values are used
            (bools, strings, floats and None are ignored); duplicates are collapsed.

    Returns:
        None. No statement is executed when no usable year is supplied.
    """
    # bool is a subclass of int; exclude it so True/False never become years 1/0
    yrs = sorted({y for y in years if isinstance(y, int) and not isinstance(y, bool)})
    if not yrs:
        return
    # executemany; the statement's ON CONFLICT (year) DO NOTHING skips existing years
    conn.execute(insert_time_sql, [{"y": y} for y in yrs])


In [61]:
# # with engine

# # Upsert dim_indicator metadata ONLY for valid codes
# indicator_meta_rows = []
# for code in indicator_codes:
#     meta = get_json(base_url + f"Indicator?$filter=IndicatorCode%20eq%20%27{code}%27").get("value", [])
#     if meta:
#         m = meta[0]
#         indicator_meta_rows.append({
#             "indicator_code": m["IndicatorCode"],
#             "indicator_name": m.get("IndicatorName"),
#             "language": m.get("Language"),
#             "source_url": base_url + m["IndicatorCode"]
#         })
#     else:
#         # Fallback if metadata not returned: store code as name
#         indicator_meta_rows.append({
#             "indicator_code": code,
#             "indicator_name": code,
#             "language": None,
#             "source_url": base_url + code
#         })

# with engine.begin() as conn:
#     if indicator_meta_rows:
#         conn.execute(upsert_indicator_sql, indicator_meta_rows)

#     # For each valid indicator, fetch observations and upsert facts (and related dims)
#     for code in indicator_codes:
#         obs = get_json(base_url + code).get("value", [])
#         if not obs:
#             continue

#         # Upsert locations from this batch (countries & regions)
#         loc_rows = {}
#         for d in obs:
#             sd = d.get("SpatialDim")
#             if not sd:
#                 continue
#             loc_rows[sd] = {
#                 "spatial_dim": sd,
#                 "location_type": d.get("SpatialDimType"),
#                 "region_code": d.get("ParentLocationCode"),
#                 "region_name": d.get("ParentLocation")
#             }
#         if loc_rows:
#             conn.execute(upsert_location_sql, list(loc_rows.values()))

#         # Prepare/normalize rows (year, sex, agegroup) and collect dim members
#         sex_rows, age_rows = {}, {}
#         obs_rows = []
#         for d in obs:
#             # Normalize year to int
#             if isinstance(d.get("TimeDim"), str):
#                 try:
#                     d["TimeDim"] = int(d["TimeDim"])
#                 except:
#                     d["TimeDim"] = None

#             # Extract Sex / AgeGroup from any Dim1..Dim3
#             sx = pick_dim(d, "SEX")
#             ag = pick_dim(d, "AGEGROUP")
#             d["sex_code"] = sx
#             d["agegroup_code"] = ag

#             if sx and sx not in sex_rows:
#                 sex_rows[sx] = {"sex_code": sx, "sex_label": sex_label_from_code(sx)}
#             if ag and ag not in age_rows:
#                 age_rows[ag] = {"agegroup_code": ag, "agegroup_label": agegroup_label_from_code(ag)}

#             obs_rows.append(d)

#         if sex_rows:
#             conn.execute(upsert_sex_sql, list(sex_rows.values()))
#         if age_rows:
#             conn.execute(upsert_age_sql, list(age_rows.values()))

#         if obs_rows:
#             conn.execute(upsert_fact_sql, obs_rows)


### Transform and load

In [63]:
# with engine load
# Transform + load: upsert indicator metadata, then for each indicator fetch its
# observations and upsert location, time, sex and age-group members before the
# fact rows that reference them — all in one transaction.

# Bind names in upsert_fact_sql that come straight from a GHO observation record.
# Every row of an executemany must supply all of them, so fields absent from a
# record are filled with None instead of failing the whole batch.
_FACT_RECORD_FIELDS = (
    "Id", "IndicatorCode", "SpatialDim", "TimeDim",
    "Value", "NumericValue", "Low", "High", "Comments",
    "DataSourceDimType", "DataSourceDim",
    "Date", "TimeDimType", "TimeDimensionValue", "TimeDimensionBegin", "TimeDimensionEnd",
)


def _indicator_meta_row(code):
    """Fetch GHO metadata for one indicator and shape it as a dim_indicator row.

    Falls back to storing the code as the name when no metadata is returned.
    """
    meta = get_json(base_url + f"Indicator?$filter=IndicatorCode%20eq%20%27{code}%27").get("value", [])
    if not meta:
        return {"indicator_code": code, "indicator_name": code, "language": None, "source_url": base_url + code}
    m = meta[0]
    return {
        "indicator_code": m["IndicatorCode"],
        "indicator_name": m.get("IndicatorName"),
        "language": m.get("Language"),
        "source_url": base_url + m["IndicatorCode"],
    }


def _parse_year(value):
    """Convert a string TimeDim to int (None if unparsable); other values pass through."""
    if isinstance(value, str):
        try:
            return int(value)
        except ValueError:  # only int() parsing can fail here
            return None
    return value


def _load_indicator(conn, code):
    """Fetch every observation for ``code`` and upsert its dimensions and facts on ``conn``."""
    obs = get_json(base_url + code).get("value", [])
    if not obs:
        return

    # Locations (countries & regions); the last record seen for a SpatialDim wins
    loc_rows = {
        d["SpatialDim"]: {
            "spatial_dim": d["SpatialDim"],
            "location_type": d.get("SpatialDimType"),
            "region_code": d.get("ParentLocationCode"),
            "region_name": d.get("ParentLocation"),
        }
        for d in obs
        if d.get("SpatialDim")
    }
    if loc_rows:
        conn.execute(upsert_location_sql, list(loc_rows.values()))

    years_needed, fact_rows = [], []
    sex_rows, age_rows = {}, {}
    for d in obs:
        year = _parse_year(d.get("TimeDim"))
        # Skip rows that have no year to avoid NOT NULL / FK issues on time_year
        if year is None:
            continue
        years_needed.append(year)

        # Sex / age group can sit in any of Dim1..Dim3
        sx = pick_dim(d, "SEX")
        ag = pick_dim(d, "AGEGROUP")
        if sx and sx not in sex_rows:
            sex_rows[sx] = {"sex_code": sx, "sex_label": sex_label_from_code(sx)}
        if ag and ag not in age_rows:
            age_rows[ag] = {"agegroup_code": ag, "agegroup_label": agegroup_label_from_code(ag)}

        row = {field: d.get(field) for field in _FACT_RECORD_FIELDS}
        row.update(TimeDim=year, sex_code=sx, agegroup_code=ag)
        fact_rows.append(row)

    # Dimension members go in before the facts that reference them
    ensure_time_years(conn, years_needed)
    if sex_rows:
        conn.execute(upsert_sex_sql, list(sex_rows.values()))
    if age_rows:
        conn.execute(upsert_age_sql, list(age_rows.values()))
    if fact_rows:
        conn.execute(upsert_fact_sql, fact_rows)


# Upsert dim_indicator metadata ONLY for valid codes
indicator_meta_rows = [_indicator_meta_row(code) for code in indicator_codes]

with engine.begin() as conn:
    if indicator_meta_rows:
        conn.execute(upsert_indicator_sql, indicator_meta_rows)
    # For each valid indicator, fetch observations and upsert facts (and related dims)
    for code in indicator_codes:
        _load_indicator(conn, code)


In [66]:
# Spot-check: is there a dim_time row for 2018?
with engine.begin() as conn:
    found = conn.execute(text("SELECT 1 FROM dim_time WHERE year = 2018")).first()
print("2018 present?", bool(found))

2018 present? True


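### Notes

The fact table's foreign key on `time_year` fails if the matching year is missing from `dim_time`. The load cell seeds the years it needs by calling `ensure_time_years` (the one-off backfill cell is commented out). If you run only part of the notebook and skip that seeding, or load facts before `dim_time` is seeded, the insert will fail.

Some records have string years. The load code converts them to `int` and skips records whose year is missing or unparsable; otherwise the insert would violate NOT NULL on `time_year`.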