# ETL Pipeline (Processed Graph)

Builds a richer, connected graph from COVID + Finance + World Bank + Wikipedia samples.

In [1]:
from pathlib import Path
import pandas as pd
import numpy as np
import zipfile
import re
import bz2
import io
import urllib.request

root = Path.cwd()
if root.name == 'notebooks':
    root = root.parent

raw = root / 'data' / 'raw'
processed = root / 'data' / 'processed'
processed.mkdir(parents=True, exist_ok=True)

(raw / 'world_bank').mkdir(parents=True, exist_ok=True)
(raw / 'finance').mkdir(parents=True, exist_ok=True)

print('Raw:', raw)
print('Processed:', processed)


Raw: C:\Users\SANTANU\Downloads\migraph\data\raw
Processed: C:\Users\SANTANU\Downloads\migraph\data\processed


## Download World Bank indicators (multi?country)

In [2]:
wb_specs = [
    ('world_bank_gdp_multi.zip', 'NY.GDP.MKTP.CD'),
    ('world_bank_gdp_pc_multi.zip', 'NY.GDP.PCAP.CD'),
    ('world_bank_pop_multi.zip', 'SP.POP.TOTL'),
    ('world_bank_inflation_multi.zip', 'FP.CPI.TOTL.ZG'),
    ('world_bank_unemployment_multi.zip', 'SL.UEM.TOTL.ZS'),
]
countries = 'USA;IND;BRA;CHN;DEU;JPN;GBR;FRA;CAN;AUS'
for filename, indicator in wb_specs:
    url = f'https://api.worldbank.org/v2/country/{countries}/indicator/{indicator}?downloadformat=csv'
    out = raw / 'world_bank' / filename
    if not out.exists():
        print('Downloading:', url)
        urllib.request.urlretrieve(url, out)
    else:
        print('Already exists:', out)


Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\world_bank\world_bank_gdp_multi.zip
Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\world_bank\world_bank_gdp_pc_multi.zip
Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\world_bank\world_bank_pop_multi.zip
Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\world_bank\world_bank_inflation_multi.zip
Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\world_bank\world_bank_unemployment_multi.zip


## Download Finance tickers (Stooq)

In [3]:
tickers = ['aapl.us', 'msft.us', 'amzn.us', 'goog.us', 'tsla.us', 'meta.us']
for t in tickers:
    url = f'https://stooq.com/q/d/l/?s={t}&i=d'
    out = raw / 'finance' / f'{t}.csv'
    if not out.exists():
        print('Downloading:', url)
        urllib.request.urlretrieve(url, out)
    else:
        print('Already exists:', out)


Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\finance\aapl.us.csv
Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\finance\msft.us.csv
Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\finance\amzn.us.csv
Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\finance\goog.us.csv
Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\finance\tsla.us.csv
Already exists: C:\Users\SANTANU\Downloads\migraph\data\raw\finance\meta.us.csv


## Load COVID (OWID) sample

In [4]:
covid_path = raw / 'covid' / 'owid-covid-latest.csv'
covid = pd.read_csv(covid_path)
covid = covid[['location','total_cases','total_deaths','population']].dropna().head(250)
covid['id'] = covid['location'].str.lower().str.replace(' ', '_')
covid['type'] = 'country'
covid['cases_per_m'] = covid['total_cases'] / (covid['population'] / 1_000_000)
covid['deaths_per_m'] = covid['total_deaths'] / (covid['population'] / 1_000_000)
covid['attributes'] = covid.apply(lambda r: {
    'total_cases': float(r.total_cases),
    'total_deaths': float(r.total_deaths),
    'population': float(r.population),
    'cases_per_m': float(r.cases_per_m),
    'deaths_per_m': float(r.deaths_per_m),
}, axis=1)
covid_nodes = covid[['id','location','type','attributes']].rename(columns={'location':'label'})
print('COVID nodes:', len(covid_nodes))


COVID nodes: 246


## Load Finance (multi?ticker)

In [5]:
tickers = ['aapl.us', 'msft.us', 'amzn.us', 'goog.us', 'tsla.us', 'meta.us']
fin_nodes = []
for t in tickers:
    sym = t.split('.')[0].upper()
    fin_nodes.append({'id': sym.lower(), 'label': sym, 'type': 'ticker', 'attributes': {}})
fin_nodes = pd.DataFrame(fin_nodes)
print('Finance tickers:', len(fin_nodes))


Finance tickers: 6


## Load World Bank indicators

In [6]:
def load_wb_zip(path):
    with zipfile.ZipFile(path, 'r') as zf:
        data_csv = [n for n in zf.namelist() if n.startswith('API_') and n.endswith('.csv')][0]
        with zf.open(data_csv) as f:
            text = io.TextIOWrapper(f, encoding='utf-8-sig')
            return pd.read_csv(text, skiprows=4)

wb_gdp = load_wb_zip(raw / 'world_bank' / 'world_bank_gdp_multi.zip')
wb_gdp_pc = load_wb_zip(raw / 'world_bank' / 'world_bank_gdp_pc_multi.zip')
wb_pop = load_wb_zip(raw / 'world_bank' / 'world_bank_pop_multi.zip')
wb_infl = load_wb_zip(raw / 'world_bank' / 'world_bank_inflation_multi.zip')
wb_unemp = load_wb_zip(raw / 'world_bank' / 'world_bank_unemployment_multi.zip')

def build_indicator_nodes(df, indicator_code, prefix):
    nodes = []
    for _, row in df[df['Indicator Code']==indicator_code].iterrows():
        cc = row['Country Code']
        cname = row['Country Name']
        years = [c for c in df.columns if re.match(r'^\d{4}$', str(c))]
        years = sorted(years)[-10:]
        records = []
        for y in years:
            val = row[y]
            if pd.notna(val):
                records.append({'year': y, 'value': float(val)})
        nodes.append({
            'id': f'{prefix}_{cc.lower()}',
            'label': f'{cname} {prefix.upper()}',
            'type': 'wb_indicator',
            'attributes': {'years': records, 'code': indicator_code},
            'country_code': cc,
            'country_name': cname,
        })
    return pd.DataFrame(nodes)

gdp_nodes = build_indicator_nodes(wb_gdp, 'NY.GDP.MKTP.CD', 'gdp')
gdp_pc_nodes = build_indicator_nodes(wb_gdp_pc, 'NY.GDP.PCAP.CD', 'gdp_pc')
pop_nodes = build_indicator_nodes(wb_pop, 'SP.POP.TOTL', 'pop')
infl_nodes = build_indicator_nodes(wb_infl, 'FP.CPI.TOTL.ZG', 'inflation')
unemp_nodes = build_indicator_nodes(wb_unemp, 'SL.UEM.TOTL.ZS', 'unemployment')
wb_nodes = pd.concat([gdp_nodes, gdp_pc_nodes, pop_nodes, infl_nodes, unemp_nodes], ignore_index=True)
print('WB indicator nodes:', len(wb_nodes))


WB indicator nodes: 562


## Load Wikipedia sample (Simple English)

In [7]:
wiki_path = raw / 'wikipedia' / 'simplewiki-latest-pages-articles.xml.bz2'
topics = []
with bz2.open(wiki_path, 'rt', encoding='utf-8', errors='ignore') as f:
    for line in f:
        if '<title>' in line:
            title = line.strip().replace('<title>','').replace('</title>','')
            if title and ':' not in title:
                topics.append(title)
        if len(topics) >= 500:
            break

wiki_nodes = pd.DataFrame([
    {'id': f'wiki_{i}', 'label': t, 'type': 'wiki_topic', 'attributes': {}}
    for i, t in enumerate(topics)
])
print('Wiki topics:', len(wiki_nodes))


Wiki topics: 500


## Keyword extraction from wiki topics

In [8]:
stop = set(['the','of','and','in','to','a','an','for','on','by','with','at','from','as','is','are','was','were','it'])
def keywords(title):
    toks = re.findall(r'[A-Za-z]{3,}', title.lower())
    toks = [t for t in toks if t not in stop]
    return toks[:5]

wiki_nodes['keywords'] = wiki_nodes['label'].apply(keywords)


## Derived Nodes (COVID burden + population buckets)

In [9]:
burden_nodes = []
bucket_nodes = []
for _, row in covid.iterrows():
    bid = f'covid_burden_{row.id}'
    burden_nodes.append({
        'id': bid,
        'label': f'COVID burden {row.location}',
        'type': 'covid_burden',
        'attributes': {
            'cases_per_m': float(row.cases_per_m),
            'deaths_per_m': float(row.deaths_per_m),
        },
    })

    pop = float(row.population)
    if pop < 5_000_000:
        bucket = 'pop_lt_5m'
    elif pop < 20_000_000:
        bucket = 'pop_5m_20m'
    elif pop < 100_000_000:
        bucket = 'pop_20m_100m'
    else:
        bucket = 'pop_gt_100m'

    bucket_nodes.append({
        'id': bucket,
        'label': bucket.replace('_',' '),
        'type': 'population_bucket',
        'attributes': {},
    })

burden_nodes = pd.DataFrame(burden_nodes)
bucket_nodes = pd.DataFrame(bucket_nodes).drop_duplicates(subset=['id'])
print('Burden nodes:', len(burden_nodes))
print('Bucket nodes:', len(bucket_nodes))


Burden nodes: 246
Bucket nodes: 4


## Build Nodes

In [10]:
def _year_columns(df):
    years = [int(c) for c in df.columns if re.match(r'^\d{4}$', str(c))]
    return sorted(years)

def build_year_nodes(df, indicator_code, prefix, max_years=6):
    years = _year_columns(df)[-max_years:]
    nodes = []
    for _, row in df[df['Indicator Code']==indicator_code].iterrows():
        cname = row['Country Name']
        cc = row['Country Code']
        cid = cname.lower().replace(' ', '_')
        indicator_id = f'{prefix}_{cc.lower()}'
        for y in years:
            y_str = str(y)
            val = row.get(y_str)
            if pd.notna(val):
                nodes.append({
                    'id': f'{prefix}_year:{cid}:{y_str}',
                    'label': f'{prefix.upper()} {y_str} ({cname})',
                    'type': f'{prefix}_year',
                    'attributes': {
                        'year': int(y),
                        'value': float(val),
                        'indicator': indicator_code,
                        'country_name': cname,
                        'country_code': cc,
                        'country_id': cid,
                        'indicator_id': indicator_id,
                    },
                })
    return pd.DataFrame(nodes)

def build_recovery_nodes(df, indicator_code, prefix, base_year=2019, higher_is_better=True):
    years = _year_columns(df)
    nodes = []
    base_col = str(base_year)
    if base_col not in df.columns:
        return pd.DataFrame(nodes)
    for _, row in df[df['Indicator Code']==indicator_code].iterrows():
        cname = row['Country Name']
        cc = row['Country Code']
        cid = cname.lower().replace(' ', '_')
        indicator_id = f'{prefix}_{cc.lower()}'
        base_val = row.get(base_col)
        if pd.isna(base_val):
            continue
        recovery_year = None
        for y in years:
            if y <= base_year:
                continue
            val = row.get(str(y))
            if pd.isna(val):
                continue
            if higher_is_better:
                ok = val >= base_val
            else:
                ok = val <= base_val
            if ok:
                recovery_year = y
                break
        if recovery_year is None:
            continue
        nodes.append({
            'id': f'{prefix}_recovery:{cid}',
            'label': f'{prefix.upper()} recovery {recovery_year} ({cname})',
            'type': f'{prefix}_recovery',
            'attributes': {
                'year': int(recovery_year),
                'base_year': int(base_year),
                'base_value': float(base_val),
                'indicator': indicator_code,
                'country_name': cname,
                'country_code': cc,
                'country_id': cid,
                'indicator_id': indicator_id,
            },
        })
    return pd.DataFrame(nodes)

# COVID time-series nodes (yearly rollups)
covid_ts_path = raw / 'covid' / 'owid-covid-data.csv'
covid_year_nodes = pd.DataFrame([])
covid_peak_nodes = pd.DataFrame([])
covid_recovery_nodes = pd.DataFrame([])
covid_recovery_deaths = pd.DataFrame([])
covid_recovery_new_cases = pd.DataFrame([])
if covid_ts_path.exists():
    covid_ts = pd.read_csv(covid_ts_path)
    if 'location' not in covid_ts.columns and 'country' in covid_ts.columns:
        covid_ts['location'] = covid_ts['country']
    covid_ts = covid_ts[['location','date','total_cases','total_deaths','new_cases','population']]
    covid_ts = covid_ts.dropna(subset=['total_cases','total_deaths','population'])
    covid_ts['year'] = pd.to_datetime(covid_ts['date']).dt.year
    covid_ts['id'] = covid_ts['location'].str.lower().str.replace(' ', '_')
    covid_ts = covid_ts[covid_ts['id'].isin(covid['id'])]
    covid_annual = covid_ts.groupby(['id','location','year']).agg({
        'total_cases': 'max',
        'total_deaths': 'max',
        'new_cases': 'sum',
        'population': 'max',
    }).reset_index()
    covid_annual['cases_per_m'] = covid_annual['total_cases'] / (covid_annual['population'] / 1_000_000)
    covid_annual['deaths_per_m'] = covid_annual['total_deaths'] / (covid_annual['population'] / 1_000_000)
    covid_annual['new_cases_per_m'] = covid_annual['new_cases'] / (covid_annual['population'] / 1_000_000)
    covid_annual = covid_annual[covid_annual['year'] >= 2019]
    covid_year_nodes = pd.DataFrame({
        'id': covid_annual.apply(lambda r: f'covid_year:{r.id}:{int(r.year)}', axis=1),
        'label': covid_annual.apply(lambda r: f'COVID {int(r.year)} ({r.location})', axis=1),
        'type': 'covid_year',
        'attributes': covid_annual.apply(lambda r: {
            'year': int(r.year),
            'total_cases': float(r.total_cases),
            'total_deaths': float(r.total_deaths),
            'cases_per_m': float(r.cases_per_m),
            'deaths_per_m': float(r.deaths_per_m),
            'population': float(r.population),
            'country_id': r.id,
            'country_name': r.location,
        }, axis=1),
    })



    covid_peak_nodes = []
    for cid, grp in covid_annual.groupby('id'):
        g = grp.sort_values('year')
        peak_idx = g['cases_per_m'].idxmax()
        peak = g.loc[peak_idx]
        covid_peak_nodes.append({
            'id': f'covid_peak:{cid}',
            'label': f"COVID peak {int(peak['year'])} ({peak['location']})",
            'type': 'covid_peak',
            'attributes': {
                'year': int(peak['year']),
                'peak_cases_per_m': float(peak['cases_per_m']),
                'country_id': cid,
                'country_name': peak['location'],
            },
        })
    covid_peak_nodes = pd.DataFrame(covid_peak_nodes)


    covid_recovery_nodes = []
    covid_recovery_deaths = []
    covid_recovery_new_cases = []

    for cid, grp in covid_annual.groupby('id'):
        g = grp.sort_values('year')

        # Peak and recovery based on cases_per_m (legacy)
        peak_idx = g['cases_per_m'].idxmax()
        peak = g.loc[peak_idx]
        peak_year = int(peak['year'])
        peak_cases = float(peak['cases_per_m'])
        threshold_cases = peak_cases * 0.9
        after = g[g['year'] > peak_year]
        rec = after[after['cases_per_m'] <= threshold_cases]
        if not rec.empty:
            rec_row = rec.iloc[0]
            covid_recovery_nodes.append({
                'id': f'covid_recovery:{cid}',
                'label': f"COVID recovery {int(rec_row['year'])} ({rec_row['location']})",
                'type': 'covid_recovery',
                'attributes': {
                    'year': int(rec_row['year']),
                    'peak_year': peak_year,
                    'peak_cases_per_m': peak_cases,
                    'recovery_cases_per_m': float(rec_row['cases_per_m']),
                    'country_id': cid,
                    'country_name': rec_row['location'],
                },
            })

        # Recovery based on deaths_per_m
        peak_d_idx = g['deaths_per_m'].idxmax()
        peak_d = g.loc[peak_d_idx]
        peak_d_year = int(peak_d['year'])
        peak_deaths = float(peak_d['deaths_per_m'])
        threshold_deaths = peak_deaths * 0.7
        after_d = g[g['year'] > peak_d_year]
        rec_d = after_d[after_d['deaths_per_m'] <= threshold_deaths]
        if not rec_d.empty:
            rec_row = rec_d.iloc[0]
            covid_recovery_deaths.append({
                'id': f'covid_recovery_deaths:{cid}',
                'label': f"COVID deaths recovery {int(rec_row['year'])} ({rec_row['location']})",
                'type': 'covid_recovery_deaths',
                'attributes': {
                    'year': int(rec_row['year']),
                    'peak_year': peak_d_year,
                    'peak_deaths_per_m': peak_deaths,
                    'recovery_deaths_per_m': float(rec_row['deaths_per_m']),
                    'country_id': cid,
                    'country_name': rec_row['location'],
                },
            })

        # Recovery based on new_cases_per_m (non-cumulative)
        peak_n_idx = g['new_cases_per_m'].idxmax()
        peak_n = g.loc[peak_n_idx]
        peak_n_year = int(peak_n['year'])
        peak_new_cases = float(peak_n['new_cases_per_m'])
        threshold_new = peak_new_cases * 0.5
        after_n = g[g['year'] > peak_n_year]
        rec_n = after_n[after_n['new_cases_per_m'] <= threshold_new]
        if not rec_n.empty:
            rec_row = rec_n.iloc[0]
            covid_recovery_new_cases.append({
                'id': f'covid_recovery_new_cases:{cid}',
                'label': f"COVID new-cases recovery {int(rec_row['year'])} ({rec_row['location']})",
                'type': 'covid_recovery_new_cases',
                'attributes': {
                    'year': int(rec_row['year']),
                    'peak_year': peak_n_year,
                    'peak_new_cases_per_m': peak_new_cases,
                    'recovery_new_cases_per_m': float(rec_row['new_cases_per_m']),
                    'country_id': cid,
                    'country_name': rec_row['location'],
                },
            })

    covid_recovery_nodes = pd.DataFrame(covid_recovery_nodes)
    covid_recovery_deaths = pd.DataFrame(covid_recovery_deaths)
    covid_recovery_new_cases = pd.DataFrame(covid_recovery_new_cases)

gdp_year_nodes = build_year_nodes(wb_gdp, 'NY.GDP.MKTP.CD', 'gdp', max_years=12)
infl_year_nodes = build_year_nodes(wb_infl, 'FP.CPI.TOTL.ZG', 'inflation', max_years=12)
unemp_year_nodes = build_year_nodes(wb_unemp, 'SL.UEM.TOTL.ZS', 'unemployment', max_years=12)
year_nodes = pd.concat([gdp_year_nodes, infl_year_nodes, unemp_year_nodes], ignore_index=True)
recovery_nodes = pd.concat([
    build_recovery_nodes(wb_gdp, 'NY.GDP.MKTP.CD', 'gdp', higher_is_better=True),
    build_recovery_nodes(wb_infl, 'FP.CPI.TOTL.ZG', 'inflation', higher_is_better=False),
    build_recovery_nodes(wb_unemp, 'SL.UEM.TOTL.ZS', 'unemployment', higher_is_better=False),
], ignore_index=True)

nodes = pd.concat([
    covid_nodes,
    fin_nodes,
    wb_nodes.drop(columns=['country_code','country_name']),
    wiki_nodes.drop(columns=['keywords']),
    burden_nodes,
    bucket_nodes,
    year_nodes,
    recovery_nodes,
    covid_year_nodes,
    covid_peak_nodes,
    covid_recovery_nodes,
    covid_recovery_deaths,
    covid_recovery_new_cases,
], ignore_index=True)
nodes.to_parquet(processed / 'nodes.parquet', index=False)
print('Wrote nodes.parquet:', len(nodes))


Wrote nodes.parquet: 9807


## Build Edges

In [11]:
edges = []
for _, row in covid.iterrows():
    edges.append({
        'source': row.id,
        'target': row.id,
        'relation': 'cases_in',
        'weight': min(float(row.cases_per_m / 1_000_000), 1.0),
        'confidence': 0.8,
        'causal_type': 'correlational',
        'provenance': 'observed',
    })
    edges.append({
        'source': row.id,
        'target': f'covid_burden_{row.id}',
        'relation': 'has_burden',
        'weight': min(float(row.deaths_per_m / 10000), 1.0),
        'confidence': 0.8,
        'causal_type': 'correlational',
        'provenance': 'observed',
    })
    pop = float(row.population)
    if pop < 5_000_000:
        bucket = 'pop_lt_5m'
    elif pop < 20_000_000:
        bucket = 'pop_5m_20m'
    elif pop < 100_000_000:
        bucket = 'pop_20m_100m'
    else:
        bucket = 'pop_gt_100m'
    edges.append({
        'source': row.id,
        'target': bucket,
        'relation': 'has_population_bucket',
        'weight': 0.7,
        'confidence': 0.7,
        'causal_type': 'correlational',
        'provenance': 'observed',
    })

# link tickers to countries for demo
for sym in fin_nodes['id'].tolist():
    for cid in covid['id'].head(30):
        edges.append({
            'source': sym,
            'target': cid,
            'relation': 'market_exposure',
            'weight': 0.5,
            'confidence': 0.6,
            'causal_type': 'hypothetical',
            'provenance': 'inferred',
        })
        edges.append({
            'source': sym,
            'target': f'covid_burden_{cid}',
            'relation': 'exposed_to_burden',
            'weight': 0.4,
            'confidence': 0.5,
            'causal_type': 'hypothetical',
            'provenance': 'inferred',
        })

# link WB indicators to countries
for _, r in wb_nodes.iterrows():
    cname = r['country_name']
    cid = cname.lower().replace(' ', '_')
    if cid in covid['id'].values:
        edges.append({
            'source': r['id'],
            'target': cid,
            'relation': 'indicator_of',
            'weight': 0.8,
            'confidence': 0.8,
            'causal_type': 'correlational',
            'provenance': 'observed',
        })


# link year nodes to countries and indicators
for _, n in year_nodes.iterrows():
    attrs = n.get('attributes', {})
    cid = attrs.get('country_id')
    indicator_id = attrs.get('indicator_id')
    if cid:
        edges.append({
            'source': cid,
            'target': n['id'],
            'relation': f"has_{n['type']}",
            'weight': 0.7,
            'confidence': 0.7,
            'causal_type': 'observed',
            'provenance': 'observed',
        })
    if indicator_id:
        edges.append({
            'source': n['id'],
            'target': indicator_id,
            'relation': 'year_of_indicator',
            'weight': 0.7,
            'confidence': 0.7,
            'causal_type': 'observed',
            'provenance': 'observed',
        })

# link GDP recovery nodes to countries and indicators
for _, n in recovery_nodes.iterrows():
    attrs = n.get('attributes', {})
    cid = attrs.get('country_id')
    indicator_id = attrs.get('indicator_id')
    if cid:
        edges.append({
            'source': cid,
            'target': n['id'],
            'relation': 'gdp_recovered_in',
            'weight': 0.8,
            'confidence': 0.8,
            'causal_type': 'inferred',
            'provenance': 'observed',
        })
    if indicator_id:
        edges.append({
            'source': n['id'],
            'target': indicator_id,
            'relation': 'recovery_for_indicator',
            'weight': 0.8,
            'confidence': 0.8,
            'causal_type': 'inferred',
            'provenance': 'observed',
        })


# link COVID recovery nodes to countries and year nodes
for _, n in covid_recovery_nodes.iterrows():
    attrs = n.get('attributes', {})
    cid = attrs.get('country_id')
    year = attrs.get('year')
    if cid:
        edges.append({
            'source': cid,
            'target': n['id'],
            'relation': 'covid_recovered_in',
            'weight': 0.8,
            'confidence': 0.8,
            'causal_type': 'inferred',
            'provenance': 'observed',
        })
    if cid and year:
        edges.append({
            'source': n['id'],
            'target': f'covid_year:{cid}:{year}',
            'relation': 'recovery_year',
            'weight': 0.7,
            'confidence': 0.7,
            'causal_type': 'inferred',
            'provenance': 'observed',
        })

# link COVID year nodes to economic year nodes (same country/year)
year_node_ids = set(year_nodes['id']) if 'year_nodes' in globals() else set()
for _, n in covid_year_nodes.iterrows():
    attrs = n.get('attributes', {})
    cid = attrs.get('country_id')
    year = attrs.get('year')
    if not cid or year is None:
        continue
    for prefix in ['gdp', 'inflation', 'unemployment']:
        target = f'{prefix}_year:{cid}:{year}'
        if target in year_node_ids:
            edges.append({
                'source': n['id'],
                'target': target,
                'relation': f'covid_vs_{prefix}_year',
                'weight': 0.6,
                'confidence': 0.6,
                'causal_type': 'inferred',
                'provenance': 'observed',
            })


# link COVID peak nodes to countries and year nodes
for _, n in covid_peak_nodes.iterrows():
    attrs = n.get('attributes', {})
    cid = attrs.get('country_id')
    year = attrs.get('year')
    if cid:
        edges.append({
            'source': cid,
            'target': n['id'],
            'relation': 'covid_peak_in',
            'weight': 0.8,
            'confidence': 0.8,
            'causal_type': 'observed',
            'provenance': 'observed',
        })
    if cid and year:
        edges.append({
            'source': n['id'],
            'target': f'covid_year:{cid}:{year}',
            'relation': 'peak_year',
            'weight': 0.7,
            'confidence': 0.7,
            'causal_type': 'observed',
            'provenance': 'observed',
        })


# link COVID recovery to economic recovery nodes (same country)
for _, n in covid_recovery_nodes.iterrows():
    attrs = n.get('attributes', {})
    cid = attrs.get('country_id')
    if not cid:
        continue
    for prefix in ['gdp', 'inflation', 'unemployment']:
        target = f'{prefix}_recovery:{cid}'
        edges.append({
            'source': n['id'],
            'target': target,
            'relation': f'covid_vs_{prefix}_recovery',
            'weight': 0.6,
            'confidence': 0.6,
            'causal_type': 'inferred',
            'provenance': 'observed',
        })


# link COVID deaths/new-cases recovery nodes to countries
for _, n in covid_recovery_deaths.iterrows():
    attrs = n.get('attributes', {})
    cid = attrs.get('country_id')
    year = attrs.get('year')
    if cid:
        edges.append({
            'source': cid,
            'target': n['id'],
            'relation': 'covid_deaths_recovered_in',
            'weight': 0.8,
            'confidence': 0.8,
            'causal_type': 'inferred',
            'provenance': 'observed',
        })
    if cid and year:
        edges.append({
            'source': n['id'],
            'target': f'covid_year:{cid}:{year}',
            'relation': 'recovery_year',
            'weight': 0.7,
            'confidence': 0.7,
            'causal_type': 'inferred',
            'provenance': 'observed',
        })

for _, n in covid_recovery_new_cases.iterrows():
    attrs = n.get('attributes', {})
    cid = attrs.get('country_id')
    year = attrs.get('year')
    if cid:
        edges.append({
            'source': cid,
            'target': n['id'],
            'relation': 'covid_new_cases_recovered_in',
            'weight': 0.8,
            'confidence': 0.8,
            'causal_type': 'inferred',
            'provenance': 'observed',
        })
    if cid and year:
        edges.append({
            'source': n['id'],
            'target': f'covid_year:{cid}:{year}',
            'relation': 'recovery_year',
            'weight': 0.7,
            'confidence': 0.7,
            'causal_type': 'inferred',
            'provenance': 'observed',
        })

# link wiki topics using keyword extraction
country_labels = {r['label']: r['id'] for _, r in covid_nodes.iterrows()}
for _, row in wiki_nodes.iterrows():
    title = row['label']
    t = title.lower()
    kws = keywords(title)
    for cname, cid in country_labels.items():
        if cname.lower() in t:
            edges.append({
                'source': row['id'],
                'target': cid,
                'relation': 'mentions_country',
                'weight': 0.4,
                'confidence': 0.5,
                'causal_type': 'hypothetical',
                'provenance': 'inferred',
            })
    for kw in kws:
        if kw in ['covid', 'pandemic', 'health', 'disease']:
            for cid in covid['id'].head(40):
                edges.append({
                    'source': row['id'],
                    'target': f'covid_burden_{cid}',
                    'relation': 'mentions_pandemic',
                    'weight': 0.3,
                    'confidence': 0.4,
                    'causal_type': 'hypothetical',
                    'provenance': 'inferred',
                })
        if kw in ['gdp', 'economy', 'finance', 'market', 'inflation', 'unemployment']:
            for ind in wb_nodes['id'].head(20):
                edges.append({
                    'source': row['id'],
                    'target': ind,
                    'relation': 'mentions_economy',
                    'weight': 0.3,
                    'confidence': 0.4,
                    'causal_type': 'hypothetical',
                    'provenance': 'inferred',
                })

# link countries by similarity in burden (top 4 neighbors)
covid_sorted = covid.sort_values('cases_per_m')
ids = covid_sorted['id'].tolist()
for i in range(len(ids)):
    for j in range(1, 5):
        if i + j < len(ids):
            edges.append({
                'source': ids[i],
                'target': ids[i+j],
                'relation': 'similar_burden',
                'weight': 0.4,
                'confidence': 0.5,
                'causal_type': 'correlational',
                'provenance': 'inferred',
            })

edges = pd.DataFrame(edges)
edges.to_parquet(processed / 'edges.parquet', index=False)
print('Wrote edges.parquet:', len(edges))


Wrote edges.parquet: 17287


## Embeddings (random demo vectors)

In [12]:
dim = 16
node_emb = pd.DataFrame({
    'id': nodes['id'],
    'kind': 'node',
    'vector': [list(np.random.rand(dim)) for _ in range(len(nodes))]
})
edge_ids = edges.apply(
    lambda r: f"{r['source']}|{r['relation']}|{r['target']}",
    axis=1,
)
edge_emb = pd.DataFrame({
    'id': edge_ids,
    'kind': 'edge',
    'vector': [list(np.random.rand(dim)) for _ in range(len(edges))]
})
emb = pd.concat([node_emb, edge_emb], ignore_index=True)
emb.to_parquet(processed / 'embeddings.parquet', index=False)
print('Wrote embeddings.parquet:', len(emb))
print(' - nodes:', len(node_emb))
print(' - edges:', len(edge_emb))


Wrote embeddings.parquet: 27094
 - nodes: 9807
 - edges: 17287


In [13]:
nodes=pd.read_parquet(processed / 'nodes.parquet')
nodes["type"].value_counts()

type
unemployment_year           2811
inflation_year              2765
covid_year                  1708
wb_indicator                 562
wiki_topic                   500
country                      246
covid_burden                 246
covid_peak                   244
covid_recovery_new_cases     244
unemployment_recovery        174
inflation_recovery           159
gdp_year                     120
gdp_recovery                   9
covid_recovery_deaths          7
ticker                         6
population_bucket              4
covid_recovery                 2
Name: count, dtype: int64

In [14]:
nodes.query("id.str.contains('aapl')")[["id", "label", "type"]]


Unnamed: 0,id,label,type
246,aapl,AAPL,ticker
