In [None]:
import polars as pl
import xarray as xr
import plotly.express as px
import numpy as np

In [None]:
# Load the dense (time, forecast_step, park, feature) weather cube from disk.
# NOTE(review): data/weather_forecast.zarr is written by the
# `polars_to_xarray_dataarray(bid_zone_weather, "data/weather_forecast.zarr")` cell further
# down; on a fresh kernel (Restart & Run All) this cell fails unless the store already exists.
# TODO: move this cell after the one that builds the store, or guard it with an existence check.
da = xr.open_dataarray("data/weather_forecast.zarr")
print(da)

In [None]:
# PyTorch + xarray/zarr training example for “shared-per-park → sum” model
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import xarray as xr


# -------- Dataset --------
class TimeAsBatchDataset(Dataset):
    """
    Map-style dataset yielding one ``(X_t, y_t)`` pair per time ``t``.

    X_t: (parks, forecast_step, feature) float32 tensor
    y_t: scalar float32 tensor

    Features are optionally standardized with a mean/std taken over (time, park), so
    the statistics have shape (forecast_step, feature). When computed here, the
    statistics use only this dataset's own ``times``. A training split therefore never
    sees validation/test periods; pass ``norm_stats`` to reuse training stats elsewhere.

    Parameters
    ----------
    x_da : xr.DataArray
        Features containing the dims ("time", "forecast_step", "park", "feature").
    y_total : xr.DataArray or pandas.Series
        Target indexed by time: an xarray object with a ``time`` coordinate, or a
        pandas Series whose index holds the times.
    times : array-like, optional
        Times served by this dataset. Defaults to the times present in both
        ``x_da`` and ``y_total``.
    normalize : bool
        Whether ``__getitem__`` standardizes features.
    norm_stats : tuple(ndarray, ndarray), optional
        Precomputed ``(mean, std)``, each of shape (forecast_step, feature).

    Raises
    ------
    AssertionError
        If ``x_da`` is missing one of the expected dims.
    ValueError
        If ``y_total`` is neither a time-indexed xarray object nor a pandas Series.
    """

    def __init__(
        self, x_da: xr.DataArray, y_total, times=None, normalize=True, norm_stats=None
    ):
        expected_dims = ("time", "forecast_step", "park", "feature")
        assert all(d in x_da.dims for d in expected_dims), (
            f"Expected dims {expected_dims}, got {x_da.dims}"
        )
        self.x_da = x_da

        # Decide how targets are looked up. xarray objects expose `.coords`.
        # pandas Series expose `.index`/`.loc` but have neither `.coords` nor `.indexes`,
        # so they need their own branch.
        if hasattr(y_total, "coords") and "time" in y_total.coords:
            self.y_is_xr = True
            y_times = np.asarray(y_total["time"].values)
        elif hasattr(y_total, "index") and hasattr(y_total, "loc"):
            self.y_is_xr = False
            y_times = np.asarray(y_total.index.values)
        else:
            raise ValueError(
                "y_total must be an xarray DataArray or pandas Series indexed by time."
            )

        if times is None:
            times = np.intersect1d(np.asarray(x_da["time"].values), y_times)
        self.times = np.asarray(times)

        # Normalization stats: mean/std over (time, park) -> shape (forecast_step, feature).
        self.normalize = normalize
        fs_dim, feat_dim = x_da.sizes["forecast_step"], x_da.sizes["feature"]
        if norm_stats is not None:
            mean, std = norm_stats
            # Force float32 so normalized inputs keep the model's dtype.
            self.mean_fs_feat = np.asarray(mean, dtype=np.float32)
            self.std_fs_feat = np.asarray(std, dtype=np.float32)
        elif normalize:
            # Fit only on this dataset's own times to avoid leaking held-out periods.
            # (This may trigger a read; consider precomputing and passing norm_stats.)
            x_fit = x_da.sel(time=self.times)
            mu = x_fit.mean(dim=("time", "park"), skipna=True)
            sd = x_fit.std(dim=("time", "park"), skipna=True)
            self.mean_fs_feat = mu.transpose("forecast_step", "feature").values.astype(
                np.float32
            )
            # Small epsilon guards against division by a zero std.
            self.std_fs_feat = (
                sd.transpose("forecast_step", "feature").values.astype(np.float32)
                + 1e-6
            )
        else:
            self.mean_fs_feat = np.zeros((fs_dim, feat_dim), dtype=np.float32)
            self.std_fs_feat = np.ones((fs_dim, feat_dim), dtype=np.float32)

        # Cache sizes.
        self.n_parks = x_da.sizes["park"]
        self.n_steps = x_da.sizes["forecast_step"]
        self.n_feat = x_da.sizes["feature"]
        self.y_total = y_total

    def __len__(self):
        return len(self.times)

    def __getitem__(self, idx):
        t = self.times[idx]
        # Select one time and reorder to (park, forecast_step, feature).
        xt = (
            self.x_da.sel(time=t)
            .transpose("park", "forecast_step", "feature")
            .values.astype(np.float32)
        )  # (P, S, F)
        if self.normalize:
            # Stats are (S, F); broadcast them over the park axis.
            xt = (xt - self.mean_fs_feat[None, :, :]) / self.std_fs_feat[None, :, :]

        # Missing weather values become 0 (i.e. the mean after normalization).
        if np.isnan(xt).any():
            xt = np.nan_to_num(xt, nan=0.0)

        if self.y_is_xr:
            yt = float(self.y_total.sel(time=t).values)
        else:
            yt = float(self.y_total.loc[t])
        return torch.from_numpy(xt), torch.tensor(yt, dtype=torch.float32)


# -------- Model: shared per-park φ, sum aggregation --------
class PerParkMLP(nn.Module):
    """
    Shared MLP mapping one park's (forecast_step, feature) block to a scalar.

    The same weights are applied to every park. An input of shape (B, P, S, F) is
    flattened to (B*P, S*F), passed through Linear -> ReLU -> Linear -> ReLU -> Linear,
    and reshaped to (B, P).

    Parameters
    ----------
    forecast_steps, n_features : int
        S and F; the first Linear layer expects S*F inputs.
    hidden, hidden2 : int
        Widths of the two hidden layers.
    nonneg : bool
        If True, apply softplus so each per-park output is non-negative.
    """

    def __init__(
        self, forecast_steps: int, n_features: int, hidden=256, hidden2=64, nonneg=True
    ):
        super().__init__()
        in_dim = forecast_steps * n_features
        self.mlp = nn.Sequential(
            nn.Linear(in_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden2),
            nn.ReLU(),
            nn.Linear(hidden2, 1),
        )
        self.nonneg = nonneg

    def forward(self, x):  # x: (B, P, S, F)
        # Deliberately not unpacked into a local named `F`. That would shadow the
        # `torch.nn.functional as F` alias and break the softplus call below.
        batch, parks, steps, feats = x.shape
        z = self.mlp(x.reshape(batch * parks, steps * feats))  # (B*P, 1)
        if self.nonneg:
            z = F.softplus(z)  # enforce non-negative per-park production
        return z.view(batch, parks)  # (B, P)


class SumOverParksModel(nn.Module):
    """
    Sum-aggregation model: one shared per-park network (``phi``) scores every park,
    and the prediction is the sum of those scores.

    ``forward(x)`` takes x of shape (B, P, S, F) and returns a tuple
    ``(total, per_park)``:
      - ``total``: (B,) sum over parks
      - ``per_park``: (B, P) individual park scores, useful for diagnostics
    """

    def __init__(self, forecast_steps: int, n_features: int):
        super().__init__()
        # The attribute name `phi` is part of the state_dict keys; keep it stable.
        self.phi = PerParkMLP(forecast_steps, n_features)

    def forward(self, x):
        park_scores = self.phi(x)
        return park_scores.sum(dim=1), park_scores


# -------- Example training loop --------
def _evaluate_total_error(model, loader, device):
    """
    Return ``(mse, mae)`` of the model's total prediction over every sample in ``loader``.

    Errors are summed per sample and divided by the sample count, so a short final
    batch is weighted correctly. Returns ``(nan, nan)`` for an empty loader.
    """
    model.eval()
    sq_sum, abs_sum, count = 0.0, 0.0, 0
    with torch.no_grad():
        for xb, yb in loader:
            xb = xb.to(device, non_blocking=True)
            yb = yb.to(device, non_blocking=True)
            yhat, _ = model(xb)
            err = yhat - yb
            sq_sum += err.pow(2).sum().item()
            abs_sum += err.abs().sum().item()
            count += yb.numel()
    if count == 0:
        return float("nan"), float("nan")
    return sq_sum / count, abs_sum / count


def train_loop(
    x_da: xr.DataArray,
    y_total,
    train_times,
    val_times=None,
    batch_size=64,
    epochs=10,
    lr=1e-3,
    weight_decay=1e-4,
    num_workers=0,
    device=None,
):
    """
    Train a ``SumOverParksModel`` to predict ``y_total`` from ``x_da``.

    Features are normalized with statistics fitted on ``train_times`` only; the
    validation set (if any) reuses those statistics. Optimizer: AdamW with gradient
    clipping at norm 1.0; loss: MSE on the summed prediction. One line is printed per
    epoch with the training MSE and, when ``val_times`` is given, validation MSE/MAE.
    All metrics are averaged per sample, not per batch.

    Parameters
    ----------
    x_da : xr.DataArray
        Features with dims ("time", "forecast_step", "park", "feature").
    y_total : xr.DataArray or pandas.Series
        Target indexed by time.
    train_times, val_times : array-like
        Times for the training / optional validation split.
    batch_size, epochs, lr, weight_decay, num_workers :
        Usual training hyper-parameters.
    device : str or torch.device, optional
        Defaults to "cuda" when available, else "cpu".

    Returns
    -------
    SumOverParksModel
        The trained model, on ``device``.
    """
    device = torch.device(device or ("cuda" if torch.cuda.is_available() else "cpu"))
    # Pinned host memory only speeds up host->GPU copies; on CPU it is useless.
    pin_memory = device.type == "cuda"

    train_ds = TimeAsBatchDataset(x_da, y_total, times=train_times, normalize=True)
    val_ds = (
        TimeAsBatchDataset(
            x_da,
            y_total,
            times=val_times,
            normalize=True,
            norm_stats=(train_ds.mean_fs_feat, train_ds.std_fs_feat),
        )
        if val_times is not None
        else None
    )

    train_loader = DataLoader(
        train_ds,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    val_loader = (
        DataLoader(
            val_ds,
            batch_size=batch_size,
            shuffle=False,
            num_workers=num_workers,
            pin_memory=pin_memory,
        )
        if val_ds is not None
        else None
    )

    model = SumOverParksModel(
        forecast_steps=x_da.sizes["forecast_step"], n_features=x_da.sizes["feature"]
    ).to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    loss_fn = nn.MSELoss()

    for epoch in range(1, epochs + 1):
        # --- train ---
        model.train()
        sq_sum, count = 0.0, 0
        for xb, yb in train_loader:
            xb = xb.to(device, non_blocking=True)  # (B, P, S, F)
            yb = yb.to(device, non_blocking=True)  # (B,)
            opt.zero_grad(set_to_none=True)
            yhat, _ = model(xb)
            loss = loss_fn(yhat, yb)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            opt.step()
            # `loss` is a batch mean; re-weight by batch size for an exact epoch mean.
            sq_sum += loss.item() * yb.numel()
            count += yb.numel()

        train_mse = sq_sum / count if count else float("nan")
        msg = f"[{epoch:03d}] train MSE: {train_mse:.4f}"

        # --- validate ---
        if val_loader is not None:
            val_mse, val_mae = _evaluate_total_error(model, val_loader, device)
            msg += f" | val MSE: {val_mse:.4f} | val MAE: {val_mae:.4f}"
        print(msg)

    return model


# ---------- Usage sketch ----------
# x_da: your weather_features DataArray with dims (time, forecast_step, park, feature)
# y_total: xarray DataArray (time,) with total production per bid area at each time

# x_da = xr.open_zarr("path/to.zarr")["weather_features"]  # if not already loaded
# y_total = xr.open_dataset("targets.zarr")["total_power"] # or any aligned (time,) series

# times = np.asarray(x_da["time"].values)
# split_time = int(0.8 * len(times))
# train_times, val_times = times[:split_time], times[split_time:]

# model = train_loop(x_da, y_total, train_times, val_times, epochs=20, batch_size=128, lr=3e-4)


In [None]:
# Train the sum-over-parks model on a chronological 80/20 split of the feature cube `da`.
# `y_total` must be defined beforehand: an xarray DataArray (or pandas Series) of total
# production indexed by time. TODO: build it from `windpower` for the chosen bid zone.
if "y_total" not in globals():
    raise NameError(
        "Define `y_total` (total production indexed by time) before running this cell."
    )
all_times = np.asarray(da["time"].values)
split_idx = int(0.8 * len(all_times))
train_times, val_times = all_times[:split_idx], all_times[split_idx:]
sum_over_parks_model = train_loop(da, y_total, train_times, val_times)

In [None]:
# Lazy scans: nothing is read from disk until a downstream `.collect()`.
weather_forecast = pl.scan_parquet("data/met_forecast.parquet")
weather_nowcast = pl.scan_parquet("data/met_nowcast.parquet")

# The time column is stored as `__index_level_0__` (presumably a serialized pandas index).
windpower = (
    pl.scan_parquet("data/wind_power_per_bidzone.parquet")
    .rename({"__index_level_0__": "time"})
)

# Per substation, keep only rows whose eic_code equals that substation's first eic_code.
windparks = (
    pl.scan_csv("data/windparks_bidzone.csv", try_parse_dates=True)
    .filter(pl.col("eic_code") == pl.col("eic_code").first().over("substation_name"))
)

In [None]:
# Maximum `prod_start_new` over the filtered parks (presumably the latest production start date).
windparks.select(pl.col("prod_start_new").max()).collect()

In [None]:
bid_zone = "ELSPOT NO3"
ensemble_member = 0

# Weather columns for the selected ensemble member, e.g. "ws10m_00" for member 0.
member_suffix = f"_{ensemble_member:02d}"
member_columns = [
    var + member_suffix for var in ("ws10m", "t2m", "rh2m", "mslp", "g10m")
]

# Attach park metadata to each forecast row, keep the chosen bid zone, and keep only
# forecasts issued after the park's `prod_start_new`.
bid_zone_weather = (
    weather_forecast
    .join(windparks, left_on="sid", right_on="substation_name")
    .filter(
        pl.col("bidding_area") == bid_zone,
        pl.col("time_ref") > pl.col("prod_start_new"),
    )
    .select("sid", "time_ref", "time", "lt", "operating_power_max", *member_columns)
)
bid_zone_weather.tail(10).collect()

In [None]:
import datetime

# Peek at a single (time_ref, time) group. With maintain_order=False the group that
# comes first is not guaranteed to be the same from run to run.
groups = bid_zone_weather.collect(engine="streaming").group_by(
    "time_ref", "time", maintain_order=False
)
first_group = next(iter(groups), None)
if first_group is not None:
    name, data = first_group
    print(name)
    print(data)

In [None]:
import polars as pl
import xarray as xr
import numpy as np


def _label_positions(labels, column) -> np.ndarray:
    """Map every value of ``column`` to its 0-based position in the sorted ``labels``."""
    lookup = {value: i for i, value in enumerate(labels.to_list())}
    return np.fromiter(
        (lookup[value] for value in column.to_list()), dtype=np.int64, count=len(column)
    )


def polars_to_xarray_dataarray(
    X: pl.LazyFrame, zarr_path: str, features=None
) -> xr.DataArray:
    """
    Pivot long-format forecast rows into a dense (time, forecast_step, park, feature)
    cube, write it to ``zarr_path`` and return it.

    Every row is placed by *label*, not by its position in a group:
    ``time`` -> sorted unique valid times, ``lt`` -> sorted unique lead times,
    ``sid`` -> sorted unique station ids. Combinations without a row stay NaN.
    Rows with a null ``time``/``lt``/``sid`` are dropped.

    Parameters
    ----------
    X : pl.LazyFrame or pl.DataFrame
        Must contain "time", "lt", "sid" and every column in ``features``.
    zarr_path : str
        Destination store; overwritten (``mode="w"``).
    features : list[str], optional
        Feature columns, in output order. Defaults to the lead time, the park's
        operating_power_max and the ensemble-member-00 weather columns.

    Returns
    -------
    xr.DataArray
        float32 array named "weather_features". The ``park`` dimension is labelled
        0..n-1, and a ``sid`` coordinate along ``park`` records each station id.
    """
    if features is None:
        features = [
            "lt",
            "operating_power_max",
            "ws10m_00",
            "t2m_00",
            "rh2m_00",
            "mslp_00",
            "g10m_00",
        ]
    features = list(features)
    key_cols = ["time", "lt", "sid"]

    # "lt" is both a key and a default feature, so de-duplicate the selection.
    needed = list(dict.fromkeys(key_cols + features))
    # `.lazy()` is a no-op on a LazyFrame and also accepts an eager DataFrame.
    # Collecting once avoids re-scanning the source for every query below.
    df = X.lazy().select(needed).drop_nulls(key_cols).collect()

    times = df["time"].unique().sort()
    forecast_steps = df["lt"].unique().sort()
    sid_labels = df["sid"].unique().sort()

    data = np.full(
        (len(times), len(forecast_steps), len(sid_labels), len(features)),
        np.nan,
        dtype=np.float32,
    )
    t_idx = _label_positions(times, df["time"])
    s_idx = _label_positions(forecast_steps, df["lt"])
    p_idx = _label_positions(sid_labels, df["sid"])
    # Row k lands at data[t_idx[k], s_idx[k], p_idx[k], :].
    data[t_idx, s_idx, p_idx] = df.select(features).to_numpy().astype(np.float32)

    da = xr.DataArray(
        data,
        dims=["time", "forecast_step", "park", "feature"],
        coords={
            "time": times.to_numpy(),
            "forecast_step": forecast_steps.to_numpy(),
            "park": np.arange(len(sid_labels)),
            "sid": ("park", np.asarray(sid_labels.to_list())),
            "feature": features,
        },
        name="weather_features",
    )

    da.to_dataset().to_zarr(zarr_path, mode="w")
    print(f"Saved DataArray to {zarr_path}")
    return da

In [None]:
# Build the dense feature cube from the lazy bid-zone query and write it to
# data/weather_forecast.zarr (the store the first data cell opens).
# NOTE(review): `X` is later rebound to a polars DataFrame (`bid_zone_weather.collect()`),
# so what `X` holds depends on which cells ran last.
X = polars_to_xarray_dataarray(bid_zone_weather, "data/weather_forecast.zarr")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd


class SharedMLP(nn.Module):
    """
    Row-wise scorer: maps ``input_dim`` features to one scalar.

    ``forward(x)`` maps (..., input_dim) to (...), dropping the trailing size-1 axis.
    """

    def __init__(self, input_dim):
        super().__init__()
        hidden_width = 64
        layers = [
            nn.Linear(input_dim, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, 1),
        ]
        # Stored as `network` so parameter names ("network.0.weight", ...) stay stable.
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        scores = self.network(x)  # (..., 1)
        return scores.squeeze(-1)


class SumModel(nn.Module):
    """
    Shared per-park scorer followed by a sum over parks.

    ``forward(x)`` with x of shape (num_sids, input_dim) returns a 0-dim tensor: the
    sum of the per-park scores. A leading batch axis is also supported:
    (B, num_sids, input_dim) -> (B,).
    """

    def __init__(self, input_dim):
        super().__init__()
        self.shared_mlp = SharedMLP(input_dim)

    def forward(self, x):
        per_park = self.shared_mlp(x)  # (..., num_sids)
        # Sum over the park axis only, so batched inputs keep their batch axis.
        return per_park.sum(dim=-1)

In [None]:
class PowerDataset(Dataset):
    """
    Dense dataset built from long-format polars rows.

    Item ``k`` is ``(inputs[k], targets[k])`` for the k-th (time_ref, lt) key in sorted
    order. ``inputs[k]`` has shape (num_sids, num_features) and is zero where a park
    has no row for that key.

    Parameters
    ----------
    X_df : pl.DataFrame
        Rows with "time_ref", "lt", "sid" and the feature columns.
    y_df : pl.DataFrame
        Must contain a "Power" column with exactly one row per (time_ref, lt) key.
        NOTE(review): no alignment by key is performed. The rows are assumed to already
        be in sorted (time_ref, lt) order; confirm against the caller.

    Raises
    ------
    ValueError
        If the number of targets differs from the number of (time_ref, lt) keys.
    """

    def __init__(self, X_df: pl.DataFrame, y_df: pl.DataFrame):
        self.feature_cols = [
            "lt",
            "operating_power_max",
            "ws10m_00",
            "t2m_00",
            "rh2m_00",
            "mslp_00",
            "g10m_00",
        ]
        self.sids = sorted(X_df["sid"].unique().to_list())
        self.sid_to_idx = {sid: i for i, sid in enumerate(self.sids)}
        self.num_sids = len(self.sids)
        self.num_features = len(self.feature_cols)

        # Sorted (time_ref, lt) keys and their item index.
        self.keys = (
            X_df.select(["time_ref", "lt"]).unique().sort(["time_ref", "lt"]).rows()
        )
        self.key_to_idx = {key: idx for idx, key in enumerate(self.keys)}
        self.N = len(self.keys)

        # Fill a NumPy buffer, which has cheap element writes, then wrap it once as a tensor.
        buffer = np.zeros((self.N, self.num_sids, self.num_features), dtype=np.float32)
        for row in X_df.iter_rows(named=True):
            i = self.key_to_idx.get((row["time_ref"], row["lt"]))
            if i is None:
                continue
            j = self.sid_to_idx[row["sid"]]
            buffer[i, j] = [float(row[col]) for col in self.feature_cols]
        self.inputs = torch.from_numpy(buffer)

        targets = y_df["Power"].to_numpy()
        if len(targets) != self.N:
            raise ValueError(
                f"y_df has {len(targets)} 'Power' rows but X_df has {self.N} "
                "(time_ref, lt) keys; targets must be one per key in sorted order."
            )
        self.targets = torch.tensor(targets, dtype=torch.float32)

    def __len__(self):
        return self.N

    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]


In [None]:
def train_model(model, dataloader, epochs=20, lr=1e-3):
    """
    Fit ``model`` with Adam and MSE loss, one forward pass per sample.

    Each batch ``(X_batch, y_batch)`` is split along its first axis. ``model`` is called
    on every sample and must return a scalar; the scalars are stacked into a (B,)
    prediction compared against ``y_batch``. After every epoch the mean per-batch loss
    is printed (``nan`` if the loader yields no batches).

    Parameters
    ----------
    model : nn.Module
    dataloader : iterable of (X_batch, y_batch)
        A DataLoader or any re-iterable of batches.
    epochs : int
    lr : float

    Returns
    -------
    nn.Module
        The same ``model`` object, trained in place.
    """
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    model.train()
    for epoch in range(epochs):
        total_loss = 0.0
        n_batches = 0
        for X_batch, y_batch in dataloader:
            optimizer.zero_grad()
            # The model scores one sample at a time, so loop over the batch axis.
            preds = torch.stack([model(x) for x in X_batch])
            loss = loss_fn(preds, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            n_batches += 1

        mean_loss = total_loss / n_batches if n_batches else float("nan")
        print(f"Epoch {epoch + 1}, Loss: {mean_loss:.4f}")

    return model


In [None]:
# Assuming your dataframes are X and y
# NOTE(review): this cell does not run as written.
#   - `y` is never defined in the visible notebook.
#   - In top-to-bottom order, `X` here is the xarray DataArray returned by
#     `polars_to_xarray_dataarray`. `PowerDataset` expects polars DataFrames
#     (it calls `X_df["sid"].unique()`, `X_df.select(...)`, `X_df.iter_rows(...)`).
# TODO: pass the long-format polars frame (e.g. `bid_zone_weather.collect()`) and a
# target frame with a "Power" column, one row per sorted (time_ref, lt) key.
dataset = PowerDataset(X, y)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

model = SumModel(input_dim=7)  # 7 = number of feature columns in PowerDataset.feature_cols
train_model(model, dataloader)


In [None]:
# Display `X`. Which object this is depends on the last cell that assigned it
# (the xarray cube, or the collected polars frame further down).
X

In [None]:
X = bid_zone_weather.collect()
# polars_to_xarray_dataarray runs LazyFrame queries (`.collect()`) on its input, so pass
# a lazy view of the collected frame rather than the eager DataFrame itself.
da = polars_to_xarray_dataarray(X.lazy(), "weather_forecast_array.zarr")

In [None]:
# Reopen the store written by the previous cell and select the cube by its variable
# name ("weather_features", the DataArray name set in polars_to_xarray_dataarray).
ds = xr.open_zarr("weather_forecast_array.zarr")
da = ds["weather_features"]