In [12]:
import os
import torch
import cv2
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch.optim as optim
from torch.autograd import Variable
from torchvision import models
import random
from torch.optim import lr_scheduler
import time
import copy
import torch.nn as nn
from PIL import Image

Defining Data Loader

In [13]:
train_dataset_file = '../archive/TUSimple/train_set/training/train.txt'
val_dataset_file = '../archive/TUSimple/train_set/training/val.txt'

resize_height, resize_width = 256, 512

# Define the Rescale class
class Rescale():
    """Rescale the image in a sample to a given size.

    Args:
        output_size (width, height) (tuple): Desired output size (width, height).
    """
    def __init__(self, output_size):
        assert isinstance(output_size, (tuple))
        self.output_size = output_size

    def __call__(self, sample):
        # Rescale the image using OpenCV's resize function
        sample = cv2.resize(sample, dsize=self.output_size, interpolation=cv2.INTER_NEAREST)
        return sample

# Define the TusimpleSet class
class TusimpleSet(Dataset):
    def __init__(self, dataset, n_labels=3, transform=None, target_transform=None):
        self._gt_img_list = []
        self._gt_label_binary_list = []
        self._gt_label_instance_list = []
        self.transform = transform
        self.target_transform = target_transform
        self.n_labels = n_labels

        # Read the dataset file
        with open(dataset, 'r') as file:
            for _info in file:
                info_tmp = _info.strip(' ').split()

                self._gt_img_list.append(info_tmp[0])
                self._gt_label_binary_list.append(info_tmp[1])
                self._gt_label_instance_list.append(info_tmp[2])

        assert len(self._gt_img_list) == len(self._gt_label_binary_list) == len(self._gt_label_instance_list)

        # Shuffle the dataset
        self._shuffle()

        purger = 0.25
        if purger < 1.0:
            total_size = len(self._gt_img_list)
            subset_size = int(total_size * purger)
            self._gt_img_list = self._gt_img_list[:subset_size]
            self._gt_label_binary_list = self._gt_label_binary_list[:subset_size]
            self._gt_label_instance_list = self._gt_label_instance_list[:subset_size]

    def _shuffle(self):
        # Randomly shuffle all lists identically
        c = list(zip(self._gt_img_list, self._gt_label_binary_list, self._gt_label_instance_list))
        random.shuffle(c)
        self._gt_img_list, self._gt_label_binary_list, self._gt_label_instance_list = zip(*c)

    def __len__(self):
        return len(self._gt_img_list)

    def __getitem__(self, idx):
        # Load images and labels
        img = Image.open(self._gt_img_list[idx])
        label_instance_img = cv2.imread(self._gt_label_instance_list[idx], cv2.IMREAD_UNCHANGED)
        label_img = cv2.imread(self._gt_label_binary_list[idx], cv2.IMREAD_COLOR)

        # Apply transformations
        if self.transform:
            img = self.transform(img)
        if self.target_transform:
            label_img = self.target_transform(label_img)
            label_instance_img = self.target_transform(label_instance_img)

        label_binary = np.zeros([label_img.shape[0], label_img.shape[1]], dtype=np.uint8)
        mask = np.where((label_img[:, :, :] != [0, 0, 0]).all(axis=2))
        label_binary[mask] = 1

        return img, label_binary, label_instance_img

data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((resize_height, resize_width)),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize((resize_height, resize_width)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

target_transforms = transforms.Compose([
    Rescale((resize_width, resize_height)),
])

# Create datasets
train_dataset = TusimpleSet(train_dataset_file, transform=data_transforms['train'], target_transform=target_transforms)
val_dataset = TusimpleSet(val_dataset_file, transform=data_transforms['val'], target_transform=target_transforms)

# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=True)

# Dataloaders dictionary
dataloaders = {
    'train': train_loader,
    'val': val_loader
}

# Dataset sizes
dataset_sizes = {
    'train': len(train_loader.dataset),
    'val': len(val_loader.dataset)
}

print(f"Data Loaders Set: {dataset_sizes['train']} training samples, {dataset_sizes['val']} validation samples")


Data Loaders Set: 2510 training samples, 209 validation samples


Defining Model

In [14]:
class LaneNet(nn.Module):
    def __init__(self):
        super(LaneNet, self).__init__()
        print("LaneNet Model Created!")

        # Encoder (Feature Extractor)
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        self.relu = nn.ReLU()

        # Decoder for Binary Segmentation
        self.deconv1_binary = nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.deconv2_binary = nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.deconv3_binary = nn.ConvTranspose2d(32, 2, kernel_size=3, stride=2, padding=1, output_padding=1)

    def forward(self, x):
        # Encoding (Feature Extraction)
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.relu(self.conv3(x))

        # Decoding (Binary Segmentation)
        binary = self.relu(self.deconv1_binary(x))
        binary = self.relu(self.deconv2_binary(binary))
        binary = self.deconv3_binary(binary)

        # Predicted Segmentation (Argmax for final binary output)
        binary_pred = torch.argmax(binary, dim=1, keepdim=True)

        return {
            "binary_seg_logits": binary,
            "binary_seg_pred": binary_pred
        }


Training Model

In [15]:
print("here")

def compute_loss(net_output, binary_label):
    k_binary = 10
    loss_fn = nn.CrossEntropyLoss()
    binary_seg_logits = net_output["binary_seg_logits"]
    binary_loss = loss_fn(binary_seg_logits, binary_label)
    binary_loss = binary_loss * k_binary
    total_loss = binary_loss
    out = net_output["binary_seg_pred"]
    return total_loss, binary_loss, out

def train_loop(model, dataloader, optimizer, scheduler, device):
    model.train()
    running_loss = 0.0
    running_loss_b = 0.0

    for inputs, binarys, instances in dataloader:
        inputs = inputs.float().to(device)
        binarys = binarys.long().to(device)
        instances = instances.float().to(device)

        optimizer.zero_grad()  # Zero gradients

        with torch.set_grad_enabled(True): 
            outputs = model(inputs)
            total_loss, binary_loss, out = compute_loss(outputs, binarys)
            total_loss.backward()
            optimizer.step()

        # Update running loss
        batch_size = inputs.size(0)
        running_loss += total_loss.item() * batch_size
        running_loss_b += binary_loss.item() * batch_size

    # Step the learning rate scheduler
    if scheduler is not None:
        scheduler.step()

    return running_loss, running_loss_b


def test_loop(model, dataloader, device):
    """Evaluates the model on the validation dataset."""
    model.eval()  # Set model to evaluation mode
    running_loss = 0.0
    running_loss_b = 0.0

    with torch.no_grad():  # No gradients needed
        for inputs, binarys, instances in dataloader:
            inputs = inputs.float().to(device)
            binarys = binarys.long().to(device)
            instances = instances.float().to(device)

            outputs = model(inputs)
            total_loss, binary_loss, out = compute_loss(outputs, binarys)

            # Update running loss
            batch_size = inputs.size(0)
            running_loss += total_loss.item() * batch_size
            running_loss_b += binary_loss.item() * batch_size

    return running_loss, running_loss_b

here


In [None]:
# Initialize your model, optimizer, and learning rate scheduler
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Assuming your model is defined in the previous cells
model = LaneNet().to(device)

# Define your optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Define your learning rate scheduler (optional)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Assuming you have your DataLoader set up (replace with your actual DataLoader)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Set the number of epochs
num_epochs = 10

# Training Loop
best_model_wts = copy.deepcopy(model.state_dict())
best_loss = float("inf")

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    
    # Train for one epoch
    train_loss, train_loss_b = train_loop(model, train_dataloader, optimizer, scheduler, device)
    print(f"Training Loss: {train_loss:.4f} | Binary Loss: {train_loss_b:.4f}")
    
    # Validate the model on the validation set
    val_loss, val_loss_b = test_loop(model, val_dataloader, device)
    print(f"Validation Loss: {val_loss:.4f} | Binary Loss: {val_loss_b:.4f}")

    # Save the model if it's the best so far
    if val_loss < best_loss:
        best_loss = val_loss
        best_model_wts = copy.deepcopy(model.state_dict())
        torch.save(best_model_wts, "best_model.pth")
        print("Saved best model weights")

    # Optionally print learning rate or scheduler information
    print(f"Learning rate: {optimizer.param_groups[0]['lr']:.6f}")

# Load the best model weights after training
model.load_state_dict(best_model_wts)
print("Training complete. Best model loaded.")


LaneNet Model Created!
Epoch 1/10
Training Loss: 4184.3449 | Binary Loss: 4184.3449
Validation Loss: 178.0734 | Binary Loss: 178.0734
Saved best model weights
Learning rate: 0.001000
Epoch 2/10


In [None]:
import os
import sys
import torch
import numpy as np
from torchvision import transforms
from PIL import Image
import cv2

DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

def load_test_data(img_path, transform):
    img = Image.open(img_path)
    img = transform(img)
    return img

def test():
    # Create output directory if it doesn't exist
    if not os.path.exists('test_output'):
        os.mkdir('test_output')
    
    # Hardcode image parameters for testing
    img_path = '0001.png'
    resize_height, resize_width = 256, 512
    model_path = 'best_model.pth'
    
    # Define the data transformation pipeline
    data_transform = transforms.Compose([
        transforms.Resize((resize_height, resize_width)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    
    # Load the pre-trained model
    model = LaneNet()
    state_dict = torch.load(model_path)
    model.load_state_dict(state_dict)
    model.eval()
    model.to(DEVICE)
    
    # Prepare the image input
    dummy_input = load_test_data(img_path, data_transform).to(DEVICE)
    dummy_input = torch.unsqueeze(dummy_input, dim=0)
    
    # Forward pass through the model
    with torch.no_grad():
        outputs = model(dummy_input)
    
    # Load original input image
    input_img = Image.open(img_path)
    input_img = input_img.resize((resize_width, resize_height))
    input_img = np.array(input_img)
    
    # Process binary segmentation predictions
    binary_logits = outputs['binary_seg_logits']
    binary_pred = outputs['binary_seg_pred']
    
    # Convert outputs to numpy for visualization
    binary_logits_np = binary_logits.detach().cpu().numpy()
    binary_pred_np = binary_pred.detach().cpu().numpy()
    
    # Visualize and save results
    # Original input image
    cv2.imwrite(os.path.join('test_output', 'input.jpg'), input_img)
    
    # Binary segmentation logits (channel-wise)
    for i in range(binary_logits_np.shape[1]):
        channel_logits = binary_logits_np[0, i, :, :]
        # Normalize to 0-255 for visualization
        channel_logits_norm = cv2.normalize(channel_logits, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
        cv2.imwrite(os.path.join('test_output', f'binary_logits_channel_{i}.jpg'), channel_logits_norm)
    
    # Binary prediction mask
    binary_pred_visual = binary_pred_np[0, 0, :, :] * 255
    cv2.imwrite(os.path.join('test_output', 'binary_prediction.jpg'), binary_pred_visual)
    
    # Optional: Overlay prediction on input image
    overlay = input_img.copy()
    overlay[binary_pred_np[0, 0, :, :] > 0] = [0, 0, 255]  # Color detected lanes red
    cv2.imwrite(os.path.join('test_output', 'input_with_prediction_overlay.jpg'), overlay)
    
    print("Prediction visualization completed. Check test_output directory.")

if __name__ == "__main__":
    test()

LaneNet Model Created!
Prediction visualization completed. Check test_output directory.
