# Article Aggregation - District Level

**Script**: `scripts/02_data_processing/02_aggregate_articles.py`

**Author**: Victor Collins Oppon, MSc Data Science, Middlesex University 2025

---

## Purpose

Aggregates GDELT GKG articles to IPC district-period observations using:
- **4-month temporal windows** (3 months before + assessment month)
- **Geographic matching hierarchy**: GADM3 (district) → GADM2 (zone) → GADM1 (state) → Country-level
- **Fuzzy matching**: 80% threshold for district name matching
- **SQLite streaming**: Memory-efficient processing of large CSV files (47GB)

**Runtime**: ~45 minutes (1.2 million articles processed)

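**Input** (paths relative to `BASE_DIR`, as configured in the cell below):
- `aligned_data/african_gkg_articles_info.csv` (47GB)
- `aligned_data/african_gkg_locations_aligned.parquet` (GKG location mentions with GADM names)
- `data/district_level/ipc_reference.parquet` (from 02a)

**Output**:
- `data/district_level/articles_aggregated.parquet`
- `data/district_level/articles_aggregated.csv` (same table, saved for inspection)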

## Imports and Configuration

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import sqlite3
import gc
import re
import unicodedata
from datetime import datetime, timedelta
from multiprocessing import Pool, cpu_count
from functools import partial
from rapidfuzz import fuzz
from config import BASE_DIR

# Paths
# Re-anchor BASE_DIR three directory levels above the value imported from config.
BASE_DIR = Path(str(BASE_DIR.parent.parent.parent))

# GDELT inputs
_ALIGNED_DIR = BASE_DIR / 'aligned_data'
LOCATIONS_FILE = _ALIGNED_DIR / 'african_gkg_locations_aligned.parquet'
ARTICLES_FILE = _ALIGNED_DIR / 'african_gkg_articles_info.csv'

# District-level working directory: IPC reference (previous step), outputs,
# and the temporary SQLite database used for memory-efficient processing.
DISTRICT_DATA_DIR = BASE_DIR / 'data' / 'district_level'
IPC_REF_FILE = DISTRICT_DATA_DIR / 'ipc_reference.parquet'
OUTPUT_FILE = DISTRICT_DATA_DIR / 'articles_aggregated.parquet'
OUTPUT_CSV = DISTRICT_DATA_DIR / 'articles_aggregated.csv'
SQLITE_FILE = DISTRICT_DATA_DIR / 'temp_gkg_ipc_mapping.db'

# Echo the resolved paths so a misconfigured BASE_DIR is visible immediately.
for _label, _path in (
    ("Locations file", LOCATIONS_FILE),
    ("Articles file", ARTICLES_FILE),
    ("IPC reference", IPC_REF_FILE),
    ("Output", OUTPUT_FILE),
    ("SQLite temp", SQLITE_FILE),
):
    print(f"{_label}: {_path}")

## Constants

In [None]:
# Length of the article window, in months (3 months before + assessment month)
AGGREGATION_MONTHS = 4

# Minimum similarity score (0-100) for a fuzzy name match to be accepted
FUZZY_THRESHOLD = 80

# Number of buffered rows that triggers a bulk INSERT into SQLite
SQLITE_BATCH_SIZE = 50000

# LHZ countries (use word-based lookup)
LHZ_COUNTRIES = ['Zimbabwe', 'Burundi', 'Kenya']

# FIPS to Country mapping (from 02a)
FIPS_TO_COUNTRY = {
    'AO': 'Angola',
    'UV': 'Burkina Faso',
    'BY': 'Burundi',
    'CM': 'Cameroon',
    'CT': 'Central African Republic',
    'CD': 'Chad',
    'CG': 'Democratic Republic of the Congo',
    'ET': 'Ethiopia',
    'KE': 'Kenya',
    'LT': 'Lesotho',
    'MA': 'Madagascar',
    'MI': 'Malawi',
    'ML': 'Mali',
    'MR': 'Mauritania',
    'MZ': 'Mozambique',
    'NG': 'Niger',
    'NI': 'Nigeria',
    'RW': 'Rwanda',
    'SO': 'Somalia',
    'OD': 'South Sudan',
    'SU': 'Sudan',
    'TO': 'Togo',
    'UG': 'Uganda',
    'ZI': 'Zimbabwe',
}

for _line in (
    f"Aggregation window: {AGGREGATION_MONTHS} months",
    f"Fuzzy threshold: {FUZZY_THRESHOLD}%",
    f"SQLite batch size: {SQLITE_BATCH_SIZE:,}",
    f"LHZ countries: {LHZ_COUNTRIES}",
):
    print(_line)

## Utility Functions

Functions for text normalization, fuzzy matching, and lookup building.

In [None]:
def normalize_text(text):
    """
    Return a matching-friendly form of a place name.

    Missing values (anything `pd.isna` reports as NA) become ''. Otherwise the
    value is stringified, lowercased and stripped; accents are removed by
    NFKD-decomposing and discarding combining marks; every character that is
    neither alphanumeric nor whitespace is replaced by a space; and runs of
    whitespace are collapsed to single spaces.
    """
    if pd.isna(text):
        return ''

    decomposed = unicodedata.normalize('NFKD', str(text).lower().strip())

    kept = []
    for ch in decomposed:
        if unicodedata.combining(ch):
            # Accent mark split off by NFKD - drop it
            continue
        kept.append(ch if (ch.isalnum() or ch.isspace()) else ' ')

    # split()/join trims the ends and collapses repeated whitespace
    return ' '.join(''.join(kept).split())

# Test
print("Test normalization:")
print(f"  'Kasaï Central' → '{normalize_text('Kasaï Central')}'")
print(f"  'Nord-Kivu' → '{normalize_text('Nord-Kivu')}'")

In [None]:
def find_fuzzy_match(loc_name_norm, country_candidates, country_code):
    """
    Pick the IPC district whose normalized name is most similar to a location name.

    `country_candidates[country_code]` is iterated as (district_name, ipc_records)
    pairs. Each district name is scored with `fuzz.ratio`; the records of the
    highest-scoring pair are returned, provided the score reaches
    FUZZY_THRESHOLD. On equal scores the earliest candidate wins.

    Returns an empty list when the country has no candidates entry or when no
    candidate reaches the threshold.
    """
    if country_code not in country_candidates:
        return []

    winner = []
    top_score = 0

    for candidate_name, candidate_records in country_candidates[country_code]:
        similarity = fuzz.ratio(loc_name_norm, candidate_name)
        # Skip anything below the threshold or not strictly better than the current best
        if similarity < FUZZY_THRESHOLD or similarity <= top_score:
            continue
        top_score, winner = similarity, candidate_records

    return winner

print("find_fuzzy_match defined")

In [None]:
def build_word_lookup(ipc_ref):
    """
    Build word-based lookup for LHZ countries (Zimbabwe, Burundi, Kenya).

    For these countries, IPC records often use long LHZ names that don't match
    GADM district names. Instead, each record is indexed under every word
    (3+ characters) of its `full_name_normalized` value.

    Args:
        ipc_ref: DataFrame with at least the columns 'country' and
            'full_name_normalized'.

    Returns:
        Dict {country_name: {word: [record_dict, ...]}} with one entry per
        country in LHZ_COUNTRIES. A record is appended once per occurrence of
        the word in its name; all entries for a given row share one dict
        object, so the records must be treated as read-only.
    """
    word_lookup = {}

    for country in LHZ_COUNTRIES:
        words_for_country = word_lookup[country] = {}
        country_ipc = ipc_ref[ipc_ref['country'] == country]

        for _, row in country_ipc.iterrows():
            full_name_norm = row['full_name_normalized']
            if pd.isna(full_name_norm):
                continue

            # Materialise the row dict once per row rather than once per word
            record = row.to_dict()
            for word in full_name_norm.split():
                if len(word) >= 3:  # Skip very short words
                    words_for_country.setdefault(word, []).append(record)

    return word_lookup

print("build_word_lookup defined")

In [None]:
def combine_lookups(district_lookup, word_lookup):
    """
    Combine district-based and word-based lookups into unified structure.

    Args:
        district_lookup: Dict keyed by (country_fips, district_normalized).
        word_lookup: Dict {country_name: {word: [IPC records]}} as produced by
            build_word_lookup.

    Returns:
        New dict with keys (country_fips, name_or_word) -> list of IPC records.
        Existing district keys always win: a word entry is only added when its
        (fips, word) key is not already present. Countries without a FIPS code
        in FIPS_TO_COUNTRY are skipped. The input dicts are not modified, but
        the record lists are shared with them, not copied.
    """
    combined = dict(district_lookup)

    # Invert the FIPS table once (it was previously rebuilt for every country)
    country_to_fips = {name: code for code, name in FIPS_TO_COUNTRY.items()}

    for country, words in word_lookup.items():
        fips = country_to_fips.get(country)
        if not fips:
            continue

        for word, ipc_list in words.items():
            # setdefault keeps any district entry already stored under this key
            combined.setdefault((fips, word), ipc_list)

    return combined

print("combine_lookups defined")

In [None]:
def precompute_fuzzy_matches(country_candidates, all_gadm_names_by_country):
    """
    Resolve the fuzzy match of every known GADM name up front.

    For each country present in both arguments, every name in
    `all_gadm_names_by_country[country]` is passed to `find_fuzzy_match` and the
    result (possibly an empty list) is stored under (country_code, name).
    Countries absent from `country_candidates` are skipped entirely.

    Progress is printed every 1,000 names; the denominator counts all names
    passed in, including those of skipped countries.

    Returns: dict (country_code, normalized_name) -> list of IPC records
    """
    print("\nPre-computing fuzzy matches for all unique GADM names...")

    total_names = sum(map(len, all_gadm_names_by_country.values()))
    fuzzy_cache = {}
    done = 0

    for country_code, gadm_names in all_gadm_names_by_country.items():
        if country_code in country_candidates:
            for name in gadm_names:
                fuzzy_cache[(country_code, name)] = find_fuzzy_match(
                    name, country_candidates, country_code
                )
                done += 1
                if not done % 1000:
                    print(f"   Processed {done:,}/{total_names:,} names", end='\r')

    # Trailing spaces overwrite the remains of the last '\r' progress line
    print(f"   Pre-computed {len(fuzzy_cache):,} fuzzy match mappings" + " " * 30)
    return fuzzy_cache

print("precompute_fuzzy_matches defined")

In [None]:
def ipc_info_to_dict(ipc_info):
    """
    Project an IPC record onto the `ipc_*` fields used for database insertion.

    `ipc_info` must support `.get(key)`. Each output key is filled from the
    source key listed beside it below; source keys that are absent yield None.
    The output key order matches the order of the pairs.
    """
    field_map = (
        ('ipc_id', 'ipc_id'),
        ('ipc_country', 'country'),
        ('ipc_country_code', 'country_code'),
        ('ipc_fips_code', 'fips_code'),
        ('ipc_district', 'district'),
        ('ipc_region', 'region'),
        ('ipc_geographic_unit', 'geographic_unit_name'),
        ('ipc_geographic_unit_full', 'geographic_unit_full_name'),
        ('ipc_period_start', 'projection_start'),
        ('ipc_period_end', 'projection_end'),
        ('ipc_period_length_days', 'period_length_days'),
        ('ipc_value', 'ipc_value'),
        ('ipc_description', 'ipc_description'),
        ('ipc_binary_crisis', 'ipc_binary_crisis'),
        ('ipc_is_allowing_assistance', 'is_allowing_for_assistance'),
        ('ipc_fewsnet_region', 'fewsnet_region'),
        ('ipc_geographic_group', 'geographic_group'),
        ('ipc_scenario', 'scenario'),
        ('ipc_classification_scale', 'classification_scale'),
        ('ipc_reporting_date', 'reporting_date'),
    )
    return {target: ipc_info.get(source) for target, source in field_map}

print("ipc_info_to_dict defined")

In [None]:
def get_match_for_row(gadm3_norm, gadm2_norm, gadm1_norm, country_code,
                       ipc_lookup, country_candidates, fuzzy_cache):
    """
    Find IPC match for a single row using hierarchical matching strategy.

    Priority order:
    1. GADM3 exact match (district level)
    2. GADM3 fuzzy match (pre-computed in `fuzzy_cache`)
    3. GADM2 exact match (zone level)
    4. GADM2 fuzzy match
    5. GADM1 exact match (state/province level)
    6. GADM1 fuzzy match
    7. Country-level match (last resort)

    Empty GADM names are skipped. The country-level fallback is stored in
    `ipc_lookup` under (country_code, ''), so looking up an empty name would
    hit that entry and report a country-wide match as e.g. 'GADM3_exact'
    without ever trying the coarser GADM levels.

    Args:
        gadm3_norm, gadm2_norm, gadm1_norm: Normalized GADM names ('' if missing).
        country_code: Country code used as the first element of lookup keys.
        ipc_lookup: Dict (country_code, name) -> list of IPC records.
        country_candidates: Unused here; kept for interface compatibility.
        fuzzy_cache: Dict (country_code, name) -> list of IPC records
            (an empty list means "no fuzzy match").

    Returns: (ipc_records, match_level), or ([], 'no_match').
    """
    levels = (
        (gadm3_norm, 'GADM3_exact', 'GADM3_fuzzy'),
        (gadm2_norm, 'GADM2_exact', 'GADM2_fuzzy'),
        (gadm1_norm, 'GADM1_exact', 'GADM1_fuzzy'),
    )

    for name, exact_label, fuzzy_label in levels:
        if not name:
            # Missing level: must not collide with the country-level key
            continue

        key = (country_code, name)
        if key in ipc_lookup:
            return ipc_lookup[key], exact_label

        cached = fuzzy_cache.get(key)
        if cached:
            return cached, fuzzy_label

    # Try country-level
    country_key = (country_code, '')
    if country_key in ipc_lookup:
        return ipc_lookup[country_key], 'country'

    return [], 'no_match'

print("get_match_for_row defined")

## Main Processing

Load IPC reference, build lookups, process locations, and aggregate articles.

In [None]:
# Banner and start timestamp
print("=" * 80, "Article Aggregation - DISTRICT LEVEL", "=" * 80, sep="\n")
print(f"Start time: {datetime.now()}\n")

# Load the IPC reference table written by the previous step
print("1. Loading IPC reference...")
ipc_ref = pd.read_parquet(IPC_REF_FILE)

# Quick shape/coverage report
print(f"   Loaded {len(ipc_ref):,} IPC records")
print(f"   Unique districts: {ipc_ref['district'].nunique():,}")
earliest_start = ipc_ref['projection_start'].min()
latest_end = ipc_ref['projection_end'].max()
print(f"   Date range: {earliest_start} to {latest_end}")

# Last expression of the cell: displays the first rows in the notebook
ipc_ref.head()

In [None]:
# Build IPC lookup dictionary
print("\n2. Building IPC lookup dictionary...")

# ipc_lookup:     (fips_code, district_normalized) -> list of IPC record dicts
# country_lookup: (fips_code, '')                  -> every record of that country,
#                 used as the country-level fallback
ipc_lookup = {}
country_lookup = {}

# Single pass over the reference table: each row is converted to a dict once
# and the same dict object is referenced from both lookups (records are only
# read downstream), instead of iterating and materialising every row twice.
for _, row in ipc_ref.iterrows():
    record = row.to_dict()
    fips = row['fips_code']
    ipc_lookup.setdefault((fips, row['district_normalized']), []).append(record)
    country_lookup.setdefault((fips, ''), []).append(record)

print(f"   Created lookup with {len(ipc_lookup):,} unique (country, district) keys")

# Merge lookups. Note: a district entry whose normalized name is '' is replaced
# by the country-level entry, because both use the key (fips_code, '').
ipc_lookup.update(country_lookup)

print(f"   Total lookup keys: {len(ipc_lookup):,}")

In [None]:
# Step 3: index LHZ-country IPC records by the individual words of their names
print("\n3. Building word-based lookup for LHZ countries...")
word_lookup = build_word_lookup(ipc_ref)

for lhz_country, lhz_words in word_lookup.items():
    n_words = len(lhz_words)
    print(f"   {lhz_country}: {n_words:,} unique words")

# Fold the word index into the main lookup; existing district keys take
# precedence (see combine_lookups). ipc_lookup is rebound to the merged dict.
ipc_lookup = combine_lookups(ipc_lookup, word_lookup)
combined_size = len(ipc_lookup)
print(f"\n   Combined lookup size: {combined_size:,} keys")

In [None]:
# Build country candidates for fuzzy matching
print("\n4. Building country candidates for fuzzy matching...")

# country_candidates: fips_code -> [(district_normalized, [IPC record dicts]), ...]
# Every FIPS code in FIPS_TO_COUNTRY gets an entry (possibly an empty list).
country_candidates = {}

for fips_code in FIPS_TO_COUNTRY:
    country_ipc = ipc_ref[ipc_ref['fips_code'] == fips_code]

    # One groupby pass instead of re-filtering the country frame per district.
    # sort=False keeps districts in order of first appearance; groupby drops
    # missing district names by default, and empty names are filtered here.
    country_candidates[fips_code] = [
        (district_norm, district_rows.to_dict('records'))
        for district_norm, district_rows in country_ipc.groupby('district_normalized', sort=False)
        if district_norm != ''
    ]

# Show the first five countries as a sanity check
for fips, candidates in list(country_candidates.items())[:5]:
    print(f"   {FIPS_TO_COUNTRY[fips]}: {len(candidates)} district candidates")

In [None]:
# Setup SQLite database for memory-efficient processing
print("\n5. Setting up SQLite database...")

# Remove existing database if present
if SQLITE_FILE.exists():
    SQLITE_FILE.unlink()
    print("   Removed existing SQLite file")

# Also remove WAL-mode sidecar files a crashed run may have left behind.
# A stale '-wal' file next to a freshly created database can be picked up by
# SQLite when the new file is opened.
for _suffix in ('-wal', '-shm'):
    _sidecar = SQLITE_FILE.with_name(SQLITE_FILE.name + _suffix)
    if _sidecar.exists():
        _sidecar.unlink()

# Create new database
conn = sqlite3.connect(str(SQLITE_FILE))
cursor = conn.cursor()

# Speed-oriented settings for a throwaway database:
# WAL journaling, no fsync on commit, and a larger page cache.
cursor.execute("PRAGMA journal_mode=WAL")
cursor.execute("PRAGMA synchronous=OFF")
cursor.execute("PRAGMA cache_size=-256000")  # 256MB cache

# Create table: one row per (GKG record, matched IPC record).
# Column order must match the tuples inserted in the matching step.
cursor.execute('''
CREATE TABLE gkg_ipc (
    GKGRECORDID TEXT,
    match_level TEXT,
    ipc_id TEXT,
    ipc_country TEXT,
    ipc_country_code TEXT,
    ipc_fips_code TEXT,
    ipc_district TEXT,
    ipc_region TEXT,
    ipc_geographic_unit TEXT,
    ipc_geographic_unit_full TEXT,
    ipc_period_start TEXT,
    ipc_period_end TEXT,
    ipc_period_length_days INTEGER,
    ipc_value REAL,
    ipc_description TEXT,
    ipc_binary_crisis INTEGER,
    ipc_is_allowing_assistance TEXT,
    ipc_fewsnet_region TEXT,
    ipc_geographic_group TEXT,
    ipc_scenario TEXT,
    ipc_classification_scale TEXT,
    ipc_reporting_date TEXT
)
''')

conn.commit()
print("   SQLite database created and configured")

In [None]:
# Step 6: load the location mentions and collect, per country, every distinct
# normalized GADM name (all three levels pooled) for the fuzzy pre-computation.
print("\n6. Scanning locations for unique GADM names...")
locations = pd.read_parquet(LOCATIONS_FILE)

print(f"   Loaded {len(locations):,} location mentions")

# Add a normalized companion column for each GADM level
for gadm_level in ('GADM3', 'GADM2', 'GADM1'):
    locations[f'{gadm_level}_normalized'] = locations[gadm_level].apply(normalize_text)

normalized_cols = ['GADM3_normalized', 'GADM2_normalized', 'GADM1_normalized']

# country code -> set of normalized names seen at any GADM level
all_gadm_names_by_country = {}

for country_code in locations['ActionGeo_CountryCode'].unique():
    country_locs = locations[locations['ActionGeo_CountryCode'] == country_code]
    unique_names = set().union(
        *(country_locs[col].dropna().unique() for col in normalized_cols)
    )
    all_gadm_names_by_country[country_code] = unique_names

total_unique_names = sum(map(len, all_gadm_names_by_country.values()))
print(f"   Found {total_unique_names:,} unique GADM names across all countries")

In [None]:
# Pre-compute all fuzzy matches
# fuzzy_cache maps (country_code, normalized GADM name) -> list of IPC records
# (an empty list when no candidate reached the fuzzy threshold), so the
# per-row matching step only needs dictionary lookups.
fuzzy_cache = precompute_fuzzy_matches(country_candidates, all_gadm_names_by_country)
print(f"   Fuzzy cache size: {len(fuzzy_cache):,} mappings")

In [None]:
# Process locations and match to IPC districts
print("\n7. Processing locations and matching to IPC districts...")

# Process sequentially (avoid multiprocessing memory issues)
insert_buffer = []
total_matched = 0
total_processed = 0

# Pull the needed columns out as NumPy arrays once; indexing an array inside
# the loop is far cheaper than per-row DataFrame access.
gadm3_norms = locations['GADM3_normalized'].values
gadm2_norms = locations['GADM2_normalized'].values
gadm1_norms = locations['GADM1_normalized'].values
country_codes = locations['ActionGeo_CountryCode'].values
gkgids = locations['GKGRECORDID'].values

n_locations = len(locations)
batch_size = 100000
# Ceiling division: exact multiples of batch_size no longer report a phantom
# extra batch, and an empty input yields zero batches.
n_batches = (n_locations + batch_size - 1) // batch_size

# One placeholder per gkg_ipc column (GKGRECORDID, match_level + 20 ipc_* fields)
insert_sql = f"INSERT INTO gkg_ipc VALUES ({','.join('?' * 22)})"

print(f"   Processing {n_locations:,} locations in {n_batches} batches...")

for batch_idx in range(n_batches):
    start_idx = batch_idx * batch_size
    end_idx = min(start_idx + batch_size, n_locations)

    # Process batch
    for i in range(start_idx, end_idx):
        # Get match
        ipc_records, match_level = get_match_for_row(
            gadm3_norms[i], gadm2_norms[i], gadm1_norms[i], country_codes[i],
            ipc_lookup, country_candidates, fuzzy_cache
        )

        if ipc_records:
            gkgid = gkgids[i]
            # One output row per matched IPC record; the three date-like
            # fields are stringified because they are stored in TEXT columns.
            for ipc_info in ipc_records:
                ipc_dict = ipc_info_to_dict(ipc_info)
                insert_buffer.append((
                    gkgid, match_level,
                    ipc_dict['ipc_id'], ipc_dict['ipc_country'], ipc_dict['ipc_country_code'],
                    ipc_dict['ipc_fips_code'], ipc_dict['ipc_district'], ipc_dict['ipc_region'],
                    ipc_dict['ipc_geographic_unit'], ipc_dict['ipc_geographic_unit_full'],
                    str(ipc_dict['ipc_period_start']), str(ipc_dict['ipc_period_end']),
                    ipc_dict['ipc_period_length_days'], ipc_dict['ipc_value'],
                    ipc_dict['ipc_description'], ipc_dict['ipc_binary_crisis'],
                    ipc_dict['ipc_is_allowing_assistance'], ipc_dict['ipc_fewsnet_region'],
                    ipc_dict['ipc_geographic_group'], ipc_dict['ipc_scenario'],
                    ipc_dict['ipc_classification_scale'], str(ipc_dict['ipc_reporting_date'])
                ))

            total_matched += 1

        total_processed += 1

        # Flush buffer if needed (checked per row so the buffer stays bounded
        # even when a single location matches many IPC records)
        if len(insert_buffer) >= SQLITE_BATCH_SIZE:
            cursor.executemany(insert_sql, insert_buffer)
            conn.commit()
            insert_buffer.clear()
            gc.collect()

    print(f"   Batch {batch_idx+1}/{n_batches}: {total_processed:,} processed, {total_matched:,} matched ({total_matched/total_processed*100:.1f}%)")

# Flush remaining buffer
if insert_buffer:
    cursor.executemany(insert_sql, insert_buffer)
    conn.commit()
    insert_buffer.clear()

# Guard the percentage so an empty locations table does not raise ZeroDivisionError
match_pct = total_matched / total_processed * 100 if total_processed else 0.0
print(f"\n   Total locations matched: {total_matched:,} / {total_processed:,} ({match_pct:.1f}%)")

In [None]:
# Step 8: collapse exact duplicate rows inside SQLite (SELECT DISTINCT into a
# new table), then swap the new table in under the original name.
print("\n8. Deduplicating GKG-IPC mappings...")

# Row count going in
before_count = cursor.execute("SELECT COUNT(*) FROM gkg_ipc").fetchone()[0]
print(f"   Before deduplication: {before_count:,} rows")

# Materialise the distinct rows
cursor.execute('''
CREATE TABLE gkg_ipc_dedup AS
SELECT DISTINCT * FROM gkg_ipc
''')

# Row count coming out
after_count = cursor.execute("SELECT COUNT(*) FROM gkg_ipc_dedup").fetchone()[0]
print(f"   After deduplication: {after_count:,} rows")
print(f"   Removed {before_count - after_count:,} duplicates")

# Replace the original table with the deduplicated one
for swap_statement in (
    "DROP TABLE gkg_ipc",
    "ALTER TABLE gkg_ipc_dedup RENAME TO gkg_ipc",
):
    cursor.execute(swap_statement)
conn.commit()

print("   Deduplication complete")

In [None]:
# Create index for faster lookups
# The article-merge step below selects rows from gkg_ipc by GKGRECORDID for
# every CSV chunk, so index that column first. The index is created after
# deduplication because gkg_ipc was rebuilt there as a new table.
print("\n9. Creating SQLite index...")
cursor.execute("CREATE INDEX idx_gkgid ON gkg_ipc(GKGRECORDID)")
conn.commit()
print("   Index created")

In [None]:
# Load articles in chunks and merge with GKG-IPC mappings
print("\n10. Loading articles and merging with IPC mappings...")

# Process articles in chunks to manage memory
chunksize = 100000
article_chunks = []

# Each chunk's ids are staged in a temp table and JOINed against gkg_ipc.
# Binding them as "IN (?, ?, ...)" parameters would need up to `chunksize`
# SQL variables, which exceeds SQLite's per-statement variable limit on
# default builds and fails with "too many SQL variables".
cursor.execute("CREATE TEMP TABLE IF NOT EXISTS chunk_gkgids (GKGRECORDID TEXT PRIMARY KEY)")
join_query = (
    "SELECT g.* FROM gkg_ipc AS g "
    "JOIN chunk_gkgids AS c ON g.GKGRECORDID = c.GKGRECORDID"
)

for chunk_idx, chunk in enumerate(pd.read_csv(ARTICLES_FILE, chunksize=chunksize)):
    # Distinct, non-missing ids of this chunk as plain Python objects
    gkgids = chunk['GKGRECORDID'].dropna().unique().tolist()

    # Refresh the staging table with this chunk's ids
    cursor.execute("DELETE FROM chunk_gkgids")
    cursor.executemany(
        "INSERT OR IGNORE INTO chunk_gkgids VALUES (?)",
        ((gkgid,) for gkgid in gkgids),
    )
    conn.commit()

    # Query SQLite for matching IPC records
    gkg_ipc_chunk = pd.read_sql_query(join_query, conn)

    if len(gkg_ipc_chunk) > 0:
        # Merge chunk with IPC mappings
        merged = chunk.merge(gkg_ipc_chunk, on='GKGRECORDID', how='inner')
        article_chunks.append(merged)

    if (chunk_idx + 1) % 10 == 0:
        print(f"   Processed {(chunk_idx + 1) * chunksize:,} articles", end='\r')

# pd.concat raises an opaque "No objects to concatenate" on an empty list;
# fail with an explicit message instead.
if not article_chunks:
    raise ValueError("No articles matched any GKG-IPC mapping; nothing to aggregate")

print(f"\n   Concatenating {len(article_chunks)} chunks...")
articles_with_ipc = pd.concat(article_chunks, ignore_index=True)

print(f"   Merged articles: {len(articles_with_ipc):,}")
print(f"   Unique GKGRECORDIDs: {articles_with_ipc['GKGRECORDID'].nunique():,}")

In [None]:
# Close SQLite connection
# All GKG-IPC mappings needed from here on are already in `articles_with_ipc`,
# so the database handle is no longer required.
conn.close()
print("\n   SQLite connection closed")

# Optionally remove SQLite file
# (left disabled, so the temp database stays on disk after the run;
# uncomment the two lines below to delete it)
# SQLITE_FILE.unlink()
# print("   SQLite file removed")

## Temporal Aggregation

Aggregate articles to IPC district-period observations using 4-month windows.

In [None]:
# Step 11: keep only article-IPC pairs whose article date falls inside the
# aggregation window of the paired IPC record.
print("\n11. Filtering articles to temporal windows...")

# Parse the date columns (article DATE is expected in YYYYMMDD form)
articles_with_ipc['DATE'] = pd.to_datetime(articles_with_ipc['DATE'], format='%Y%m%d')
for period_col in ('ipc_period_start', 'ipc_period_end'):
    articles_with_ipc[period_col] = pd.to_datetime(articles_with_ipc[period_col])

# Window: from (AGGREGATION_MONTHS - 1) months before the period start
# through the period end, both bounds inclusive
window_start_offset = pd.DateOffset(months=AGGREGATION_MONTHS - 1)
articles_with_ipc['window_start'] = articles_with_ipc['ipc_period_start'] - window_start_offset
articles_with_ipc['window_end'] = articles_with_ipc['ipc_period_end']

in_window = articles_with_ipc['DATE'].between(
    articles_with_ipc['window_start'],
    articles_with_ipc['window_end'],
)
articles_filtered = articles_with_ipc[in_window].copy()

n_before = len(articles_with_ipc)
n_after = len(articles_filtered)
print(f"   Before temporal filter: {n_before:,} article-IPC pairs")
print(f"   After temporal filter: {n_after:,} article-IPC pairs")
print(f"   Filtered out: {n_before - n_after:,} pairs ({(1 - n_after/n_before)*100:.1f}%)")

In [None]:
# Aggregate by (IPC record, period)
print("\n12. Aggregating articles by IPC district-period...")

# Grouping keys: every IPC attribute carried through from the mapping table,
# plus the match level. Rows that differ only in match_level therefore end up
# in separate groups.
group_cols = [
    'ipc_id', 'ipc_country', 'ipc_country_code', 'ipc_fips_code',
    'ipc_district', 'ipc_region', 'ipc_geographic_unit', 'ipc_geographic_unit_full',
    'ipc_period_start', 'ipc_period_end', 'ipc_period_length_days',
    'ipc_value', 'ipc_description', 'ipc_binary_crisis',
    'ipc_is_allowing_assistance', 'ipc_fewsnet_region', 'ipc_geographic_group',
    'ipc_scenario', 'ipc_classification_scale', 'ipc_reporting_date',
    'match_level'
]

# Aggregation functions
agg_dict = {
    'GKGRECORDID': 'nunique',  # Unique articles
    'GoldsteinScale': ['mean', 'std', 'min', 'max'],
    'NumMentions': ['sum', 'mean', 'std'],
    'AvgTone': ['mean', 'std', 'min', 'max'],
    'DATE': 'nunique',  # Unique days
    'SourceCommonName': 'nunique'  # Unique sources
}

# dropna=False: by default groupby discards every row that has a missing
# value in ANY grouping column. With 21 keys, many of them optional IPC
# metadata, that silently removes whole district-period observations.
articles_agg = articles_filtered.groupby(group_cols, as_index=False, dropna=False).agg(agg_dict)

# Flatten the two-level column index: ('AvgTone', 'mean') -> 'AvgTone_mean';
# grouping columns have an empty second level and keep their plain name.
articles_agg.columns = ['_'.join(col).strip('_') if col[1] else col[0] for col in articles_agg.columns.values]

# Rename columns
rename_dict = {
    'GKGRECORDID_nunique': 'article_count',
    'GoldsteinScale_mean': 'avg_goldstein',
    'GoldsteinScale_std': 'std_goldstein',
    'GoldsteinScale_min': 'min_goldstein',
    'GoldsteinScale_max': 'max_goldstein',
    'NumMentions_sum': 'total_mentions',
    'NumMentions_mean': 'avg_mentions',
    'NumMentions_std': 'std_mentions',
    'AvgTone_mean': 'avg_tone',
    'AvgTone_std': 'std_tone',
    'AvgTone_min': 'min_tone',
    'AvgTone_max': 'max_tone',
    'DATE_nunique': 'unique_days',
    'SourceCommonName_nunique': 'unique_sources'
}

articles_agg = articles_agg.rename(columns=rename_dict)

print(f"   Aggregated to {len(articles_agg):,} district-period observations")
print(f"   Unique districts: {articles_agg['ipc_district'].nunique():,}")
print(f"   Unique IPC IDs: {articles_agg['ipc_id'].nunique():,}")
print(f"   Countries: {articles_agg['ipc_country'].nunique()}")

## Summary Statistics

In [None]:
# Headline summary of the aggregated table
print("=" * 80, "Article Aggregation Summary - DISTRICT LEVEL", "=" * 80, sep="\n")

print(f"\nTotal district-period observations: {len(articles_agg):,}")
print(f"Unique districts: {articles_agg['ipc_district'].nunique():,}")
print(f"Unique IPC assessments: {articles_agg['ipc_id'].nunique():,}")
print(f"Date range: {articles_agg['ipc_period_start'].min()} to {articles_agg['ipc_period_end'].max()}")
print(f"Countries: {articles_agg['ipc_country'].nunique()}")

# Distribution of the per-observation article counts
article_counts = articles_agg['article_count']
print(f"\nArticle statistics:")
print(f"   Total articles: {article_counts.sum():,}")
print(f"   Mean articles per district-period: {article_counts.mean():.1f}")
print(f"   Median articles per district-period: {article_counts.median():.1f}")
print(f"   Max articles per district-period: {article_counts.max():,}")

print(f"\nMatch level distribution:")
print(articles_agg['match_level'].value_counts())

# Top 15 countries by number of distinct districts
print(f"\nDistricts per country:")
district_counts = articles_agg.groupby('ipc_country')['ipc_district'].nunique().sort_values(ascending=False)
for country, count in district_counts.head(15).items():
    country_rows = articles_agg[articles_agg['ipc_country'] == country]
    records = len(country_rows)
    articles = country_rows['article_count'].sum()
    print(f"   {country}: {count} districts, {records:,} periods, {articles:,} articles")

## Verification

In [None]:
print("\n" + "=" * 80)
print("Verification Checks")
print("=" * 80)

# Missing-value report for the columns downstream steps rely on
print("\nMissing values in key columns:")
n_observations = len(articles_agg)
key_columns = ['ipc_id', 'ipc_district', 'ipc_period_start', 'ipc_value', 'article_count']
for col in key_columns:
    missing = articles_agg[col].isna().sum()
    print(f"   {col}: {missing} ({missing/n_observations*100:.1f}%)")

# Uniqueness check: rows per (geographic unit, period start) pair
unique_check = articles_agg.groupby(['ipc_geographic_unit_full', 'ipc_period_start']).size()
repeated = unique_check[unique_check > 1]
duplicates = (unique_check > 1).sum()
print(f"\n(geographic_unit_full, period) duplicates: {duplicates}")
print("(Expected: 0 - each observation should be unique)")

if duplicates > 0:
    print("\nWARNING: Found duplicate observations. Investigation needed.")
    # Show up to three offending pairs with the IPC values they carry
    for unit_full, period_start in repeated.head(3).index:
        same_unit = articles_agg['ipc_geographic_unit_full'] == unit_full
        same_period = articles_agg['ipc_period_start'] == period_start
        dup_rows = articles_agg[same_unit & same_period]
        print(f"\n   Duplicate: {unit_full[:50]}..., {period_start}")
        print(f"   Rows: {len(dup_rows)}, IPC values: {dup_rows['ipc_value'].unique()}")

## Save Output Files

In [None]:
# Write the aggregated table as Parquet (primary output)
print(f"\n13. Saving to {OUTPUT_FILE}...")
articles_agg.to_parquet(OUTPUT_FILE, index=False)
print("   [OK] Parquet saved")

# ...and as CSV, for manual inspection
print(f"\n14. Saving CSV to {OUTPUT_CSV}...")
articles_agg.to_csv(OUTPUT_CSV, index=False)
print("   [OK] CSV saved")

# Closing banner and a short guide to the output columns
print("\n" + "=" * 80)
print("Article Aggregation Complete!")
print("=" * 80)
print("\nKey columns for downstream scripts:")
for column_note in (
    "   - ipc_id: Unique IPC assessment ID",
    "   - ipc_geographic_unit_full: THE PRIMARY GEOGRAPHIC IDENTIFIER",
    "   - ipc_district: Extracted district name",
    "   - article_count: Number of articles in 4-month window",
    "   - avg_goldstein, avg_tone: Aggregated event characteristics",
):
    print(column_note)
print(f"\nEnd time: {datetime.now()}")