In [1]:
import torch
import torch.nn as nn
from torch import Tensor
from typing import List
import os
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from skimage import io
from torch import nn
from torch import Tensor
from typing import List


In [2]:
class ConvNextStem(nn.Sequential):
    def __init__(self, in_features: int, out_features: int):
        super().__init__(
            ConvNormAct(
                in_features, out_features, kernel_size=7, stride=2
            ),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        )

In [3]:
class ConvNextEncoder(nn.Module):
    def __init__(
        self,
        in_channels: int,
        stem_features: int,
        depths: List[int],
        widths: List[int],
        drop_p: float = .0,
    ):
        super().__init__()
        self.stem = ConvNextStem(in_channels, stem_features)

        in_out_widths = list(zip(widths, widths[1:]))
        # create drop paths probabilities (one for each stage)
        drop_probs = [x.item() for x in torch.linspace(0, drop_p, sum(depths))] 
        
        self.stages = nn.ModuleList(
            [
                ConvNexStage(stem_features, widths[0], depths[0], drop_p=drop_probs[0]),
                *[
                    ConvNexStage(in_features, out_features, depth, drop_p=drop_p)
                    for (in_features, out_features), depth, drop_p in zip(
                        in_out_widths, depths[1:], drop_probs[1:]
                    )
                ],
            ]
        )
        

    def forward(self, x):
        x = self.stem(x)
        for stage in self.stages:
            x = stage(x)
        return x

In [16]:
image = torch.rand(1, 3, 224, 224)
encoder = ConvNextEncoder(in_channels=3, stem_features=64, depths=[3,4,6,4], widths=[256, 512, 1024, 2048])
encoder(image).shape

torch.Size([1, 2048, 7, 7])

In [17]:
encoder = ConvNextEncoder(in_channels=3, stem_features=64, depths=[3,3,9,3], widths=[256, 512, 1024, 2048])

In [4]:
class ConvNextStem(nn.Sequential):
    def __init__(self, in_features: int, out_features: int):
        super().__init__(
            nn.Conv2d(in_features, out_features, kernel_size=4, stride=4),
            nn.BatchNorm2d(out_features)
        )

In [5]:
from torchvision.ops import StochasticDepth

class LayerScaler(nn.Module):
    def __init__(self, init_value: float, dimensions: int):
        super().__init__()
        self.gamma = nn.Parameter(init_value * torch.ones((dimensions)), 
                                    requires_grad=True)
        
    def forward(self, x):
        return self.gamma[None,...,None,None] * x

class BottleNeckBlock(nn.Module):
    def __init__(
        self,
        in_features: int,
        out_features: int,
        expansion: int = 4,
        drop_p: float = .0,
        layer_scaler_init_value: float = 1e-6,
    ):
        super().__init__()
        expanded_features = out_features * expansion
        self.block = nn.Sequential(
            # narrow -> wide (with depth-wise and bigger kernel)
            nn.Conv2d(
                in_features, in_features, kernel_size=7, padding=3, bias=False, groups=in_features
            ),
            # GroupNorm with num_groups=1 is the same as LayerNorm but works for 2D data
            nn.GroupNorm(num_groups=1, num_channels=in_features),
            # wide -> wide 
            nn.Conv2d(in_features, expanded_features, kernel_size=1),
            nn.GELU(),
            # wide -> narrow
            nn.Conv2d(expanded_features, out_features, kernel_size=1),
        )
        self.layer_scaler = LayerScaler(layer_scaler_init_value, out_features)
        self.drop_path = StochasticDepth(drop_p, mode="batch")

        
    def forward(self, x: Tensor) -> Tensor:
        res = x
        x = self.block(x)
        x = self.layer_scaler(x)
        x = self.drop_path(x)
        x += res
        return x

In [6]:
class ConvNexStage(nn.Sequential):
    def __init__(
        self, in_features: int, out_features: int, depth: int, **kwargs
    ):
        super().__init__(
            # add the downsampler
            nn.Sequential(
                nn.GroupNorm(num_groups=1, num_channels=in_features),
                nn.Conv2d(in_features, out_features, kernel_size=2, stride=2)
            ),
            *[
                BottleNeckBlock(out_features, out_features, **kwargs)
                for _ in range(depth)
            ],
        )

In [21]:
stage = ConvNexStage(32, 62, depth=1)
stage(torch.randn(1, 32, 14, 14)).shape

torch.Size([1, 62, 7, 7])

In [7]:
class ClassificationHead(nn.Sequential):
    def __init__(self, num_channels: int, num_classes: int = 1000):
        super().__init__(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(1),
            nn.LayerNorm(num_channels),
            nn.Linear(num_channels, num_classes)
        )
    
    
class ConvNextForImageClassification(nn.Sequential):
    def __init__(self,  
                 in_channels: int,
                 stem_features: int,
                 depths: List[int],
                 widths: List[int],
                 drop_p: float = .0,
                 num_classes: int = 1000):
        super().__init__()
        self.encoder = ConvNextEncoder(in_channels, stem_features, depths, widths, drop_p)
        self.head = ClassificationHead(widths[-1], num_classes)

In [23]:
image = torch.rand(1, 3, 224, 224)
classifier = ConvNextForImageClassification(in_channels=3, stem_features=64, depths=[3,4,6,4], widths=[256, 512, 1024, 2048])
classifier(image).shape

torch.Size([1, 1000])

In [8]:
# Modified ConvNextStem for KITTI images (typically 375x1242)
class ConvNextStem(nn.Sequential):
    def __init__(self, in_features: int, out_features: int):
        super().__init__(
            nn.Conv2d(in_features, out_features, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(out_features)
        )

# Modified ConvNext for Object Detection
class ConvNextForObjectDetection(nn.Module):
    def __init__(self,  
                 in_channels: int = 3,  # KITTI uses RGB images
                 stem_features: int = 96,
                 depths: List[int] = [3, 3, 9, 3],
                 widths: List[int] = [96, 192, 384, 768],
                 drop_p: float = 0.1,
                 num_classes: int = 8):  # KITTI has 8 main classes
        super().__init__()
        self.encoder = ConvNextEncoder(in_channels, stem_features, depths, widths, drop_p)
        
        # Detection head instead of classification head
        self.detection_head = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(1),
            nn.LayerNorm(widths[-1]),
            nn.Linear(widths[-1], 512),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(512, num_classes * 5)  # 5 values per class (class, x, y, w, h)
        )
    
    def forward(self, x):
        features = self.encoder(x)
        detections = self.detection_head(features)
        batch_size = detections.shape[0]
        return detections.view(batch_size, -1, 5)  # Reshape to (batch_size, num_classes, 5)

In [9]:
# Configuration for KITTI dataset
class KITTIConfig:
    NUM_CLASSES = 8  # Main KITTI classes
    IMAGE_SIZE = (375, 1242)  # Standard KITTI image size
    BATCH_SIZE = 16
    LEARNING_RATE = 1e-4
    EPOCHS = 100
    
    # Class mapping
    CLASS_MAPPING = {
        'Car': 0,
        'Van': 1,
        'Truck': 2,
        'Pedestrian': 3,
        'Cyclist': 4,
        'Person_sitting': 5,
        'Tram': 6,
        'Misc': 7
    }

# Loss function for object detection
class DetectionLoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.classification_loss = nn.CrossEntropyLoss()
        self.regression_loss = nn.SmoothL1Loss()
    
    def forward(self, predictions, targets):
        class_pred = predictions[..., 0]
        bbox_pred = predictions[..., 1:]
        class_target = targets[..., 0]
        bbox_target = targets[..., 1:]
        
        class_loss = self.classification_loss(class_pred, class_target)
        bbox_loss = self.regression_loss(bbox_pred, bbox_target)
        
        return class_loss + bbox_loss

In [15]:
class KITTIDataset(Dataset):
    def __init__(self, root_dir, split='train', transform=None):
        self.root_dir = root_dir
        self.split = split
        self.transform = transform
        
        # Define paths for images and labels
        self.image_dir = os.path.join(root_dir, split, 'image_2')
        self.label_dir = os.path.join(root_dir, 'label_2')
        
        # Get all image files
        self.images = sorted(os.listdir(self.image_dir))

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # Load image
        img_name = os.path.join(self.image_dir, self.images[idx])
        image = io.imread(img_name)
        
        # Load labels
        label_name = os.path.join(self.label_dir, self.images[idx].replace('.png', '.txt'))
        boxes = []
        labels = []
        
        with open(label_name, 'r') as f:
            for line in f:
                data = line.strip().split()
                category = data[0]
                if category not in ['DontCare']:  # Skip DontCare objects
                    bbox = [float(x) for x in data[4:8]]  # [x1, y1, x2, y2]
                    boxes.append(bbox)
                    labels.append(KITTIConfig.CLASS_MAPPING[category])

        # Convert to tensor
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        
        target = {
            'boxes': boxes,
            'labels': labels
        }

        if self.transform:
            image = self.transform(image)

        return image, target

def collate_fn(batch):
    images, targets = zip(*batch)
    
    # Find the maximum width and height in the batch
    max_height = max(image.size(1) for image in images)
    max_width = max(image.size(2) for image in images)

    padded_images = []
    for image in images:
        # Create a new tensor filled with zeros (black image) with max dimensions
        padded_image = torch.zeros((image.size(0), max_height, max_width))
        # Paste the original image into the padded image
        padded_image[:, :image.size(1), :image.size(2)] = image
        padded_images.append(padded_image)

    # Stack padded images into a single tensor
    images = torch.stack(padded_images)

    return images, targets

# Create train loader
train_dataset = KITTIDataset(
    root_dir='/data/cmpe258-sp24/010892622/data/KITTI',
    split='training',
    transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                           std=[0.229, 0.224, 0.225])
    ])
)
# Modified DataLoader configuration
train_loader = DataLoader(
    dataset,
    batch_size=4,  # Reduced batch size to avoid memory issues
    shuffle=True,
    num_workers=4,
    collate_fn=lambda batch: tuple(zip(*batch)),  # Important for object detection
    pin_memory=True
)

# train_loader = DataLoader(
#     train_dataset,
#     batch_size=2,
#     shuffle=True,
#     num_workers=4,
#     collate_fn=collate_fn,  # Required for handling variable number of objects
#     pin_memory=True
# )

In [11]:
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        
        for images, targets in train_loader:
            images = images.to(device)  # Move the entire batch to the device
            targets = [{k: v.to(device) for k, v in target.items()} for target in targets]  # Move each target to device

            optimizer.zero_grad()
            outputs = model(images)  # This should now work correctly
            loss = criterion(outputs, targets)

            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            
        epoch_loss = running_loss / len(train_loader)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')

In [18]:
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        
        for images, targets in train_loader:
            images = images.to(device)
            
            # Convert targets to device
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, targets)
            
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
        
        epoch_loss = running_loss / len(train_loader)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')

In [17]:
class DetectionLoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.classification_loss = nn.CrossEntropyLoss()
        self.regression_loss = nn.SmoothL1Loss(reduction='mean')
    
    def forward(self, predictions, targets):
        # Convert list of dictionaries to appropriate tensor format
        batch_labels = []
        batch_boxes = []
        
        for target in targets:
            batch_labels.append(target['labels'])
            batch_boxes.append(target['boxes'])
        
        # Stack tensors if possible, otherwise handle them individually
        try:
            target_labels = torch.cat(batch_labels)
            target_boxes = torch.cat(batch_boxes)
        except:
            target_labels = torch.stack([t['labels'][0] for t in targets])
            target_boxes = torch.stack([t['boxes'][0] for t in targets])

        # Reshape predictions to match target format
        pred_class = predictions[..., 0]  # Classification predictions
        pred_boxes = predictions[..., 1:]  # Bounding box predictions
        
        # Calculate losses
        cls_loss = self.classification_loss(pred_class, target_labels)
        box_loss = self.regression_loss(pred_boxes, target_boxes)
        
        # Combine losses
        total_loss = cls_loss + box_loss
        
        return total_loss

In [19]:
# Model initialization
model = ConvNextForObjectDetection(
    in_channels=3,
    stem_features=96,
    depths=[3, 3, 9, 3],
    widths=[96, 192, 384, 768],
    drop_p=0.1,
    num_classes=KITTIConfig.NUM_CLASSES
)

# Initialize the loss and optimizer
criterion = DetectionLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)
num_epochs = 3

train_model(model, train_loader, criterion, optimizer, num_epochs)

ValueError: Expected input batch_size (2) to match target batch_size (12).