In [1]:
import pandas as pd
import os
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
from datetime import datetime
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

def set_seed(seed=42):
    """
    Set random seeds for reproducibility across Python, NumPy, and PyTorch.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    
    # Force deterministic behavior for convolutional algorithms
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    print(f">>> Global seed set to: {seed}")

set_seed(42)

>>> Global seed set to: 42


In [2]:
# --- Dataset Definition ---

class PilotNetDataset(Dataset):
    """
    Custom Dataset for loading driving images and corresponding steering angles.
    """
    def __init__(self, label_file, img_dir, transform=None):
        self.data = pd.read_csv(label_file, sep=' ', names=['filename', 'angle'], header=None)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx, 0])
        # Convert image to grayscale ('L' mode) as per project requirements
        image = Image.open(img_name).convert('L') 
        
        angle = torch.tensor([float(self.data.iloc[idx, 1])], dtype=torch.float32)
        
        if self.transform:
            image = self.transform(image)
            
        return image, angle

In [3]:
# --- Image Transforms ---

train_transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ColorJitter(brightness=0.3, contrast=0.3),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

eval_transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

In [4]:
# --- Model Architecture ---

class PilotNet(nn.Module):
    def __init__(self):
        super(PilotNet, self).__init__()           
        self.conv1 = nn.Conv2d(1, 24, kernel_size=5, stride=2)
        self.conv2 = nn.Conv2d(24, 36, kernel_size=5, stride=2)
        self.conv3 = nn.Conv2d(36, 48, kernel_size=5, stride=2)
        self.conv4 = nn.Conv2d(48, 64, kernel_size=3, stride=1)            
        self.conv5 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        
       
        self.fc1 = nn.Linear(64, 100)
        self.dropout1 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(100, 50)
        self.dropout2 = nn.Dropout(0.5)
        self.fc3 = nn.Linear(50, 10)
        self.fc4 = nn.Linear(10, 1)

    def forward(self, x):
      
        x = F.elu(self.conv1(x))
        x = F.elu(self.conv2(x))
        x = F.elu(self.conv3(x))
        x = F.elu(self.conv4(x))
        x = F.elu(self.conv5(x))
        
        x = x.view(x.size(0), -1) 
        
        x = F.elu(self.fc1(x))
        x = self.dropout1(x)
        x = F.elu(self.fc2(x))
        x = self.dropout2(x)
        x = F.elu(self.fc3(x))
        
#        x = self.fc4(x)
        x = torch.tanh(self.fc4(x))
        return x

model = PilotNet()


In [5]:
# --- Training and Validation Functions ---

def train_and_plot(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs=15, log_name="final_run"):
    history = {'epoch': [], 'train_rmse': [], 'val_rmse': [], 'lr': []}
    best_val_rmse = float('inf') 
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    for epoch in range(epochs):
        # --- Training Phase ---
        model.train()
        train_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels.float().view_as(outputs))
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        
        # --- Validation Phase ---
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                v_loss = criterion(outputs, labels.float().view_as(outputs))
                val_loss += v_loss.item()
        
        # Calculate RMSE (Root Mean Squared Error)
        t_rmse = math.sqrt(train_loss / len(train_loader))
        v_rmse = math.sqrt(val_loss / len(val_loader))
      
        # Update Learning Rate Scheduler
        scheduler.step() 
        current_lr = optimizer.param_groups[0]['lr']

        history['epoch'].append(epoch + 1)
        history['train_rmse'].append(t_rmse)
        history['val_rmse'].append(v_rmse)
        history['lr'].append(current_lr)
        
        print(f"Epoch {epoch+1}: LR: {current_lr:.6f}, Train RMSE: {t_rmse:.4f}, Val RMSE: {v_rmse:.4f}")
        
        # Save the best model based on validation performance
        if v_rmse < best_val_rmse:
            best_val_rmse = v_rmse
            torch.save(model.state_dict(), 'best_pilotnet_model.pth')
            print(f">>> New best model saved with Val RMSE: {v_rmse:.4f}")

    # --- Save Logs and Plotting ---
    log_df = pd.DataFrame(history)
    log_filename = f"log_{log_name}_{datetime.now().strftime('%m%d_%H%M')}.csv"
    log_df.to_csv(log_filename, index=False)

    plt.figure(figsize=(10, 5))
    plt.plot(range(1, epochs+1), history['train_rmse'], label='Train RMSE', marker='o')
    plt.plot(range(1, epochs+1), history['val_rmse'], label='Val RMSE', marker='s')
    plt.axhline(y=best_val_rmse, color='r', linestyle='--', label=f'Best Val RMSE: {best_val_rmse:.4f}')
    plt.xlabel('Epochs')
    plt.ylabel('RMSE')
    plt.title('PilotNet Training Performance')
    plt.legend()
    plt.grid(True)
    plt.savefig('RMSE.png')
    plt.show()

In [None]:
# --- Execution ---

    # 1. Data Loading
train_dataset = PilotNetDataset('train/labels.txt', 'train/images', transform=train_transform)
val_dataset = PilotNetDataset('val/labels.txt', 'val/images', transform=eval_transform)
    
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=0)

    # 2. Model Initialization
model = PilotNet()
criterion = nn.HuberLoss(delta=1.0)
optimizer = optim.AdamW(model.parameters(), lr=0.0005, weight_decay=0.05)
    
    # Using StepLR for predictable decay schedule
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.2)

    # 3. Start Training
train_and_plot(
    model=model, 
    train_loader=train_loader, 
    val_loader=val_loader, 
    criterion=criterion, 
    optimizer=optimizer, 
    scheduler=scheduler,
    epochs=15,
    log_name="final_optimized_run"
    )

Epoch 1: LR: 0.000500, Train RMSE: 0.2135, Val RMSE: 0.2749
>>> New best model saved with Val RMSE: 0.2749
