In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os
import random

In [2]:
class LaneDetectionCNN(nn.Module):
    """Convolutional feature extractor that maps an image to a flat feature vector.

    The network is five ReLU-activated convolutions interleaved with four 2x2
    max-pools, followed by dropout (p=0.5). The length of the flattened output
    for the configured ``input_shape`` is stored in ``self._to_linear`` so that
    downstream layers can size their input.

    Parameters:
    - input_shape: (channels, height, width) of a single input image. The
      channel count sizes the first convolution, so non-RGB inputs work too.
    """

    def __init__(self, input_shape=(3, 224, 224)):
        super(LaneDetectionCNN, self).__init__()
        # Take the channel count from input_shape rather than hard-coding 3, so the
        # dummy pass below cannot disagree with the first layer. With the default
        # 3-channel shape the layer shapes (and saved checkpoints) are unchanged.
        in_channels = input_shape[0]

        # Convolutional layers
        self.conv1 = nn.Conv2d(in_channels, 16, kernel_size=5, stride=2, padding=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2)
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv4 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.maxpool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv5 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.maxpool4 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(0.5)

        # Calculate flat size dynamically for compatibility with RNNs
        self._to_linear = None
        self._calculate_flat_size(input_shape)

    def _calculate_flat_size(self, input_shape):
        """Pass a dummy tensor through the convolutional layers to determine the flattened size."""
        # Only the output shape matters here, so skip autograd bookkeeping.
        with torch.no_grad():
            dummy = torch.zeros(1, *input_shape)
            dummy = self._forward_conv(dummy)
        # The dummy batch holds one sample, so numel() is the per-sample feature count.
        self._to_linear = dummy.numel()

    def _forward_conv(self, x):
        """Forward pass through convolutional layers."""
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = self.maxpool1(x)
        x = torch.relu(self.conv3(x))
        x = self.maxpool2(x)
        x = torch.relu(self.conv4(x))
        x = self.maxpool3(x)
        x = torch.relu(self.conv5(x))
        x = self.maxpool4(x)
        x = self.dropout(x)
        return x

    def forward(self, x):
        """
        Forward pass through the CNN.

        Parameters:
        - x: Batch of images shaped (batch, channels, height, width).

        Returns:
        - Tensor of shape (batch, features): flattened features suitable for RNN input.
        """
        x = self._forward_conv(x)
        # Keep the batch dimension and collapse everything else.
        return torch.flatten(x, start_dim=1)

In [3]:
class LaneDetectionRNN(nn.Module):
    """CNN + LSTM regressor producing one bounded value per frame of a sequence.

    Every frame is encoded by ``LaneDetectionCNN``; the per-frame features are
    concatenated with the per-frame action features and run through a 5-layer
    LSTM. A linear head followed by ``tanh`` maps each hidden state to a
    single value in (-1, 1).

    Parameters:
    - input_shape: (channels, height, width) of one frame, forwarded to the CNN.
    - rnn_hidden_size: Hidden size of the LSTM.
    - action_embedding_size: Accepted but not used by the current implementation.
    - num_frequencies: Number of Fourier frequencies per action scalar; the LSTM
      expects 2 * 2 * num_frequencies action features per timestep.
    """

    def __init__(self, input_shape, rnn_hidden_size=128, action_embedding_size=32, num_frequencies=6):
        super().__init__()
        self.cnn = LaneDetectionCNN(input_shape)

        # LSTM input per timestep = CNN features + Fourier-encoded action features.
        action_feature_size = 2 * 2 * num_frequencies
        lstm_input_size = self.cnn._to_linear + action_feature_size
        self.rnn = nn.LSTM(
            input_size=lstm_input_size,
            hidden_size=rnn_hidden_size,
            num_layers=5,
            batch_first=True,
        )

        # One scalar prediction per timestep.
        self.fc = nn.Linear(rnn_hidden_size, 1)

    def forward(self, x, actions):
        """
        Predict one value per timestep.

        Parameters:
        - x: Frames shaped (batch, seq, channels, height, width).
        - actions: Action features whose leading dimensions are (batch, seq); they
          are concatenated to the CNN features along the last dimension.

        Returns:
        - Tensor of shape (batch, seq, 1) with values in (-1, 1).
        """
        n_batch, n_steps, n_channels, n_rows, n_cols = x.size()

        # Fold time into the batch axis so the CNN sees ordinary image batches.
        frames = x.view(n_batch * n_steps, n_channels, n_rows, n_cols)
        frame_features = self.cnn(frames).view(n_batch, n_steps, -1)

        # Append the action features to every timestep's image features.
        lstm_input = torch.cat((frame_features, actions), dim=-1)

        hidden_states, _ = self.rnn(lstm_input)
        return self.fc(hidden_states).tanh()


In [4]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os
import random
from torch.utils.data import DataLoader, random_split


class SequentialImageDataset(Dataset):
    """Dataset of fixed-length frame sequences with per-frame labels and actions.

    Files in the three folders are paired purely by their position in sorted
    (lexicographic) filename order, then cut into consecutive, non-overlapping
    chunks of ``seq_length``. Trailing files that do not fill a whole chunk are
    dropped. The chunk order is shuffled once at construction time using the
    global ``random`` module.

    Parameters:
    - image_folder: Folder containing the frame images.
    - label_folder: Folder with one text file per frame holding a single float.
    - action_folder: Folder with one text file per frame holding two
      whitespace-separated floats (speed, angular velocity).
    - seq_length: Number of frames per sequence (must be positive).
    - transform: Optional callable applied to each PIL image. It is expected to
      return tensors, since the frames are stacked with ``torch.stack``.
    - num_frequencies: Number of Fourier frequencies per action scalar.

    Raises:
    - ValueError: If ``seq_length`` is not positive, or the label/action folder
      holds fewer files than the frames that will be used.
    """

    def __init__(self, image_folder, label_folder, action_folder, seq_length=100, transform=None, num_frequencies=6):
        if seq_length <= 0:
            raise ValueError(f"seq_length must be a positive integer, got {seq_length}.")

        self.image_folder = image_folder
        self.label_folder = label_folder
        self.action_folder = action_folder
        self.seq_length = seq_length
        self.num_frequencies = num_frequencies
        self.transform = transform

        # Load and sort image, label, and action files
        # NOTE(review): sorting is lexicographic; frame order is only chronological
        # if the filenames are zero-padded (or otherwise sort correctly) - confirm.
        self.image_files = sorted(os.listdir(image_folder))
        self.label_files = sorted(os.listdir(label_folder))
        self.action_files = sorted(os.listdir(action_folder))

        # Ensure dataset size is divisible by seq_length
        self.num_sequences = len(self.image_files) // seq_length
        usable = self.num_sequences * seq_length

        # Fail early if labels/actions cannot cover every used frame. Otherwise zip()
        # in __getitem__ would silently yield shorter sequences that only blow up
        # later, when batches are collated.
        for kind, files in (("label", self.label_files), ("action", self.action_files)):
            if len(files) < usable:
                raise ValueError(
                    f"Found {len(files)} {kind} files but {usable} are needed to match the images."
                )

        self.image_files = self.image_files[:usable]
        self.label_files = self.label_files[:usable]
        self.action_files = self.action_files[:usable]

        # Split into sequences
        self.sequences = [
            (self.image_files[start:start + seq_length],
             self.label_files[start:start + seq_length],
             self.action_files[start:start + seq_length])
            for start in range(0, usable, seq_length)
        ]
        random.shuffle(self.sequences)  # Shuffle sequences

    def __len__(self):
        """Return the number of sequences."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """
        Load one sequence from disk.

        Returns:
        - image_sequence: Stacked transformed frames; (seq_length, channels, height,
          width) when the transform yields (channels, height, width) tensors.
        - label_sequence: float32 tensor of shape (seq_length,).
        - action_sequence: Tensor of shape (seq_length, 4 * num_frequencies) holding
          the Fourier features of speed followed by those of angular velocity.
        """
        image_sequence = []
        label_sequence = []
        action_sequence = []

        image_files, label_files, action_files = self.sequences[idx]
        for img_file, lbl_file, act_file in zip(image_files, label_files, action_files):
            # Load and transform image. The context manager closes the file handle
            # as soon as the pixel data has been copied by convert().
            img_path = os.path.join(self.image_folder, img_file)
            with Image.open(img_path) as raw_image:
                image = raw_image.convert("RGB")
            if self.transform:
                image = self.transform(image)
            image_sequence.append(image)

            # Load label
            lbl_path = os.path.join(self.label_folder, lbl_file)
            with open(lbl_path, "r") as f:
                label = float(f.read().strip())
            label_sequence.append(label)

            # Load action (speed and angular velocity)
            act_path = os.path.join(self.action_folder, act_file)
            with open(act_path, "r") as f:
                speed, angular_velocity = map(float, f.read().strip().split())

            # Compute Fourier features for each scalar and concatenate them.
            speed_features = compute_fourier_features(torch.tensor(speed), self.num_frequencies)
            angular_features = compute_fourier_features(torch.tensor(angular_velocity), self.num_frequencies)
            action_sequence.append(torch.cat([speed_features, angular_features]))

        image_sequence = torch.stack(image_sequence)
        label_sequence = torch.tensor(label_sequence, dtype=torch.float32)
        action_sequence = torch.stack(action_sequence)
        return image_sequence, label_sequence, action_sequence
    
def compute_fourier_features(value, num_frequencies=6):
    """
    Encode a scalar as sine/cosine features at power-of-two frequencies.

    Parameters:
    - value: Input scalar (e.g., speed or angular velocity).
    - num_frequencies: Number of frequencies to use (1, 2, 4, ..., 2**(num_frequencies - 1)).

    Returns:
    - features: Tensor of shape (2 * num_frequencies,): all sines first, then all cosines.
    """
    exponents = torch.arange(num_frequencies, dtype=torch.float32)
    # Scale the input by each frequency once and reuse it for both sin and cos.
    phases = (2 ** exponents) * value
    return torch.cat([phases.sin(), phases.cos()])

def get_sequential_dataloader(
    image_folder, label_folder, action_folder, batch_size, seq_length=100, train_fraction=0.8, val_fraction=0.1, test_fraction=0.1
):
    """
    Create DataLoaders for training, validation, and testing datasets, ensuring distinct splits.

    The split is random and is redrawn on every call, so two calls do not
    produce the same train/validation/test membership.

    Parameters:
    - image_folder: Path to the folder containing images.
    - label_folder: Path to the folder containing labels.
    - action_folder: Path to the folder containing actions.
    - batch_size: Number of sequences per batch.
    - seq_length: Number of images per sequence.
    - train_fraction: Fraction of the data to use for training.
    - val_fraction: Fraction of the data to use for validation.
    - test_fraction: Fraction of the data to use for testing. When the three
      fractions sum to 1.0 the test split receives every remaining sequence
      (absorbing rounding); when they sum to less, the leftover is unused.

    Returns:
    - train_loader: DataLoader for training.
    - val_loader: DataLoader for validation.
    - test_loader: DataLoader for testing.

    Raises:
    - ValueError: If any fraction is negative, or the fractions do not sum to a
      value in (0.0, 1.0].
    """
    fractions = (train_fraction, val_fraction, test_fraction)
    if any(fraction < 0.0 for fraction in fractions):
        raise ValueError("Fractions for train, validation, and test must be non-negative.")

    # Small tolerance so sums such as 0.7 + 0.2 + 0.1 are not rejected (or treated
    # as "less than 1") because of floating-point round-off.
    tolerance = 1e-9
    total_fraction = train_fraction + val_fraction + test_fraction
    if not (0.0 < total_fraction <= 1.0 + tolerance):
        raise ValueError("Fractions for train, validation, and test must sum to 1.0 or less.")

    transform = transforms.Compose([
        transforms.ToTensor(),  # Convert image to tensor
    ])

    # Load the full dataset
    dataset = SequentialImageDataset(image_folder, label_folder, action_folder, seq_length=seq_length, transform=transform)

    # Compute sizes for train, validation, and test splits
    total_size = len(dataset)
    train_size = int(total_size * train_fraction)
    val_size = int(total_size * val_fraction)
    remaining = total_size - train_size - val_size
    if total_fraction >= 1.0 - tolerance:
        # Fractions cover everything: give the rounding remainder to the test split.
        test_size = remaining
    else:
        # Honor test_fraction instead of silently handing over all leftover data.
        test_size = min(int(total_size * test_fraction), remaining)
    unused_size = remaining - test_size

    # Perform the splits; random_split needs the lengths to sum to len(dataset),
    # so any deliberately unused sequences go into a discarded fourth split.
    lengths = [train_size, val_size, test_size]
    if unused_size > 0:
        lengths.append(unused_size)
    train_dataset, val_dataset, test_dataset = random_split(dataset, lengths)[:3]

    # Create DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader

In [5]:
if __name__ == "__main__":
    IMAGE_FOLDER = "training_images/trail2/images"
    LABEL_FOLDER = "training_images/trail2/labels"
    ACTION_FOLDER = "training_images/trail2/actions"
    batch_size = 4  # Number of sequences per batch
    seq_length = 100  # Number of images per sequence

    # Define split fractions
    train_fraction = 0.8
    val_fraction = 0.1
    test_fraction = 0.1

    # Create DataLoaders
    train_loader, val_loader, test_loader = get_sequential_dataloader(
        IMAGE_FOLDER, LABEL_FOLDER, ACTION_FOLDER, batch_size, seq_length,
        train_fraction=train_fraction, val_fraction=val_fraction, test_fraction=test_fraction
    )

    # Smoke-test every split: fetch a single batch and report the tensor shapes.
    for header, loader in (
        ("Testing Train DataLoader:", train_loader),
        ("\nTesting Validation DataLoader:", val_loader),
        ("\nTesting Test DataLoader:", test_loader),
    ):
        print(header)
        for images, labels, actions in loader:
            print(f"Image batch shape: {images.shape}")  # Expected: (batch_size, seq_length, channels, height, width)
            print(f"Label batch shape: {labels.shape}")  # Expected: (batch_size, seq_length)
            print(f"Action batch shape: {actions.shape}")  # Expected: (batch_size, seq_length, action_features)
            break  # One batch per split is enough

    # len() of a DataLoader is its number of batches, not the number of sequences.
    # Kept inside the guard because it needs the loaders created above.
    print("number of data: ", list(map(len, [train_loader, val_loader, test_loader])))

Testing Train DataLoader:
Image batch shape: torch.Size([4, 100, 3, 480, 640])
Label batch shape: torch.Size([4, 100])
Label batch shape: torch.Size([4, 100, 24])

Testing Validation DataLoader:
Image batch shape: torch.Size([4, 100, 3, 480, 640])
Label batch shape: torch.Size([4, 100])
Label batch shape: torch.Size([4, 100, 24])

Testing Test DataLoader:
Image batch shape: torch.Size([4, 100, 3, 480, 640])
Label batch shape: torch.Size([4, 100])
Label batch shape: torch.Size([4, 100, 24])
number of data:  [35, 5, 5]


In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm

# Assuming the following classes and functions are already defined:
# - LaneDetectionRNN
# - SequentialImageDataset

# -------------------
# Validation Function
# -------------------
def validate_model(model, dataloader, criterion, device):
    """
    Evaluate a model on a dataloader without updating its weights.

    The model is switched to evaluation mode and left in that mode on return.

    Parameters:
    - model: Callable as ``model(images, actions)``; its output has a trailing
      singleton dimension that is squeezed before the loss is computed.
    - dataloader: Iterable of ``(images, labels, actions)`` batches that supports ``len()``.
    - criterion: Loss function (e.g., MSELoss).
    - device: Device to run the model on ('cpu' or 'cuda').

    Returns:
    - Mean of the per-batch losses (each batch counts equally, whatever its size).
    """
    model.eval()
    batch_losses = []

    with torch.no_grad():
        for images, labels, actions in dataloader:
            images = images.to(device)
            labels = labels.to(device)
            actions = actions.to(device)

            # Drop the trailing singleton so the output lines up with the labels.
            outputs = model(images, actions).squeeze(-1)
            batch_losses.append(criterion(outputs, labels).item())

    return sum(batch_losses, 0.0) / len(dataloader)


In [7]:
# -------------------
# Training Function with Validation and Loss Tracking
# -------------------
def train_model_with_validation(
    model, train_loader, val_loader, criterion, optimizer, device, n_epochs=10
):
    """
    Run the training loop, validating after every epoch and recording both losses.

    Parameters:
    - model: Model called as ``model(images, actions)``; moved to ``device`` first.
    - train_loader: DataLoader yielding ``(images, labels, actions)`` training batches.
    - val_loader: DataLoader passed to ``validate_model`` after each epoch.
    - criterion: Loss function (e.g., MSELoss).
    - optimizer: Optimizer (e.g., Adam).
    - device: Device to run the model on ('cpu' or 'cuda').
    - n_epochs: Number of epochs to train for.

    Returns:
    - train_losses: Mean per-batch training loss for each epoch.
    - val_losses: Validation loss for each epoch.
    """
    model.to(device)

    history = {"train": [], "val": []}

    for epoch in range(n_epochs):
        # validate_model() leaves the model in eval mode, so re-enable training
        # behaviour (e.g. dropout) at the start of every epoch.
        model.train()
        running_loss = 0.0

        progress = tqdm(
            train_loader,
            total=len(train_loader),
            desc=f"Epoch {epoch + 1}/{n_epochs}",
            unit="batch",
        )
        for images, labels, actions in progress:
            images = images.to(device)
            labels = labels.to(device)
            actions = actions.to(device)

            optimizer.zero_grad()

            # Forward pass; squeeze the trailing singleton to match the labels.
            outputs = model(images, actions).squeeze(-1)
            batch_loss = criterion(outputs, labels)
            running_loss += batch_loss.item()

            # Backward pass and parameter update.
            batch_loss.backward()
            optimizer.step()

        avg_train_loss = running_loss / len(train_loader)
        history["train"].append(avg_train_loss)

        avg_val_loss = validate_model(model, val_loader, criterion, device)
        history["val"].append(avg_val_loss)

        print(
            f"Epoch [{epoch + 1}/{n_epochs}] - "
            f"Train Loss: {avg_train_loss:.7f}, Validation Loss: {avg_val_loss:.7f}"
        )

    return history["train"], history["val"]


In [None]:
# -------------------
# TRAIN
# -------------------
if __name__ == "__main__":
    # Dataset paths
    IMAGE_FOLDER = "training_images/trail2/images"
    LABEL_FOLDER = "training_images/trail2/labels"
    ACTION_FOLDER = "training_images/trail2/actions"
    MODEL_PATH = "models/lane_detection_rnn4.pth"

    batch_size = 10  # Number of sequences per batch
    seq_length = 20  # Number of images per sequence
    n_epochs = 10
    learning_rate = 0.001

    # Define split fractions
    train_fraction = 0.85  # 85% for training
    val_fraction = 0.1     # 10% for validation
    test_fraction = 0.05   # 5% for testing

    # Make sure the checkpoint directory exists before training starts, so a
    # missing folder cannot throw away a finished training run at save time.
    os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)

    # Create DataLoaders
    train_loader, val_loader, test_loader = get_sequential_dataloader(
        IMAGE_FOLDER, LABEL_FOLDER, ACTION_FOLDER, batch_size=batch_size, seq_length=seq_length,
        train_fraction=train_fraction, val_fraction=val_fraction, test_fraction=test_fraction
    )

    # Initialize the model
    input_shape = (3, 480, 640)  # Update based on actual image dimensions
    model = LaneDetectionRNN(input_shape=input_shape, rnn_hidden_size=128)

    # Define loss function and optimizer
    criterion = nn.MSELoss()  # Mean squared error loss
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Select device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Train the model with validation
    train_losses, val_losses = train_model_with_validation(
        model, train_loader, val_loader, criterion, optimizer, device, n_epochs
    )

    # Save the trained model
    torch.save(model.state_dict(), MODEL_PATH)
    print(f"Model saved to '{MODEL_PATH}'")

    # Evaluate on the test split that was held out for this run
    test_loss = validate_model(model, test_loader, criterion, device)
    print(f"Test Loss: {test_loss:.7f}")


In [10]:
# loading saved model

def load_model(model_class, model_path, device, input_shape, rnn_hidden_size=128):
    """
    Build a model and restore its weights from a saved state dict.

    Parameters:
    - model_class: Class constructed as ``model_class(input_shape=..., rnn_hidden_size=...)``.
    - model_path: Path to a file written with ``torch.save(model.state_dict(), ...)``.
    - device: Device the weights are mapped to and the model is moved to.
    - input_shape: Forwarded to the model constructor.
    - rnn_hidden_size: Forwarded to the model constructor; must match the checkpoint.

    Returns:
    - The model on ``device``, in evaluation mode.
    """
    model = model_class(input_shape=input_shape, rnn_hidden_size=rnn_hidden_size)

    # weights_only=True limits unpickling to tensors and plain containers, so a
    # tampered checkpoint cannot execute arbitrary code on load.
    state_dict = torch.load(model_path, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()  # Set model to evaluation mode
    print(f"Model loaded successfully from '{model_path}'")
    return model

# Evaluation configuration: dataset folders and the checkpoint to restore.
IMAGE_FOLDER = "training_images/trail2/images"
LABEL_FOLDER = "training_images/trail2/labels"
ACTION_FOLDER = "training_images/trail2/actions"
MODEL_PATH = "models/lane_detection_rnn4.pth"

batch_size = 4  # Sequences per batch
seq_length = 100  # Frames per sequence
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Only the third loader (the test split) is needed here.
# NOTE(review): get_sequential_dataloader builds a fresh dataset (shuffled with
# random.shuffle) and calls random_split without a fixed generator, and seq_length
# here (100) differs from the training cell (20). The sequences in this test_loader
# are therefore not the ones held out during training, so losses measured on it
# may include frames the model was trained on.
_, _, test_loader = get_sequential_dataloader(
    image_folder=IMAGE_FOLDER,
    label_folder=LABEL_FOLDER,
    action_folder=ACTION_FOLDER,
    batch_size=batch_size,
    seq_length=seq_length,
    train_fraction=0.8,
    val_fraction=0.1,
    test_fraction=0.1,
)

# Restore the trained weights into a freshly constructed model.
input_shape = (3, 480, 640)  # (channels, height, width) of the input images
model = load_model(LaneDetectionRNN, MODEL_PATH, device, input_shape)


Using device: cuda
Model loaded successfully from 'models/lane_detection_rnn4.pth'


  model.load_state_dict(torch.load(model_path, map_location=device))


In [11]:
# Score the restored model on the test loader using mean squared error.
criterion = nn.MSELoss()

test_loss = validate_model(
    model=model,
    dataloader=test_loader,
    criterion=criterion,
    device=device,
)
print(f"Test Loss: {test_loss:.7f}")

Test Loss: 0.0001931


In [16]:
import cv2
import numpy as np

def visualize_predictions(model, dataloader, device, num_examples=3, seq_length=10, frame_delay_ms=20000):
    """
    Visualize predictions vs. real labels for a few examples from the dataset using OpenCV.

    Each frame is shown in its own window with the predicted ("P:") and real
    ("R:") values drawn on separate lines. All windows are closed after each
    sequence.

    Parameters:
    - model: The trained RNN model, called as ``model(images, actions)``.
    - dataloader: Iterable yielding ``(images, labels, actions)`` batches.
    - device: Device to run the model on ('cpu' or 'cuda').
    - num_examples: Number of sequences to visualize.
    - seq_length: Maximum number of leading frames to show per sequence; shorter
      sequences are shown in full.
    - frame_delay_ms: Milliseconds ``cv2.waitKey`` waits on each frame before
      advancing (a key press advances earlier).
    """
    model.eval()
    model.to(device)

    examples_shown = 0

    with torch.no_grad():
        for images, labels, actions in dataloader:
            # Ensure images, labels and actions are on the correct device
            images, labels = images.to(device), labels.to(device)
            actions = actions.to(device)

            # Get predictions for the batch and drop the trailing singleton dimension
            predictions = model(images, actions).squeeze(-1)

            # Iterate through the batch
            for i in range(len(images)):
                if examples_shown >= num_examples:
                    return  # Stop after showing the required number of examples

                # Take at most `seq_length` leading frames of this sequence
                image_sequence = images[i][:seq_length].cpu()
                real_labels = labels[i][:seq_length].cpu().numpy()
                predicted_labels = predictions[i][:seq_length].cpu().numpy()

                # Iterate over the frames that actually exist, so sequences shorter
                # than seq_length do not raise IndexError.
                num_frames = len(image_sequence)
                for t in range(num_frames):
                    # Convert the (C, H, W) float tensor to an (H, W, C) uint8 image
                    image = (image_sequence[t].permute(1, 2, 0).numpy() * 255).astype(np.uint8)
                    image_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # Convert RGB to BGR for OpenCV

                    # cv2.putText does not render newlines, so draw each line separately.
                    overlay_lines = (f"P: {predicted_labels[t]}", f"R: {real_labels[t]}")
                    for row, line in enumerate(overlay_lines):
                        cv2.putText(
                            image_bgr, line, (10, 30 + 30 * row),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2,
                        )

                    # Display the image
                    cv2.imshow(f"Sequence {examples_shown + 1} - Frame {t + 1}/{num_frames}", image_bgr)
                    cv2.waitKey(frame_delay_ms)

                cv2.destroyAllWindows()
                examples_shown += 1

In [17]:
import torch
import cv2
import numpy as np
# -------------------
# Main Script
# -------------------
if __name__ == "__main__":
    # Show the first 10 frames of 3 sequences from the test loader, using the
    # model, loader and device defined in the earlier cells.
    visualize_predictions(
        model=model,
        dataloader=test_loader,
        device=device,
        num_examples=3,
        seq_length=10,
    )

