In [13]:
#!/usr/bin/env python3
"""
AfricaPolis Workflow Demonstration Script
========================================

This script demonstrates the core GeoWorkflow pipeline for processing PM2.5 air quality
data and AFRICAPOLIS urban boundary data. The workflow transforms raw geospatial data
into analysis-ready enriched datasets through three key stages:

1. Spatial Clipping: Extract data for specific countries from global datasets
2. Raster Alignment: Ensure all raster data shares consistent spatial properties  
3. Statistical Enrichment: Calculate zonal statistics for urban areas

Target countries: Togo, Ghana, Tanzania, Kenya
Input data: PM2.5 concentrations, AFRICAPOLIS urban boundaries
Output: Enriched urban areas with air quality statistics

Run this script from the project root directory with the geoworkflow environment activated.
"""

import sys
from pathlib import Path
import logging
from typing import List, Dict, Any
import geopandas as gpd
import rasterio

# Add project source to path for imports
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root / "src"))

try:
    from geoworkflow.processors.aoi.processor import AOIProcessor
    from geoworkflow.processors.spatial.clipper import ClippingProcessor
    from geoworkflow.processors.spatial.aligner import AlignmentProcessor
    from geoworkflow.processors.integration.enrichment import StatisticalEnrichmentProcessor
    from geoworkflow.schemas.config_models import (
        AOIConfig, ClippingConfig, AlignmentConfig, StatisticalEnrichmentConfig
    )
    from geoworkflow.core.logging_setup import setup_logging
except ImportError as e:
    print(f"Error importing GeoWorkflow modules: {e}")
    print("Please ensure you're running from the project root with the correct environment activated.")
    sys.exit(1)

In [14]:
def setup_environment():
    """
    Initialize logging and verify that all required input files exist.
    This function ensures we have a clean starting point for the demonstration.
    """
    
    # Configure logging to show processing steps clearly
    logger = setup_logging(level="INFO", log_file="notebooks/demo_processing.log")
    logger.info("=== Starting AfricaPolis Workflow Demonstration ===")
    
    # Define file paths based on the project structure
    input_paths = {
        "boundaries": project_root / "data" / "00_source" / "boundaries" / "africa_boundaries.geojson",
        "africapolis": project_root / "data" / "01_extracted" / "AFRICAPOLIS2020.geojson", 
        "pm25_data": project_root / "data" / "01_extracted" / "pm25" / "V6GL02_04_CNNPM25_GL_202201-202212_corrected.tif"
    }
    
    # Verify all required input files are available
    missing_files = []
    for file_type, file_path in input_paths.items():
        if not file_path.exists():
            missing_files.append(f"{file_type}: {file_path}")
    
    if missing_files:
        logger.error("Missing required input files:")
        for missing in missing_files:
            logger.error(f"  - {missing}")
        raise FileNotFoundError("Cannot proceed without required input data")
    
    logger.info("All required input files found - ready to begin processing")
    return logger, input_paths

logger, input_paths = setup_environment()

2025-09-10 19:21:02,677 - geoworkflow - INFO - === Starting AfricaPolis Workflow Demonstration ===
2025-09-10 19:21:02,681 - geoworkflow - INFO - All required input files found - ready to begin processing


In [None]:
def create_country_aoi(countries: List[str], input_paths: Dict[str, Path], logger) -> Path:
    """
    Create an Area of Interest (AOI) polygon that encompasses the specified countries.
    This AOI will be used to clip global datasets to our region of interest, making
    subsequent processing faster and more focused.
    
    The AOI creation process:
    - Filters the Africa boundaries file to our target countries
    - Dissolves individual country boundaries into a single polygon
    - Adds a small buffer to ensure we capture edge effects
    - Saves the result for use in clipping operations
    """
    
    logger.info(f"Creating AOI for countries: {', '.join(countries)}")
    
    # Define output path for the combined AOI
    aoi_output = project_root / "data" / "aoi" / "demo_countries_aoi.geojson"
    aoi_output.parent.mkdir(parents=True, exist_ok=True)
    
    try:
        aoi_config = AOIConfig(
            input_file="../data/00_source/boundaries/africa_boundaries.geojson",
            country_name_column="NAME_0",  # country name column in geojson above
            countries=["Togo", "Ghana", "Tanzania", "Kenya","Nigeria"],
            dissolve_boundaries=True,  # Combine all countries into single polygon
            buffer_km=50,  # Small buffer to ensure complete data capture
            output_file=aoi_output
        )
        
        processor = AOIProcessor(aoi_config)
        result = processor.process()
        
        if not result.success:
            raise RuntimeError(f"AOI creation failed: {result.message}")
            
        logger.info(f"Successfully created AOI with {result.metadata.get('feature_count', 'unknown')} features")
        logger.info(f"AOI saved to: {aoi_output}")
        
        return aoi_output
        
    except Exception as e:
        logger.error(f"Failed to create AOI: {e}")
        raise

# Define our target countries for the demonstration
target_countries = ["Togo", "Ghana", "Tanzania", "Kenya","Nigeria"]
aoi_file = create_country_aoi(target_countries, input_paths, logger)

2025-09-10 19:28:12,109 - geoworkflow - INFO - Creating AOI for countries: Togo, Ghana, Tanzania, Kenya, Nigeria
2025-09-10 19:28:12,111 - geoworkflow.AOIProcessor - INFO - Starting AOIProcessor processing


Output()

2025-09-10 19:28:15,082 - geoworkflow.AOIProcessor - INFO - Loading administrative boundaries


2025-09-10 19:28:18,019 - geoworkflow.AOIProcessor - INFO - Filtering 5 countries
2025-09-10 19:28:18,022 - geoworkflow.AOIProcessor - INFO - Dissolving country boundaries into single polygon


2025-09-10 19:28:19,787 - geoworkflow.AOIProcessor - INFO - Applying 50.0 km buffer
2025-09-10 19:28:19,788 - geoworkflow.AOIProcessor - INFO - Reprojecting from EPSG:4326 to ESRI:102022 for buffering


2025-09-10 19:28:19,833 - geoworkflow.AOIProcessor - INFO - Applying 50.0km (50000.0m) buffer


2025-09-10 19:28:30,637 - geoworkflow.AOIProcessor - INFO - Reprojecting back to EPSG:4326
2025-09-10 19:28:30,639 - geoworkflow.AOIProcessor - INFO - Saving AOI to /Users/juancheeto/Library/CloudStorage/Box-Box/UrbanStructureStudies/AfricaProject/geospatial-workflow-project/data/aoi/demo_countries_aoi.geojson


2025-09-10 19:28:30,658 - geoworkflow.utils.progress_utils - INFO - AOI saved successfully completed: 4/7 items in 15.6s (0.3 items/sec)
2025-09-10 19:28:30,659 - geoworkflow.AOIProcessor - INFO - Successfully completed AOIProcessor processing
2025-09-10 19:28:30,660 - geoworkflow - INFO - Successfully created AOI with 1 features
2025-09-10 19:28:30,660 - geoworkflow - INFO - AOI saved to: /Users/juancheeto/Library/CloudStorage/Box-Box/UrbanStructureStudies/AfricaProject/geospatial-workflow-project/data/aoi/demo_countries_aoi.geojson


In [None]:
def clip_all_extracted_data(aoi_file: Path, input_paths: Dict[str, Path], logger) -> Dict[str, Path]:
    """
    Clip all extracted data (both PM2.5 and AFRICAPOLIS) in one operation using recursive processing.
    
    This demonstrates the power of GeoWorkflow's unified ClippingProcessor:
    - Automatically discovers all raster and vector files in the extracted data directory
    - Recursively searches through subdirectories (finds pm25/*.tif and *.geojson)
    - Handles different data types (raster vs vector) with the same configuration
    - Applies the same AOI clipping to all discovered datasets
    - Maintains directory structure in the output
    """
    
    logger.info("Clipping all extracted data using recursive directory processing")
    
    # Set up output directory for all clipped data
    clipped_output_dir = project_root / "data" / "02_clipped" / "demo_all_data"
    clipped_output_dir.mkdir(parents=True, exist_ok=True)
    
    try:
        clipping_config = ClippingConfig(
            input_directory=project_root / "data" / "01_extracted",
            aoi_file=aoi_file,
            output_dir=clipped_output_dir,
            all_touched=True,  # Include pixels/features that partially overlap the AOI
            create_visualizations=False  # Skip visualization for faster processing
        )
        
        processor = ClippingProcessor(clipping_config)
        result = processor.process()
        
        if not result.success:
            raise RuntimeError(f"Data clipping failed: {result.message}")
            
        logger.info(f"Successfully clipped {result.metadata.get('processed_count', 'unknown')} files")
        
        # Look for clipped files more flexibly
        pm25_files = list(clipped_output_dir.rglob("*PM25*.tif"))
        africapolis_files = list(clipped_output_dir.rglob("*AFRICAPOLIS*.geojson"))
        
        clipped_files = {
            "pm25": pm25_files[0] if pm25_files else None,
            "africapolis": africapolis_files[0] if africapolis_files else None
        }
        
        if not clipped_files["pm25"]:
            logger.warning("No clipped PM2.5 file found")
        if not clipped_files["africapolis"]:
            logger.warning("No clipped AFRICAPOLIS file found")
            
        return clipped_files
        
    except Exception as e:
        logger.error(f"Failed to clip extracted data: {e}")
        raise

# Step 2: Clip PM2.5 data to our AOI
clipped_pm25_file = clip_all_extracted_data(aoi_file, input_paths, logger)


2025-09-10 18:29:43,586 - geoworkflow - INFO - Clipping all extracted data using recursive directory processing
2025-09-10 18:29:43,588 - geoworkflow - ERROR - Failed to clip extracted data: 1 validation error for ClippingConfig
raster_pattern
  Input should be a valid string [type=string_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type


ValidationError: 1 validation error for ClippingConfig
raster_pattern
  Input should be a valid string [type=string_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type

In [None]:
def align_raster_data(clipped_pm25_file: Path, logger) -> Path:
    """
    Align the PM2.5 raster data to ensure consistent spatial properties.
    
    Raster alignment is crucial because:
    - Different data sources often have slightly different grid systems
    - Misaligned grids cause errors in zonal statistics calculations
    - Alignment ensures pixel boundaries match exactly between datasets
    - Standardized grids enable accurate spatial analysis and comparison
    
    This step resamples the data to a consistent grid while preserving data quality.
    """
    
    logger.info("Aligning PM2.5 raster data to standard grid")
    
    # Set up output directory for aligned data
    aligned_output_dir = project_root / "data" / "03_processed" / "demo_aligned"
    aligned_output_dir.mkdir(parents=True, exist_ok=True)
    
    try:
        alignment_config = AlignmentConfig(
            input_directory=clipped_pm25_file.parent,
            reference_file=clipped_pm25_file,  # Use PM2.5 as its own reference
            output_dir=aligned_output_dir,
            resampling_method="bilinear",  # Good balance of speed and accuracy for continuous data
            file_pattern="*.tif"
        )
        
        processor = AlignmentProcessor(alignment_config)
        result = processor.process()
        
        if not result.success:
            raise RuntimeError(f"Raster alignment failed: {result.message}")
            
        logger.info(f"Successfully aligned {result.metadata.get('processed_count', 'unknown')} raster files")
        
        # Return path to the aligned PM2.5 file
        aligned_pm25_file = aligned_output_dir / clipped_pm25_file.name
        return aligned_pm25_file
        
    except Exception as e:
        logger.error(f"Failed to align raster data: {e}")
        raise



In [None]:
def enrich_urban_areas_with_pm25_stats(aligned_pm25_file: Path, clipped_africapolis_file: Path, logger) -> Path:
    """
    Calculate zonal statistics to enrich urban areas with PM2.5 air quality metrics.
    
    This enrichment process:
    - Calculates statistical summaries (mean, max, min, median) of PM2.5 concentrations
      for each urban area polygon in the AFRICAPOLIS dataset
    - Adds these statistics as new columns to the urban area attribute table
    - Enables analysis of air quality patterns across different cities and urban forms
    - Creates analysis-ready data that can be used for comparative studies
    
    The resulting dataset combines urban boundary geometries with quantitative
    air quality metrics, supporting research on urban environmental conditions.
    """
    
    logger.info("Enriching urban areas with PM2.5 statistics")
    
    # Set up output directory for enriched data
    enriched_output_dir = project_root / "data" / "04_analysis_ready"
    enriched_output_dir.mkdir(parents=True, exist_ok=True)
    
    try:
        enrichment_config = StatisticalEnrichmentConfig(
            coi_directory=clipped_africapolis_file.parent,
            coi_pattern="*africapolis*",  # Pattern to identify our urban boundary file
            raster_directory=aligned_pm25_file.parent,
            raster_pattern="*.tif",
            output_file=enriched_output_dir / "demo_enriched_urban_areas.geojson",
            statistics=["mean", "max", "min", "median"],  # Key statistical measures
            add_area_column=True,  # Include urban area size for analysis
            area_units="km2"
        )
        
        processor = StatisticalEnrichmentProcessor(enrichment_config)
        result = processor.process()
        
        if not result.success:
            raise RuntimeError(f"Statistical enrichment failed: {result.message}")
            
        logger.info(f"Successfully enriched {result.metadata.get('original_features', 'unknown')} urban areas")
        logger.info(f"Added {result.metadata.get('new_columns_added', 'unknown')} statistical columns")
        
        enriched_file = enrichment_config.output_file
        logger.info(f"Enriched data saved to: {enriched_file}")
        
        return enriched_file
        
    except Exception as e:
        logger.error(f"Failed to enrich urban areas with statistics: {e}")
        raise

In [10]:

def examine_results(enriched_file: Path, logger):
    """
    Examine the final enriched dataset to understand what we've accomplished.
    
    This examination:
    - Loads the enriched dataset and displays basic information
    - Shows the new statistical columns that were added
    - Provides sample data to illustrate the enrichment results
    - Demonstrates how to access the air quality metrics for analysis
    """
    
    logger.info("Examining enriched dataset results")
    
    try:
        # Load the enriched dataset
        enriched_gdf = gpd.read_file(enriched_file)
        
        logger.info(f"Final dataset contains {len(enriched_gdf)} urban areas")
        logger.info(f"Dataset columns: {list(enriched_gdf.columns)}")
        
        # Identify the new statistical columns (those containing our raster name)
        pm25_columns = [col for col in enriched_gdf.columns if 'pm25' in col.lower() or 'cnn' in col.lower()]
        
        if pm25_columns:
            logger.info(f"PM2.5 statistical columns added: {pm25_columns}")
            
            # Show sample statistics
            for col in pm25_columns:
                if enriched_gdf[col].dtype in ['float64', 'float32', 'int64', 'int32']:
                    logger.info(f"{col}: min={enriched_gdf[col].min():.2f}, "
                              f"max={enriched_gdf[col].max():.2f}, "
                              f"mean={enriched_gdf[col].mean():.2f}")
        
        # Show a sample of the enriched data
        logger.info("Sample of enriched urban areas:")
        sample_cols = ['ISO', 'COUNTRY_NA'] + pm25_columns[:2] + ['area_km2']
        available_cols = [col for col in sample_cols if col in enriched_gdf.columns]
        
        if available_cols:
            sample_data = enriched_gdf[available_cols].head(3)
            for idx, row in sample_data.iterrows():
                logger.info(f"  Urban area {idx}: " + 
                          ", ".join([f"{col}={row[col]}" for col in available_cols]))
        
        logger.info("=== Workflow demonstration completed successfully! ===")
        
    except Exception as e:
        logger.error(f"Failed to examine results: {e}")
        raise

aligned_pm25_file = align_raster_data(clipped_pm25_file, logger)


NameError: name 'align_raster_data' is not defined

In [None]:
"""
Execute the complete AfricaPolis workflow demonstration.

This main function orchestrates the entire data processing pipeline:
1. Sets up the processing environment and verifies input data
2. Creates an Area of Interest for our target countries  
3. Clips global datasets to our region of interest
4. Aligns raster data to ensure spatial consistency
5. Enriches urban areas with air quality statistics
6. Examines the final results

Each step builds on the previous ones, demonstrating how raw geospatial
data transforms into analysis-ready enriched datasets.
"""

# Define our target countries for the demonstration
target_countries = ["Togo", "Ghana", "Tanzania", "Kenya"]

try:
    # Initialize the processing environment
    logger, input_paths = setup_environment()
    
    # Step 1: Create Area of Interest for target countries
    aoi_file = create_country_aoi(target_countries, input_paths, logger)
    
    # Step 2: Clip PM2.5 data to our AOI
    clipped_pm25_file = clip_all_extracted_data(aoi_file, input_paths, logger)
    
    # Step 3: Clip AFRICAPOLIS data to our AOI
    clipped_africapolis_file = clip_africapolis_data(aoi_file, input_paths, logger)
    
    # Step 4: Align raster data for consistent processing
    aligned_pm25_file = align_raster_data(clipped_pm25_file, logger)
    
    # Step 5: Enrich urban areas with PM2.5 statistics
    enriched_file = enrich_urban_areas_with_pm25_stats(aligned_pm25_file, clipped_africapolis_file, logger)
    
    # Step 6: Examine the final results
    examine_results(enriched_file, logger)
    
    print("\n🎉 Demonstration completed successfully!")
    print(f"📁 Check the enriched results at: {enriched_file}")
    print("📊 The urban areas now include PM2.5 air quality statistics")
    print("🔍 Review the log file at: notebooks/demo_processing.log")
    
except Exception as e:
    print(f"\n❌ Demonstration failed: {e}")
    print("💡 Check the log file for detailed error information")
    print("🤖 Try asking Claude: 'My workflow failed with this error: [paste error message]'")

    
