# Cities Heat Analysis - Mapping and Visualization

This notebook generates comprehensive maps and visualizations for the cities heat analysis results.

## Map Types Generated:
- **Buildings**: Height error maps within building footprints
- **Shade**: Comparison maps (both/local-only/global-only) for building, tree, and all shade at 12pm/3pm/6pm
- **UTCI**: Difference and normalized difference maps at 12pm/3pm/6pm
- **Masked Analysis**: All above maps for specific zones (pedestrian, LULC)
- **Multi-Resolution**: 1m and 20m resolution versions

## Directory Structure:
```
maps/
├── {city}/
│   ├── building/
│   │   ├── full-area/
│   │   ├── pedestrian/
│   │   └── 20m/
│   ├── shade/
│   │   ├── full-area/
│   │   ├── pedestrian/
│   │   └── 20m/
│   └── utci/
│       ├── full-area/
│       ├── pedestrian/
│       └── 20m/
```


In [None]:
import numpy as np
import rasterio
from rasterio.windows import from_bounds, Window
from rasterio.coords import BoundingBox
from rasterio.warp import transform_bounds
from rasterio.plot import show
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.colors import ListedColormap, BoundaryNorm
import seaborn as sns
from pathlib import Path
import yaml
import pandas as pd
from tqdm import tqdm
import warnings
import os
warnings.filterwarnings('ignore')

# Change to project root directory if running from notebooks folder
if Path.cwd().name == 'notebooks':
    os.chdir('..')

# Set up plotting style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# Configuration
MAPS_OUTPUT_DIR = Path("maps")
MAPS_OUTPUT_DIR.mkdir(exist_ok=True)

# Load city configuration
with open('config/city_config.yaml', 'r') as f:
    config = yaml.safe_load(f)

print("Available cities:", list(config.keys()))
print(f"Maps will be saved to: {MAPS_OUTPUT_DIR.absolute()}")


## Configuration


In [None]:
# Batch processing configuration
BATCH_PROCESS = True  # Set to True for faster batch processing
CITIES_TO_PROCESS = list(config.keys())  # Process all cities, or specify: ['Monterrey1', 'RiodeJaneiro']
RESOLUTIONS = ['1m', '20m']  # Resolutions to process
INCLUDE_MASKS = True  # Whether to include masked analysis

print(f"🔄 Batch processing: {'Enabled' if BATCH_PROCESS else 'Disabled'}")
print(f"🏙️  Cities to process: {CITIES_TO_PROCESS}")
print(f"📏 Resolutions: {RESOLUTIONS}")
print(f"🎭 Include masks: {'Yes' if INCLUDE_MASKS else 'No'}")


## Utility Functions


In [None]:
def get_overlap_window(src1, src2):
    """Get overlapping windows for two rasters"""
    if src1.crs != src2.crs:
        bounds2 = transform_bounds(src2.crs, src1.crs, *src2.bounds)
    else:
        bounds2 = src2.bounds
    
    overlap_bounds = BoundingBox(
        max(src1.bounds.left, bounds2.left),
        max(src1.bounds.bottom, bounds2.bottom),
        min(src1.bounds.right, bounds2.right),
        min(src1.bounds.top, bounds2.top)
    )
    
    if (overlap_bounds.right <= overlap_bounds.left) or (overlap_bounds.top <= overlap_bounds.bottom):
        raise ValueError("No overlapping region between rasters.")
    
    window1 = from_bounds(*overlap_bounds, transform=src1.transform)
    window2 = from_bounds(*overlap_bounds, transform=src2.transform)
    return window1.round_offsets(), window2.round_offsets()

def shrink_window(window, pixels):
    """Shrink window by specified pixels from all sides"""
    return Window(
        col_off=window.col_off + pixels,
        row_off=window.row_off + pixels,
        width=max(1, window.width - 2 * pixels),
        height=max(1, window.height - 2 * pixels)
    )

def classify_shade_raster(raw_data):
    """Classify shade raster from raw values to class labels"""
    classified = np.full(raw_data.shape, -1, dtype=np.int8)
    classified[raw_data == 100] = 0  # Building Shade
    classified[raw_data == 150] = 1  # Tree Shade
    classified[raw_data == 200] = 2  # No Shade
    return classified

def load_and_align_rasters(local_path, global_path, mask_path=None):
    """Load and align two rasters, optionally with a mask"""
    with rasterio.open(local_path) as src_local, rasterio.open(global_path) as src_global:
        # Check alignment
        if src_local.transform != src_global.transform or src_local.shape != src_global.shape:
            print(f"⚠️  Raster mismatch. Cropping to overlap.")
            win_local, win_global = get_overlap_window(src_local, src_global)
            win_local = shrink_window(win_local, 10)
            win_global = shrink_window(win_global, 10)
            local_data = src_local.read(1, window=win_local)
            global_data = src_global.read(1, window=win_global)
            transform = src_local.window_transform(win_local)
            crs = src_local.crs
        else:
            print(f"✅ Rasters aligned.")
            window = shrink_window(Window(0, 0, src_local.width, src_local.height), 10)
            local_data = src_local.read(1, window=window)
            global_data = src_global.read(1, window=window)
            transform = src_local.window_transform(window)
            crs = src_local.crs
    
    mask_data = None
    if mask_path:
        with rasterio.open(mask_path) as src_mask:
            # Align mask with the processed data
            if src_mask.transform != transform or src_mask.shape != local_data.shape:
                print(f"⚠️  Mask mismatch. Cropping mask.")
                # This is simplified - in practice you'd need more robust mask alignment
                mask_data = src_mask.read(1)[:local_data.shape[0], :local_data.shape[1]]
            else:
                mask_data = src_mask.read(1)
    
    return local_data, global_data, mask_data, transform, crs

def get_output_directory(city_name, map_type, resolution="1m", mask_name=None):
    """Get modular output directory path based on city, map type, resolution, and mask"""
    base_dir = MAPS_OUTPUT_DIR / city_name / map_type
    
    if resolution == "20m":
        output_dir = base_dir / "20m"
    elif mask_name:
        output_dir = base_dir / mask_name
    else:
        output_dir = base_dir / "full-area"
    
    output_dir.mkdir(parents=True, exist_ok=True)
    return output_dir

print("✅ Utility functions loaded")


## Building Height Error Maps


In [None]:
def create_building_height_error_map(city_name, city_config, resolution="1m", mask_name=None, mask_path=None):
    """Create building height error map within building footprints"""
    print(f"🏗️  Creating building height error map for {city_name} ({resolution}){' - ' + mask_name if mask_name else ''}...")
    
    # Get building data paths
    local_dsm = city_config.get('building_height_local_paths', [None])[0]
    global_dsm = city_config.get('building_height_global_paths', [None])[0]
    
    if not local_dsm or not global_dsm:
        print(f"  ⚠️  Missing building height data for {city_name}")
        return []
    
    created_maps = []
    
    try:
        # Load and align data
        local_data, global_data, mask_data, transform, crs = load_and_align_rasters(local_dsm, global_dsm, mask_path)
        
        # Calculate height error within buildings (assuming these are height data)
        # Create building footprint mask (buildings > 0m height)
        building_mask = (local_data > 0) & (global_data > 0)
        
        # Apply additional mask if provided
        if mask_data is not None:
            building_mask = building_mask & (mask_data == 1)
        
        # Calculate height error within buildings
        height_error = global_data - local_data
        height_error[~building_mask] = np.nan
        
        # Create the map
        fig, ax = plt.subplots(1, 1, figsize=(12, 10))
        
        # Define color map (red = more error, blue/green = less error)
        cmap = plt.cm.RdBu_r
        vmin, vmax = np.nanpercentile(height_error, [5, 95])
        
        im = ax.imshow(height_error, cmap=cmap, vmin=vmin, vmax=vmax)
        
        # Add colorbar
        cbar = plt.colorbar(im, ax=ax, shrink=0.8)
        cbar.set_label('Height Error (Global - Local) [m]', rotation=270, labelpad=20)
        
        # Title
        mask_suffix = f" ({mask_name})" if mask_name else ""
        ax.set_title(f'Building Height Error Map - {city_name} ({resolution}){mask_suffix}\\n(Red = Overestimation, Blue = Underestimation)', 
                    fontsize=14, fontweight='bold')
        ax.set_xlabel('Pixel X')
        ax.set_ylabel('Pixel Y')
        
        # Get output directory using modular structure
        output_dir = get_output_directory(city_name, "building", resolution, mask_name)
        
        # Save map
        mask_suffix = f"_{mask_name}" if mask_name else ""
        output_path = output_dir / f"building_height_error_{city_name}_{resolution}{mask_suffix}.png"
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        plt.close()
        
        created_maps.append(output_path)
        print(f"  ✅ Created: {output_path}")
        
    except Exception as e:
        print(f"  ❌ Error creating building height error map: {e}")
    
    return created_maps

# Process building height error maps
if BATCH_PROCESS:
    print("\\n" + "="*50)
    print("🏗️  BUILDING HEIGHT ERROR MAPS")
    print("="*50)
    
    all_building_maps = []
    for city_name in CITIES_TO_PROCESS:
        if city_name in config:
            city_config = config[city_name]
            
            # Full area maps (1m resolution)
            maps = create_building_height_error_map(city_name, city_config, "1m")
            all_building_maps.extend(maps)
            
            # Masked maps (if available)
            if INCLUDE_MASKS and 'pedestrian_mask_path' in city_config and city_config['pedestrian_mask_path']:
                maps = create_building_height_error_map(city_name, city_config, "1m", "pedestrian", city_config['pedestrian_mask_path'])
                all_building_maps.extend(maps)
            
            # 20m resolution maps (if available)
            if "20m" in RESOLUTIONS and any(key.endswith('_20m') for key in city_config.keys()):
                maps = create_building_height_error_map(city_name, city_config, "20m")
                all_building_maps.extend(maps)
    
    print(f"✅ Created {len(all_building_maps)} building height error maps")
else:
    print("⏩ Skipping building maps - set BATCH_PROCESS=True to generate")


## Shade Comparison Maps

Creates maps showing:
- **Green**: Areas shaded in both versions
- **Blue**: Areas shaded only in local (LiDAR) version  
- **Red**: Areas shaded only in global version
- **Gray**: No shade in either version


In [None]:
def create_shade_comparison_map(local_data, global_data, shade_type, time_step, city_name, 
                               resolution="1m", mask_name=None):
    """Create shade comparison map showing areas with different shade coverage"""
    
    # Create comparison categories
    # 0 = Both shaded, 1 = Local only, 2 = Global only, 3 = Neither
    comparison = np.full(local_data.shape, 3, dtype=np.int8)  # Start with "neither"
    
    if shade_type == "building":
        local_shade = (local_data == 0)  # Building shade
        global_shade = (global_data == 0)
    elif shade_type == "tree":
        local_shade = (local_data == 1)  # Tree shade
        global_shade = (global_data == 1)
    else:  # "all"
        local_shade = (local_data == 0) | (local_data == 1)  # Any shade
        global_shade = (global_data == 0) | (global_data == 1)
    
    # Set comparison categories
    comparison[local_shade & global_shade] = 0  # Both shaded
    comparison[local_shade & ~global_shade] = 1  # Local only
    comparison[~local_shade & global_shade] = 2  # Global only
    
    # Create the map
    fig, ax = plt.subplots(1, 1, figsize=(12, 10))
    
    # Define colors and labels
    colors = ['darkgreen', 'blue', 'red', 'lightgray']
    labels = ['Both Shaded', 'Local Only', 'Global Only', 'No Shade']
    cmap = ListedColormap(colors)
    bounds = [0, 1, 2, 3, 4]
    norm = BoundaryNorm(bounds, cmap.N)
    
    im = ax.imshow(comparison, cmap=cmap, norm=norm)
    
    # Add colorbar with custom labels
    cbar = plt.colorbar(im, ax=ax, shrink=0.8, ticks=[0.5, 1.5, 2.5, 3.5])
    cbar.ax.set_yticklabels(labels)
    
    # Title
    mask_suffix = f" ({mask_name})" if mask_name else ""
    ax.set_title(f'{shade_type.title()} Shade Comparison - {city_name} - {time_step} ({resolution}){mask_suffix}', 
                fontsize=14, fontweight='bold')
    ax.set_xlabel('Pixel X')
    ax.set_ylabel('Pixel Y')
    
    # Get output directory using modular structure
    output_dir = get_output_directory(city_name, "shade", resolution, mask_name)
    
    # Save map
    mask_suffix = f"_{mask_name}" if mask_name else ""
    output_path = output_dir / f"shade_{shade_type}_comparison_{city_name}_{time_step}_{resolution}{mask_suffix}.png"
    plt.savefig(output_path, dpi=300, bbox_inches='tight')
    plt.close()
    
    return output_path

def create_shade_maps_for_city(city_name, city_config, resolution="1m", mask_name=None, mask_path=None):
    """Create all shade comparison maps for a city"""
    print(f"🌳 Creating shade maps for {city_name} ({resolution}){' - ' + mask_name if mask_name else ''}...")
    
    # Select paths based on resolution
    if resolution == "20m":
        shade_local = city_config.get('shade_local_paths_20m', city_config.get('shade_local_paths', []))
        shade_global = city_config.get('shade_global_paths_20m', city_config.get('shade_global_paths', []))
    else:
        shade_local = city_config.get('shade_local_paths', [])
        shade_global = city_config.get('shade_global_paths', [])
    
    if not shade_local or not shade_global:
        print(f"  ⚠️  Missing shade data for {city_name} ({resolution})")
        return []
    
    created_maps = []
    
    # Process each time step
    for local_path, global_path in zip(shade_local, shade_global):
        time_step = Path(local_path).stem.split('_')[-2] if Path(local_path).stem.endswith('_20m') else Path(local_path).stem.split('_')[-1]
        
        try:
            # Load and align data
            local_raw, global_raw, mask_data, transform, crs = load_and_align_rasters(local_path, global_path, mask_path)
            
            # Classify shade data
            local_data = classify_shade_raster(local_raw)
            global_data = classify_shade_raster(global_raw)
            
            # Apply mask if provided
            if mask_data is not None:
                # Apply mask (only analyze pixels where mask = 1)
                local_data[mask_data != 1] = -1
                global_data[mask_data != 1] = -1
            
            # Create maps for each shade type
            shade_types = ["building", "tree", "all"]
            for shade_type in shade_types:
                output_path = create_shade_comparison_map(
                    local_data, global_data, shade_type, time_step, 
                    city_name, resolution, mask_name
                )
                created_maps.append(output_path)
                
        except Exception as e:
            print(f"  ❌ Error processing {time_step}: {e}")
    
    print(f"  ✅ Created {len(created_maps)} shade maps")
    return created_maps

# Process shade maps
if BATCH_PROCESS:
    print("\\n" + "="*50)
    print("🌳 SHADE COMPARISON MAPS")
    print("="*50)
    
    all_shade_maps = []
    for city_name in CITIES_TO_PROCESS:
        if city_name in config:
            city_config = config[city_name]
            
            # Full area maps (1m resolution)
            maps = create_shade_maps_for_city(city_name, city_config, "1m")
            all_shade_maps.extend(maps)
            
            # Masked maps (if available)
            if INCLUDE_MASKS and 'pedestrian_mask_path' in city_config and city_config['pedestrian_mask_path']:
                maps = create_shade_maps_for_city(city_name, city_config, "1m", "pedestrian", city_config['pedestrian_mask_path'])
                all_shade_maps.extend(maps)
            
            # 20m resolution maps (if available)
            if "20m" in RESOLUTIONS and any(key.endswith('_20m') for key in city_config.keys()):
                maps = create_shade_maps_for_city(city_name, city_config, "20m")
                all_shade_maps.extend(maps)
                
                # 20m masked maps
                if INCLUDE_MASKS and 'pedestrian_mask_path' in city_config and city_config['pedestrian_mask_path']:
                    maps = create_shade_maps_for_city(city_name, city_config, "20m", "pedestrian", city_config['pedestrian_mask_path'])
                    all_shade_maps.extend(maps)
    
    print(f"✅ Created {len(all_shade_maps)} shade comparison maps")
else:
    print("⏩ Skipping shade maps - set BATCH_PROCESS=True to generate")


## UTCI Difference Maps

Creates maps showing thermal comfort differences between global and local data:
- **Red**: Global UTCI higher than local
- **Blue**: Local UTCI higher than global


In [None]:
def create_utci_difference_map(local_data, global_data, time_step, city_name, 
                              resolution="1m", mask_name=None, normalized=False):
    """Create UTCI difference map"""
    
    # Calculate difference
    utci_diff = global_data - local_data
    
    if normalized:
        # Normalize by local values (avoid division by zero)
        with np.errstate(divide='ignore', invalid='ignore'):
            utci_diff = utci_diff / np.abs(local_data)
            utci_diff[~np.isfinite(utci_diff)] = 0
    
    # Create the map
    fig, ax = plt.subplots(1, 1, figsize=(12, 10))
    
    # Define color map (red = global higher, blue = local higher)
    cmap = plt.cm.RdBu_r
    vmin, vmax = np.nanpercentile(utci_diff, [5, 95])
    
    # Make colormap symmetric around zero
    abs_max = max(abs(vmin), abs(vmax))
    vmin, vmax = -abs_max, abs_max
    
    im = ax.imshow(utci_diff, cmap=cmap, vmin=vmin, vmax=vmax)
    
    # Add colorbar
    cbar = plt.colorbar(im, ax=ax, shrink=0.8)
    if normalized:
        cbar.set_label('Normalized UTCI Difference\\n(Global - Local) / |Local|', rotation=270, labelpad=20)
    else:
        cbar.set_label('UTCI Difference (Global - Local) [°C]', rotation=270, labelpad=20)
    
    # Title
    mask_suffix = f" ({mask_name})" if mask_name else ""
    diff_type = "Normalized " if normalized else ""
    ax.set_title(f'{diff_type}UTCI Difference - {city_name} - {time_step} ({resolution}){mask_suffix}\\n(Red = Global Higher, Blue = Local Higher)', 
                fontsize=14, fontweight='bold')
    ax.set_xlabel('Pixel X')
    ax.set_ylabel('Pixel Y')
    
    # Get output directory using modular structure
    output_dir = get_output_directory(city_name, "utci", resolution, mask_name)
    
    # Save map
    mask_suffix = f"_{mask_name}" if mask_name else ""
    diff_suffix = "_normalized" if normalized else ""
    output_path = output_dir / f"utci_difference{diff_suffix}_{city_name}_{time_step}_{resolution}{mask_suffix}.png"
    plt.savefig(output_path, dpi=300, bbox_inches='tight')
    plt.close()
    
    return output_path

def create_utci_maps_for_city(city_name, city_config, resolution="1m", mask_name=None, mask_path=None):
    """Create all UTCI difference maps for a city"""
    print(f"🌡️  Creating UTCI maps for {city_name} ({resolution}){' - ' + mask_name if mask_name else ''}...")
    
    # Select paths based on resolution
    if resolution == "20m":
        utci_local = city_config.get('utci_local_paths_20m', city_config.get('utci_local_paths', []))
        utci_global = city_config.get('utci_global_paths_20m', city_config.get('utci_global_paths', []))
    else:
        utci_local = city_config.get('utci_local_paths', [])
        utci_global = city_config.get('utci_global_paths', [])
    
    if not utci_local or not utci_global:
        print(f"  ⚠️  Missing UTCI data for {city_name} ({resolution})")
        return []
    
    created_maps = []
    
    # Process each time step
    for local_path, global_path in zip(utci_local, utci_global):
        time_step = Path(local_path).stem.split('_')[-2] if Path(local_path).stem.endswith('_20m') else Path(local_path).stem.split('_')[-1]
        
        try:
            # Load and align data
            local_data, global_data, mask_data, transform, crs = load_and_align_rasters(local_path, global_path, mask_path)
            
            # Apply mask if provided
            if mask_data is not None:
                # Apply mask (only analyze pixels where mask = 1)
                local_data[mask_data != 1] = np.nan
                global_data[mask_data != 1] = np.nan
            
            # Create difference map
            output_path = create_utci_difference_map(
                local_data, global_data, time_step, city_name, 
                resolution, mask_name, normalized=False
            )
            created_maps.append(output_path)
            
            # Create normalized difference map
            output_path = create_utci_difference_map(
                local_data, global_data, time_step, city_name, 
                resolution, mask_name, normalized=True
            )
            created_maps.append(output_path)
                
        except Exception as e:
            print(f"  ❌ Error processing {time_step}: {e}")
    
    print(f"  ✅ Created {len(created_maps)} UTCI maps")
    return created_maps

# Process UTCI maps
if BATCH_PROCESS:
    print("\\n" + "="*50)
    print("🌡️  UTCI DIFFERENCE MAPS")
    print("="*50)
    
    all_utci_maps = []
    for city_name in CITIES_TO_PROCESS:
        if city_name in config:
            city_config = config[city_name]
            
            # Full area maps (1m resolution)
            maps = create_utci_maps_for_city(city_name, city_config, "1m")
            all_utci_maps.extend(maps)
            
            # Masked maps (if available)
            if INCLUDE_MASKS and 'pedestrian_mask_path' in city_config and city_config['pedestrian_mask_path']:
                maps = create_utci_maps_for_city(city_name, city_config, "1m", "pedestrian", city_config['pedestrian_mask_path'])
                all_utci_maps.extend(maps)
            
            # 20m resolution maps (if available)
            if "20m" in RESOLUTIONS and any(key.endswith('_20m') for key in city_config.keys()):
                maps = create_utci_maps_for_city(city_name, city_config, "20m")
                all_utci_maps.extend(maps)
                
                # 20m masked maps
                if INCLUDE_MASKS and 'pedestrian_mask_path' in city_config and city_config['pedestrian_mask_path']:
                    maps = create_utci_maps_for_city(city_name, city_config, "20m", "pedestrian", city_config['pedestrian_mask_path'])
                    all_utci_maps.extend(maps)
    
    print(f"✅ Created {len(all_utci_maps)} UTCI difference maps")
else:
    print("⏩ Skipping UTCI maps - set BATCH_PROCESS=True to generate")


## Summary and Statistics


In [None]:
if BATCH_PROCESS:
    print("\\n" + "="*50)
    print("📊 SUMMARY STATISTICS")
    print("="*50)
    
    # Count total maps created
    total_maps = 0
    if 'all_building_maps' in locals():
        total_maps += len(all_building_maps)
        print(f"🏗️  Building maps: {len(all_building_maps)}")
    
    if 'all_shade_maps' in locals():
        total_maps += len(all_shade_maps)
        print(f"🌳 Shade maps: {len(all_shade_maps)}")
    
    if 'all_utci_maps' in locals():
        total_maps += len(all_utci_maps)
        print(f"🌡️  UTCI maps: {len(all_utci_maps)}")
    
    print(f"\\n🎯 Total maps created: {total_maps}")
    
    # Show directory structure
    print(f"\\n📁 Maps saved to: {MAPS_OUTPUT_DIR.absolute()}")
    
    # List created directories
    for city_dir in MAPS_OUTPUT_DIR.iterdir():
        if city_dir.is_dir():
            print(f"\\n📍 {city_dir.name}/")
            for map_type_dir in city_dir.iterdir():
                if map_type_dir.is_dir():
                    print(f"  ├── {map_type_dir.name}/")
                    for sub_dir in map_type_dir.iterdir():
                        if sub_dir.is_dir():
                            map_count = len(list(sub_dir.glob("*.png")))
                            print(f"  │   ├── {sub_dir.name}/ ({map_count} maps)")
    
    print("\\n✅ Batch processing complete!")
else:
    print("\\n⏩ Batch processing disabled. Set BATCH_PROCESS=True to generate all maps.")
