# Notebook 01: Data Ingestion & Exploratory Data Analysis (EDA)

**Purpose**: Load raw humanitarian aid datasets, perform comprehensive EDA, identify join keys, and write raw data to Delta Lake tables.

**Steps**:
1. Initialize Spark session and create database
2. Load all CSV datasets from `data/` using PySpark
3. Run schema and basic statistics for each dataset
4. Compute null counts, distinct values, min/max for numeric columns
5. Identify potential join keys from column names and value analysis
6. Write raw datasets to Delta tables: `humanitarian.raw_{dataset_name}`

## Step 1: Initialize Spark and Database

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, isnan, isnull, when, lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
import os
from pathlib import Path

import pandas as pd
import numpy as np

# Initialize Spark session
spark = SparkSession.builder \
    .appName("humanitarian-aid-analytics") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Create database
spark.sql("CREATE DATABASE IF NOT EXISTS humanitarian")
print("✓ Database 'humanitarian' created successfully")

# Define data directory
#DATA_DIR = "/Workspace/Repos/..." # Update with your actual workspace path, or use relative path
ROOT_DIR = '/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026'
# For this exercise, assume data files are in ./data/ relative to notebook
DATA_DIR = "data/"

print(f"✓ Data directory: {DATA_DIR}")

✓ Database 'humanitarian' created successfully
✓ Data directory: data/


## Step 2: Load All CSV Datasets

In [0]:
import os

datasets_to_load = {
    "hno": "hpc_hno_2025.csv",
    "hrp": "humanitarian-response-plans.csv",
    "fts_requirements": "fts_requirements_funding_globalcluster_global.csv",
    "fts_incoming": "fts_incoming_funding_global.csv",
    "fts_internal": "fts_internal_funding_global.csv",
    "fts_outgoing": "fts_outgoing_funding_global.csv",
    "population": "cod_population_admin1.csv"
}

def safe_coerce(df):
    """Column-wise numeric coercion that won't crash on messy data."""
     # Fix duplicate column names by appending a suffix
    seen = {}
    new_cols = []
    for col in df.columns:
        if col in seen:
            seen[col] += 1
            new_cols.append(f"{col}_{seen[col]}")
        else:
            seen[col] = 0
            new_cols.append(col)
    df.columns = new_cols

    for col in df.columns:
        # Strip commas from numbers like "1,234,567"
        if df[col].dtype == object:
            cleaned = df[col].astype(str).str.replace(",", "", regex=False).str.strip()
            converted = pd.to_numeric(cleaned, errors="coerce")
            non_null_ratio = converted.notna().sum() / max(len(converted), 1)
            if non_null_ratio >= 0.8:
                df[col] = converted
    return df

raw_datasets = {}
df_data_dict = {}
for dataset_name, file_name in datasets_to_load.items():
    file_path = f"{ROOT_DIR}/{DATA_DIR}/{file_name}"
    if not os.path.exists(file_path):
        print(f"✗ File not found: {file_path}. Skipping {dataset_name}.")
        continue
    try:
        df = pd.read_csv(file_path, header=None, dtype=str)  # dtype=str prevents early casting

        # Bug fix: check row 0 for HXL tags, not row 1
        if str(df.iloc[1].values[0]).startswith('#'):
            # HXL format: row0=HXL tags, row1=real headers, row2+=data
            real_columns = df.iloc[1].tolist()
            df = df.iloc[2:].reset_index(drop=True)
            df.columns = real_columns
        else:
            # Standard format: row0=headers, row1+=data
            df.columns = df.iloc[0].tolist()
            df = df.iloc[1:].reset_index(drop=True)

        # Clean column names
        df.columns = [
            str(c).strip().replace(" ", "_").replace("#", "").replace("+", "_")
            for c in df.columns
        ]

        # Safe numeric coercion (bypasses coerce_numeric_like for robustness)
        df_data = safe_coerce(df)
        df_data_dict[dataset_name] = df_data
        spark_df = spark.createDataFrame(df_data)
        raw_datasets[dataset_name] = spark_df
        print(f"✓ Loaded {dataset_name}: {file_name} ({spark_df.count():,} rows)")

    except Exception as e:
        import traceback
        print(f"✗ Failed to load {dataset_name}: {str(e)}")
        traceback.print_exc()

print(f"\nTotal datasets loaded: {len(raw_datasets)} / {len(datasets_to_load)}")

✓ Loaded hno: hpc_hno_2025.csv (318,259 rows)
✓ Loaded hrp: humanitarian-response-plans.csv (910 rows)
✓ Loaded fts_requirements: fts_requirements_funding_globalcluster_global.csv (10,505 rows)
✓ Loaded fts_incoming: fts_incoming_funding_global.csv (4,947 rows)
✓ Loaded fts_internal: fts_internal_funding_global.csv (14 rows)
✓ Loaded fts_outgoing: fts_outgoing_funding_global.csv (1,050 rows)
✓ Loaded population: cod_population_admin1.csv (91,471 rows)

Total datasets loaded: 7 / 7


In [0]:
#null value analysis for pd dataframe
print("=" * 80)
print("NULL VALUE ANALYSIS - Pandas DataFrame")
print("=" * 80)

for dataset_name, df in df_data_dict.items():
    print(f"\n{dataset_name.upper()}")
    print("-" * 80)
    
    null_counts = df.isnull().sum().to_dict()
    
    # Calculate percentages
    row_count = len(df)
    
    for col_name, null_count in sorted(null_counts.items(), key=lambda x: x[1], reverse=True):
        null_pct = (null_count / row_count * 100) if row_count > 0 else 0
        if null_count > 0:
            print(f"  {col_name}: {null_count:,} nulls ({null_pct:.1f}%)")
    
    # Summary
    total_nulls = sum(null_counts.values())
    print(f"  Total null values: {total_nulls:,}")

NULL VALUE ANALYSIS

HNO
--------------------------------------------------------------------------------
  reached: 318,237 nulls (100.0%)
  affected: 315,027 nulls (99.0%)
  meta_info: 312,689 nulls (98.2%)
  adm1_code: 302,467 nulls (95.0%)
  adm1_name: 302,467 nulls (95.0%)
  population: 287,361 nulls (90.3%)
  adm3_code: 246,442 nulls (77.4%)
  adm3_name: 246,442 nulls (77.4%)
  adm2_code: 90,103 nulls (28.3%)
  adm2_name: 90,103 nulls (28.3%)
  targeted: 48,978 nulls (15.4%)
  inneed: 8,060 nulls (2.5%)
  sector_cluster_code: 5,570 nulls (1.8%)
  category: 337 nulls (0.1%)
  Total null values: 2,574,283

HRP
--------------------------------------------------------------------------------
  country_code_list: 23 nulls (2.5%)
  response_code: 1 nulls (0.1%)
  Total null values: 24

FTS_REQUIREMENTS
--------------------------------------------------------------------------------
  value_funding_pct: 3,306 nulls (31.5%)
  value_funding_required_usd: 2,480 nulls (23.6%)
  sector_clust

In [0]:
df

Unnamed: 0,ISO3,Country,ADM1_PCODE,ADM1_NAME,ADM2_PCODE,ADM2_NAME,ADM3_PCODE,ADM3_NAME,ADM4_PCODE,ADM4_NAME,Population_group,Gender,Age_range,Age_min,Age_max,Population,Reference_year,Source,Contributor
0,ALB,Albania,AL01,Berat,,,,,,,F_TL,f,all,,,62842,2021,UNFPA,UNFPA
1,ALB,Albania,AL01,Berat,,,,,,,M_TL,m,all,,,61786,2021,UNFPA,UNFPA
2,ALB,Albania,AL01,Berat,,,,,,,T_TL,all,all,,,124628,2021,UNFPA,UNFPA
3,ALB,Albania,AL01,Berat,,,,,,,F_00_04,f,0-4,0.0,4.0,3125,2021,UNFPA,UNFPA
4,ALB,Albania,AL01,Berat,,,,,,,F_05_09,f,5-9,5.0,9.0,3342,2021,UNFPA,UNFPA
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
91466,VIR,United States Virgin Islands,VI030,St. Thomas,,,,,,,M_65_69,m,65-69,65.0,69.0,1304,2010,United States Bureau of the Census,OCHA Latin America and the Caribbean (ROLAC)
91467,VIR,United States Virgin Islands,VI030,St. Thomas,,,,,,,M_70_74,m,70-74,70.0,74.0,795,2010,United States Bureau of the Census,OCHA Latin America and the Caribbean (ROLAC)
91468,VIR,United States Virgin Islands,VI030,St. Thomas,,,,,,,M_75_79,m,75-79,75.0,79.0,507,2010,United States Bureau of the Census,OCHA Latin America and the Caribbean (ROLAC)
91469,VIR,United States Virgin Islands,VI030,St. Thomas,,,,,,,M_80_84,m,80-84,80.0,84.0,246,2010,United States Bureau of the Census,OCHA Latin America and the Caribbean (ROLAC)


In [0]:
display(raw_datasets['hno'])

country_code,adm1_code,adm1_name,adm2_code,adm2_name,adm3_code,adm3_name,sector_description,sector_cluster_code,category,population,inneed,targeted,affected,reached,meta_info
AFG,,,,,,,Final HRP caseload,ALL,,46024352.0,22887726.0,16790412.0,,18100000.0,
AFG,,,,,,,Final HRP caseload,ALL,Adults,20999996.0,10241973.0,7737415.0,,,
AFG,,,,,,,Final HRP caseload,ALL,Boys,12241703.0,5927993.0,4431106.0,,,
AFG,,,,,,,Final HRP caseload,ALL,Children,24035916.0,12188191.0,8707494.0,,,
AFG,,,,,,,Final HRP caseload,ALL,Elderly,988441.0,457563.0,345502.0,,,
AFG,,,,,,,Final HRP caseload,ALL,Female-Headed Households,657518.0,336197.0,204279.0,,,
AFG,,,,,,,Final HRP caseload,ALL,Girls,11794213.0,6260198.0,4276389.0,,,
AFG,,,,,,,Final HRP caseload,ALL,Men,10970533.0,4958458.0,3978717.0,,,
AFG,,,,,,,Final HRP caseload,ALL,New IDPs (2025) - Boys,53557.0,53557.0,53557.0,,,
AFG,,,,,,,Final HRP caseload,ALL,New IDPs (2025) - Girls,50909.0,50909.0,50910.0,,,


In [0]:
df['population']

dtype('O')

In [0]:
# display(dbutils.fs.ls("dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/"))

path,name,size,modificationTime
dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/.DS_Store,.DS_Store,6148,1771701543036
dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/2025.html,2025.html,310565,1771701543037
dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/cod_population_admin1.csv,cod_population_admin1.csv,11868586,1771701543041
dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/corruption_index_2025.html,corruption_index_2025.html,310565,1771701543039
dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/fts_incoming_funding_global.csv,fts_incoming_funding_global.csv,2660462,1771701543040
dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/fts_internal_funding_global.csv,fts_internal_funding_global.csv,8393,1771701543035
dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/fts_outgoing_funding_global.csv,fts_outgoing_funding_global.csv,450528,1771701543032
dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/fts_requirements_funding_globalcluster_global.csv,fts_requirements_funding_globalcluster_global.csv,1295693,1771701543030
dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/hpc_hno_2025.csv,hpc_hno_2025.csv,27476346,1771701543042
dbfs:/Workspace/Users/mail2pradyu@gmail.com/hacklytics-2026/data/humanitarian-response-plans.csv,humanitarian-response-plans.csv,127887,1771701543038


## Step 3: Schema Analysis

In [0]:
print(raw_datasets)

{'hrp': DataFrame[0: string, 1: bigint, 2: string, 3: string, 4: string, 5: string, 6: string, 7: bigint, 8: bigint, 9: bigint], 'fts_requirements': DataFrame[0: string, 1: bigint, 2: string, 3: string, 4: string, 5: string, 6: bigint, 7: bigint, 8: string, 9: string, 10: bigint, 11: string], 'fts_incoming': DataFrame[0: string, 1: bigint, 2: string, 3: bigint, 4: string, 5: string, 6: string, 7: bigint, 8: bigint, 9: string, 10: string, 11: string, 12: string, 13: string, 14: string, 15: string, 16: string, 17: string, 18: string, 19: bigint, 20: bigint, 21: string, 22: string, 23: string, 24: string, 25: string, 26: string, 27: string, 28: string, 29: string, 30: bigint, 31: string, 32: double, 33: bigint, 34: string, 35: string, 36: string], 'fts_internal': DataFrame[0: string, 1: bigint, 2: string, 3: bigint, 4: string, 5: string, 6: string, 7: bigint, 8: bigint, 9: string, 10: string, 11: string, 12: string, 13: string, 14: string, 15: string, 16: string, 17: string, 18: string, 1

In [0]:
print("=" * 80)
print("DATASET SCHEMAS")
print("=" * 80)

for dataset_name, df in raw_datasets.items():
    print(dataset_name)
    print(f"\n{dataset_name.upper()}")
    print("-" * 80)
    df.printSchema()

DATASET SCHEMAS
hrp

HRP
--------------------------------------------------------------------------------
root
 |-- code: string (nullable = true)
 |-- internalId: long (nullable = true)
 |-- startDate: string (nullable = true)
 |-- endDate: string (nullable = true)
 |-- planVersion: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- locations: string (nullable = true)
 |-- years: long (nullable = true)
 |-- origRequirements: long (nullable = true)
 |-- revisedRequirements: long (nullable = true)

fts_requirements

FTS_REQUIREMENTS
--------------------------------------------------------------------------------
root
 |-- countryCode: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- code: string (nullable = true)
 |-- startDate: string (nullable = true)
 |-- endDate: string (nullable = true)
 |-- year: long (nullable = true)
 |-- clusterCode: long (nullable = true)
 |-- cluster: string (nullable = true)
 |-- requireme

## Step 4: Row Counts and Basic Statistics

In [0]:
print("=" * 80)
print("ROW COUNTS AND BASIC STATISTICS")
print("=" * 80)

dataset_stats = {}

for dataset_name, df in raw_datasets.items():
    row_count = df.count()
    col_count = len(df.columns)
    
    dataset_stats[dataset_name] = {
        "rows": row_count,
        "columns": col_count,
        "column_names": df.columns
    }
    
    print(f"\n{dataset_name.upper()}")
    print(f"  Row count: {row_count:,}")
    print(f"  Column count: {col_count}")
    print(f"  Columns: {', '.join(df.columns)}")

ROW COUNTS AND BASIC STATISTICS

HNO
  Row count: 318,259
  Column count: 16
  Columns: country_code, adm1_code, adm1_name, adm2_code, adm2_name, adm3_code, adm3_name, sector_description, sector_cluster_code, category, population, inneed, targeted, affected, reached, meta_info

HRP
  Row count: 910
  Column count: 10
  Columns: response_code, meta_id, date_start, date_end, response_name, response_type_list, country_code_list, date_year_list, value_requirements_orig_c_usd, value_requirements_revised_c_usd

FTS_REQUIREMENTS
  Row count: 10,505
  Column count: 12
  Columns: country_code, activity_appeal_id_fts_internal, activity_appeal_name, activity_appeal_id_external, date_start, date_end, date_year, sector_cluster_code, sector_cluster_name, value_funding_required_usd, value_funding_total_usd, value_funding_pct

FTS_INCOMING
  Row count: 4,947
  Column count: 37
  Columns: date, date_year_budget, description_notes, value_funding_total_usd, org_name_funder, org_type_funder_list, country_

## Step 5: Null Count Analysis

In [0]:
print("=" * 80)
print("NULL VALUE ANALYSIS - Spark DataFrame")
print("=" * 80)

for dataset_name, df in raw_datasets.items():
    print(f"\n{dataset_name.upper()}")
    print("-" * 80)
    
    null_counts = df.select([
        count(when(isnull(col(c)), 1)).alias(c) 
        for c in df.columns
    ]).collect()[0].asDict()
    
    # Calculate percentages
    row_count = df.count()
    
    for col_name, null_count in sorted(null_counts.items(), key=lambda x: x[1], reverse=True):
        null_pct = (null_count / row_count * 100) if row_count > 0 else 0
        if null_count > 0:
            print(f"  {col_name}: {null_count:,} nulls ({null_pct:.1f}%)")
    
    # Summary
    total_nulls = sum(null_counts.values())
    print(f"  Total null values: {total_nulls:,}")

NULL VALUE ANALYSIS - Spark DataFrame

HNO
--------------------------------------------------------------------------------
  reached: 318,237 nulls (100.0%)
  affected: 315,027 nulls (99.0%)
  meta_info: 312,689 nulls (98.2%)
  adm1_code: 302,467 nulls (95.0%)
  adm1_name: 302,467 nulls (95.0%)
  population: 287,361 nulls (90.3%)
  adm3_code: 246,442 nulls (77.4%)
  adm3_name: 246,442 nulls (77.4%)
  adm2_code: 90,103 nulls (28.3%)
  adm2_name: 90,103 nulls (28.3%)
  targeted: 48,978 nulls (15.4%)
  inneed: 8,060 nulls (2.5%)
  sector_cluster_code: 5,570 nulls (1.8%)
  category: 337 nulls (0.1%)
  Total null values: 2,574,283

HRP
--------------------------------------------------------------------------------
  country_code_list: 23 nulls (2.5%)
  response_code: 1 nulls (0.1%)
  Total null values: 24

FTS_REQUIREMENTS
--------------------------------------------------------------------------------
  value_funding_pct: 3,306 nulls (31.5%)
  value_funding_required_usd: 2,480 nulls (23.

## Step 6: Distinct Values Analysis

In [0]:
print("=" * 80)
print("DISTINCT VALUES ANALYSIS (Top Columns by Cardinality)")
print("=" * 80)

for dataset_name, df in raw_datasets.items():
    print(f"\n{dataset_name.upper()}")
    print("-" * 80)
    
    distinct_counts = {}
    for col_name in df.columns:
        try:
            distinct_count = df.select(countDistinct(col(col_name))).collect()[0][0]
            distinct_counts[col_name] = distinct_count
        except:
            distinct_counts[col_name] = "ERROR"
    
    # Sort by distinct count
    sorted_cols = sorted(distinct_counts.items(), key=lambda x: x[1] if isinstance(x[1], int) else 0, reverse=True)
    
    for col_name, distinct_count in sorted_cols[:15]:  # Show top 15
        print(f"  {col_name}: {distinct_count} distinct values")

DISTINCT VALUES ANALYSIS (Top Columns by Cardinality)

HNO
--------------------------------------------------------------------------------
  inneed: 53047 distinct values
  targeted: 36128 distinct values
  population: 20165 distinct values
  affected: 3028 distinct values
  adm2_code: 2480 distinct values
  adm2_name: 2454 distinct values
  adm3_code: 1200 distinct values
  adm3_name: 1191 distinct values
  adm1_code: 144 distinct values
  adm1_name: 144 distinct values
  sector_description: 144 distinct values
  category: 130 distinct values
  country_code: 22 distinct values
  reached: 20 distinct values
  sector_cluster_code: 19 distinct values

HRP
--------------------------------------------------------------------------------
  meta_id: 910 distinct values
  response_name: 910 distinct values
  value_requirements_revised_c_usd: 906 distinct values
  response_code: 899 distinct values
  value_requirements_orig_c_usd: 857 distinct values
  date_start: 208 distinct values
  countr

## Step 7: Sample Data Preview

In [0]:
print("=" * 80)
print("SAMPLE DATA (First 5 rows per dataset)")
print("=" * 80)

for dataset_name, df in raw_datasets.items():
    print(f"\n{dataset_name.upper()}")
    print("-" * 80)
    df.limit(5).display()

SAMPLE DATA (First 5 rows per dataset)

HNO
--------------------------------------------------------------------------------


country_code,adm1_code,adm1_name,adm2_code,adm2_name,adm3_code,adm3_name,sector_description,sector_cluster_code,category,population,inneed,targeted,affected,reached,meta_info
AFG,,,,,,,Final HRP caseload,ALL,,46024352,22887726.0,16790412.0,,18100000.0,
AFG,,,,,,,Final HRP caseload,ALL,Adults,20999996,10241973.0,7737415.0,,,
AFG,,,,,,,Final HRP caseload,ALL,Boys,12241703,5927993.0,4431106.0,,,
AFG,,,,,,,Final HRP caseload,ALL,Children,24035916,12188191.0,8707494.0,,,
AFG,,,,,,,Final HRP caseload,ALL,Elderly,988441,457563.0,345502.0,,,



HRP
--------------------------------------------------------------------------------


response_code,meta_id,date_start,date_end,response_name,response_type_list,country_code_list,date_year_list,value_requirements_orig_c_usd,value_requirements_revised_c_usd
HHTI26,8278,2026-01-01,2026-12-31,Haiti Besoins Humanitaires et Plan de Réponse 2026,Humanitarian needs and response plan | cluster | fr,HTI,2026,880327426,880327426
RREG26,8215,2026-01-01,2026-12-31,Regional Migrant Response Plan for Horn of Africa to Yemen and Southern Africa 2026,sector | en | Regional response plan,YEM | KEN | DJI | SOM | ETH | TZA,2026,0,52990581
FPSE26,8267,2026-01-01,2026-12-31,Escalation of Hostilities in the OPT Flash Appeal 2026,cluster | en | Flash appeal,PSE,2026,4064305808,4064305808
HMLI26,8128,2026-01-01,2026-12-31,Mali Besoins Humanitaires et Plan de Réponse 2026,Humanitarian needs and response plan | cluster | fr,MLI,2026,0,551000000
OPAK26,8313,2026-01-01,2026-04-30,Pakistan Floods Support Plan 2026,sector | en | Other,PAK,2026,0,64943909



FTS_REQUIREMENTS
--------------------------------------------------------------------------------


country_code,activity_appeal_id_fts_internal,activity_appeal_name,activity_appeal_id_external,date_start,date_end,date_year,sector_cluster_code,sector_cluster_name,value_funding_required_usd,value_funding_total_usd,value_funding_pct
AFG,1502,Afghanistan Humanitarian Needs and Response Plan 2026,HAFG26,2026-01-01,2026-12-31,2026,26512.0,Agriculture,,2191617.0,
AFG,1502,Afghanistan Humanitarian Needs and Response Plan 2026,HAFG26,2026-01-01,2026-12-31,2026,26480.0,Coordination and support services,24700000.0,3638398.0,15.0
AFG,1502,Afghanistan Humanitarian Needs and Response Plan 2026,HAFG26,2026-01-01,2026-12-31,2026,2.0,Early Recovery,,4003302.0,
AFG,1502,Afghanistan Humanitarian Needs and Response Plan 2026,HAFG26,2026-01-01,2026-12-31,2026,3.0,Education,60040000.0,7833533.0,13.0
AFG,1502,Afghanistan Humanitarian Needs and Response Plan 2026,HAFG26,2026-01-01,2026-12-31,2026,4.0,Emergency Shelter and NFI,160285975.0,3520751.0,2.0



FTS_INCOMING
--------------------------------------------------------------------------------


date,date_year_budget,description_notes,value_funding_total_usd,org_name_funder,org_type_funder_list,country_iso3_funder_list,date_year_start_funder,date_year_end_funder,activity_appeal_name,activity_appeal_id_external,activity_appeal_id_fts_internal,org_name_impl,org_type_impl_list,sector_cluster_name_list,country_iso3_impl_list,activity_project_name,activity_project_code,crisis_name,date_year_start_impl,date_year_end_impl,financial_contribution_type,financial_contribution_type_1,financial_method,financial_direction,financial_direction_type,status_text,date_reported,date_decision,description_keywords,value_funding_total,value_funding_total_currency,financial_fx,activity_id_fts_internal,activity_code,date_created,date_updated
2026-02-04,2026.0,"Providing humanitarian aid to vulnerable people affected by disasters induced by natural hazards, human-induced crises or exceptional situations or circumstances comparable to these, which have entailed or are likely to continue entailing major loss of life, physical and psychological or social suffering or material damage",31335988,European Commission's Humanitarian Aid and Civil Protection Department,Multilateral Organizations,,2026,2026,Venezuela Regional Refugee and Migrant Response Plan (RMRP) 2026,RREG26b,1525.0,,,,"ABW,ARG,BOL,BRA,CHL,COL,CRI,CUW,DOM,ECU,GUY,MEX,PAN,PER,PRY,TTO,URY",,,,2026,2026,financial,Standard,Traditional aid,incoming,shared,pledge,2026-02-04,,,,,,376064,ECHO/-AM/BUD/2026/91000,2026-02-04,2026-02-04
2024-08-02,,IOM	Regional/OSE	2024-26,1175767,"Sweden, Government of",Governments,SWE,2024,2024,,,,International Organization for Migration,Multilateral Organizations,,"ABW,ARG,BOL,BRA,CHL,COL,CRI,CUW,DOM,ECU,GUY,MEX,PAN,PER,PRY,TTO,URY",,,VENEZUELA Outflow - Regional Refugees and Migrants,2026,2026,financial,Standard,Traditional aid,incoming,shared,paid,2024-08-02,,Multiyear,12513923.0,SEK,10.6432,317615,CD.0101	Sweden- SIDA	15798,2025-05-22,2025-05-22
2026-02-20,2022.0,Humanitarian Mine Action in Afghanistan,1923077,"Germany, Government of",Governments,DEU,2022,2022,,,,HALO Trust,NGOs,Protection - Mine Action,AFG,,,,2026,2026,financial,Standard,Traditional aid,incoming,single,pledge,2026-02-19,2022-07-01,,1825000.0,EUR,0.949,379211,S07-440.70-AFG0122,2026-02-20,2026-02-20
2026-02-17,,Afghanistan Humanitarian Needs and Response Plan 2025 Emergency Shelter and NFI,30000,Switzerland for UNHCR,Private Organizations,,2026,2026,Afghanistan Humanitarian Needs and Response Plan 2026,HAFG26,1502.0,United Nations High Commissioner for Refugees,Multilateral Organizations,Emergency Shelter and NFI,AFG,,,,2026,2026,financial,Standard,Traditional aid,incoming,single,paid,2026-02-17,2025-12-31,,,,,378994,Private sector; Period ending: 2025-12-31,2026-02-17,2026-02-17
2026-02-17,2025.0,Afghanistan Humanitarian Needs and Response Plan 2025 Early Recovery,518000,USA for UNHCR,Private Organizations,,2026,2026,Afghanistan Humanitarian Needs and Response Plan 2026,HAFG26,1502.0,United Nations High Commissioner for Refugees,Multilateral Organizations,,AFG,,,,2026,2026,financial,Pass through,Traditional aid,incoming,single,paid,2026-02-17,2025-12-31,,,,,378999,Private sector; Period ending: 2025-12-31,2026-02-17,2026-02-17



FTS_INTERNAL
--------------------------------------------------------------------------------


date,date_year_budget,description_notes,value_funding_total_usd,org_name_funder,org_type_funder_list,country_iso3_funder_list,date_year_start_funder,date_year_end_funder,activity_appeal_name,activity_appeal_id_external,activity_appeal_id_fts_internal,org_name_impl,org_type_impl_list,sector_cluster_name_list,country_iso3_impl_list,activity_project_name,activity_project_code,crisis_name,date_year_start_impl,date_year_end_impl,financial_contribution_type,financial_contribution_type_1,financial_method,financial_direction,financial_direction_type,status_text,date_reported,date_decision,description_keywords,value_funding_total,value_funding_total_currency,financial_fx,activity_id_fts_internal,activity_code,date_created,date_updated
2025-06-27,2024.0,"Réponse d'urgence en santé, nutrition, soutien psychosocial, sécurité alimentaire et eau, hygiène et assainissement pour les populations déplacées internes et les communautés hôtes affectées par la crise dans les régions du Sahel et de l'Est du Burkina Faso",212925,Première Urgence Internationale,NGOs,BFA,2026,2026,Burkina Faso Besoins Humanitaires et Plan de Réponse 2026,HBFA26,1499.0,Solidarités International,NGOs,"Food Security,Nutrition,Water Sanitation Hygiene",BFA,,,,2026,2026,financial,Pass through,Traditional aid,internal,single,commitment,2025-06-04,2025-05-15,,190994.0,EUR,0.897,347580,ECHO/-AF/BUD/2025/91035,2025-06-27,2026-02-18
2026-02-02,2026.0,"Food Security	Provide food, nutrition, and self-reliance assistance to Rohingya refugees residing in the Cox’s Bazar district of Bangladesh.",1311046,World Food Programme,Multilateral Organizations,BGD,2026,2026,,,,Agency for Technical Cooperation and Development,NGOs,Food Security,BGD,,,,2026,2026,financial,Pass through,Traditional aid,internal,single,paid,2026-02-02,2026-02-01,,156670000.0,BDT,119.5,375916,WFP/BGD/2026/ACT/MULTI/001,2026-02-02,2026-02-02
2026-02-02,2026.0,"Food Security	Provide food, nutrition, and self-reliance assistance to Rohingya refugees residing in the Cox’s Bazar district of Bangladesh.",86699,World Food Programme,Multilateral Organizations,BGD,2026,2026,,,,Agency for Technical Cooperation and Development,NGOs,Food Security,BGD,,,,2026,2026,financial,Pass through,Traditional aid,internal,single,paid,2026-02-02,2026-02-16,,10533928.0,BDT,121.5,375917,2025_25_ACT1,2026-02-02,2026-02-02
2026-02-02,2026.0,"Supporting Rohingya Refugees in Cox’s Bazar Camps Through the Provision of Site Management Support, Emergency Preparedness and Response, Community Based-Protection, Drain Cleaning and Information Management Services",589490,United Nations High Commissioner for Refugees,Multilateral Organizations,BGD,2026,2026,,,,Agency for Technical Cooperation and Development,NGOs,Multi-sector,BGD,,,,2026,2026,financial,Pass through,Traditional aid,internal,single,paid,2026-02-02,2026-01-01,,72088730.0,BDT,122.29,375918,51022Y26M265326,2026-02-02,2026-02-02
2026-02-13,2026.0,Strategic Assistance for Emergency Response in Democratic Republic of Congo (SAFER IV) (RRM),2427229,Mercy Corps,NGOs,COD,2026,2026,République Démocratique du Congo Besoins Humanitaires et Plan de Réponse 2026,HCOD26,1509.0,Agency for Technical Cooperation and Development,NGOs,Multi-sector,COD,,,,2026,2026,financial,Pass through,Traditional aid,internal,single,pledge,2026-02-13,2023-12-07,,1833934.0,GBP,0.7556,376893,91619S004,2026-02-13,2026-02-13



FTS_OUTGOING
--------------------------------------------------------------------------------


date,date_year_budget,description_notes,value_funding_total_usd,org_name_funder,org_type_funder_list,country_iso3_funder_list,date_year_start_funder,date_year_end_funder,activity_appeal_name,activity_appeal_id_external,activity_appeal_id_fts_internal,org_name_impl,org_type_impl_list,sector_cluster_name_list,country_iso3_impl_list,activity_project_name,activity_project_code,crisis_name,date_year_start_impl,date_year_end_impl,financial_contribution_type,financial_contribution_type_1,financial_method,financial_direction,financial_direction_type,status_text,date_reported,date_decision,description_keywords,value_funding_total,value_funding_total_currency,financial_fx,activity_id_fts_internal,activity_code,date_created,date_updated
2026-01-15,2026.0,United Arab Emirates Contribution to Sudan Humanitarian Fund (SUHF) 2026,5000000,"United Arab Emirates, Government of",Governments,ARE,2026,2026,Sudan Humanitarian Needs and Response Plan 2026,HSDN26,1514.0,Sudan Humanitarian Fund,Pooled Funds,,SDN,,,,2026,2026,financial,Standard,Traditional aid,outgoing,single,commitment,2026-01-20,2026-01-15,,,,,374223,OCT/ARE/2026/77941/OCHA(CBPF-OCHA)/SDN/85108 -,2026-01-29,2026-01-29
2026-02-18,2025.0,Rohingya Humanitarian Crisis Joint Response Plan Early Recovery Enhancing staffing and operational capacities in key resettlement operations and for Complementary Pathways Globally: Resettlement Operational Support component,48450,"Australia, Government of",Governments,AUS,2026,2026,Rohingya Humanitarian Crisis Joint Response Plan 2026,RBGD26,1524.0,United Nations High Commissioner for Refugees,Multilateral Organizations,Protection,BGD,,,,2026,2026,financial,Standard,Traditional aid,outgoing,single,paid,2026-02-18,2025-06-10,,75000.0,AUD,1.548,379099,25-GOV-AU-007-01,2026-02-18,2026-02-18
2026-01-20,2026.0,Health	Improved access to essential health services by the vulnerable populations in the conflict affected areas,2443601,"Australia, Government of",Governments,AUS,2026,2026,Myanmar Humanitarian Needs and Response Plan 2026,HMMR26,1505.0,Access to Health Fund,Pooled Funds,Health,MMR,,,,2026,2026,financial,Standard,Traditional aid,outgoing,single,commitment,2026-01-20,2026-01-01,,3648296.0,AUD,1.493,374227,,2026-01-28,2026-01-28
2025-12-30,2026.0,Australia Contribution to Sudan Humanitarian Fund (SUHF) 2026,9758466,"Australia, Government of",Governments,AUS,2026,2026,Sudan Humanitarian Needs and Response Plan 2026,HSDN26,1514.0,Sudan Humanitarian Fund,Pooled Funds,,SDN,,,,2026,2026,financial,Standard,Traditional aid,outgoing,single,paid,2026-01-13,2026-01-12,,14000000.0,AUD,1.493,373455,OCT/AUS/2026/77811/OCHA(CBPF-OCHA)/SDN/84944 EOL No 77338/13,2026-02-04,2026-02-04
2025-04-22,,"Promoting resilience of refugees and vulnerable host communities in Jordan, Phase II – PRO-JORDAN II",111111,"Austria, Government of",Governments,AUT,2026,2026,,,,CARE Austria,NGOs,,JOR,,,,2026,2026,financial,Standard,Traditional aid,outgoing,single,commitment,2025-06-16,,,100000.0,EUR,0.9,346745,2857-00/2024,2025-06-16,2025-06-16



POPULATION
--------------------------------------------------------------------------------


ISO3,Country,ADM1_PCODE,ADM1_NAME,ADM2_PCODE,ADM2_NAME,ADM3_PCODE,ADM3_NAME,ADM4_PCODE,ADM4_NAME,Population_group,Gender,Age_range,Age_min,Age_max,Population,Reference_year,Source,Contributor
ALB,Albania,AL01,Berat,,,,,,,F_TL,f,all,,,62842,2021,UNFPA,UNFPA
ALB,Albania,AL01,Berat,,,,,,,M_TL,m,all,,,61786,2021,UNFPA,UNFPA
ALB,Albania,AL01,Berat,,,,,,,T_TL,all,all,,,124628,2021,UNFPA,UNFPA
ALB,Albania,AL01,Berat,,,,,,,F_00_04,f,0-4,0.0,4.0,3125,2021,UNFPA,UNFPA
ALB,Albania,AL01,Berat,,,,,,,F_05_09,f,5-9,5.0,9.0,3342,2021,UNFPA,UNFPA


## Step 8: Identify Potential Join Keys

In [0]:
print("=" * 80)
print("POTENTIAL JOIN KEYS - COLUMN NAME ANALYSIS")
print("=" * 80)

# Collect all column names
all_columns = {}
for dataset_name, df in raw_datasets.items():
    all_columns[dataset_name] = set(col.lower() for col in df.columns)

# Look for common column patterns
join_key_candidates = {
    "country": ["country", "countrycode", "code", "iso3", "iso2"],
    "cluster": ["cluster", "sector", "cluster_name", "sector_name", "clustercode", "clusterid"],
    "year": ["year", "date_year", "year_start", "year_end"],
    "appeal_id": ["appeal", "appeal_id", "appeal_code", "plan_id", "activity_id"],
    "funding": ["funding", "amount", "value"]
}

print("\nJoin key candidates found:")
for key_type, search_terms in join_key_candidates.items():
    print(f"\n{key_type.upper()}:")
    for dataset_name, columns in all_columns.items():
        matching = [col for col in columns if any(term in col for term in search_terms)]
        if matching:
            print(f"  {dataset_name}: {', '.join(matching)}")

POTENTIAL JOIN KEYS - COLUMN NAME ANALYSIS

Join key candidates found:

COUNTRY:
  hno: adm2_code, adm3_code, adm1_code, sector_cluster_code, country_code
  hrp: country_code_list, response_code
  fts_requirements: sector_cluster_code, country_code
  fts_incoming: activity_code, country_iso3_funder_list, country_iso3_impl_list, activity_project_code
  fts_internal: activity_code, country_iso3_funder_list, country_iso3_impl_list, activity_project_code
  fts_outgoing: activity_code, country_iso3_funder_list, country_iso3_impl_list, activity_project_code
  population: country, adm1_pcode, adm2_pcode, adm3_pcode, iso3, adm4_pcode

CLUSTER:
  hno: sector_description, sector_cluster_code
  fts_requirements: sector_cluster_name, sector_cluster_code
  fts_incoming: sector_cluster_name_list
  fts_internal: sector_cluster_name_list
  fts_outgoing: sector_cluster_name_list

YEAR:
  hrp: date_year_list
  fts_requirements: date_year
  fts_incoming: date_year_start_funder, date_year_start_impl, date

## Step 9: Detailed Key Column Analysis

In [0]:
# Analyze specific columns that are likely join keys
print("=" * 80)
print("DETAILED KEY COLUMN VALUE ANALYSIS")
print("=" * 80)

# Function to analyze a key column
def analyze_key_column(df, col_name, dataset_name):
    """Analyze a specific column for join key potential"""
    try:
        print(f"\n{dataset_name}.{col_name}:")
        
        # Count nulls
        null_count = df.filter(isnull(col(col_name))).count()
        total_rows = df.count()
        null_pct = (null_count / total_rows * 100) if total_rows > 0 else 0
        
        print(f"  Null count: {null_count} ({null_pct:.1f}%)")
        
        # Distinct values
        distinct_count = df.select(countDistinct(col(col_name))).collect()[0][0]
        print(f"  Distinct values: {distinct_count}")
        
        # Sample values
        sample_values = df.select(col(col_name)).dropna().distinct().limit(5).collect()
        sample_strs = [str(row[0]) for row in sample_values]
        print(f"  Sample values: {', '.join(sample_strs)}")
        
    except Exception as e:
        print(f"\n{dataset_name}.{col_name}: ERROR - {str(e)}")

# Analyze country codes
print("\n" + "="*80)
print("COUNTRY CODES")
country_cols = {
    "hno": ["countryCode", "country_code", "country"],
    "hrp": ["countryCode", "country_code", "country"],
    "fts_requirements": ["countryCode"],
    "fts_incoming": ["srcLocations", "destLocations"],
    "fts_internal": [],
    "fts_outgoing": [],
    "population": ["adm0_en"]
}

for dataset_name, col_names in country_cols.items():
    if dataset_name in raw_datasets:
        df = raw_datasets[dataset_name]
        for col_name in col_names:
            if col_name in df.columns:
                analyze_key_column(df, col_name, dataset_name)

DETAILED KEY COLUMN VALUE ANALYSIS

COUNTRY CODES

hno.country_code:
  Null count: 0 (0.0%)
  Distinct values: 22
  Sample values: AFG, BFA, COD, CAF, CMR


## Step 10: Numeric Column Statistics

In [0]:
print("=" * 80)
print("NUMERIC COLUMN STATISTICS")
print("=" * 80)

from pyspark.sql.functions import min as spark_min, max as spark_max, avg, stddev

for dataset_name, df in raw_datasets.items():
    numeric_cols = [f.name for f in df.schema.fields if f.dataType.typeName() in ["int", "double", "long", "float"]]
    
    if numeric_cols:
        print(f"\n{dataset_name.upper()}")
        print("-" * 80)
        
        # Calculate stats for top numeric columns
        for col_name in numeric_cols[:10]:  # Limit to first 10
            try:
                stats = df.select(
                    spark_min(col(col_name)).alias("min"),
                    spark_max(col(col_name)).alias("max"),
                    avg(col(col_name)).alias("avg"),
                    stddev(col(col_name)).alias("stddev")
                ).collect()[0]
                
                print(f"  {col_name}:")
                print(f"    Min: {stats['min']}, Max: {stats['max']}, Avg: {stats['avg']:.2f}, StdDev: {stats['stddev']}")
            except:
                pass

NUMERIC COLUMN STATISTICS

HNO
--------------------------------------------------------------------------------
  inneed:
    Min: -33929.0, Max: 30440770.0, Avg: 28801.67, StdDev: 348936.71242865693
  targeted:
    Min: 0.122027625, Max: 20934770.0, Avg: 14431.53, StdDev: 187437.72648223522

HRP
--------------------------------------------------------------------------------
  meta_id:
    Min: 1, Max: 8324, Avg: 2903.43, StdDev: 2847.719382219176
  date_year_list:
    Min: 2000, Max: 2026, Avg: 2014.54, StdDev: 7.727747908856779
  value_requirements_orig_c_usd:
    Min: 0, Max: 6080899576, Avg: 522449664.46, StdDev: 910129648.4359435
  value_requirements_revised_c_usd:
    Min: 0, Max: 6080899576, Avg: 606642235.42, StdDev: 972216360.8257846

FTS_REQUIREMENTS
--------------------------------------------------------------------------------
  activity_appeal_id_fts_internal:
    Min: 34, Max: 1565, Avg: 686.79, StdDev: 426.69618009062674
  date_year:
    Min: 2000, Max: 2026, Avg: 2015

## Step 11: Write Raw Data to Delta Tables

In [0]:
print("=" * 80)
print("WRITING RAW DATASETS TO DELTA TABLES")
print("=" * 80)

for dataset_name, df in raw_datasets.items():
    delta_table_name = f"humanitarian.raw_{dataset_name}"
    
    try:
        # Write to Delta with overwrite mode
        df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .saveAsTable(delta_table_name)
        
        row_count = df.count()
        print(f"✓ {delta_table_name}: {row_count:,} rows")
        
    except Exception as e:
        print(f"✗ Failed to write {delta_table_name}: {str(e)}")

WRITING RAW DATASETS TO DELTA TABLES
✓ humanitarian.raw_hno: 318,259 rows
✓ humanitarian.raw_hrp: 910 rows
✓ humanitarian.raw_fts_requirements: 10,505 rows
✓ humanitarian.raw_fts_incoming: 4,947 rows
✓ humanitarian.raw_fts_internal: 14 rows
✓ humanitarian.raw_fts_outgoing: 1,050 rows
✓ humanitarian.raw_population: 91,471 rows


## Step 12: Verify Delta Tables

In [0]:
# List all tables in humanitarian database
print("=" * 80)
print("TABLES IN HUMANITARIAN DATABASE")
print("=" * 80)

tables = spark.sql("SHOW TABLES IN humanitarian").collect()
for row in tables:
    table_name = row['tableName']
    print(f"✓ {table_name}")

TABLES IN HUMANITARIAN DATABASE
✓ raw_fts_incoming
✓ raw_fts_internal
✓ raw_fts_outgoing
✓ raw_fts_requirements
✓ raw_hno
✓ raw_hrp
✓ raw_population


## Step 13: Summary and Recommendations

In [0]:
print("=" * 80)
print("EDA SUMMARY")
print("=" * 80)

print("\nDATASETS LOADED:")
for dataset_name, stats in dataset_stats.items():
    print(f"  • {dataset_name}: {stats['rows']:,} rows, {stats['columns']} columns")

print("\nIDENTIFIED JOIN KEYS (LIKELY):")
print("  • Country code: countryCode (common across HNO, HRP, FTS datasets)")
print("  • Cluster/Sector: clusterCode / cluster (in FTS and HNO)")
print("  • Year: year or dateYear (in most datasets)")
print("  • Appeal/Plan ID: activity_appeal_id (in FTS datasets)")

print("\nNEXT STEPS:")
print("  1. Review EDA output above to confirm join keys")
print("  2. Proceed to Notebook 02 (02_pipeline.py) to join and feature engineer")
print("  3. Key join strategy: LEFT JOIN on HRP/HNO, joining with Funding and Population by country/cluster/year")

print("\nMISSING DATA NOTES:")
print("  • Many funding columns may have nulls - will be filled with 0 in pipeline")
print("  • Country/cluster codes should be non-null for joins")
print("  • Handle multi-valued fields (e.g., destLocations) in pipeline with explode()")

EDA SUMMARY

DATASETS LOADED:
  • hno: 318,259 rows, 16 columns
  • hrp: 910 rows, 10 columns
  • fts_requirements: 10,505 rows, 12 columns
  • fts_incoming: 4,947 rows, 37 columns
  • fts_internal: 14 rows, 37 columns
  • fts_outgoing: 1,050 rows, 37 columns
  • population: 91,471 rows, 19 columns

IDENTIFIED JOIN KEYS (LIKELY):
  • Country code: countryCode (common across HNO, HRP, FTS datasets)
  • Cluster/Sector: clusterCode / cluster (in FTS and HNO)
  • Year: year or dateYear (in most datasets)
  • Appeal/Plan ID: activity_appeal_id (in FTS datasets)

NEXT STEPS:
  1. Review EDA output above to confirm join keys
  2. Proceed to Notebook 02 (02_pipeline.py) to join and feature engineer
  3. Key join strategy: LEFT JOIN on HRP/HNO, joining with Funding and Population by country/cluster/year

MISSING DATA NOTES:
  • Many funding columns may have nulls - will be filled with 0 in pipeline
  • Country/cluster codes should be non-null for joins
  • Handle multi-valued fields (e.g., dest

In [0]:
print("\n✓ Notebook 01 completed successfully!")
print("Ready for Notebook 02 (Pipeline & Feature Engineering)")


✓ Notebook 01 completed successfully!
Ready for Notebook 02 (Pipeline & Feature Engineering)
