# Clean City Heat Vulnerability Analysis with WorldPop Data

Advanced demographic analysis using WorldPop age/sex data to calculate heat vulnerability ratios and comprehensive age distribution statistics for cities.

In [1]:
import ee
import folium
import plotly.graph_objects as go
import pandas as pd
import numpy as np
import ipywidgets as widgets
from IPython.display import display, clear_output

# Connect this kernel to Google Earth Engine under the notebook's Cloud project
GEE_PROJECT = 'tl-cities'
ee.Initialize(project=GEE_PROJECT)
print('✅ GEE initialized')

✅ GEE initialized


In [2]:
# Load datasets: the city feature collection and the WorldPop age/sex image collection
cities_asset_id = 'projects/tl-cities/assets/GHS_UCDB_THEME_HAZARD_RISK_GLOBE_R2024A'
worldpop_asset_id = 'WorldPop/GP/100m/pop_age_sex'
allCities = ee.FeatureCollection(cities_asset_id)
worldpopCollection = ee.ImageCollection(worldpop_asset_id)

# Report how many items each collection holds
city_count = allCities.size().getInfo()
print(f"Total cities: {city_count}")
image_count = worldpopCollection.size().getInfo()
print(f"WorldPop images: {image_count}")

# Get available years (distinct, ascending values of the images' 'year' property)
year_property_values = worldpopCollection.aggregate_array('year')
years = year_property_values.distinct().sort().getInfo()
print(f"Available years: {years}")

Total cities: 11422
WorldPop images: 243
Available years: [2020]


## Country and City Selection Functions

In [3]:
def get_countries():
    """Return the sorted, distinct country names found in ``allCities``.

    Names come from the ``GC_CNT_GAD`` property; empty/None entries and the
    placeholder 'Unknown' are left out.
    """
    raw_names = allCities.aggregate_array('GC_CNT_GAD').distinct().sort().getInfo()

    usable_names = []
    for name in raw_names:
        if not name or name == 'Unknown':
            continue
        usable_names.append(name)
    return usable_names

def get_cities_by_country(country_name, limit=100):
    """Build the city dropdown options for one country.

    Filters ``allCities`` on ``GC_CNT_GAD == country_name``, orders by
    ``GC_POP_TOT`` descending, keeps at most ``limit`` features and downloads
    them. For each downloaded feature:

    * it is skipped when its ``GC_UCN_MAI`` name is missing, empty, 'Unknown',
      or was already emitted;
    * a population that parses to a positive float gives "<name> (Pop: 1,234)";
    * a population that is None, '' or not convertible to float gives
      "<name> (Pop: N/A)";
    * any other parsed value (for example 0) produces no entry.

    Returns:
        list[tuple[str, str]]: ``(display_label, city_name)`` pairs, entries
        with a population first (largest first), then the N/A entries.
        An empty list is returned if anything raises while downloading or
        processing.
    """
    print(f"🔍 Searching for cities in {country_name}...")

    in_country = ee.Filter.eq('GC_CNT_GAD', country_name)
    largest_first = allCities.filter(in_country).sort('GC_POP_TOT', False).limit(limit)

    try:
        downloaded = largest_first.getInfo()
        entries = []        # (display_label, city_name, population used for ordering)
        seen_names = set()

        for feature in downloaded['features']:
            attributes = feature['properties']
            city_name = attributes.get('GC_UCN_MAI', 'Unknown')
            raw_population = attributes.get('GC_POP_TOT', 0)

            unusable_name = (not city_name) or city_name == 'Unknown'
            if unusable_name or city_name in seen_names:
                continue

            # parsed is None  -> population unavailable (shown as N/A)
            # parsed is float -> listed only when strictly positive
            try:
                if raw_population is None or raw_population == '':
                    parsed = None
                else:
                    parsed = float(raw_population)
            except (ValueError, TypeError):
                parsed = None

            if parsed is None:
                entry = (f"{city_name} (Pop: N/A)", city_name, 0)
            elif parsed > 0:
                entry = (f"{city_name} (Pop: {parsed:,.0f})", city_name, parsed)
            else:
                continue

            entries.append(entry)
            seen_names.add(city_name)

        # Populated entries first (descending), N/A entries (stored as 0) last
        ranked = sorted(entries, key=lambda item: (item[2] == 0, -item[2]))
        print(f"✅ Found {len(ranked)} cities")

        return [(label, name) for label, name, _ in ranked]

    except Exception as e:
        print(f"❌ Error getting cities: {e}")
        return []

# Load countries once; the list feeds the country dropdown created below
countries = get_countries()
country_total = len(countries)
print(f"📍 Loaded {country_total} countries")

📍 Loaded 191 countries


## Interactive City Selection

In [4]:
# Create interactive widgets

# Initial country: Brazil when available, otherwise the first entry. With an
# empty list the dropdown is left unselected instead of indexing countries[0].
if 'Brazil' in countries:
    default_country = 'Brazil'
elif countries:
    default_country = countries[0]
else:
    default_country = None

country_dropdown = widgets.Dropdown(
    options=countries,
    value=default_country,
    description='Country:',
    layout={'width': '300px'}
)

# Filled by update_cities() once a country is selected
city_dropdown = widgets.Dropdown(
    options=[],
    description='City:',
    layout={'width': '400px'}
)

# A Dropdown value must be one of its options, so when no years were found the
# 2020 fallback has to be offered as an option too (not only passed as value).
year_options = years if years else [2020]

year_dropdown = widgets.Dropdown(
    options=year_options,
    value=year_options[-1],
    description='Year:',
    layout={'width': '150px'}
)

def update_cities(change):
    """Reload the city dropdown for the currently selected country.

    Registered as an observer on ``country_dropdown`` and also called directly
    with ``None``; the ``change`` argument is not inspected. When the lookup
    returns at least one city, the first one becomes the selected value.
    """
    city_options = get_cities_by_country(country_dropdown.value)
    city_dropdown.options = city_options

    if not city_options:
        return

    # Options are (display_label, city_name) pairs; the widget value is the name
    _, first_city_name = city_options[0]
    city_dropdown.value = first_city_name

# Refresh the city list whenever the country changes, and once now for the initial country
country_dropdown.observe(update_cities, names='value')
update_cities(None)  # Load initial cities

# Display widgets: country and year side by side, city selector underneath
top_row = widgets.HBox([country_dropdown, year_dropdown])
widgets_box = widgets.VBox([top_row, city_dropdown], layout={'padding': '10px'})

display(widgets_box)
print('✅ City selection widgets ready!')

🔍 Searching for cities in Brazil...
✅ Found 100 cities


VBox(children=(HBox(children=(Dropdown(description='Country:', index=22, layout=Layout(width='300px'), options…

✅ City selection widgets ready!


## WorldPop Data Processing Functions

In [5]:
def get_worldpop_for_year(year):
    """Return one mosaicked image built from the WorldPop images of ``year``.

    Selects the images of ``worldpopCollection`` whose 'year' property equals
    ``year`` and mosaics them into a single image.
    """
    matches_year = ee.Filter.eq('year', year)
    images_for_year = worldpopCollection.filter(matches_year)
    return images_for_year.mosaic()

def get_all_age_cohorts_summary(city_name, country_name, year, scale=92.77):
    """Sum the population of every age cohort inside one city's boundary.

    The city is the first feature of ``allCities`` whose ``GC_CNT_GAD`` and
    ``GC_UCN_MAI`` match the arguments. Each cohort is the sum of the listed
    male/female WorldPop bands; all cohorts are stacked into one multi-band
    image and reduced with a single ``reduceRegion`` call (one server round
    trip instead of one per cohort).

    Args:
        city_name: value of the ``GC_UCN_MAI`` property to match.
        country_name: value of the ``GC_CNT_GAD`` property to match.
        year: WorldPop 'year' property passed to ``get_worldpop_for_year``.
        scale: reduction scale in metres. Defaults to the nominal pixel size of
            the WorldPop 100 m product (about 92.77 m per the Earth Engine
            catalog). The bands hold people-per-pixel counts, so summing at a
            finer scale (the previous hard-coded 90 m) resamples each source
            pixel more than once and inflates the totals.

    Returns:
        dict[str, float] | None: cohort label -> total people. The
        'Heat_Vuln_Ratio' entry is the *count* of people aged 0-4 plus 65+
        (callers divide by the total to obtain the ratio). Cohorts for which
        the server returns no value are reported as 0. ``None`` is returned
        when the request fails, e.g. when no city matches.
    """
    print(f'📊 Getting age cohort totals for {city_name}, {country_name} ({year})')

    # Define age cohorts using WorldPop bands
    age_cohorts = {
        '0-4': ['M_0', 'M_1', 'F_0', 'F_1'],
        '5-9': ['M_5', 'F_5'],
        '10-14': ['M_10', 'F_10'],
        '15-19': ['M_15', 'F_15'],
        '20-24': ['M_20', 'F_20'],
        '25-29': ['M_25', 'F_25'],
        '30-34': ['M_30', 'F_30'],
        '35-39': ['M_35', 'F_35'],
        '40-44': ['M_40', 'F_40'],
        '45-49': ['M_45', 'F_45'],
        '50-54': ['M_50', 'F_50'],
        '55-59': ['M_55', 'F_55'],
        '60-64': ['M_60', 'F_60'],
        '65-69': ['M_65', 'F_65'],
        '70-74': ['M_70', 'F_70'],
        '75-79': ['M_75', 'F_75'],
        '80+': ['M_80', 'F_80'],
        'Heat_Vuln_Ratio': ['M_0', 'M_1', 'F_0', 'F_1', 'M_65', 'F_65', 'M_70', 'F_70', 'M_75', 'F_75', 'M_80', 'F_80']
    }

    # Cohort labels contain '-' and '+', so each cohort gets a plain positional
    # band name for the server-side image and is mapped back afterwards.
    band_for_cohort = {
        cohort_name: f'cohort_{position}'
        for position, cohort_name in enumerate(age_cohorts)
    }

    try:
        # Get city
        city = allCities.filter(
            ee.Filter.And(
                ee.Filter.eq('GC_CNT_GAD', country_name),
                ee.Filter.eq('GC_UCN_MAI', city_name)
            )
        ).first()

        city_geometry = city.geometry()
        worldpop_year = get_worldpop_for_year(year)

        # One single-band image per cohort (sum of its source bands), stacked together
        cohort_bands = [
            worldpop_year.select(bands)
            .reduce(ee.Reducer.sum())
            .rename([band_for_cohort[cohort_name]])
            for cohort_name, bands in age_cohorts.items()
        ]

        regional_sums = ee.Image.cat(cohort_bands).reduceRegion(
            reducer=ee.Reducer.sum(),
            geometry=city_geometry,
            scale=scale,
            maxPixels=1e8,
            bestEffort=True
        ).getInfo() or {}

        cohort_totals = {}

        for cohort_name in age_cohorts:
            print(f"  🔍 Processing {cohort_name}")

            population = regional_sums.get(band_for_cohort[cohort_name])
            if population is None:
                # No value came back for this band; report it explicitly as 0
                print(f"     ⚠️ {cohort_name}: no data returned, using 0")
                population = 0
            else:
                print(f"     ✅ {cohort_name}: {population:,.0f} people")

            cohort_totals[cohort_name] = population

        return cohort_totals

    except Exception as e:
        print(f"❌ Error in age cohort summary: {e}")
        return None

## Statistical Analysis Functions

In [6]:
def extract_all_pixel_values_band_math(city_name, country_name, year, scale=92.77):
    """Return, per age cohort, each pixel's share of that pixel's total population.

    Builds a multi-band image ('total' plus one band per cohort, each the sum
    of the listed WorldPop bands), downloads every pixel inside the city
    geometry with ``ee.Reducer.toList()`` and converts cohort values to
    percentages of the pixel total on the client.

    Args:
        city_name: value of the ``GC_UCN_MAI`` property to match.
        country_name: value of the ``GC_CNT_GAD`` property to match.
        year: WorldPop 'year' property passed to ``get_worldpop_for_year``.
        scale: sampling scale in metres. Defaults to the nominal pixel size of
            the WorldPop 100 m product (about 92.77 m per the Earth Engine
            catalog); sampling finer than that (the previous hard-coded 90 m)
            returns some source pixels more than once, which inflates the
            pixel count and skews the distribution.

    Returns:
        dict[str, list[float]] | None: cohort label -> list of percentages, one
        per pixel whose total population is > 0. 'Heat_Vuln_Ratio' holds the
        (0-4 + 65+) share. A cohort is omitted when its pixel list is empty or
        does not line up with the 'total' list. ``{}`` is returned when no
        pixels came back, ``None`` when the request fails.
    """
    print(f'🔬 Extracting pixel values for {city_name}, {country_name} ({year})')

    # Define age cohorts including heat vulnerability ratio
    age_cohorts = {
        '0-4': ['M_0', 'M_1', 'F_0', 'F_1'],
        '5-9': ['M_5', 'F_5'],
        '10-14': ['M_10', 'F_10'],
        '15-19': ['M_15', 'F_15'],
        '20-24': ['M_20', 'F_20'],
        '25-29': ['M_25', 'F_25'],
        '30-34': ['M_30', 'F_30'],
        '35-39': ['M_35', 'F_35'],
        '40-44': ['M_40', 'F_40'],
        '45-49': ['M_45', 'F_45'],
        '50-54': ['M_50', 'F_50'],
        '55-59': ['M_55', 'F_55'],
        '60-64': ['M_60', 'F_60'],
        '65-69': ['M_65', 'F_65'],
        '70-74': ['M_70', 'F_70'],
        '75-79': ['M_75', 'F_75'],
        '80+': ['M_80', 'F_80'],
        'Heat_Vuln_Ratio': ['M_0', 'M_1', 'F_0', 'F_1', 'M_65', 'F_65', 'M_70', 'F_70', 'M_75', 'F_75', 'M_80', 'F_80']
    }

    # Create valid GEE band names
    cohort_to_band_name = {
        '0-4': 'age_0_4', '5-9': 'age_5_9', '10-14': 'age_10_14', '15-19': 'age_15_19',
        '20-24': 'age_20_24', '25-29': 'age_25_29', '30-34': 'age_30_34', '35-39': 'age_35_39',
        '40-44': 'age_40_44', '45-49': 'age_45_49', '50-54': 'age_50_54', '55-59': 'age_55_59',
        '60-64': 'age_60_64', '65-69': 'age_65_69', '70-74': 'age_70_74', '75-79': 'age_75_79',
        '80+': 'age_80_plus', 'Heat_Vuln_Ratio': 'heat_vuln_ratio'
    }

    try:
        # Get city
        city = allCities.filter(
            ee.Filter.And(
                ee.Filter.eq('GC_CNT_GAD', country_name),
                ee.Filter.eq('GC_UCN_MAI', city_name)
            )
        ).first()

        city_geometry = city.geometry()
        worldpop_year = get_worldpop_for_year(year)

        print("  📊 Step 1: Creating cohort bands...")

        # Create total population band from every source band exactly once;
        # the ratio cohort is skipped because it only repeats bands already listed
        all_band_names = []
        for cohort_name, bands in age_cohorts.items():
            if cohort_name != 'Heat_Vuln_Ratio':
                all_band_names.extend(bands)

        all_band_names = list(dict.fromkeys(all_band_names))  # Remove duplicates
        total_pop_image = worldpop_year.select(all_band_names).reduce(ee.Reducer.sum())
        print(f"     ✅ Total population band created from {len(all_band_names)} bands")

        # 'total' first, then one summed band per cohort, in cohort order
        band_list = [total_pop_image]
        band_names = ['total']

        for cohort_name, bands in age_cohorts.items():
            band_list.append(worldpop_year.select(bands).reduce(ee.Reducer.sum()))
            band_names.append(cohort_to_band_name[cohort_name])

            if cohort_name == 'Heat_Vuln_Ratio':
                print(f"     ✅ {cohort_name}: (0-4 + 65+) vulnerability ratio")
            else:
                print(f"     ✅ {cohort_name} band created")

        print("  🔍 Step 2: Extracting pixel values...")

        multi_band_image = ee.Image.cat(band_list).rename(band_names)

        # Extract pixel values: one list per band
        pixel_dict = multi_band_image.reduceRegion(
            reducer=ee.Reducer.toList(),
            geometry=city_geometry,
            scale=scale,
            maxPixels=1e9,
            bestEffort=True
        ).getInfo() or {}

        print("  🧮 Step 3: Calculating percentages...")

        total_pixels = pixel_dict.get('total')
        if not total_pixels:
            # Nothing to divide by; callers treat the empty dict as "no data"
            print("  ⚠️ No pixels returned for this city")
            return {}

        print(f"     📊 Processing {len(total_pixels):,} pixels")

        cohort_percentages = {}

        for cohort_name in age_cohorts:
            cohort_pixels = pixel_dict.get(cohort_to_band_name[cohort_name])

            # Values are paired with totals by position, so the lists must line up
            if not cohort_pixels or len(cohort_pixels) != len(total_pixels):
                print(f"     ⚠️ {cohort_name}: pixel list missing or misaligned, skipped")
                continue

            percentages = [
                (cohort_val / total_val) * 100
                for cohort_val, total_val in zip(cohort_pixels, total_pixels)
                if cohort_val is not None and total_val is not None and total_val > 0
            ]

            cohort_percentages[cohort_name] = percentages

            if cohort_name == 'Heat_Vuln_Ratio':
                print(f"     ✅ {cohort_name}: {len(percentages):,} values (vulnerability ratio)")
            else:
                print(f"     ✅ {cohort_name}: {len(percentages):,} values")

        print("  ✅ Pixel extraction completed successfully")
        return cohort_percentages

    except Exception as e:
        print(f"❌ Error in pixel extraction: {e}")
        return None

def create_statistical_summary_table(city_name, country_name, year):
    """Summarise the per-pixel cohort percentages of one city as a DataFrame.

    Calls ``extract_all_pixel_values_band_math`` and computes, for every cohort
    with at least one value, descriptive statistics of its per-pixel
    percentages (mean, median, population std/variance, min/max/range,
    quartiles/IQR, coefficient of variation, skewness, kurtosis and simple
    counts). Numeric columns are rounded to 3 decimals.

    Rows are ordered by the numeric lower bound of the age band ('0-4', '5-9',
    '10-14', ...), with 'Heat_Vuln_Ratio' last. (Sorting the labels as plain
    strings would put '10-14' before '5-9'.)

    Returns:
        pandas.DataFrame | None: one row per cohort, or ``None`` when no pixel
        data is available or an error occurs.
    """
    print(f'📊 Creating statistical summary for {city_name}, {country_name} ({year})')

    def cohort_rank(cohort):
        """Sort key: lower age bound of the label; ratio row and unknown labels last."""
        if cohort == 'Heat_Vuln_Ratio':
            return float('inf')
        try:
            return float(cohort.split('-')[0].rstrip('+'))
        except (AttributeError, ValueError):
            return float('inf')

    def get_vulnerability(cohort):
        """Flag a row as the ratio row, a heat-vulnerable age band, or neither."""
        if cohort == 'Heat_Vuln_Ratio':
            return 'Ratio'
        elif cohort in ['0-4', '65-69', '70-74', '75-79', '80+']:
            return 'High'
        else:
            return 'Low'

    try:
        pixel_percentages = extract_all_pixel_values_band_math(city_name, country_name, year)

        if not pixel_percentages:
            print("❌ No pixel data available")
            return None

        print("  📋 Computing statistics...")

        summary_data = []

        for cohort_name, percentage_values in pixel_percentages.items():
            if not percentage_values:
                continue

            values = np.array(percentage_values)
            as_series = pd.Series(values)

            # Computed once and reused by the derived columns below
            mean_value = np.mean(values)
            std_value = np.std(values)
            min_value = np.min(values)
            max_value = np.max(values)
            q1_value = np.percentile(values, 25)
            q3_value = np.percentile(values, 75)

            stats = {
                'Age_Cohort': cohort_name,
                'Pixel_Count': len(values),
                'Mean_%': mean_value,
                'Median_%': np.median(values),
                'Std_Dev_%': std_value,
                'Variance_%': np.var(values),
                'Min_%': min_value,
                'Max_%': max_value,
                'Range_%': max_value - min_value,
                'Q1_%': q1_value,
                'Q3_%': q3_value,
                'IQR_%': q3_value - q1_value,
                # pandas needs >= 3 values for skew and >= 4 for kurtosis;
                # below that it yields NaN, so fall back to 0 like the other guards
                'Skewness': float(as_series.skew()) if len(values) > 2 else 0,
                'Kurtosis': float(as_series.kurtosis()) if len(values) > 3 else 0,
                'CV_%': (std_value / mean_value) * 100 if mean_value > 0 else 0,
                'Zeros_Count': np.sum(values == 0),
                'Non_Zero_Count': np.sum(values > 0),
                'Above_Mean_Count': np.sum(values > mean_value)
            }

            summary_data.append(stats)

        if not summary_data:
            return None

        df = pd.DataFrame(summary_data)

        # Add vulnerability flag
        df['Heat_Vulnerable'] = df['Age_Cohort'].apply(get_vulnerability)

        # Reorder columns
        column_order = [
            'Age_Cohort', 'Heat_Vulnerable', 'Pixel_Count', 'Mean_%', 'Median_%',
            'Std_Dev_%', 'Variance_%', 'Min_%', 'Max_%', 'Range_%',
            'Q1_%', 'Q3_%', 'IQR_%', 'CV_%', 'Skewness', 'Kurtosis',
            'Zeros_Count', 'Non_Zero_Count', 'Above_Mean_Count'
        ]

        df = df[column_order]

        # Sort by age (numerically) with Heat_Vuln_Ratio at end
        df = df.sort_values(
            'Age_Cohort',
            key=lambda labels: labels.map(cohort_rank),
            kind='stable'
        )

        # Round numeric columns
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        df[numeric_columns] = df[numeric_columns].round(3)

        print(f"✅ Statistical table created!")
        print(f"   Cohorts: {len(df)} (including Heat Vulnerability Ratio)")
        print(f"   Pixels analyzed: {df['Pixel_Count'].iloc[0]:,}")

        return df

    except Exception as e:
        print(f"❌ Error creating statistical table: {e}")
        return None

## Visualization Functions

In [7]:
def create_age_cohort_summary_chart(cohort_totals, city_name, country_name, year):
    """Draw a bar chart of population per age cohort with a heat-vulnerability note.

    Every key of ``cohort_totals`` except 'Heat_Vuln_Ratio' becomes one bar, in
    the dict's own order. The '0-4' bar is light blue, the 65+ bars are light
    coral and the rest light gray. The 'Heat_Vuln_Ratio' entry (a head count)
    is shown in the title and in an annotation as a share of the summed bars.

    Returns:
        A plotly Figure, or ``None`` when ``cohort_totals`` is empty/None.
    """
    if not cohort_totals:
        return None

    ratio_key = 'Heat_Vuln_Ratio'
    senior_groups = ('65-69', '70-74', '75-79', '80+')

    # Separate regular cohorts from the heat vulnerability entry
    age_groups = [label for label in cohort_totals if label != ratio_key]
    populations = [cohort_totals[label] for label in age_groups]
    heat_vuln_total = cohort_totals.get(ratio_key, 0)

    def bar_colour(label):
        """Colour code: under-5s blue, over-65s red, everyone else gray."""
        if label == '0-4':
            return 'lightblue'
        return 'lightcoral' if label in senior_groups else 'lightgray'

    colors = [bar_colour(label) for label in age_groups]
    bar_labels = [f'{pop:,.0f}' for pop in populations]

    # Share of vulnerable people among all cohorts (0 when there is no population)
    total_pop = sum(populations)
    if total_pop > 0:
        heat_vuln_percentage = heat_vuln_total / total_pop * 100
    else:
        heat_vuln_percentage = 0

    fig = go.Figure()

    bars = go.Bar(
        x=age_groups,
        y=populations,
        marker_color=colors,
        text=bar_labels,
        textposition='auto',
        name='Population'
    )
    fig.add_trace(bars)

    chart_title = f'Population by Age Cohort: {city_name}, {country_name} ({year})<br>Heat Vulnerability: {heat_vuln_percentage:.1f}% (0-4 + 65+)'
    fig.update_layout(
        title=chart_title,
        xaxis_title='Age Cohorts',
        yaxis_title='Total Population',
        height=500,
        showlegend=False
    )

    note = f"🌡️ Heat Vulnerable: {heat_vuln_total:,.0f} people ({heat_vuln_percentage:.1f}%)<br>Blue = Under-5s, Red = Over-65s"
    fig.add_annotation(
        text=note,
        xref="paper", yref="paper",
        x=0.02, y=0.98,
        showarrow=False,
        font=dict(size=10),
        align="left"
    )

    return fig

## Main Analysis Interface

In [8]:
def run_heat_vulnerability_analysis():
    """Run both analysis parts for the city currently chosen in the widgets.

    Reads country, city and year from the three dropdowns, then
    (1) fetches the cohort totals and, when they are available, builds the
    summary chart, and (2) builds the statistical summary table.

    Returns:
        tuple: ``(summary_chart, stats_table)``. Either element may be ``None``;
        ``(None, None)`` is returned when no city is selected or when an
        exception is raised.
    """
    country = country_dropdown.value
    city = city_dropdown.value
    year = year_dropdown.value

    if not city:
        print('❌ Please select a city')
        return None, None

    print(f'🚀 Running heat vulnerability analysis for {city}, {country} ({year})')
    print('='*70)

    try:
        # Part 1: Age cohort summary chart (only drawn when totals came back)
        print('\n📊 PART 1: Creating age cohort summary chart...')
        totals = get_all_age_cohorts_summary(city, country, year)
        chart = create_age_cohort_summary_chart(totals, city, country, year) if totals else None

        # Part 2: Statistical summary table
        print('\n📋 PART 2: Creating statistical summary table...')
        table = create_statistical_summary_table(city, country, year)

        return chart, table

    except Exception as e:
        print(f'❌ Error in analysis: {e}')
        return None, None

# Output area that receives everything printed or displayed by the analysis
analysis_output = widgets.Output()

# Create analysis button
analysis_button = widgets.Button(
    description='🌡️ Run Heat Vulnerability Analysis',
    button_style='primary',
    layout=dict(width='280px')
)

def on_analysis_click(button):
    """Button handler: run the analysis and render the results in ``analysis_output``.

    Clears the previous output, runs ``run_heat_vulnerability_analysis`` and
    then shows the chart (if any), the statistics table with a short
    explanation of the ratio (if any), or a failure message when neither is
    available. The ``button`` argument is not used.
    """
    ratio_notes = (
        '\n🌡️ HEAT VULNERABILITY RATIO EXPLANATION:',
        '• Formula: (0-4 + 65+) / total population * 100',
        '• Shows percentage of climate-vulnerable population per pixel',
        '• Higher values = areas needing priority climate adaptation',
        '• Use Mean_% for overall city vulnerability level',
        '• Use Std_Dev_% to identify spatial inequality patterns',
    )
    rule = '='*80

    with analysis_output:
        clear_output(wait=True)

        summary_chart, stats_table = run_heat_vulnerability_analysis()
        have_table = stats_table is not None

        if summary_chart:
            print('\n📊 Age Cohort Summary:')
            summary_chart.show()

        if have_table:
            print('\n📋 Statistical Summary Table:')
            print('\n' + rule)
            display(stats_table)
            print(rule)
            for note in ratio_notes:
                print(note)

        if not summary_chart and not have_table:
            print('❌ Analysis failed - check city selection and data availability')

# Run the analysis whenever the button is pressed
analysis_button.on_click(on_analysis_click)

# Display interface: heading, short description, the button, then the output area
interface_heading = widgets.HTML('<h3>🌡️ City Heat Vulnerability Analysis</h3>')
interface_intro = widgets.HTML('''
    <p><strong>Analyzes WorldPop age/sex data to calculate:</strong></p>
    <ul>
        <li><strong>Age Distribution:</strong> Population percentages by 5-year age groups</li>
        <li><strong>Heat Vulnerability Ratio:</strong> (0-4 + 65+) / total * 100</li>
        <li><strong>Spatial Statistics:</strong> Mean, std dev, quartiles, skewness for each cohort</li>
        <li><strong>Climate Planning:</strong> Identify priority areas for adaptation</li>
    </ul>
    <p>Select a country and city above, then run the analysis.</p>
    ''')

interface_parts = [interface_heading, interface_intro, analysis_button, analysis_output]
display(widgets.VBox(interface_parts))

print('\n🌡️ Heat vulnerability analysis interface ready!')
print('Select a country, city, and year above, then click the analysis button.')

VBox(children=(HTML(value='<h3>🌡️ City Heat Vulnerability Analysis</h3>'), HTML(value='\n    <p><strong>Analyz…


🌡️ Heat vulnerability analysis interface ready!
Select a country, city, and year above, then click the analysis button.
