## TRANSFORMER

A mini Wayformer built from embeddings

First, the embeddings have to be built and concatenated.

In [None]:
# Input shapes for the Transformer (prepared in the cells below)
# bev_emb:      (batch_size, 1, D)
# agent_embs:   (batch_size, N_agents, D)
# lane_embs:    (batch_size, N_lanes, D)

# They are then concatenated along the token dimension:
# tokens = torch.cat([bev_emb, agent_embs, lane_embs], dim=1)  # (B, T, D)
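
As a quick shape check, the concatenation described in the comments above can be sketched with dummy tensors. The sizes used here (batch of 2, 12 agents, 8 lanes, D = 128) are illustrative assumptions, not values fixed by the model.

```python
import torch

B, N_AGENTS, N_LANES, D = 2, 12, 8, 128  # illustrative sizes

bev_emb = torch.randn(B, 1, D)            # one scene token per sample
agent_embs = torch.randn(B, N_AGENTS, D)  # one token per agent
lane_embs = torch.randn(B, N_LANES, D)    # one token per lane

# Concatenate along the token dimension (dim=1)
tokens = torch.cat([bev_emb, agent_embs, lane_embs], dim=1)
print(tokens.shape)  # torch.Size([2, 21, 128])
```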


### Agent embedding

Each agent has: position (x, y), velocity, acceleration, heading (angle), size (width, length), class (car, pedestrian, bicycle…), past history (optional), current state (whether it is stationary or not, etc.)

With a fixed dimensionality, the feature vector could look like this: [x, y, vx, vy, ax, ay, heading_sin, heading_cos, width, length]


In [None]:
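import numpy as np
import torch
from torch import nn
from pyquaternion import Quaternion

def extract_agent_features(helper, instance_token, sample_token):
    ann = helper.get_sample_annotation(instance_token, sample_token)

    # Position (global frame)
    x, y, _ = ann["translation"]

    # Heading (convert quaternion → yaw → sin/cos)
    quat = ann["rotation"]  # [qw, qx, qy, qz]
    yaw = Quaternion(quat).yaw_pitch_roll[0]
    sin_yaw = np.sin(yaw)
    cos_yaw = np.cos(yaw)

    # Velocity and acceleration: PredictHelper returns scalar magnitudes
    # (NaN when there is no previous annotation), so they are decomposed
    # along the heading here as an approximation.
    speed = np.nan_to_num(helper.get_velocity_for_agent(instance_token, sample_token))
    accel = np.nan_to_num(helper.get_acceleration_for_agent(instance_token, sample_token))
    vx, vy = speed * cos_yaw, speed * sin_yaw
    ax, ay = accel * cos_yaw, accel * sin_yaw

    # Size (nuScenes stores [width, length, height])
    width, length, _ = ann["size"]

    # Feature vector
    features = np.array([x, y, vx, vy, ax, ay, sin_yaw, cos_yaw, width, length], dtype=np.float32)

    return features  # shape (10,)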


# Pass all agents through an MLP to obtain the agent embedding (B, N_agents, D)
class AgentEncoder(nn.Module):
    def __init__(self, in_dim=10, hidden_dim=128, out_dim=128):
        super().__init__()
        self.mlp = nn.Sequential(
            nn.Linear(in_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, out_dim)
        )

    def forward(self, agent_features):
        """
        agent_features: (B, N_agents, 10)
        returns: (B, N_agents, out_dim)
        """
        return self.mlp(agent_features)

MAX_AGENTS = 12  # as in Waymo; 12–16 is a reasonable choice

def build_agent_embeddings(helper, sample_token, agent_encoder, device):
    anns = helper.get_annotations_for_sample(sample_token)

    features = []

    for ann in anns:
        instance_token = ann["instance_token"]
        f = extract_agent_features(helper, instance_token, sample_token)
        features.append(f)

        if len(features) == MAX_AGENTS:
            break

    # Padding
    while len(features) < MAX_AGENTS:
        features.append(np.zeros(10, dtype=np.float32))

    features = torch.tensor(np.stack(features), device=device)  # (MAX_AGENTS, 10)
    features = features.unsqueeze(0)  # (1, MAX_AGENTS, 10)

    # Agent Encoder
    # no_grad is only appropriate for inference; remove it to train the encoder end-to-end
    with torch.no_grad():
        agent_emb = agent_encoder(features)  # (1, MAX_AGENTS, d_model)

    return agent_emb  # shape (1, MAX_AGENTS, d_model)
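
Because missing agents are padded with zero feature vectors, the Transformer needs to know which tokens are padding; otherwise the MLP bias turns the zero rows into non-zero embeddings that are attended to like real agents. A minimal sketch, assuming a hypothetical helper `make_padding_mask` and that the number of real agents per sample is tracked separately:

```python
import torch

def make_padding_mask(num_valid: torch.Tensor, max_len: int) -> torch.Tensor:
    """
    num_valid: (B,) number of real (non-padded) tokens per sample
    returns:   (B, max_len) boolean mask, True where the token is padding
    """
    positions = torch.arange(max_len, device=num_valid.device).unsqueeze(0)  # (1, max_len)
    return positions >= num_valid.unsqueeze(1)                               # (B, max_len)

mask = make_padding_mask(torch.tensor([3, 5]), max_len=5)
print(mask)
```

A mask with this convention (True = ignore) can be passed as `src_key_padding_mask` to `nn.TransformerEncoder`. The mask for the full sequence would be the concatenation of the BEV, agent and lane masks along dim=1.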


### Lane embeddings

The goal is, given a sample_token, to obtain lane_emb: (1, N_lanes, d_model). Each lane in nuScenes is a polyline ([(x1, y1), (x2, y2), ..., (xk, yk)]).
Each lane has a type, connections, a traffic direction and a curvature.

However, the minimal working basis for a Transformer is:

✔ sample the polyline
✔ convert each point into a feature
✔ aggregate all points to create one embedding per lane

We get the polylines closest to the agent.

In [None]:
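def get_relevant_lanes(helper, agent_x, agent_y, max_lanes=8):
    """
    Returns up to max_lanes lane IDs near the agent.

    Note: `helper.map_api` and `get_lane_ids_in_xy` are assumed to come from a
    project-specific map wrapper.
    """
    lane_ids = helper.map_api.get_lane_ids_in_xy(agent_x, agent_y)

    # If there are too many lanes, keep the first N
    # (this truncates the list; it does not sort by distance)
    if len(lane_ids) > max_lanes:
        lane_ids = lane_ids[:max_lanes]

    return lane_ids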
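
Truncating the list does not by itself keep the closest lanes. One way to rank them, sketched here with NumPy under the assumption that each lane's centerline is already available as a (K, 2) array in a hypothetical `polylines` dictionary, is to sort by the minimum distance from the agent to any centerline point:

```python
import numpy as np

def closest_lanes(polylines, agent_x, agent_y, max_lanes=8):
    """
    polylines: dict {lane_id: (K, 2) array of centerline points} (hypothetical input)
    returns:   list with up to max_lanes lane IDs, nearest first
    """
    agent = np.array([agent_x, agent_y], dtype=np.float32)
    # Distance from the agent to the nearest sampled point of each centerline
    dists = {
        lane_id: np.linalg.norm(np.asarray(poly, dtype=np.float32) - agent, axis=1).min()
        for lane_id, poly in polylines.items()
    }
    return sorted(dists, key=dists.get)[:max_lanes]

polylines = {
    "far":  np.array([[10.0, 10.0], [12.0, 10.0]]),
    "near": np.array([[0.0, 1.0], [2.0, 1.0]]),
    "mid":  np.array([[0.0, 4.0], [2.0, 4.0]]),
}
print(closest_lanes(polylines, 0.0, 0.0, max_lanes=2))  # ['near', 'mid']
```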


We extract the polyline of each lane.

In [None]:
def get_lane_polyline(helper, lane_id):
    # returns Nx2 array with (x, y)
    poly = helper.map_api.get_lane_centerline(lane_id)[:, :2]
    return poly  # shape (K, 2)


We convert each polyline into a set of features (as done in implementations such as LaneGCN and VectorNet):
dx, dy                 → segment direction
length                 → distance
norm direction         → dx / length, dy / length
curvature              → change of angle (if wanted)


In [None]:
import numpy as np

def polyline_to_features(poly):
    """
    poly: (K,2)
    returns features for each segment: (K-1, 5)
    """
    segments = []
    for i in range(len(poly) - 1):
        x1, y1 = poly[i]
        x2, y2 = poly[i+1]

        dx = x2 - x1
        dy = y2 - y1
        length = np.sqrt(dx*dx + dy*dy) + 1e-6
        nx = dx / length
        ny = dy / length

        # Feature per segment
        feat = [dx, dy, length, nx, ny]
        segments.append(feat)

    return np.array(segments, dtype=np.float32)  # (K-1, 5)
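
The curvature entry in the feature list is not part of `polyline_to_features`. A possible way to add it, sketched here as a hypothetical helper `segment_turning_angles`, is the signed heading change between consecutive segments. It assumes the polyline has at least two points.

```python
import numpy as np

def segment_turning_angles(poly):
    """
    poly: (K, 2) polyline with K >= 2
    returns: (K-1,) signed heading change in radians for each segment
             relative to the previous one (0 for the first segment)
    """
    poly = np.asarray(poly, dtype=np.float64)
    d = np.diff(poly, axis=0)                    # (K-1, 2) segment vectors
    heading = np.arctan2(d[:, 1], d[:, 0])       # (K-1,) segment headings
    turn = np.diff(heading, prepend=heading[0])  # first entry is 0
    # Wrap to [-pi, pi) so crossing the +/-pi boundary does not produce a jump
    return (turn + np.pi) % (2 * np.pi) - np.pi

poly = np.array([[0.0, 0.0], [1.0, 0.0], [1.0, 1.0]])  # straight, then a left turn
print(segment_turning_angles(poly))  # approximately [0, pi/2]
```

The result could be stacked as a sixth column next to `[dx, dy, length, nx, ny]`, in which case `in_dim` of the segment encoder would become 6.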


Now an MLP, as before, serves as the lane encoder.

In [None]:
class LaneSegmentEncoder(nn.Module):
    def __init__(self, in_dim=5, hidden_dim=128, out_dim=128):
        super().__init__()
        self.mlp = nn.Sequential(
            nn.Linear(in_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, out_dim)
        )

    def forward(self, seg_features):
        """
        seg_features: (num_segments, in_dim)
        returns: (num_segments, out_dim)
        """
        return self.mlp(seg_features)


In [None]:
def aggregate_lane_embedding(segment_embs):
    """
    segment_embs: (num_segments, d_model)
    returns a single lane embedding: (d_model,)
    """
    return segment_embs.mean(dim=0)  # or max(dim=0)[0]


In [None]:
def build_lane_embedding(helper, lane_id, lane_encoder, device):
    # 1. Polyline
    poly = get_lane_polyline(helper, lane_id)  # (K,2)

    seg_feats = polyline_to_features(poly)     # (K-1, 5)
    seg_feats = torch.tensor(seg_feats, device=device)

    # 2. Encode each segment
    seg_embs = lane_encoder(seg_feats)         # (K-1, d_model)

    # 3. Aggregate segment embeddings
    lane_emb = seg_embs.mean(dim=0)            # (d_model,)

    return lane_emb


We build embeddings for all lanes.

In [None]:
MAX_LANES = 8

def build_lane_embeddings(helper, agent_x, agent_y, lane_encoder, device):
    lane_ids = get_relevant_lanes(helper, agent_x, agent_y, max_lanes=MAX_LANES)

    lane_embs = []
    for lane_id in lane_ids:
        emb = build_lane_embedding(helper, lane_id, lane_encoder, device)
        lane_embs.append(emb)

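    # Padding (take the size from the encoder so this also works when no lane is found)
    d_model = lane_encoder.mlp[-1].out_features
    while len(lane_embs) < MAX_LANES:
        lane_embs.append(torch.zeros(d_model, device=device))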

    lane_embs = torch.stack(lane_embs, dim=0)  # (MAX_LANES, d_model)
    lane_embs = lane_embs.unsqueeze(0)         # (1, MAX_LANES, d_model)

    return lane_embs


The result should be lane_emb with shape (1, N_lanes, d_model).

### Bird Eye View embedding

This embedding should be a tensor that summarizes the whole scene. bev_emb: (1, 1, d_model)

The rasterized top-down image is already implemented in nuScenes:

In [None]:
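from nuscenes.prediction.input_representation.static_layers import StaticLayerRasterizer
from nuscenes.prediction.input_representation.agents import AgentBoxesWithFadedHistory
from nuscenes.prediction.input_representation.interface import InputRepresentation
from nuscenes.prediction.input_representation.combinators import Rasterizer

static_layer_rasterizer = StaticLayerRasterizer(helper)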
agent_rasterizer = AgentBoxesWithFadedHistory(helper, seconds_of_history=1)
mtp_input_representation = InputRepresentation(static_layer_rasterizer, agent_rasterizer, Rasterizer())
img = mtp_input_representation.make_input_representation(instance_token, sample_token)


The resulting multi-channel image has dimensions such as (3, 224, 224) or (3, 500, 500) once the channels are moved first, depending on the configuration.
    

This image is passed to a ResNet (CNN); we extract an embedding, reduce it to d_model, and use it as the first token of the transformer.

In [None]:
import torchvision.models as models
import torch.nn as nn
import torch

class BEVBackbone(nn.Module):
    """ResNet trunk followed by a linear projection to the transformer dimension."""
    def __init__(self, out_dim=128, arch="resnet18"):
        super().__init__()

        if arch == "resnet18":
            base = models.resnet18(weights=None)
            feat_dim = 512
        elif arch == "resnet50":
            base = models.resnet50(weights=None)
            feat_dim = 2048
        else:
            raise ValueError("Unsupported backbone")

        # Remove final FC layer, keep convolutional trunk
        self.cnn = nn.Sequential(*list(base.children())[:-1])  # output: (B, feat_dim, 1, 1)

        # Linear layer to project onto the transformer dimension
        self.proj = nn.Linear(feat_dim, out_dim)

    def forward(self, bev_img):
        """
        bev_img: (B, C, H, W)
        returns: (B, out_dim)
        """
        x = self.cnn(bev_img)         # (B, feat_dim, 1, 1)
        x = x.view(x.size(0), -1)     # (B, feat_dim)
        x = self.proj(x)              # (B, out_dim)
        return x


In [None]:
class BEVEncoder(nn.Module):
    def __init__(self, input_representation, d_model=128, arch="resnet18", device="cpu"):
        super().__init__()
        self.input_representation = input_representation
        self.device = device

        # CNN backbone to extract features
        self.backbone = BEVBackbone(out_dim=d_model, arch=arch).to(device)

    def forward(self, instance_token, sample_token):
        """
        Returns:
            bev_token: (1, 1, d_model)
        """

        # 1) Rasterize nuScenes BEV image
        img = self.input_representation.make_input_representation(
            instance_token, sample_token
        )   # numpy array (H, W, C)

        # 2) Convert → torch tensor
        bev_img = torch.tensor(img, dtype=torch.float32).permute(2, 0, 1)  # (C,H,W)
        bev_img = bev_img.unsqueeze(0).to(self.device)                    # (1,C,H,W)

        # 3) Normalize (IMPORTANT): per-image standardization, not the ImageNet mean/std
        bev_img = (bev_img - bev_img.mean()) / (bev_img.std() + 1e-6)

        # 4) Extract embedding using CNN backbone
        bev_emb = self.backbone(bev_img)   # (1, d_model)

        # 5) Convert to a single transformer token → (1,1,d_model)
        bev_token = bev_emb.unsqueeze(1)

        return bev_token
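
The normalization in step 3 standardizes each image by its own mean and standard deviation. If the backbone were instead initialized with ImageNet-pretrained weights, the usual alternative is channel-wise normalization with the ImageNet statistics. A minimal sketch, assuming the raster is an (H, W, 3) uint8 array; the helper name `imagenet_normalize` is hypothetical:

```python
import numpy as np

# Channel-wise statistics commonly used with ImageNet-pretrained torchvision models
IMAGENET_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
IMAGENET_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)

def imagenet_normalize(img_hwc):
    """
    img_hwc: (H, W, 3) uint8 image with values in [0, 255]
    returns: (3, H, W) float32 array, normalized per channel
    """
    x = img_hwc.astype(np.float32) / 255.0   # scale to [0, 1]
    x = (x - IMAGENET_MEAN) / IMAGENET_STD   # broadcast over the channel axis
    return np.transpose(x, (2, 0, 1))        # channels first, as PyTorch expects

dummy = np.zeros((500, 500, 3), dtype=np.uint8)
print(imagenet_normalize(dummy).shape)  # (3, 500, 500)
```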


In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

bev_encoder = BEVEncoder(
    input_representation=mtp_input_representation,
    d_model=128,
    arch="resnet18",
    device=device
)

# GET THE TOKEN FOR ONE SAMPLE
bev_token = bev_encoder(instance_token, sample_token)
print(bev_token.shape)

# the output should be: torch.Size([1, 1, 128])



In [None]:
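# Method-style variant of BEVEncoder.forward without normalization; `self` must
# provide `input_representation`, `device` and `bev_backbone`.
def build_bev_embedding(self, instance_token, sample_token):

    # 1. Rasterize BEV image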
    img = self.input_representation.make_input_representation(
        instance_token, sample_token
    )  # numpy array (H, W, C)

    img = torch.tensor(img, dtype=torch.float32).permute(2, 0, 1)  # (C, H, W)
    img = img.unsqueeze(0).to(self.device)  # (1, C, H, W)

    # 2. Backbone CNN → embedding
    bev_emb = self.bev_backbone(img)        # (1, d_model)

    # 3. Convert to transformer token
    bev_token = bev_emb.unsqueeze(1)        # (1, 1, d_model)

    return bev_token


We concatenate all the embeddings.

In [None]:
tokens = torch.cat([bev_token, agent_emb, lane_embs], dim=1)  # (1, 1 + MAX_AGENTS + MAX_LANES, d_model)


### Transformer

After that, all that is left is the transformer model and the trajectory prediction head.

In [None]:
import math
import torch
from torch import nn
from torch.nn import functional as F


class PositionalEncoding(nn.Module):
    """
    Standard sine-cosine positional encoding from the "Attention is All You Need" paper.
    Adds position information to each token in the sequence.
    """

    def __init__(self, d_model: int, max_len: int = 256):
        super().__init__()
        pe = torch.zeros(max_len, d_model)  # (max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2, dtype=torch.float32)
            * (-math.log(10000.0) / d_model)
        )

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # shape -> (1, max_len, d_model) to broadcast over batch
        self.register_buffer("pe", pe.unsqueeze(0))  

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: Tensor of shape (batch_size, seq_len, d_model)
        """
        seq_len = x.size(1)
        # Add positional encoding to the input embeddings
        x = x + self.pe[:, :seq_len, :]
        return x


class TrajectoryTransformer(nn.Module):
    """
    Transformer-based trajectory prediction model.

    Inputs:
        - token_embeddings: (batch_size, seq_len, d_model)
          Sequence of tokens from:
            * 1 BEV embedding
            * N agent embeddings
            * M lane embeddings

    Outputs:
        - trajectories: (batch_size, num_modes, num_steps, 2)
        - mode_logits (optional): (batch_size, num_modes)
    """

    def __init__(
        self,
        d_model: int = 128,
        nhead: int = 8,
        num_layers: int = 4,
        dim_feedforward: int = 256,
        dropout: float = 0.1,
        num_modes: int = 1,
        num_steps: int = 12,
        use_mode_head: bool = True,
    ):
        super().__init__()

        self.d_model = d_model
        self.num_modes = num_modes
        self.num_steps = num_steps
        self.use_mode_head = use_mode_head

        # Positional encoding for the sequence tokens
        self.pos_encoder = PositionalEncoding(d_model=d_model, max_len=256)

        # Transformer encoder (no decoder needed, we just encode the scene)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True,   # input/output shape = (batch, seq, d_model)
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers,
        )

        # We will use the "scene token" representation to decode trajectories.
        # Typically we take the first token (e.g. BEV or a special [CLS]-like token).
        self.scene_pool = "first"  # could be 'mean' if you want average pooling

        # Head that maps the scene embedding to future trajectories
        # Output size = num_modes * num_steps * 2 (x, y)
        self.traj_head = nn.Linear(d_model, num_modes * num_steps * 2)

        # Optional mode classification head (for multi-modal prediction)
        if use_mode_head and num_modes > 1:
            self.mode_head = nn.Linear(d_model, num_modes)
        else:
            self.mode_head = None

    def _pool_scene_token(self, encoded_tokens: torch.Tensor) -> torch.Tensor:
        """
        encoded_tokens: (batch_size, seq_len, d_model)
        Returns:
            scene_emb: (batch_size, d_model)
        """
        if self.scene_pool == "first":
            # Take the first token (assumed to be the BEV / global context token)
            scene_emb = encoded_tokens[:, 0, :]
        elif self.scene_pool == "mean":
            scene_emb = encoded_tokens.mean(dim=1)
        else:
            raise ValueError(f"Unknown scene_pool mode: {self.scene_pool}")
        return scene_emb

    def forward(
        self,
        token_embeddings: torch.Tensor,
        src_key_padding_mask: torch.Tensor | None = None,
    ):
        """
        Args:
            token_embeddings: (batch_size, seq_len, d_model)
            src_key_padding_mask: optional boolean mask of shape (batch_size, seq_len)
                True for positions that should be masked (i.e., padding tokens).

        Returns:
            trajectories: (batch, num_modes, num_steps, 2)
            mode_logits (or None): (batch, num_modes)
        """

        # 1) Add positional encoding
        x = self.pos_encoder(token_embeddings)  # (B, T, D)

        # 2) Transformer encoder
        # If using src_key_padding_mask, it must be (B, T)
        encoded = self.transformer_encoder(
            x,
            src_key_padding_mask=src_key_padding_mask,
        )  # (B, T, D)

        # 3) Pool scene representation (e.g. from first token)
        scene_emb = self._pool_scene_token(encoded)  # (B, D)

        # 4) Trajectory prediction head
        traj_flat = self.traj_head(scene_emb)  # (B, num_modes * num_steps * 2)
        trajectories = traj_flat.view(
            -1, self.num_modes, self.num_steps, 2
        )  # (B, M, T, 2)

        # 5) Optional mode logits for multi-modal weighting
        mode_logits = None
        if self.mode_head is not None:
            mode_logits = self.mode_head(scene_emb)  # (B, num_modes)

        return trajectories, mode_logits


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiModalTrajectoryLoss(nn.Module):
    """
    Multimodal loss for trajectory prediction:
      - Picks the best mode based on L2 distance (ADE)
      - Applies regression loss on that mode (Smooth L1)
      - Applies cross entropy on mode logits
    """

    def __init__(self, regression_weight=1.0, classification_weight=1.0):
        super().__init__()
        self.reg_weight = regression_weight
        self.cls_weight = classification_weight
        self.reg_loss_fn = nn.SmoothL1Loss()

    def forward(self, trajectories, mode_logits, gt_future):
        """
        trajectories: (B, M, T, 2)
        mode_logits:  (B, M)
        gt_future:    (B, T, 2)
        """

        # Batch size, number of modes, and number of timesteps.
        B, M, T, _ = trajectories.shape

        # ---- STEP 1: Compute ADE for each mode ----
        # (B, M, T, 2) - (B, 1, T, 2)
        diff = trajectories - gt_future.unsqueeze(1) 
        l2 = torch.norm(diff, dim=-1)              # (B, M, T)
        ade = l2.mean(dim=-1)                      # (B, M)

        # ---- STEP 2: Select best mode per sample ----
        best_mode = ade.argmin(dim=1)              # (B,)

        # ---- STEP 3: Regression loss on best mode ----
        best_traj = trajectories[torch.arange(B), best_mode]  # (B, T, 2)
        reg_loss = self.reg_loss_fn(best_traj, gt_future)

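        # ---- STEP 4: Classification loss ----
        # mode_logits is None when the model has a single mode (no mode head)
        if mode_logits is None:
            cls_loss = reg_loss.new_zeros(())
        else:
            cls_loss = F.cross_entropy(mode_logits, best_mode)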

        # ---- STEP 5: Total loss ----
        total_loss = self.reg_weight * reg_loss + self.cls_weight * cls_loss

        return total_loss, reg_loss.detach(), cls_loss.detach(), best_mode
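
To make the mode selection concrete, here is a small worked example of steps 1 and 2 with hand-picked numbers (one sample, two modes, two timesteps). The values are made up for illustration.

```python
import torch

# One sample, two modes, two timesteps
trajectories = torch.tensor([[
    [[0.0, 0.0], [0.0, 2.0]],   # mode 0: drifts away from the ground truth
    [[1.0, 0.0], [2.0, 0.0]],   # mode 1: matches the ground truth
]])                              # (B=1, M=2, T=2, 2)
gt_future = torch.tensor([[[1.0, 0.0], [2.0, 0.0]]])  # (B=1, T=2, 2)

diff = trajectories - gt_future.unsqueeze(1)   # (1, 2, 2, 2)
ade = torch.norm(diff, dim=-1).mean(dim=-1)    # (1, 2) average displacement per mode
best_mode = ade.argmin(dim=1)                  # (1,)

print(ade)        # mode 0 has the larger error
print(best_mode)  # tensor([1])
```

Only the selected mode receives the regression loss, while the classification loss pushes the logits towards that same mode.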


In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

model = TrajectoryTransformer(
    d_model=128,
    nhead=8,
    num_layers=4,
    dim_feedforward=256,
    dropout=0.1,
    num_modes=3,
    num_steps=12,
    use_mode_head=True
).to(device)


In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

loss_fn = MultiModalTrajectoryLoss()

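# `dataloader` is assumed to yield (tokens, gt_future) batches
model.train()
for tokens, gt_future in dataloader:
    # Move the batch to the same device as the model
    tokens = tokens.to(device)
    gt_future = gt_future.to(device)

    optimizer.zero_grad()

    trajectories, mode_logits = model(tokens)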

    loss, reg_loss, cls_loss, best_mode = loss_fn(
        trajectories,
        mode_logits,
        gt_future
    )

    loss.backward()
    optimizer.step()

    print(f"Total: {loss:.3f} | Reg: {reg_loss:.3f} | Cls: {cls_loss:.3f}")


## 1. Lane-Deviation-Loss

We obtain the reference lane the agent is driving on (map_api.get_lane) and, during training, compute the distance from each predicted point to the lane polyline. This way the model is penalized whenever a prediction leaves the lane.

In [None]:
# DISTANCE FROM A POINT TO A SEGMENT
import torch
def point_to_segment_distance(p, a, b):
    """
    p: tensor (..., 2), point(s)
    a, b: tensors (2,), segment endpoints
    Returns the minimum point-to-segment distance, shape (...)
    """
    ap = p - a
    ab = b - a
    ab_norm = torch.sum(ab * ab)  # squared segment length

    # Reduce over the coordinate axis only, so batched points are handled correctly
    t = torch.clamp(torch.sum(ap * ab, dim=-1, keepdim=True) / (ab_norm + 1e-8), 0., 1.)
    proj = a + t * ab
    return torch.norm(p - proj, dim=-1)

In [None]:
# DISTANCE FROM A POINT TO A POLYLINE

def point_to_polyline_distance(p, polyline):
    """
    p: tensor (..., 2)
    polyline: tensor (N, 2)
    Returns the distance to the nearest segment, shape (...)
    """
    distances = []
    for i in range(polyline.shape[0] - 1):
        a = polyline[i]
        b = polyline[i+1]
        dist = point_to_segment_distance(p, a, b)
        distances.append(dist)
    # Minimum over segments only (dim 0), keeping any batch dimensions of p
    return torch.stack(distances).min(dim=0).values


In [None]:
# COMPUTE THE LOSS FOR A SINGLE TRAJECTORY

def lane_deviation_loss_single(traj_global, lane_polyline):
    """
    traj_global: tensor (T, 2)
    lane_polyline: tensor (N, 2)
    """
    distances = []
    for t in range(traj_global.shape[0]):
        pt = traj_global[t]
        dist = point_to_polyline_distance(pt, lane_polyline)
        distances.append(dist)
    return torch.stack(distances).mean()
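
The nested Python loops above visit every (timestep, segment) pair one at a time. The same quantity can be computed with broadcasting, which keeps it differentiable and is usually faster. A sketch, with the function name `lane_deviation_loss_vectorized` chosen here for illustration:

```python
import torch

def lane_deviation_loss_vectorized(traj, polyline, eps=1e-8):
    """
    traj:     (T, 2) predicted points
    polyline: (N, 2) lane centerline, N >= 2
    returns:  scalar, mean distance from each point to its nearest segment
    """
    a = polyline[:-1]                     # (N-1, 2) segment starts
    ab = polyline[1:] - a                 # (N-1, 2) segment vectors
    ap = traj[:, None, :] - a[None]       # (T, N-1, 2)
    # Projection parameter along each segment, clamped to stay on the segment
    t = (ap * ab).sum(-1) / ((ab * ab).sum(-1) + eps)   # (T, N-1)
    t = t.clamp(0.0, 1.0)
    proj = a[None] + t[..., None] * ab[None]            # (T, N-1, 2)
    dist = torch.norm(traj[:, None, :] - proj, dim=-1)  # (T, N-1)
    return dist.min(dim=1).values.mean()

traj = torch.tensor([[0.0, 1.0], [1.0, 1.0], [3.0, 0.0]], requires_grad=True)
lane = torch.tensor([[0.0, 0.0], [2.0, 0.0]])
loss = lane_deviation_loss_vectorized(traj, lane)
loss.backward()     # gradients flow back to the trajectory
print(loss.item())  # approximately 1.0
```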


In [None]:

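import math
import random
from typing import List, Tuple

import torch
from torch.nn import functional as f
from pyquaternion import Quaternion


class MTPLoss: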
    """ Computes the loss for the MTP model. """

    def __init__(self,
                 num_modes: int,
                 regression_loss_weight: float = 1.,
                 angle_threshold_degrees: float = 5.,
                 lane_loss_weight: float = 1.0,
                 helper=None):
        """
        Inits MTP loss.
        :param num_modes: How many modes are being predicted for each agent.
        :param regression_loss_weight: Coefficient applied to the regression loss to
            balance classification and regression performance.
        :param angle_threshold_degrees: Minimum angle needed between a predicted trajectory
            and the ground truth to consider it a match.
        :param lane_loss_weight: Coefficient applied to the lane deviation loss (0 disables it).
        :param helper: Helper used to look up the agent pose and lane centerlines.
        """
        self.num_modes = num_modes
        self.num_location_coordinates_predicted = 2  # We predict x, y coordinates at each timestep.
        self.regression_loss_weight = regression_loss_weight
        self.angle_threshold = angle_threshold_degrees
        self.lane_loss_weight = lane_loss_weight
        self.helper = helper

    def _get_trajectory_and_modes(self,
                                  model_prediction: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Splits the predictions from the model into mode probabilities and trajectory.
        :param model_prediction: Tensor of shape [batch_size, n_timesteps * n_modes * 2 + n_modes].
        :return: Tuple of tensors. First item is the trajectories of shape [batch_size, n_modes, n_timesteps, 2].
            Second item are the mode probabilities of shape [batch_size, num_modes].
        """
        mode_probabilities = model_prediction[:, -self.num_modes:].clone()

        desired_shape = (model_prediction.shape[0], self.num_modes, -1, self.num_location_coordinates_predicted)
        trajectories_no_modes = model_prediction[:, :-self.num_modes].clone().reshape(desired_shape)

        return trajectories_no_modes, mode_probabilities

    @staticmethod
    def _angle_between(ref_traj: torch.Tensor,
                       traj_to_compare: torch.Tensor) -> float:
        """
        Computes the angle between the last points of the two trajectories.
        The resulting angle is in degrees and is an angle in the [0; 180) interval.
        :param ref_traj: Tensor of shape [n_timesteps, 2].
        :param traj_to_compare: Tensor of shape [n_timesteps, 2].
        :return: Angle between the trajectories.
        """

        EPSILON = 1e-5

        if (ref_traj.ndim != 2 or traj_to_compare.ndim != 2 or
                ref_traj.shape[1] != 2 or traj_to_compare.shape[1] != 2):
            raise ValueError('Both tensors should have shapes (-1, 2).')

        if torch.isnan(traj_to_compare[-1]).any() or torch.isnan(ref_traj[-1]).any():
            return 180. - EPSILON

        traj_norms_product = float(torch.norm(ref_traj[-1]) * torch.norm(traj_to_compare[-1]))

        # If either of the vectors described in the docstring has norm 0, return 0 as the angle.
        if math.isclose(traj_norms_product, 0):
            return 0.

        # We apply the max and min operations below to ensure there is no value
        # returned for cos_angle that is greater than 1 or less than -1.
        # This should never be the case, but the check is in place for cases where
        # we might encounter numerical instability.
        dot_product = float(ref_traj[-1].dot(traj_to_compare[-1]))
        angle = math.degrees(math.acos(max(min(dot_product / traj_norms_product, 1), -1)))

        if angle >= 180:
            return angle - EPSILON

        return angle

    @staticmethod
    def _compute_ave_l2_norms(tensor: torch.Tensor) -> float:
        """
        Compute the average of l2 norms of each row in the tensor.
        :param tensor: Shape [1, n_timesteps, 2].
        :return: Average l2 norm. Float.
        """
        l2_norms = torch.norm(tensor, p=2, dim=2)
        avg_distance = torch.mean(l2_norms)
        return avg_distance.item()

    def _compute_angles_from_ground_truth(self, target: torch.Tensor,
                                          trajectories: torch.Tensor) -> List[Tuple[float, int]]:
        """
        Compute angle between the target trajectory (ground truth) and the predicted trajectories.
        :param target: Shape [1, n_timesteps, 2].
        :param trajectories: Shape [n_modes, n_timesteps, 2].
        :return: List of angle, index tuples.
        """
        angles_from_ground_truth = []
        for mode, mode_trajectory in enumerate(trajectories):
            # For each mode, we compute the angle between the last point of the predicted trajectory for that
            # mode and the last point of the ground truth trajectory.
            angle = self._angle_between(target[0], mode_trajectory)

            angles_from_ground_truth.append((angle, mode))
        return angles_from_ground_truth

    def _compute_best_mode(self,
                           angles_from_ground_truth: List[Tuple[float, int]],
                           target: torch.Tensor, trajectories: torch.Tensor) -> int:
        """
        Finds the index of the best mode given the angles from the ground truth.
        :param angles_from_ground_truth: List of (angle, mode index) tuples.
        :param target: Shape [1, n_timesteps, 2]
        :param trajectories: Shape [n_modes, n_timesteps, 2]
        :return: Integer index of best mode.
        """

        # We first sort the modes based on the angle to the ground truth (ascending order), and keep track of
        # the index corresponding to the biggest angle that is still smaller than a threshold value.
        angles_from_ground_truth = sorted(angles_from_ground_truth)
        max_angle_below_thresh_idx = -1
        for angle_idx, (angle, mode) in enumerate(angles_from_ground_truth):
            if angle <= self.angle_threshold:
                max_angle_below_thresh_idx = angle_idx
            else:
                break

        # We choose the best mode at random IF there are no modes with an angle less than the threshold.
        if max_angle_below_thresh_idx == -1:
            best_mode = random.randint(0, self.num_modes - 1)

        # We choose the best mode to be the one that provides the lowest ave of l2 norms between the
        # predicted trajectory and the ground truth, taking into account only the modes with an angle
        # less than the threshold IF there is at least one mode with an angle less than the threshold.
        else:
            # Out of the selected modes above, we choose the final best mode as that which returns the
            # smallest ave of l2 norms between the predicted and ground truth trajectories.
            distances_from_ground_truth = []

            for angle, mode in angles_from_ground_truth[:max_angle_below_thresh_idx + 1]:
                norm = self._compute_ave_l2_norms(target - trajectories[mode, :, :])

                distances_from_ground_truth.append((norm, mode))

            distances_from_ground_truth = sorted(distances_from_ground_truth)
            best_mode = distances_from_ground_truth[0][1]

        return best_mode

    def __call__(self, predictions: torch.Tensor, targets: torch.Tensor,
                 tokens=None) -> torch.Tensor:
        """
        Computes the MTP loss on a batch.
        The predictions are of shape [batch_size, n_output_neurons of last linear layer]
        and the targets are of shape [batch_size, 1, n_timesteps, 2]
        :param predictions: Model predictions for batch.
        :param targets: Targets for batch.
        :param tokens: Optional list of (instance_token, sample_token) pairs, one per
            batch element. Required for the lane deviation term.
        :return: zero-dim tensor representing the loss on the batch.
        """

        batch_losses = torch.Tensor().requires_grad_(True).to(predictions.device)
        trajectories, modes = self._get_trajectory_and_modes(predictions)

        for batch_idx in range(predictions.shape[0]):

            angles = self._compute_angles_from_ground_truth(target=targets[batch_idx],
                                                            trajectories=trajectories[batch_idx])

            best_mode = self._compute_best_mode(angles,
                                                target=targets[batch_idx],
                                                trajectories=trajectories[batch_idx])

            best_mode_trajectory = trajectories[batch_idx, best_mode, :].unsqueeze(0)

            regression_loss = f.smooth_l1_loss(best_mode_trajectory, targets[batch_idx])

            mode_probabilities = modes[batch_idx].unsqueeze(0)
            best_mode_target = torch.tensor([best_mode], device=predictions.device)
            classification_loss = f.cross_entropy(mode_probabilities, best_mode_target)

            loss = classification_loss + self.regression_loss_weight * regression_loss
            # ============================================================
            # 🛣️ LANE DEVIATION LOSS
            # ============================================================
            # Skipped unless the (instance_token, sample_token) pairs are provided
            if self.lane_loss_weight > 0 and self.helper is not None and tokens is not None:

                instance_token, sample_token = tokens[batch_idx]

                ann = self.helper.get_sample_annotation(instance_token, sample_token)
                agent_x, agent_y = ann["translation"][:2]

                lane_ids = self.helper.map_api.get_lane_ids_in_xy(agent_x, agent_y)

                if len(lane_ids) > 0:
                    lane_id = lane_ids[0]
                    lane_poly = torch.tensor(
                        self.helper.map_api.get_lane_centerline(lane_id)[:, :2],
                        dtype=best_mode_trajectory.dtype,
                        device=best_mode_trajectory.device
                    )

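                    # Convert local -> global with differentiable tensor ops.
                    # Rebuilding the trajectory from .item() values would detach it
                    # from the graph, so the lane loss would produce no gradient.
                    # Assumes the annotation rotation is a pure yaw rotation.
                    traj_local = best_mode_trajectory[0]   # (T,2)
                    yaw = Quaternion(ann["rotation"]).yaw_pitch_roll[0]
                    c, s = math.cos(yaw), math.sin(yaw)
                    rot = torch.tensor([[c, -s], [s, c]],
                                       dtype=traj_local.dtype,
                                       device=traj_local.device)

                    traj_rot = traj_local @ rot.T

                    traj_global = traj_rot + torch.tensor([agent_x, agent_y],
                                                          dtype=traj_local.dtype,
                                                          device=traj_local.device)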

                    # Lane deviation loss
                    lane_loss = lane_deviation_loss_single(traj_global, lane_poly)

                    loss = loss + self.lane_loss_weight * lane_loss


            batch_losses = torch.cat((batch_losses, loss.unsqueeze(0)), 0)

        avg_loss = torch.mean(batch_losses)

        return avg_loss
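
The local-to-global step inside the loss amounts to a 2D rotation by the agent's yaw followed by a translation to the agent's position. A minimal stand-alone sketch, assuming a local frame whose x axis points along the heading (a different axis convention would need an extra fixed rotation); `local_to_global` is a hypothetical helper name:

```python
import math

def local_to_global(points, yaw, origin):
    """
    points: list of (x, y) in the agent frame
    yaw:    agent heading in radians, measured in the global frame
    origin: (x, y) of the agent in the global frame
    returns: list of (x, y) in the global frame
    """
    c, s = math.cos(yaw), math.sin(yaw)
    ox, oy = origin
    # Rotate by yaw, then translate by the agent position
    return [(c * x - s * y + ox, s * x + c * y + oy) for x, y in points]

# An agent at (10, 5) heading along +y (yaw = 90 degrees), moving 1 m and 2 m forward
result = local_to_global([(1.0, 0.0), (2.0, 0.0)], math.pi / 2, (10.0, 5.0))
print(result)  # approximately [(10, 6), (10, 7)]
```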

In [None]:
# Change the call that builds the loss so that it passes
# a weight for the lane loss (change this in MTP)

loss_fn = MTPLoss(
    num_modes=num_modes,
    regression_loss_weight=1.0,
    angle_threshold_degrees=5.,
    lane_loss_weight=1.0,   # <-- new
    helper=helper           # <-- required
)


## 2. Snap-to-Lane

Once training is done, the snap-to-lane function is applied at prediction time to replace each predicted point with the closest point that lies on the lane centerline.

In [None]:
import numpy as np
from pyquaternion import Quaternion

def get_agent_lane(helper, instance_token, sample_token):
    # Agent position in global coordinates
    annotation = helper.get_sample_annotation(instance_token, sample_token)
    agent_x, agent_y = annotation['translation'][:2]
    
    lanes = helper.map_api.get_lane_ids_in_xy(agent_x, agent_y)
    if len(lanes) == 0:
        return None  # No lane found (rare, but possible)
    
    # Return the first one for simplicity
    return lanes[0]

def get_lane_centerline(helper, lane_id):
    record = helper.map_api.get_lane(lane_id)
    lane_center = helper.map_api.get_lane_centerline(lane_id)
    # lane_center es un array Nx2 con la polil√≠nea
    return np.array(lane_center[:, :2])

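def project_point_to_polyline(point, polyline):
    """Return the point on `polyline` (N x 2 array) closest to `point`.

    Returns None if the polyline has fewer than two vertices.
    """
    point = np.asarray(point, dtype=float)
    min_dist = float('inf')
    closest_point = None
    
    for i in range(len(polyline) - 1):
        p1 = polyline[i]
        p2 = polyline[i+1]
        
        v = p2 - p1      # segment direction
        w = point - p1   # segment start -> point
        
        # Position of the projection along the segment, clamped to its endpoints
        t = np.dot(w, v) / (np.dot(v, v) + 1e-8)
        t = np.clip(t, 0, 1)
        
        proj = p1 + t * v
        dist = np.linalg.norm(point - proj)

        if dist < min_dist:
            min_dist = dist
            closest_point = proj
            
    return closest_point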


def snap_trajectory_to_lane(global_traj, helper, instance_token, sample_token):
    """Project every point of `global_traj` (T x 2, global frame) onto the
    centerline of the lane the agent currently occupies."""
    ann = helper.get_sample_annotation(instance_token, sample_token)
    x, y = ann["translation"][:2]

    lane_ids = helper.map_api.get_lane_ids_in_xy(x, y)
    if len(lane_ids) == 0:
        return global_traj  # no lane found: leave the trajectory unchanged

    # Use the first lane for simplicity
    lane_id = lane_ids[0]
    centerline = helper.map_api.get_lane_centerline(lane_id)[:, :2]

    snapped = [project_point_to_polyline(point, centerline) for point in global_traj]
    return np.array(snapped)
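
The point-by-point projection above can also be written without Python loops. The following is a minimal sketch, assuming NumPy only; `snap_points_to_polyline` is a hypothetical name, and the L-shaped centerline is toy data rather than a real nuScenes lane.

```python
import numpy as np

def snap_points_to_polyline(points, polyline):
    """Project every point onto the nearest segment of a polyline (vectorized)."""
    points = np.asarray(points, dtype=float)        # (P, 2)
    polyline = np.asarray(polyline, dtype=float)    # (N, 2), N >= 2
    starts = polyline[:-1]                          # (S, 2) segment start points
    seg = polyline[1:] - starts                     # (S, 2) segment vectors
    rel = points[:, None, :] - starts[None, :, :]   # (P, S, 2) start -> point
    # Position of the projection along each segment, clamped to [0, 1]
    t = (rel * seg).sum(-1) / ((seg * seg).sum(-1) + 1e-8)
    t = np.clip(t, 0.0, 1.0)                        # (P, S)
    proj = starts[None] + t[..., None] * seg[None]  # (P, S, 2)
    dist = np.linalg.norm(points[:, None, :] - proj, axis=-1)
    best = dist.argmin(axis=1)                      # closest segment per point
    return proj[np.arange(len(points)), best]       # (P, 2)

# Toy L-shaped centerline and a short trajectory drifting around it
centerline = np.array([[0.0, 0.0], [10.0, 0.0], [10.0, 10.0]])
trajectory = np.array([[2.0, 1.0], [9.0, -0.5], [11.0, 4.0]])
snapped = snap_points_to_polyline(trajectory, centerline)
```

It applies the same clamped projection as `project_point_to_polyline`, only for all points and segments at once, which may matter when snapping every mode of every sample.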


We add a call to it in the generate submission function to go from global coordinates to a lane-aligned position:

pred_coords_global[mode_idx] = snap_trajectory_to_lane(
    pred_coords_global[mode_idx],
    helper,
    instance_token,
    sample_token
)

In [None]:
import json

import torch
from tqdm import tqdm
from nuscenes.eval.prediction.data_classes import Prediction


def generate_submission_notebook(model, dataset, output_path="submission.json"):
    model.eval()
    predictions_list = []
    
    # The helper is needed to look up the agent's pose
    helper = dataset.helper

    print("Generating submission with LOCAL -> GLOBAL conversion...")
    
    for i in tqdm(range(len(dataset))):
        img, agent_state, _, _ = dataset[i]
        
        # Recover the tokens
        raw_token = dataset.split[i]
        instance_token, sample_token = raw_token.split("_")

        # Inference
        img = img.unsqueeze(0)        
        agent_state = agent_state.unsqueeze(0)
        with torch.no_grad():
            pred = model(img, agent_state)

        # Process the output: each mode contributes 24 coordinates (12 x 2) plus 1 probability
        total_output_size = pred.shape[1]
        num_modes = total_output_size // 25 
        num_coords = num_modes * 24
        
        pred_coords = pred[0, :num_coords]
        pred_probs = pred[0, num_coords:]
        
        # [num_modes, 12, 2] in LOCAL coordinates
        pred_coords_local = pred_coords.reshape(num_modes, 12, 2).cpu().numpy()

        # ============================================================
        # KEY TRANSFORMATION: LOCAL -> GLOBAL
        # ============================================================
        
        # 1. Get the agent's current pose in the global map frame
        sample_annotation = helper.get_sample_annotation(instance_token, sample_token)
        translation = sample_annotation['translation'] # [x, y, z] global
        rotation = sample_annotation['rotation']       # global quaternion
        
        # 2. Build the transform (local -> global)
        # Note: transform_matrix expects the rotation as a Quaternion plus a translation,
        # but MTP predicts X, Y (2D) while nuScenes is 3D.
        
        # Simplified way to rotate and translate 2D vectors:
        quaternion = Quaternion(rotation)
        
        # Empty array for the global coordinates
        pred_coords_global = np.zeros_like(pred_coords_local)

        for mode_idx in range(num_modes):
            # Take the trajectory of one mode (shape: 12, 2)
            trajectory_local = pred_coords_local[mode_idx]
            
            # A. Append a column of zeros for Z (needed for the 3D rotation)
            # Shape becomes (12, 3) -> [x, y, 0]
            traj_3d = np.hstack([trajectory_local, np.zeros((12, 1))])
            
            # B. Rotate (the points are rotated by the agent's heading).
            # Quaternion.rotate takes a single vector, so iterate point by point:
            traj_rotated = np.array([quaternion.rotate(p) for p in traj_3d])
            
            # C. Translate (add the agent's current global position)
            # Only X and Y are added (indices 0 and 1)
            pred_coords_global[mode_idx, :, 0] = traj_rotated[:, 0] + translation[0]
            pred_coords_global[mode_idx, :, 1] = traj_rotated[:, 1] + translation[1]

            # ============================================================
            # SNAP-TO-LANE (GLOBAL -> LANE-ALIGNED)
            # ============================================================
            pred_coords_global[mode_idx] = snap_trajectory_to_lane(
                pred_coords_global[mode_idx],
                helper,
                instance_token,
                sample_token
            )
            
        # ============================================================

        # Probabilities
        if num_modes > 1:
            probs = torch.nn.functional.softmax(pred_probs, dim=0).cpu().numpy()
        else:
            probs = np.array([1.0])

        prediction_obj = Prediction(
            instance=instance_token,
            sample=sample_token,
            prediction=pred_coords_global, # use the GLOBAL coordinates!
            probabilities=probs
        )

        predictions_list.append(prediction_obj.serialize())

    with open(output_path, "w") as f:
        json.dump(predictions_list, f, indent=2)

    return output_path
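
The per-point rotation loop in the function above can be collapsed into one matrix product. This is a minimal sketch under the assumption that the agent's rotation is a pure yaw about the z axis; `local_to_global_2d` and its parameters are hypothetical names, and the yaw would have to be extracted from the annotation quaternion first.

```python
import numpy as np

def local_to_global_2d(pred_local, yaw, translation_xy):
    """Rotate by yaw and translate all modes at once.

    pred_local: (num_modes, T, 2) trajectories in the agent frame.
    yaw: heading in radians (rotation about z only, an assumption).
    translation_xy: global (x, y) position of the agent.
    """
    pred_local = np.asarray(pred_local, dtype=float)
    c, s = np.cos(yaw), np.sin(yaw)
    rot = np.array([[c, -s], [s, c]])
    # Points are row vectors, so p_global = p_local @ R^T + t
    return pred_local @ rot.T + np.asarray(translation_xy, dtype=float)

pred_local = np.zeros((2, 12, 2))
pred_local[0, :, 0] = np.arange(1, 13)   # mode 0 moves along local +x
pred_local[1, :, 1] = np.arange(1, 13)   # mode 1 moves along local +y
pred_global = local_to_global_2d(pred_local, yaw=np.pi / 2, translation_xy=(100.0, 50.0))
```

The devkit also ships `convert_local_coords_to_global` in `nuscenes.prediction.helper`. If the model was trained on trajectories produced by the devkit's agent frame, it may be worth checking the output of either approach against that function.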

## 3. RESTRICTING THE PREDICTION SPACE (LANE-CONDITIONED MTP)

Instead of letting the model predict any free-form trajectory, you can:

Generate modes conditioned on the lane structure (branches, exits, turns).

Make each "mode" follow a candidate lane.

Examples:

CoverNet with a lane-based lattice

LaneGCN

Wayformer with a road graph

Here the model can practically only choose trajectories that are valid by construction.

Advantage:

It is the most principled solution from an academic standpoint.

Disadvantage:

More engineering work.
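
As an illustration of "each mode follows a candidate lane", the sketch below builds one constant-speed trajectory per candidate centerline. It is only a sketch: `resample_along_polyline` and `lane_following_modes` are hypothetical helpers, the lanes are toy polylines, and the 12 steps of 0.5 s are assumed to match the 12-point predictions used earlier.

```python
import numpy as np

def resample_along_polyline(polyline, distances):
    """Return the points at the given arc-length distances along a polyline."""
    polyline = np.asarray(polyline, dtype=float)
    seg_len = np.linalg.norm(np.diff(polyline, axis=0), axis=1)
    arc = np.concatenate([[0.0], np.cumsum(seg_len)])  # arc length at each vertex
    d = np.clip(distances, 0.0, arc[-1])               # never run past the lane end
    x = np.interp(d, arc, polyline[:, 0])
    y = np.interp(d, arc, polyline[:, 1])
    return np.stack([x, y], axis=1)

def lane_following_modes(candidate_lanes, speed, horizon_steps=12, dt=0.5):
    """One constant-speed trajectory per candidate lane, shape (num_lanes, T, 2)."""
    distances = speed * dt * np.arange(1, horizon_steps + 1)
    return np.stack([resample_along_polyline(lane, distances) for lane in candidate_lanes])

# Two toy candidates starting at the agent: go straight, or turn right after 10 m
straight = np.array([[0.0, 0.0], [50.0, 0.0]])
right_turn = np.array([[0.0, 0.0], [10.0, 0.0], [10.0, -40.0]])
modes = lane_following_modes([straight, right_turn], speed=4.0)
```

A classifier over such a fixed set of candidates, in the spirit of CoverNet, would then only need to score the modes, so every output stays on a lane by construction.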

## 4. MAKING THE BIRD'S-EYE VIEW MORE INFORMATIVE

Give the model the lane information (BEV or vector lanes)

This is the option the professor suggests.
It is a good one because the model learns the map geometry "by itself".

Ways to do it:

Full BEV raster (what we are building now).

Vectorized lanes (Trajectron++ / VectorNet format).

Feed polylines directly as input to a GNN or MLP.

Advantage: it does not enforce anything explicitly, it only helps.
Disadvantage: the model can still get it wrong at times.
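
For the vectorized option, a lane polyline can be turned into a set of small vectors, loosely following the VectorNet idea of encoding each segment by its start point, end point and the id of the polyline it belongs to. The sketch below assumes that layout; `polyline_to_vectors` and `encode_lanes` are hypothetical names and the lanes are toy data.

```python
import numpy as np

def polyline_to_vectors(polyline, polyline_id):
    """Turn an (N, 2) polyline into (N-1, 5) rows: [x_start, y_start, x_end, y_end, id]."""
    polyline = np.asarray(polyline, dtype=np.float32)
    starts, ends = polyline[:-1], polyline[1:]
    ids = np.full((len(starts), 1), polyline_id, dtype=np.float32)
    return np.hstack([starts, ends, ids])

def encode_lanes(lanes, agent_xy):
    """Stack all lanes into one (num_vectors, 5) array centered on the agent."""
    agent_xy = np.asarray(agent_xy, dtype=np.float32)
    return np.vstack([
        polyline_to_vectors(np.asarray(lane) - agent_xy, lane_idx)
        for lane_idx, lane in enumerate(lanes)
    ])

# Two toy lanes in global coordinates and an agent at (100, 50)
lanes = [
    np.array([[100.0, 50.0], [105.0, 50.0], [110.0, 50.0]]),
    np.array([[100.0, 54.0], [110.0, 54.0]]),
]
lane_vectors = encode_lanes(lanes, agent_xy=(100.0, 50.0))
```

Each row could then be passed through an MLP and pooled per polyline id to obtain lane embeddings of shape `(N_lanes, D)` for the token sequence.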