<a href="https://colab.research.google.com/github/marcomag416/MLDL/blob/main/randcropColab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Download GTA 5 dataset

In [1]:
import os
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
from zipfile import ZipFile

# Path of the GTA5 zip archive
gta_path = "/content/drive/MyDrive/MLDL-Proj/GTA5.zip"

# Folder that receives the extracted dataset
gta_dataset_path = "./dataset/"

# Unpack every member of the archive into the dataset folder
with ZipFile(gta_path, mode='r') as gta_archive:
    gta_archive.extractall(path=gta_dataset_path)


# Download Cityscapes dataset

In [2]:
# Unpack the Cityscapes archive next to the GTA5 data
with ZipFile("/content/drive/MyDrive/MLDL-Proj/Cityscapes.zip", mode='r') as cityscapes_archive:
    cityscapes_archive.extractall(path="./dataset")

# Root folder of the extracted Cityscapes data, used later to build the validation set
cityscape_dataset_path = "./dataset/Cityscapes/Cityspaces"

# Creating the semantic segmentation dataset index

In [3]:
import os
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt


# Define paths
images_path = gta_dataset_path + 'GTA5/images'
labels_path = gta_dataset_path + 'GTA5/labels'

# One record per image that has a mask with the same file name
data = []

# Sort the listing so the CSV row order is the same on every run
# (os.listdir returns entries in arbitrary order).
for image_filename in sorted(os.listdir(images_path)):
    if not image_filename.endswith('.png'):
        continue

    image_path = os.path.join(images_path, image_filename)
    mask_path = os.path.join(labels_path, image_filename)

    # Skip images that have no corresponding mask file
    if not os.path.exists(mask_path):
        continue

    try:
        # Open both files only to validate that they can be read; the context
        # managers close the file handles right away instead of leaking them.
        with Image.open(image_path), Image.open(mask_path):
            pass
    except Exception as e:
        # Best effort: report the unreadable pair and keep going
        print(f"Error loading {image_path} or {mask_path}: {e}")
        continue

    data.append({
        'image_path': image_path,
        'mask_path': mask_path
    })

# Create a DataFrame from the data list; explicit columns keep the CSV header
# present (and the file readable) even when no valid pair was found.
df = pd.DataFrame(data, columns=['image_path', 'mask_path'])

# Save the DataFrame to a CSV file
df.to_csv('gta5_segmentation_dataset.csv', index=False)

print("Semantic segmentation dataset created and saved as 'gta5_segmentation_dataset.csv'")

Semantic segmentation dataset created and saved as 'gta5_segmentation_dataset.csv'


# Download pytorchdl_gta5

Either of the following two options should work:

1. download directly from github

In [None]:
import os
import shutil
import subprocess
import tempfile

# A GitHub ".../tree/<branch>/<folder>" address is a web page, not a cloneable
# remote, so clone the repository itself and copy only the package folder.
repository_url = "https://github.com/marcomag416/MLDL.git"
package_folder = "pytorchdl_gta5"
destination_path = "./"

# Clone into a throw-away directory: git refuses to clone into a non-empty
# folder such as the current working directory.
with tempfile.TemporaryDirectory() as clone_dir:
    # check=True raises CalledProcessError when git fails, so the success
    # message below is only printed after a successful clone.
    subprocess.run(
        ["git", "clone", "--depth", "1", repository_url, clone_dir],
        check=True,
    )
    shutil.copytree(
        os.path.join(clone_dir, package_folder),
        os.path.join(destination_path, package_folder),
        dirs_exist_ok=True,  # allow re-running the cell
    )

print("Folder downloaded successfully!")

2. upload the zip file manually

In [8]:
import zipfile

# Archive to unpack
file_path = "/content/pytorchdl_gta5.zip"

# Folder that receives the archive contents
destination_path = "/content/pytorchdl_gta5"

# Unpack every member of the archive into the destination folder
with zipfile.ZipFile(file_path, mode='r') as package_archive:
    package_archive.extractall(path=destination_path)

# Transformations and train loader for the GTA5 dataset

In [9]:
import os
import pandas as pd
from PIL import Image
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
from pytorchdl_gta5.labels import GTA5Labels_TaskCV2017
import albumentations as A
from albumentations.pytorch import ToTensorV2

# This cell must run as the main program; abort if it is being imported
if not (__name__ == '__main__'):
    raise Exception("This script should not be imported; it should be run directly.")

# Use the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Define the custom dataset class
class GTASegmentationDataset(Dataset):
    """Segmentation dataset read from a CSV file of image/mask path pairs.

    The first CSV column is the image path and the second the mask path.
    Masks are stored as RGB colour images and are converted to a 2-D map of
    label IDs using the colour table of ``GTA5Labels_TaskCV2017``.

    Args:
        csv_file: Path of the CSV file listing the image/mask pairs.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            on NumPy arrays and returning a dict with ``'image'`` and ``'mask'``.
    """

    # Label written for every pixel whose colour is not in the mapping
    IGNORE_LABEL = 255

    def __init__(self, csv_file, transform=None):
        self.data_frame = pd.read_csv(csv_file)
        self.transform = transform
        self.label_mapping = self._create_label_mapping()
        # Photometric augmentations, applied to the image only (never the mask)
        self.color_jitter = T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1)
        self.gaussian_blur = T.GaussianBlur(kernel_size=(3, 7), sigma=(0.1, 5))

    def __len__(self):
        """Return the number of image/mask pairs listed in the CSV."""
        return len(self.data_frame)

    def __getitem__(self, idx):
        """Load, augment and return the ``(image, mask)`` pair at ``idx``.

        Returns:
            image: The augmented image. When ``transform`` is ``None`` this is
                still a PIL image, because no conversion is applied here.
            mask: ``torch.long`` tensor of label IDs, one per pixel.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = self.data_frame.iloc[idx, 0]
        mask_name = self.data_frame.iloc[idx, 1]

        image = Image.open(img_name).convert('RGB')
        mask = Image.open(mask_name).convert('RGB')

        image = self.color_jitter(image)
        image = self.gaussian_blur(image)

        if self.transform:
            augmented = self.transform(image=np.array(image), mask=np.array(mask))
            image, mask = augmented['image'], augmented['mask']

        # np.array() accepts both the PIL mask and whatever the transform returned
        mask = self._map_mask(np.array(mask))

        # Convert mask to tensor without normalization
        mask = torch.from_numpy(mask).long()  # long dtype is what cross-entropy loss expects

        return image, mask

    def _create_label_mapping(self):
        """Build the ``{RGB colour: label ID}`` table from the GTA5 label list."""
        label_mapping = {label.color: label.ID for label in GTA5Labels_TaskCV2017.list_}
        label_mapping[(0, 0, 0)] = self.IGNORE_LABEL  # black is treated as 'unlabeled'
        return label_mapping

    def _map_mask(self, mask):
        """Convert an ``(H, W, 3)`` colour mask into an ``(H, W)`` uint8 label map.

        Pixels whose colour has no entry in ``self.label_mapping`` receive
        ``IGNORE_LABEL``. Starting from zeros would silently relabel them as
        class 0 and train the model on wrong targets.
        """
        new_mask = np.full(mask.shape[:2], self.IGNORE_LABEL, dtype=np.uint8)
        for color, label_id in self.label_mapping.items():
            color_mask = np.all(mask == color, axis=-1)
            new_mask[color_mask] = label_id
        return new_mask

# CSV index listing the GTA5 image/mask pairs
csv_file = 'gta5_segmentation_dataset.csv'

# Augmentation pipeline applied jointly to image and mask; the steps run in order
transform = A.Compose(
    [
        A.Resize(height=720, width=1280),
        A.Normalize(
            mean=(0.485, 0.456, 0.406),
            std=(0.229, 0.224, 0.225),
        ),
        A.RandomRotate90(),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomResizedCrop(height=720, width=1280, scale=(0.8, 1.0)),
        ToTensorV2(),
    ]
)

# Training dataset and its shuffled loader
train_dataset = GTASegmentationDataset(csv_file=csv_file, transform=transform)
train_dataloader = DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
)

# Transformations and validation loader for the Cityscapes dataset

In [10]:
import numpy as np
from PIL import Image
import torch
from torch.utils.data import Dataset
import torchvision.transforms as T
from torch.utils.data import DataLoader
import os


class Cityscapes(Dataset):
    """Cityscapes image/label pairs discovered by scanning the dataset folders.

    Images are searched in ``{root_dir}/images/{split}/<city>/`` and labels in
    ``{root_dir}/gtFine/{split}/<city>/``. An image named
    ``X_leftImg8bit.png`` is paired with the label ``X_{label_type}.png``.

    Args:
        root_dir: Root folder of the dataset.
        split: Name of the split sub-folder (e.g. ``'val'``).
        transforms: Optional callable invoked as ``transforms(image=..., mask=...)``
            returning a dict with ``'image'`` and ``'mask'``.
        label_type: Suffix that identifies the label files to use.
    """

    def __init__(self, root_dir, split, transforms=None, label_type='gtFine_labelTrainIds'):
        self.root_dir = root_dir
        self.split = split
        self.transforms = transforms
        self.label_type = label_type

        self.images_dir = f"{root_dir}/images/{split}"
        self.labels_dir = f"{root_dir}/gtFine/{split}"

        self.image_paths = []
        self.label_paths = []

        image_suffix = '_leftImg8bit.png'
        label_suffix = f'_{self.label_type}.png'

        # Walk every city folder of the split
        for city in os.listdir(self.images_dir):
            city_images = f"{self.images_dir}/{city}"
            city_labels = f"{self.labels_dir}/{city}"

            # A city is usable only when both its image and label folders exist
            if not (os.path.isdir(city_images) and os.path.isdir(city_labels)):
                continue

            for file_name in os.listdir(city_images):
                if not file_name.endswith(image_suffix):
                    continue

                image_file = f"{city_images}/{file_name}"
                label_name = file_name.replace(image_suffix, label_suffix)
                label_file = f"{city_labels}/{label_name}"

                if not (os.path.isfile(image_file) and os.path.isfile(label_file)):
                    print(f"Warning: Image or label file not found for {file_name}")
                    continue

                self.image_paths.append(image_file)
                self.label_paths.append(label_file)

    def __len__(self):
        """Return the number of image/label pairs that were found."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair at ``idx``.

        The image is loaded as RGB and the label as stored; both are NumPy
        arrays before the optional transforms are applied.
        """
        image = np.array(Image.open(self.image_paths[idx]).convert('RGB'))
        label = np.array(Image.open(self.label_paths[idx]))

        if self.transforms:
            result = self.transforms(image=image, mask=label)
            image = result['image']
            label = result['mask']

        return image, label


# Validation preprocessing: resize, normalize, convert to tensor (no augmentation)
image_transforms = A.Compose(
    [
        A.Resize(height=512, width=1024),
        A.Normalize(
            mean=(0.485, 0.456, 0.406),
            std=(0.229, 0.224, 0.225),
        ),
        ToTensorV2(),
    ]
)

# Validation split of Cityscapes and its loader (order is not shuffled)
val_dataset = Cityscapes(
    root_dir=cityscape_dataset_path,
    split='val',
    transforms=image_transforms,
)
val_dataloader = DataLoader(
    val_dataset,
    batch_size=2,
    shuffle=False,
)

print(f"Val_Dataset size: {len(val_dataset)}")

Val_Dataset size: 500


# Download BiSeNet model

In [12]:
import sys
import requests
from zipfile import ZipFile
from io import BytesIO
model_url = "https://github.com/ooooverflow/BiSeNet/archive/refs/heads/master.zip"

# Download the repository archive; the timeout keeps the cell from hanging forever
response = requests.get(model_url, timeout=60)

# Fail loudly on an HTTP error instead of silently skipping the extraction and
# hitting a confusing ImportError in the next cell.
response.raise_for_status()

# Open the downloaded bytes and extract them
with ZipFile(BytesIO(response.content)) as zip_file:
    zip_file.extractall('./')
print('Download and extraction complete!')

# Make the extracted sources importable; the guard avoids stacking duplicate
# entries when the cell is re-run.
if './BiSeNet-master' not in sys.path:
    sys.path.insert(0, './BiSeNet-master')

Download and extraction complete!


In [14]:
from model.build_BiSeNet import BiSeNet
from torch import nn

# Seed the random number generators BEFORE the model is built, so that the
# randomly initialised layers are reproducible from run to run.
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Set CUDA_LAUNCH_BLOCKING environment variable
# NOTE(review): these look like debugging switches; CUDA_LAUNCH_BLOCKING is
# expected to slow training down - confirm they are still wanted.
os.environ['CUDA_LAUNCH_BLOCKING']="1"
os.environ['TORCH_USE_CUDA_DSA'] = "1"

context_path = 'resnet18'

# Initialize the model
model = BiSeNet(num_classes=19, context_path=context_path).to(device)

# Pixels labelled 255 do not contribute to the loss
loss_fn = nn.CrossEntropyLoss(ignore_index=255)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 115MB/s]
Downloading: "https://download.pytorch.org/models/resnet101-63fe2227.pth" to /root/.cache/torch/hub/checkpoints/resnet101-63fe2227.pth
100%|██████████| 171M/171M [00:01<00:00, 167MB/s]


# Training and evaluation functions

In [15]:
def train(model, optimizer_train, dataloader, loss_fn_train):
    """Run one training epoch and return the average loss per sample.

    Args:
        model: Network whose forward pass returns three outputs: the main
            prediction and two auxiliary predictions. All three are scored
            against the same targets.
        optimizer_train: Optimizer updating ``model``'s parameters.
        dataloader: Iterable yielding ``(inputs, targets)`` batches.
        loss_fn_train: Loss applied to each of the three outputs.

    Returns:
        The summed (main + auxiliary) loss averaged over all samples, or
        ``nan`` when the dataloader yields no batch.
    """
    model.train()  # Set the model to training mode

    # Use the device the model lives on rather than a notebook-level global,
    # so the function does not depend on cell execution order.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    train_loss = 0.0
    total = 0

    for inputs_train, targets_train in dataloader:
        inputs_train = inputs_train.to(model_device)
        targets_train = targets_train.to(model_device, dtype=torch.long)

        optimizer_train.zero_grad()  # Zero out gradients from the previous iteration
        outputs_train, cx1_sup, cx2_sup = model(inputs_train)  # Forward pass

        # Main loss plus the two auxiliary supervision losses
        loss = (
            loss_fn_train(outputs_train, targets_train)
            + loss_fn_train(cx1_sup, targets_train)
            + loss_fn_train(cx2_sup, targets_train)
        )

        loss.backward()  # Backward pass
        optimizer_train.step()  # Update the weights

        batch_size = inputs_train.size(0)
        train_loss += loss.item() * batch_size  # Weight by batch size for a per-sample mean
        total += batch_size

    if total == 0:
        # Nothing was processed; avoid a ZeroDivisionError
        return float('nan')

    # Calculate average loss for the epoch
    return train_loss / total


def compute_iou(pred, target, num_classes, ignore_index=255):
    """Compute the per-class intersection-over-union of two label tensors.

    Args:
        pred: Tensor of predicted class indices.
        target: Tensor of ground-truth class indices, same number of elements
            as ``pred``.
        num_classes: Classes ``0 .. num_classes - 1`` are evaluated.
        ignore_index: Target value marking pixels to leave out of the
            computation. Pass ``None`` to evaluate every pixel.

    Returns:
        NumPy array of length ``num_classes``; an entry is ``nan`` when the
        class appears in neither ``pred`` nor ``target``.
    """
    # reshape (unlike view) also works on non-contiguous tensors
    pred = pred.reshape(-1)
    target = target.reshape(-1)

    if ignore_index is not None:
        # Ignored pixels carry no ground truth: without this mask, whatever is
        # predicted there would be counted as a false positive.
        keep = target != ignore_index
        pred = pred[keep]
        target = target[keep]

    ious = []
    for cls in range(num_classes):
        pred_inds = pred == cls
        target_inds = target == cls
        intersection = (pred_inds & target_inds).sum().item()
        union = (pred_inds | target_inds).sum().item()
        if union == 0:
            ious.append(float('nan'))  # Class absent from both: IoU undefined
        else:
            ious.append(intersection / union)

    return np.array(ious)

def eval(model, dataloader, loss_fn, device, num_classes=19, ignore_index=255):
    """Evaluate ``model`` and return ``(average loss, mean IoU)``.

    Intersections and unions are accumulated per class over the WHOLE
    dataloader and divided once at the end. Averaging per-batch IoUs instead
    gives every batch the same weight regardless of how many pixels of a
    class it contains, which distorts the reported mIoU.

    Note: the function name shadows Python's built-in ``eval``; it is kept
    because existing callers use it.

    Args:
        model: Network whose forward pass (in eval mode) returns one tensor of
            per-class scores with the classes on dimension 1.
        dataloader: Iterable yielding ``(inputs, targets)`` batches.
        loss_fn: Loss applied to the model output and the targets.
        device: Device the batches are moved to.
        num_classes: Classes ``0 .. num_classes - 1`` are evaluated.
        ignore_index: Target value marking pixels excluded from the IoU.
            Pass ``None`` to evaluate every pixel.

    Returns:
        ``(avg_loss, miou)``: the per-sample average loss and the mean IoU
        over the classes that occur in predictions or targets. Both are
        ``nan`` when the dataloader yields no batch.
    """
    model.eval()  # Set the model to evaluation mode
    test_loss = 0.0
    total = 0
    intersections = np.zeros(num_classes, dtype=np.int64)
    unions = np.zeros(num_classes, dtype=np.int64)

    with torch.no_grad():  # Disable gradient calculation during inference
        for inputs_test, targets_test in dataloader:
            inputs_test = inputs_test.to(device)
            targets_test = targets_test.to(device, dtype=torch.long)

            outputs_test = model(inputs_test)  # Forward pass
            loss = loss_fn(outputs_test, targets_test)  # Calculate the loss

            batch_size = inputs_test.size(0)
            test_loss += loss.item() * batch_size  # Weight by batch size for a per-sample mean
            total += batch_size

            predicted = outputs_test.argmax(dim=1).reshape(-1)
            truth = targets_test.reshape(-1)
            if ignore_index is not None:
                # Ignored pixels carry no ground truth; leave them out entirely
                keep = truth != ignore_index
                predicted = predicted[keep]
                truth = truth[keep]

            for cls in range(num_classes):
                pred_inds = predicted == cls
                target_inds = truth == cls
                intersections[cls] += (pred_inds & target_inds).sum().item()
                unions[cls] += (pred_inds | target_inds).sum().item()

    if total == 0:
        # Nothing was evaluated; avoid a ZeroDivisionError
        return float('nan'), float('nan')

    # Calculate average loss
    avg_loss = test_loss / total

    # Mean IoU over the classes that actually occur
    present = unions > 0
    if present.any():
        miou = float((intersections[present] / unions[present]).mean())
    else:
        miou = float('nan')

    return avg_loss, miou

# Training

In [None]:
from timeit import default_timer as timer
start_time = timer()

NUM_EPOCHS = 50

# Per-epoch metrics, kept so the run can be inspected or plotted afterwards
history = []

# Train for NUM_EPOCHS epochs, validating after each one
for epoch in range(1, NUM_EPOCHS + 1):
    train_loss = train(model, optimizer, train_dataloader, loss_fn)
    avg_loss, miou = eval(model, val_dataloader, loss_fn, device=device)
    history.append({
        'epoch': epoch,
        'train_loss': train_loss,
        'val_loss': avg_loss,
        'miou': miou,
    })
    print(f"Epoch {epoch}/{NUM_EPOCHS} - Train loss: {train_loss}, Loss: {avg_loss}, mIoU: {miou*100:.2f}%")

# End the timer and print out how long it took
end_time = timer()
print(f"[INFO] Total training time: {end_time - start_time:.3f} seconds")