## Import libraries

In [None]:
import torch
import torch.nn as nn
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torchvision.transforms as transforms

from PIL import Image
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split

# Release unused cached CUDA memory held by PyTorch's allocator
# (presumably left over from an earlier run in the same kernel — confirm).
torch.cuda.empty_cache()

## Download dataset

In [None]:
!pip install -q gdown
!gdown --id 1aAyaNM8q54G5S2IKtw2XAv4BmuEbidX0

Downloading...
From (original): https://drive.google.com/uc?id=1aAyaNM8q54G5S2IKtw2XAv4BmuEbidX0
From (redirected): https://drive.google.com/uc?id=1aAyaNM8q54G5S2IKtw2XAv4BmuEbidX0&confirm=t&uuid=c7c7863b-3601-4498-9a27-b1d60f5b88ef
To: /content/Data.zip
100% 7.74G/7.74G [01:41<00:00, 76.1MB/s]


## Unzip

In [None]:
import zipfile

# Extract into ./Data, next to the archive. A relative target keeps this cell
# consistent with the relative 'Data.zip' it reads and with the relative
# 'Data/Data/...' paths used when the files are loaded.
with zipfile.ZipFile("Data.zip", 'r') as zip_ref:
    zip_ref.extractall("Data")

## Preprocess data

In [None]:
# Steering labels: one float per non-blank line of the text file.
steer_file_path = 'Data/Data/SteerValues/steer_values.txt'
with open(steer_file_path, 'r') as f:
    steer_values = [float(line.strip()) for line in f if line.strip()]

# Image paths: every .png in the folder, in lexicographic filename order.
# NOTE(review): pairing image i with label line i assumes the filenames sort
# lexicographically in the same order the labels were written (e.g. zero-padded
# frame numbers) — confirm against the dataset.
image_folder = 'Data/Data/Images'
image_filenames = sorted(os.listdir(image_folder))
img_paths = [
    os.path.join(image_folder, fname)
    for fname in image_filenames
    if fname.lower().endswith('.png')
]

# Fail early, with a clear message, if images and labels cannot be paired 1:1.
if len(img_paths) != len(steer_values):
    raise ValueError(
        f"Found {len(img_paths)} images in '{image_folder}' but "
        f"{len(steer_values)} steering values in '{steer_file_path}'"
    )

## Split dataset into train / validation / test sets

In [None]:
val_size = 0.2      # fraction of the full dataset held out for validation
test_size = 0.125   # fraction of the remaining 80% -> 10% of the full dataset
is_shuffle = True
split_seed = 42     # fixed seed so the split is identical after a kernel restart

# First split: validation set vs. everything else.
X_rest, X_val, y_rest, y_val = train_test_split(
    img_paths, steer_values,
    test_size=val_size,
    shuffle=is_shuffle,
    random_state=split_seed
)

# Second split: carve the test set out of the remainder (roughly 70/20/10 overall).
X_train, X_test, y_train, y_test = train_test_split(
    X_rest, y_rest,
    test_size=test_size,
    shuffle=is_shuffle,
    random_state=split_seed
)

## Define dataset

In [None]:
# Preprocessing applied to every image before it reaches the network.
preprocessing_steps = [
    # Force a fixed 220x220 input size.
    transforms.Resize((220, 220)),
    # PIL image -> float tensor.
    transforms.ToTensor(),
    # Per channel: (x - 0.5) / 0.5.
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
]
transform = transforms.Compose(preprocessing_steps)

class SteeringDataset(Dataset):
    """Dataset that pairs image files with scalar steering values.

    Args:
        img_paths: Sequence of image file paths.
        steer_values: Sequence of steering values; ``steer_values[i]`` is the
            label of ``img_paths[i]``.
        transform: Callable applied to the loaded RGB PIL image. Defaults to
            ``transforms.ToTensor()`` when ``None``.

    Raises:
        ValueError: If ``img_paths`` and ``steer_values`` differ in length.
    """

    def __init__(self, img_paths, steer_values, transform=None):
        # Images and labels are matched by index, so a length mismatch would
        # either raise IndexError mid-epoch or silently drop labels.
        if len(img_paths) != len(steer_values):
            raise ValueError(
                f"Number of images ({len(img_paths)}) does not match "
                f"number of steering values ({len(steer_values)})"
            )
        self.img_paths = img_paths
        self.steer_values = steer_values
        self.transform = transform if transform is not None else transforms.ToTensor()

    def __len__(self):
        """Return the number of (image, steering value) pairs."""
        return len(self.img_paths)

    def __getitem__(self, idx):
        """Return ``(image, steer_value)`` for sample ``idx``.

        ``image`` is the output of ``self.transform``; ``steer_value`` is a
        0-d ``float32`` tensor.
        """
        # The context manager closes the underlying file promptly; convert()
        # returns an independent image, so it stays usable afterwards.
        with Image.open(self.img_paths[idx]) as img:
            image = img.convert('RGB')

        # Apply transformation
        image = self.transform(image)

        # Convert steering value to tensor
        steer_value = torch.tensor(self.steer_values[idx], dtype=torch.float32)

        return image, steer_value


## Define dataloader

In [None]:
# One batch size shared by all three loaders.
BATCH_SIZE = 32

# Create dataset instances (same preprocessing for every split)
train_dataset = SteeringDataset(X_train, y_train, transform=transform)
val_dataset   = SteeringDataset(X_val, y_val, transform=transform)
test_dataset  = SteeringDataset(X_test, y_test, transform=transform)

# Create data loaders: only the training data is reshuffled each epoch;
# validation and test keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## Define model

In [None]:
class ResidualBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers plus a skip connection.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the shortcut projection).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # Main path: conv -> bn -> relu -> conv -> bn.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut path: an empty Sequential (identity) unless the main path
        # changes the spatial size or channel count, in which case a strided
        # 1x1 conv + batch-norm projects the input to the matching shape.
        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            shortcut = nn.Sequential()
        self.downsample = shortcut

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        main = self.conv1(x)
        main = self.bn1(main)
        main = self.relu(main)
        main = self.conv2(main)
        main = self.bn2(main)
        main += self.downsample(x)
        return self.relu(main)


class ResNetSteering(nn.Module):
    """ResNet-style regressor that maps an RGB image batch to one value per image.

    Args:
        block: Block class called as ``block(in_channels, out_channels, stride)``
            or ``block(in_channels, out_channels)``.
        layers: Sequence of four ints: the number of blocks in each stage.
    """

    def __init__(self, block, layers):
        super().__init__()
        # Running channel count consumed and updated by _make_layer.
        self.in_channels = 64

        # Stem: 7x7 strided convolution followed by a strided max-pool.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four stages; each stage after the first halves the resolution.
        depth1, depth2, depth3, depth4 = layers[0], layers[1], layers[2], layers[3]
        self.layer1 = self._make_layer(block, 64, depth1, stride=1)
        self.layer2 = self._make_layer(block, 128, depth2, stride=2)
        self.layer3 = self._make_layer(block, 256, depth3, stride=2)
        self.layer4 = self._make_layer(block, 512, depth4, stride=2)

        # Regression head: global average pool -> flatten -> dropout -> linear.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(512, 1)  # single output neuron for regression

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """Build one stage: a (possibly strided) first block plus follow-up blocks."""
        first = block(self.in_channels, out_channels, stride)
        self.in_channels = out_channels
        rest = [block(out_channels, out_channels) for _ in range(1, num_blocks)]
        return nn.Sequential(first, *rest)

    def forward(self, x):
        """Return predictions of shape ``(batch,)`` for an image batch ``x``."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        x = self.flatten(self.avgpool(x))
        x = self.fc(self.dropout(x))
        # Drop the trailing singleton dimension: (batch, 1) -> (batch,).
        return x.squeeze(1)

## Instantiate model

In [None]:
# Use the first CUDA device when PyTorch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'

# Four stages with two residual blocks each, moved to the selected device.
model = ResNetSteering(ResidualBlock, [2, 2, 2, 2])
model = model.to(device)



## Define evaluation function

In [None]:
def evaluate(model, dataLoader, criterion, device):
    """Evaluate a scalar-regression model on a data loader.

    Args:
        model: Module whose forward pass yields one prediction per sample.
        dataLoader: Iterable of ``(inputs, labels)`` tensor batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(avg_loss, mae)``: the mean of the per-batch losses and the
        mean absolute error over all samples.

    Raises:
        ValueError: If ``dataLoader`` yields no batches.
    """
    model.eval()
    losses = []
    total_abs_error = 0.0
    total_samples = 0

    with torch.no_grad():
        for inputs, labels in dataLoader:
            inputs = inputs.to(device)
            # reshape(-1) rather than squeeze(): squeeze() turns a batch of one
            # into a 0-d tensor, on which size(0) raises IndexError.
            labels = labels.to(device).float().reshape(-1)
            outputs = model(inputs).reshape(-1)

            loss = criterion(outputs, labels)
            losses.append(loss.item())

            total_abs_error += torch.sum(torch.abs(outputs - labels)).item()
            total_samples += labels.numel()

    if not losses:
        raise ValueError("evaluate() received a data loader with no batches")

    avg_loss = sum(losses) / len(losses)
    mae = total_abs_error / total_samples
    return avg_loss, mae

In [None]:
def fit(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler,
    device,
    epochs,
    model_save_path="best_model.pth"
):
    """Train ``model`` and checkpoint the weights with the best validation MAE.

    Each epoch runs one pass over ``train_loader``, evaluates on ``val_loader``
    with ``evaluate``, steps the scheduler once, and saves ``model.state_dict()``
    to ``model_save_path`` whenever the validation MAE improves.

    Args:
        model: Module producing one prediction per sample.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating the parameters of ``model``.
        scheduler: Learning-rate scheduler stepped once per epoch, or ``None``
            to keep the learning rate under the optimizer's control only.
        device: Device the batches are moved to.
        epochs: Number of epochs to run.
        model_save_path: File the best ``state_dict`` is written to.

    Returns:
        Tuple ``(train_losses, val_losses, val_maes)`` with one entry per epoch.

    Raises:
        ValueError: If ``train_loader`` yields no batches in an epoch.
    """
    train_losses = []
    val_losses = []
    val_maes = []

    best_val_mae = float("inf")

    for epoch in range(epochs):
        batch_train_losses = []
        model.train()

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            # reshape(-1) keeps a 1-D batch axis even for a batch of one,
            # where squeeze() would collapse it to a 0-d tensor.
            labels = labels.to(device).float().reshape(-1)

            optimizer.zero_grad()
            outputs = model(inputs).reshape(-1)

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            batch_train_losses.append(loss.item())

        if not batch_train_losses:
            raise ValueError("fit() received a train_loader with no batches")

        train_loss = sum(batch_train_losses) / len(batch_train_losses)
        train_losses.append(train_loss)

        val_loss, val_mae = evaluate(model, val_loader, criterion, device)
        val_losses.append(val_loss)
        val_maes.append(val_mae)

        if scheduler is not None:
            scheduler.step()

        # === Save model if MAE improves ===
        if val_mae < best_val_mae:
            best_val_mae = val_mae
            torch.save(model.state_dict(), model_save_path)
            print(f"Model saved at epoch {epoch+1} with best MAE: {val_mae:.4f}")

        print(f'EPOCH {epoch+1}: '
              f'Train_loss: {train_loss:.4f}\t '
              f'Val_loss: {val_loss:.4f}\t '
              f'Val_MAE: {val_mae:.4f}')

    return train_losses, val_losses, val_maes

## Train

In [None]:
lr = 1e-3     # base learning rate; lr_lambda returns a multiplier of this value
epochs = 50   # number of training epochs

def lr_lambda(epoch, warmup_epochs=5, total_epochs=30,
              init_scale=0.1, min_scale=0.3):
    """Learning-rate multiplier: linear warm-up, then linear decay to a floor.

    Args:
        epoch: Zero-based epoch index.
        warmup_epochs: Number of epochs over which the multiplier rises
            linearly from ``init_scale`` towards 1.0.
        total_epochs: Epoch at which the decay reaches ``min_scale``.
        init_scale: Multiplier at epoch 0 (when ``warmup_epochs > 0``).
        min_scale: Floor of the multiplier; held from ``total_epochs`` onwards.

    Returns:
        The factor the base learning rate is multiplied by at ``epoch``.
    """
    if epoch < warmup_epochs:
        warmup_factor = epoch / warmup_epochs
        return init_scale + (1.0 - init_scale) * warmup_factor

    # No room for a decay phase: avoid dividing by zero (or by a negative
    # span, which would make the multiplier grow) and go straight to the floor.
    decay_span = total_epochs - warmup_epochs
    if decay_span <= 0:
        return min_scale

    scale_range = 1.0 - min_scale
    decay_factor = (total_epochs - epoch) / decay_span
    # max() holds the multiplier at min_scale once epoch passes total_epochs.
    return min_scale + scale_range * max(0.0, decay_factor)

# Loss for regression
criterion = nn.MSELoss()

# Optimizer and scheduler
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# Tie the schedule length to `epochs`: with lr_lambda's default
# total_epochs=30 the multiplier would sit at its floor for the last
# 20 of the 50 epochs.
scheduler = torch.optim.lr_scheduler.LambdaLR(
    optimizer,
    lambda epoch: lr_lambda(epoch, total_epochs=epochs)
)

# Train; the best weights (lowest validation MAE) are written to fit()'s
# default model_save_path.
train_losses, val_losses, val_maes = fit(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler,
    device,
    epochs
)



Model saved at epoch 1 with best MAE: 0.0248
EPOCH 1: Train_loss: 0.0047	 Val_loss: 0.0019	 Val_MAE: 0.0248
Model saved at epoch 2 with best MAE: 0.0223
EPOCH 2: Train_loss: 0.0027	 Val_loss: 0.0016	 Val_MAE: 0.0223
Model saved at epoch 3 with best MAE: 0.0200
EPOCH 3: Train_loss: 0.0017	 Val_loss: 0.0013	 Val_MAE: 0.0200
Model saved at epoch 4 with best MAE: 0.0146
EPOCH 4: Train_loss: 0.0015	 Val_loss: 0.0008	 Val_MAE: 0.0146
Model saved at epoch 5 with best MAE: 0.0114
EPOCH 5: Train_loss: 0.0010	 Val_loss: 0.0006	 Val_MAE: 0.0114
EPOCH 6: Train_loss: 0.0010	 Val_loss: 0.0006	 Val_MAE: 0.0117
Model saved at epoch 7 with best MAE: 0.0101
EPOCH 7: Train_loss: 0.0006	 Val_loss: 0.0005	 Val_MAE: 0.0101
Model saved at epoch 8 with best MAE: 0.0088
EPOCH 8: Train_loss: 0.0005	 Val_loss: 0.0004	 Val_MAE: 0.0088
EPOCH 9: Train_loss: 0.0004	 Val_loss: 0.0004	 Val_MAE: 0.0092
Model saved at epoch 10 with best MAE: 0.0085
EPOCH 10: Train_loss: 0.0003	 Val_loss: 0.0004	 Val_MAE: 0.0085
Model sa