<a href="https://colab.research.google.com/github/mahesmeh001/AgroverseComp/blob/main/TransformerWithScoringHead.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import os

In [None]:
# Install the Kaggle API token so the `kaggle` CLI can authenticate.
# Expects `kaggle.json` to have been uploaded to the current working directory.
# NOTE: `mv` makes this cell non-idempotent - a second run fails because the file is already moved.
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
# Restrict the token to owner read/write only.
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the competition archive (cse-251-b-2025.zip) into the current directory.
!kaggle competitions download -c cse-251-b-2025

Downloading cse-251-b-2025.zip to /content
 91% 913M/0.98G [00:10<00:01, 49.5MB/s]
100% 0.98G/0.98G [00:10<00:00, 96.0MB/s]


In [None]:
!unzip cse-251-b-2025.zip -d argoverse_data/

Archive:  cse-251-b-2025.zip
  inflating: argoverse_data/test_input.npz  
  inflating: argoverse_data/train.npz  


In [None]:
# Load the raw competition arrays from the extracted .npz archives.
# Each archive stores a single array under the key 'data'.
train_file = np.load('argoverse_data/train.npz')
train_data = train_file['data']
# Later cells index this as train_data[:, agent, timestep, feature]
# (agent 0 is treated as the ego vehicle; timesteps 0-49 are history, 50-109 are the future).
print("train_data's shape", train_data.shape)
test_file = np.load('argoverse_data/test_input.npz')
test_data = test_file['data']
# The test array only has 50 timesteps (history only), per the printed shape.
print("test_data's shape", test_data.shape)

train_data's shape (10000, 50, 110, 6)
test_data's shape (2100, 50, 50, 6)


In [None]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Pre-norm Transformer encoder block.

    Applies self-attention followed by a position-wise feed-forward network
    (implemented with kernel-size-1 convolutions), each wrapped in a residual
    connection with LayerNormalization applied *before* the sub-layer.

    Args:
        inputs: Keras tensor whose last axis is the channel axis.
        head_size: `key_dim` for each attention head.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward sub-layer.
        dropout: Dropout rate used inside attention and after each sub-layer.

    Returns:
        Keras tensor with the same shape as `inputs`.
    """
    channels = inputs.shape[-1]

    # ----- Self-attention sub-layer -----
    normed = layers.LayerNormalization(epsilon=1e-6)(inputs)
    attended = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(normed, normed)
    attended = layers.Dropout(dropout)(attended)
    attn_residual = attended + inputs

    # ----- Feed-forward sub-layer -----
    hidden = layers.LayerNormalization(epsilon=1e-6)(attn_residual)
    hidden = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="gelu")(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    # Project back to the input channel count so the residual add is shape-compatible.
    projected = layers.Conv1D(filters=channels, kernel_size=1)(hidden)
    return projected + attn_residual

In [None]:
class LearnablePositionalEncoding(layers.Layer):
    """Adds a trainable positional embedding to a (batch, time, d_model) input.

    Args:
        max_len: Maximum sequence length the embedding table covers.
        d_model: Size of the last (feature) axis of the input.
        **kwargs: Standard `keras.layers.Layer` keyword arguments
            (`name`, `dtype`, `trainable`, ...).
    """

    def __init__(self, max_len, d_model, **kwargs):
        # Forward standard Layer kwargs so the layer can be named and can be
        # re-created from its config when a saved model is loaded.
        super().__init__(**kwargs)
        self.max_len = max_len
        self.d_model = d_model
        self.pos_emb = self.add_weight(
            shape=(1, max_len, d_model),
            initializer="random_normal",
            trainable=True,
            name="learnable_positional_encoding"
        )

    def call(self, x):
        # Slice the table to the actual sequence length, then broadcast over the batch.
        return x + self.pos_emb[:, :tf.shape(x)[1], :]

    def get_config(self):
        """Return constructor arguments so the layer survives save/load."""
        config = super().get_config()
        config.update({"max_len": self.max_len, "d_model": self.d_model})
        return config

In [None]:
def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    output_steps=60,
    output_dims=2,
    num_prediction_modes=6,  # Number of candidate predictions
    dropout=0,
    mlp_dropout=0
):
    """Build a multi-modal trajectory Transformer with a per-mode scoring head.

    Args:
        input_shape: `(timesteps, features)` of one input sequence.
        head_size: `key_dim` of each attention head.
        num_heads: Number of attention heads per encoder block.
        ff_dim: Hidden width of each encoder block's feed-forward sub-layer.
        num_transformer_blocks: Number of stacked encoder blocks.
        mlp_units: Iterable of hidden widths for the shared MLP tower.
        output_steps: Number of future timesteps predicted per mode.
        output_dims: Coordinates per timestep (2 for x, y).
        num_prediction_modes: Number of candidate trajectories K.
        dropout: Dropout rate inside the encoder blocks.
        mlp_dropout: Dropout rate in the MLP tower and the score head.

    Returns:
        A `keras.Model` with two outputs:
        * ``"trajectories"``: `(batch, K, output_steps, output_dims)`
        * ``"scores_out"``: `(batch, K)` unnormalized per-mode scores
    """
    timesteps, features = input_shape

    inputs = keras.Input(shape=(timesteps, features))

    # Positional encoding
    x = LearnablePositionalEncoding(max_len=timesteps, d_model=features)(inputs)

    # Transformer layers
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    # Global pooling over the time axis -> one vector per sequence
    x = layers.GlobalAveragePooling1D()(x)

    # MLP tower (shared by the trajectory heads and the score head)
    for dim in mlp_units:
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)

    prediction_features = x

    # One independent linear head per mode
    trajectories = []
    for _ in range(num_prediction_modes):
        traj = layers.Dense(output_steps * output_dims)(prediction_features)
        traj = layers.Reshape((output_steps, output_dims))(traj)
        trajectories.append(traj)

    # Stack all trajectories: shape = (batch_size, K, output_steps, output_dims)
    all_trajectories = layers.Lambda(
        lambda mode_list: tf.stack(mode_list, axis=1), name="trajectories"
    )(trajectories)

    # Score head.
    # The previous version tiled `prediction_features` K times and ran the SAME
    # MLP over every copy, so all K scores were identical by construction and
    # could never rank the modes. Here the final Dense layer has one output
    # unit (with its own weights) per mode, so scores can differ across modes.
    score_hidden = layers.Dense(128, activation="relu")(prediction_features)
    score_hidden = layers.Dropout(mlp_dropout)(score_hidden)
    score_hidden = layers.Dense(64, activation="relu")(score_hidden)
    score_hidden = layers.Dropout(mlp_dropout)(score_hidden)
    scores = layers.Dense(num_prediction_modes, name="scores_out")(score_hidden)  # (batch, K)

    return keras.Model(inputs, [all_trajectories, scores])

In [None]:
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import pickle

def _make_scaler(method):
    """Return a fresh, unfitted sklearn scaler for the given method name."""
    if method == 'standard':
        return StandardScaler()
    if method == 'minmax':
        return MinMaxScaler()
    if method == 'robust':
        from sklearn.preprocessing import RobustScaler
        return RobustScaler()
    raise ValueError("Method must be 'standard', 'minmax', or 'robust'")


def _scale_last_axis(scaler, data, fit=False):
    """Apply `scaler` per feature: flatten to (N, features), scale, restore shape."""
    flat = data.reshape(-1, data.shape[-1])
    scaled = scaler.fit_transform(flat) if fit else scaler.transform(flat)
    return scaled.reshape(data.shape)


def _shift_to_start(X, y=None):
    """Return copies of X (and y) with x/y expressed relative to X's first timestep."""
    start_x = X[:, 0:1, 0:1].copy()  # (batch, 1, 1)
    start_y = X[:, 0:1, 1:2].copy()  # (batch, 1, 1)

    X = X.copy()
    X[:, :, 0:1] = X[:, :, 0:1] - start_x
    X[:, :, 1:2] = X[:, :, 1:2] - start_y

    if y is not None:
        y = y.copy()
        y[:, :, 0:1] = y[:, :, 0:1] - start_x
        y[:, :, 1:2] = y[:, :, 1:2] - start_y

    return X, y, start_x, start_y


def normalize_trajectory_data(X_train, y_train, X_val=None, y_val=None,
                             method='standard', position_relative=True,
                             save_scalers=True, scaler_path='scalers.pkl'):
    """
    Normalize trajectory prediction data with proper handling of temporal sequences.

    Args:
        X_train: Input features (batch, timesteps, features)
        y_train: Target trajectories (batch, future_timesteps, xy)
        X_val: Validation input features (optional)
        y_val: Validation targets (optional)
        method: 'standard', 'minmax', or 'robust'
        position_relative: Convert positions to coordinates relative to the
            first input timestep of each sample
        save_scalers: Pickle the scalers dictionary to `scaler_path`
        scaler_path: Path the scalers dictionary is written to

    Returns:
        ``(X_train, y_train, X_val, y_val, scalers)`` when both validation
        arrays are given, otherwise ``(X_train, y_train, scalers)``. All
        arrays are normalized copies; the inputs are not modified.

        ``scalers`` contains ``'X_scaler'``, ``'y_scaler'`` and, when relative
        coordinates are used, ``'start_positions'`` (training) plus
        ``'val_start_positions'`` (validation, if `X_val` was given).

    Raises:
        ValueError: If `method` is not one of the supported names.
    """
    # Build both scalers up front so an invalid `method` fails before any work is done.
    X_scaler = _make_scaler(method)
    y_scaler = _make_scaler(method)

    scalers = {}

    # 1. HANDLE POSITION FEATURES (make relative to starting position)
    if position_relative and X_train.shape[-1] >= 2:
        print("Converting to relative coordinates...")

        X_train, y_train, start_pos_x, start_pos_y = _shift_to_start(X_train, y_train)

        # Store starting positions for later denormalization
        scalers['start_positions'] = {
            'start_x': start_pos_x,
            'start_y': start_pos_y
        }

        # Validation inputs must get the same treatment as training inputs even
        # when no validation targets are supplied; otherwise absolute
        # coordinates would be fed to a scaler fitted on relative ones.
        if X_val is not None:
            X_val, y_val, start_pos_x_val, start_pos_y_val = _shift_to_start(X_val, y_val)
            scalers['val_start_positions'] = {
                'start_x': start_pos_x_val,
                'start_y': start_pos_y_val
            }

    # 2. NORMALIZE INPUT FEATURES
    print(f"Normalizing input features using {method} scaling...")

    X_train_normalized = _scale_last_axis(X_scaler, X_train, fit=True)
    scalers['X_scaler'] = X_scaler

    X_val_normalized = None
    if X_val is not None:
        X_val_normalized = _scale_last_axis(X_scaler, X_val)

    # 3. NORMALIZE TARGET TRAJECTORIES
    print("Normalizing target trajectories...")

    y_train_normalized = _scale_last_axis(y_scaler, y_train, fit=True)
    scalers['y_scaler'] = y_scaler

    y_val_normalized = None
    if y_val is not None:
        y_val_normalized = _scale_last_axis(y_scaler, y_val)

    # 4. SAVE SCALERS
    if save_scalers:
        with open(scaler_path, 'wb') as f:
            pickle.dump(scalers, f)
        print(f"Scalers saved to {scaler_path}")

    # 5. PRINT NORMALIZATION STATS
    print("\nNormalization Statistics:")
    print(f"X_train - Mean: {X_train_normalized.mean():.4f}, Std: {X_train_normalized.std():.4f}")
    print(f"y_train - Mean: {y_train_normalized.mean():.4f}, Std: {y_train_normalized.std():.4f}")

    if X_val_normalized is not None:
        print(f"X_val - Mean: {X_val_normalized.mean():.4f}, Std: {X_val_normalized.std():.4f}")
    # Guarded separately: X_val may be supplied without y_val.
    if y_val_normalized is not None:
        print(f"y_val - Mean: {y_val_normalized.mean():.4f}, Std: {y_val_normalized.std():.4f}")

    # Return normalized data
    if X_val_normalized is not None and y_val_normalized is not None:
        return (X_train_normalized, y_train_normalized,
                X_val_normalized, y_val_normalized, scalers)
    else:
        return X_train_normalized, y_train_normalized, scalers

def denormalize_predictions(predictions, scalers, validation_indices=None):
    """Map normalized trajectories back to absolute coordinates.

    Reverses `normalize_trajectory_data`: first undoes the target scaling
    (``scalers['y_scaler']``, if present), then adds back the per-sample start
    position (``scalers['start_positions']``, if present).

    Args:
        predictions: Array of shape (batch, T, 2) or (batch, K, T, 2). Not modified.
        scalers: Dictionary produced by `normalize_trajectory_data`.
        validation_indices: Optional indices selecting which stored start
            positions correspond to the rows of `predictions`. When None, the
            first ``len(predictions)`` stored start positions are used.

    Returns:
        A new array with the same shape as `predictions`.

    Raises:
        ValueError: If the number of selected start positions does not match
            the batch size of `predictions`.
    """
    pred_denormalized = np.array(predictions, copy=True)

    # Undo the feature scaling first; previously this step was missing, so the
    # result stayed in standardized units with only an offset added.
    y_scaler = scalers.get('y_scaler')
    if y_scaler is not None:
        flat = pred_denormalized.reshape(-1, pred_denormalized.shape[-1])
        pred_denormalized = np.asarray(
            y_scaler.inverse_transform(flat)
        ).reshape(pred_denormalized.shape)

    # Without stored start positions there is no offset to restore
    # (the old code raised NameError here).
    start_positions = scalers.get('start_positions')
    if start_positions is None:
        return pred_denormalized

    start_x = np.asarray(start_positions['start_x'])
    start_y = np.asarray(start_positions['start_y'])

    if validation_indices is not None:
        start_x = start_x[validation_indices]
        start_y = start_y[validation_indices]
    else:
        # Take only the first N start positions to match predictions
        n_samples = pred_denormalized.shape[0]
        start_x = start_x[:n_samples]
        start_y = start_y[:n_samples]

    if start_x.shape[0] != pred_denormalized.shape[0]:
        raise ValueError(
            f"Got {start_x.shape[0]} start positions for "
            f"{pred_denormalized.shape[0]} predictions"
        )

    # Reshape to (batch, 1, ..., 1) so the offset broadcasts over both
    # (batch, T, 2) and multi-mode (batch, K, T, 2) predictions.
    broadcast_shape = (-1,) + (1,) * (pred_denormalized.ndim - 1)
    pred_denormalized[..., 0:1] = pred_denormalized[..., 0:1] + start_x.reshape(broadcast_shape)
    pred_denormalized[..., 1:2] = pred_denormalized[..., 1:2] + start_y.reshape(broadcast_shape)

    return pred_denormalized

def load_scalers(scaler_path='scalers.pkl'):
    """Read back a pickled scalers dictionary.

    Only use this on files you created yourself: unpickling can execute
    arbitrary code contained in the file.
    """
    with open(scaler_path, 'rb') as handle:
        scalers = pickle.load(handle)
    return scalers

# Example usage function
def normalize_your_data(X_train_ego, y_train, X_tiny=None, y_tiny=None):
    """
    Convenience wrapper around `normalize_trajectory_data`.

    Uses standard scaling with relative positions, and prints the array shapes
    before and after normalization. Returns a 5-tuple
    (X, y, X_val, y_val, scalers) when both `X_tiny` and `y_tiny` are given,
    otherwise a 3-tuple (X, y, scalers).
    """
    print("Original shapes:")
    print(f"X_train_ego shape: {X_train_ego.shape}")
    print(f"y_train shape: {y_train.shape}")

    shared_options = dict(method='standard', position_relative=True)
    has_validation = not (X_tiny is None or y_tiny is None)

    if not has_validation:
        # Training data only
        X_norm, y_norm, scalers = normalize_trajectory_data(
            X_train_ego, y_train, **shared_options
        )

        print("\nAfter normalization:")
        print(f"X_train_normalized shape: {X_norm.shape}")
        print(f"y_train_normalized shape: {y_norm.shape}")

        return X_norm, y_norm, scalers

    # Training plus validation data
    X_norm, y_norm, X_val_norm, y_val_norm, scalers = normalize_trajectory_data(
        X_train_ego, y_train, X_tiny, y_tiny, **shared_options
    )

    print("\nAfter normalization:")
    print(f"X_train_normalized shape: {X_norm.shape}")
    print(f"y_train_normalized shape: {y_norm.shape}")
    print(f"X_val_normalized shape: {X_val_norm.shape}")
    print(f"y_val_normalized shape: {y_val_norm.shape}")

    return X_norm, y_norm, X_val_norm, y_val_norm, scalers

In [None]:
def add_speed_and_direction(X):
    """
    Append per-step speed and heading as two extra feature channels.

    X: numpy array of shape (batch, timesteps, features) whose first two
       features are the (x, y) coordinates.
    Returns: array of shape (batch, timesteps, features + 2). Speed is the
       length of the displacement from the previous timestep and direction is
       its atan2 angle; both are 0 at the first timestep.
    """
    positions = X[..., :2]                      # (batch, timesteps, 2)
    steps = np.diff(positions, axis=1)          # (batch, timesteps - 1, 2)

    step_length = np.linalg.norm(steps, axis=-1)          # (batch, timesteps - 1)
    heading = np.arctan2(steps[..., 1], steps[..., 0])    # (batch, timesteps - 1)

    def _with_leading_zero(series):
        # The first timestep has no predecessor, so its value is defined as 0.
        zero_column = np.zeros((series.shape[0], 1), dtype=series.dtype)
        return np.concatenate([zero_column, series], axis=1)

    speed_channel = _with_leading_zero(step_length)[..., np.newaxis]    # (batch, timesteps, 1)
    heading_channel = _with_leading_zero(heading)[..., np.newaxis]      # (batch, timesteps, 1)

    return np.concatenate([X, speed_channel, heading_channel], axis=-1)

In [None]:
# Build model inputs/targets from agent 0 (treated as the ego vehicle) only.
# Inputs: the first 50 timesteps with all 6 raw features, plus derived speed/direction.
X_train_ego = train_data[:, 0, :50, :]  # (batch, timesteps, features) - ego vehicle only
X_train_ego = add_speed_and_direction(X_train_ego)
# Targets: the remaining 60 timesteps, x/y coordinates only.
y_train = train_data[:, 0, 50:110, :2]  # (batch, future_timesteps, xy)

# Small subset used as "validation" data below.
# NOTE(review): these are the first 50 *training* samples, so metrics on them
# measure fit to the training set, not generalization.
X_tiny = X_train_ego[:50]
y_tiny = y_train[:50]

print(f"X_train_ego shape: {X_train_ego.shape}")
print(f"y_train shape: {y_train.shape}")

X_train_ego shape: (10000, 50, 8)
y_train shape: (10000, 60, 2)


In [None]:
# Normalize the full training set; X_tiny / y_tiny are passed as the validation
# arrays and are transformed with the scalers fitted on the training data.
# Side effect: the fitted scalers are pickled to 'scalers.pkl' (the default path).
X_norm, y_norm, X_val_norm, y_val_norm, scalers = normalize_your_data(
    X_train_ego, y_train, X_tiny, y_tiny
)

Original shapes:
X_train_ego shape: (10000, 50, 8)
y_train shape: (10000, 60, 2)
Converting to relative coordinates...
Normalizing input features using standard scaling...
Normalizing target trajectories...
Scalers saved to scalers.pkl

Normalization Statistics:
X_train - Mean: -0.0000, Std: 0.9354
y_train - Mean: 0.0000, Std: 1.0000
X_val - Mean: 0.0415, Std: 0.9082
y_val - Mean: 0.0860, Std: 0.9967

After normalization:
X_train_normalized shape: (10000, 50, 8)
y_train_normalized shape: (10000, 60, 2)
X_val_normalized shape: (50, 50, 8)
y_val_normalized shape: (50, 60, 2)


In [None]:
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import Callback

class VisualScoringCallback(Callback):
    """Interactive callback: plot each predicted mode and ask for a manual score.

    After every epoch, up to three validation samples are run through the
    model; each predicted mode is plotted against the ground truth and the
    user is prompted for a 0-10 score. Training is paused while waiting for
    input.

    Args:
        val_data: Tuple ``(x_val, y_val)`` of model inputs and target trajectories.
        num_prediction_modes: Maximum number of modes to display per sample.

    Attributes:
        manual_scores: List of ``(epoch, sample_index, mode_index, score)``
            tuples collected so far (epoch is 1-based).
    """

    def __init__(self, val_data, num_prediction_modes=6):
        super().__init__()
        self.val_data = val_data
        self.num_prediction_modes = num_prediction_modes
        # Keep the entered scores instead of discarding them.
        self.manual_scores = []

    def on_epoch_end(self, epoch, logs=None):
        print(f"\nEpoch {epoch + 1} ended. Let's visually score the model predictions.")

        x_val, y_val = self.val_data  # unpack the tuple

        num_samples = min(3, x_val.shape[0])
        if num_samples == 0:
            return

        # The model has two outputs: [trajectories (B, K, T, D), scores (B, K)]
        preds = self.model.predict(x_val[:num_samples])
        trajectories = preds[0]

        # Never index past the number of modes the model actually produces.
        available_modes = trajectories.shape[1]
        num_modes = min(self.num_prediction_modes, available_modes)
        if num_modes < self.num_prediction_modes:
            print(f"Model outputs {available_modes} modes; "
                  f"showing {num_modes} instead of {self.num_prediction_modes}.")

        for i in range(num_samples):
            print(f"\nSample {i + 1}:")
            true_traj = y_val[i]  # shape (T, D)
            for k in range(num_modes):
                pred_traj = trajectories[i, k]  # shape (T, D)

                fig = plt.figure()
                plt.title(f"Mode {k}")
                plt.plot(pred_traj[:, 0], pred_traj[:, 1], label="Prediction")
                plt.plot(true_traj[:, 0], true_traj[:, 1], label="Ground Truth", linestyle="--")
                plt.legend()
                plt.grid(True)
                plt.show()
                # Release the figure; otherwise one figure per mode per epoch accumulates.
                plt.close(fig)

                # Best effort: any failure to read a number falls back to 0.
                try:
                    score = float(input(f"Score for Mode {k} (0-10): "))
                except Exception as e:
                    print(f"Invalid input. Defaulting score to 0. Error: {e}")
                    score = 0.0

                self.manual_scores.append((epoch + 1, i, k, score))
                print(f"  → You gave Mode {k} a score of {score}")

In [None]:
def best_mode_mse_loss_with_motion(y_true, y_pred, direction_weight=1.0, speed_weight=1.0):
    """Winner-takes-all trajectory loss with speed and heading terms.

    For each sample the mode with the lowest position MSE is selected; the
    loss is computed on that mode only, so gradients reach only the winning
    mode.

    Args:
        y_true: (B, T, 2) ground-truth trajectory.
        y_pred: (B, K, T, 2) predicted trajectories, one per mode.
        direction_weight: Weight of the heading (angle) term.
        speed_weight: Weight of the speed term.

    Returns:
        Scalar tensor: position MSE + speed_weight * speed MSE
        + direction_weight * squared wrapped heading error.
    """
    # Targets may arrive as float64 numpy data; align with the model's dtype.
    y_true = tf.cast(y_true, y_pred.dtype)
    eps = tf.constant(1e-8, dtype=y_pred.dtype)

    # Per-mode MSE (broadcast over the mode axis instead of tiling y_true)
    per_mode_mse = tf.reduce_mean(
        tf.square(y_pred - tf.expand_dims(y_true, axis=1)), axis=[2, 3]
    )  # (B, K)
    best_mode_idx = tf.argmin(per_mode_mse, axis=1)  # (B,)

    # Best prediction trajectory
    best_pred = tf.gather(y_pred, best_mode_idx, axis=1, batch_dims=1)  # (B, T, 2)

    # ========== Base Position MSE ==========
    position_loss = tf.reduce_mean(tf.square(best_pred - y_true))

    # Step-to-step displacement, shared by the speed and direction terms
    delta_pred = best_pred[:, 1:] - best_pred[:, :-1]  # (B, T-1, 2)
    delta_true = y_true[:, 1:] - y_true[:, :-1]        # (B, T-1, 2)

    # ========== Velocity ==========
    def compute_speed(delta):
        # sqrt has an infinite derivative at 0, which makes the gradient of a
        # plain norm NaN for a zero displacement; eps keeps it finite.
        return tf.sqrt(tf.reduce_sum(tf.square(delta), axis=-1) + eps)  # (B, T-1)

    speed_loss = tf.reduce_mean(
        tf.square(compute_speed(delta_pred) - compute_speed(delta_true))
    )

    # ========== Direction ==========
    angle_pred = tf.math.atan2(delta_pred[..., 1], delta_pred[..., 0])  # (B, T-1)
    angle_true = tf.math.atan2(delta_true[..., 1], delta_true[..., 0])  # (B, T-1)
    raw_diff = angle_pred - angle_true
    # Wrap the difference into (-pi, pi]. Same value as the complex-exponential
    # form, but real-valued, so it works for any float dtype.
    angle_diff = tf.math.atan2(tf.sin(raw_diff), tf.cos(raw_diff))
    angle_loss = tf.reduce_mean(tf.square(angle_diff))

    # ========== Total Loss ==========
    total_loss = position_loss + speed_weight * speed_loss + direction_weight * angle_loss
    return total_loss

In [None]:
# Number of candidate trajectories (K). Used for the model heads, the
# visualization callback and the dummy score targets so the three always agree.
# Previously the model was built with the default of 6 while the callback and
# dummy targets used 8.
NUM_PREDICTION_MODES = 6

model = build_model(
    input_shape=X_train_ego.shape[1:],  # (timesteps, features)
    head_size=1024,
    num_heads=16,
    ff_dim=1024,
    num_transformer_blocks=8,
    mlp_units=[1024, 512],
    output_steps=60,
    output_dims=2,
    num_prediction_modes=NUM_PREDICTION_MODES,
    dropout=0.0,           # Turn off dropout to promote overfitting
    mlp_dropout=0.0        # Turn off MLP dropout
)

model.compile(
    optimizer="adam",
    loss={
        "trajectories": best_mode_mse_loss_with_motion,  # winner-takes-all trajectory loss
        "scores_out": None         # score head is not trained yet
    },
    loss_weights={"trajectories": 1.0, "scores_out": 0.0}  # Set to 0.0 if you aren't training score head yet
)

model.summary()

BATCH_SIZE = 16
EPOCHS = 100

val_callback = VisualScoringCallback(
    val_data=(X_val_norm, y_val_norm),
    num_prediction_modes=NUM_PREDICTION_MODES
)

# Placeholder targets for the (currently untrained) score output.
dummy_scores = np.zeros((len(X_norm), NUM_PREDICTION_MODES))

history = model.fit(
    X_norm,
    {"trajectories": y_norm, "scores_out": dummy_scores},
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_split=0.1,
    callbacks=[val_callback]  # Keras expects a list of callbacks
)

Epoch 1/100




[1m563/563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m146s[0m 195ms/step - loss: 3.4089 - val_loss: 2.6237
Epoch 2/100
[1m563/563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m87s[0m 155ms/step - loss: 2.3647 - val_loss: 2.0917
Epoch 3/100
[1m563/563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m142s[0m 155ms/step - loss: 2.0847 - val_loss: 1.8287
Epoch 4/100
[1m563/563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m87s[0m 154ms/step - loss: 1.8777 - val_loss: 1.9818
Epoch 5/100
[1m563/563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m145s[0m 159ms/step - loss: 1.6866 - val_loss: 1.7602
Epoch 6/100
[1m563/563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m139s[0m 155ms/step - loss: 1.5944 - val_loss: 1.4354
Epoch 7/100
[1m563/563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m142s[0m 154ms/step - loss: 1.5500 - val_loss: 1.6506
Epoch 8/100
[1m563/563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m144s[0m 159ms/step - loss: 1.5109 - val_loss: 1.4664
Epoch 9/100
[

KeyboardInterrupt: 

In [None]:
# The model has two outputs, so predict() returns a list:
# [trajectories (B, K, T, 2), scores (B, K)] - not a single array.
pred_trajectories, pred_scores = model.predict(X_norm)

# The score head is not trained (loss weight 0), so it cannot rank the modes.
# Mirror the training loss instead: keep, per sample, the mode closest to the
# ground truth in normalized space.
per_mode_mse = np.mean((pred_trajectories - y_norm[:, np.newaxis]) ** 2, axis=(2, 3))  # (B, K)
best_mode = np.argmin(per_mode_mse, axis=1)                                            # (B,)
predictions = pred_trajectories[np.arange(len(best_mode)), best_mode]                  # (B, T, 2)

denormalized_predictions = denormalize_predictions(predictions, scalers)

[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 92ms/step


AttributeError: 'list' object has no attribute 'shape'

In [None]:
import matplotlib.pyplot as plt
import numpy as np

def visualize_predictions(real_trajectories, predicted_trajectories, sample_indices=None,
                         num_samples=5, figsize=(15, 10), title_prefix="Sample"):
    """
    Visualize real vs predicted trajectories on a grid of subplots.

    Args:
        real_trajectories: Ground truth trajectories (batch_size, time_steps, 2)
        predicted_trajectories: Model predictions (batch_size, time_steps, 2)
        sample_indices: Specific indices to visualize (if None, random samples)
        num_samples: Maximum number of samples to visualize
        figsize: Figure size
        title_prefix: Prefix for subplot titles

    Returns:
        None. Shows the figure; prints a warning and returns early when there
        is nothing to plot.
    """
    # Only indices valid for BOTH arrays can be plotted.
    max_idx = min(len(real_trajectories), len(predicted_trajectories))
    if max_idx == 0:
        print("Warning: No trajectories to visualize.")
        return

    if sample_indices is None:
        sample_indices = np.random.choice(max_idx,
                                        min(num_samples, max_idx),
                                        replace=False)
    else:
        # Filter out indices that are too large
        sample_indices = [idx for idx in sample_indices if idx < max_idx]
        sample_indices = sample_indices[:num_samples]
        if len(sample_indices) == 0:
            print(f"Warning: No valid indices found. Using random indices from 0 to {max_idx-1}")
            sample_indices = np.random.choice(max_idx, min(num_samples, max_idx), replace=False)

    if len(sample_indices) == 0:
        # e.g. num_samples <= 0; avoids a zero-column grid below
        print("Warning: No samples selected for visualization.")
        return

    # Calculate grid dimensions
    cols = min(3, len(sample_indices))
    rows = (len(sample_indices) + cols - 1) // cols

    # squeeze=False always yields a 2-D array of Axes, so a single sample no
    # longer needs special handling (the old code ended up with a list as `ax`).
    fig, axes = plt.subplots(rows, cols, figsize=figsize, squeeze=False)
    axes = axes.ravel()

    for ax, idx in zip(axes, sample_indices):
        # Extract trajectories
        real_traj = real_trajectories[idx]
        pred_traj = predicted_trajectories[idx]

        # Plot real trajectory
        ax.plot(real_traj[:, 0], real_traj[:, 1], 'b-',
                linewidth=2, label='Ground Truth', alpha=0.8)
        ax.scatter(real_traj[0, 0], real_traj[0, 1],
                  c='blue', s=100, marker='o', label='Real Start', zorder=5)
        ax.scatter(real_traj[-1, 0], real_traj[-1, 1],
                  c='blue', s=100, marker='s', label='Real End', zorder=5)

        # Plot predicted trajectory
        ax.plot(pred_traj[:, 0], pred_traj[:, 1], 'r--',
                linewidth=2, label='Prediction', alpha=0.8)
        ax.scatter(pred_traj[0, 0], pred_traj[0, 1],
                  c='red', s=100, marker='o', label='Pred Start', zorder=5)
        ax.scatter(pred_traj[-1, 0], pred_traj[-1, 1],
                  c='red', s=100, marker='s', label='Pred End', zorder=5)

        # Add trajectory points
        ax.scatter(real_traj[:, 0], real_traj[:, 1],
                  c='blue', s=20, alpha=0.5, zorder=3)
        ax.scatter(pred_traj[:, 0], pred_traj[:, 1],
                  c='red', s=20, alpha=0.5, zorder=3)

        ax.set_title(f'{title_prefix} {idx}')
        ax.set_xlabel('X Position')
        ax.set_ylabel('Y Position')
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.set_aspect('equal')

    # Hide empty subplots in the last row
    for unused_ax in axes[len(sample_indices):]:
        unused_ax.set_visible(False)

    plt.tight_layout()
    plt.show()

def plot_error_analysis(real_trajectories, predicted_trajectories, figsize=(15, 5)):
    """
    Plot three error views and print summary statistics.

    Panels: (1) MSE per timestep, (2) histogram of the final-timestep
    Euclidean error, (3) mean Euclidean error per timestep with a +/-1 std band.
    Both inputs are arrays of shape (batch, time_steps, features).
    """
    residuals = real_trajectories - predicted_trajectories
    squared_residuals = residuals**2
    mse_per_timestep = np.mean(squared_residuals, axis=(0, 2))   # averaged over batch and features
    euclidean_errors = np.sqrt(np.sum(squared_residuals, axis=2))  # (batch, time_steps)
    final_errors = euclidean_errors[:, -1]  # error at the last timestep

    fig, panels = plt.subplots(1, 3, figsize=figsize)
    mse_ax, hist_ax, band_ax = panels

    # Panel 1: MSE over time
    mse_ax.plot(mse_per_timestep, 'b-', linewidth=2)
    mse_ax.set_title('MSE Over Time Steps')
    mse_ax.set_xlabel('Time Step')
    mse_ax.set_ylabel('Mean Squared Error')
    mse_ax.grid(True, alpha=0.3)

    # Panel 2: distribution of final position errors
    hist_ax.hist(final_errors, bins=30, alpha=0.7, color='red', edgecolor='black')
    hist_ax.set_title('Final Position Error Distribution')
    hist_ax.set_xlabel('Euclidean Error')
    hist_ax.set_ylabel('Frequency')
    hist_ax.grid(True, alpha=0.3)

    # Panel 3: mean Euclidean error with a one-standard-deviation band
    mean_euclidean_error = np.mean(euclidean_errors, axis=0)
    std_euclidean_error = np.std(euclidean_errors, axis=0)
    lower_band = mean_euclidean_error - std_euclidean_error
    upper_band = mean_euclidean_error + std_euclidean_error

    band_ax.plot(mean_euclidean_error, 'g-', linewidth=2, label='Mean Error')
    band_ax.fill_between(range(len(mean_euclidean_error)),
                         lower_band,
                         upper_band,
                         alpha=0.3, color='green', label='±1 Std')
    band_ax.set_title('Euclidean Error Over Time')
    band_ax.set_xlabel('Time Step')
    band_ax.set_ylabel('Euclidean Error')
    band_ax.legend()
    band_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Summary statistics of the final-timestep error
    print(f"Mean Final Position Error: {np.mean(final_errors):.4f}")
    print(f"Std Final Position Error: {np.std(final_errors):.4f}")
    print(f"Max Final Position Error: {np.max(final_errors):.4f}")
    print(f"Min Final Position Error: {np.min(final_errors):.4f}")

def compare_specific_samples(real_trajectories, predicted_trajectories, indices,
                           figsize=(20, 4)):
    """
    Plot the chosen samples side by side in one row, ground truth vs prediction.

    Each panel marks the start (circle) and end (square) of both trajectories
    and shows the Euclidean distance between the two end points in its title.
    """
    panel_count = len(indices)
    fig, axes = plt.subplots(1, panel_count, figsize=figsize)

    # plt.subplots returns a bare Axes (not an array) for a single panel
    panels = [axes] if panel_count == 1 else axes

    for ax, idx in zip(panels, indices):
        real_traj = real_trajectories[idx]
        pred_traj = predicted_trajectories[idx]

        # Trajectory lines
        ax.plot(real_traj[:, 0], real_traj[:, 1], 'b-',
                linewidth=3, label='Ground Truth', alpha=0.8)
        ax.plot(pred_traj[:, 0], pred_traj[:, 1], 'r--',
                linewidth=3, label='Prediction', alpha=0.8)

        # Start / end markers: real first, then predicted
        for traj, color in ((real_traj, 'blue'), (pred_traj, 'red')):
            for position, marker in ((0, 'o'), (-1, 's')):
                ax.scatter(traj[position, 0], traj[position, 1],
                           c=color, s=150, marker=marker,
                           edgecolors='black', linewidth=2)

        # Distance between the final ground-truth and predicted points
        endpoint_gap = real_traj[-1] - pred_traj[-1]
        final_error = np.sqrt(np.sum(endpoint_gap**2))
        ax.set_title(f'Sample {idx}\nFinal Error: {final_error:.3f}')
        ax.set_xlabel('X Position')
        ax.set_ylabel('Y Position')
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.set_aspect('equal')

    plt.tight_layout()
    plt.show()

In [None]:
# Bring the normalized targets back through the same denormalization as the predictions.
# NOTE(review): despite the name, `y_val_denorm` is built from `y_norm`, i.e. the
# full *training* targets, not a held-out validation set.
y_val_denorm = denormalize_predictions(y_norm, scalers, validation_indices=None)

# Basic visualization
# Requires `denormalized_predictions` from the prediction cell, with one
# (time_steps, 2) trajectory per sample.
visualize_predictions(y_val_denorm, denormalized_predictions, num_samples=6)

# Error analysis
plot_error_analysis(y_val_denorm, denormalized_predictions)

# Compare specific samples
interesting_indices = [0, 5, 10, 15, 20]  # Choose samples you want to examine
compare_specific_samples(y_val_denorm, denormalized_predictions, interesting_indices)

# Find best and worst predictions by final-timestep Euclidean error.
# NOTE: best_indices / worst_indices are computed but not used within this cell.
final_errors = np.sqrt(np.sum((y_val_denorm[:, -1] - denormalized_predictions[:, -1])**2, axis=1))
best_indices = np.argsort(final_errors)[:3]  # 3 best predictions
worst_indices = np.argsort(final_errors)[-3:]  # 3 worst predictions