In [1]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
from PIL import Image
import os
from tqdm import tqdm

In [2]:
# Lookup table: original label id -> training label id (0..19).
# Entries tagged "(merged)" fold an original class into another training class.
OG_CLASSES2LABELS = {
    # unlabeled / outlier
    0: 0,       # unlabeled
    1: 0,       # outlier -> unlabeled (merged)
    # vehicles
    10: 1,      # car
    11: 2,      # bicycle
    13: 5,      # bus -> other-vehicle (merged)
    15: 3,      # motorcycle
    16: 5,      # on-rails -> other-vehicle (merged)
    18: 4,      # truck
    20: 5,      # other-vehicle
    # humans
    30: 6,      # person
    31: 7,      # bicyclist
    32: 8,      # motorcyclist
    # ground
    40: 9,      # road
    44: 10,     # parking
    48: 11,     # sidewalk
    49: 12,     # other-ground
    # structures
    50: 13,     # building
    51: 14,     # fence
    52: 0,      # other-structure -> unlabeled (merged)
    60: 9,      # lane-marking -> road (merged)
    # nature
    70: 15,     # vegetation
    71: 16,     # trunk
    72: 17,     # terrain
    # objects
    80: 18,     # pole
    81: 19,     # traffic-sign
    99: 0,      # other-object -> unlabeled (merged)
    # moving variants, folded into their static counterparts
    252: 1,     # moving-car -> car (merged)
    253: 7,     # moving-bicyclist -> bicyclist (merged)
    254: 6,     # moving-person -> person (merged)
    255: 8,     # moving-motorcyclist -> motorcyclist (merged)
    256: 5,     # moving-on-rails -> other-vehicle (merged)
    257: 5,     # moving-bus -> other-vehicle (merged)
    258: 4,     # moving-truck -> truck (merged)
    259: 5,     # moving-other-vehicle -> other-vehicle (merged)
}

In [3]:
# Inverse lookup: training label id (0..19) -> one representative original label id.
LABELS2OG_CLASSES = {
    0: 0,       # unlabeled (also covers every class that was mapped to 0)
    # vehicles
    1: 10,      # car
    2: 11,      # bicycle
    3: 15,      # motorcycle
    4: 18,      # truck
    5: 20,      # other-vehicle
    # humans
    6: 30,      # person
    7: 31,      # bicyclist
    8: 32,      # motorcyclist
    # ground
    9: 40,      # road
    10: 44,     # parking
    11: 48,     # sidewalk
    12: 49,     # other-ground
    # structures
    13: 50,     # building
    14: 51,     # fence
    # nature
    15: 70,     # vegetation
    16: 71,     # trunk
    17: 72,     # terrain
    # objects
    18: 80,     # pole
    19: 81,     # traffic-sign
}

In [4]:
class SemanticKittiDataset(Dataset):
    """Dataset that pairs each image file with its label mask.

    Every entry of ``image_dir`` is treated as an image. Its mask is looked up in
    ``mask_dir`` under the image's file stem with the first character replaced by
    ``m`` and the extension ``.npy`` (e.g. ``x0001.png`` -> ``m0001.npy``).

    Args:
        image_dir: Directory containing the input images.
        mask_dir: Directory containing the mask files.
        transform: Optional callable applied to the ``{'image', 'mask'}`` sample.
        cls_map: Mapping from original label ids to training label ids.
    """

    def __init__(self, image_dir, mask_dir, transform=None, cls_map: dict = OG_CLASSES2LABELS):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping reproducible between runs and machines.
        self.images = sorted(os.listdir(image_dir))
        self.mask_mapping = cls_map

    def __len__(self):
        """Return the number of image files found in ``image_dir``."""
        return len(self.images)

    def _remap_mask(self, raw_mask):
        """Translate original label ids to training ids and return a uint8 array.

        The dictionary is consulted once per distinct label instead of once per
        pixel, and an unmapped label fails with an explicit error.

        Raises:
            ValueError: If the mask contains a label missing from the mapping.
        """
        raw_mask = np.asarray(raw_mask)
        unique_labels, inverse = np.unique(raw_mask, return_inverse=True)
        remapped = []
        for label in unique_labels.tolist():
            if label not in self.mask_mapping:
                raise ValueError(f"label {label!r} has no entry in the class mapping")
            remapped.append(self.mask_mapping[label])
        lookup = np.asarray(remapped).astype(np.uint8)
        # reshape covers NumPy versions that return a flattened inverse index
        return lookup[inverse].reshape(raw_mask.shape)

    def __getitem__(self, idx):
        """Load sample ``idx`` as ``{'image': ..., 'mask': ...}`` (transformed if configured)."""
        file_name = self.images[idx]
        img_name = os.path.join(self.image_dir, file_name)
        # Swap only the real extension, never a "png" substring inside the stem.
        stem, _ = os.path.splitext(file_name)
        mask_name = os.path.join(self.mask_dir, f"m{stem[1:]}.npy")
        # Loading 16-bit depth images into PIL is problematic; converting to
        # mode "I" works around it (a better solution may exist).
        with Image.open(img_name) as raw_image:
            image = raw_image.convert("I")
        # NOTE(review): masks are parsed as space-delimited text even though the
        # file extension is .npy -- confirm the on-disk format.
        mask = np.loadtxt(mask_name, delimiter=" ")
        # map the original mask into proper ranges
        mask = self._remap_mask(mask)
        sample = {'image': image, 'mask': mask}
        if self.transform:
            sample = self.transform(sample)
        return sample

In [5]:
class Transformations(object):
    """Turn a ``{'image', 'mask'}`` sample into normalised torch tensors.

    The image is converted to float32, divided by 65535 and given a leading
    channel axis; the mask is converted to int64.

    ``output_size`` is stored for interface compatibility only: the resize calls
    are disabled, so samples keep their original spatial size.

    Args:
        output_size: Requested output size (currently not applied).
    """
    def __init__(self, output_size):
        self.output_size = output_size

    def __call__(self, sample):
        """Return ``{'image': FloatTensor (1, H, W), 'mask': LongTensor}``."""
        image, mask = sample['image'], sample['mask']
        # np.array copies, so the in-place division below never touches the input.
        image = np.array(image, dtype=np.float32)
        mask = np.array(mask, dtype=np.int64)
        image /= 65535.0  # normalize 16-bit image
        image = image.reshape((1, image.shape[0], image.shape[1]))
        return {'image': torch.from_numpy(image),
                'mask': torch.from_numpy(mask)}

In [6]:
# Training data is read from the `00` folders, validation data from the `02` folders.
# NOTE(review): output_size=(256, 256) is handed to Transformations, but its resize
# calls appear to be commented out -- confirm whether samples are really resized.
train_image_dir = 'data/lidar_png/00/'
train_mask_dir = 'data/masks/00/'
train_dataset = SemanticKittiDataset(
    image_dir=train_image_dir,
    mask_dir=train_mask_dir,
    transform=Transformations(output_size=(256, 256)),
    cls_map=OG_CLASSES2LABELS,
)

valid_image_dir = 'data/lidar_png/02/'
valid_mask_dir = 'data/masks/02/'
val_dataset = SemanticKittiDataset(
    image_dir=valid_image_dir,
    mask_dir=valid_mask_dir,
    transform=Transformations(output_size=(256, 256)),
    cls_map=OG_CLASSES2LABELS,
)

In [11]:
# Batch both datasets; only the training batches are shuffled.
BATCH_SIZE = 16
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

# Model

In [12]:
import torch
import torch.nn as nn
import torchvision.transforms.functional as TF

class UNET(nn.Module):
    """U-Net style encoder/decoder producing one output map per class.

    The encoder applies a double-convolution block per entry of ``features`` with
    2x2 max pooling between blocks. The decoder upsamples with transposed
    convolutions and concatenates the matching encoder output before each
    double-convolution block. A final 1x1 convolution yields ``classes`` channels
    of raw scores (no softmax is applied).

    Args:
        in_channels: Number of channels of the input tensor.
        classes: Number of output channels.
        features: Channel widths of the encoder stages, shallowest first. The
            default reproduces the original fixed architecture.

    Raises:
        ValueError: If ``features`` is empty.
    """

    def __init__(self, in_channels=3, classes=1, features=(64, 128, 256, 512, 1024)):
        super().__init__()
        features = list(features)
        if not features:
            raise ValueError("features must contain at least one channel width")
        # Channel widths seen by the encoder, starting with the input channels.
        self.layers = [in_channels] + features

        self.double_conv_downs = nn.ModuleList(
            [self.__double_conv(c_in, c_out)
             for c_in, c_out in zip(self.layers[:-1], self.layers[1:])])

        # Decoder stages walk the widths from deepest to shallowest.
        deep_to_shallow = features[::-1]
        decoder_pairs = list(zip(deep_to_shallow[:-1], deep_to_shallow[1:]))

        self.up_trans = nn.ModuleList(
            [nn.ConvTranspose2d(c_in, c_out, kernel_size=2, stride=2)
             for c_in, c_out in decoder_pairs])

        # After concatenation with the skip tensor the block sees 2 * c_out channels.
        self.double_conv_ups = nn.ModuleList(
            [self.__double_conv(2 * c_out, c_out) for _, c_out in decoder_pairs])

        self.max_pool_2x2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.final_conv = nn.Conv2d(features[0], classes, kernel_size=1)

    def __double_conv(self, in_channels, out_channels):
        """Build Conv3x3 -> BatchNorm -> ReLU -> Conv3x3 -> ReLU (padding keeps H and W)."""
        conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )
        return conv

    def forward(self, x):
        """Run the network on ``x`` (N, in_channels, H, W) and return (N, classes, H, W) scores."""
        # down layers
        skips = []
        last_stage = len(self.double_conv_downs) - 1
        for stage, down in enumerate(self.double_conv_downs):
            x = down(x)
            # The deepest stage is the bottleneck: no skip connection, no pooling.
            if stage != last_stage:
                skips.append(x)
                x = self.max_pool_2x2(x)

        # up layers
        for up_trans, double_conv_up, skip in zip(self.up_trans, self.double_conv_ups, reversed(skips)):
            x = up_trans(x)
            # Odd input sizes lose a pixel when pooled; match the skip's H x W.
            if x.shape[2:] != skip.shape[2:]:
                x = TF.resize(x, list(skip.shape[2:]))

            x = double_conv_up(torch.cat((skip, x), dim=1))

        x = self.final_conv(x)

        return x

In [13]:
# Single-channel input, 20 output classes.
# Use the GPU when one is available and fall back to the CPU instead of raising.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = UNET(in_channels=1, classes=20)
model = model.to(DEVICE)

In [14]:
# Optimisation settings: Adam on all model parameters, cross-entropy objective.
EPOCHS = 10
LEARNING_RATE = 0.001
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=LEARNING_RATE,
)

In [None]:
# Train for EPOCHS epochs, tracking the mean batch loss on both splits and
# keeping the weights with the lowest validation loss in 'best_model.pth'.
min_valid_loss = np.inf
history = {'train_losses': [], 'valid_losses': []}
# NOTE(review): debugging aid; presumably only honoured when set before CUDA is
# initialised -- confirm whether it is still needed here.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
# Batches are moved to wherever the model lives instead of a hard-coded .cuda().
device = next(model.parameters()).device

for epoch in range(EPOCHS):
    # ---- training pass ----
    model.train()
    train_batch_losses = []
    for data in tqdm(train_loader, desc='Training Batches', leave=False):
        # The optimizer holds every model parameter, so one zero_grad suffices.
        optimizer.zero_grad()
        images = data["image"]
        labels = data["mask"]
        images = images.to(device)
        targets = labels.to(device)
        outputs = model(images)
        loss = loss_fn(outputs, targets)
        loss.backward()
        optimizer.step()
        train_batch_losses.append(loss.item())
    train_loss = np.mean(train_batch_losses)
    history['train_losses'].append(train_loss)

    # ---- validation pass (no gradients, eval-mode layers) ----
    model.eval()
    valid_batch_losses = []
    with torch.no_grad():
        for data in val_loader:
            images = data["image"]
            labels = data["mask"]
            images = images.to(device)
            targets = labels.to(device)
            outputs = model(images)

            loss = loss_fn(outputs, targets)
            valid_batch_losses.append(loss.item())
    valid_loss = np.mean(valid_batch_losses)
    history['valid_losses'].append(valid_loss)

    # Checkpoint whenever the validation loss improves.
    if min_valid_loss > valid_loss:
        torch.save(model.state_dict(), 'best_model.pth')
        min_valid_loss = valid_loss

    print(f'Epoch {epoch+1} \t\t Training Loss: {train_loss} \t\t Validation Loss: {valid_loss}')

    # Overwritten every epoch, so the file always holds the latest weights.
    torch.save(model.state_dict(), 'final_model.pth')

In [9]:
# Pull a single batch from the training loader for inspection.
sample = next(iter(train_loader))
image, mask = sample['image'], sample['mask']

In [19]:
# Display the dimensions of the mask batch taken from the loader.
mask.shape

torch.Size([16, 256, 256])

In [None]:
# Sanity-check the loss on the sampled batch. The loss expects per-class scores
# from the model, not the raw single-channel input image, so run a forward pass
# first (without tracking gradients) on the model's device.
check_device = next(model.parameters()).device
with torch.no_grad():
    logits = model(image.to(check_device))
    loss = loss_fn(logits, mask.to(check_device))