# Read data example

In [1]:
import os
import typing
from tqdm import tqdm

In [2]:
ROOT_DATA_FOLDER = "./YandexCup2024v2"

TRAIN_DATASET_PATH = os.path.join(ROOT_DATA_FOLDER, "YaCupTrain")
TEST_DATASET_PATH = os.path.join(ROOT_DATA_FOLDER, "YaCupTest")

In [3]:
# Load all ids of a dataset

def read_testcase_ids(dataset_path: str):
    ids = [int(case_id) for case_id in os.listdir(dataset_path) if os.path.isdir(os.path.join(dataset_path, case_id))]
    return ids

In [4]:
train_ids = read_testcase_ids(TRAIN_DATASET_PATH)
len(train_ids)

42000

In [5]:
test_ids = read_testcase_ids(TEST_DATASET_PATH)
len(test_ids)

8000

In [6]:
class DataFilePaths:
    def __init__(self, testcase_path: str):
        self.testcase_path = testcase_path
        
    def localization(self):
        return os.path.join(self.testcase_path, 'localization.csv')
    
    def control(self):
        return os.path.join(self.testcase_path, 'control.csv')
    
    def metadata(self):
        return os.path.join(self.testcase_path, 'metadata.json')
    
    # exists only for test_dataset
    def requested_stamps(self):
        return os.path.join(self.testcase_path, 'requested_stamps.csv')    

In [7]:
import pandas as pd
import json

def read_localization(localization_path: str):
    return pd.read_csv(localization_path)

def read_control(control_path):
    return pd.read_csv(control_path)

def read_metadata(metadata_path: str):
    with open(metadata_path, 'r') as f:
        data = json.load(f)
    return data

def read_requested_stamps(requested_stamps_path: str):
    return pd.read_csv(requested_stamps_path)
    
def read_testcase(dataset_path: str, testcase_id: str, is_test: bool = False):
    testcase_path = os.path.join(dataset_path, str(testcase_id))
    data_file_paths = DataFilePaths(testcase_path)
    
    testcase_data = {}
    testcase_data['localization'] = read_localization(data_file_paths.localization())
    testcase_data['control'] = read_control(data_file_paths.control())
    testcase_data['metadata'] = read_metadata(data_file_paths.metadata())
    if is_test:
        testcase_data['requested_stamps'] = read_requested_stamps(data_file_paths.requested_stamps())
        
    return testcase_data

In [8]:
def read_testcases(dataset_path: str, is_test: bool = False, testcase_ids: typing.Iterable[int] = None):
    result = {}
    if testcase_ids is None:
        testcase_ids = read_testcase_ids(dataset_path)
    
    for testcase_id in tqdm(testcase_ids):
        testcase = read_testcase(dataset_path, testcase_id, is_test=is_test)
        result[testcase_id] = testcase
    return result

In [29]:
raw_train_dataset = read_testcases(TRAIN_DATASET_PATH,  testcase_ids = train_ids[:10])

100%|██████████| 10/10 [00:00<00:00, 226.08it/s]


In [32]:
sample = raw_train_dataset[0]

In [34]:
sample.keys()

dict_keys(['localization', 'control', 'metadata'])

In [33]:
sample['localization'].shape

(1582, 7)

In [37]:
sample['control']

Unnamed: 0,stamp_ns,acceleration_level,steering
0,2987440736,-114,-2.655140
1,3027341070,-123,-2.598169
2,3066793076,-132,-2.544422
3,3106757146,-141,-2.544422
4,3146784622,-147,-2.488557
...,...,...,...
1495,62786741116,33,117.135357
1496,62826899778,33,119.059706
1497,62867315073,33,120.952111
1498,62906605994,32,122.802597


In [115]:
# may take some time
train_dataset = read_testcases(TRAIN_DATASET_PATH,  testcase_ids = train_ids[:])

len(train_dataset)

100%|██████████| 42000/42000 [03:55<00:00, 178.23it/s]


42000

In [116]:
test_dataset = read_testcases(TEST_DATASET_PATH, is_test=True)
len(test_dataset)

100%|██████████| 8000/8000 [00:25<00:00, 314.15it/s]


8000

In [117]:
import os
import typing
import json
import numpy as np
import pandas as pd
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

In [118]:
# Collect all possible categories from the training data
vehicle_models = []
vehicle_modifications = []
tire_types = []

for testcase in train_dataset.values():
    metadata = testcase['metadata']
    vehicle_models.append(metadata['vehicle_model'])
    vehicle_modifications.append(metadata['vehicle_model_modification'])
    tires = metadata['tires']
    tire_types.append(tires['front'])
    tire_types.append(tires['rear'])

# Fit the encoders
vehicle_model_encoder.fit(vehicle_models)
vehicle_modification_encoder.fit(vehicle_modifications)
tires_encoder.fit(tire_types)

In [129]:
from collections import Counter
Counter(tire_types).most_common()

[(0, 60036),
 (2, 7937),
 (5, 4870),
 (6, 3453),
 (3, 2518),
 (1, 2373),
 (7, 1909),
 (4, 492),
 (11, 170),
 (9, 80),
 (12, 78),
 (8, 54),
 (13, 16),
 (10, 14)]

In [17]:
class TrajectoryDataset(Dataset):
    def __init__(self, dataset, normalizer=None, training=True):
        self.data = []
        self.training = training
        sampling_interval_ns = 4e7  # 0.04 seconds in nanoseconds
        initial_state_length = int(5 / 0.04)  # Steps for initial 5 seconds (125 steps)
        target_length = int(15 / 0.04)  # Steps from 5s to 20s (375 steps)
        sequence_length = initial_state_length + target_length  # Total steps (500 steps)

        # For normalization
        self.positions = []
        self.controls = []

        for testcase_id, testcase in dataset.items():
            # Get metadata
            metadata = testcase['metadata']
            vehicle_model = vehicle_model_encoder.transform([metadata['vehicle_model']])[0]
            vehicle_modification = vehicle_modification_encoder.transform([metadata['vehicle_model_modification']])[0]
            tires_front = tires_encoder.transform([metadata['tires']['front']])[0]
            tires_rear = tires_encoder.transform([metadata['tires']['rear']])[0]
            vehicle_features = [vehicle_model, vehicle_modification, tires_front, tires_rear]

            # Get control commands
            control = testcase['control']
            control['acceleration_level'] = control['acceleration_level'].fillna(0)
            control_seq = control[['stamp_ns', 'acceleration_level', 'steering']].values

            # Get localization data
            localization = testcase['localization']
            localization_seq = localization[['stamp_ns', 'x', 'y', 'yaw']].values

            # Resample to fixed time steps (every 0.04s)
            time_steps = np.arange(0, 60 * 1e9, sampling_interval_ns)
            control_resampled = self.resample_sequence(control_seq, time_steps)
            localization_resampled = self.resample_sequence(localization_seq, time_steps)

            max_start_idx = len(time_steps) - sequence_length
            for i in range(0, max_start_idx, initial_state_length):  # Slide window
                # Initial localization sequence (first 5 seconds)
                initial_localization = localization_resampled[i:i+initial_state_length, 1:]  # Shape: [125, 3]
                # Convert yaw to sin and cos
                yaw = initial_localization[:, 2]
                sin_yaw = np.sin(yaw)
                cos_yaw = np.cos(yaw)
                initial_localization = np.hstack((initial_localization[:, :2], sin_yaw[:, np.newaxis], cos_yaw[:, np.newaxis]))  # Shape: [125, 4]

                # Control sequence from t + 5s to t + 20s (for target trajectory)
                control_sequence = control_resampled[i+initial_state_length:i+sequence_length, 1:]  # Shape: [375, 2]

                # Target trajectory from t + 5s to t + 20s
                target_traj = localization_resampled[i+initial_state_length:i+sequence_length, 1:]  # Shape: [375, 3]
                # Convert yaw to sin and cos
                yaw = target_traj[:, 2]
                sin_yaw = np.sin(yaw)
                cos_yaw = np.cos(yaw)
                target_traj = np.hstack((target_traj[:, :2], sin_yaw[:, np.newaxis], cos_yaw[:, np.newaxis]))  # Shape: [375, 4]

                # Collect data for normalization
                if training:
                    self.positions.append(initial_localization[:, :2])
                    self.positions.append(target_traj[:, :2])
                    self.controls.append(control_sequence)

                self.data.append({
                    'vehicle_features': vehicle_features,
                    'initial_localization': initial_localization,
                    'control_sequence': control_sequence,
                    'target_traj': target_traj
                })

        # Fit normalizer if training
        if training:
            self.positions = np.vstack(self.positions)
            self.controls = np.vstack(self.controls)
            self.normalizer = {
                'x_mean': self.positions[:, 0].mean(),
                'x_std': self.positions[:, 0].std(),
                'y_mean': self.positions[:, 1].mean(),
                'y_std': self.positions[:, 1].std(),
                'acceleration_mean': self.controls[:, 0].mean(),
                'acceleration_std': self.controls[:, 0].std(),
                'steering_mean': self.controls[:, 1].mean(),
                'steering_std': self.controls[:, 1].std(),
            }
        else:
            self.normalizer = normalizer

    def resample_sequence(self, seq, time_steps):
        df_seq = pd.DataFrame(seq, columns=['stamp_ns'] + [f'feat_{i}' for i in range(seq.shape[1]-1)])
        df_seq = df_seq.set_index('stamp_ns').reindex(time_steps, method='nearest').reset_index()
        return df_seq.values

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        vehicle_features = torch.tensor(sample['vehicle_features'], dtype=torch.long)

        # Normalize initial localization
        initial_localization = sample['initial_localization']
        initial_localization[:, 0] = (initial_localization[:, 0] - self.normalizer['x_mean']) / self.normalizer['x_std']
        initial_localization[:, 1] = (initial_localization[:, 1] - self.normalizer['y_mean']) / self.normalizer['y_std']
        initial_localization = torch.tensor(initial_localization, dtype=torch.float32)

        # Normalize control sequence
        control_sequence = sample['control_sequence']
        control_sequence[:, 0] = (control_sequence[:, 0] - self.normalizer['acceleration_mean']) / self.normalizer['acceleration_std']
        control_sequence[:, 1] = (control_sequence[:, 1] - self.normalizer['steering_mean']) / self.normalizer['steering_std']
        control_sequence = torch.tensor(control_sequence, dtype=torch.float32)

        # Normalize target trajectory
        target_traj = sample['target_traj']
        target_traj[:, 0] = (target_traj[:, 0] - self.normalizer['x_mean']) / self.normalizer['x_std']
        target_traj[:, 1] = (target_traj[:, 1] - self.normalizer['y_mean']) / self.normalizer['y_std']
        target_traj = torch.tensor(target_traj, dtype=torch.float32)

        return vehicle_features, initial_localization, control_sequence, target_traj

In [18]:
train_data = TrajectoryDataset(train_dataset, training=True)
normalizer = train_data.normalizer

# Split into training and validation sets
train_size = int(0.8 * len(train_data))
val_size = len(train_data) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(train_data, [train_size, val_size])

# Pass the normalizer to validation dataset
val_dataset.dataset.normalizer = normalizer

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [71]:
for vehicle_features, initial_localization, control_sequence, target_traj in train_loader:
    break

In [123]:
# for vehicle_features, initial_localization, control_sequence, target_traj in tqdm(train_loader):
#     vehicle_features = vehicle_features.to(device)
#     initial_localization = initial_localization.to(device)
#     control_sequence = control_sequence.to(device)
#     target_traj = target_traj.to(device)

#     optimizer.zero_grad()
#     output = model(vehicle_features, initial_localization, control_sequence)
#     break

In [108]:
class TrajectoryLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(TrajectoryLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Vehicle feature embedding
        self.vehicle_embedding = nn.Embedding(num_embeddings=1000, embedding_dim=16)
        self.vehicle_fc = nn.Linear(16, hidden_size)

        # Initial localization embedding
        self.localization_lstm = nn.LSTM(input_size=4, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

        # Control sequence embedding
        self.control_lstm = nn.LSTM(input_size=2, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

        # Decoder LSTM
        self.decoder_lstm = nn.LSTM(input_size=hidden_size * 2, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

        # Output layer
        self.fc_out = nn.Linear(hidden_size, 4)  # Output x, y, sin(yaw), cos(yaw)

    def forward(self, vehicle_features, initial_localization, control_sequence):
        batch_size = control_sequence.size(0)
        seq_len = control_sequence.size(1)
    
        # Vehicle feature embedding
        vehicle_embedded = self.vehicle_embedding(vehicle_features).mean(dim=1)  # [batch_size, embedding_dim]
        vehicle_embedded = self.vehicle_fc(vehicle_embedded)  # [batch_size, hidden_size]
        # print(f"vehicle_embedded shape: {vehicle_embedded.shape}")
    
        # Expand vehicle_embedded
        vehicle_embedded_expanded = vehicle_embedded.unsqueeze(1).repeat(1, seq_len, 1)  # [batch_size, seq_len, hidden_size]
        # print(f"vehicle_embedded_expanded shape: {vehicle_embedded_expanded.shape}")
    
        # Initial localization embedding
        _, (hidden_loc, cell_loc) = self.localization_lstm(initial_localization)
        # print(f"hidden_loc shape: {hidden_loc.shape}")
    
        # Control sequence embedding
        control_output, _ = self.control_lstm(control_sequence)
        # print(f"control_output shape: {control_output.shape}")
    
        # Concatenate
        decoder_input = torch.cat((control_output, vehicle_embedded_expanded), dim=2)
        # print(f"decoder_input shape: {decoder_input.shape}")
    
        # Decoder LSTM
        decoder_output, _ = self.decoder_lstm(decoder_input, (hidden_loc, cell_loc))
        # print(f"decoder_output shape: {decoder_output.shape}")
    
        # Output layer
        output = self.fc_out(decoder_output)
        # print(f"output shape: {output.shape}")
    
        return output
    


In [121]:
import numpy as np

SEGMENT_LENGTH = 1.0

def calculate_metric_on_batch(output_np, target_np, segment_length=1.0):
    """
    output_np: numpy array of shape [batch_size, seq_len, 4], predicted x, y, sin(yaw), cos(yaw)
    target_np: numpy array of same shape, ground truth x, y, sin(yaw), cos(yaw)

    Returns:
        metric: float, the average metric over the batch
    """

    # Convert sin and cos back to yaw
    yaw_pred = np.arctan2(output_np[..., 2], output_np[..., 3])
    yaw_gt = np.arctan2(target_np[..., 2], target_np[..., 3])

    # Unpack x, y for predictions and targets
    x_pred, y_pred = output_np[..., 0], output_np[..., 1]
    x_gt, y_gt = target_np[..., 0], target_np[..., 1]

    # Compute c1 and c2 for predicted
    c1_pred = np.stack([x_pred, y_pred], axis=-1)
    c2_pred = c1_pred + segment_length * np.stack([np.cos(yaw_pred), np.sin(yaw_pred)], axis=-1)

    # Compute c1 and c2 for ground truth
    c1_gt = np.stack([x_gt, y_gt], axis=-1)
    c2_gt = c1_gt + segment_length * np.stack([np.cos(yaw_gt), np.sin(yaw_gt)], axis=-1)

    # Compute distances between corresponding points
    dist_c1 = np.linalg.norm(c1_pred - c1_gt, axis=-1)
    dist_c2 = np.linalg.norm(c2_pred - c2_gt, axis=-1)

    # Compute pose metric
    pose_metric = np.sqrt((dist_c1 ** 2 + dist_c2 ** 2) / 2.0)
    metric = np.mean(pose_metric)

    return metric


In [125]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TrajectoryLSTM(input_size=4, hidden_size=64, num_layers=2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [127]:
# Training Loop with Metric Calculation
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    epoch_metric = 0
    for vehicle_features, initial_localization, control_sequence, target_traj in tqdm(train_loader):
        vehicle_features = vehicle_features.to(device)
        initial_localization = initial_localization.to(device)
        control_sequence = control_sequence.to(device)
        target_traj = target_traj.to(device)

        optimizer.zero_grad()
        output = model(vehicle_features, initial_localization, control_sequence)
        loss = criterion(output, target_traj)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        epoch_loss += loss.item()

        # Calculate metric
        output_np = output.detach().cpu().numpy()
        target_np = target_traj.detach().cpu().numpy()
        batch_metric = calculate_metric_on_batch(output_np, target_np)
        epoch_metric += batch_metric

    avg_loss = epoch_loss / len(train_loader)
    avg_metric = epoch_metric / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Metric: {avg_metric:.4f}")

    # Validation
    model.eval()
    val_loss = 0
    val_metric = 0
    with torch.no_grad():
        for vehicle_features, initial_localization, control_sequence, target_traj in val_loader:
            vehicle_features = vehicle_features.to(device)
            initial_localization = initial_localization.to(device)
            control_sequence = control_sequence.to(device)
            target_traj = target_traj.to(device)

            output = model(vehicle_features, initial_localization, control_sequence)
            loss = criterion(output, target_traj)
            val_loss += loss.item()

            # Calculate metric
            output_np = output.detach().cpu().numpy()
            target_np = target_traj.detach().cpu().numpy()
            batch_metric = calculate_metric_on_batch(output_np, target_np)
            val_metric += batch_metric

    avg_val_loss = val_loss / len(val_loader)
    avg_val_metric = val_metric / len(val_loader)
    print(f"Validation Loss: {avg_val_loss:.4f}, Validation Metric: {avg_val_metric:.4f}")


100%|██████████| 200/200 [00:02<00:00, 94.27it/s]


Epoch [1/10], Loss: 0.0365, Metric: 0.1405
Validation Loss: 0.0327, Validation Metric: 0.1340


100%|██████████| 200/200 [00:02<00:00, 94.45it/s]


Epoch [2/10], Loss: 0.0365, Metric: 0.1410
Validation Loss: 0.0320, Validation Metric: 0.1291


100%|██████████| 200/200 [00:02<00:00, 93.77it/s]


Epoch [3/10], Loss: 0.0361, Metric: 0.1381
Validation Loss: 0.0314, Validation Metric: 0.1252


100%|██████████| 200/200 [00:02<00:00, 95.21it/s]


Epoch [4/10], Loss: 0.0359, Metric: 0.1371
Validation Loss: 0.0308, Validation Metric: 0.1182


100%|██████████| 200/200 [00:02<00:00, 94.39it/s]


Epoch [5/10], Loss: 0.0362, Metric: 0.1387
Validation Loss: 0.0313, Validation Metric: 0.1249


100%|██████████| 200/200 [00:02<00:00, 92.03it/s]


Epoch [6/10], Loss: 0.0357, Metric: 0.1367
Validation Loss: 0.0313, Validation Metric: 0.1190


100%|██████████| 200/200 [00:02<00:00, 93.21it/s]


Epoch [7/10], Loss: 0.0355, Metric: 0.1349
Validation Loss: 0.0325, Validation Metric: 0.1370


100%|██████████| 200/200 [00:02<00:00, 92.40it/s]


Epoch [8/10], Loss: 0.0357, Metric: 0.1368
Validation Loss: 0.0312, Validation Metric: 0.1259


100%|██████████| 200/200 [00:02<00:00, 93.66it/s]


Epoch [9/10], Loss: 0.0355, Metric: 0.1361
Validation Loss: 0.0315, Validation Metric: 0.1222


100%|██████████| 200/200 [00:02<00:00, 91.84it/s]


Epoch [10/10], Loss: 0.0352, Metric: 0.1346
Validation Loss: 0.0316, Validation Metric: 0.1235


In [130]:
class TestDataset(Dataset):
    def __init__(self, dataset, normalizer):
        self.data = []
        sampling_interval_ns = 4e7  # 0.04 seconds in nanoseconds

        for testcase_id, testcase in dataset.items():
            # Get metadata
            metadata = testcase['metadata']
            vehicle_model = vehicle_model_encoder.transform([metadata['vehicle_model']])[0]
            vehicle_modification = vehicle_modification_encoder.transform([metadata['vehicle_model_modification']])[0]
            try:
                tires_front = tires_encoder.transform([metadata['tires']['front']])[0]
            except:
                tires_front = tires_encoder.transform([0])[0]
            try:
                tires_rear = tires_encoder.transform([metadata['tires']['rear']])[0]
            except:
                tires_rear = tires_encoder.transform([0])[0]
                
                
            vehicle_features = [vehicle_model, vehicle_modification, tires_front, tires_rear]

            # Get control commands
            control = testcase['control']
            control['acceleration_level'] = control['acceleration_level'].fillna(0)
            control_seq = control[['stamp_ns', 'acceleration_level', 'steering']].values

            # Get initial localization data (first 5 seconds)
            localization = testcase['localization']
            localization_seq = localization[['stamp_ns', 'x', 'y', 'yaw']].values

            # Resample to fixed time steps
            time_steps_control = np.arange(control_seq[0, 0], control_seq[-1, 0] + sampling_interval_ns, sampling_interval_ns)
            control_resampled = self.resample_sequence(control_seq, time_steps_control)

            time_steps_loc = np.arange(localization_seq[0, 0], localization_seq[-1, 0] + sampling_interval_ns, sampling_interval_ns)
            localization_resampled = self.resample_sequence(localization_seq, time_steps_loc)

            # Prepare data
            initial_localization = localization_resampled[:, 1:]  # Skip stamp_ns
            # Convert yaw to sin and cos
            yaw = initial_localization[:, 2]
            sin_yaw = np.sin(yaw)
            cos_yaw = np.cos(yaw)
            initial_localization = np.hstack((initial_localization[:, :2], sin_yaw[:, np.newaxis], cos_yaw[:, np.newaxis]))  # Shape: [125, 4]

            # Control sequence from 5s to 20s
            idx_start = int(5 / 0.04)  # Starting index at 5 seconds
            control_sequence = control_resampled[idx_start:idx_start+375, 1:]  # Shape: [375, 2]

            # Normalize data
            initial_localization[:, 0] = (initial_localization[:, 0] - normalizer['x_mean']) / normalizer['x_std']
            initial_localization[:, 1] = (initial_localization[:, 1] - normalizer['y_mean']) / normalizer['y_std']
            control_sequence[:, 0] = (control_sequence[:, 0] - normalizer['acceleration_mean']) / normalizer['acceleration_std']
            control_sequence[:, 1] = (control_sequence[:, 1] - normalizer['steering_mean']) / normalizer['steering_std']

            requested_stamps = testcase['requested_stamps']['stamp_ns'].values

            self.data.append({
                'testcase_id': testcase_id,
                'vehicle_features': vehicle_features,
                'initial_localization': initial_localization,
                'control_sequence': control_sequence,
                'requested_stamps': requested_stamps
            })

    def resample_sequence(self, seq, time_steps):
        df_seq = pd.DataFrame(seq, columns=['stamp_ns'] + [f'feat_{i}' for i in range(seq.shape[1]-1)])
        df_seq = df_seq.set_index('stamp_ns').reindex(time_steps, method='nearest').reset_index()
        return df_seq.values

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        vehicle_features = torch.tensor(sample['vehicle_features'], dtype=torch.long)
        initial_localization = torch.tensor(sample['initial_localization'], dtype=torch.float32)
        control_sequence = torch.tensor(sample['control_sequence'], dtype=torch.float32)
        requested_stamps = sample['requested_stamps']
        testcase_id = sample['testcase_id']
        return testcase_id, vehicle_features, initial_localization, control_sequence, requested_stamps


In [140]:
# Load test dataset
test_data = TestDataset(test_dataset, normalizer=normalizer)
test_loader = DataLoader(test_data, batch_size=1, shuffle=False)

model.eval()
predictions = []
with torch.no_grad():
    for testcase_id, vehicle_features, initial_localization, control_sequence, requested_stamps in tqdm(test_loader):
        vehicle_features = vehicle_features.to(device)
        initial_localization = initial_localization.to(device)
        control_sequence = control_sequence.to(device)

        output = model(vehicle_features, initial_localization, control_sequence)
        output = output.cpu().numpy()[0]  # Shape: [375, 4]

        # Denormalize positions
        x_pred = output[:, 0] * normalizer['x_std'] + normalizer['x_mean']
        y_pred = output[:, 1] * normalizer['y_std'] + normalizer['y_mean']
        # Convert sin and cos back to yaw
        yaw_pred = np.arctan2(output[:, 2], output[:, 3])

        # Get indices corresponding to requested stamps
        time_steps = np.arange(5 * 1e9, 20 * 1e9, 4e7)  # From 5s to 20s every 0.04s
        indices = np.searchsorted(time_steps, requested_stamps - 5 * 1e9)

        x_pred = x_pred[indices]
        y_pred = y_pred[indices]
        yaw_pred = yaw_pred[indices]

        for stamp_ns, x, y, yaw in zip(requested_stamps, x_pred, y_pred, yaw_pred):
            predictions.append({
                'testcase_id': testcase_id.item(),
                'stamp_ns': stamp_ns.detach().cpu().numpy().astype(int),
                'x': x,
                'y': y,
                'yaw': yaw
            })

# Create the submission DataFrame
submission_df = pd.DataFrame(predictions)
submission_df = submission_df[['testcase_id', 'stamp_ns', 'x', 'y', 'yaw']]
submission_df.to_csv('predictions.csv', index=False)

100%|██████████| 8000/8000 [00:23<00:00, 339.31it/s]


### read test dataset 

In [110]:
import numpy as np

NSECS_IN_SEC = 1000000000

def secs_to_nsecs(secs: float):
    return int(secs * NSECS_IN_SEC)

def nsecs_to_secs(nsecs: int):
    return float(nsecs) / NSECS_IN_SEC

def yaw_direction(yaw_value):
    return np.array([np.cos(yaw_value), np.sin(yaw_value)])

### simple pose prediction logic without taking into account control states 

In [None]:
def localization_df_to_poses(loc_df):
    poses = []
    for stamp_ns, x, y, yaw in zip(loc_df['stamp_ns'], loc_df['x'], loc_df['y'], loc_df['yaw']):
        poses.append({'stamp_ns': stamp_ns, 'pos': np.array([x, y]), 'yaw': yaw})
    return poses

# naive estimation of speed at last known localization pose
def dummy_estimate_last_speed(localization_poses):
    last_pose = localization_poses[-1]
    
    start_pose_idx = -1
    for i, pose in enumerate(localization_poses, start=1-len(localization_poses)):
        start_pose_idx = i
        if nsecs_to_secs(last_pose['stamp_ns']) - nsecs_to_secs(pose['stamp_ns']) > 1.: # sec
            break
            
    start_pose = localization_poses[start_pose_idx]
    dt_sec = nsecs_to_secs(last_pose['stamp_ns']) - nsecs_to_secs(start_pose['stamp_ns'])
    
    if dt_sec > 1e-5:
        return np.linalg.norm(last_pose['pos'][:2] - start_pose['pos'][:2]) / dt_sec
    return 5. # some default value

def dummpy_predict_pose(last_loc_pose: dict, last_speed: float, prediction_stamp: int):
    dt_sec = nsecs_to_secs(prediction_stamp) - nsecs_to_secs(last_loc_pose['stamp_ns'])
    distance = dt_sec * last_speed
    direction = yaw_direction(last_loc_pose['yaw'])
    pos_translate = direction * distance
    return {"pos": last_loc_pose['pos'] + pos_translate, 'yaw': last_loc_pose['yaw']}

In [None]:
def predict_testcase(testcase: dict):
    loc_df = testcase['localization']
    localization_poses = localization_df_to_poses(loc_df)
    
    last_loc_pose = localization_poses[-1]
    last_speed = dummy_estimate_last_speed(localization_poses)
    
    predicted_poses = []
    for stamp in testcase['requested_stamps']['stamp_ns']:
        pose = dummpy_predict_pose(last_loc_pose, last_speed, stamp)
        predicted_poses.append(pose)
        
    predictions = {}
    predictions['stamp_ns'] = testcase['requested_stamps']['stamp_ns']
    predictions['x'] = [pose['pos'][0] for pose in predicted_poses]
    predictions['y'] = [pose['pos'][1] for pose in predicted_poses]
    predictions['yaw'] = [pose['yaw'] for pose in predicted_poses]
    return pd.DataFrame(predictions)

def predict_test_dataset(test_dataset: dict):
    predictions = {}
    for testcase_id, testcase in tqdm(test_dataset.items()): 
        predictions[testcase_id] = predict_testcase(testcase)
    return predictions

### make prediction for requested stamps 

In [None]:
test_predictions = predict_test_dataset(test_dataset)
len(test_predictions)

### write predictions 

In [None]:
def write_predictions(dataset_predictions: dict, prediction_file_path: str):
    prediction_list = []
    for testcase_id, prediction in tqdm(dataset_predictions.items()):
        prediction['testcase_id'] = [testcase_id] * len(prediction)
        prediction_list.append(prediction)
    predictions_df = pd.concat(prediction_list)
    predictions_df = predictions_df.reindex(columns=["testcase_id", "stamp_ns", "x", "y", "yaw"])
    print(len(predictions_df))
    predictions_df.to_csv(prediction_file_path, index=False, header=True)

In [None]:
write_predictions(test_predictions, os.path.join(ROOT_DATA_FOLDER, "predictions.csv"))

# Calculate metric

Let's describe final metric. As a first step, all predicted triples $(x,y,yaw)$ are being converted into 2 points $[(x_1, y_1), (x_2, y_2)]$ in the following way:
$$
(x_1, y_1) = (x, y), \\
(x_2, y_2) = (x_1, y_1) + S \times (yaw_x, yaw_y)
$$  

where $S = 1$. In other words, we build a directed segment of length $1$. These points then used in the metric calculation.


Metric for a single pose (rmse):

$$
pose\_metric = \sqrt{ \frac{\displaystyle\sum_{j=1}^{k} {(x_j-\hat{x_j})^2 + (y_j-\hat{y_j})^2}}{k} }
$$

where $k$ - number of points that describe single pose (in our case $k=2$).

Metric for a testcase:

$$
testcase\_metric = \frac{1}{n}  \displaystyle\sum_{i=1}^{n}pose\_metric_i
$$

where $n$ - number of localization points to predict.

And, final metric for a whole dataset:

$$
dataset\_metric = \frac{1}{n}  \displaystyle\sum_{i=1}^{n}testcase\_metric_i
$$

where $n$ - number of test cases.


### implementation of the metric calculation 

In [None]:
import numpy as np
import pandas as pd

SEGMENT_LENGTH = 1.

def yaw_direction(yaw_value):
    return np.array([np.cos(yaw_value), np.sin(yaw_value)])

def build_car_points(x_y_yaw):
    directions = np.vstack(yaw_direction(x_y_yaw[:, -1]))
    
    front_points = x_y_yaw[:, :-1] + SEGMENT_LENGTH * directions.T
    points = np.vstack([x_y_yaw[:, :-1], front_points])
    return points

def build_car_points_from_merged_df(df: pd.DataFrame):
    points_gt = df[['x_gt', 'y_gt', 'yaw_gt']].to_numpy()
    points_pred = df[['x_pred', 'y_pred', 'yaw_pred']].to_numpy()
    
    points_gt = build_car_points(points_gt)
    points_pred = build_car_points(points_pred)
    return points_gt, points_pred

def calculate_metric_testcase(df: pd.DataFrame):        
    points_gt, points_pred = build_car_points_from_merged_df(df)
    
    metric = np.mean(np.sqrt(2. * np.mean((points_gt - points_pred) ** 2, axis=1)))
    return metric

def calculate_metric_dataset(ground_truth_df: pd.DataFrame, prediction_df: pd.DataFrame):
    assert (len(ground_truth_df) == len(prediction_df))
    
    df = ground_truth_df.merge(prediction_df, on=['testcase_id', 'stamp_ns'], suffixes=['_gt', '_pred'])
    
    metric = df.groupby('testcase_id').apply(calculate_metric_testcase)
    return np.mean(metric)