# Task 1 — Data Exploration and Enrichment (All-in-one)

This notebook runs **Task 1** end to end:

1. Creates/validates the expected folder structure (non-destructive)
2. Loads the starter datasets:
   - `data/raw/ethiopia_fi_unified_data.csv`
   - `data/raw/reference_codes.csv`
3. Explores the unified schema and summarizes:
   - Counts by `record_type`, `pillar`, `source_type`, `confidence`
   - Temporal coverage
   - Indicator coverage
   - Events and impact links
4. Adds **example enrichment rows** (observations/events/impact_links) using your **exact schema**
   - Enrichment rows are **skipped automatically** if the `record_id` already exists
5. Saves:
   - `data/processed/ethiopia_fi_unified_data_enriched.csv`
6. Writes:
   - `reports/data_enrichment_log.md`

> **Edit required:** In the enrichment cell, set `COLLECTED_BY` to your name. The example enrichment rows are templates: replace their values and sources with verified figures, or keep `confidence` at `low`.

In [19]:
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import date

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)


In [20]:
# 1) Create / validate folder structure (safe: won't overwrite existing files)
folders = [
    ".github/workflows",
    "data/raw",
    "data/processed",
    "notebooks",
    "src",
    "dashboard",
    "tests",
    "models",
    "reports/figures",
]

for f in folders:
    Path(f).mkdir(parents=True, exist_ok=True)

print("✅ Folder structure ready")


✅ Folder structure ready


In [21]:
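# 2) Load datasets (required)
# NOTE: this path points one level above the working directory, while the folder,
# save, and log cells use paths relative to the working directory itself.
raw_path = Path("../data/raw")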
df = pd.read_csv(raw_path / "ethiopia_fi_unified_data.csv")
ref = pd.read_csv(raw_path / "reference_codes.csv")

print("Unified data shape:", df.shape)
print("Reference codes shape:", ref.shape)
df.head()


Unified data shape: (43, 34)
Reference codes shape: (71, 4)


Unnamed: 0,record_id,record_type,category,pillar,indicator,indicator_code,indicator_direction,value_numeric,value_text,value_type,unit,observation_date,period_start,period_end,fiscal_year,gender,location,region,source_name,source_type,source_url,confidence,related_indicator,relationship_type,impact_direction,impact_magnitude,impact_estimate,lag_months,evidence_basis,comparable_country,collected_by,collection_date,original_text,notes
0,REC_0001,observation,,ACCESS,Account Ownership Rate,ACC_OWNERSHIP,higher_better,22.0,,percentage,%,2014-12-31,,,2014,all,national,,Global Findex 2014,survey,https://www.worldbank.org/en/publication/globa...,high,,,,,,,,Example_Trainee,2025-01-20,,Baseline year,
1,REC_0002,observation,,ACCESS,Account Ownership Rate,ACC_OWNERSHIP,higher_better,35.0,,percentage,%,2017-12-31,,,2017,all,national,,Global Findex 2017,survey,https://www.worldbank.org/en/publication/globa...,high,,,,,,,,Example_Trainee,2025-01-20,,,
2,REC_0003,observation,,ACCESS,Account Ownership Rate,ACC_OWNERSHIP,higher_better,46.0,,percentage,%,2021-12-31,,,2021,all,national,,Global Findex 2021,survey,https://www.worldbank.org/en/publication/globa...,high,,,,,,,,Example_Trainee,2025-01-20,,,
3,REC_0004,observation,,ACCESS,Account Ownership Rate,ACC_OWNERSHIP,higher_better,56.0,,percentage,%,2021-12-31,,,2021,male,national,,Global Findex 2021,survey,https://www.worldbank.org/en/publication/globa...,high,,,,,,,,Example_Trainee,2025-01-20,,Gender disaggregated,
4,REC_0005,observation,,ACCESS,Account Ownership Rate,ACC_OWNERSHIP,higher_better,36.0,,percentage,%,2021-12-31,,,2021,female,national,,Global Findex 2021,survey,https://www.worldbank.org/en/publication/globa...,high,,,,,,,,Example_Trainee,2025-01-20,,Gender disaggregated,
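
The folder cell creates directories relative to the working directory, while the load cell reads from `../data/raw`. One way to keep every path consistent is to resolve a single project root first. The sketch below is an assumption about the layout (a root folder that holds `data/raw/`, with the notebook run from a subfolder); `find_project_root` and `RAW_FILE` are hypothetical names, not part of the starter code.

```python
import tempfile
from pathlib import Path

# Relative location of the starter file, as used in the load cell above.
RAW_FILE = Path("data/raw/ethiopia_fi_unified_data.csv")

def find_project_root(start: Path, marker: Path = RAW_FILE) -> Path:
    """Return the nearest directory at or above `start` that contains `marker`."""
    start = Path(start).resolve()
    for candidate in (start, *start.parents):
        if (candidate / marker).is_file():
            return candidate
    raise FileNotFoundError(f"{marker} not found at or above {start}")

# Demo on a throwaway tree that mimics <root>/data/raw and <root>/notebooks.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp).resolve()
    (root / RAW_FILE).parent.mkdir(parents=True)
    (root / RAW_FILE).write_text("record_id\n")
    (root / "notebooks").mkdir()
    found = find_project_root(root / "notebooks")
    print(found == root)  # the notebooks/ folder resolves to its parent
```

In the notebook, `ROOT = find_project_root(Path.cwd())` could then prefix every path, for example `ROOT / "data" / "processed"` and `ROOT / "reports"`.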


In [22]:
# 3) Schema quick check
df.columns.tolist()


['record_id',
 'record_type',
 'category',
 'pillar',
 'indicator',
 'indicator_code',
 'indicator_direction',
 'value_numeric',
 'value_text',
 'value_type',
 'unit',
 'observation_date',
 'period_start',
 'period_end',
 'fiscal_year',
 'gender',
 'location',
 'region',
 'source_name',
 'source_type',
 'source_url',
 'confidence',
 'related_indicator',
 'relationship_type',
 'impact_direction',
 'impact_magnitude',
 'impact_estimate',
 'lag_months',
 'evidence_basis',
 'comparable_country',
 'collected_by',
 'collection_date',
 'original_text',
 'notes']

In [23]:
df.dtypes


record_id                  str
record_type                str
category                   str
pillar                     str
indicator                  str
indicator_code             str
indicator_direction        str
value_numeric          float64
value_text                 str
value_type                 str
unit                       str
observation_date           str
period_start               str
period_end                 str
fiscal_year                str
gender                     str
location                   str
region                 float64
source_name                str
source_type                str
source_url                 str
confidence                 str
related_indicator      float64
relationship_type      float64
impact_direction       float64
impact_magnitude       float64
impact_estimate        float64
lag_months             float64
evidence_basis         float64
comparable_country         str
collected_by               str
collection_date            str
original

In [24]:
# Record counts
df["record_type"].value_counts(dropna=False)


record_type
observation    30
event          10
target          3
Name: count, dtype: int64

In [25]:
# Counts by record_type x pillar
pd.crosstab(df["record_type"], df["pillar"], dropna=False)


record_type,ACCESS,AFFORDABILITY,GENDER,USAGE,NaN
event,0,0,0,0,10
observation,14,1,4,11,0
target,2,0,1,0,0


In [26]:
# Source types and confidence
display(df["source_type"].value_counts(dropna=False))
display(df["confidence"].value_counts(dropna=False))


source_type
operator      15
survey        10
regulator      7
research       4
policy         3
calculated     2
news           2
Name: count, dtype: int64

confidence
high      40
medium     3
Name: count, dtype: int64

In [27]:
# 4) Reference codes — inspect valid categorical values
# (If your reference_codes.csv uses different column names, adjust here)
ref.head()


Unnamed: 0,field,code,description,applies_to
0,record_type,observation,Actual measured value from a source,All
1,record_type,event,Policy launch market event or milestone,All
2,record_type,impact_link,Relationship between event and indicator (link...,All
3,record_type,target,Policy target or official goal,All
4,record_type,baseline,Starting point for comparison,All


In [28]:
# Try to show valid codes for key fields (works if columns exist: field_name/code/description)
if set(["field_name", "code"]).issubset(ref.columns):
    for field in ["record_type", "pillar", "category", "confidence", "impact_direction", "impact_magnitude", "relationship_type"]:
        subset = ref[ref["field_name"] == field]
        print("\nFIELD:", field)
        if subset.empty:
            print("  (no entries)")
        else:
            cols_show = [c for c in ["field_name","code","description"] if c in subset.columns]
            display(subset[cols_show].drop_duplicates().sort_values("code"))
else:
    print("reference_codes.csv does not have expected columns (field_name/code). Inspect ref.columns and adapt.")
    print("Columns:", ref.columns.tolist())


reference_codes.csv does not have expected columns (field_name/code). Inspect ref.columns and adapt.
Columns: ['field', 'code', 'description', 'applies_to']
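
The fallback message shows that the reference file names its key column `field`, not `field_name`. A minimal sketch of a lookup that accepts either name, plus a check for values that have no registered code, is given here. `valid_codes` and `unknown_values` are hypothetical helpers, and the toy frames stand in for `ref` and `df`.

```python
import pandas as pd

def valid_codes(ref: pd.DataFrame, field: str) -> list:
    """Sorted codes registered for `field`; accepts `field` or `field_name` as key column."""
    key_col = next((c for c in ("field", "field_name") if c in ref.columns), None)
    if key_col is None or "code" not in ref.columns:
        raise KeyError(f"Unexpected reference columns: {ref.columns.tolist()}")
    codes = ref.loc[ref[key_col] == field, "code"].dropna().astype(str)
    return sorted(codes.unique())

def unknown_values(data: pd.DataFrame, ref: pd.DataFrame, field: str) -> list:
    """Values of `data[field]` that have no matching code in the reference table."""
    present = set(data[field].dropna().astype(str))
    return sorted(present - set(valid_codes(ref, field)))

# Toy reference table: the five record_type codes shown by ref.head() above.
ref_demo = pd.DataFrame({
    "field": ["record_type"] * 5,
    "code": ["observation", "event", "impact_link", "target", "baseline"],
})
# Toy data: "forecast" is a made-up value used only to trigger the check.
data_demo = pd.DataFrame({"record_type": ["observation", "event", "forecast", None]})

print(valid_codes(ref_demo, "record_type"))
print(unknown_values(data_demo, ref_demo, "record_type"))
```

Against the real frames, the same calls would be `valid_codes(ref, "pillar")` or `unknown_values(df, ref, "confidence")`, assuming those fields are present in `reference_codes.csv`.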


In [29]:
# 5) Observations — temporal coverage + indicator coverage
obs = df[df["record_type"] == "observation"].copy()
obs["observation_date"] = pd.to_datetime(obs["observation_date"], errors="coerce")

print("Observation date range:", obs["observation_date"].min(), "to", obs["observation_date"].max())
print("Unique indicators:", obs["indicator_code"].nunique())

display(obs["indicator_code"].value_counts().head(30))

indicator_coverage = (
    obs.groupby("indicator_code")["observation_date"]
    .agg(["min","max","count"])
    .reset_index()
    .sort_values("count", ascending=False)
)
display(indicator_coverage.head(60))


Observation date range: 2014-12-31 00:00:00 to 2025-12-31 00:00:00
Unique indicators: 19


indicator_code
ACC_OWNERSHIP         6
ACC_FAYDA             3
ACC_MM_ACCOUNT        2
ACC_4G_COV            2
USG_P2P_COUNT         2
GEN_GAP_ACC           2
ACC_MOBILE_PEN        1
USG_P2P_VALUE         1
USG_ATM_COUNT         1
USG_ATM_VALUE         1
USG_CROSSOVER         1
USG_TELEBIRR_USERS    1
USG_TELEBIRR_VALUE    1
USG_MPESA_USERS       1
USG_MPESA_ACTIVE      1
USG_ACTIVE_RATE       1
AFF_DATA_INCOME       1
GEN_MM_SHARE          1
GEN_GAP_MOBILE        1
Name: count, dtype: int64

Unnamed: 0,indicator_code,min,max,count
4,ACC_OWNERSHIP,2014-12-31,2024-11-29,6
1,ACC_FAYDA,2024-08-15,2025-05-15,3
0,ACC_4G_COV,2023-06-30,2025-06-30,2
2,ACC_MM_ACCOUNT,2021-12-31,2024-11-29,2
6,GEN_GAP_ACC,2021-12-31,2024-11-29,2
15,USG_P2P_COUNT,2024-07-07,2025-07-07,2
3,ACC_MOBILE_PEN,2025-12-31,2025-12-31,1
7,GEN_GAP_MOBILE,2024-12-31,2024-12-31,1
8,GEN_MM_SHARE,2024-12-31,2024-12-31,1
9,USG_ACTIVE_RATE,2024-12-31,2024-12-31,1


In [30]:
# 6) Events — catalog and dates
events = df[df["record_type"] == "event"].copy()
events["observation_date"] = pd.to_datetime(events["observation_date"], errors="coerce")

event_wanted = [
    "record_id", "category", "indicator", "indicator_code", "value_text", "observation_date",
    "period_start", "period_end", "source_name", "source_url", "confidence",
]
event_cols = [c for c in event_wanted if c in events.columns]
display(events[event_cols].sort_values("observation_date").reset_index(drop=True))


Unnamed: 0,record_id,category,indicator,indicator_code,value_text,observation_date,period_start,period_end,source_name,source_url,confidence
0,EVT_0001,product_launch,Telebirr Launch,EVT_TELEBIRR,Launched,2021-05-17,,,Ethio Telecom,https://www.ethiotelecom.et/,high
1,EVT_0009,policy,NFIS-II Strategy Launch,EVT_NFIS2,Launched,2021-09-01,2021-09-01,2025-06-30,NBE,https://nbe.gov.et/,high
2,EVT_0002,market_entry,Safaricom Ethiopia Commercial Launch,EVT_SAFARICOM,Launched,2022-08-01,,,News,,high
3,EVT_0003,product_launch,M-Pesa Ethiopia Launch,EVT_MPESA,Launched,2023-08-01,,,Safaricom,,high
4,EVT_0004,infrastructure,Fayda Digital ID Program Rollout,EVT_FAYDA,Launched,2024-01-01,,,NIDP,https://www.id.gov.et/,high
5,EVT_0005,policy,Foreign Exchange Liberalization,EVT_FX_REFORM,Implemented,2024-07-29,,,NBE,,high
6,EVT_0006,milestone,P2P Transaction Count Surpasses ATM,EVT_CROSSOVER,Achieved,2024-10-01,,,EthSwitch,https://ethswitch.com/,high
7,EVT_0007,partnership,M-Pesa EthSwitch Integration,EVT_MPESA_INTEROP,Launched,2025-10-27,,,EthSwitch,,high
8,EVT_0010,pricing,Safaricom Ethiopia Price Increase,EVT_SAFCOM_PRICE,Implemented,2025-12-15,,,News,,high
9,EVT_0008,infrastructure,EthioPay Instant Payment System Launch,EVT_ETHIOPAY,Launched,2025-12-18,,,NBE/EthSwitch,,high


In [31]:
# 7) Impact links — relationships between events and indicators
links = df[df["record_type"] == "impact_link"].copy()
link_cols = [c for c in ["record_id","parent_id","pillar","related_indicator","relationship_type","impact_direction","impact_magnitude","impact_estimate","lag_months","evidence_basis","source_url","confidence"] if c in links.columns]
display(links[link_cols].head(50))


Unnamed: 0,record_id,pillar,related_indicator,relationship_type,impact_direction,impact_magnitude,impact_estimate,lag_months,evidence_basis,source_url,confidence


In [32]:
# Validate that every impact_link.parent_id exists as an event record_id
event_ids = set(events["record_id"].astype(str)) if "record_id" in events.columns else set()

if "parent_id" in links.columns:
    missing = links[~links["parent_id"].astype(str).isin(event_ids)]
    print("Impact links with missing parent event:", missing.shape[0])
    if missing.shape[0] > 0:
        display(missing[link_cols].head(20))
else:
    print("No parent_id column found in impact_link records.")


No parent_id column found in impact_link records.


## Why events should not have `pillar`
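Events are intentionally **pillar-agnostic**: tagging an event with one pillar up front would bias the analysis toward that pillar, even though the same event can affect several.  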
Their effects on **ACCESS** or **USAGE** are captured through **impact_link** records, using:
- `parent_id` → the event record_id
- `pillar` → which pillar is affected (ACCESS/USAGE/etc.)
- `related_indicator` → the indicator being impacted
- `impact_direction`, `impact_magnitude`, `lag_months`, `evidence_basis`
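
How a link resolves to its parent event can be sketched with a left join. The starter file loaded above contains no `impact_link` rows and no `parent_id` column, so the link rows, their IDs, and the `parent_id` column below are hypothetical; the pairing of events with indicators is illustrative only, not an empirical claim. `resolve_links` is likewise a made-up helper name.

```python
import pandas as pd

# Two events taken from the catalog above.
events_demo = pd.DataFrame({
    "record_id": ["EVT_0001", "EVT_0003"],
    "indicator": ["Telebirr Launch", "M-Pesa Ethiopia Launch"],
})
# Hypothetical links; EVT_9999 deliberately points at no event.
links_demo = pd.DataFrame({
    "record_id": ["LNK_A", "LNK_B"],
    "parent_id": ["EVT_0001", "EVT_9999"],
    "pillar": ["USAGE", "ACCESS"],
    "related_indicator": ["USG_P2P_COUNT", "ACC_MM_ACCOUNT"],
})

def resolve_links(links: pd.DataFrame, events: pd.DataFrame) -> pd.DataFrame:
    """Attach the parent event name to each link and flag links without a parent."""
    parents = events[["record_id", "indicator"]].rename(
        columns={"record_id": "parent_id", "indicator": "event_name"}
    )
    merged = links.merge(parents, on="parent_id", how="left", indicator=True)
    merged["orphan"] = merged["_merge"] == "left_only"
    return merged.drop(columns="_merge")

resolved = resolve_links(links_demo, events_demo)
print(resolved[["record_id", "parent_id", "event_name", "pillar", "orphan"]])
```

Because the pillar lives on the link, one event can carry several links, each pointing at a different pillar and indicator.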

In [33]:
# 8) Enrichment — add example rows using YOUR EXACT schema
# IMPORTANT: These are templates. Replace values with verified figures if you want 'high' confidence.

COLLECTED_BY = "Your Name"  # <-- change this
COLLECTION_DATE = str(date.today())

cols = df.columns.tolist()

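def make_row(**kwargs):
    """Build one row with every schema column, defaulting to NaN."""
    row = {c: np.nan for c in cols}
    for k, v in kwargs.items():
        # Keys that are not columns of df are silently ignored.
        if k in row:
            row[k] = v
    return row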

new_rows = []

# --- Example Observation (template): Smartphone penetration proxy (fill with a real value + source) ---
new_rows.append(make_row(
    record_id="NEW_OBS_SMARTPHONE_PEN_2025",
    record_type="observation",
    category="ACCESS",
    pillar="ACCESS",
    indicator="Smartphone Penetration",
    indicator_code="ACC_SMARTPHONE_PEN",
    indicator_direction="higher_better",
    value_numeric=np.nan,          # TODO: fill with verified number
    value_text="TBD",
    value_type="percentage",
    unit="%",
    observation_date="2025-12-31",
    fiscal_year="2025",
    gender="all",
    location="national",
    source_name="TBD",
    source_type="research",
    source_url="",
    confidence="low",
    collected_by=COLLECTED_BY,
    collection_date=COLLECTION_DATE,
    original_text="",
    notes="Template: smartphone access is a strong enabler for digital payments adoption."
))

# --- Example Event (template): Regulatory change (fill with verified details) ---
new_rows.append(make_row(
    record_id="NEW_EVT_POLICY_EXAMPLE_2025",
    record_type="event",
    category="policy",
    pillar=np.nan,                 # keep empty for events
    indicator="Example Policy Change",
    indicator_code="EVT_POLICY_EXAMPLE",
    value_text="Implemented",
    value_type="categorical",
    observation_date="2025-06-01",
    fiscal_year="2025",
    gender="all",
    location="national",
    source_name="TBD",
    source_type="regulator",
    source_url="",
    confidence="low",
    collected_by=COLLECTED_BY,
    collection_date=COLLECTION_DATE,
    original_text="",
    notes="Template event: keep pillar empty; connect impacts via impact_link."
))

# --- Example Impact Link: Event -> Usage indicator (template) ---
new_rows.append(make_row(
    record_id="NEW_LINK_POLICY_TO_USAGE_2025",
    record_type="impact_link",
    parent_id="NEW_EVT_POLICY_EXAMPLE_2025",  # NOTE: dropped by make_row, the loaded schema has no parent_id column
    pillar="USAGE",
    related_indicator="USG_P2P_COUNT",   # Example existing indicator in your data
    relationship_type="drives",
    impact_direction="positive",
    impact_magnitude="medium",
    impact_estimate=np.nan,
    lag_months=3,
    evidence_basis="Template: policy can enable interoperability/acceptance, increasing usage over quarters.",
    source_url="",
    confidence="low",
    collected_by=COLLECTED_BY,
    collection_date=COLLECTION_DATE,
    original_text="",
    notes="Template link: adjust related_indicator to match the actual indicator you believe is impacted."
))

new_rows_df = pd.DataFrame(new_rows, columns=cols)

# Skip any enrichment rows whose record_id already exists
existing_ids = set(df["record_id"].astype(str)) if "record_id" in df.columns else set()
new_rows_df = new_rows_df[~new_rows_df["record_id"].astype(str).isin(existing_ids)].copy()

print("New enrichment rows to add:", new_rows_df.shape[0])
display(new_rows_df)


New enrichment rows to add: 3


Unnamed: 0,record_id,record_type,category,pillar,indicator,indicator_code,indicator_direction,value_numeric,value_text,value_type,unit,observation_date,period_start,period_end,fiscal_year,gender,location,region,source_name,source_type,source_url,confidence,related_indicator,relationship_type,impact_direction,impact_magnitude,impact_estimate,lag_months,evidence_basis,comparable_country,collected_by,collection_date,original_text,notes
0,NEW_OBS_SMARTPHONE_PEN_2025,observation,ACCESS,ACCESS,Smartphone Penetration,ACC_SMARTPHONE_PEN,higher_better,,TBD,percentage,%,2025-12-31,,,2025.0,all,national,,TBD,research,,low,,,,,,,,,Your Name,2026-02-03,,Template: smartphone access is a strong enable...
1,NEW_EVT_POLICY_EXAMPLE_2025,event,policy,,Example Policy Change,EVT_POLICY_EXAMPLE,,,Implemented,categorical,,2025-06-01,,,2025.0,all,national,,TBD,regulator,,low,,,,,,,,,Your Name,2026-02-03,,Template event: keep pillar empty; connect imp...
2,NEW_LINK_POLICY_TO_USAGE_2025,impact_link,,USAGE,,,,,,,,,,,,,,,,,,low,USG_P2P_COUNT,drives,positive,medium,,3.0,Template: policy can enable interoperability/a...,,Your Name,2026-02-03,,Template link: adjust related_indicator to mat...
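
The impact-link row in this output carries no `parent_id`: `make_row` ignores any key that is not a column of `df`, and the loaded schema has no such column. A stricter variant that refuses unknown fields would surface this at build time. The sketch below is one possible approach; `make_row_strict` and `cols_demo` are hypothetical names.

```python
import numpy as np

def make_row_strict(columns, **fields):
    """Like make_row, but reject fields that are not schema columns instead of dropping them."""
    unknown = sorted(set(fields) - set(columns))
    if unknown:
        raise KeyError(f"Not in schema: {unknown}")
    row = {c: np.nan for c in columns}
    row.update(fields)
    return row

# A few column names from the schema listed earlier; note there is no parent_id.
cols_demo = ["record_id", "record_type", "pillar", "related_indicator"]

ok = make_row_strict(
    cols_demo,
    record_id="NEW_LINK_POLICY_TO_USAGE_2025",
    record_type="impact_link",
)

try:
    make_row_strict(cols_demo, record_id="X", parent_id="NEW_EVT_POLICY_EXAMPLE_2025")
    rejected = False
except KeyError as err:
    rejected = True
    print("Rejected:", err)
```

Whether to add a `parent_id` column to the dataset or to record the parent event in an existing field is a project decision that this sketch does not settle.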


In [34]:
# 9) Append + save enriched dataset
df_enriched = pd.concat([df, new_rows_df], ignore_index=True)

processed_path = Path("data/processed")
processed_path.mkdir(parents=True, exist_ok=True)

out_file = processed_path / "ethiopia_fi_unified_data_enriched.csv"
df_enriched.to_csv(out_file, index=False)

print("✅ Saved:", out_file)
print("Final shape:", df_enriched.shape)


✅ Saved: data\processed\ethiopia_fi_unified_data_enriched.csv
Final shape: (46, 34)
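
A quick reload can confirm that the saved file matches what was written. The sketch below assumes `record_id` should stay unique after enrichment; `verify_saved` is a hypothetical helper, and the demo frame uses made-up rows and a placeholder URL.

```python
import tempfile
from pathlib import Path

import pandas as pd

def verify_saved(df: pd.DataFrame, path: Path, id_col: str = "record_id") -> pd.DataFrame:
    """Reload a saved CSV and check shape, column order, and ID uniqueness."""
    reloaded = pd.read_csv(path)
    assert reloaded.shape == df.shape, (reloaded.shape, df.shape)
    assert reloaded.columns.tolist() == df.columns.tolist()
    assert reloaded[id_col].is_unique, f"duplicate {id_col} values"
    return reloaded

# Made-up rows: one with a URL, one with an empty string as in the templates.
demo = pd.DataFrame({
    "record_id": ["REC_0001", "NEW_OBS_X"],
    "source_url": ["https://example.org", ""],
})
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "demo.csv"
    demo.to_csv(out, index=False)
    reloaded = verify_saved(demo, out)

# An empty string is written as an empty CSV field and read back as missing (NaN).
print(reloaded["source_url"].isna().tolist())
```

In the notebook the call would be `verify_saved(df_enriched, out_file)`. The last line illustrates that template fields set to `""` come back as missing values on reload.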


In [38]:
# 10) Write reports/data_enrichment_log.md (required)
reports_path = Path("reports")
reports_path.mkdir(parents=True, exist_ok=True)
log_path = reports_path / "data_enrichment_log.md"

def md_escape(s):
    # Missing values (None/NaN) become blank; newlines are flattened so each value stays on one bullet line
    if pd.isna(s):
        return ""
    return str(s).replace("\n", " ").strip()

lines = []
lines.append("# Data Enrichment Log – Task 1\n")
lines.append(f"- Collected by: {COLLECTED_BY}\n")
lines.append(f"- Collection date: {COLLECTION_DATE}\n\n")

lines.append("## Dataset Exploration Summary\n")
lines.append("- Unified schema confirmed (interpretation depends on `record_type`).\n")
lines.append("- Events are pillar-agnostic by design; impacts are defined through `impact_link` records.\n")
lines.append("- Impact links connect events to indicators using `parent_id`.\n\n")

lines.append("## Added / Proposed Records\n")
if new_rows_df.empty:
    lines.append("_No new records were added (all template record_ids already exist or were skipped)._\n")
else:
    for _, r in new_rows_df.iterrows():
        lines.append(f"### {md_escape(r.get('record_id'))}\n")
        lines.append(f"- record_type: {md_escape(r.get('record_type'))}\n")
        if md_escape(r.get('record_type')) == "event":
            lines.append(f"- category: {md_escape(r.get('category'))}\n")
            lines.append("- pillar: (blank by design)\n")
        else:
            lines.append(f"- pillar: {md_escape(r.get('pillar'))}\n")
            lines.append(f"- indicator_code: {md_escape(r.get('indicator_code'))}\n")
            lines.append(f"- related_indicator: {md_escape(r.get('related_indicator'))}\n")
        lines.append(f"- observation_date: {md_escape(r.get('observation_date'))}\n")
        lines.append(f"- source_name: {md_escape(r.get('source_name'))}\n")
        lines.append(f"- source_url: {md_escape(r.get('source_url'))}\n")
        lines.append(f"- original_text: {md_escape(r.get('original_text'))}\n")
        lines.append(f"- confidence: {md_escape(r.get('confidence'))}\n")
        lines.append(f"- notes: {md_escape(r.get('notes'))}\n\n")

with open(log_path, "w", encoding="utf-8") as f:
    f.write("".join(lines))

print("✅ Wrote:", log_path)


✅ Wrote: reports\data_enrichment_log.md
