# NASA Clean Air Challenge – Data Integration & API Pipeline

Este notebook operacionaliza dados satelitais (TEMPO) e de superfície (OpenAQ, opcional AirNow / PGN / TOLNet) em endpoints web e tiles para o app React.

**Escopo**: descobrir, subset, reprojetar, tilear TEMPO; cruzar com estações; gerar métricas de comparação; preparar forecast básico + caching.

Legendas: (CORE) = essencial MVP, (OPT) = opcional valor extra.

---

## 1. Setup Environment & Dependencies (CORE)
Requisitos: Python >= 3.10

Pacotes principais:
- Autenticação & Acesso Cloud: `earthaccess`, `harmony-py`
- Dados científicos: `xarray`, `rioxarray`, `numpy`, `pandas`, `scipy`
- Geoespacial: `pyproj`, `rasterio`
- API & Async: `fastapi`, `uvicorn`, `aiohttp`, `backoff`
- Tiles & Imagens: `Pillow` (PIL), (opcional) `mercantile`, `pytiles`
- ML & Métricas: `scikit-learn`
- Cache: `redis` (cliente `redis`), fallback FS
- Utilidades: `python-dotenv`, `requests`

Observação: instalar via pip e congelar em `requirements.txt` para reprodutibilidade.

In [None]:
# (Optional run) Install dependencies inside this environment
# NOTE: Execute only once or manage via requirements.txt
# %pip install earthaccess harmony-py xarray rioxarray pyproj rasterio numpy pandas scipy aiohttp fastapi uvicorn Pillow scikit-learn redis backoff python-dotenv requests mercantile
import sys, platform
print('Python', sys.version)
assert sys.version_info >= (3,10), 'Python >=3.10 required'
print('Platform', platform.platform())

## 2. Configure Earthdata Login Authentication (CORE)
Use `.netrc` ou variáveis de ambiente:
```
machine urs.earthdata.nasa.gov login $EDL_USERNAME password $EDL_PASSWORD
```
Exemplo (interativo fallback).

In [None]:
from pathlib import Path
import os
from typing import Optional
try:
    import earthaccess
except ImportError:
    earthaccess = None

if earthaccess:
    auth = earthaccess.login(strategy='netrc')  # fallback to interactive if no netrc
    print('Authenticated:', auth)
else:
    print('earthaccess not installed in this environment.')

## 3. Discover TEMPO Products via CMR/Harmony (CORE)
Buscar coleções TEMPO NO₂ L3 (short_name ~ `TEMPO_NO2_L3*`). Filtrar por intervalo temporal + lat/lon bbox.

Parametros chave:
- temporal: `2025-10-01T00:00:00Z,2025-10-02T00:00:00Z`
- bounding box: `minLon,minLat,maxLon,maxLat`

In [None]:
import json, datetime as dt
if earthaccess:
    collections = earthaccess.search_data(identifier='TEMPO_NO2_L3*',
                                          temporal=('2025-10-01','2025-10-02'))
    print('Found collections:', len(collections))
    if collections:
        print(collections[0].umm.get('ShortName'))
else:
    print('Skip CMR search (earthaccess missing).')

## 4. Subset & Stream TEMPO NO₂ L3 Using Harmony (CORE)
Enviar job Harmony com variáveis selecionadas e bbox. Poll até `Successful`.

In [None]:
try:
    import harmony
except ImportError:
    harmony=None

if harmony and earthaccess:
    # Pseudocode (não executa sem collection real & params corretos)
    from harmony import Client
    hclient = Client()
    request = hclient.submit(
        collection_id='TEMPO_NO2_L3_SAMPLE',  # substituir por collection real
        spatial={'bbox':(-49.0,-2.0,-48.0,-1.0)},
        temporal={'start':'2025-10-01T00:00:00Z','end':'2025-10-01T02:00:00Z'},
        variables=['NO2_TROPO_COLUMN']
    )
    print('Harmony Job ID:', request.job_id)
else:
    print('Harmony client not available; skipping job submit example.')

## 5. Read Cloud-Hosted Dataset into Xarray (CORE)
Abrir NetCDF/Zarr remoto (HTTP ou S3) e inspecionar.

In [None]:
import xarray as xr
sample_url = os.getenv('TEMPO_SAMPLE_URL','')  # fornecer URL real
if sample_url:
    ds = xr.open_dataset(sample_url, engine='netcdf4', chunks={})
    print(ds)
    print('Variables:', list(ds.data_vars))
else:
    print('Defina TEMPO_SAMPLE_URL para abrir dataset.')

## 6. Reproject & Resample to Web Mercator (CORE)
Usar `rioxarray` para set CRS (EPSG:4326) e reprojetar para `EPSG:3857`.

In [None]:
try:
    import rioxarray  # noqa
except ImportError:
    rioxarray = None

if 'ds' in globals():
    var = list(ds.data_vars)[0] if ds.data_vars else None
    if var:
        da = ds[var]
        if hasattr(da, 'rio'):  # ensure rioxarray
            da = da.rio.write_crs('EPSG:4326')
            merc = da.rio.reproject('EPSG:3857')
            print('Reprojected shape:', merc.shape)
        else:
            print('rioxarray missing for reprojection.')
    else:
        print('No variable to reproject.')
else:
    print('Dataset not loaded; skip reprojection.')

## 7. Generate AQI Color-Mapped Raster Tiles (CORE)
Mapear valores normalizados para paleta (ex.: verde→amarelo→laranja→vermelho→roxo).

In [None]:
from math import log2
from io import BytesIO
from PIL import Image
import numpy as np

def aqi_colorramp():
    # return list of (threshold, (r,g,b)) sorted by threshold
    return [
        (0,(0,228,0)),(50,(255,255,0)),(100,(255,126,0)),(150,(255,0,0)),(200,(143,63,151)),(300,(126,0,35)),(500,(110,0,25))
    ]

def map_value_to_color(v):
    ramp = aqi_colorramp()
    for i,(th,_c) in enumerate(ramp):
        if v <= th:
            if i==0: return _c
            prev_th, prev_c = ramp[i-1]
            # linear blend
            span = th - prev_th if th>prev_th else 1
            t = (v - prev_th)/span
            return tuple(int(prev_c[j] + t*( _c[j]-prev_c[j])) for j in range(3))
    return ramp[-1][1]

def render_tile(merc_da, z:int,x:int,y:int, vmin:float, vmax:float):
    # Placeholder: would window merc_da by tile bounds; here random demo
    size=256
    data = np.random.uniform(vmin,vmax,(size,size))
    arr = np.zeros((size,size,4), dtype=np.uint8)
    for i in range(size):
        for j in range(size):
            c = map_value_to_color(data[i,j])
            arr[i,j,:3]=c; arr[i,j,3]=180
    img = Image.fromarray(arr, mode='RGBA')
    bio = BytesIO()
    img.save(bio, format='PNG')
    return bio.getvalue()

print('Tile renderer ready (demo).')

## 8. FastAPI Tile Endpoint /tempo/tiles/{z}/{x}/{y}.png (CORE)
Cache + render ou retornar imagem pré-gerada.

In [None]:
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
app = FastAPI(title='Clean Air API')

@app.get('/tempo/tiles/{z}/{x}/{y}.png')
async def tempo_tile(z:int,x:int,y:int, product:str='no2', time:str|None=None):
    try:
        data = render_tile(None,z,x,y,0,150)  # placeholder merc dataset
        return Response(data, media_type='image/png', headers={'Cache-Control':'public, max-age=600'})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

print('FastAPI app object created (not running uvicorn here).')

## 9. Extract Point Time Series (CORE)
Selecionar pixel mais próximo ou interpolar bilinear para lat/lon.

In [None]:
import pandas as pd

def extract_point_series(granule_datasets:list, lat:float, lon:float, var:str)->pd.DataFrame:
    rows=[]
    for ds in granule_datasets:
        if var not in ds: continue
        da = ds[var]
        # very naive nearest index approach
        if all(c in da.coords for c in ('lat','lon')):
            lat_idx = abs(da['lat']-lat).argmin()
            lon_idx = abs(da['lon']-lon).argmin()
            val = da.isel(lat=lat_idx, lon=lon_idx).item()
            ts = pd.to_datetime(ds.time.values[0]) if 'time' in ds.coords else None
            rows.append({'ts':ts,'value':val})
    return pd.DataFrame(rows).sort_values('ts')

print('Point extraction helper defined.')

## 10. Fetch OpenAQ v3 Measurements (CORE)
Uso de paginação e bbox.

In [None]:
import requests, math

def fetch_openaq(bbox:tuple[str,str,str,str], parameter:str, date_from:str, date_to:str, limit=1000, max_pages=3):
    all_rows=[]
    for page in range(1,max_pages+1):
        url='https://api.openaq.org/v3/measurements'
        params={
            'parameter':parameter,
            'date_from':date_from,
            'date_to':date_to,
            'bbox':','.join(bbox),
            'limit':limit,
            'page':page
        }
        r = requests.get(url, params=params, timeout=30)
        r.raise_for_status()
        js = r.json()
        for item in js.get('results', []):
            all_rows.append({
                'ts': item['date']['utc'],
                'value': item['value'],
                'unit': item['unit'],
                'lat': item['coordinates']['latitude'] if item.get('coordinates') else None,
                'lon': item['coordinates']['longitude'] if item.get('coordinates') else None
            })
        if len(js.get('results',[])) < limit:
            break
    return pd.DataFrame(all_rows)

print('OpenAQ fetch helper ready.')

## 11. Merge Satellite vs Ground (CORE)
Alinhar por timestamp mais próximo (tolerância, ex: 30 min).

In [None]:
def align_series(sat_df:pd.DataFrame, ground_df:pd.DataFrame, tolerance='30min'):
    sat = sat_df.copy(); ground = ground_df.copy()
    sat['ts'] = pd.to_datetime(sat['ts'])
    ground['ts'] = pd.to_datetime(ground['ts'])
    ground = ground.set_index('ts')
    rows=[]
    for _,r in sat.iterrows():
        ts = r['ts']
        window = ground.loc[ts-pd.Timedelta(tolerance): ts+pd.Timedelta(tolerance)]
        if not window.empty:
            g = window.iloc[(window.index-ts).abs().argmin()]
            rows.append({'ts': ts, 'satellite_value': r['value'], 'ground_value': g['value']})
    return pd.DataFrame(rows)

print('Alignment helper defined.')

## 12. Comparison Metrics (CORE)
RMSE, MAE, r (correlação de Pearson), Bias.

In [None]:
import numpy as np

def compute_metrics(df:pd.DataFrame):
    df = df.dropna()
    s = df['satellite_value'].to_numpy(); g = df['ground_value'].to_numpy()
    if len(df)==0:
        return {}
    rmse = float(np.sqrt(((s-g)**2).mean()))
    mae = float(np.abs(s-g).mean())
    bias = float((s-g).mean())
    r = float(np.corrcoef(s,g)[0,1]) if len(df)>1 else float('nan')
    return {'rmse':rmse,'mae':mae,'bias':bias,'r':r,'n':int(len(df))}

print('Metrics helper ready.')

## 13. AQI Mapping (CORE)
Função piecewise para NO₂ (exemplo simplificado).

In [None]:
NO2_BREAKS = [0,53,100,360,649,1249,2049]  # ppb-like example; adjust to standard
AQI_BREAKS = [0,50,100,150,200,300,500]
LABELS = ['Good','Moderate','USG','Unhealthy','Very Unhealthy','Hazardous']

def aqi_from_no2(conc:float):
    for i in range(1,len(NO2_BREAKS)):
        if conc <= NO2_BREAKS[i]:
            c_low, c_high = NO2_BREAKS[i-1], NO2_BREAKS[i]
            a_low, a_high = AQI_BREAKS[i-1], AQI_BREAKS[i]
            aqi = (a_high - a_low)/(c_high - c_low) * (conc - c_low) + a_low
            return round(aqi), LABELS[i-1]
    return 500, LABELS[-1]

print('AQI conversion ready. Example:', aqi_from_no2(70))

## 14. IMERG Precipitation (OPT)
Subsetting similar ao TEMPO para variável `precipitationRate`.

## 15. MERRA-2 PBL Height & Winds (OPT)
Coleções: PBLH, U10M, V10M. Interpolar para grade TEMPO.

## 16. Feature Engineering para Forecast (CORE)
Lags (1h,3h,6h), médias móveis, sin/cos hora do dia, vento (sqrt(u^2+v^2)), PBL normalizado.

In [None]:
def add_features(df:pd.DataFrame):
    df = df.sort_values('ts').copy()
    df['lag1'] = df['aqi'].shift(1)
    df['lag3'] = df['aqi'].shift(3)
    df['lag6'] = df['aqi'].shift(6)
    df['roll3'] = df['aqi'].rolling(3).mean()
    df['roll6'] = df['aqi'].rolling(6).mean()
    df['hour'] = pd.to_datetime(df['ts']).dt.hour
    df['hour_sin'] = np.sin(2*np.pi*df['hour']/24)
    df['hour_cos'] = np.cos(2*np.pi*df['hour']/24)
    return df
print('Feature engineering helper defined.')

## 17. Forecast Baseline (CORE)
Persistência e regressão linear simples.

In [None]:
from sklearn.linear_model import LinearRegression

def train_linear_forecast(df:pd.DataFrame, horizon=48):
    df_feat = add_features(df.dropna())
    feature_cols = ['aqi','lag1','lag3','lag6','roll3','roll6','hour_sin','hour_cos']
    df_feat = df_feat.dropna(subset=feature_cols)
    X = df_feat[feature_cols].values
    y = df_feat['aqi'].values
    if len(X) < 10:
        return []
    model = LinearRegression().fit(X,y)
    # naive iterative forecast
    last = df_feat.iloc[-1].copy()
    preds=[]
    cur_time = pd.to_datetime(last['ts']) if 'ts' in last else pd.Timestamp.utcnow()
    history = list(df_feat['aqi'].values)
    for h in range(1,horizon+1):
        cur_time += pd.Timedelta(hours=1)
        hour = cur_time.hour
        features = [history[-1], history[-1], history[-3] if len(history)>=3 else history[-1], history[-6] if len(history)>=6 else history[-1],
                    np.mean(history[-3:]) if len(history)>=3 else history[-1], np.mean(history[-6:]) if len(history)>=6 else history[-1],
                    np.sin(2*np.pi*hour/24), np.cos(2*np.pi*hour/24)]
        pred = float(model.predict([features])[0])
        history.append(pred)
        aqi_val, label = aqi_from_no2(pred)  # reusando NO2 p/ demo; ajustar p/ conc real
        preds.append({'ts': cur_time.isoformat(), 'aqi': aqi_val, 'aqi_label': label})
    return preds

print('Forecast trainer ready (demo).')

## 18. /forecast Endpoint Schema (CORE)
Resposta: `{location:{lat,lon}, horizonHours:48, points:[{ts,aqi,aqi_label}]}`.

In [None]:
@app.get('/forecast')
async def forecast_endpoint(lat:float, lon:float, horizon:int=48):
    # placeholder historical series
    hist = pd.DataFrame({'ts': pd.date_range(end=pd.Timestamp.utcnow(), periods=72, freq='H'), 'aqi': np.random.randint(10,120,72)})
    preds = train_linear_forecast(hist, horizon=horizon)
    return {'location': {'lat':lat,'lon':lon}, 'horizonHours': horizon, 'points': preds}

print('Forecast endpoint stub added.')

## 19. Caching Strategy (CORE)
Chaves: `tempo:{product}:{time}:{z}:{x}:{y}` e `compare:{lat}:{lon}:{start}:{end}`.

## 20. Async Orchestration & Rate Limiting (CORE)
`aiohttp` + `asyncio.Semaphore` + `backoff` para OpenAQ / múltiplos granules.

## 21. Frontend Contract Test Calls (CORE)
Validar headers/imagens e JSON schema antes de integrar React.

## 22. Environment & Secrets (.env) (CORE)
Variáveis: `EDL_USERNAME`, `EDL_PASSWORD`, `REDIS_URL`, `CACHE_DIR`, `DEFAULT_BBOX`.

## 23. Provenance & Logging (CORE)
Registrar: coleção, versão, granules, hora processamento, commit git, parâmetros de subset.