# TB Data Acquisition



In [198]:
import sys
import os
from pathlib import Path
import sqlite3


project_root = next((p for p in [Path.cwd()] + list(Path.cwd().parents) 
                     if (p / 'notebooks' / '01_data_acquisition.ipynb').exists()),
                    Path.cwd())
os.chdir(project_root)

WHO_GHO_API_BASE = "https://ghoapi.azureedge.net/api"
WHO_INDICATORS = {
    "tb_incidence": "TB_e_inc_tbhiv_num",
    "tb_mortality": "TB_e_mort_exc_tbhiv_num",
}

START_YEAR = 2020
END_YEAR = 2025
YEARS = list(range(START_YEAR, END_YEAR + 1))

DATABASE_PATH = str(project_root / "data" / "database" / "tb_data.db")
RAW_DATA_DIR = project_root / "data" / "raw"

print(f"✓ Project: {project_root}")
print(f"✓ Database: {DATABASE_PATH}")


✓ Project: /Users/joshua/datascienceproject
✓ Database: /Users/joshua/datascienceproject/data/database/tb_data.db


## Database Functions


In [199]:
def get_connection(db_path, timeout=30.0):
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    conn = sqlite3.connect(db_path, timeout=timeout)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    return conn

def create_schema(db_path):
    conn = get_connection(db_path)
    cursor = conn.cursor()
    
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS country_metadata (
            country_code TEXT PRIMARY KEY,
            region_code TEXT,
            region_name TEXT,
            UNIQUE(country_code)
        )
    """)
    
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS tb_data (
            id INTEGER PRIMARY KEY,
            country_code TEXT NOT NULL,
            year INTEGER NOT NULL,
            number_of_cases REAL,
            number_of_deaths REAL,
            mortality_rate REAL,
            FOREIGN KEY (country_code) REFERENCES country_metadata(country_code),
            UNIQUE(country_code, year)
        )
    """)
    
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_tb_country_year ON tb_data(country_code, year)")
    
    conn.commit()
    conn.close()
    print(f"Database schema created successfully at {db_path}")

def create_unified_table(db_path: str):
    conn = get_connection(db_path)
    cursor = conn.cursor()
    
    try:
        cursor.execute("DROP VIEW IF EXISTS unified_tb_data")
    except sqlite3.OperationalError:
        pass
    
    try:
        cursor.execute("DROP TABLE IF EXISTS unified_tb_data")
    except sqlite3.OperationalError:
        pass
    
    cursor.execute("""
        CREATE TABLE unified_tb_data AS
        SELECT 
            cm.country_code, cm.region_code, cm.region_name,
            tb.year, 
            tb.tb_incidence_num AS number_of_cases,
            tb.tb_mortality_num AS number_of_deaths
        FROM country_metadata cm
        INNER JOIN tb_data tb ON cm.country_code = tb.country_code
        ORDER BY cm.country_code, tb.year
    """)
    
    conn.commit()
    conn.close()


## API Fetching Functions


In [200]:
import requests
import pandas as pd
import time

def fetch(indicator: str, years=None):
    if years is None:
        years = YEARS
        
    url = f"{WHO_GHO_API_BASE}/{indicator}"
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    data = response.json()
    
    df = pd.DataFrame(data['value'])
    df = df[df['TimeDim'].isin(years)].copy()
    
    df.rename(columns={
        'SpatialDim': 'country_code', 'TimeDim': 'year', 'NumericValue': 'value',
        'Low': 'value_low', 'High': 'value_high',
        'ParentLocation': 'region_name', 'ParentLocationCode': 'region_code'
    }, inplace=True)
    
    df = df[df.get('SpatialDimType', '') == 'COUNTRY'].copy()
    df = df.dropna(subset=['country_code', 'year'])
    df = df[df['value'] > 0].copy()
    return df


## Database Loading Functions


In [201]:
def cleanup_old_data(db_path, keep_years=None):
    if keep_years is None:
        keep_years = YEARS
    
    conn = get_connection(db_path, timeout=30.0)
    cursor = conn.cursor()
    
    placeholders = ','.join([f':year_{i}' for i in range(len(keep_years))])
    params = {f'year_{i}': year for i, year in enumerate(keep_years)}
    
    cursor.execute(f"""
        DELETE FROM tb_data 
        WHERE year NOT IN ({placeholders})
    """, params)
    deleted = cursor.rowcount
    
    conn.commit()
    conn.close()
    
    if deleted > 0:
        print(f"Cleaned up {deleted} records outside year range {min(keep_years)}-{max(keep_years)}")

def load_country_metadata(df, db_path):
    conn = get_connection(db_path)
    cursor = conn.cursor()

    country_df = df[['country_code', 'region_code', 'region_name']].drop_duplicates()
    country_df = country_df.dropna(subset=['country_code'])
    
    for _, row in country_df.iterrows():
        cursor.execute("""
            INSERT OR REPLACE INTO country_metadata (country_code, region_code, region_name)
            VALUES (:country_code, :region_code, :region_name)
        """, {
            'country_code': row['country_code'],
            'region_code': row.get('region_code'),
            'region_name': row.get('region_name')
        })
    
    conn.commit()
    conn.close()

def load_tb_incidence(df, db_path):
    df = df[df['year'].isin(YEARS)].copy()
    load_country_metadata(df, db_path)
    
    conn = get_connection(db_path)
    cursor = conn.cursor()
    
    inserted = 0
    for _, row in df.iterrows():
        cursor.execute("""
            INSERT INTO tb_data 
            (country_code, year, tb_incidence_num)
            VALUES (:country_code, :year, :value)
            ON CONFLICT(country_code, year) DO UPDATE SET
                tb_incidence_num = excluded.tb_incidence_num
        """, {
            'country_code': row['country_code'],
            'year': int(row['year']),
            'value': row.get('value')
        })
        inserted += 1
    
    conn.commit()
    conn.close()
    print(f"Loaded {inserted} TB incidence records (years {min(df['year'])}-{max(df['year'])})")

def load_tb_mortality(df, db_path):
    df = df[df['year'].isin(YEARS)].copy()
    load_country_metadata(df, db_path)
    
    conn = get_connection(db_path)
    cursor = conn.cursor()
    
    inserted = 0
    for _, row in df.iterrows():
        cursor.execute("""
            INSERT INTO tb_data 
            (country_code, year, tb_mortality_num)
            VALUES (:country_code, :year, :value)
            ON CONFLICT(country_code, year) DO UPDATE SET
                tb_mortality_num = excluded.tb_mortality_num
        """, {
            'country_code': row['country_code'],
            'year': int(row['year']),
            'value': row.get('value')
        })
        inserted += 1
    
    conn.commit()
    conn.close()
    print(f"Loaded {inserted} TB mortality records (years {min(df['year'])}-{max(df['year'])})")



## Data Acquisition Orchestration


In [202]:
def save_csv(df, filename):
    RAW_DATA_DIR.mkdir(parents=True, exist_ok=True)
    filepath = RAW_DATA_DIR / filename
    df.to_csv(filepath, index=False)
    print(f"Saved {len(df)} records to {filepath}")

def acquire_all():
    datasets = {}
    
    for key, indicator in [('tb_incidence', WHO_INDICATORS['tb_incidence']),
                          ('tb_mortality', WHO_INDICATORS['tb_mortality'])]:
        df = fetch(indicator, YEARS)
        datasets[key] = df
        save_csv(df, f'{key}.csv')
        time.sleep(1)
    
    return datasets

def load_all(datasets, db_path, cleanup=True):
    if cleanup:
        cleanup_old_data(db_path, YEARS)
    
    loaders = {
        'tb_incidence': load_tb_incidence,
        'tb_mortality': load_tb_mortality
    }
    
    for key, loader in loaders.items():
        loader(datasets[key], db_path)


## Workflow: Create Database Schema


In [203]:
create_schema(DATABASE_PATH)


Database schema created successfully at /Users/joshua/datascienceproject/data/database/tb_data.db


## Workflow: Fetch Data from APIs


In [204]:
datasets = acquire_all()
print(f"Fetched {len(datasets)} datasets")


Saved 835 records to /Users/joshua/datascienceproject/data/raw/tb_incidence.csv
Saved 884 records to /Users/joshua/datascienceproject/data/raw/tb_mortality.csv
Fetched 2 datasets


## Workflow: Load Data into Database


In [205]:
load_all(datasets, DATABASE_PATH)
create_unified_table(DATABASE_PATH)

Loaded 835 TB incidence records (years 2020-2024)
Loaded 884 TB mortality records (years 2020-2024)


## Workflow: Verify Data


In [206]:
import pandas as pd

conn = get_connection(DATABASE_PATH)
summary = pd.read_sql("""
    SELECT *
    FROM unified_tb_data
    ORDER BY year
""", conn)
conn.close()

print(summary)

    country_code region_code            region_name  year  number_of_cases  \
0            AFG         EMR  Eastern Mediterranean  2020             13.0   
1            AGO         AFR                 Africa  2020          15000.0   
2            ALB         EUR                 Europe  2020              4.0   
3            ARE         EMR  Eastern Mediterranean  2020              5.0   
4            ARG         AMR               Americas  2020            730.0   
..           ...         ...                    ...   ...              ...   
880          VUT         WPR        Western Pacific  2024              2.0   
881          YEM         EMR  Eastern Mediterranean  2024            120.0   
882          ZAF         AFR                 Africa  2024         134000.0   
883          ZMB         AFR                 Africa  2024          19000.0   
884          ZWE         AFR                 Africa  2024          20000.0   

     number_of_deaths  
0             14000.0  
1             2