# Notebook 05 -- Batch Pipeline: Pre-calculo CLIMADA para Railway

**CLIMARISK-OG** | Petrobras | TRL5  
Ativo: REDUC -- Refinaria Duque de Caxias, RJ  

Este notebook consolida a logica dos NB01-NB04 em um **pipeline unico**
que executa todas as analises de risco climatico em sequencia e exporta
os resultados em formato JSON padronizado.

**Objetivo**: demonstrar que o motor CLIMADA pode rodar como batch job
automatizado no Railway, sem intervencao manual.

**Evidencia TRL5**: script de producao funcional, reprodutivel e auditavel.

In [1]:
!pip install climada climada-petals boto3 --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.1/56.1 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.1/8.1 MB[0m [31m59.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.3/5.3 MB[0m [31m45.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m140.6/140.6 kB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.6/14.6 MB[0m [31m53.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.8/86.8 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.2/58.2 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.8/11.8 MB[0m [31m60.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
# =============================================================================
# BLOCK 1: ENVIRONMENT CHECK
# =============================================================================
# Prints the interpreter and CLIMADA versions and records, as module-level
# flags, which optional dependencies are importable. Later cells read `ver`
# (stored in the JSON metadata) and `BOTO3_OK` (consulted by upload_to_r2).
import sys
print(f"Python: {sys.version}")

import climada
try:
    ver = climada.__version__
except AttributeError:
    # The package does not expose __version__: fall back to the installed
    # distribution metadata.
    from importlib.metadata import version
    ver = version("climada")
print(f"CLIMADA versao: {ver}")

# Check whether boto3 is available (needed for the upload to R2)
try:
    import boto3
    BOTO3_OK = True
    print("boto3: disponivel")
except ImportError:
    BOTO3_OK = False
    print("boto3: NAO disponivel (upload ao R2 sera simulado)")

# Only reached when `import climada` above did not raise.
CLIMADA_OK = True
print("\nOK - Ambiente verificado")

Python: 3.12.12 (main, Oct 10 2025, 08:52:57) [GCC 11.4.0]
CLIMADA versao: 6.1.0
boto3: disponivel

OK - Ambiente verificado


In [3]:
# =============================================================================
# BLOCO 2: IMPORTS E CONFIGURACAO
# =============================================================================
import os
import json
import time
import numpy as np
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# CLIMADA core
from climada.hazard import Hazard, Centroids
from climada.entity import Exposures, ImpactFuncSet, ImpactFunc
from climada.engine import ImpactCalc
from scipy.sparse import csr_matrix

# CLIMADA petals -- funcao de dano JRC para inundacao
from climada_petals.entity.impact_funcs.river_flood import (
    ImpfRiverFlood, flood_imp_func_set
)

import geopandas as gpd
from shapely.geometry import Point

# =============================================================================
# PIPELINE CONFIGURATION
# =============================================================================
# In production these values come from Railway environment variables.
# On Colab the defaults below are used.
# NOTE(review): the default endpoint below looks account-specific; consider
# requiring R2_ENDPOINT from the environment instead of committing it -- confirm.
R2_ENDPOINT = os.environ.get(
    'R2_ENDPOINT',
    'https://fcbf44d6ba0cfc8e007fb276e5582cc2.r2.cloudflarestorage.com'
)
R2_BUCKET = os.environ.get('S3_BUCKET_NAME', 'climarisk-og')
# Credentials default to empty strings; upload_to_r2 only simulates the upload
# when the access key is empty.
R2_ACCESS_KEY = os.environ.get('R2_ACCESS_KEY_ID', '')
R2_SECRET_KEY = os.environ.get('R2_SECRET_ACCESS_KEY', '')
# Local directory where the per-asset JSON results are written.
OUTPUT_DIR = os.environ.get('OUTPUT_DIR', './outputs')

# Create the output directory up front so later writes cannot fail on a missing path.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Echo the effective configuration; only the presence of the access key is
# reported, never its value.
print(f"R2 Endpoint: {R2_ENDPOINT}")
print(f"R2 Bucket: {R2_BUCKET}")
print(f"R2 Credentials: {'CONFIGURADAS' if R2_ACCESS_KEY else 'NAO CONFIGURADAS (modo local)'}")
print(f"Output dir: {OUTPUT_DIR}")
print("\nOK - Imports e configuracao carregados")

R2 Endpoint: https://fcbf44d6ba0cfc8e007fb276e5582cc2.r2.cloudflarestorage.com
R2 Bucket: climarisk-og
R2 Credentials: NAO CONFIGURADAS (modo local)
Output dir: ./outputs

OK - Imports e configuracao carregados


In [4]:
# =============================================================================
# BLOCK 3: ASSET REGISTRY
# =============================================================================
# In production this list will come from the database or a JSON config file.
# For TRL5 only REDUC is used as the reference asset.

# One record per asset. 'bbox' bounds the centroid grid, 'hazards' lists the
# hazard codes to evaluate ('RF' = river flood, 'HW' = heat wave).
ASSETS = [
    {
        'name': 'REDUC - Refinaria Duque de Caxias',
        'short_name': 'REDUC',
        'lat': -22.53,
        'lon': -43.28,
        'value_usd': 5_000_000_000,
        'bbox': {
            'lat_min': -23.0, 'lat_max': -22.0,
            'lon_min': -43.8, 'lon_max': -42.8
        },
        'hazards': ['RF', 'HW'],
    },
]

# SSP scenarios for future projections, keyed by horizon year.
#   flood_factors: multiplier applied to the flood depth
#                  (passed as scale_factor to build_flood_hazard).
#   heat_factors:  despite the name, an additive offset in deg C
#                  (passed as delta_offset to build_heat_hazard).
SSP_SCENARIOS = {
    'SSP2-4.5': {
        'description': 'Moderado -- desenvolvimento sustentavel parcial',
        'flood_factors': {'2030': 1.05, '2050': 1.12, '2100': 1.25},
        'heat_factors':  {'2030': 0.5,  '2050': 1.2,  '2100': 1.8},
    },
    'SSP5-8.5': {
        'description': 'Extremo -- combustiveis fosseis intensivos',
        'flood_factors': {'2030': 1.08, '2050': 1.22, '2100': 1.55},
        'heat_factors':  {'2030': 0.7,  '2050': 1.8,  '2100': 3.5},
    },
}

HORIZONS = ['2030', '2050', '2100']

# Fail fast on configuration drift: every horizon must have a factor in every
# scenario table, otherwise the pipeline would only fail with a KeyError after
# the baseline has already been computed.
for _scenario, _cfg in SSP_SCENARIOS.items():
    for _table in ('flood_factors', 'heat_factors'):
        _missing = [h for h in HORIZONS if h not in _cfg[_table]]
        if _missing:
            raise ValueError(
                f"SSP_SCENARIOS[{_scenario!r}][{_table!r}] has no entry "
                f"for horizon(s) {_missing}"
            )

print(f"Ativos registrados: {len(ASSETS)}")
for a in ASSETS:
    print(f"  - {a['name']} ({a['lat']}, {a['lon']}) | USD {a['value_usd']:,.0f}")
    print(f"    Hazards: {a['hazards']}")

print(f"\nCenarios SSP: {list(SSP_SCENARIOS.keys())}")
print(f"Horizontes: {HORIZONS}")

Ativos registrados: 1
  - REDUC - Refinaria Duque de Caxias (-22.53, -43.28) | USD 5,000,000,000
    Hazards: ['RF', 'HW']

Cenarios SSP: ['SSP2-4.5', 'SSP5-8.5']
Horizontes: ['2030', '2050', '2100']


In [5]:
# =============================================================================
# BLOCO 4: FUNCOES AUXILIARES DO PIPELINE
# =============================================================================

def build_centroids(bbox, n_lat=20, n_lon=20):
    """Build a regular lat/lon grid of CLIMADA centroids covering a bounding box.

    bbox: dict with 'lat_min', 'lat_max', 'lon_min', 'lon_max' (degrees).
    n_lat, n_lon: number of grid points along each axis (bounds included).

    Points are ordered row by row: latitude is the slow axis, longitude the
    fast axis, i.e. the row-major flattening of a (n_lat, n_lon) grid.
    """
    lat_axis = np.linspace(bbox['lat_min'], bbox['lat_max'], n_lat)
    lon_axis = np.linspace(bbox['lon_min'], bbox['lon_max'], n_lon)

    # Each latitude is repeated once per longitude; the longitude axis is
    # cycled once per latitude row.
    flat_lat = np.repeat(lat_axis, lon_axis.size)
    flat_lon = np.tile(lon_axis, lat_axis.size)

    return Centroids(lat=flat_lat, lon=flat_lon, crs='EPSG:4326')


def build_exposure(asset, hazard_types):
    """Build the single-point, multi-hazard CLIMADA Exposures for one asset.

    Parameters
    ----------
    asset : dict
        Asset record; reads 'name', 'lat', 'lon' and 'value_usd'.
    hazard_types : collection of str
        Hazard codes to attach. 'RF' adds an ``impf_RF`` column and 'HW' an
        ``impf_HW`` column holding the impact-function id to use.

    Returns
    -------
    tuple
        ``(exp, RF_SA_ID, HW_ID)``: the checked Exposures object, the id of
        the flood function selected for South America and the heat-wave
        function id.

    Raises
    ------
    ValueError
        If no flood function name mentions South America and the hard-coded
        fallback id is not present in the loaded set either.
    """
    # Id used when no function can be matched by name.
    FALLBACK_RF_ID = 6

    # Pick the flood damage function for South America by name, because the
    # numeric ids depend on the installed climada_petals version (the run
    # recorded in this notebook resolved id 61).
    impf_set_jrc = flood_imp_func_set()
    rf_ids = list(impf_set_jrc.get_ids().get('RF', []))

    RF_SA_ID = None
    for func_id in rf_ids:
        func = impf_set_jrc.get_func(haz_type='RF', fun_id=func_id)
        if isinstance(func, list):
            func = func[0]
        if 'South America' in func.name or 'SouthAmerica' in func.name:
            RF_SA_ID = func_id
            break

    if RF_SA_ID is None:
        # Only fall back to the hard-coded id when it really exists; otherwise
        # the pipeline would fail later with an unrelated-looking error.
        if FALLBACK_RF_ID not in rf_ids:
            raise ValueError(
                "No South America river-flood impact function found by name "
                f"and fallback id {FALLBACK_RF_ID} is not among the available "
                f"RF ids {rf_ids}"
            )
        RF_SA_ID = FALLBACK_RF_ID

    HW_ID = 1

    data = {
        'value': [asset['value_usd']],
        'latitude': [asset['lat']],
        'longitude': [asset['lon']],
        'asset_name': [asset['name']],
    }
    if 'RF' in hazard_types:
        data['impf_RF'] = [RF_SA_ID]
    if 'HW' in hazard_types:
        data['impf_HW'] = [HW_ID]

    gdf = gpd.GeoDataFrame(
        data,
        geometry=[Point(asset['lon'], asset['lat'])],
        crs='EPSG:4326'
    )
    exp = Exposures(gdf)
    exp.value_unit = 'USD'
    exp.check()
    return exp, RF_SA_ID, HW_ID


def build_flood_hazard(centroids, asset_lat, asset_lon, scale_factor=1.0):
    """Build the synthetic river-flood (RF) hazard around an asset.

    The original author describes this event set as identical to NB01
    (NB01 is not visible from this notebook).

    Parameters
    ----------
    centroids : Centroids
        Grid on which the hazard is defined; reads ``size``, ``lat``, ``lon``.
    asset_lat, asset_lon : float
        Asset position in degrees; flood depth peaks here.
    scale_factor : float, default 1.0
        Multiplier on the peak depth of every event (future projections).

    Returns
    -------
    tuple
        ``(haz, events)``: the checked Hazard and the list of event dicts
        ('name', 'rp', 'max_depth', 'year') it was built from.

    Notes
    -----
    Random noise is drawn from a local ``RandomState`` seeded per event, so
    results are reproducible and the global NumPy RNG is left untouched. The
    stream is the same one ``np.random.seed(42 + i)`` would produce.
    """
    events = [
        {'name': 'flood_rp5',   'rp': 5,   'max_depth': 0.5,  'year': 2020},
        {'name': 'flood_rp10',  'rp': 10,  'max_depth': 1.0,  'year': 2015},
        {'name': 'flood_rp25',  'rp': 25,  'max_depth': 1.8,  'year': 2010},
        {'name': 'flood_rp50',  'rp': 50,  'max_depth': 2.5,  'year': 2000},
        {'name': 'flood_rp100', 'rp': 100, 'max_depth': 3.5,  'year': 1988},
        {'name': 'flood_rp250', 'rp': 250, 'max_depth': 4.5,  'year': 1966},
    ]
    n_events = len(events)
    n_centroids = centroids.size

    intensity = np.zeros((n_events, n_centroids))
    fraction = np.zeros((n_events, n_centroids))

    # The spatial footprint does not depend on the event, so compute it once:
    # linear decay from 1 at the asset to 0 at max_radius (in degrees).
    dist = np.sqrt(
        (centroids.lat - asset_lat)**2 +
        (centroids.lon - asset_lon)**2
    )
    max_radius = 0.3
    decay = np.maximum(0, 1 - dist / max_radius)

    for i, evt in enumerate(events):
        peak_depth = evt['max_depth'] * scale_factor
        depth = peak_depth * decay

        # Per-event multiplicative noise, clipped to +/-50 %.
        rng = np.random.RandomState(42 + i)
        noise = rng.normal(1.0, 0.2, n_centroids)
        noise = np.clip(noise, 0.5, 1.5)
        depth = depth * noise
        depth = np.maximum(0, depth)
        intensity[i, :] = depth

        # Affected fraction grows with depth relative to the event peak.
        # With a non-positive peak every depth is 0, so the row stays 0 and
        # the 0/0 division is skipped.
        if peak_depth > 0:
            fraction[i, :] = np.where(
                depth > 0.01,
                np.minimum(1.0, depth / peak_depth),
                0.0
            )

    # NOTE(review): each event gets frequency 1/rp. If 'rp' is meant as an
    # exceedance return period, summing these frequencies over nested events
    # overstates the expected annual impact -- confirm the intended semantics.
    haz = Hazard(
        haz_type='RF',
        centroids=centroids,
        event_id=np.arange(1, n_events + 1),
        event_name=[e['name'] for e in events],
        date=np.array([datetime(e['year'], 1, 15).toordinal() for e in events]),
        frequency=np.array([1.0 / e['rp'] for e in events]),
        frequency_unit='1/year',
        intensity=csr_matrix(intensity),
        fraction=csr_matrix(fraction),
        units='m',
    )
    haz.check()
    return haz, events


def build_heat_hazard(centroids, asset_lat, asset_lon, delta_offset=0.0):
    """Build the synthetic heat-wave (HW) hazard around an asset.

    The original author describes this event set as identical to NB02
    (NB02 is not visible from this notebook).

    Parameters
    ----------
    centroids : Centroids
        Grid on which the hazard is defined; reads ``size``, ``lat``, ``lon``.
    asset_lat, asset_lon : float
        Asset position in degrees; the local warming bump peaks here.
    delta_offset : float, default 0.0
        Degrees C added to every event's excess temperature (future projections).

    Returns
    -------
    tuple
        ``(haz, events, HEAT_THRESHOLD_C)``: the checked Hazard, the event
        dicts it was built from, and the threshold (deg C) the intensities
        are expressed relative to.

    Notes
    -----
    Random variation is drawn from a local ``RandomState`` seeded per event,
    so results are reproducible and the global NumPy RNG is left untouched.
    The stream is the same one ``np.random.seed(100 + i)`` would produce.
    """
    HEAT_THRESHOLD_C = 40.0
    events = [
        {'name': 'heatwave_rp2',   'rp': 2,   'delta_above': 2.0,  'year': 2023},
        {'name': 'heatwave_rp5',   'rp': 5,   'delta_above': 3.5,  'year': 2020},
        {'name': 'heatwave_rp10',  'rp': 10,  'delta_above': 5.0,  'year': 2016},
        {'name': 'heatwave_rp25',  'rp': 25,  'delta_above': 6.5,  'year': 2010},
        {'name': 'heatwave_rp50',  'rp': 50,  'delta_above': 8.0,  'year': 1998},
        {'name': 'heatwave_rp100', 'rp': 100, 'delta_above': 10.0, 'year': 1984},
    ]
    n_events = len(events)
    n_centroids = centroids.size

    intensity = np.zeros((n_events, n_centroids))
    fraction = np.zeros((n_events, n_centroids))

    # Event-independent local bump: up to +0.5 deg C at the asset, fading
    # linearly to 0 at a distance of 0.3 degrees. Computed once.
    dist = np.sqrt(
        (centroids.lat - asset_lat)**2 +
        (centroids.lon - asset_lon)**2
    )
    urban_heat = np.maximum(0, 0.5 * (1 - dist / 0.3))

    for i, evt in enumerate(events):
        rng = np.random.RandomState(100 + i)
        base_temp = evt['delta_above'] + delta_offset
        spatial_var = rng.normal(0, 0.5, n_centroids)
        temp_field = base_temp + spatial_var + urban_heat
        # Intensity is an excess above the threshold, so it cannot be negative.
        temp_field = np.maximum(0, temp_field)
        intensity[i, :] = temp_field
        fraction[i, :] = np.where(temp_field > 0.1, 1.0, 0.0)

    # NOTE(review): each event gets frequency 1/rp. If 'rp' is meant as an
    # exceedance return period, summing these frequencies over nested events
    # overstates the expected annual impact -- confirm the intended semantics.
    haz = Hazard(
        haz_type='HW',
        centroids=centroids,
        event_id=np.arange(1, n_events + 1),
        event_name=[e['name'] for e in events],
        date=np.array([datetime(e['year'], 7, 15).toordinal() for e in events]),
        frequency=np.array([1.0 / e['rp'] for e in events]),
        frequency_unit='1/year',
        intensity=csr_matrix(intensity),
        fraction=csr_matrix(fraction),
        units='deg_C above threshold',
    )
    haz.check()
    return haz, events, HEAT_THRESHOLD_C


def build_impact_func_set(rf_sa_id, hw_id):
    """Assemble the combined flood + heat ImpactFuncSet.

    rf_sa_id: id of the river-flood function to take from the
              climada_petals JRC set.
    hw_id:    id to assign to the custom heat-wave function.

    Returns (impf_set, impf_flood, impf_heat); the set is checked before
    being returned.
    """
    # Flood: taken from climada_petals; get_func may hand back a list.
    flood_lookup = flood_imp_func_set().get_func(haz_type='RF', fun_id=rf_sa_id)
    impf_flood = flood_lookup[0] if isinstance(flood_lookup, list) else flood_lookup

    # Heat: custom damage curve for an industrial facility, tabulated on
    # excess temperature (deg C above threshold).
    heat_curve = {
        'intensity': np.array([0.0, 1.0, 2.0, 3.0, 5.0, 7.0, 10.0, 15.0]),
        'mdd': np.array([0.0, 0.02, 0.05, 0.10, 0.25, 0.45, 0.70, 0.95]),
        'paa': np.array([0.0, 0.10, 0.20, 0.35, 0.55, 0.75, 0.90, 1.00]),
    }
    impf_heat = ImpactFunc(
        id=hw_id,
        haz_type='HW',
        name='Heat Wave -- Industrial Facility (Refinery)',
        intensity_unit='deg_C above threshold',
        **heat_curve,
    )

    impf_set = ImpactFuncSet([impf_flood, impf_heat])
    impf_set.check()
    return impf_set, impf_flood, impf_heat


# Cell-completion marker: printed once the helper definitions above have run.
print("OK - Funcoes auxiliares definidas")

OK - Funcoes auxiliares definidas


In [6]:
# =============================================================================
# BLOCO 5: FUNCAO DE CALCULO DE IMPACTO POR HAZARD
# =============================================================================

def compute_impact_single(exp, impf_set, haz, events, hazard_type):
    """Run the CLIMADA impact calculation for one hazard.

    Returns (result, imp): ``result`` is a JSON-serialisable dict with the
    expected annual impact in USD, that impact as a percentage of the total
    exposed value, and the per-event impact keyed by the event's return
    period (as a string); ``imp`` is the raw Impact object.

    ``hazard_type`` is accepted for call-site readability but not used here.
    """
    imp = ImpactCalc(exp, impf_set, haz).impact(save_mat=True)

    eai = float(imp.aai_agg)
    value = float(exp.gdf['value'].sum())

    # Guard against an exposure with zero total value.
    if value > 0:
        eai_ratio_pct = float((eai / value) * 100)
    else:
        eai_ratio_pct = 0.0

    # Events and imp.at_event share the same ordering, so index by position.
    impact_by_rp = {}
    for idx, evt in enumerate(events):
        impact_by_rp[str(evt['rp'])] = float(imp.at_event[idx])

    result = {
        'eai_usd': eai,
        'eai_ratio_pct': eai_ratio_pct,
        'impact_by_return_period': impact_by_rp,
    }
    return result, imp


def run_baseline(asset, centroids, exp, impf_set, rf_sa_id):
    """Run the baseline (historical) analysis for every hazard of an asset.

    Parameters
    ----------
    asset : dict
        Asset record; reads 'hazards', 'lat', 'lon' and 'value_usd'.
    centroids : Centroids
        Grid used to build the synthetic hazards.
    exp : Exposures
        Exposure of the asset.
    impf_set : ImpactFuncSet
        Impact functions used for the calculation.
    rf_sa_id : int
        Id of the river-flood function inside ``impf_set``.

    Returns
    -------
    dict
        ``{'scenario', 'horizon', 'hazards', 'aggregated'}``. ``hazards`` has
        one entry per evaluated hazard code; ``aggregated`` holds the plain
        sum of the per-hazard expected annual impacts and that sum as a
        percentage of ``asset['value_usd']`` (0.0 when the value is not
        positive, matching compute_impact_single).
    """
    results = {
        'scenario': 'baseline',
        'horizon': 'historical',
        'hazards': {},
    }

    total_eai = 0.0

    if 'RF' in asset['hazards']:
        haz_flood, events_rf = build_flood_hazard(
            centroids, asset['lat'], asset['lon']
        )
        res_rf, _ = compute_impact_single(
            exp, impf_set, haz_flood, events_rf, 'RF'
        )
        # Describe the function that was actually used in the calculation
        # instead of reloading the whole JRC set just for its metadata.
        impf_flood = impf_set.get_func(haz_type='RF', fun_id=rf_sa_id)
        if isinstance(impf_flood, list):
            impf_flood = impf_flood[0]
        results['hazards']['RF'] = {
            'type': 'RF',
            'type_name': 'River Flood',
            'n_events': len(events_rf),
            'return_periods': [e['rp'] for e in events_rf],
            'intensity_unit': 'm',
            'max_intensity': float(haz_flood.intensity.max()),
            'impact_function': {
                'name': impf_flood.name,
                # Keep the id JSON-serialisable whatever its native type.
                'id': int(impf_flood.id) if isinstance(impf_flood.id, (int, np.integer)) else str(impf_flood.id),
                'source': 'Huizinga et al. (2017), doi: 10.2760/16510',
                'loaded_via': 'climada_petals.entity.impact_funcs.river_flood.flood_imp_func_set()',
                'type': 'structural_damage',
            },
            'results': res_rf,
        }
        total_eai += res_rf['eai_usd']

    if 'HW' in asset['hazards']:
        haz_heat, events_hw, threshold = build_heat_hazard(
            centroids, asset['lat'], asset['lon']
        )
        res_hw, _ = compute_impact_single(
            exp, impf_set, haz_heat, events_hw, 'HW'
        )
        results['hazards']['HW'] = {
            'type': 'HW',
            'type_name': 'Heat Wave',
            'n_events': len(events_hw),
            'return_periods': [e['rp'] for e in events_hw],
            'intensity_unit': 'deg_C above threshold',
            'threshold_c': threshold,
            'max_intensity': float(haz_heat.intensity.max()),
            'impact_function': {
                'name': 'Heat Wave -- Industrial Facility (Refinery)',
                'source': 'Custom -- ECA/McKinsey (2009), Kjellstrom et al. (2016), ILO (2019)',
                'type': 'operational_loss',
            },
            'results': res_hw,
        }
        total_eai += res_hw['eai_usd']

    # Hazards are treated as independent: the total is a plain sum.
    asset_value = asset['value_usd']
    results['aggregated'] = {
        'eai_total_usd': total_eai,
        'eai_total_ratio_pct': (
            float((total_eai / asset_value) * 100) if asset_value > 0 else 0.0
        ),
    }

    return results


def run_ssp_projection(asset, centroids, exp, impf_set, scenario_name, scenario_cfg, horizon):
    """Run the analysis for one SSP scenario at one time horizon.

    Parameters
    ----------
    asset : dict
        Asset record; reads 'hazards', 'lat', 'lon' and 'value_usd'.
    centroids : Centroids
        Grid used to build the synthetic hazards.
    exp : Exposures
        Exposure of the asset.
    impf_set : ImpactFuncSet
        Impact functions used for the calculation.
    scenario_name : str
        Label stored in the result (e.g. 'SSP2-4.5').
    scenario_cfg : dict
        Scenario entry with 'flood_factors' (depth multipliers) and
        'heat_factors' (additive deg C offsets), both keyed by horizon.
    horizon : str
        Horizon key to look up in the factor tables.

    Returns
    -------
    dict
        ``{'scenario', 'horizon', 'hazards', 'aggregated'}``; ``aggregated``
        holds the plain sum of the per-hazard expected annual impacts and
        that sum as a percentage of ``asset['value_usd']`` (0.0 when the
        value is not positive, matching compute_impact_single).

    Raises
    ------
    KeyError
        If ``horizon`` is missing from a factor table that is needed.
    """
    results = {
        'scenario': scenario_name,
        'horizon': horizon,
        'hazards': {},
    }

    total_eai = 0.0

    if 'RF' in asset['hazards']:
        sf = scenario_cfg['flood_factors'][horizon]
        haz_flood, events_rf = build_flood_hazard(
            centroids, asset['lat'], asset['lon'], scale_factor=sf
        )
        res_rf, _ = compute_impact_single(
            exp, impf_set, haz_flood, events_rf, 'RF'
        )
        results['hazards']['RF'] = {
            'type': 'RF',
            'type_name': 'River Flood',
            'scale_factor': sf,
            'results': res_rf,
        }
        total_eai += res_rf['eai_usd']

    if 'HW' in asset['hazards']:
        delta = scenario_cfg['heat_factors'][horizon]
        # The threshold returned by the builder is not reported for projections.
        haz_heat, events_hw, _ = build_heat_hazard(
            centroids, asset['lat'], asset['lon'], delta_offset=delta
        )
        res_hw, _ = compute_impact_single(
            exp, impf_set, haz_heat, events_hw, 'HW'
        )
        results['hazards']['HW'] = {
            'type': 'HW',
            'type_name': 'Heat Wave',
            'delta_offset_c': delta,
            'results': res_hw,
        }
        total_eai += res_hw['eai_usd']

    # Hazards are treated as independent: the total is a plain sum.
    asset_value = asset['value_usd']
    results['aggregated'] = {
        'eai_total_usd': total_eai,
        'eai_total_ratio_pct': (
            float((total_eai / asset_value) * 100) if asset_value > 0 else 0.0
        ),
    }

    return results


# Cell-completion marker: printed once the calculation functions above are defined.
print("OK - Funcoes de calculo definidas")

OK - Funcoes de calculo definidas


In [7]:
# =============================================================================
# BLOCO 6: FUNCAO DE UPLOAD AO R2
# =============================================================================

def upload_to_r2(filepath, r2_key):
    """Upload a local file to the configured Cloudflare R2 bucket.

    Parameters
    ----------
    filepath : str
        Path of the local file to upload.
    r2_key : str
        Object key to write inside ``R2_BUCKET``.

    Returns
    -------
    bool
        True when the upload succeeded. False when the upload was only
        simulated (boto3 missing, or access key / secret key not both
        configured) or when the upload attempt failed.

    The upload is best-effort: failures are reported on stdout and turned
    into a False return value instead of being raised.
    """
    # Both halves of the credential pair are required for a real upload.
    credentials_ok = bool(R2_ACCESS_KEY and R2_SECRET_KEY)
    if not credentials_ok or not BOTO3_OK:
        print(f"  [SIMULADO] Upload: {filepath} -> s3://{R2_BUCKET}/{r2_key}")
        return False

    try:
        s3 = boto3.client(
            's3',
            endpoint_url=R2_ENDPOINT,
            aws_access_key_id=R2_ACCESS_KEY,
            aws_secret_access_key=R2_SECRET_KEY,
            region_name='auto',
        )
        s3.upload_file(filepath, R2_BUCKET, r2_key)
        print(f"  [OK] Upload: {filepath} -> s3://{R2_BUCKET}/{r2_key}")
        return True
    except Exception as e:
        # Deliberately broad: an upload problem must not abort the batch run.
        print(f"  [ERRO] Upload falhou: {e}")
        return False


# Cell-completion marker: printed once upload_to_r2 is defined.
print("OK - Funcao de upload definida")

OK - Funcao de upload definida


In [8]:
# =============================================================================
# BLOCK 7: FULL PIPELINE EXECUTION
# =============================================================================
# For every registered asset: build centroids/exposure/impact functions, run
# the baseline, run every SSP scenario x horizon, write one consolidated JSON
# to OUTPUT_DIR and hand it to upload_to_r2. Later cells read
# `pipeline_results`, `all_files` and `pipeline_elapsed`.
print("=" * 70)
print("  CLIMARISK-OG | BATCH PIPELINE | INICIO")
print("=" * 70)

pipeline_start = time.time()
pipeline_results = []  # one consolidated result dict per asset
all_files = []         # paths of the JSON files written

for asset in ASSETS:
    asset_start = time.time()
    short = asset['short_name']
    print(f"\n{'='*60}")
    print(f"  ATIVO: {asset['name']}")
    print(f"{'='*60}")

    # --- Setup ---
    centroids = build_centroids(asset['bbox'])
    exp, rf_sa_id, hw_id = build_exposure(asset, asset['hazards'])
    impf_set, impf_flood, impf_heat = build_impact_func_set(rf_sa_id, hw_id)
    print(f"  Centroids: {centroids.size}")
    print(f"  Exposure: USD {asset['value_usd']:,.0f}")
    print(f"  Impact Functions: RF(id={rf_sa_id}), HW(id={hw_id})")

    # --- Baseline ---
    print(f"\n  --- Baseline ---")
    baseline = run_baseline(asset, centroids, exp, impf_set, rf_sa_id)
    eai_b = baseline['aggregated']['eai_total_usd']
    ratio_b = baseline['aggregated']['eai_total_ratio_pct']
    print(f"  EAI Total: USD {eai_b:,.0f} ({ratio_b:.2f}%)")

    if 'RF' in baseline['hazards']:
        eai_rf = baseline['hazards']['RF']['results']['eai_usd']
        print(f"    RF: USD {eai_rf:,.0f}")
    if 'HW' in baseline['hazards']:
        eai_hw = baseline['hazards']['HW']['results']['eai_usd']
        print(f"    HW: USD {eai_hw:,.0f}")

    # --- SSP projections ---
    projections = []
    for ssp_name, ssp_cfg in SSP_SCENARIOS.items():
        for horizon in HORIZONS:
            print(f"\n  --- {ssp_name} | {horizon} ---")
            proj = run_ssp_projection(
                asset, centroids, exp, impf_set,
                ssp_name, ssp_cfg, horizon
            )
            eai_p = proj['aggregated']['eai_total_usd']
            ratio_p = proj['aggregated']['eai_total_ratio_pct']
            # Relative change versus the baseline; reported as 0 when the
            # baseline impact is not positive to avoid dividing by zero.
            delta_pct = ((eai_p - eai_b) / eai_b * 100) if eai_b > 0 else 0
            print(f"  EAI Total: USD {eai_p:,.0f} ({ratio_p:.2f}%) | Delta vs baseline: {delta_pct:+.1f}%")
            proj['delta_vs_baseline_pct'] = delta_pct
            projections.append(proj)

    # --- Assemble the consolidated JSON payload ---
    asset_result = {
        'metadata': {
            'pipeline': 'nb05_batch_pipeline',
            'version': '1.0',
            # Local wall-clock time without timezone information.
            'date': datetime.now().isoformat(),
            'climada_version': ver,
            'methodology': 'CLIMADA multi-hazard probabilistic impact (H x E x V)',
        },
        'asset': {
            'name': asset['name'],
            'short_name': short,
            'lat': asset['lat'],
            'lon': asset['lon'],
            'value_usd': asset['value_usd'],
        },
        'baseline': baseline,
        'projections': projections,
        # Copy of the scenario configuration so the file is self-describing.
        'scenarios_config': {
            ssp_name: {
                'description': cfg['description'],
                'flood_factors': cfg['flood_factors'],
                'heat_factors': cfg['heat_factors'],
            }
            for ssp_name, cfg in SSP_SCENARIOS.items()
        },
        'limitations': [
            'Dados de hazard sinteticos para ambos os riscos',
            'Valor de exposicao estimado',
            'Funcao de dano de calor customizada (nao calibrada com dados reais)',
            'Funcao de dano de inundacao generica (residencial, JRC)',
            'Independencia entre hazards assumida (soma simples de EAIs)',
            'Fatores de escala SSP baseados em medias globais IPCC AR6',
            'Ativo unico (REDUC)',
            'Sem modelagem de correlacao entre cenarios',
        ],
    }

    # --- Save JSON ---
    filename = f"results_pipeline_{short.lower()}.json"
    filepath = os.path.join(OUTPUT_DIR, filename)
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(asset_result, f, indent=2, ensure_ascii=False)
    print(f"\n  JSON salvo: {filepath}")
    all_files.append(filepath)

    # --- Upload to R2 ---
    # NOTE(review): the boolean returned by upload_to_r2 is discarded, so a
    # failed upload does not affect the pipeline outcome -- confirm intended.
    r2_key = f"outputs/{short.lower()}/{filename}"
    upload_to_r2(filepath, r2_key)

    asset_elapsed = time.time() - asset_start
    print(f"  Tempo: {asset_elapsed:.1f}s")
    pipeline_results.append(asset_result)

pipeline_elapsed = time.time() - pipeline_start
print(f"\n{'='*70}")
print(f"  PIPELINE CONCLUIDO em {pipeline_elapsed:.1f}s")
print(f"  Ativos processados: {len(ASSETS)}")
print(f"  Arquivos gerados: {len(all_files)}")
print(f"{'='*70}")

  CLIMARISK-OG | BATCH PIPELINE | INICIO

  ATIVO: REDUC - Refinaria Duque de Caxias
  Centroids: 400
  Exposure: USD 5,000,000,000
  Impact Functions: RF(id=61), HW(id=1)

  --- Baseline ---
  EAI Total: USD 1,345,325,260 (26.91%)
    RF: USD 938,842,272
    HW: USD 406,482,988

  --- SSP2-4.5 | 2030 ---
  EAI Total: USD 1,501,276,670 (30.03%) | Delta vs baseline: +11.6%

  --- SSP2-4.5 | 2050 ---
  EAI Total: USD 1,737,748,975 (34.75%) | Delta vs baseline: +29.2%

  --- SSP2-4.5 | 2100 ---
  EAI Total: USD 1,973,065,719 (39.46%) | Delta vs baseline: +46.7%

  --- SSP5-8.5 | 2030 ---
  EAI Total: USD 1,573,686,630 (31.47%) | Delta vs baseline: +17.0%

  --- SSP5-8.5 | 2050 ---
  EAI Total: USD 1,963,159,723 (39.26%) | Delta vs baseline: +45.9%

  --- SSP5-8.5 | 2100 ---
  EAI Total: USD 2,801,082,961 (56.02%) | Delta vs baseline: +108.2%

  JSON salvo: ./outputs/results_pipeline_reduc.json
  [SIMULADO] Upload: ./outputs/results_pipeline_reduc.json -> s3://climarisk-og/outputs/reduc/re

In [9]:
# =============================================================================
# BLOCK 8: RESULTS SUMMARY
# =============================================================================
# Prints, for each processed asset, one fixed-width table row for the baseline
# and one per SSP projection. Reads `pipeline_results` built by the pipeline cell.
print("\n" + "=" * 70)
print("  SUMARIO DE RESULTADOS")
print("=" * 70)

for asset_result in pipeline_results:
    asset_name = asset_result['asset']['short_name']
    value = asset_result['asset']['value_usd']
    bl = asset_result['baseline']

    print(f"\n  ATIVO: {asset_result['asset']['name']}")
    print(f"  Valor: USD {value:,.0f}")
    # Column widths here must match the widths used in the rows below.
    print(f"\n  {'Cenario':<20} {'Horizonte':<12} {'EAI (USD M)':<15} {'Ratio (%)':<12} {'Delta (%)':<12}")
    print(f"  {'-'*71}")

    # Baseline row (no delta: it is the reference)
    eai_bl = bl['aggregated']['eai_total_usd']
    ratio_bl = bl['aggregated']['eai_total_ratio_pct']
    print(f"  {'Baseline':<20} {'historico':<12} {eai_bl/1e6:<15,.1f} {ratio_bl:<12.2f} {'---':<12}")

    # Projection rows; EAI is shown in millions of USD
    for proj in asset_result['projections']:
        ssp = proj['scenario']
        hor = proj['horizon']
        eai_p = proj['aggregated']['eai_total_usd']
        ratio_p = proj['aggregated']['eai_total_ratio_pct']
        delta = proj.get('delta_vs_baseline_pct', 0)
        print(f"  {ssp:<20} {hor:<12} {eai_p/1e6:<15,.1f} {ratio_p:<12.2f} {delta:<+12.1f}")

print(f"\n{'='*70}")


  SUMARIO DE RESULTADOS

  ATIVO: REDUC - Refinaria Duque de Caxias
  Valor: USD 5,000,000,000

  Cenario              Horizonte    EAI (USD M)     Ratio (%)    Delta (%)   
  -----------------------------------------------------------------------
  Baseline             historico    1,345.3         26.91        ---         
  SSP2-4.5             2030         1,501.3         30.03        +11.6       
  SSP2-4.5             2050         1,737.7         34.75        +29.2       
  SSP2-4.5             2100         1,973.1         39.46        +46.7       
  SSP5-8.5             2030         1,573.7         31.47        +17.0       
  SSP5-8.5             2050         1,963.2         39.26        +45.9       
  SSP5-8.5             2100         2,801.1         56.02        +108.2      



In [10]:
# =============================================================================
# BLOCK 9: EXECUTIVE SUMMARY
# =============================================================================
# Prints a one-page summary per asset: baseline, most and least severe
# projection (by total expected annual impact) and pipeline run statistics.
# Reads `pipeline_results`, `pipeline_elapsed`, `all_files` and `R2_ACCESS_KEY`.
print("\n" + "=" * 70)
print("  RESUMO EXECUTIVO -- CLIMARISK-OG | Notebook 05 | Batch Pipeline")
print("=" * 70)

for ar in pipeline_results:
    bl = ar['baseline']
    projs = ar['projections']
    worst = max(projs, key=lambda p: p['aggregated']['eai_total_usd'])
    best = min(projs, key=lambda p: p['aggregated']['eai_total_usd'])

    # The processed-scenario count is the baseline plus every projection.
    print(f"""
  ATIVO:       {ar['asset']['name']}
  LOCALIZACAO: ({ar['asset']['lat']}, {ar['asset']['lon']})
  VALOR:       USD {ar['asset']['value_usd']/1e9:.0f} bilhoes

  BASELINE:
    EAI Total:  USD {bl['aggregated']['eai_total_usd']:>15,.0f}
    Ratio:      {bl['aggregated']['eai_total_ratio_pct']:>14.2f}%

  CENARIO MAIS SEVERO ({worst['scenario']} | {worst['horizon']}):
    EAI Total:  USD {worst['aggregated']['eai_total_usd']:>15,.0f}
    Delta:      {worst.get('delta_vs_baseline_pct', 0):>+14.1f}%

  CENARIO MAIS BRANDO ({best['scenario']} | {best['horizon']}):
    EAI Total:  USD {best['aggregated']['eai_total_usd']:>15,.0f}
    Delta:      {best.get('delta_vs_baseline_pct', 0):>+14.1f}%

  PIPELINE:
    Cenarios processados: {len(projs) + 1} (baseline + {len(projs)} projecoes)
    Tempo total: {pipeline_elapsed:.1f}s
    JSONs gerados: {len(all_files)}
    Upload R2: {'ATIVO' if R2_ACCESS_KEY else 'SIMULADO (sem credenciais)'}

  LIMITACOES (TRL5):
    1. Dados de hazard sinteticos
    2. Valor de exposicao estimado
    3. Funcao de dano de calor customizada (nao calibrada)
    4. Funcao de dano de inundacao generica residencial (JRC)
    5. Independencia entre hazards (soma simples)
    6. Fatores de escala SSP baseados em medias globais IPCC AR6
    7. Ativo unico (REDUC)

======================================================================
  FIM DO NOTEBOOK 05 -- Batch Pipeline v1.0
======================================================================
""")


  RESUMO EXECUTIVO -- CLIMARISK-OG | Notebook 05 | Batch Pipeline

  ATIVO:       REDUC - Refinaria Duque de Caxias
  LOCALIZACAO: (-22.53, -43.28)
  VALOR:       USD 5 bilhoes

  BASELINE:
    EAI Total:  USD   1,345,325,260
    Ratio:               26.91%

  CENARIO MAIS SEVERO (SSP5-8.5 | 2100):
    EAI Total:  USD   2,801,082,961
    Delta:              +108.2%

  CENARIO MAIS BRANDO (SSP2-4.5 | 2030):
    EAI Total:  USD   1,501,276,670
    Delta:               +11.6%

  PIPELINE:
    Cenarios processados: 6 (baseline + 6 projecoes)
    Tempo total: 0.5s
    JSONs gerados: 1
    Upload R2: SIMULADO (sem credenciais)

  LIMITACOES (TRL5):
    1. Dados de hazard sinteticos
    2. Valor de exposicao estimado
    3. Funcao de dano de calor customizada (nao calibrada)
    4. Funcao de dano de inundacao generica residencial (JRC)
    5. Independencia entre hazards (soma simples)
    6. Fatores de escala SSP baseados em medias globais IPCC AR6
    7. Ativo unico (REDUC)

  FIM DO NOTEBO

In [11]:
# =============================================================================
# BLOCK 10: DIAGNOSTICS AND ARTEFACTS
# =============================================================================
# Part 1 lists the files written by the pipeline; part 2 runs consistency
# checks on the in-memory results and on the files, then prints the first
# lines of the first JSON. Reads `all_files` and `pipeline_results`.
print("=" * 60)
print("  PARTE 1: ARTEFATOS GERADOS")
print("=" * 60)

for fp in all_files:
    if os.path.exists(fp):
        size = os.path.getsize(fp) / 1024
        print(f"  OK {fp} ({size:.1f} KB)")
    else:
        print(f"  FALTA {fp}")

print("\n" + "=" * 60)
print("  PARTE 2: DIAGNOSTICO DE CONSISTENCIA")
print("=" * 60)

# Each check is a (label, passed, detail) tuple.
checks = []

# Check 1: the pipeline produced results
ok = len(pipeline_results) > 0
checks.append(('Pipeline executou', ok, f'{len(pipeline_results)} ativos'))

# Check 2: baseline EAI > 0
for ar in pipeline_results:
    ok = ar['baseline']['aggregated']['eai_total_usd'] > 0
    checks.append((
        f"Baseline EAI > 0 ({ar['asset']['short_name']})",
        ok,
        f"USD {ar['baseline']['aggregated']['eai_total_usd']:,.0f}"
    ))

# Check 3: every projection has EAI > 0
for ar in pipeline_results:
    for proj in ar['projections']:
        ok = proj['aggregated']['eai_total_usd'] > 0
        checks.append((
            f"EAI > 0 ({proj['scenario']} {proj['horizon']})",
            ok,
            f"USD {proj['aggregated']['eai_total_usd']:,.0f}"
        ))

# Check 4: the JSON files exist
for fp in all_files:
    ok = os.path.exists(fp)
    checks.append((f'JSON existe: {os.path.basename(fp)}', ok, ''))

# Check 5: the JSON files parse. They are written as UTF-8 with
# ensure_ascii=False, so they must be read back as UTF-8 rather than with the
# platform default encoding.
for fp in all_files:
    try:
        with open(fp, 'r', encoding='utf-8') as jf:
            json.load(jf)
        ok = True
    except (OSError, ValueError):
        # OSError: unreadable file. ValueError covers both JSONDecodeError
        # and UnicodeDecodeError.
        ok = False
    checks.append((f'JSON valido: {os.path.basename(fp)}', ok, ''))

# Check 6: the SSP5-8.5 / 2100 projection exceeds the baseline
for ar in pipeline_results:
    bl_eai = ar['baseline']['aggregated']['eai_total_usd']
    worst = [p for p in ar['projections']
             if p['scenario'] == 'SSP5-8.5' and p['horizon'] == '2100']
    if worst:
        ok = worst[0]['aggregated']['eai_total_usd'] > bl_eai
        checks.append((
            'SSP5-8.5 2100 > Baseline',
            ok,
            f"{worst[0]['aggregated']['eai_total_usd']:,.0f} vs {bl_eai:,.0f}"
        ))

for name, passed, detail in checks:
    status = 'OK' if passed else 'FALHA'
    print(f"  [{status}] {name}: {detail}")

n_passed = sum(1 for _, p, _ in checks if p)
print(f"\n  Resultado: {n_passed}/{len(checks)} checks passaram")

# NOTE(review): failed checks are only reported on stdout; the cell does not
# raise, so an automated run would still finish successfully -- confirm intended.
if n_passed == len(checks):
    print("  Notebook pronto para commit no GitHub.")
else:
    print("  ATENCAO: Verificar checks que falharam antes de commitar.")

# Show the content of the JSON (first asset only)
if all_files:
    print(f"\nConteudo do JSON ({os.path.basename(all_files[0])}) -- primeiras 80 linhas:")
    with open(all_files[0], 'r', encoding='utf-8') as jf:
        content = json.dumps(json.load(jf), indent=2, ensure_ascii=False)
    lines = content.split('\n')
    for line in lines[:80]:
        print(line)
    if len(lines) > 80:
        print(f"  ... ({len(lines) - 80} linhas omitidas)")

  PARTE 1: ARTEFATOS GERADOS
  OK ./outputs/results_pipeline_reduc.json (11.4 KB)

  PARTE 2: DIAGNOSTICO DE CONSISTENCIA
  [OK] Pipeline executou: 1 ativos
  [OK] Baseline EAI > 0 (REDUC): USD 1,345,325,260
  [OK] EAI > 0 (SSP2-4.5 2030): USD 1,501,276,670
  [OK] EAI > 0 (SSP2-4.5 2050): USD 1,737,748,975
  [OK] EAI > 0 (SSP2-4.5 2100): USD 1,973,065,719
  [OK] EAI > 0 (SSP5-8.5 2030): USD 1,573,686,630
  [OK] EAI > 0 (SSP5-8.5 2050): USD 1,963,159,723
  [OK] EAI > 0 (SSP5-8.5 2100): USD 2,801,082,961
  [OK] JSON existe: results_pipeline_reduc.json: 
  [OK] JSON valido: results_pipeline_reduc.json: 
  [OK] SSP5-8.5 2100 > Baseline: 2,801,082,961 vs 1,345,325,260

  Resultado: 11/11 checks passaram
  Notebook pronto para commit no GitHub.

Conteudo do JSON (results_pipeline_reduc.json) -- primeiras 80 linhas:
{
  "metadata": {
    "pipeline": "nb05_batch_pipeline",
    "version": "1.0",
    "date": "2026-03-01T17:05:06.782756",
    "climada_version": "6.1.0",
    "methodology": "CLIMAD