In [13]:
# Cell 1: Setup and Imports
import os
from pathlib import Path
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as transforms
import logging
import optuna

# Import model-specific modules
from ncps.wirings import AutoNCP
from ncps.torch import CfC

# Set up basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [15]:
# Cell 2: DrivingDataset Class Definition

# Define CSV column names
COLUMN_NAMES = ["center", "left", "right", "steering", "throttle", "brake", "speed"]

class DrivingDataset(Dataset):
    """Dataset for autonomous driving images and targets."""

    def __init__(self, csv_file, root_dir, transform=None, sequence_length=5):
        """
        Args:
            csv_file (str): Path to the CSV file.
            root_dir (str): Directory where images are stored.
            transform (callable, optional): Transformations to apply to images.
            sequence_length (int): Number of consecutive frames per sample.
        """
        self.df = pd.read_csv(csv_file, names=COLUMN_NAMES)
        # Keep only required columns
        self.df = self.df[["center", "left", "right", "steering", "throttle", "brake"]]
        self.root_dir = Path(root_dir)
        self.transform = transform
        self.sequence_length = sequence_length

    def get_image_path(self, col_value):
        """Construct full image path from CSV entry."""
        filename = Path(col_value.strip()).name
        return self.root_dir / "IMG" / filename

    def load_image(self, path: Path):
        """Load image and convert to RGB. On failure, returns a blank image."""
        try:
            img = Image.open(path).convert("RGB")
        except Exception as e:
            logger.error(f"Error loading image {path}: {e}")
            # Return a blank image (default size 200x66); adjust if necessary.
            img = Image.new("RGB", (200, 66))
        return img

    def __len__(self):
        """Return the number of sequences in the dataset."""
        return len(self.df) - self.sequence_length + 1

    def __getitem__(self, idx):
        images_seq = []
        # Iterate over the sequence of frames
        for i in range(self.sequence_length):
            row = self.df.iloc[idx + i]
            # Construct paths for center, left, and right images
            center_path = self.get_image_path(row["center"])
            left_path = self.get_image_path(row["left"])
            right_path = self.get_image_path(row["right"])
            
            # Load images with error handling
            center_img = self.load_image(center_path)
            left_img = self.load_image(left_path)
            right_img = self.load_image(right_path)
            
            # Apply transformations if provided
            if self.transform:
                center_img = self.transform(center_img)
                left_img = self.transform(left_img)
                right_img = self.transform(right_img)
            
            # Stack camera views: resulting shape (3, channels, height, width)
            images = torch.stack([center_img, left_img, right_img], dim=0)
            images_seq.append(images)
        
        # Stack sequence: shape (sequence_length, 3, channels, height, width)
        images_seq = torch.stack(images_seq, dim=0)
        
        # Use target values from the last frame
        target_row = self.df.iloc[idx + self.sequence_length - 1]
        target = torch.tensor([
            target_row["steering"],
            target_row["throttle"],
            target_row["brake"]
        ], dtype=torch.float32)
        return images_seq, target


In [16]:
# Cell 3: Data Transformations and Dataset Initialization

# Define image transformations
transform = transforms.Compose([
    transforms.Resize((66, 200)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Update these paths to match your dataset location.
csv_path = r"C:\Users\harsh\OneDrive\Desktop\Udacity datset 2\self_driving_car_dataset_make\driving_log.csv"
root_dir = r"C:\Users\harsh\OneDrive\Desktop\Udacity datset 2\self_driving_car_dataset_make"

# Create dataset instance
sequence_length = 5
dataset = DrivingDataset(csv_file=csv_path, root_dir=root_dir, transform=transform, sequence_length=sequence_length)

# Print total samples and inspect one sample
print("Total samples in dataset:", len(dataset))
sample_images, sample_target = dataset[0]
print("Sample images shape:", sample_images.shape)  # Expected: (sequence_length, 3, channels, 66, 200)
print("Sample target (steering, throttle, brake):", sample_target)




In [17]:
# Cell 4: TemporalSequenceLearner Model Definition

class TemporalSequenceLearner(nn.Module):
    def __init__(self, hidden_neurons, image_channels=3, pretrained_weights_path=None,sparsity_level=0.5):
        """
        Args:
            hidden_neurons (int): Number of hidden units for the CfC wiring.
            image_channels (int): Number of input image channels (e.g., 3 for RGB).
            pretrained_weights_path (str, optional): Path to pretrained weights.
        """
        super().__init__()
        # Shared feature extractor for individual images
        self.feature_extractor = nn.Sequential(
            nn.Conv2d(image_channels, 24, kernel_size=5, stride=2),
            nn.ReLU(),
            nn.Conv2d(24, 36, kernel_size=5, stride=2),
            nn.ReLU(),
            nn.Conv2d(36, 48, kernel_size=5, stride=2),
            nn.ReLU(),
            nn.Conv2d(48, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        if pretrained_weights_path:
            self.feature_extractor.load_state_dict(torch.load(pretrained_weights_path))
        # Freeze feature extractor parameters
        for param in self.feature_extractor.parameters():
            param.requires_grad = False
        
        # Computed feature dimension for images of size (3, 66, 200)
        self.feature_dim = 1152
        
        # Configure AutoNCP to output 3 values (steering, throttle, brake)
        wiring = AutoNCP(hidden_neurons, 3,sparsity_level=sparsity_level)
        wiring.adjacency_matrix = torch.tensor(wiring.adjacency_matrix).cpu()
        if wiring.sensory_adjacency_matrix is not None:
            wiring.sensory_adjacency_matrix = torch.tensor(wiring.sensory_adjacency_matrix).cpu()

        self.classifier = CfC(self.feature_dim, wiring)
        # Final output layer maps CfC output to a 3-element vector.
        self.output_layer = nn.Linear(3, 3)
    
    def forward(self, images):
        """
        Args:
            images: Tensor of shape (batch, T, 3, channels, height, width).
        Returns:
            Tensor of shape (batch, 3) representing steering, throttle, and brake.
        """
        batch, T, num_views, channels, height, width = images.size()
        # Merge batch, time, and view dimensions: shape (batch*T*num_views, channels, height, width)
        images = images.view(batch * T * num_views, channels, height, width)
        features = self.feature_extractor(images)
        features = features.view(features.size(0), -1)  # Flatten features
        
        # Reshape to (batch, T, num_views, feature_dim) and average over views
        features = features.view(batch, T, num_views, self.feature_dim).mean(dim=2)
        
        # Process temporal sequence with CfC; output shape: (batch, T, 3)
        classifier_out, _ = self.classifier(features)
        final_time_step = classifier_out[:, -1, :]  # Use last time step
        
        x = self.output_layer(final_time_step)  # (batch, 3)
        # Apply activation constraints: steering with tanh, throttle and brake with sigmoid
        steering = torch.tanh(x[:, 0:1])
        throttle = torch.sigmoid(x[:, 1:2])
        brake = torch.sigmoid(x[:, 2:3])
        return torch.cat([steering, throttle, brake], dim=1)


In [18]:
# Cell 5: Data Preparation and Hyperparameter Tuning

def prepare_data(sequence_length=5, batch_size=32):
    """
    Prepares training and validation data loaders.
    """
    # Update these paths to match your dataset location.
    csv_path = r"C:\Users\harsh\OneDrive\Desktop\Udacity datset 2\self_driving_car_dataset_make\driving_log.csv"
    root_dir = r"C:\Users\harsh\OneDrive\Desktop\Udacity datset 2\self_driving_car_dataset_make"
    jungle_csv_path=r"C:\Users\harsh\OneDrive\Desktop\Udacity datset 2\self_driving_car_dataset_jungle\driving_log.csv"
    jungle_root_dir=r"C:\Users\harsh\OneDrive\Desktop\Udacity datset 2\self_driving_car_dataset_jungle"
    transform = transforms.Compose([
        transforms.Resize((66, 200)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    
    dataset = DrivingDataset(csv_file=csv_path, root_dir=root_dir, transform=transform, sequence_length=sequence_length)
    jungle_dataset=DrivingDataset(csv_file=jungle_csv_path, root_dir=jungle_root_dir, transform=transform, sequence_length=sequence_length)
    # Split into Train (70%), CV (20%), Test (10%)
    train_size = int(0.8 * len(dataset))
    cv_size = int(0.1 * len(dataset))
    test_size = len(dataset) - train_size - cv_size

    train_dataset, cv_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, cv_size, test_size])

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    cv_loader = DataLoader(cv_dataset, batch_size=32, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    jungle_data_loader=DataLoader(jungle_dataset,batch_size=32,shuffle=False)
    return train_loader, cv_loader, test_loader,jungle_data_loader

# Prepare data loaders
jungle_data_loader,train_loader,cv_loader, test_loader = prepare_data(sequence_length)


def objective(trial):
    """
    Objective function for hyperparameter tuning with Optuna.
    """
    hidden_neurons = trial.suggest_int("hidden_neurons", 16, 96, step=4)
    sparsity_level = trial.suggest_float("sparsity_level", 0.1, 0.9, step=0.05)
    
    # Re-prepare data loaders in case of changes.
    _, cv_loader, test_loader = prepare_data(sequence_length=5)


    model = TemporalSequenceLearner(hidden_neurons,sparsity_level=sparsity_level)
    model.to(device)
    
    optimizer = optim.Adam(model.parameters(), lr=0.0725)
    criterion = nn.MSELoss()
    num_epochs = 5  # Short training for tuning
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for images, target in cv_loader:
            images, target = images.to(device), target.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, target)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        epoch_loss = running_loss / len(cv_loader)
        trial.report(epoch_loss, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()
    
    # Evaluate on validation set
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, target in jungle_data_loader:
            images, target = images.to(device), target.to(device)
            outputs = model(images)
            loss = criterion(outputs, target)
            val_loss += loss.item()
    return val_loss


In [19]:
import optuna

# Create an Optuna study and optimize the objective function.
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=50)  # You can adjust the number of trials

print("Best trial:")
trial = study.best_trial
print(f"  Value (validation loss): {trial.value}")
print("  Hyperparameters:")
for key, value in trial.params.items():
    print(f"    {key}: {value}")






In [24]:
import torch
import torch.nn as nn
import torch.optim as optim
import os

def final_train(model, train_loader, criterion, optimizer, device, start_epoch, num_epochs):
    """
    Trains the model starting from 'start_epoch' for an additional 'num_epochs'.
    """
    model.to(device)
    model.train()
    for epoch in range(start_epoch, start_epoch + num_epochs):
        running_loss = 0.0
        for images, target in train_loader:
            images, target = images.to(device), target.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, target)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        epoch_loss = running_loss / len(train_loader)
        print(f"Final Training Epoch [{epoch+1}] Loss: {epoch_loss:.4f}")

def final_eval(model, test_loader, criterion, device):
    """
    Evaluates the model on the test set and prints the average loss.
    """
    model.to(device)
    model.eval()  # Set model to evaluation mode
    running_loss = 0.0
    
    # Disable gradient calculation for evaluation
    with torch.no_grad():
        for images, target in test_loader:
            images, target = images.to(device), target.to(device)
            outputs = model(images)
            loss = criterion(outputs, target)
            running_loss += loss.item()
    
    avg_loss = running_loss / len(test_loader)
    print(f"Test Loss: {avg_loss:.4f}")
    return avg_loss

def main():
    # Initialize your model and hyperparameters
    final_model = TemporalSequenceLearner(hidden_neurons=36, sparsity_level=0.75)
    final_model.to(device)
    
    criterion = nn.MSELoss()
    optimizer = optim.Adam(final_model.parameters(), lr=0.001)
    
    checkpoint_path = "final_temporal_sequence_model.pth"
    start_epoch = 2

    # Check if a checkpoint exists and load it
    if os.path.exists(checkpoint_path):
        checkpoint = torch.load(checkpoint_path)
        final_model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        start_epoch = checkpoint['epoch']
        print(f"Resuming training from epoch {start_epoch}.")
    else:
        print("No checkpoint found. Training from scratch.")

    # Specify additional epochs you want to train for
    additional_epochs = 0
    final_train(final_model, train_loader, criterion, optimizer, device, start_epoch, additional_epochs)
    
    # Save a new checkpoint with updated epoch count, model state, and optimizer state
    total_epochs = start_epoch + additional_epochs
    torch.save({
        'epoch': total_epochs,
        'model_state_dict': final_model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict()
    }, checkpoint_path)
    print(f"Final model saved as '{checkpoint_path}' after {total_epochs} epochs.")
    checkpoint = torch.load("final_temporal_sequence_model.pth", map_location="cpu")
    final_model.load_state_dict(checkpoint["model_state_dict"])
    final_model.eval()
    dummy_input = torch.randn(1, 5,3, 3, 66, 200).to(device)  # Batch size=1, Sequence length=5, Channels=3, Height=66, Width=200
    torch.onnx.export(final_model, dummy_input, "model.onnx")

    # # Evaluate the model on the test set
    # final_eval(final_model, jungle_data_loader, criterion, device)

if __name__ == '__main__':
    main()






In [3]:
dummy_input = torch.randn(1, 5,3, 3, 66, 200)  # Batch size=1, Sequence length=5, Channels=3, Height=66, Width=200
torch.onnx.export(final_model, dummy_input, "model.onnx")


