<h1> DAT204M - HW1 Data Collection </h1>

## Data Collection + ETL Overview (WDI)

Our journey begins with a clear objective: to gather key World Bank Development Indicators (WDI) for ASEAN countries spanning 1990 to the present. We aim to create datasets that are ready for analysis, modeling, and visualization.
To do this, we employ an ETL (Extract, Transform, Load) pipeline:

- *Extract:* Collect raw WDI data via the World Bank API.
- *Transform:* Clean the data, handle missing values, and reshape it into a usable format.
- *Load:* Save the final datasets for further analysis.

A scripted ETL process makes the data pull reproducible and applies the same cleaning rules to every series, which matters when working with multi-country, multi-year indicators.


**Outputs:**
The ETL workflow produces one wide dataset:
- One row per (country, year) pair, with indicators as columns
- Suitable for statistical modeling or correlation analysis

**Requirements:**
- Python 3.8+
- pandas, requests, urllib3 (1.26+ for `Retry(allowed_methods=...)`), dash, plotly

**Authors:** Julian Roger Go, Charisse Nethercott, Gabriel Masangkay, John Carlo Gonzales, Edmar Dizon

**Date:** October 23, 2025

### Config: ASEAN WDI pull (countries, years, indicators)

Before extracting data, we define what we want and from where:

**Countries:** The 10 ASEAN nations, identified by ISO3 codes.

**Timeframe:** From 1990 to the most recent available year.

**Indicators:** Key metrics like energy use per capita and urban population percentage.

This configuration acts as a blueprint for the ETL process, making it easy to extend in the future (e.g., adding more countries or indicators).

In [2]:
import requests
import pandas as pd
import time
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ASEAN-10 ISO3 country codes
# (Source: ISO 3166-1 alpha-3 codes)
COUNTRIES = [
    "BRN",  # Brunei Darussalam
    "KHM",  # Cambodia
    "IDN",  # Indonesia
    "LAO",  # Lao PDR
    "MYS",  # Malaysia
    "MMR",  # Myanmar
    "PHL",  # Philippines
    "SGP",  # Singapore
    "THA",  # Thailand
    "VNM",  # Vietnam
]

START_YEAR = 1990  # Minimum year to include in the dataset
STEM = "asean_energy_urban_wdi"  # Output filename prefix

# Indicator codes from the World Bank Open Data API.
# The two EN.GHG.CO2.* codes are EDGAR-based replacements for retired CO₂ series.
INDICATORS = {
    "EG.ELC.RNEW.ZS": "renewable_electricity_pct",              # Renewable electricity (% of total)
    "EG.FEC.RNEW.ZS": "renewable_energy_consumption_pct",       # Renewable energy use (% of final energy)
    "EG.USE.PCAP.KG.OE": "energy_use_kg_oe_per_capita",         # Energy use (kg of oil equivalent per capita)
    "SP.POP.TOTL": "population_total",                          # Total population
    "NY.GDP.MKTP.CD": "gdp_current_usd",                        # GDP (current US$)
    "EN.GHG.CO2.PC.CE.AR5": "co2_per_capita_tco2e_excl_lulucf", # CO₂ per capita (tCO₂e, excl. LULUCF)
    "EN.GHG.CO2.MT.CE.AR5": "co2_total_mtco2e_excl_lulucf",     # Total CO₂ (MtCO₂e, excl. LULUCF)
    "SP.URB.TOTL.IN.ZS": "urban_pop_pct",                       # Urban population (% of total)
}

**What it sets up**

- `COUNTRIES`: ISO3 list for ASEAN-10. Used to loop API calls by country.
- `START_YEAR`: drop observations earlier than this. Keeps the final dataset focused and consistent across indicators.
- `STEM`: base filename for saved outputs. The pipeline writes `../data/asean_energy_urban_wdi.csv`.
- `INDICATORS`: mapping from World Bank indicator codes to readable column names for your final DataFrame.
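
As a rough illustration of how this configuration drives extraction, the sketch below expands a two-country, two-indicator subset into request URLs. The subset and the `build_urls` helper are hypothetical and exist only for this example; the URL pattern is the one used later in `fetch_indicator_series`.

```python
from itertools import product

# Hypothetical subset of the real configuration, kept small for illustration
SAMPLE_COUNTRIES = ["PHL", "SGP"]
SAMPLE_INDICATORS = {
    "SP.POP.TOTL": "population_total",
    "SP.URB.TOTL.IN.ZS": "urban_pop_pct",
}

def build_urls(countries, indicators):
    """Return one World Bank API URL per (country, indicator) pair."""
    base = "https://api.worldbank.org/v2/country/{c}/indicator/{i}"
    return [base.format(c=c, i=i) for c, i in product(countries, indicators)]

urls = build_urls(SAMPLE_COUNTRIES, SAMPLE_INDICATORS)
print(len(urls))  # 2 countries x 2 indicators = 4 series requests
```

With the full configuration (10 countries, 8 indicators) the same expansion yields 80 series requests, before pagination.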

**Notes on indicators**

- Economics, energy, and urbanization focus:
  - Renewable electricity share of total generation
  - Renewable share of final energy use
  - Energy use per capita (kg oil equivalent)
  - Gross Domestic Product (in current US$)  
  - Urban population percent
  - Total population
  - CO₂ per capita
  - Total CO₂
- Scale and units differ by series.
- The CO₂ series use the EDGAR-based AR5 codes that exclude LULUCF (land use, land-use change, and forestry). These replace retired legacy codes.

### Robust HTTP session for the World Bank API

Interacting with APIs can be unpredictable. Network errors or transient server issues may occur. To address this, we create a `requests.Session`:

*   Handles retries and timeouts gracefully.
*   Avoids dropping requests due to temporary failures.

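This makes data collection more reliable and repeatable, which is especially important for long historical time series. A request can still fail once retries are exhausted; in that case the series is skipped and a `[skip]` message is printed.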

In [3]:
WB_TIMEOUT = 60          # seconds per request (requests applies no timeout unless one is passed)
WB_PER_PAGE = 1000       # number of records per API page (smaller = fewer timeouts)
WB_MAX_RETRIES = 5       # total retry attempts
WB_BACKOFF = 1.5         # exponential backoff factor between retries

# Robust session with retry/backoff for transient network/API issues
_session = requests.Session()
_retry = Retry(
    total=WB_MAX_RETRIES,
    connect=WB_MAX_RETRIES,
    read=WB_MAX_RETRIES,
    backoff_factor=WB_BACKOFF,
    status_forcelist=[429, 500, 502, 503, 504],  # common transient server errors
    allowed_methods=["GET"],                      # only retry GET requests
    raise_on_status=False,
)
_adapter = HTTPAdapter(max_retries=_retry)
_session.mount("https://", _adapter)
_session.mount("http://", _adapter)

**Parameters**

- `WB_TIMEOUT`: per-request timeout passed to `_session.get(..., timeout=WB_TIMEOUT)`. Prevents hangs on slow connections.
- `WB_PER_PAGE`: page size sent to the API (example: `per_page=1000`).
- `WB_MAX_RETRIES`: maximum number of retry attempts for eligible failures on connect and read.
- `WB_BACKOFF`: exponential backoff factor between retries. Sleep grows with each retry, which reduces the chance of hitting rate limits.

**Retry configuration**

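- `Retry(..., status_forcelist=[429, 500, 502, 503, 504])`: responses with these HTTP statuses are retried, in addition to connection and read errors.
- `allowed_methods=["GET"]`: retries are applied only to idempotent GET requests.
- `raise_on_status=False`: once status retries are exhausted, the last response is returned instead of raising. The `r.raise_for_status()` call in `wb_get_all_pages` then turns it into an error.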

The effective backoff follows an exponential pattern scaled by `WB_BACKOFF`. Each retry waits longer than the previous one.
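
To make the growth concrete, here is a minimal sketch of the wait times. It assumes urllib3's documented rule of `backoff_factor * 2 ** (n - 1)` seconds before the n-th consecutive retry, no delay before the first retry, and a 120-second cap. `approx_backoff_schedule` is a hypothetical helper, not part of urllib3, and actual waits can differ across urllib3 versions or when the server sends a `Retry-After` header.

```python
def approx_backoff_schedule(backoff_factor, max_retries, cap=120.0):
    """Approximate sleep (seconds) before each retry, indexed by retry number."""
    waits = []
    for n in range(1, max_retries + 1):
        # Assumption: the first retry is immediate; later waits double each time
        wait = 0.0 if n == 1 else backoff_factor * 2 ** (n - 1)
        waits.append(min(cap, wait))
    return waits

schedule = approx_backoff_schedule(backoff_factor=1.5, max_retries=5)
print(schedule)       # [0.0, 3.0, 6.0, 12.0, 24.0]
print(sum(schedule))  # 45.0
```

Under these assumptions, a request that fails on every attempt spends about 45 seconds sleeping, on top of the per-attempt timeout.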

**Adapter mounting**

- `_session.mount("https://", _adapter)` and `.mount("http://", _adapter)` attach the retrying adapter to all HTTP and HTTPS requests made with `_session`.

### World Bank API Fetch Utilities

This snippet provides two helpers for pulling indicator data from the World Bank API with pagination, basic retry behavior, and safer JSON parsing.

`wb_get_all_pages(url, base_params)`
Fetches and flattens all pages from a World Bank API endpoint.

`fetch_indicator_series(country, indicator, start_year)`
Builds a clean list of yearly records for one indicator in one country.

In [4]:
def wb_get_all_pages(url, base_params):
    """
    Fetch all pages from the WB API with retry/backoff and safer parsing.
    Returns list of rows or {"error": "..."} on unrecoverable issues.
    """
    rows_all, page = [], 1

    while True:
        params = dict(base_params, page=page)
        try:
            # Use robust session + longer timeout
            r = _session.get(url, params=params, timeout=WB_TIMEOUT)
            r.raise_for_status()
            data = r.json()
        except requests.exceptions.ReadTimeout as e:
            return {"error": f"Read timeout after {WB_TIMEOUT}s: {e}"}
        except requests.exceptions.RequestException as e:
            return {"error": f"HTTP error: {e}"}
        except ValueError as e:  # JSON decode error
            return {"error": f"Invalid JSON: {e}"}

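        # API-level error payloads (e.g., retired indicator).
        # The API may wrap the message object in a one-element list, so unwrap it first.
        payload = data[0] if isinstance(data, list) and len(data) == 1 else data
        if isinstance(payload, dict) and "message" in payload:
            messages = "; ".join(m.get("value", str(m)) for m in payload.get("message", []))
            return {"error": messages}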

        # Expected shape: [pager, [rows...]]
        if not (isinstance(data, list) and len(data) >= 2 and isinstance(data[1], list)):
            return {"error": f"Unexpected JSON: {str(data)[:200]}"}

        pager, rows = data[0], data[1]
        rows_all.extend(rows)

        total_pages = pager.get("pages") or 1
        if page >= total_pages:
            break
        page += 1

        # Be polite; tiny delay helps avoid rate limits / throttling
        time.sleep(0.2)

    return rows_all

**Parameters**

- `url` (`str`): Base API endpoint. Example: `https://api.worldbank.org/v2/country/PHL/indicator/SP.POP.TOTL`
- `base_params` (`dict`): Query parameters shared by all pages.
   Typical values: `{"format": "json", "per_page": WB_PER_PAGE}`.
   The function adds `page=<n>` during pagination.

**Behavior**

1. Iterates over pages by incrementing `page=1, 2, 3, ...`.
2. Uses the shared `_session` with a request timeout `WB_TIMEOUT`.
3. Parses JSON and validates the expected World Bank response shape: `[pager, rows]`.
4. Accumulates all `rows` into a single list.
5. Sleeps briefly between pages to reduce throttling risk.
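
The pagination logic can be exercised without the network. The sketch below reproduces only the page loop against an in-memory stand-in for the API; `collect_pages`, `fake_fetch`, and `FAKE_ROWS` are hypothetical names, and the rows are made up.

```python
def collect_pages(fetch_page):
    """Collect rows from every page; fetch_page(page) returns [pager, rows]."""
    rows_all, page = [], 1
    while True:
        pager, rows = fetch_page(page)
        rows_all.extend(rows)
        # Stop once the current page is the last one reported by the pager
        if page >= (pager.get("pages") or 1):
            break
        page += 1
    return rows_all

# Hypothetical stand-in for the API: 5 rows served 2 per page (3 pages)
FAKE_ROWS = [{"date": str(2020 + i), "value": i} for i in range(5)]

def fake_fetch(page, per_page=2):
    start = (page - 1) * per_page
    pager = {"page": page, "pages": 3, "per_page": per_page, "total": len(FAKE_ROWS)}
    return [pager, FAKE_ROWS[start:start + per_page]]

rows = collect_pages(fake_fetch)
print(len(rows))  # 5
```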

**Returns**

- `list`: A concatenated list of row objects from all pages on success.
- `dict`: `{"error": "<message>"}` when an unrecoverable problem occurs, such as:
  - Read timeout after all retries
  - HTTP errors that cannot be retried
  - Invalid JSON or an unexpected payload shape
  - API-level message payloads that indicate a problem

**Error signaling and examples**

- Read timeout: `{"error": "Read timeout after 60s: ..."}`
- HTTP error: `{"error": "HTTP error: 502 Server Error ..."}`
- Invalid JSON: `{"error": "Invalid JSON: ..."}`
- API message payload: `{"error": "<joined WB 'message' values>"}`
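
Because the function returns either a list or an error dict, callers need to check the type before iterating. One possible way to do that is sketched below; `split_result` is a hypothetical helper that is not part of the notebook.

```python
def split_result(result):
    """Return (rows, error_message) for a wb_get_all_pages-style result."""
    if isinstance(result, dict) and "error" in result:
        return [], result["error"]
    return result, None

# Made-up inputs covering both return shapes
ok_rows, ok_err = split_result([{"date": "2020", "value": 1.0}])
bad_rows, bad_err = split_result({"error": "Read timeout after 60s: ..."})

print(len(ok_rows), ok_err)  # 1 None
print(bad_rows, bad_err)     # [] Read timeout after 60s: ...
```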

In [5]:
def fetch_indicator_series(country, indicator, start_year):
    """
    Fetch a single indicator for a single country from the World Bank API.

    Args:
        country (str): ISO3 country code (e.g., 'PHL')
        indicator (str): Indicator code (e.g., 'SP.POP.TOTL')
        start_year (int): Filter out years earlier than this

    Returns:
        list of dictionaries with keys:
            ['country', 'year', 'indicator', 'indicator_name', 'value']
    """
    base = f"https://api.worldbank.org/v2/country/{country}/indicator/{indicator}"
    rows_or_err = wb_get_all_pages(base, {"format": "json", "per_page": WB_PER_PAGE})

    if isinstance(rows_or_err, dict) and "error" in rows_or_err:
        print(f"[skip] {country} | {indicator}: {rows_or_err['error']}")
        return []

    out = []
    for item in rows_or_err:
        # Parse year; skip invalid entries
        try:
            year = int(item.get("date"))
        except (TypeError, ValueError):
            continue

        # Filter early years
        if year < start_year:
            continue

        # Build a clean record
        out.append({
            "country": country,
            "year": year,
            "indicator": item.get("indicator", {}).get("id", indicator),
            "indicator_name": item.get("indicator", {}).get("value", indicator),
            "value": item.get("value"),
        })

    return out

**Parameters**

- `country` (`str`): ISO3 country code. Example: `"PHL"`, `"IDN"`.
- `indicator` (`str`): Indicator code. Example: `"SP.POP.TOTL"`, `"NY.GDP.MKTP.CD"`.
- `start_year` (`int`): Minimum year to keep. Rows with `year < start_year` are dropped.

**Behavior**

1. Builds the endpoint
    `https://api.worldbank.org/v2/country/{country}/indicator/{indicator}`
2. Calls `wb_get_all_pages` with `{"format": "json", "per_page": WB_PER_PAGE}`.
3. If `wb_get_all_pages` returns an error dict, prints a `[skip]` message and returns an empty list.
4. Iterates over rows, parses `date` to an integer `year`, and drops rows with invalid or too-early years.
5. Constructs clean records with a stable shape.
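
Step 4 can be illustrated in isolation. This small sketch applies the same year parsing and filtering to made-up `date` values; `parse_year` is a hypothetical helper that mirrors the `try`/`except` in the function above.

```python
def parse_year(raw_date):
    """Return the year as an int, or None when the value cannot be parsed."""
    try:
        return int(raw_date)
    except (TypeError, ValueError):
        return None

# Made-up inputs: a too-early year, valid years, a missing date, a non-annual label
raw_dates = ["1989", "1990", None, "2020Q1", "2023"]
kept = [y for y in map(parse_year, raw_dates) if y is not None and y >= 1990]
print(kept)  # [1990, 2023]
```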

**Returns**

A `list[dict]`, each record with keys:

- `country` (`str`): Same as input country code
- `year` (`int`): Parsed year
- `indicator` (`str`): Indicator code. Falls back to the requested code if missing in the payload
- `indicator_name` (`str`): Indicator descriptive name. Falls back to the requested code if missing
- `value` (`float | int | None`): Reported value for that year. May be `None` if the World Bank has no value

### Time-Series Imputation and Missing Data Analysis

To improve data completeness and reliability, two helper functions handle remaining gaps after cleaning:

- `impute_time_series_data(df)` fills missing `value`s within each country–indicator series using linear interpolation for internal gaps, then forward fill and backward fill for gaps at the edges, giving continuous time series.
- `analyze_missing_data(wide_df)` audits the final dataset, counting missing indicators per country-year and summarizing remaining NaNs by column.

Together, they fill the gaps that can be filled and make any remaining missingness visible before analysis.

In [6]:
def impute_time_series_data(df):
    """Performs time-series imputation on the 'value' column."""
    print("Applying time-series imputation (Forward Fill + Linear Interpolation)...")

    df = df.sort_values(["country", "indicator", "year"]).reset_index(drop=True)

    df["value"] = (
        df.groupby(["country", "indicator"])["value"]
        .transform(lambda x: x.interpolate(method="linear").ffill().bfill())
    )

    remaining_na_count = df["value"].isna().sum()
    if remaining_na_count > 0:
        print(f"Warning: {remaining_na_count} NaN values remain (series with no observed values cannot be imputed).")

    return df

**Parameters**

- `df` (`pd.DataFrame`): Long-format data with at least `country`, `indicator`, `year`, and `value`.

**Returns**

- `pd.DataFrame`: Same columns as input, with `value` imputed where possible.

**Behavior & assumptions**

- Sorts by `country`, `indicator`, `year` to ensure chronological interpolation.
- For each `country`–`indicator` series, in this order:
  - `interpolate(method="linear")` fills internal gaps between known points.
  - `ffill()` carries the last known value forward over any trailing gap.
  - `bfill()` copies the first known value backward over leading NaNs.
- NaNs remain only when a series has no observed values at all; these are counted and reported.

**When to use**

- After coercing `value` to numeric and before pivoting to wide format.
- As part of the “Handling Missing Data” requirement in the cleaning workflow.

**Notes**

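- Interpolation assumes roughly linear change between adjacent years.
- Edge filling holds values constant, so a series that stops reporting early is extended flat across the later years. Treat those values with caution.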
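
A toy example may help show what the chained call does. The data below is made up: series `A` has gaps at both ends and in the middle, and series `B` has no observations at all.

```python
import pandas as pd

nan = float("nan")
toy = pd.DataFrame({
    "country":   ["A"] * 5 + ["B"] * 3,
    "indicator": ["X"] * 8,
    "year":      [2000, 2001, 2002, 2003, 2004, 2000, 2001, 2002],
    "value":     [nan, 10.0, nan, 30.0, nan, nan, nan, nan],
})

# Same chain as in impute_time_series_data, applied per series
toy["filled"] = (
    toy.groupby(["country", "indicator"])["value"]
    .transform(lambda s: s.interpolate(method="linear").ffill().bfill())
)
print(toy[["country", "year", "value", "filled"]].to_string(index=False))
```

Series `A` becomes 10, 10, 20, 30, 30: the middle gap is interpolated and both edges are held constant. Series `B` stays entirely NaN because there is nothing to fill from.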

In [7]:
def analyze_missing_data(wide_df):
    """
    Analyzes remaining missing values in the wide table and prints a report.
    Called from main() after the pivot, once wide_df exists.
    """
    print("\n" + "=" * 50)
    print("ANALYSIS: Missing Indicator Count Report")
    print("=" * 50)

    # Count NaNs per row
    wide_df['missing_indicator_count'] = wide_df.isna().sum(axis=1)

    # Filter for only the rows that actually have missing data
    rows_with_nans = wide_df[wide_df['missing_indicator_count'] > 0].copy()

    if rows_with_nans.empty:
        print("No missing indicator values remain in the wide dataset after imputation.")
    else:
        # Sort to see which country-years have the highest count of NaNs
        report_df = rows_with_nans.sort_values(
            by='missing_indicator_count', ascending=False
        ).head(10)

        print(f"Total rows (country-years) with one or more NaNs: {len(rows_with_nans):,}")
        print("\nTop 10 rows with the highest number of missing indicators:")

        # Show only the identifying columns and the new NaN count
        print(report_df[['country', 'year', 'missing_indicator_count']].to_string(index=False))

    print("\n--- Summary of Remaining Gaps ---")
    # Show the total count of NaNs remaining in each column (indicator)
    print("Total remaining NaNs per indicator column:")
    # Exclude the helper column 'missing_indicator_count'; it never contains NaN
    print(wide_df.drop(columns=['missing_indicator_count'], errors='ignore').isna().sum().rename('Total NaNs').to_string())
    print("=" * 50)

    return wide_df # Return the modified DataFrame

**Parameters**

- `wide_df` (`pd.DataFrame`): Wide-format table with columns `country`, `year`, and one column per indicator.

**Returns**

- `pd.DataFrame`: The same frame with an added helper column `missing_indicator_count` (you can drop it later).

**What it reports**

- Number of missing indicator values per row (`country`–`year`).
- Top 10 `country`–`year` combinations with the most missing indicators.
- Total NaNs per indicator column (useful to decide where to focus imputation or filtering).

**When to use**

- After you’ve created the wide dataset (post-pivot) and optionally run imputation.
- As evidence for your **Handling Missing Data** discussion: it quantifies what remains.
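
The two counts at the core of the report can be reproduced on a small made-up table. This is a sketch of the counting only, not of the printed report; the column names are borrowed from `INDICATORS` and the values are invented.

```python
import pandas as pd

nan = float("nan")
wide = pd.DataFrame({
    "country": ["A", "A", "B"],
    "year": [2000, 2001, 2000],
    "gdp_current_usd": [1.0, nan, 3.0],
    "urban_pop_pct": [nan, nan, 50.0],
})

# Missing indicators per (country, year) row
per_row = wide.isna().sum(axis=1).tolist()

# Missing values per indicator column, converted to plain ints
per_column = {
    col: int(n)
    for col, n in wide.drop(columns=["country", "year"]).isna().sum().items()
}

print(per_row)     # [1, 2, 0]
print(per_column)  # {'gdp_current_usd': 1, 'urban_pop_pct': 2}
```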

### End-to-end ETL + Imputation + Missingness Audit (ASEAN WDI)

This block runs the full pipeline: extract World Bank indicators for ASEAN, clean and type-coerce, impute gaps in long format, pivot to wide, analyze remaining missingness, and save the wide table to CSV.

In [8]:
def main():
    """
    Executes the full Extract-Transform-Load (ETL) process.
    """
    all_rows = []

    # --- EXTRACT ---
    print("Fetching World Bank data for ASEAN countries...\n")
    for c in COUNTRIES:
        for code in INDICATORS:
            all_rows.extend(fetch_indicator_series(c, code, START_YEAR))

    # --- TRANSFORM ---
    long_df = pd.DataFrame(all_rows)
    if long_df.empty:
        raise SystemExit("No data retrieved. Check your internet connection or indicator list.")

    # Data cleaning and type coercion
    long_df["year"] = pd.to_numeric(long_df["year"], errors="coerce").astype("Int64")
    long_df["value"] = pd.to_numeric(long_df["value"], errors="coerce")
    long_df["country"] = long_df["country"].astype(str).str.strip().str.upper()
    long_df["indicator"] = long_df["indicator"].astype(str).str.strip()
    long_df["indicator_name"] = long_df["indicator_name"].astype(str).str.strip()
    long_df = long_df.dropna(subset=["year"]).copy()
    long_df["year"] = long_df["year"].astype(int)
    long_df = long_df.drop_duplicates(subset=["country", "year", "indicator"])

    # Impute missing values within time series
    long_df = impute_time_series_data(long_df.copy())

    # Map short indicator names and sort
    long_df["short_name"] = long_df["indicator"].map(INDICATORS)
    long_df = long_df.sort_values(["country", "year", "indicator"]).reset_index(drop=True)

    # Pivot to wide format (one row per country-year)
    wide_df = (
        long_df
        .pivot(index=["country", "year"], columns="short_name", values="value")
        .reset_index()
        .sort_values(["country", "year"])
        .reset_index(drop=True)
    )

    # --- OUTPUT & ANALYSIS ---
    print(
        f"\nRows (wide): {len(wide_df):,}  | "
        f"Year range: {wide_df['year'].min()}-{wide_df['year'].max()}  | "
        f"Countries: {wide_df['country'].nunique()}"
    )

    # Analyze remaining missing values (adds 'missing_indicator_count')
    wide_df = analyze_missing_data(wide_df)

    # --- LOAD ---
    # Save the wide table (includes 'missing_indicator_count'); assumes ../data exists
    wide_df.to_csv(f"../data/{STEM}.csv", index=False)

    # Final preview
    print("\nTop 10 rows (wide) after cleanup and analysis:")
    print(wide_df.head(10).to_string(index=False))

if __name__ == "__main__":
    main()


Fetching World Bank data for ASEAN countries...

[skip] SGP | SP.URB.TOTL.IN.ZS: HTTP error: 429 Client Error: Too Many Requests for url: https://api.worldbank.org/v2/country/SGP/indicator/SP.URB.TOTL.IN.ZS?format=json&per_page=1000&page=1
Applying time-series imputation (Forward Fill + Linear Interpolation)...

Rows (wide): 350  | Year range: 1990-2024  | Countries: 10

ANALYSIS: Missing Indicator Count Report
Total rows (country-years) with one or more NaNs: 35

Top 10 rows with the highest number of missing indicators:
country  year  missing_indicator_count
    SGP  1990                        1
    SGP  2016                        1
    SGP  2010                        1
    SGP  2011                        1
    SGP  2012                        1
    SGP  2013                        1
    SGP  2014                        1
    SGP  2015                        1
    SGP  2017                        1
    SGP  2008                        1

--- Summary of Remaining Gaps ---
Total re

**What this pipeline does**

**1) Extract**

- Iterates `COUNTRIES × INDICATORS`, calling `fetch_indicator_series(...)`.
- Collects all records into `all_rows`.

**2) Transform (clean + coerce)**

- Builds `long_df` and applies core cleaning:
  - **Type conversion:** `year`/`value` → numeric; `year` to `Int64` then `int`.
  - **String standardization:** trim/uppercase country; trim indicator fields.
  - **Handle missing:** drop rows with missing `year` after coercion.
  - **Deduplication:** remove duplicate keys (`country`, `year`, `indicator`).
- **Imputation:** `impute_time_series_data(long_df)` linearly interpolates `value` within each `country×indicator` series, then forward- and back-fills the edges.
- **Restructuring:** map readable `short_name`, sort, then **pivot to wide**: one row per (`country`,`year`) and one column per indicator.
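
The restructuring step is easiest to see on a tiny table. The sketch below pivots four made-up long rows into two wide rows, using the same `pivot` arguments as `main()`.

```python
import pandas as pd

# Made-up long-format rows: one row per (country, year, indicator)
long_toy = pd.DataFrame({
    "country": ["A", "A", "A", "A"],
    "year": [2000, 2000, 2001, 2001],
    "short_name": ["population_total", "urban_pop_pct"] * 2,
    "value": [100.0, 40.0, 110.0, 41.0],
})

# One row per (country, year), one column per indicator
wide_toy = (
    long_toy
    .pivot(index=["country", "year"], columns="short_name", values="value")
    .reset_index()
)
print(wide_toy.to_string(index=False))
```

`pivot` raises a `ValueError` when the same (`country`, `year`, `short_name`) key appears twice, which is one reason the deduplication step runs first.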

**3) Output & analysis (audit missingness)**

- Prints summary (rows, year range, country count).
- Runs `analyze_missing_data(wide_df)` to add `missing_indicator_count` and print a missingness report.

**4) Load (persist results)**

- Saves `../data/{STEM}.csv`: the wide table with **imputed** values, including the `missing_indicator_count` helper column.

### Final Result from pre-processing

- Sets global pandas display options so prints won’t truncate.
- Loads the cleaned wide file `{STEM}.csv`.
- Prints **all rows and columns** via `.to_string()`.
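
A cell along these lines would do what the bullets describe. This is a sketch rather than the original cell: the display options chosen and the file-existence check are assumptions, while the path matches what `main()` writes.

```python
from pathlib import Path

import pandas as pd

STEM = "asean_energy_urban_wdi"             # same prefix as in the config cell
csv_path = Path("../data") / f"{STEM}.csv"  # file written by main()

# Show every row and column instead of pandas' truncated preview
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)
pd.set_option("display.width", None)

if csv_path.exists():
    final_df = pd.read_csv(csv_path)
    print(final_df.to_string(index=False))
else:
    # Assumption: fail softly when the pipeline has not been run yet
    print(f"{csv_path} not found; run main() first.")
```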

