In [53]:
import random
import duckdb
import numpy as np
import pandas as pd
from dataclasses import dataclass
from datetime import datetime
from pathlib import PosixPath
from typing import Tuple

import modal
import torch
import torch_frame as tf
from torch_frame.data import Dataset
from torch_frame.utils import infer_df_stype
from torch_frame.data.loader import DataLoader
from torch_frame.nn.encoder import EmbeddingEncoder, LinearEncoder
from torch_frame.nn.models import FTTransformer
from torchmetrics import Accuracy, MeanAbsoluteError

# Modal app
app        = modal.App("parcel-hp-sweep")
MINUTES     = 60
HOURS       = 60 * MINUTES
GPU_TYPE    = "T4"
vol         = modal.Volume.from_name("parcel")
VPATH       = PosixPath("../data/")
MODEL_DIR   = VPATH / "models"

# container images
base_image  = (
    modal.Image.debian_slim(python_version="3.12")
    .uv_pip_install(
        "duckdb", "pyarrow", "pandas", "numpy"
    )
)
torch_image = (
    base_image
    .uv_pip_install(
        "torch",
        "torchvision",
        "pytorch-frame",
        "torchmetrics",
        "wandb",
        "tensorboard"
    )
)

# helpers
def set_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

@dataclass
class HParams:
    # model
    channels: int = 32
    n_layers: int = 10
    num_heads: int = 8
    encoder_pad_size: int = 2
    attn_dropout: float = 0.3
    ffn_dropout: float = 0.3
    gamma: float = 0.95
    # optimisation
    lr: float = 1e-2
    # dataset
    sample_rows: int = 100
    batch_size: int = 2
    val_split: float = 0.2
    seed: int = 42

# data module
def build_dataloaders(
    parquet_path: str,
    task: str,
    h: HParams,
) -> Tuple[DataLoader, DataLoader, tf.data.Dataset.col_stats, dict]:
    """
    Returns train & val DataLoaders plus col_stats / col_names_dict needed by model.
    `task` ∈ {"regression", "binary"}
    """
    if task not in {"regression", "binary"}:
        raise ValueError(f"task must be 'regression' or 'binary', got {task!r}")

    if task == "binary":
        target_col = "is_successful_delivery"
    else:
        target_col = "total_hours_from_receiving_to_last_delivery"

    q = f"""SELECT * FROM '{VPATH / parquet_path}'"""
    if h.sample_rows:
        q += f""" USING SAMPLE reservoir({h.sample_rows} ROWS) REPEATABLE ({h.seed})"""

    df = duckdb.sql(q).df()

    # # --- Build a canonical target column: df['y'] ---
    # if task == "binary":
    #     if "is_successful_delivery" not in df.columns:
    #         raise KeyError("Expected column 'is_successful_delivery' for binary task.")
    #     # Coerce to integers first
    #     y = df["is_successful_delivery"]
    #     # If upstream is dirty (e.g., durations leaked in), binarize by a rule YOU control.
    #     # Example rule: treat any positive value as success (adjust to your business logic).
    #     if not set(pd.unique(y.dropna())).issubset({0, 1}):
    #         y = (pd.to_numeric(y, errors="coerce").fillna(0) > 0).astype("int8")
    #     else:
    #         y = y.astype("int8")
    #     df = df.assign(y=y)
    #     target_col = "y"

    #     # Drop duration column to prevent accidental target pickup:
    #     if "total_hours_from_receiving_to_last_delivery" in df.columns:
    #         df = df.drop(columns=["total_hours_from_receiving_to_last_delivery"])

    #     # Validate label invariants
    #     bad = ~df["y"].isin([0, 1])
    #     if bad.any():
    #         # Surface a tiny sample to logs if it ever happens
    #         samples = df.loc[bad, ["y"]].head(5).to_dict(orient="records")
    #         raise RuntimeError(f"Binary target contains non-binary values. Samples: {samples}")

    # else:  # regression
    #     if "total_hours_from_receiving_to_last_delivery" not in df.columns:
    #         raise KeyError("Expected column 'total_hours_from_receiving_to_last_delivery' for regression task.")
    #     df = df.assign(
    #         y=pd.to_numeric(df["total_hours_from_receiving_to_last_delivery"], errors="coerce").astype("float32")
    #     )
    #     target_col = "y"
    df = df.drop(columns=[
        "customer_id",
        "last_delivery_datetime_hour",
        "last_delivery_datetime_day",
        "last_delivery_datetime_month",
        "last_delivery_datetime_is_non_working_day",
        "parcel_id",
        "is_successful_delivery_at_first_time",
        "total_hours_from_receiving_to_last_failed_delivery",
        "total_hours_from_receiving_to_last_success_delivery"
    ])

    if task == "regression":
        df[target_col] = df[target_col].astype(float)
    else:
        df[target_col] = df[target_col].astype(int)

    col2stype = infer_df_stype(df)
    col2stype[target_col] = tf.numerical if task == "regression" else tf.categorical

    full_ds = Dataset(df, col_to_stype=col2stype, target_col=target_col)
    full_ds.materialize(path=VPATH / "full_stats.pt")
    num_rows = full_ds.num_rows
    n_train  = int(num_rows * (1 - h.val_split))
    g        = torch.Generator().manual_seed(h.seed)
    print(num_rows)
    perm     = torch.randperm(num_rows, generator=g)
    print(perm.shape)
    train_ds = full_ds.index_select(perm[:n_train])
    val_ds   = full_ds.index_select(perm[n_train:])

    train_ds.materialize(path=VPATH / "train_stats.pt")
    val_ds.materialize(path=VPATH / "val_stats.pt", col_stats=train_ds.col_stats)

    train_loader = DataLoader(train_ds, batch_size=h.batch_size, shuffle=True)
    val_loader   = DataLoader(val_ds, batch_size=h.batch_size)

    return train_loader, val_loader, train_ds.col_stats, train_ds.tensor_frame.col_names_dict

In [55]:
parquet_path = "../data/parcel_tracking_output_data.parquet"
h = HParams()
q = f"""SELECT * FROM '{VPATH / parquet_path}'"""
if h.sample_rows:
    q += f""" USING SAMPLE reservoir({h.sample_rows} ROWS) REPEATABLE ({h.seed})"""

df = duckdb.sql(q).df()

In [56]:
df = df.drop(columns=[
        "customer_id",
        "last_delivery_datetime_hour",
        "last_delivery_datetime_day",
        "last_delivery_datetime_month",
        "last_delivery_datetime_is_non_working_day",
        "parcel_id",
        "is_successful_delivery_at_first_time",
        "total_hours_from_receiving_to_last_failed_delivery",
        "total_hours_from_receiving_to_last_success_delivery"
    ])

In [57]:
target_col = "is_successful_delivery"
task = "binary"
col2stype = infer_df_stype(df)
col2stype[target_col] = tf.numerical if task == "regression" else tf.categorical

full_ds = Dataset(df, col_to_stype=col2stype, target_col=target_col)
full_ds.materialize(path=VPATH / "full_stats.pt")
num_rows = full_ds.num_rows
n_train  = int(num_rows * (1 - h.val_split))
g        = torch.Generator().manual_seed(h.seed)
print(num_rows)
perm     = torch.randperm(num_rows, generator=g)

100


In [77]:
len(df.columns)

18

In [None]:
full_ds.index_select

Dataset()

In [66]:
full_ds.index_select(42)

IndexError: index 42 is out of bounds for dimension 0 with size 10

In [64]:
perm[:n_train]

tensor([42, 96, 62, 98, 46, 95, 60, 24, 78, 16, 68, 70, 11, 13, 97, 52, 99, 19,
        71, 10, 89, 86, 18, 40,  5, 38,  9, 82, 83, 43, 32, 94, 67, 93, 75, 59,
        79,  1, 50, 73, 66, 45, 63, 58, 22,  3, 87,  4, 61, 51, 12, 74, 21, 20,
         6, 35, 44, 48, 37, 33, 15, 88, 31, 69, 27, 81, 85, 56,  7, 65, 47, 41,
        29, 80, 57, 84, 36, 17, 34, 72])

In [61]:
train_ds = full_ds.index_select(perm[:n_train])
val_ds   = full_ds.index_select(perm[n_train:])

IndexError: index 42 is out of bounds for dimension 0 with size 10

In [60]:
n_train

80

In [59]:
perm[:num_rows]

tensor([42, 96, 62, 98, 46, 95, 60, 24, 78, 16, 68, 70, 11, 13, 97, 52, 99, 19,
        71, 10, 89, 86, 18, 40,  5, 38,  9, 82, 83, 43, 32, 94, 67, 93, 75, 59,
        79,  1, 50, 73, 66, 45, 63, 58, 22,  3, 87,  4, 61, 51, 12, 74, 21, 20,
         6, 35, 44, 48, 37, 33, 15, 88, 31, 69, 27, 81, 85, 56,  7, 65, 47, 41,
        29, 80, 57, 84, 36, 17, 34, 72,  2, 91,  8, 53, 30, 90, 26, 23, 54, 76,
        14, 55,  0, 64, 77, 39, 25, 92, 49, 28])

In [54]:
train_loader, val_loader, col_stats, col_names_dict = build_dataloaders(parquet_path="parcel_tracking_output_data.parquet", task="binary", h=HParams())

100
torch.Size([100])


IndexError: index 42 is out of bounds for dimension 0 with size 10

In [None]:
torch.from_file("../data/train_stats.pt")

tensor([])

In [None]:
val_loader

<torch_frame.data.loader.DataLoader at 0x7f1f05fc0a10>

In [None]:
for batch in train_loader:
    print(batch.y.float())
    # break

tensor([1., 1.])
tensor([0., 1.])
tensor([1., 1.])
tensor([1., 1.])


In [None]:
import logging as L
import os
import random
import subprocess
import sys
import duckdb
import numpy as np
import pandas as pd
from dataclasses import dataclass
from datetime import datetime
from itertools import product
from pathlib import Path, PosixPath
from typing import Optional, Tuple, List

import modal
import torch
import torch.nn.functional as F
import torch_frame as tf
from torch_frame.data import Dataset
from torch_frame.utils import infer_df_stype
from torch_frame.data.loader import DataLoader
from torch_frame.nn.encoder import EmbeddingEncoder, LinearEncoder
from torch_frame.nn.models import FTTransformer
from torchmetrics import Accuracy, MeanAbsoluteError

# Modal app
app        = modal.App("parcel-hp-sweep")
MINUTES     = 60
HOURS       = 60 * MINUTES
GPU_TYPE    = "T4"
vol         = modal.Volume.from_name("parcel", create_if_missing=True)
VPATH       = PosixPath("/vol")
MODEL_DIR   = VPATH / "models"

# container images
base_image  = modal.Image.debian_slim(python_version="3.11").uv_pip_install(
    "duckdb", "pyarrow", "pandas", "numpy"
)
torch_image = (
    base_image
    .uv_pip_install(
        "torch",
        "torch_frame",
        "torchmetrics",
        "wandb",
    )
)

# helpers
def set_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

@dataclass
class HParams:
    # model
    channels: int        = 32
    n_layers: int        = 3
    # optimisation
    lr: float            = 1e-2
    # dataset
    sample_rows: int     = 10
    batch_size: int      = 5
    val_split: float     = 0.2
    seed: int            = 42

# data module
def build_dataloaders(
    parquet_path: str,
    target_col: str,
    task: str,
    h: HParams,
) -> Tuple[DataLoader, DataLoader, tf.data.Dataset.col_stats, dict]:
    """
    Returns train & val DataLoaders plus col_stats / col_names_dict needed by model.
    `task` ∈ {"regression", "binary"}
    """
    q = f"SELECT * FROM '{parquet_path}'"
    if h.sample_rows:
        q += f" USING SAMPLE reservoir({h.sample_rows} ROWS) REPEATABLE ({h.seed})"
    df = duckdb.sql(q).df()

    if task == "regression":
        df[target_col] = df[target_col].astype(float)
    else:
        df[target_col] = df[target_col].astype(int)

    # simple heuristic – drop obvious identifiers
    df = df.drop(columns=[c for c in df.columns if c.endswith("_id")], errors="ignore")

    col2stype = infer_df_stype(df)
    col2stype[target_col] = tf.numerical if task == "regression" else tf.categorical

    full_ds = Dataset(df, col_to_stype=col2stype, target_col=target_col)
    n_train = int(len(full_ds) * (1 - h.val_split))
    train_ds, val_ds = full_ds[:n_train], full_ds[n_train:]

    train_ds.materialize(path=f"{VPATH}/train_stats.pt")
    val_ds.materialize(path=f"{VPATH}/val_stats.pt", col_stats=train_ds.col_stats)

    train_loader = DataLoader(train_ds, batch_size=h.batch_size, shuffle=True)
    val_loader   = DataLoader(val_ds, batch_size=h.batch_size)

    return train_loader, val_loader, train_ds.col_stats, train_ds.tensor_frame.col_names_dict

# training loop
def run_epoch(
    model, loader, optim, criterion, device
) -> float:
    model.train()
    total = n = 0.0
    for batch in loader:
        batch = batch.to(device)
        pred  = model(batch).squeeze()
        loss  = criterion(pred, batch.y.float())
        optim.zero_grad()
        loss.backward()
        optim.step()
        total += loss.item() * len(batch.y)
        n += len(batch.y)
    return total / n

@torch.no_grad()
def evaluate(model, loader, metric, device) -> float:
    metric.reset()
    model.eval()
    for batch in loader:
        batch = batch.to(device)
        pred  = model(batch).squeeze()
        if isinstance(metric, Accuracy):
            pred = torch.sigmoid(pred)
        metric.update(pred, batch.y.float())
    return metric.compute().item()

# remote fn
@app.function(
    gpu=GPU_TYPE,
    image=torch_image,
    timeout=2 * HOURS,
    volumes={VPATH: vol},
    secrets=[modal.Secret.from_name("wandb-secret")]
)
def train_model(
    parquet_path: str,
    target_col: str,
    task: str,
    h: HParams,
    run_to_end: bool,
    max_epochs: int = 50,
):
    """
    Train one model with given hyper-parameters.
    """
    set_seed(h.seed)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # data
    train_loader, val_loader, col_stats, col_names_dict = build_dataloaders(
        parquet_path, target_col, task, h
    )

    # model
    enc = {
        tf.stype.categorical: EmbeddingEncoder(),
        tf.stype.numerical:   LinearEncoder(),
    }
    model = FTTransformer(
        channels=h.channels,
        num_layers=h.n_layers,
        out_channels=1,
        col_stats=col_stats,
        col_names_dict=col_names_dict,
        stype_encoder_dict=enc,
    )
    if torch.cuda.device_count() > 1:
        model = torch.nn.DataParallel(model)
    model.to(device)

    # optimisation
    optim = torch.optim.AdamW(model.parameters(), lr=h.lr)
    criterion = torch.nn.SmoothL1Loss() if task == "regression" else torch.nn.BCEWithLogitsLoss()
    metric    = MeanAbsoluteError().to(device) if task == "regression" else Accuracy(task="binary").to(device)

    # W&B
    import wandb
    wandb.init(
        project="parcel-tabular",
        config=dict(**h.__dict__, task=task),
        name=f"E{datetime.now().strftime('%m%d_%H%M%S')}_ch{h.channels}_L{h.n_layers}_lr{h.lr:g}",
    )
    best_val, patience, PATIENCE = (np.inf if task=="regression" else 0.0), 0, 5

    for epoch in range(1, max_epochs + 1):
        tr_loss = run_epoch(model, train_loader, optim, criterion, device)
        val_metric = evaluate(model, val_loader, metric, device)
        wandb.log({"epoch": epoch, "train_loss": tr_loss,
                   "val_metric": val_metric})

        improved = (val_metric < best_val) if task=="regression" else (val_metric > best_val)
        if improved:
            best_val, patience = val_metric, 0
            # save weights into the shared Volume so other jobs can see them
            MODEL_DIR.mkdir(parents=True, exist_ok=True)
            torch.save(model.state_dict(), MODEL_DIR / f"{wandb.run.name}.pt")
            vol.commit()
        else:
            patience += 1
            if patience >= PATIENCE and not run_to_end:
                break
    wandb.finish()
    return float(best_val)

# sweep main
@app.local_entrypoint()
def main(
    parquet_path: str = "../data/parcel_tracking_output_data.parquet",
    target_col: str   = "total_hours_from_receiving_to_last_success_delivery",
    task: str         = "regression",
):
    """
    Kick off an 8-run grid search, find the best, then resume it to completion.
    """
    # default hparams
    ref = HParams()
    ch_opts  = (16, ref.channels)
    lyr_opts = (2,  ref.n_layers)
    lr_opts  = (1e-3, ref.lr)

    hp_list: List[HParams] = [
        HParams(channels=channel, n_layers=layer, lr=lr,
                sample_rows=ref.sample_rows,
                batch_size=ref.batch_size,
                val_split=ref.val_split,
                seed=42)
        for channel, layer, lr in product(ch_opts, lyr_opts, lr_opts)
    ]

    print(f"Launching {len(hp_list)} hyper-param jobs on Modal …")
    # first round – early stop
    early_results = train_model.starmap(
        [
            (parquet_path, target_col, task, h, False)   # run_to_end=False
            for h in hp_list
        ],
        order_outputs=False,
    )

    best_h, best_val = None, np.inf if task=="regression" else -np.inf
    for res, h in zip(early_results, hp_list):
        v = float(res)
        better = (v < best_val) if task=="regression" else (v > best_val)
        print(f"{h} ⇒ {v:.4f}")
        if better:
            best_h, best_val = h, v

    print(f"\nBest so far: {best_h} ({best_val:.4f}) — continuing to full training …")
    # train_model.remote(parquet_path, target_col, task, best_h, True, max_epochs=100)

In [2]:
# tabular_experiments.py
# Re‑usable pipeline for ETA regression & POD classification
import argparse
import random
import duckdb
import numpy as np
import torch
# import torch.nn.functional as F
import torch_frame as tf
from torch_frame.data import Dataset
from torch_frame.utils import infer_df_stype
from torch_frame.data.loader import DataLoader
from torchmetrics import Accuracy, MeanAbsoluteError   # torchmetrics ≥1.4
from torch_frame.nn.encoder import EmbeddingEncoder, LinearEncoder
from torch_frame.nn.models import FTTransformer


In [3]:
# 0. 100 % reproducibility
def set_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# 1. Data module (handles leakage‑free materialisation)
class TabularDataModule:
    def __init__(self, file_path: str, target: str, task: str,
                 sample_rows: int = None, batch_size: int = 256,
                 val_split: float = 0.2, seed: int = 42):
        """
        task ∈ {"regression", "binary"}
        """
        self.file_path, self.target, self.task = file_path, target, task
        self.sample_rows, self.batch_size, self.val_split, self.seed = \
            sample_rows, batch_size, val_split, seed

    def load(self):
        q = f"SELECT * FROM '{self.file_path}'"
        if self.sample_rows:
            q += f" USING SAMPLE reservoir({self.sample_rows} ROWS) REPEATABLE ({self.seed})"
        df = duckdb.sql(q).df()

        # Cast target dtype & drop high‑cardinality IDs
        if self.task == "regression":
            df[self.target] = df[self.target].astype(float)
        else:
            df[self.target] = df[self.target].astype(int)
        df = df.drop(columns=["customer_id", "parcel_id"], errors="ignore")

        # Detect column semantics *before* split
        col_to_stype = infer_df_stype(df)
        col_to_stype[self.target] = tf.numerical if self.task == "regression" else tf.categorical

        # Create Dataset & materialise **train only** to avoid leakage
        ds = Dataset(df, col_to_stype=col_to_stype, target_col=self.target)
        n_train = int(len(ds) * (1 - self.val_split))
        train_ds, val_ds = ds[:n_train], ds[n_train:]
        train_ds.materialize(path="cache/train.pt")             # persisted stats
        val_ds.materialize(path="cache/val.pt", col_stats=train_ds.col_stats)

        # Mini‑batch loaders
        self.train_loader = DataLoader(train_ds, batch_size=self.batch_size, shuffle=True)
        self.val_loader   = DataLoader(val_ds,   batch_size=self.batch_size)

        self.col_stats, self.col_names_dict = train_ds.col_stats, train_ds.tensor_frame.col_names_dict


# 2. Model factory (drop your own backbones here)
def build_model(name: str, out_channels: int,
                col_stats, col_names_dict, stype_enc):
    if name == "ftt":
        return FTTransformer(channels=32, out_channels=out_channels,
                             num_layers=4,
                             col_stats=col_stats,
                             col_names_dict=col_names_dict,
                             stype_encoder_dict=stype_enc)
    elif name == "mlp":        # tiny baseline for sanity checks
        from torch_frame.nn.models import MLP
        return MLP(channels=64, out_channels=out_channels,
                   col_stats=col_stats,
                   col_names_dict=col_names_dict,
                   stype_encoder_dict=stype_enc)
    else:
        raise ValueError(f"Unknown model {name}")


# 3. Generic trainer
def train(model, loader, optimizer, criterion):
    model.train()
    loss_sum = 0
    n = 0
    for batch in loader:
        pred = model(batch).squeeze()
        y = batch.y.float()
        loss = criterion(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        loss_sum += loss.item() * len(y)
        n += len(y)
    return loss_sum / n

@torch.no_grad()
def evaluate(model, loader, metric):
    model.eval()
    metric.reset()
    for batch in loader:
        pred = model(batch).squeeze()
        y = batch.y.float()
        if isinstance(metric, Accuracy):
            pred = torch.sigmoid(pred)
        metric.update(pred, y)
    return metric.compute().item()


# 4. Experiment runner
def run(args):
    set_seed(args.seed)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Build data
    dm = TabularDataModule(args.file, args.target, args.task,
                           sample_rows=args.sample, batch_size=args.batch,
                           val_split=args.val_split, seed=args.seed)
    dm.load()

    # Shared stype encoders
    stype_enc = {tf.stype.categorical: EmbeddingEncoder(),
                 tf.stype.numerical:   LinearEncoder()}




    # Model & optimiser
    out_dim = 1    # both tasks output scalar
    model = build_model(args.model, out_dim,
                        dm.col_stats, dm.col_names_dict, stype_enc).to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=args.lr)

    # Loss & metric
    if args.task == "regression":
        criterion = torch.nn.SmoothL1Loss()
        metric = MeanAbsoluteError().to(device)
    else:
        criterion = torch.nn.BCEWithLogitsLoss()
        metric = Accuracy(task="binary").to(device)

    # Training loop with rudimentary early stopping
    best_val, patience = float("inf") if args.task == "regression" else 0.0, 0
    for epoch in range(1, args.epochs + 1):
        tr_loss = train(model, dm.train_loader, opt, criterion)
        val_metric = evaluate(model, dm.val_loader, metric)
        print(f"E{epoch:02d} | train-loss {tr_loss:.4f} | val‑{'MAE' if args.task == 'regression' else 'Acc'} {val_metric:.4f}")
        improve = (val_metric < best_val) if args.task == "regression" else (val_metric > best_val)
        if improve:
            best_val, patience = val_metric, 0
            torch.save(model.state_dict(), "best.pt")
        else:
            patience += 1
            if patience >= args.patience:
                break

In [4]:
run()

TypeError: run() missing 1 required positional argument: 'args'

In [None]:
# 5. CLI – define once; compare infinitely
if __name__ == "__main__":
    p = argparse.ArgumentParser()
    p.add_argument("--file",   type=str, required=True)
    p.add_argument("--target", type=str, required=True)
    p.add_argument("--task",   choices=["regression", "binary"], required=True)
    p.add_argument("--model",  choices=["ftt", "mlp"], default="ftt")
    p.add_argument("--sample", type=int, default=None)
    p.add_argument("--batch",  type=int, default=256)
    p.add_argument("--lr",     type=float, default=5e-3)
    p.add_argument("--epochs", type=int, default=50)
    p.add_argument("--val_split", type=float, default=0.2)
    p.add_argument("--patience",  type=int, default=5)
    p.add_argument("--seed",      type=int, default=42)
    # run(p.parse_args())

In [16]:
import duckdb
import torch
import torch.nn.functional as F
import torch_frame as tf
from torch_frame.data import Dataset
from torch_frame.utils import infer_df_stype
from torch_frame.data.loader import DataLoader
from torch_frame.nn.encoder import EmbeddingEncoder, LinearEncoder
from torch_frame.nn.models import FTTransformer   # any backbone works

# 1. Read & sample exactly the same way you did
file_path = "../data/parcel_tracking_output_data.parquet"
df = duckdb.sql(f"""
    SELECT * FROM '{file_path}'
    USING SAMPLE reservoir(50 ROWS)
    REPEATABLE (100)
""").df()

In [24]:
df_test = duckdb.sql("""
SELECT
    *,
    /* 0 ➜ NULL, then choose first non‑NULL */
    COALESCE(
        NULLIF(total_hours_from_receiving_to_last_success_delivery, 0),
        total_hours_from_receiving_to_last_failed_delivery
    ) AS total_hours_from_receiving_to_last_delivery
FROM read_parquet('../data/parcel_tracking_output_data.parquet')
USING SAMPLE reservoir(1000 ROWS)
REPEATABLE (100)
""").df()

In [25]:
df_test["is_successful_delivery"].unique()

array([1, 0], dtype=int32)

In [22]:
df_test.columns

Index(['parcel_category_id', 'customer_id', 'received_post_office_id',
       'delivery_post_office_id', 'received_post_office_address_latitude',
       'received_post_office_address_longtitude',
       'delivery_post_office_address_latitude',
       'delivery_post_office_address_longtitude', 'recipient_address_latitude',
       'recipient_address_longtitude',
       'distance_received_post_office_delivery_post_office',
       'distance_delivery_post_office_recipient_address',
       'distance_received_post_office_recipient_address',
       'received_datetime_hour', 'received_datetime_day',
       'received_datetime_month', 'received_datetime_is_non_working_day',
       'Rush_hour', 'last_delivery_datetime_hour',
       'last_delivery_datetime_day', 'last_delivery_datetime_month',
       'last_delivery_datetime_is_non_working_day', 'parcel_id',
       'is_successful_delivery', 'is_successful_delivery_at_first_time',
       'total_hours_from_receiving_to_last_failed_delivery',
       't

In [None]:
df_test

In [None]:
df_test.drop(columns=[
    "customer_id",
    "last_delivery_datetime_hour",
    "last_delivery_datetime_day",
    "last_delivery_datetime_month",
    "last_delivery_datetime_is_non_working_day",
    "parcel_id",
    "is_successful_delivery_at_first_time",
    "total_hours_from_receiving_to_last_failed_delivery",
    "total_hours_from_receiving_to_last_success_delivery"
])

Unnamed: 0,parcel_category_id,customer_id,received_post_office_id,delivery_post_office_id,received_post_office_address_latitude,received_post_office_address_longtitude,delivery_post_office_address_latitude,delivery_post_office_address_longtitude,recipient_address_latitude,recipient_address_longtitude,...,last_delivery_datetime_hour,last_delivery_datetime_day,last_delivery_datetime_month,last_delivery_datetime_is_non_working_day,parcel_id,is_successful_delivery,is_successful_delivery_at_first_time,total_hours_from_receiving_to_last_failed_delivery,total_hours_from_receiving_to_last_success_delivery,total_hours_from_receiving_to_last_delivery
0,4,2905,34,10,10.792941,106.704707,10.780602,106.697883,10.768759,106.680891,...,10,4,7,0,7773056,1,1,0.0,25.35,25.35
1,4,165,56,4,10.77529,106.671258,10.788467,106.688801,10.790357,106.754223,...,11,6,3,1,681986,0,0,19.116667,0.0,19.116667
2,4,3921,63,4,10.803788,106.683946,10.788467,106.688801,10.768619,106.739293,...,9,5,1,0,4925744,1,1,0.0,24.25,24.25
3,4,11807,247,166,10.786025,106.595628,10.776678,106.655749,10.8,106.729481,...,9,1,11,0,3143303,1,1,0.0,43.65,43.65
4,2,4928,93,94,10.889278,106.593113,10.889482,106.594265,10.860618,106.569443,...,10,1,9,0,8029237,1,1,0.0,42.75,42.75
5,4,1764,1,249,10.780126,106.696979,10.809917,106.586693,10.784004,106.607835,...,11,0,7,1,2708496,1,1,0.0,18.75,18.75
6,4,3531,54,45,10.850782,106.752343,10.871303,106.762203,10.879668,106.771231,...,16,4,12,0,5392107,1,1,0.0,49.783333,49.783333
7,4,1915,1,196,10.780126,106.696979,10.749983,106.656659,10.790241,106.709544,...,9,3,4,0,624614,0,0,142.966667,0.0,142.966667
8,4,11807,30,27,10.817453,106.769922,10.807538,106.78914,10.861588,106.746878,...,16,6,11,1,3285281,1,1,0.0,24.233333,24.233333
9,2,1864,1,198,10.780126,106.696979,10.749983,106.656659,10.762876,106.642345,...,9,5,9,0,9336478,1,1,0.0,19.6,19.6


In [19]:
df

Unnamed: 0,parcel_category_id,customer_id,received_post_office_id,delivery_post_office_id,received_post_office_address_latitude,received_post_office_address_longtitude,delivery_post_office_address_latitude,delivery_post_office_address_longtitude,recipient_address_latitude,recipient_address_longtitude,...,Rush_hour,last_delivery_datetime_hour,last_delivery_datetime_day,last_delivery_datetime_month,last_delivery_datetime_is_non_working_day,parcel_id,is_successful_delivery,is_successful_delivery_at_first_time,total_hours_from_receiving_to_last_failed_delivery,total_hours_from_receiving_to_last_success_delivery
0,4,2905,34,10,10.792941,106.704707,10.780602,106.697883,10.768759,106.680891,...,0,10,4,7,0,7773056,1,1,0.0,25.35
1,4,165,56,4,10.77529,106.671258,10.788467,106.688801,10.790357,106.754223,...,0,11,6,3,1,681986,0,0,19.116667,0.0
2,4,3921,63,4,10.803788,106.683946,10.788467,106.688801,10.768619,106.739293,...,1,9,5,1,0,4925744,1,1,0.0,24.25
3,4,11807,247,166,10.786025,106.595628,10.776678,106.655749,10.8,106.729481,...,1,9,1,11,0,3143303,1,1,0.0,43.65
4,2,4928,93,94,10.889278,106.593113,10.889482,106.594265,10.860618,106.569443,...,0,10,1,9,0,8029237,1,1,0.0,42.75
5,4,1764,1,249,10.780126,106.696979,10.809917,106.586693,10.784004,106.607835,...,0,11,0,7,1,2708496,1,1,0.0,18.75
6,4,3531,54,45,10.850782,106.752343,10.871303,106.762203,10.879668,106.771231,...,0,16,4,12,0,5392107,1,1,0.0,49.783333
7,4,1915,1,196,10.780126,106.696979,10.749983,106.656659,10.790241,106.709544,...,1,9,3,4,0,624614,0,0,142.966667,0.0
8,4,11807,30,27,10.817453,106.769922,10.807538,106.78914,10.861588,106.746878,...,0,16,6,11,1,3285281,1,1,0.0,24.233333
9,2,1864,1,198,10.780126,106.696979,10.749983,106.656659,10.762876,106.642345,...,1,9,5,9,0,9336478,1,1,0.0,19.6


In [5]:
df.columns

Index(['parcel_category_id', 'customer_id', 'received_post_office_id',
       'delivery_post_office_id', 'received_post_office_address_latitude',
       'received_post_office_address_longtitude',
       'delivery_post_office_address_latitude',
       'delivery_post_office_address_longtitude', 'recipient_address_latitude',
       'recipient_address_longtitude',
       'distance_received_post_office_delivery_post_office',
       'distance_delivery_post_office_recipient_address',
       'distance_received_post_office_recipient_address',
       'received_datetime_hour', 'received_datetime_day',
       'received_datetime_month', 'received_datetime_is_non_working_day',
       'Rush_hour', 'last_delivery_datetime_hour',
       'last_delivery_datetime_day', 'last_delivery_datetime_month',
       'last_delivery_datetime_is_non_working_day', 'parcel_id',
       'is_successful_delivery', 'is_successful_delivery_at_first_time',
       'total_hours_from_receiving_to_last_failed_delivery',
       't

In [None]:
# import duckdb
# import torch
# import torch.nn.functional as F
# import torch_frame as tf
# from torch_frame.data import Dataset
# from torch_frame.utils import infer_df_stype
# from torch_frame.data.loader import DataLoader
# from torch_frame.nn.encoder import EmbeddingEncoder, LinearEncoder
# from torch_frame.nn.models import FTTransformer   # any backbone works

# # 1. Read & sample exactly the same way you did
# file_path = "../data/parcel_tracking_output_data.parquet"
# df = duckdb.sql(f"""
#     SELECT * FROM '{file_path}'
#     USING SAMPLE reservoir(100 ROWS)
#     REPEATABLE (100)
# """).df()

# # Targets ----------------------------------------------------
# ETA_COL   = "total_hours_from_receiving_to_last_success_delivery"
# POD_COL   = "is_successful_delivery"

# # Cast targets to numeric
# df[ETA_COL] = df[ETA_COL].astype(float)
# df[POD_COL] = df[POD_COL].astype(int)
# df.drop(columns=["customer_id", "parcel_id"], inplace=True)

# eta_df = df.drop(columns=[POD_COL])
# pod_df = df.drop(columns=[ETA_COL])

# # 2. Build two Dataset objects (one per task)
# eta_col_to_stype = infer_df_stype(eta_df)
# eta_col_to_stype[ETA_COL] = tf.numerical

# pod_col_to_stype = infer_df_stype(pod_df)
# pod_col_to_stype[POD_COL] = tf.categorical

# eta_ds = Dataset(eta_df, col_to_stype=eta_col_to_stype, target_col=ETA_COL)
# pod_ds = Dataset(pod_df, col_to_stype=pod_col_to_stype, target_col=POD_COL)

# # Materialise once so we can reuse stats & mappings
# eta_ds.materialize()              # gives .tensor_frame, .col_stats
# pod_ds.materialize()

# # Split
# train_eta = eta_ds[:0.8]    # same slice trick as in quick‑tour :contentReference[oaicite:3]{index=3}
# val_eta   = eta_ds[0.8:]
# train_pod = pod_ds[:0.8]
# val_pod   = pod_ds[0.8:]

# # 3.  Mini‑batch loaders
# BATCH = 5
# train_eta_loader = DataLoader(train_eta, batch_size=BATCH, shuffle=True)
# val_eta_loader   = DataLoader(val_eta,   batch_size=BATCH)

# train_pod_loader = DataLoader(train_pod, batch_size=BATCH, shuffle=True)
# val_pod_loader   = DataLoader(val_pod,   batch_size=BATCH)

# # 4. Common stype‑wise encoders
# stype_enc = {
#     tf.stype.categorical: EmbeddingEncoder(),
#     tf.stype.numerical:   LinearEncoder(),
# }

# # 5‑A. ETA regressor  (FT‑Transformer, 1 output)
# eta_model = FTTransformer(
#     channels=32,
#     out_channels=1,
#     num_layers=3,
#     col_stats=train_eta.col_stats,
#     col_names_dict=train_eta.tensor_frame.col_names_dict,
#     stype_encoder_dict=stype_enc,
# )
# opt_eta = torch.optim.AdamW(eta_model.parameters(), lr=5e-3)

# # 5‑B. POD classifier (same backbone, sigmoid output)
# pod_model = FTTransformer(
#     channels=32,
#     out_channels=1,
#     num_layers=3,
#     col_stats=train_pod.col_stats,
#     col_names_dict=train_pod.tensor_frame.col_names_dict,
#     stype_encoder_dict=stype_enc,
# )
# opt_pod = torch.optim.AdamW(pod_model.parameters(), lr=1e-3)

# # 6. Plain PyTorch training loops (no Lightning/Rich headaches)
# def train_epoch(model, loader, optimizer, is_regression: bool):
#     model.train()
#     total = 0
#     loss_sum = 0
#     loss_history = []
#     for tf_batch in loader:
#         pred = model(tf_batch)
#         target = tf_batch.y.float()
#         loss = F.smooth_l1_loss(pred.squeeze(), target) if is_regression else F.binary_cross_entropy_with_logits(pred.squeeze(), target)
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()
#         loss_history.append(loss.item())
#         loss_sum += loss.item() * len(target)
#         total += len(target)
#     return loss_sum / total, loss_history

# for epoch in range(1, 30):
#     eta_loss, eta_loss_history = train_epoch(eta_model, train_eta_loader, opt_eta, True)
#     pod_loss, pod_loss_history = train_epoch(pod_model, train_pod_loader, opt_pod, False)
#     print(f"Epoch {epoch:02d} | ETA MAE-ish {eta_loss:.3f} | POD BCE {pod_loss:.3f}")

# # 7.  Validation & inference# -----------------------------------------------------------
# @torch.no_grad()
# def evaluate(model, loader, is_regression):
#     model.eval()
#     outs, ys = [], []
#     for tf_batch in loader:
#         outs.append(model(tf_batch).cpu())
#         ys.append(tf_batch.y.float().cpu())
#     pred = torch.cat(outs).squeeze()
#     y    = torch.cat(ys).squeeze()
#     if is_regression:
#         return torch.mean(torch.abs(pred - y)).item()      # MAE
#     else:
#         prob = torch.sigmoid(pred)
#         acc  = ((prob > 0.5) == y.bool()).float().mean()
#         return acc.item()

# print("ETA MAE (val):", evaluate(eta_model, val_eta_loader, True))
# print("POD ACC (val):", evaluate(pod_model, val_pod_loader, False))

In [52]:
# torch.manual_seed(42)
# pod_model.train()
# loss_history = []
# for epoch in range(0, 50):
#     print("")
#     print(f"EPOCH {epoch}:")
#     for idx, batch in enumerate(train_pod_loader):
#         print(batch)
#         pred = pod_model(batch)
#         target = batch.y.float()
#         loss = F.smooth_l1_loss(pred.squeeze(), target) if False else F.binary_cross_entropy_with_logits(pred.squeeze(), target)
#         opt_eta.zero_grad()
#         loss.backward()
#         opt_eta.step()
#         loss_history.append(loss.item())
#         print(f"step {idx}: {loss.item()}")

In [None]:
# for epoch in range(1, 30):
#     eta_loss, eta_loss_history = train_epoch(eta_model, train_eta_loader, opt_eta, True)
#     pod_loss, pod_loss_history = train_epoch(pod_model, train_pod_loader, opt_pod, False)
#     print(f"Epoch {epoch:02d} | ETA MAE-ish {eta_loss:.3f} | POD BCE {pod_loss:.3f}")

Epoch 01 | ETA MAE-ish 24.736 | POD BCE 0.581
Epoch 02 | ETA MAE-ish 23.274 | POD BCE 0.405
Epoch 03 | ETA MAE-ish 21.756 | POD BCE 0.373
Epoch 04 | ETA MAE-ish 19.835 | POD BCE 0.331
Epoch 05 | ETA MAE-ish 17.341 | POD BCE 0.270
Epoch 06 | ETA MAE-ish 14.937 | POD BCE 0.244
Epoch 07 | ETA MAE-ish 13.276 | POD BCE 0.174
Epoch 08 | ETA MAE-ish 13.048 | POD BCE 0.224
Epoch 09 | ETA MAE-ish 12.977 | POD BCE 0.166
Epoch 10 | ETA MAE-ish 12.933 | POD BCE 0.211
Epoch 11 | ETA MAE-ish 13.068 | POD BCE 0.137
Epoch 12 | ETA MAE-ish 12.984 | POD BCE 0.110
Epoch 13 | ETA MAE-ish 13.015 | POD BCE 0.104
Epoch 14 | ETA MAE-ish 12.997 | POD BCE 0.105
Epoch 15 | ETA MAE-ish 12.969 | POD BCE 0.080
Epoch 16 | ETA MAE-ish 12.974 | POD BCE 0.073
Epoch 17 | ETA MAE-ish 12.979 | POD BCE 0.066
Epoch 18 | ETA MAE-ish 12.965 | POD BCE 0.067
Epoch 19 | ETA MAE-ish 13.057 | POD BCE 0.063
Epoch 20 | ETA MAE-ish 12.859 | POD BCE 0.060
Epoch 21 | ETA MAE-ish 12.922 | POD BCE 0.060
Epoch 22 | ETA MAE-ish 12.936 | PO

In [None]:
# # 7.  Validation & inference# -----------------------------------------------------------
# @torch.no_grad()
# def evaluate(model, loader, is_regression):
#     model.eval()
#     outs, ys = [], []
#     for tf_batch in loader:
#         outs.append(model(tf_batch).cpu())
#         ys.append(tf_batch.y.float().cpu())
#     pred = torch.cat(outs).squeeze()
#     y    = torch.cat(ys).squeeze()
#     if is_regression:
#         return torch.mean(torch.abs(pred - y)).item()      # MAE
#     else:
#         prob = torch.sigmoid(pred)
#         acc  = ((prob > 0.5) == y.bool()).float().mean()
#         return acc.item()

# print("ETA MAE (val):", evaluate(eta_model, val_eta_loader, True))
# print("POD ACC (val):", evaluate(pod_model, val_pod_loader, False))

ETA MAE (val): 24.087116241455078
POD ACC (val): 0.949999988079071
