# Data Pipelines & ETL

This notebook walks through the **offline** and **online** data pipelines: load the ROI config, align satellite data to a 64×64 grid, normalize each channel, and save/load tensors. Because Google Earth Engine (GEE) is not called here, we use **synthetic data** so you can still see the full flow and its visualizations.

## 1. Load ROI config and set paths

In [ ]:
import sys
from pathlib import Path

# Resolve the repository root: if the kernel was started inside "notebooks/",
# the root is the parent directory; otherwise the working directory is used.
ROOT = Path.cwd().parent if Path.cwd().name == "notebooks" else Path.cwd()
if str(ROOT) not in sys.path:
    # Put the root on sys.path so the `src` package can be imported below.
    sys.path.insert(0, str(ROOT))

try:
    from src.pipeline.grid_align import load_roi_config
    cfg = load_roi_config(ROOT / "config" / "roi.yaml")
except Exception as exc:
    # Deliberate best-effort fallback so the notebook still runs without the
    # project package or the config file. Report the reason: otherwise a typo
    # in roi.yaml would silently be replaced by these defaults.
    print(
        f"Could not load config/roi.yaml ({type(exc).__name__}: {exc}); "
        "using built-in defaults."
    )
    cfg = {
        "roi": {"min_lon": 76.8, "min_lat": 28.4, "max_lon": 77.5, "max_lat": 28.9},
        "grid": {"height": 64, "width": 64},
        "products": ["NO2", "CO", "AEROSOL_INDEX"],
    }

# Unpack the pieces of the config that the following cells use.
roi = cfg["roi"]
grid_h, grid_w = cfg["grid"]["height"], cfg["grid"]["width"]
products = cfg["products"]
print("ROI:", roi)
print("Grid:", grid_h, "x", grid_w)
print("Products:", products)

## 2. Synthetic satellite-like data (or load from GEE)

Running `scripts/download_ncr_30d.py` or `run_offline_pipeline.py` fetches real data from GEE. In this notebook we instead generate one synthetic 2D array per channel, purely for visualization.

In [ ]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import gaussian_filter

# Fix the global NumPy RNG so the synthetic fields are the same on every run.
np.random.seed(42)
# Coarse source resolution, standing in for the raw GEE output.
H = W = 32

def make_channel(name, hot_spot=(0.4, 0.6), shape=None):
    """Return a smooth random 2D field with one brightened square "hot spot".

    Args:
        name: Channel label. It is not used in the computation; it only makes
            call sites read as one call per product.
        hot_spot: ``(row, col)`` centre of the hot spot, given as fractions of
            the field's height and width.
        shape: Optional ``(height, width)`` of the field. When omitted, the
            notebook-level ``(H, W)`` is used.

    Returns:
        A float32 array of the requested shape: Gaussian-smoothed uniform
        noise with 0.8 added over a square of up to 6x6 cells around the
        hot-spot centre.
    """
    rows, cols = (H, W) if shape is None else shape
    r = np.random.rand(rows, cols).astype(np.float32)
    r = gaussian_filter(r, sigma=3)
    hi, wi = int(rows * hot_spot[0]), int(cols * hot_spot[1])
    # Clamp the slice starts at 0: a negative start would wrap around to the
    # far edge and select nothing, dropping a hot spot placed near the top or
    # left border. Stops past the end are already truncated by slicing.
    r[max(hi - 3, 0):hi + 3, max(wi - 3, 0):wi + 3] += 0.8
    return r

# One synthetic field per product. The call order is kept as-is because every
# call draws from (and advances) the global NumPy RNG.
no2 = make_channel("NO2", hot_spot=(0.3, 0.5))
co = make_channel("CO", hot_spot=(0.6, 0.4))
aai = make_channel("AEROSOL_INDEX", hot_spot=(0.5, 0.5))

# Show the three raw fields next to each other, each with its own colorbar.
raw_fields = [no2, co, aai]
fig, axes = plt.subplots(1, 3, figsize=(12, 4))
for panel, field, label in zip(axes, raw_fields, products):
    im = panel.imshow(field, cmap="YlOrRd", aspect="equal")
    panel.set_title(label)
    fig.colorbar(im, ax=panel)
fig.suptitle("Synthetic satellite-like channels (raw)")
fig.tight_layout()
plt.show()

## 3. Align to 64×64 grid (raster_to_grid)

In [ ]:
from src.pipeline.grid_align import raster_to_grid

# Resample every coarse channel onto the configured grid with bicubic
# interpolation, then stack the results channel-first.
target = (grid_h, grid_w)
no2_64, co_64, aai_64 = (
    raster_to_grid(coarse, target_shape=target, method="bicubic")
    for coarse in (no2, co, aai)
)

stacked = np.stack((no2_64, co_64, aai_64), axis=0)
print("Aligned stack shape:", stacked.shape)

# Plot the aligned channels side by side, each with its own colorbar.
aligned_fields = [no2_64, co_64, aai_64]
fig, axes = plt.subplots(1, 3, figsize=(12, 4))
for panel, field, label in zip(axes, aligned_fields, products):
    im = panel.imshow(field, cmap="viridis", aspect="equal")
    panel.set_title(f"{label} (64×64)")
    fig.colorbar(im, ax=panel)
fig.suptitle("Aligned to NCR grid (bicubic)")
fig.tight_layout()
plt.show()

## 4. Normalize each channel (to_tensor)

In [ ]:
from src.pipeline.to_tensor import normalize_channel

# Min-max normalize each aligned channel under its product name.
norm_no2, norm_co, norm_aai = (
    normalize_channel(grid, label, method="minmax")
    for grid, label in ((no2_64, "NO2"), (co_64, "CO"), (aai_64, "AEROSOL_INDEX"))
)

# Top row: aligned input. Bottom row: normalized output, drawn on a fixed
# 0..1 colour scale with a colorbar.
comparisons = [
    (no2_64, norm_no2, "NO2"),
    (co_64, norm_co, "CO"),
    (aai_64, norm_aai, "AEROSOL_INDEX"),
]
fig, axes = plt.subplots(2, 3, figsize=(12, 8))
for (top, bottom), (before, after, label) in zip(axes.T, comparisons):
    top.imshow(before, cmap="plasma")
    top.set_title(f"{label} raw")
    im = bottom.imshow(after, cmap="plasma", vmin=0, vmax=1)
    bottom.set_title(f"{label} normalized")
    fig.colorbar(im, ax=bottom)
fig.suptitle("Before vs after normalization (minmax)")
fig.tight_layout()
plt.show()

## 5. Stack days and save/load tensor

In [ ]:
from src.pipeline.to_tensor import stack_days_to_tensor, save_tensor, load_tensor

# Build a stand-in time series: five days, each a fresh channel-first stack of
# the same three normalized channels.
day_arrays = []
for _ in range(5):
    day_arrays.append(np.stack([norm_no2, norm_co, norm_aai], axis=0))
tensor = stack_days_to_tensor(day_arrays, channels_order=products)
print("Tensor shape (T, C, H, W):", tensor.shape)

# Write the demo tensor under <ROOT>/data/satellite, creating the folder if needed.
out_dir = ROOT.joinpath("data", "satellite")
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir.joinpath("notebook_demo.pt")
save_tensor(tensor, out_path)
print("Saved to", out_path)

# Read it back; if the loaded object exposes .numpy(), convert it to an array.
loaded = load_tensor(out_path)
loaded = loaded.numpy() if hasattr(loaded, "numpy") else loaded
print("Loaded shape:", loaded.shape)

## 6. Pipeline summary visualization

In [ ]:
import matplotlib.pyplot as plt

# Convert once to a NumPy array so the slicing below does not depend on the
# container type returned by stack_days_to_tensor (an ndarray passes through
# unchanged).
data = np.asarray(tensor)
T, C, H, W = data.shape

# Channel labels taken from the config, with a generic fallback if the tensor
# has more channels than `products` has names.
labels = [products[c] if c < len(products) else f"channel {c}" for c in range(C)]

# Figure 1: the channels of day 0 laid side by side, one unit of x per channel.
# The previous version drew an RGB composite and switched the axis off, so the
# per-channel tick labels and the y label matched nothing and were never
# displayed.
fig, ax = plt.subplots(1, 1, figsize=(10, 6))
strip = np.concatenate(list(data[0]), axis=1)  # (H, C * W)
ax.imshow(strip, aspect="auto", extent=[0, C, 0, 1])
ax.set_xticks([c + 0.5 for c in range(C)])  # centre of each channel panel
ax.set_xticklabels(labels)
ax.set_yticks([])  # the y extent is arbitrary, so ticks would be meaningless
ax.set_ylabel("Day 0")
ax.set_title("First day: 3 channels (NO2, CO, Aerosol Index) — (T, 3, 64, 64) tensor")
plt.tight_layout()
plt.show()

# Figure 2: up to two days (rows) by up to three channels (columns).
fig2, axes = plt.subplots(2, 3, figsize=(10, 6))
for t in range(2):
    for c in range(3):
        if t < T and c < C:
            axes[t, c].imshow(data[t, c], cmap="inferno")
            axes[t, c].set_title(f"Day {t}, {labels[c]}")
        else:
            # Hide panels that have no data instead of showing empty axes.
            axes[t, c].axis("off")
plt.suptitle("Multi-day pipeline output (sample)")
plt.tight_layout()
plt.show()