# Fetch Kolada Data

This notebook fetches actual data values from the Kolada API for specified KPIs, municipalities, and years.

**API Endpoints:**
- `http://api.kolada.se/v2/data/kpi/{kpi}/municipality/{municipality}/year/{year}`
- `http://api.kolada.se/v2/oudata/kpi/{kpi}/ou/{ou}/year/{year}`

**Parameters to customize:**
- KPI IDs (comma-separated)
- Municipality IDs (comma-separated)
- Years (comma-separated)

**Output:** Kolada data appended to the unified `fKoladaData` fact table in the Lakehouse

In [None]:
import requests
import json
import pandas as pd
from datetime import datetime
import time
from typing import List, Optional

In [None]:
# --- Notebook configuration ---
API_BASE_URL = "http://api.kolada.se/v2"   # root of the Kolada v2 API
PER_PAGE = 5000                            # sent as the per_page query parameter
BATCH_SIZE = 10                            # number of KPI IDs combined into one request

# Years requested by default, as strings ("2010" .. "2025")
YEARS = list(map(str, range(2010, 2026)))
OU_YEARS = list(map(str, range(2010, 2026)))

# KPI dimension from the Lakehouse: source of every KPI ID to fetch
df_kpi_dim = spark.table("dKpi").toPandas()
KPI_IDS = df_kpi_dim["id"].tolist()
print(f"Loaded {len(KPI_IDS)} KPIs from dKpi")

# KPIs flagged as having organizational-unit data (has_ou_data equal to True or 1)
_has_ou_mask = df_kpi_dim["has_ou_data"] == 1
OU_KPI_IDS = df_kpi_dim.loc[_has_ou_mask, "id"].tolist()
print(f"Loaded {len(OU_KPI_IDS)} OU KPIs from dKpi")

# Optional ID filters; an empty list means all municipalities / all OUs
MUNICIPALITY_IDS = []
OU_IDS = []

In [None]:
def fetch_data_paginated(url: str, max_retries: int = 3, retry_delay: float = 2.0) -> List[dict]:
    """
    Fetch data from Kolada API with pagination support.

    Each response's ``values`` list is collected and the ``next_page`` link is
    followed until the API stops returning one. A page that fails with a
    connection error, timeout, HTTP 429/5xx or an unparsable body is retried;
    other HTTP client errors are not. If a page still cannot be fetched, the
    items collected so far are returned (best effort) after printing the error.

    Args:
        url: Initial API URL
        max_retries: Maximum number of attempts per page (at least one is made)
        retry_delay: Base wait in seconds between attempts; it is multiplied
            by the attempt number

    Returns:
        List of data objects; may be incomplete if a page could not be fetched
    """
    all_data = []
    page_count = 0
    attempts = max(1, max_retries)

    while url:
        data = None

        for attempt in range(1, attempts + 1):
            try:
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                data = response.json()
                break
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers JSON decoding errors that are not
                # RequestException subclasses in older requests versions.
                print(f"  Error fetching data (attempt {attempt}/{attempts}): {e}")

                # A client error other than 429 will not succeed on retry.
                status = getattr(getattr(e, 'response', None), 'status_code', None)
                if status is not None and 400 <= status < 500 and status != 429:
                    break

                if attempt < attempts:
                    time.sleep(retry_delay * attempt)

        if not isinstance(data, dict):
            # Either every attempt failed or the payload is not a JSON object.
            print(f"  Stopping pagination; returning {len(all_data)} items collected so far")
            break

        if 'values' in data:
            page_values = data['values'] or []
            all_data.extend(page_values)
            page_count += 1
            print(f"  Page {page_count}: Retrieved {len(page_values)} items (Total: {len(all_data)})")

        # Check for next page
        url = data.get('next_page', None)

        # Be nice to the API
        if url:
            time.sleep(0.5)

    return all_data

In [None]:
def fetch_kolada_data(kpi_ids: List[str], municipality_ids: Optional[List[str]], years: List[str]) -> pd.DataFrame:
    """
    Fetch municipality-level Kolada data and flatten it to one row per value.

    Args:
        kpi_ids: List of KPI IDs
        municipality_ids: List of municipality IDs (None or empty for all)
        years: List of years

    Returns:
        DataFrame with the columns kpi, municipality, period, gender, value,
        count, status, ingestion_timestamp, source_system and ou_id.
        An empty DataFrame (no columns) is returned when nothing was fetched
        or when ``kpi_ids`` or ``years`` is empty.
    """
    # Without KPIs or years the URL would contain an empty path segment,
    # so skip the request altogether.
    if not kpi_ids or not years:
        print("No KPI IDs or years given, nothing to fetch")
        return pd.DataFrame()

    # Build URL based on parameters
    kpi_param = ','.join(kpi_ids)
    year_param = ','.join(years)

    if municipality_ids:
        municipality_param = ','.join(municipality_ids)
        url = f"{API_BASE_URL}/data/kpi/{kpi_param}/municipality/{municipality_param}/year/{year_param}?per_page={PER_PAGE}"
    else:
        url = f"{API_BASE_URL}/data/kpi/{kpi_param}/year/{year_param}?per_page={PER_PAGE}"

    print(f"Fetching data from: {url}")

    data = fetch_data_paginated(url)

    # One timestamp per call so every row of this fetch shares the same
    # ingestion time (naive local time, as produced by datetime.now()).
    ingested_at = datetime.now()

    # Flatten the data structure
    all_records = []
    for item in data:
        kpi = item.get('kpi')
        municipality = item.get('municipality')
        period = item.get('period')

        # Each item can have multiple values (by gender); a missing or null
        # 'values' entry contributes no rows.
        for value_item in item.get('values') or []:
            all_records.append({
                'kpi': kpi,
                'municipality': municipality,
                'period': period,
                'gender': value_item.get('gender'),
                'value': value_item.get('value'),
                'count': value_item.get('count'),
                'status': value_item.get('status'),
                'ingestion_timestamp': ingested_at,
                'source_system': 'Kolada API',
                'ou_id': f"NO_OU_{municipality}",  # Placeholder for municipality-level data
            })

    if all_records:
        return pd.DataFrame(all_records)
    return pd.DataFrame()

In [None]:
def fetch_ou_data(kpi_ids: List[str], ou_ids: Optional[List[str]], years: List[str]) -> pd.DataFrame:
    """
    Fetch Kolada organizational unit data and flatten it to one row per value.

    Args:
        kpi_ids: List of KPI IDs
        ou_ids: List of OU IDs (None or empty for all)
        years: List of years

    Returns:
        DataFrame with the columns kpi, ou_id, municipality, period, gender,
        value, count, status, ingestion_timestamp and source_system. The
        municipality column is always None here. An empty DataFrame (no
        columns) is returned when nothing was fetched or when ``kpi_ids`` or
        ``years`` is empty.
    """
    # Without KPIs or years the URL would contain an empty path segment,
    # so skip the request altogether.
    if not kpi_ids or not years:
        print("No KPI IDs or years given, nothing to fetch")
        return pd.DataFrame()

    # Build URL based on parameters
    kpi_param = ','.join(kpi_ids)
    year_param = ','.join(years)

    if ou_ids:
        ou_param = ','.join(ou_ids)
        url = f"{API_BASE_URL}/oudata/kpi/{kpi_param}/ou/{ou_param}/year/{year_param}?per_page={PER_PAGE}"
    else:
        url = f"{API_BASE_URL}/oudata/kpi/{kpi_param}/year/{year_param}?per_page={PER_PAGE}"

    print(f"Fetching OU data from: {url}")

    data = fetch_data_paginated(url)

    # One timestamp per call so every row of this fetch shares the same
    # ingestion time (naive local time, as produced by datetime.now()).
    ingested_at = datetime.now()

    # Flatten the data structure
    all_records = []
    for item in data:
        kpi = item.get('kpi')
        ou = item.get('ou')
        period = item.get('period')

        # Each item can have multiple values (by gender); a missing or null
        # 'values' entry contributes no rows.
        for value_item in item.get('values') or []:
            all_records.append({
                'kpi': kpi,
                'ou_id': ou,
                'municipality': None,  # Will be looked up from dOrganizationalUnit
                'period': period,
                'gender': value_item.get('gender'),
                'value': value_item.get('value'),
                'count': value_item.get('count'),
                'status': value_item.get('status'),
                'ingestion_timestamp': ingested_at,
                'source_system': 'Kolada API'
            })

    if all_records:
        return pd.DataFrame(all_records)
    return pd.DataFrame()

In [None]:
# Fetch municipality data in batches
print("="*60)
print("Fetching Municipality Data")
print("="*60)
print(f"Total KPIs: {len(KPI_IDS)}")
print(f"Municipalities: {MUNICIPALITY_IDS if MUNICIPALITY_IDS else 'All'}")
print(f"Years: {YEARS}")
print(f"Batch size: {BATCH_SIZE}")
print()

# Check what's already been fetched so a re-run resumes instead of duplicating rows.
# A KPI counts as fetched as soon as any row for it exists in fKoladaData.
try:
    df_existing = spark.table("fKoladaData").select("kpi").distinct().toPandas()
    fetched_kpis = set(df_existing["kpi"].tolist())
    print(f"Already fetched {len(fetched_kpis)} KPIs in fKoladaData")
except Exception as e:
    # Exception (not a bare except) so Ctrl-C / kernel interrupts still propagate.
    # The reason is printed because failures other than a missing table end up here too.
    fetched_kpis = set()
    print("No existing fKoladaData table found, starting fresh")
    print(f"  (reason: {e})")

# Filter out already-fetched KPIs
remaining_kpis = [k for k in KPI_IDS if k not in fetched_kpis]
print(f"Remaining KPIs to fetch: {len(remaining_kpis)} of {len(KPI_IDS)}")

# Each batch is written as soon as it is fetched, so progress survives an interruption.
for i in range(0, len(remaining_kpis), BATCH_SIZE):
    batch = remaining_kpis[i:i+BATCH_SIZE]
    print(f"\nBatch {i//BATCH_SIZE + 1}: Fetching {len(batch)} KPIs ({batch[:3]}...)")

    df_batch = fetch_kolada_data(batch, MUNICIPALITY_IDS if MUNICIPALITY_IDS else None, YEARS)

    if not df_batch.empty:
        spark_df = spark.createDataFrame(df_batch)
        spark_df.write.mode("append").format("delta").saveAsTable("fKoladaData")
        print(f"  ✓ Wrote {len(df_batch)} rows to fKoladaData")

## Fetch Organizational Unit Data

OU data is fetched by default and appended to the same `fKoladaData` table as the municipality data, forming a unified fact table.

In [None]:
# Fetch OU data in batches
print("="*60)
print("Fetching Organizational Unit Data")
print("="*60)
print(f"Total OU KPIs: {len(OU_KPI_IDS)}")
print(f"OUs: {OU_IDS if OU_IDS else 'All'}")
print(f"Years: {OU_YEARS}")
print(f"Batch size: {BATCH_SIZE}")
print()

# Check which OU KPIs have already been fetched.
# OU rows are the ones whose ou_id does not carry the 'NO_OU_' placeholder prefix.
try:
    fact_table = spark.table("fKoladaData")
    df_existing_ou = (
        fact_table
        # Literal prefix test: in a SQL LIKE pattern '_' is a single-character
        # wildcard, so 'NO_OU_%' would match more than the intended prefix.
        .filter(~fact_table["ou_id"].startswith("NO_OU_"))
        .select("kpi").distinct().toPandas()
    )
    fetched_ou_kpis = set(df_existing_ou["kpi"].tolist())
    print(f"Already fetched {len(fetched_ou_kpis)} OU KPIs")
except Exception as e:
    # Exception (not a bare except) so Ctrl-C / kernel interrupts still propagate.
    fetched_ou_kpis = set()
    print("No existing OU data found")
    print(f"  (reason: {e})")

remaining_ou_kpis = [k for k in OU_KPI_IDS if k not in fetched_ou_kpis]
print(f"Remaining OU KPIs to fetch: {len(remaining_ou_kpis)} of {len(OU_KPI_IDS)}")

# Load OU-to-municipality lookup once
try:
    df_ou_dim = spark.table("dOrganizationalUnit").toPandas()
    ou_to_muni = df_ou_dim.set_index('id')['municipality'].to_dict()
except Exception as e:
    # Best effort: without the lookup the municipality column stays empty for OU rows.
    ou_to_muni = {}
    print("⚠ Could not load dOrganizationalUnit for municipality lookup")
    print(f"  (reason: {e})")

# Each batch is written as soon as it is fetched, so progress survives an interruption.
for i in range(0, len(remaining_ou_kpis), BATCH_SIZE):
    batch = remaining_ou_kpis[i:i+BATCH_SIZE]
    print(f"\nBatch {i//BATCH_SIZE + 1}: Fetching {len(batch)} OU KPIs ({batch[:3]}...)")

    df_batch = fetch_ou_data(batch, OU_IDS if OU_IDS else None, OU_YEARS)

    if not df_batch.empty:
        # Fill the municipality column from the OU dimension when it is available
        if ou_to_muni:
            df_batch['municipality'] = df_batch['ou_id'].map(ou_to_muni)

        spark_df = spark.createDataFrame(df_batch)
        spark_df.write.mode("append").format("delta").saveAsTable("fKoladaData")
        print(f"  ✓ Wrote {len(df_batch)} rows to fKoladaData")

In [None]:
# Add surrogate keys from dimension tables (post-processing)
# The whole fact table is loaded into pandas, enriched, and written back.
try:
    df_fact = spark.table("fKoladaData").toPandas()

    if not df_fact.empty:
        print("\nAdding surrogate keys...")

        # Check if surrogate keys already exist
        key_columns = ['kpi_key', 'municipality_key', 'ou_key']
        has_keys = all(col in df_fact.columns for col in key_columns)

        # Batches appended after an earlier enrichment do not carry key values,
        # so the presence of the columns alone does not mean every row is enriched.
        # A null kpi_key is used as the signal; presumably every fetched KPI
        # exists in dKpi, since the KPI IDs are read from it - TODO confirm.
        needs_keys = (not has_keys) or bool(df_fact['kpi_key'].isna().any())

        if needs_keys:
            # Load dimension tables
            df_kpi_dim = spark.table("dKpi").toPandas()
            df_muni_dim = spark.table("dMunicipality").toPandas()
            df_ou_dim = spark.table("dOrganizationalUnit").toPandas()

            # Create lookup dictionaries (natural id -> surrogate key)
            kpi_lookup = df_kpi_dim.set_index('id')['kpi_key'].to_dict()
            muni_lookup = df_muni_dim.set_index('id')['municipality_key'].to_dict()
            ou_lookup = df_ou_dim.set_index('id')['ou_key'].to_dict()

            # Add surrogate keys; ids without a match in the dimension become NaN
            df_fact['kpi_key'] = df_fact['kpi'].map(kpi_lookup)
            df_fact['municipality_key'] = df_fact['municipality'].map(muni_lookup)
            df_fact['ou_key'] = df_fact['ou_id'].map(ou_lookup)

            print(f"  ✓ Added kpi_key (matched {df_fact['kpi_key'].notna().sum()}/{len(df_fact)} rows)")
            print(f"  ✓ Added municipality_key (matched {df_fact['municipality_key'].notna().sum()}/{len(df_fact)} rows)")
            print(f"  ✓ Added ou_key (matched {df_fact['ou_key'].notna().sum()}/{len(df_fact)} rows)")

            # Write back with overwrite to replace with enriched data.
            # overwriteSchema lets the overwrite add the key columns to the table schema.
            spark_df = spark.createDataFrame(df_fact)
            (
                spark_df.write
                .mode("overwrite")
                .format("delta")
                .option("overwriteSchema", "true")
                .saveAsTable("fKoladaData")
            )
            print(f"\n✓ Updated fKoladaData with surrogate keys")
        else:
            print("  Surrogate keys already exist, skipping enrichment")
    else:
        print("No data in fKoladaData table")

except Exception as e:
    print(f"  ⚠ Could not add surrogate keys: {e}")
    print("  Make sure notebooks 01 and 02 have been run to create dimension tables")

In [None]:
# Data quality summary for unified fact table
# Loads the whole fact table into pandas and prints counts by source, KPI,
# period and gender, plus null-value and surrogate-key coverage figures.
try:
    df_fact = spark.table("fKoladaData").toPandas()
    total_rows = len(df_fact)

    if df_fact.empty:
        print("No data in fKoladaData table")
    else:
        banner = "=" * 60
        print("\n" + banner)
        print("UNIFIED FACT TABLE SUMMARY")
        print(banner)

        print(f"\nTotal data points: {total_rows}")

        # Municipality-level rows carry the 'NO_OU_' placeholder prefix in ou_id;
        # everything else is counted as OU-level.
        is_municipality_row = df_fact['ou_id'].str.startswith('NO_OU_')
        municipality_count = is_municipality_row.sum()
        ou_count = total_rows - municipality_count
        print(f"\nData points by source:")
        print(f"  Municipality-level: {municipality_count}")
        print(f"  OU-level: {ou_count}")

        kpi_counts = df_fact['kpi'].value_counts()
        print(f"\nData points by KPI:")
        print(kpi_counts)

        period_counts = df_fact['period'].value_counts().sort_index()
        print(f"\nData points by period:")
        print(period_counts)

        gender_counts = df_fact['gender'].value_counts()
        print(f"\nData points by gender:")
        print(gender_counts)

        missing_values = df_fact['value'].isnull().sum()
        print(f"\nNull values in value column:")
        print(missing_values, "out of", total_rows)

        # Key coverage is only reported once the fact table has been enriched
        if 'kpi_key' in df_fact.columns:
            print(f"\nSurrogate key coverage:")
            for key_column in ('kpi_key', 'municipality_key', 'ou_key'):
                print(f"  {key_column}: {df_fact[key_column].notna().sum()}/{total_rows}")

        print("\nSample of unified fact table:")
        display(df_fact.head(10))

except Exception as e:
    print(f"Could not load fKoladaData table: {e}")