In [1]:
import torch
from torch import nn
import torchvision.transforms.functional as TF
import torch.nn.functional as F
from PIL import Image
from torch.utils.data import Dataset
import numpy as np
import os
import random
from torch.utils.data import DataLoader
from torchvision.transforms import v2
import matplotlib.pyplot as plt

In [2]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cpu'

In [3]:
class DoubleConv(nn.Module):
  def __init__(self, in_channels = 3, out_channels = 5):
    super(DoubleConv, self).__init__()
    self.block = nn.Sequential(
        nn.Conv2d(in_channels, out_channels, 3, 1, 1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace = True),
        nn.Conv2d(out_channels, out_channels, 3, 1, 1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace = True),
    )
  def forward(self, x):
    return self.block(x)


In [4]:
class UNET(nn.Module):
  def __init__(self, in_channels, out_channels):
    super(UNET, self).__init__()
    features = [64,128,256,512]
    self.downs = nn.ModuleList()
    self.ups = nn.ModuleList()
    self.pool = nn.MaxPool2d(2, 2)

    #DOWN
    for feature in features:
      self.downs.append(DoubleConv(in_channels, feature))
      in_channels = feature

    #BOTTLE NECK
    self.bottleneck = DoubleConv(features[-1], features[-1]*2)

    #UP
    for feature in reversed(features):
      self.ups.append(nn.ConvTranspose2d(feature*2, feature, 2, 2))
      self.ups.append(DoubleConv(feature*2, feature))
    #FINAL
    self.final = nn.Conv2d(features[0], out_channels, kernel_size = 1)

  def forward(self, x):
    skip_connections = []
    for down in self.downs:
      x = down(x)
      skip_connections.append(x)
      x = self.pool(x)

    x = self.bottleneck(x)
    skip_connections = skip_connections[::-1]

    for i in range(0, len(self.ups), 2):
      x = self.ups[i](x)
      skip_connection = skip_connections[i//2]

      if x.shape != skip_connection.shape:
        skip_connection = TF.resize(skip_connection, [x.shape[2:]])

      concat = torch.cat((x, skip_connection), dim = 1)
      # print(concat.shape)
      x = self.ups[i+1](concat)

    return self.final(x)

model = UNET(3, 19).to(device)
# dummy_tensor = torch.rand(size = (1,3,512,1024))
# test_output = model(dummy_tensor.to(device))

In [5]:
train_transforms = v2.Compose([
    v2.Resize(size=(512, 1024), antialias=True),
    v2.ToTensor(),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

val_transforms = v2.Compose([
    v2.Resize(size=(512, 1024), antialias=True),
    v2.ToTensor(),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])




In [6]:
color_map = np.array([
    [128, 64, 128],   # 0: Road
    [244, 35, 232],   # 1: Sidewalk
    [ 70, 70, 70],    # 2: Building
    [102, 102, 156],  # 3: Wall
    [190, 153, 153],  # 4: Fence
    [153, 153, 153],  # 5: Pole
    [250, 170, 30],   # 6: Traffic light
    [220, 220, 0],    # 7: Traffic sign
    [107, 142, 35],   # 8: Vegetation
    [152, 251, 152],  # 9: Terrain
    [70, 130, 180],   # 10: Sky
    [220, 20, 60],    # 11: Person
    [255, 0, 0],      # 12: Rider
    [0, 0, 142],      # 13: Car
    [0, 0, 70],       # 14: Truck
    [0, 60, 100],     # 15: Bus
    [0, 80, 100],     # 16: Train
    [0, 0, 230],      # 17: Motorcycle
    [119, 11, 32],    # 18: Bicycle
], dtype=np.uint8)

label_mapping = {-1: -1, 0: -1,
                 1: -1, 2: -1,
                 3: -1, 4: -1,
                 5: -1, 6: -1,
                 7: 0, 8: 1, 9: -1,
                 10: -1, 11: 2, 12: 3,
                 13: 4, 14: -1, 15: -1,
                 16: -1, 17: 5, 18: -1,
                 19: 6, 20: 7, 21: 8, 22: 9, 23: 10, 24: 11,
                 25: 12, 26: 13, 27: 14, 28: 15,
                 29: -1, 30: -1,
                 31: 16, 32: 17, 33: 18}

In [7]:
def print_number(img):
  for row in img:
    for col in row:
      print(col, end = " ")
    print()

In [None]:
# Quy trình xử lý mask trong Dataset
img = Image.open("ids.png")
flip_image = img.transpose(Image.FLIP_LEFT_RIGHT)
np_img = np.array(flip_image, dtype=np.int64)
vectorize_img = np.vectorize(label_mapping.get)(np_img)
tensor_img = torch.tensor(vectorize_img, dtype=torch.long)

In [None]:
tensor_img.shape

torch.Size([1024, 2048])

In [None]:
# Quy trình xử lý image trong Dataset
img = Image.open("rgb.png").convert("RGB")
flip_image = img.transpose(Image.FLIP_LEFT_RIGHT)
transformed_img = train_transforms(flip_image)

In [None]:
transformed_img.shape

torch.Size([3, 512, 1024])

In [8]:
class CityscapeDataset(Dataset):
  def __init__(self, image_dir, mask_dir, transform, flip_prob = 0.5):
    self.image_dir = image_dir
    self.mask_dir = mask_dir
    self.transform = transform
    self.flip_prob = flip_prob
    self.images = sorted([os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(self.image_dir)) for f in fn if f.endswith('.png')])
    self.masks = sorted([os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(self.mask_dir)) for f in fn if f.endswith('_labelIds.png')])

  def __len__(self):
    return len(self.images)

  def __getitem__(self, idx):
    image_path = self.images[idx]
    mask_path = self.masks[idx]
    image = Image.open(image_path).convert('RGB')
    mask = Image.open(mask_path)

    if random.random() < self.flip_prob:
      image = image.transpose(Image.FLIP_LEFT_RIGHT)
      mask = mask.transpose(Image.FLIP_LEFT_RIGHT)

    if self.transform is not None:
      image = self.transform(image)

    mask = np.array(mask, dtype=np.int64)
    mask = np.vectorize(label_mapping.get)(mask)
    mask = torch.tensor(mask, dtype=torch.long)

    return image, mask



In [None]:
# điền link theo kaggle
train_img_dir = ""
train_mask_dir = ""

val_img_dir = ""
val_mask_dir = ""

In [None]:
num_workers = os.cpu_count()
# tạo dataset rồi tạo dataloader bằng dataset
train_dataset = CityscapeDataset(train_img_dir, train_mask_dir, train_transforms, flip_prob = 0.5)
train_dataloader = DataLoader(train_dataset, batch_size = 4, shuffle=True, num_workers = num_workers)

val_dataset = CityscapeDataset(val_img_dir, val_mask_dir, val_transforms, flip_prob = 0)
val_dataloader = DataLoader(val_dataset, batch_size = 4, shuffle=False, num_workers = num_workers)

In [9]:
lr = 0.0003
WD = 0.001
# ignore -1 để khộng tính loss cho các vị trí bằng -1 ở mask ground truth
loss_fn = nn.CrossEntropyLoss(ignore_index = -1)
optimizer = torch.optim.Adam(params = model.parameters(),
                             lr = lr,
                             weight_decay= WD)

In [10]:
from tqdm.auto import tqdm

In [11]:
# test tính loss
dummy_mask = torch.randint(-1, 19, (2, 1024, 2048), dtype = torch.long)
dummy_tensor = torch.rand(size = (2,3,512,1024))
output = model(dummy_tensor.to(device))
outputs_resized = F.interpolate(output, size=(1024, 2048), mode='bilinear', align_corners=False)
outputs_resized = outputs_resized.permute(0,2,3,1)
dummy_loss = loss_fn(outputs_resized.reshape(-1, 19), dummy_mask.view(-1))
print(dummy_loss)

tensor(2.9852, grad_fn=<NllLossBackward0>)


In [12]:
# test tính acc
dummy_mask_pred = torch.argmax(outputs_resized, dim = -1)
dummy_mask_pred[dummy_mask == -1] = -1
acc = torch.eq(dummy_mask_pred, dummy_mask).sum().item()
print(acc / dummy_mask.numel())

0.09994125366210938


In [None]:
epochs = 20
for epoch in tqdm(range(epochs)):
  model.train()
  train_loss, train_acc = 0, 0
  for idx, (X, y) in enumerate(train_dataloader):
    X, y = X.to(device), y.to(device)
    mask_logits = model(X)
    # resize khớp với mask
    mask_logits = F.interpolate(mask_logits, size = (1024, 2048), mode = 'bilinear', align_corners=False)
    # permute chiều channels xuống cuối
    mask_logits = mask_logits.permute(0,2,3,1)
    # loss
    loss = loss_fn(mask_logits.reshape(-1, 19), y.view(-1))
    train_loss += loss.item()
    #update
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # tính pixel accuracy
    mask_pred = torch.argmax(mask_logits, dim = -1)
    mask_pred[y == -1] = -1
    acc_per_batch = torch.eq(mask_pred, y).sum().item()
    train_acc += (acc_per_batch / y.numel())


  val_loss, val_acc = 0,0
  model.eval()
  with torch.inference_mode():
    for idx, (X, y) in enumerate(val_dataloader):
      X, y = X.to(device), y.to(device)
      mask_logits = model(X)
      mask_logits = F.interpolate(mask_logits, size = (1024, 2048), mode = 'bilinear', align_corners=False)
      mask_logits = mask_logits.permute(0,2,3,1)
      loss = loss_fn(mask_logits.reshape(-1, 19), y.view(-1))
      val_loss += loss.item()

      mask_pred = torch.argmax(mask_logits, dim = -1)
      mask_pred[y == -1] = -1
      acc_per_batch = torch.eq(mask_pred, y).sum().item()
      val_acc += (acc_per_batch / y.numel())

  print(f'Epoch {epoch+1}')
  print(f'Train_loss: {train_loss / len(train_dataloader):.4f}')
  print(f'Train_acc: {train_acc / len(train_dataloader):.4f}')
  print(f'Valid_loss: {val_loss / len(val_dataloader):.4f}')
  print(f'Valid_acc: {val_acc / len(val_dataloader):.4f}')

In [13]:
# chuyển dự đoán về ảnh rgb
# argmax rồi cho ảnh về dạng cpu và numpy
outputs_resized_pred = torch.argmax(outputs_resized, dim = -1)
outputs_resized_pred = outputs_resized_pred.squeeze()
np_outputs_resized_pred = outputs_resized_pred.cpu().numpy()
# tạo một mảng 3 chiều cho ảnh rbg rồi điền value theo key của color map
rgb_image = np.zeros((1024, 2048, 3), dtype=np.uint8)
for class_id in range(color_map.shape[0]):
    rgb_image[np_outputs_resized_pred == class_id] = color_map[class_id]
# show ảnh
plt.imshow(rgb_image)
plt.title('Color Image')
plt.show()

IndexError: boolean index did not match indexed array along dimension 0; dimension is 1024 but corresponding boolean dimension is 2