# Step 1: Parse input data into readable datasets

In [1]:
import os
import pandas as pd
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import TensorDataset, DataLoader, random_split

In [None]:
# Set device

# Check if CUDA is available
device = torch.device('cpu')
if torch.cuda.is_available():
    device = torch.device('cuda')

torch.set_default_device(device)
print(f"Using device = {torch.get_default_device()}")

Using device = cuda:0


: 

In [None]:

def load_vehicle_annotations(folder_path, save_encoded=False, filepath=None):
    """
    Read KITTI-style label text files and build a distance-regression dataset.

    Every ``.txt`` file in ``folder_path`` is parsed line by line (files are
    visited in sorted name order). Blank lines and ``DontCare`` annotations are
    skipped. From each remaining line the class label (field 0), the bounding
    box (fields 4-7: xmin, ymin, xmax, ymax) and the z-coordinate in camera
    coordinates (field 13, used as the distance) are extracted.

    Args:
        folder_path: directory containing the label ``.txt`` files.
        save_encoded: when True, also write the encoded tensors and the label
            map to disk with ``torch.save``.
        filepath: destination used when ``save_encoded`` is True; defaults to
            ``vehicle_dataset.pt`` inside ``folder_path``.

    Returns:
        df: DataFrame with one row per annotation and the columns
            file_id, label, xmin, ymin, xmax, ymax, distance.
        features: float32 tensor of shape [N, 4 + num_classes] laid out as
            xmin, xmax, ymin, ymax followed by the one-hot encoded label.
        distances: float32 tensor of shape [N, 1].
        label_to_idx: dict mapping each label string to its one-hot index
            (labels are indexed in sorted order).

    Raises:
        ValueError: if no annotation is found in ``folder_path``, or a numeric
            field cannot be converted to float.
        IndexError: if a non-DontCare line has fewer than 14 fields.
    """
    records = []

    # Collect all .txt files in sorted (numerical for zero-padded names) order
    files = sorted(f for f in os.listdir(folder_path) if f.endswith('.txt'))

    for file in files:
        file_id = os.path.splitext(file)[0]  # e.g. "000000"
        file_path = os.path.join(folder_path, file)

        with open(file_path, 'r') as f:
            for line in f:
                parts = line.strip().split()
                if not parts:
                    continue

                label = parts[0]
                if label == 'DontCare':
                    continue  # skip unlabeled/ignored regions

                records.append({
                    'file_id': file_id,
                    'label': label,
                    # bounding box coords
                    'xmin': float(parts[4]),
                    'ymin': float(parts[5]),
                    'xmax': float(parts[6]),
                    'ymax': float(parts[7]),
                    # z-coordinate = distance from camera
                    'distance': float(parts[13]),
                })

    # An empty DataFrame has no 'label' column, so fail with a clear message
    # instead of an opaque KeyError further down.
    if not records:
        raise ValueError(f"No annotations found in label files under {folder_path!r}")

    df = pd.DataFrame(records)

    # --- One-hot encode labels ---
    unique_labels = sorted(df['label'].unique())
    label_to_idx = {label: idx for idx, label in enumerate(unique_labels)}
    label_indices = torch.tensor(df['label'].map(label_to_idx).values, dtype=torch.long)
    # F.one_hot yields an integer tensor; cast explicitly so the concatenated
    # feature matrix is float32 without relying on implicit type promotion.
    labels_onehot = F.one_hot(label_indices, num_classes=len(unique_labels)).to(torch.float32)

    # --- Convert features to PyTorch tensors ---
    box_features = torch.tensor(
        df[['xmin', 'xmax', 'ymin', 'ymax']].values, dtype=torch.float32
    )
    # concatenate one-hot label to be a feature
    features = torch.cat((box_features, labels_onehot), dim=1)

    distances = torch.tensor(df['distance'].values, dtype=torch.float32).unsqueeze(1)

    if save_encoded:
        if filepath is None:
            filepath = os.path.join(folder_path, "vehicle_dataset.pt")
        torch.save({
            "features": features,
            "distances": distances,
            "label_map": label_to_idx
        }, filepath)
        print(f"Saved encoded dataset to {filepath}")

    return df, features, distances, label_to_idx


: 

## Load training and validation datasets from preprocessed files

In [None]:
class VehicleAnnotationDataset(torch.utils.data.Dataset):
    """
    Custom Dataset for vehicle annotations that returns (features, target) pairs.

    Indexing with ``idx`` returns ``(features[idx], targets[idx])``; the length
    is the number of rows in ``features``.
    """
    def __init__(self, features, targets):
        """
        Args:
            features: tensor of shape [N, num_features]
            targets: tensor of shape [N, 1] or [N]
        """
        self.features = features
        if targets.dim() > 1:
            # Drop singleton dimensions but always keep the leading sample
            # dimension. A bare squeeze() would also collapse it when N == 1,
            # leaving a 0-dim tensor that cannot be indexed in __getitem__.
            kept_shape = [targets.shape[0]] + [s for s in targets.shape[1:] if s != 1]
            targets = targets.reshape(kept_shape)
        self.targets = targets

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.targets[idx]


def load_vehicle_annotations_dataset(folder_path, column_mapping, target_column, 
                                      save_encoded=False, filepath=None):
    """
    Reads KITTI-style label text files and extracts features and targets.

    Every ``.txt`` file in ``folder_path`` is parsed line by line; blank lines
    are skipped, and so are ``DontCare`` annotations when a 'label' column is
    mapped. The feature matrix holds every mapped numeric column except
    ``target_column``, in ``column_mapping`` order, followed by the one-hot
    encoded label when 'label' is mapped.

    Args:
        folder_path: path to folder containing KITTI label files
        column_mapping: dict mapping column names to KITTI file indices
                       e.g., {'xmin': 4, 'xmax': 6, 'ymin': 5, 'ymax': 7, 'label': 0, 'distance': 13}
        target_column: str, name of the target column (e.g., 'distance');
                       must be a key of column_mapping
        save_encoded: bool, whether to save the dataset to disk
        filepath: str, path to save the dataset (if save_encoded=True);
                  defaults to "vehicle_dataset_custom.pt" inside folder_path

    Returns:
        VehicleAnnotationDataset: custom dataset object
        label_map: dict mapping class labels to indices (if 'label' in column_mapping),
                   otherwise None

    Raises:
        KeyError: if target_column is not a key of column_mapping
        ValueError: if no annotation is found, or a numeric field cannot be
                    converted to float
        IndexError: if a line has fewer fields than a mapped index requires
    """
    # Fail before touching the file system rather than after parsing everything.
    if target_column not in column_mapping:
        raise KeyError(f"target_column {target_column!r} is not a key of column_mapping")

    has_label = 'label' in column_mapping

    records = []
    files = sorted(f for f in os.listdir(folder_path) if f.endswith('.txt'))

    for file in files:
        file_id = os.path.splitext(file)[0]
        file_path = os.path.join(folder_path, file)

        with open(file_path, 'r') as f:
            for line in f:
                parts = line.strip().split()
                if not parts:
                    continue

                # Skip DontCare labels
                if has_label and parts[column_mapping['label']] == 'DontCare':
                    continue

                record = {'file_id': file_id}
                for col_name, col_idx in column_mapping.items():
                    raw_value = parts[col_idx]
                    # The label stays a string; every other column is numeric.
                    record[col_name] = raw_value if col_name == 'label' else float(raw_value)

                records.append(record)

    # An empty DataFrame has none of the mapped columns, so report the real
    # problem instead of an opaque KeyError further down.
    if not records:
        raise ValueError(f"No annotations found in label files under {folder_path!r}")

    df = pd.DataFrame(records)

    # Feature columns: every mapped column except the label (one-hot encoded
    # separately) and the target. Leaving the target in would leak the value
    # being predicted into the model input.
    feature_cols = [col for col in column_mapping if col not in ('label', target_column)]
    features = torch.tensor(df[feature_cols].values, dtype=torch.float32)

    # One-hot encode label if present
    label_map = None
    if has_label:
        unique_labels = sorted(df['label'].unique())
        label_map = {label: idx for idx, label in enumerate(unique_labels)}
        label_indices = torch.tensor(df['label'].map(label_map).values, dtype=torch.long)
        # F.one_hot yields an integer tensor; cast so the feature matrix stays float32.
        labels_onehot = F.one_hot(label_indices, num_classes=len(unique_labels)).to(torch.float32)
        features = torch.cat((features, labels_onehot), dim=1)

    # Get target
    targets = torch.tensor(df[target_column].values, dtype=torch.float32).unsqueeze(1)

    # Create dataset
    dataset = VehicleAnnotationDataset(features, targets)

    # Save if requested
    if save_encoded:
        if filepath is None:
            filepath = os.path.join(folder_path, "vehicle_dataset_custom.pt")
        torch.save({
            "features": features,
            "targets": targets,
            "label_map": label_map,
            "column_mapping": column_mapping
        }, filepath)
        print(f"Saved dataset to {filepath}")

    return dataset, label_map


: 

In [None]:
# Column name -> field index within a KITTI label line. The key order is kept
# unchanged because the loader iterates the mapping in this order.
column_mapping = {
    'xmin': 4,
    'xmax': 6,
    'ymin': 5,
    'ymax': 7,
    'label': 0,
    'distance': 13,
}

# Parse the training labels with 'distance' as the regression target and
# persist the encoded tensors to the loader's default location.
dataset, label_map = load_vehicle_annotations_dataset(
    folder_path='dataset/training/label_2',
    column_mapping=column_mapping,
    target_column='distance',
    save_encoded=True,
)

## Basic NN implementation

In [4]:

class DistanceRegressorOneHot(nn.Module):
    """Fully connected regressor: two ReLU hidden layers followed by a single-output linear layer."""

    def __init__(self, input_dim, hidden_dim=64):
        """
        input_dim: total number of features including one-hot label
        hidden_dim: width of both hidden layers
        """
        super().__init__()
        layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),  # output: distance
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run x through the layer stack and return the result of its final linear layer."""
        prediction = self.model(x)
        return prediction


# --- Training function ---
def train_distance_model_onehot(features, distances, epochs=50, batch_size=64, lr=1e-3, verbose=False):
    """
    Train a DistanceRegressorOneHot on (features, distances).

    The samples are split 80/20 into training and validation subsets. The
    model is optimised with Adam on the mean squared error, and the
    sample-weighted average loss of both subsets is computed every epoch.

    Args:
        features: tensor of shape [N, D]; D becomes the model's input size
        distances: tensor of N regression targets, shape [N, 1] or [N]
        epochs: number of passes over the training subset
        batch_size: mini-batch size for both data loaders
        lr: Adam learning rate
        verbose: when True, print the train/validation loss after each epoch

    Returns:
        The trained model. It is left in eval mode by the final validation
        pass whenever epochs >= 1.

    Raises:
        ValueError: if there are fewer than 2 samples, which would leave the
            training split empty.
    """
    X = features
    # A flat [N] target would broadcast against the model's [N, 1] output
    # inside MSELoss and silently compute the wrong loss, so add the column
    # dimension when it is missing.
    y = distances.unsqueeze(1) if distances.dim() == 1 else distances

    # Dataset and split
    dataset = TensorDataset(X, y)
    n_train = int(0.8 * len(dataset))
    n_val = len(dataset) - n_train
    if n_train == 0:
        # Without this guard the per-epoch loss average divides by zero.
        raise ValueError(
            f"Need at least 2 samples for an 80/20 train/validation split, got {len(dataset)}"
        )

    # random_split and the loaders draw their permutations on torch's default
    # device, so each generator has to live on that same device. Three
    # separate generators are created so every consumer keeps its own stream.
    rng_device = torch.get_default_device()

    def _new_generator():
        return torch.Generator(device=rng_device)

    train_ds, val_ds = random_split(dataset, [n_train, n_val], generator=_new_generator())

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, generator=_new_generator())
    val_loader = DataLoader(val_ds, batch_size=batch_size, generator=_new_generator())

    # Initialize model on the same device as the data it will receive
    model = DistanceRegressorOneHot(input_dim=X.shape[1]).to(X.device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            preds = model(X_batch)
            loss = criterion(preds, y_batch)
            loss.backward()
            optimizer.step()
            # Weight by batch size so the final average is per sample even
            # when the last batch is smaller.
            train_loss += loss.item() * X_batch.size(0)
        train_loss /= n_train

        # Validation
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                preds = model(X_batch)
                val_loss += criterion(preds, y_batch).item() * X_batch.size(0)
        val_loss /= n_val
        if verbose:
            print(f"Epoch {epoch+1:03d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

    return model


In [None]:
# Parse the training labels and write the encoded tensors to dataset/vehicle_dataset.pt.
df, X, y, label_map = load_vehicle_annotations(
    "dataset/training/label_2",
    save_encoded=True,
    filepath="dataset/vehicle_dataset.pt",
)

# Sanity check: first rows of the table, first encoded samples, the label map,
# and finally the overall tensor shapes.
for preview in (df.head(), X[:5], y[:5], label_map):
    print(preview)
print(X.shape, y.shape)

In [None]:
# Reload the encoded tensors from disk, mapping them onto the selected device.
dataset = torch.load("dataset/vehicle_dataset.pt", map_location=device)
X, y = dataset["features"], dataset["distances"]

# Short 10-epoch training run with per-epoch loss reporting.
model = train_distance_model_onehot(X, y, epochs=10, verbose=True)

tensor([[712.4000, 810.7300, 143.0000, 307.9200,   0.0000,   0.0000,   0.0000,
           1.0000,   0.0000,   0.0000,   0.0000,   0.0000],
        [599.4100, 629.7500, 156.4000, 189.2500,   0.0000,   0.0000,   0.0000,
           0.0000,   0.0000,   0.0000,   1.0000,   0.0000],
        [387.6300, 423.8100, 181.5400, 203.1200,   1.0000,   0.0000,   0.0000,
           0.0000,   0.0000,   0.0000,   0.0000,   0.0000],
        [676.6000, 688.9800, 163.9500, 193.9300,   0.0000,   1.0000,   0.0000,
           0.0000,   0.0000,   0.0000,   0.0000,   0.0000],
        [804.7900, 995.4300, 167.3400, 327.9400,   0.0000,   0.0000,   1.0000,
           0.0000,   0.0000,   0.0000,   0.0000,   0.0000]], device='cuda:0')
Epoch 001 | Train Loss: 194.3690 | Val Loss: 94.7531
Epoch 002 | Train Loss: 82.0346 | Val Loss: 80.3481
Epoch 003 | Train Loss: 63.2807 | Val Loss: 60.1382
Epoch 004 | Train Loss: 52.6793 | Val Loss: 51.1591
Epoch 005 | Train Loss: 44.4281 | Val Loss: 46.2634
Epoch 006 | Train Loss: 37

In [None]:
# Put the model in evaluation mode and predict the first five samples
# without tracking gradients.
model.eval()

with torch.no_grad():
    sample_preds = model(X[:5])

# Show predictions next to the ground-truth distances for the same samples.
predicted = sample_preds.squeeze().cpu().numpy()
actual = y[:5].squeeze().cpu().numpy()
print("Sample Predictions:", predicted)
print("Actual Distances:", actual)