In [50]:
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
import cv2
import albumentations as A
from PIL import Image

IMAGE_PATH = "data/dataset/semantic_drone_dataset/original_images/"
MASK_PATH = "data/dataset/semantic_drone_dataset/label_images_semantic/"


def create_df():
    """Build a DataFrame listing the image ids found under ``IMAGE_PATH``.

    The directory tree is walked recursively and every ``.jpg`` file
    contributes its file name without the extension as one id. Other files
    (e.g. OS metadata files) are skipped because the datasets load
    ``<id>.jpg``.

    The ids are sorted so the result does not depend on the file-system
    listing order; this keeps the seeded train/val/test split reproducible
    across machines.

    Returns:
        pandas.DataFrame with a single ``"id"`` column and a 0..n-1 index.
    """
    ids = []
    for _dirname, _, filenames in os.walk(IMAGE_PATH):
        for filename in filenames:
            # splitext only strips the final extension, so dotted names survive.
            stem, ext = os.path.splitext(filename)
            if ext.lower() != ".jpg":
                continue
            ids.append(stem)
    ids.sort()
    return pd.DataFrame({"id": ids}, index=np.arange(0, len(ids)))


def get_data_splits():
    """Split the image ids into train, validation and test subsets.

    10% of the ids are held out as the test set; 15% of the remainder becomes
    the validation set. Both splits use ``random_state=19``.

    Returns:
        Tuple ``(X_train, X_val, X_test)`` of id arrays.
    """
    all_ids = create_df()["id"].values
    trainval_ids, test_ids = train_test_split(
        all_ids, test_size=0.1, random_state=19
    )
    train_ids, val_ids = train_test_split(
        trainval_ids, test_size=0.15, random_state=19
    )
    return train_ids, val_ids, test_ids


class DroneDataset(Dataset):
    """Dataset yielding normalized image tensors and integer label masks.

    Each sample is loaded from ``img_path + id + ".jpg"`` (image) and
    ``mask_path + id + ".png"`` (mask, read as grayscale).

    Args:
        img_path: Path prefix of the image files (concatenated as-is, so it
            should end with a path separator).
        mask_path: Path prefix of the mask files (same convention).
        X: Indexable collection of sample ids.
        mean: Per-channel mean passed to ``T.Normalize``.
        std: Per-channel std passed to ``T.Normalize``.
        transform: Optional callable invoked as ``transform(image=, mask=)``
            and returning a mapping with ``"image"`` and ``"mask"`` entries.
        patch: When true, each sample is cut into 512x768 tiles.
    """

    def __init__(self, img_path, mask_path, X, mean, std, transform=None, patch=False):
        self.img_path = img_path
        self.mask_path = mask_path
        self.X = X
        self.transform = transform
        self.patches = patch
        self.mean = mean
        self.std = std

    def __len__(self):
        """Return the number of sample ids."""
        return len(self.X)

    def __getitem__(self, idx):
        """Load, augment and normalize sample ``idx``.

        Returns:
            ``(img, mask)``: a normalized float image tensor and a ``long``
            mask tensor; both are tiled when ``patch`` was requested.

        Raises:
            FileNotFoundError: If the image or mask file cannot be read.
        """
        img_file = self.img_path + self.X[idx] + ".jpg"
        mask_file = self.mask_path + self.X[idx] + ".png"

        # cv2.imread signals failure by returning None instead of raising,
        # so check explicitly to get an error that names the offending file.
        img = cv2.imread(img_file)
        if img is None:
            raise FileNotFoundError(f"Could not read image file: {img_file}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask file: {mask_file}")

        if self.transform is not None:
            aug = self.transform(image=img, mask=mask)
            img = Image.fromarray(aug["image"])
            mask = aug["mask"]
        else:
            img = Image.fromarray(img)

        t = T.Compose([T.ToTensor(), T.Normalize(self.mean, self.std)])
        img = t(img)
        mask = torch.from_numpy(mask).long()

        if self.patches:
            img, mask = self.tiles(img, mask)

        return img, mask

    def tiles(self, img, mask):
        """Cut an image/mask pair into non-overlapping 512x768 tiles.

        Args:
            img: Tensor indexed as (channel, row, column) with 3 channels.
            mask: Tensor indexed as (row, column).

        Returns:
            ``(img_patches, mask_patches)`` shaped ``(n_tiles, 3, 512, 768)``
            and ``(n_tiles, 512, 768)``. Rows/columns that do not fill a
            complete tile are dropped by ``unfold``.
        """
        img_patches = img.unfold(1, 512, 512).unfold(2, 768, 768)
        img_patches = img_patches.contiguous().view(3, -1, 512, 768)
        # Move the tile axis first so tiles can be treated like a batch.
        img_patches = img_patches.permute(1, 0, 2, 3)

        mask_patches = mask.unfold(0, 512, 512).unfold(1, 768, 768)
        mask_patches = mask_patches.contiguous().view(-1, 512, 768)

        return img_patches, mask_patches


class DroneTestDataset(Dataset):
    """Dataset yielding un-normalized PIL images and integer label masks.

    Unlike ``DroneDataset`` the image is returned as a ``PIL.Image`` (no
    tensor conversion or normalization), so callers can both display it and
    apply their own preprocessing.

    Args:
        img_path: Path prefix of the image files (concatenated as-is).
        mask_path: Path prefix of the mask files (concatenated as-is).
        X: Indexable collection of sample ids.
        transform: Optional callable invoked as ``transform(image=, mask=)``
            and returning a mapping with ``"image"`` and ``"mask"`` entries.
    """

    def __init__(self, img_path, mask_path, X, transform=None):
        self.img_path = img_path
        self.mask_path = mask_path
        self.X = X
        self.transform = transform

    def __len__(self):
        """Return the number of sample ids."""
        return len(self.X)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(img, mask)``: a ``PIL.Image`` in RGB order and a ``long`` mask
            tensor.

        Raises:
            FileNotFoundError: If the image or mask file cannot be read.
        """
        img_file = self.img_path + self.X[idx] + ".jpg"
        mask_file = self.mask_path + self.X[idx] + ".png"

        # cv2.imread returns None on failure rather than raising.
        img = cv2.imread(img_file)
        if img is None:
            raise FileNotFoundError(f"Could not read image file: {img_file}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask file: {mask_file}")

        if self.transform is not None:
            aug = self.transform(image=img, mask=mask)
            img = Image.fromarray(aug["image"])
            mask = aug["mask"]
        else:
            img = Image.fromarray(img)

        mask = torch.from_numpy(mask).long()

        return img, mask


def get_data_loaders(batch_size=16):
    """Create the training/validation loaders and the test dataset.

    Training samples are resized to 704x1056 and randomly augmented.
    Validation samples are only resized: random augmentation and shuffling
    are deliberately not applied there, so validation loss/mIoU are
    comparable from epoch to epoch (early stopping and checkpoint selection
    are driven by them). Test samples are resized to 768x1152 and returned as
    a dataset rather than a loader.

    Args:
        batch_size: Batch size used by both loaders.

    Returns:
        Tuple ``(train_loader, val_loader, test_set)``.
    """
    X_train, X_val, X_test = get_data_splits()

    # Normalization statistics handed to T.Normalize inside DroneDataset.
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]

    t_train = A.Compose(
        [
            A.Resize(704, 1056, interpolation=cv2.INTER_NEAREST),
            A.HorizontalFlip(),
            A.VerticalFlip(),
            A.GridDistortion(p=0.2),
            A.RandomBrightnessContrast((0, 0.5), (0, 0.5)),
            A.GaussNoise(),
        ]
    )

    # Deterministic preprocessing only: no random flips/distortions here.
    t_val = A.Compose(
        [
            A.Resize(704, 1056, interpolation=cv2.INTER_NEAREST),
        ]
    )

    train_set = DroneDataset(
        IMAGE_PATH, MASK_PATH, X_train, mean, std, t_train, patch=False
    )
    val_set = DroneDataset(IMAGE_PATH, MASK_PATH, X_val, mean, std, t_val, patch=False)

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    # Per-batch metrics depend on batch composition, so keep the order fixed.
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

    t_test = A.Resize(768, 1152, interpolation=cv2.INTER_NEAREST)
    test_set = DroneTestDataset(IMAGE_PATH, MASK_PATH, X_test, transform=t_test)

    return train_loader, val_loader, test_set


In [51]:
def pixel_accuracy(output, mask):
    """Return the fraction of pixels whose predicted class equals the label.

    Args:
        output: Per-class scores with the class axis at ``dim=1``.
        mask: Integer labels, comparable element-wise with the argmax of
            ``output`` over ``dim=1``.

    Returns:
        Accuracy as a Python float in ``[0, 1]``.
    """
    with torch.no_grad():
        # argmax over raw scores: softmax is monotonic, so applying it first
        # would not change the winning class (and ``F`` is not imported here).
        predicted = torch.argmax(output, dim=1)
        correct = torch.eq(predicted, mask).int()
        accuracy = float(correct.sum()) / float(correct.numel())
    return accuracy


def mIoU(pred_mask, mask, smooth=1e-10, n_classes=23):
    """Mean intersection-over-union across the classes present in ``mask``.

    Args:
        pred_mask: Per-class scores with the class axis at ``dim=1``.
        mask: Integer labels with one entry per predicted pixel.
        smooth: Small constant added to numerator and denominator.
        n_classes: Class ids ``0 .. n_classes - 1`` are evaluated.

    Returns:
        The mean IoU over classes that occur in ``mask``. Classes absent from
        ``mask`` are recorded as NaN and ignored by ``np.nanmean``.
    """
    with torch.no_grad():
        # argmax over raw scores: softmax is monotonic, so applying it first
        # would not change the winning class (and ``F`` is not imported here).
        pred_labels = torch.argmax(pred_mask, dim=1).contiguous().view(-1)
        true_labels = mask.contiguous().view(-1)

        iou_per_class = []
        for clas in range(0, n_classes):  # one IoU per class id
            pred_is_class = pred_labels == clas
            true_is_class = true_labels == clas

            if true_is_class.long().sum().item() == 0:
                # Class does not occur in the labels: exclude it from the mean.
                iou_per_class.append(np.nan)
            else:
                intersect = (
                    torch.logical_and(pred_is_class, true_is_class)
                    .sum()
                    .float()
                    .item()
                )
                union = (
                    torch.logical_or(pred_is_class, true_is_class)
                    .sum()
                    .float()
                    .item()
                )

                iou_per_class.append((intersect + smooth) / (union + smooth))
        return np.nanmean(iou_per_class)

In [52]:
import torch
import torch.nn as nn
import torch.optim as optim
import segmentation_models_pytorch as smp

from tqdm.notebook import tqdm

import time
import numpy as np
from metrics import mIoU, pixel_accuracy
import os
from torchvision import transforms  # Change this line

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def create_model():
    """Instantiate the segmentation network used in this notebook.

    Returns:
        An ``smp.Unet`` with a ``mobilenet_v2`` encoder initialised from
        ``imagenet`` weights, 23 output classes and no output activation.
    """
    unet_config = dict(
        encoder_weights="imagenet",
        classes=23,
        activation=None,
        encoder_depth=5,
        decoder_channels=[256, 128, 64, 32, 16],
    )
    return smp.Unet("mobilenet_v2", **unet_config)


def get_lr(optimizer):
    """Return the ``"lr"`` of the optimizer's first parameter group.

    Returns ``None`` when the optimizer has no parameter groups.
    """
    return next((group["lr"] for group in optimizer.param_groups), None)


def fit(
    epochs,
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler,
    patch=False,
    checkpoint_dir="checkpoints",
):
    """Train ``model`` and validate it after every epoch.

    Per batch: forward pass, loss, metrics, backward pass, optimizer step and
    one scheduler step (the learning rate in effect is logged before the
    scheduler advances). Per epoch: a validation pass, metric bookkeeping and
    a checkpoint written through ``save_checkpoint``.

    Early stopping: ``min_loss`` tracks the best (lowest) mean validation
    loss seen so far. Training stops after 7 consecutive epochs whose
    validation loss is above that best value. Every 5th improvement also
    pickles the whole model to ``Unet-Mobilenet_v2_mIoU-<score>.pt`` in the
    current working directory.

    Args:
        epochs: Number of epochs to run; ``0`` (or less) trains nothing.
        model: Network to train; moved to the module-level ``device``.
        train_loader: Iterable of ``(images, masks)`` training batches.
        val_loader: Iterable of ``(images, masks)`` validation batches.
        criterion: Loss callable invoked as ``criterion(output, mask)``.
        optimizer: Optimizer updating ``model``.
        scheduler: Learning-rate scheduler, stepped once per training batch.
        patch: When true, batches carry an extra tile axis
            ``(batch, tiles, c, h, w)`` that is flattened into the batch axis.
        checkpoint_dir: Directory for per-epoch checkpoints (created if
            missing).

    Returns:
        Dict with per-epoch lists ``train_loss``, ``val_loss``,
        ``train_miou``, ``val_miou``, ``train_acc``, ``val_acc`` and the
        per-batch list ``lrs``. All per-epoch lists have the same length.
    """
    torch.cuda.empty_cache()
    train_losses = []
    test_losses = []
    val_iou = []
    val_acc = []
    train_iou = []
    train_acc = []
    lrs = []
    # Built up front (it references the lists above, which are filled in
    # place) so it is defined even when the epoch loop never runs.
    history = {
        "train_loss": train_losses,
        "val_loss": test_losses,
        "train_miou": train_iou,
        "val_miou": val_iou,
        "train_acc": train_acc,
        "val_acc": val_acc,
        "lrs": lrs,
    }
    min_loss = np.inf
    decrease = 1
    not_improve = 0

    # Create checkpoint directory if it doesn't exist
    os.makedirs(checkpoint_dir, exist_ok=True)

    model.to(device)
    fit_time = time.time()
    for e in range(epochs):
        since = time.time()
        running_loss = 0
        iou_score = 0
        accuracy = 0

        # ---- training phase ----
        model.train()
        for image_tiles, mask_tiles in tqdm(train_loader):
            if patch:
                # Fold the tile axis into the batch axis.
                bs, n_tiles, c, h, w = image_tiles.size()
                image_tiles = image_tiles.view(-1, c, h, w)
                mask_tiles = mask_tiles.view(-1, h, w)

            image = image_tiles.to(device)
            mask = mask_tiles.to(device)
            # forward
            output = model(image)
            loss = criterion(output, mask)
            # evaluation metrics
            iou_score += mIoU(output, mask)
            accuracy += pixel_accuracy(output, mask)
            # backward
            loss.backward()
            optimizer.step()  # update weight
            optimizer.zero_grad()  # reset gradient

            # log the learning rate used for this step, then advance it
            lrs.append(get_lr(optimizer))
            scheduler.step()

            running_loss += loss.item()

        # ---- validation phase ----
        model.eval()
        test_loss = 0
        test_accuracy = 0
        val_iou_score = 0
        with torch.no_grad():
            for image_tiles, mask_tiles in tqdm(val_loader):
                if patch:
                    bs, n_tiles, c, h, w = image_tiles.size()
                    image_tiles = image_tiles.view(-1, c, h, w)
                    mask_tiles = mask_tiles.view(-1, h, w)

                image = image_tiles.to(device)
                mask = mask_tiles.to(device)
                output = model(image)
                # evaluation metrics
                val_iou_score += mIoU(output, mask)
                test_accuracy += pixel_accuracy(output, mask)
                # loss
                loss = criterion(output, mask)
                test_loss += loss.item()

        # ---- per-epoch means over batches ----
        epoch_train_loss = running_loss / len(train_loader)
        epoch_val_loss = test_loss / len(val_loader)
        epoch_train_iou = iou_score / len(train_loader)
        epoch_val_iou = val_iou_score / len(val_loader)
        epoch_train_acc = accuracy / len(train_loader)
        epoch_val_acc = test_accuracy / len(val_loader)

        # Record everything before deciding whether to stop, so the history
        # lists always stay the same length.
        train_losses.append(epoch_train_loss)
        test_losses.append(epoch_val_loss)
        val_iou.append(epoch_val_iou)
        train_iou.append(epoch_train_iou)
        train_acc.append(epoch_train_acc)
        val_acc.append(epoch_val_acc)

        stop_training = False
        if min_loss > epoch_val_loss:
            print(
                "Loss Decreasing.. {:.3f} >> {:.3f} ".format(min_loss, epoch_val_loss)
            )
            min_loss = epoch_val_loss
            not_improve = 0
            decrease += 1
            if decrease % 5 == 0:
                print("saving model...")
                torch.save(
                    model,
                    "Unet-Mobilenet_v2_mIoU-{:.3f}.pt".format(epoch_val_iou),
                )
        elif epoch_val_loss > min_loss:
            # min_loss is left untouched: it must remain the best loss seen.
            not_improve += 1
            print(f"Loss Not Decrease for {not_improve} time")
            if not_improve == 7:
                print("Loss not decrease for 7 times, Stop Training")
                stop_training = True

        print(
            "Epoch:{}/{}..".format(e + 1, epochs),
            "Train Loss: {:.3f}..".format(epoch_train_loss),
            "Val Loss: {:.3f}..".format(epoch_val_loss),
            "Train mIoU:{:.3f}..".format(epoch_train_iou),
            "Val mIoU: {:.3f}..".format(epoch_val_iou),
            "Train Acc:{:.3f}..".format(epoch_train_acc),
            "Val Acc:{:.3f}..".format(epoch_val_acc),
            "Time: {:.2f}m".format((time.time() - since) / 60),
        )

        # Save checkpoint (also for the epoch that triggers early stopping)
        save_checkpoint(
            model, optimizer, scheduler, e + 1, min_loss, history, checkpoint_dir
        )

        if stop_training:
            break

    print("Total time: {:.2f} m".format((time.time() - fit_time) / 60))
    return history


def predict_image_mask_miou(
    model, image, mask, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
):
    """Predict the mask for one image and score it with ``mIoU``.

    Args:
        model: Segmentation network; switched to eval mode and moved to the
            module-level ``device``.
        image: Single image accepted by ``transforms.ToTensor``.
        mask: Label tensor for ``image`` (without a batch axis).
        mean: Per-channel mean for normalization.
        std: Per-channel std for normalization.

    Returns:
        ``(masked, score)``: the predicted class-index map on the CPU (batch
        axis removed) and the mIoU of the prediction against ``mask``.
    """
    model.eval()
    preprocess = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize(mean, std)]
    )
    tensor_image = preprocess(image)
    model.to(device)
    # Add the batch axis the model and the metric expect.
    batch = tensor_image.to(device).unsqueeze(0)
    target = mask.to(device).unsqueeze(0)
    with torch.no_grad():
        logits = model(batch)
        score = mIoU(logits, target)
        masked = torch.argmax(logits, dim=1).cpu().squeeze(0)
    return masked, score


def predict_image_mask_pixel(
    model, image, mask, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
):
    """Predict the mask for one image and score it with ``pixel_accuracy``.

    Args:
        model: Segmentation network; switched to eval mode and moved to the
            module-level ``device``.
        image: Single image accepted by ``transforms.ToTensor``.
        mask: Label tensor for ``image`` (without a batch axis).
        mean: Per-channel mean for normalization.
        std: Per-channel std for normalization.

    Returns:
        ``(masked, acc)``: the predicted class-index map on the CPU (batch
        axis removed) and the pixel accuracy against ``mask``.
    """
    model.eval()
    preprocess = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize(mean, std)]
    )
    tensor_image = preprocess(image)
    model.to(device)
    # Add the batch axis the model and the metric expect.
    batch = tensor_image.to(device).unsqueeze(0)
    target = mask.to(device).unsqueeze(0)
    with torch.no_grad():
        logits = model(batch)
        acc = pixel_accuracy(logits, target)
        masked = torch.argmax(logits, dim=1).cpu().squeeze(0)
    return masked, acc


def evaluate_model(model, test_set):
    """Score ``model`` on every sample of ``test_set``.

    Each sample is run through ``predict_image_mask_miou`` and
    ``predict_image_mask_pixel``; the predicted masks are discarded.

    Returns:
        ``(mean_iou, mean_accuracy)`` averaged over all samples.
    """
    iou_scores, pixel_accuracies = [], []
    for sample_idx in tqdm(range(len(test_set))):
        img, mask = test_set[sample_idx]
        iou_scores.append(predict_image_mask_miou(model, img, mask)[1])
        pixel_accuracies.append(predict_image_mask_pixel(model, img, mask)[1])
    return np.mean(iou_scores), np.mean(pixel_accuracies)


def load_checkpoint(model, optimizer, scheduler, checkpoint_path):
    """Restore training state from a file written by ``save_checkpoint``.

    The checkpoint is deserialized onto the CPU so that a file saved on a
    GPU machine can also be loaded where no GPU is available; the
    ``load_state_dict`` calls then copy the values into the given objects.

    NOTE: ``torch.load`` unpickles the file — only load checkpoints you
    created yourself or otherwise trust.

    Args:
        model: Module receiving ``model_state_dict``.
        optimizer: Optimizer receiving ``optimizer_state_dict``.
        scheduler: Scheduler receiving ``scheduler_state_dict``.
        checkpoint_path: Path of the checkpoint file.

    Returns:
        ``(model, optimizer, scheduler, epoch, loss, history)`` where
        ``history`` holds the metric lists stored in the checkpoint (a
        missing list is returned as empty).
    """
    checkpoint = torch.load(checkpoint_path, map_location="cpu")
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    scheduler.load_state_dict(checkpoint["scheduler_state_dict"])
    epoch = checkpoint["epoch"]
    loss = checkpoint["loss"]
    history_keys = (
        "train_loss",
        "val_loss",
        "train_miou",
        "val_miou",
        "train_acc",
        "val_acc",
        "lrs",
    )
    history = {key: checkpoint.get(key, []) for key in history_keys}
    return model, optimizer, scheduler, epoch, loss, history


def get_latest_checkpoint(checkpoint_dir):
    """Return the path of the checkpoint with the highest epoch number.

    Only files named ``checkpoint_epoch_<digits>[.<ext>]`` are considered.
    Other files that merely share the prefix (for example
    ``checkpoint_epoch_best.pth``) are ignored instead of raising.

    Args:
        checkpoint_dir: Directory to search.

    Returns:
        Path of the newest checkpoint, or ``None`` when the directory does
        not exist or contains no matching file.
    """
    prefix = "checkpoint_epoch_"
    if not os.path.isdir(checkpoint_dir):
        return None

    numbered = []
    for filename in os.listdir(checkpoint_dir):
        if not filename.startswith(prefix):
            continue
        epoch_text = filename[len(prefix):].split(".")[0]
        # isdecimal() rejects "", "best" and "3_1" (int() would accept the
        # underscore form), so only plain epoch numbers get through.
        if epoch_text.isdecimal():
            numbered.append((int(epoch_text), filename))

    if not numbered:
        return None
    _, latest_checkpoint = max(numbered, key=lambda item: item[0])
    return os.path.join(checkpoint_dir, latest_checkpoint)


def resume_from_checkpoint(model, optimizer, scheduler, checkpoint_dir):
    """Restore the newest checkpoint in ``checkpoint_dir`` if there is one.

    Returns:
        ``(model, optimizer, scheduler, start_epoch, min_loss, history)``.
        Without a checkpoint the inputs are returned unchanged together with
        ``0``, ``float("inf")`` and an empty history dict.
    """
    checkpoint_path = get_latest_checkpoint(checkpoint_dir)
    if not checkpoint_path:
        print("No checkpoint found. Starting from scratch.")
        return model, optimizer, scheduler, 0, float("inf"), {}

    (
        model,
        optimizer,
        scheduler,
        start_epoch,
        min_loss,
        history,
    ) = load_checkpoint(model, optimizer, scheduler, checkpoint_path)
    print(f"Resuming training from epoch {start_epoch}")
    return model, optimizer, scheduler, start_epoch, min_loss, history


def _to_builtin(value):
    """Convert a numpy scalar or one-element tensor to a plain Python number."""
    if isinstance(value, np.generic):
        return value.item()
    if torch.is_tensor(value) and value.numel() == 1:
        return value.item()
    return value


def save_checkpoint(model, optimizer, scheduler, epoch, loss, history, checkpoint_dir):
    """Write the training state to ``checkpoint_epoch_<epoch>.pth``.

    Metric values are stored as builtin Python numbers. Values such as the
    ``np.float64`` returned by ``np.nanmean`` would otherwise be pickled as
    numpy objects, which ``torch.load`` rejects when it runs in
    ``weights_only`` mode.

    Args:
        model: Module whose ``state_dict`` is saved.
        optimizer: Optimizer whose ``state_dict`` is saved.
        scheduler: Scheduler whose ``state_dict`` is saved.
        epoch: Epoch number; also used in the file name.
        loss: Loss value stored under ``"loss"``.
        history: Mapping of metric name to list of values; missing metrics
            are stored as empty lists.
        checkpoint_dir: Existing directory that receives the file.
    """
    history_keys = (
        "train_loss",
        "val_loss",
        "train_miou",
        "val_miou",
        "train_acc",
        "val_acc",
        "lrs",
    )
    checkpoint = {
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict(),
        "loss": _to_builtin(loss),
    }
    for key in history_keys:
        checkpoint[key] = [_to_builtin(v) for v in history.get(key, [])]
    torch.save(
        checkpoint, os.path.join(checkpoint_dir, f"checkpoint_epoch_{epoch}.pth")
    )


In [53]:
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import numpy as np
import torch
from torchvision import transforms as T


def plot_loss(history):
    """Plot the validation and training loss curves per epoch.

    Args:
        history: Mapping providing ``"val_loss"`` and ``"train_loss"``.
    """
    plt.figure(figsize=(10, 5))
    for key, label in (("val_loss", "val"), ("train_loss", "train")):
        plt.plot(history[key], label=label, marker="o")
    plt.title("Loss per epoch")
    plt.ylabel("loss")
    plt.xlabel("epoch")
    plt.legend()
    plt.grid()
    plt.show()


def plot_score(history):
    """Plot the training and validation mean-IoU curves per epoch.

    Args:
        history: Mapping providing ``"train_miou"`` and ``"val_miou"``.
    """
    plt.figure(figsize=(10, 5))
    for key, label in (("train_miou", "train_mIoU"), ("val_miou", "val_mIoU")):
        plt.plot(history[key], label=label, marker="*")
    plt.title("Score per epoch")
    plt.ylabel("mean IoU")
    plt.xlabel("epoch")
    plt.legend()
    plt.grid()
    plt.show()


def plot_acc(history):
    """Plot the training and validation pixel-accuracy curves per epoch.

    Args:
        history: Mapping providing ``"train_acc"`` and ``"val_acc"``.
    """
    plt.figure(figsize=(10, 5))
    for key, label in (("train_acc", "train_accuracy"), ("val_acc", "val_accuracy")):
        plt.plot(history[key], label=label, marker="*")
    plt.title("Accuracy per epoch")
    plt.ylabel("Accuracy")
    plt.xlabel("epoch")
    plt.legend()
    plt.grid()
    plt.show()


def visualize_predictions(
    model,
    test_set,
    output_pdf,
    num_classes=23,
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
):
    """Write one PDF page per test sample: image, ground truth, prediction.

    Args:
        model: Segmentation network; switched to eval mode and used on the
            device its parameters already live on.
        test_set: Indexable dataset of ``(image, mask)`` pairs where the
            image is accepted by ``T.ToTensor`` and by ``imshow``.
        output_pdf: Path of the PDF file to create.
        num_classes: Number of classes; sizes the colour map and its range.
        mean: Per-channel mean for normalization.
        std: Per-channel std for normalization.
    """
    model.eval()
    device = next(model.parameters()).device

    # One distinct colour per class. A fixed 20-entry palette such as "tab20"
    # cannot separate 23 classes: with vmin=0 / vmax=num_classes-1 several
    # class ids would fall on the same palette entry.
    cmap = plt.get_cmap("nipy_spectral", num_classes)

    # Loop-invariant preprocessing, built once.
    preprocess = T.Compose([T.ToTensor(), T.Normalize(mean, std)])

    with PdfPages(output_pdf) as pdf:
        for i in range(len(test_set)):
            img, mask = test_set[i]

            # Prepare the image
            img_tensor = preprocess(img).unsqueeze(0).to(device)

            # Get the prediction
            with torch.no_grad():
                output = model(img_tensor)
                pred_mask = torch.argmax(output, dim=1).squeeze(0).cpu().numpy()

            # Create a figure with three subplots
            fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(15, 5))
            fig.suptitle(f"Test Image {i+1}")

            # Plot original image
            ax1.imshow(img)
            ax1.set_title("Original Image")
            ax1.axis("off")

            # Plot ground truth mask
            ax2.imshow(np.asarray(mask), cmap=cmap, vmin=0, vmax=num_classes - 1)
            ax2.set_title("Ground Truth")
            ax2.axis("off")

            # Plot predicted mask
            ax3.imshow(pred_mask, cmap=cmap, vmin=0, vmax=num_classes - 1)
            ax3.set_title("Prediction")
            ax3.axis("off")

            # Add the page to the PDF and free the figure
            pdf.savefig(fig)
            plt.close(fig)

        print(f"Visualizations saved to {output_pdf}")


In [54]:
import requests
import zipfile
from pathlib import Path

# Setup path to data folder
# data_path = Path("data/")
# image_path = data_path / "pizza_steak_sushi"

DATA_PATH = Path("data/")
# Folder the archive unpacks to; the dataset code reads its images and masks
# from sub-folders of this directory.
DATASET_DIR = DATA_PATH / "dataset" / "semantic_drone_dataset"
ARCHIVE_PATH = DATA_PATH / "archive.zip"

# NOTE: this is a pre-signed, time-limited URL (X-Goog-Date=20241023,
# X-Goog-Expires=259200 seconds). Once it has expired the request is rejected
# and a fresh link has to be pasted here.
DATASET_URL = "https://storage.googleapis.com/kaggle-data-sets/333968/1834160/bundle/archive.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20241023%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20241023T113757Z&X-Goog-Expires=259200&X-Goog-SignedHeaders=host&X-Goog-Signature=3e3bbcf7cb80c007d26471d7f7be115d075367ab5a6e241b83823607ac7683cb813a1757c5e71ee5052498b967758686f92be595272f684bf1a90bd8a21681ba7ba7a34e074464ac5e2d3b944af4ebf34d425d50281034b3fd3c17f1f15320f27eaf578cfbead4c6e40b721f1209333e55c6185b157001d9afd3762fd3f6eadb67ee4841ba059b999775c14615537f31e44b0f3e2cea010e3c13b612d18d952cf22c7d101962cdefe0da4d4e6a03345f9d3ceb14048de01e987345e318361b9d2f8cea7c9fb749de9c78eea4795da2e71ae5d8e065206627970bebb1eb523d7cf03d413978eb542f3f0500538b53bda10f198ca97f5f85d267bbe40b269487dd"

# Check for the extracted dataset rather than for ``data/`` itself: a failed
# download leaves ``data/`` behind and must not be mistaken for success.
if DATASET_DIR.is_dir():
    print(f"{DATASET_DIR} directory exists.")
else:
    print(f"Did not find {DATASET_DIR} directory, creating one...")
    DATA_PATH.mkdir(parents=True, exist_ok=True)

    # Download the drone dataset archive, streaming it to disk in chunks.
    print("Downloading drone dataset ...")
    with requests.get(DATASET_URL, stream=True, timeout=60) as response:
        # Fail loudly on HTTP errors instead of saving the error page as a zip.
        response.raise_for_status()
        with open(ARCHIVE_PATH, "wb") as f:
            for chunk in response.iter_content(chunk_size=1 << 20):
                f.write(chunk)

    # Unzip the drone dataset
    with zipfile.ZipFile(ARCHIVE_PATH, "r") as zip_ref:
        print("Unzipping drone dataset ...")
        zip_ref.extractall(DATA_PATH)

data directory exists.


In [59]:
import torch
import torch.nn as nn
# from cnn_data import get_data_loaders
# from model import create_model, fit, evaluate_model, resume_from_checkpoint
# from viz import plot_loss, plot_score, plot_acc, visualize_predictions
from tqdm import tqdm
import os

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Get data loaders
train_loader, val_loader, test_set = get_data_loaders(batch_size=16)

# Create the model and move it to the target device *before* the optimizer is
# built and any checkpoint is restored: optimizer.load_state_dict() places
# the restored state on the device the parameters are on at that moment, so
# restoring onto a CPU model and moving it to the GPU afterwards would leave
# optimizer state and parameters on different devices.
model = create_model().to(device)

# Training parameters
max_lr = 1e-3
epochs = 4
weight_decay = 1e-4

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=max_lr, weight_decay=weight_decay)
# One scheduler step per training batch, for `epochs` epochs in total.
sched = torch.optim.lr_scheduler.OneCycleLR(
    optimizer, max_lr, epochs=epochs, steps_per_epoch=len(train_loader)
)

# Check for existing checkpoints and resume training if possible
checkpoint_dir = "checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)
model, optimizer, sched, start_epoch, min_loss, history = resume_from_checkpoint(
    model, optimizer, sched, checkpoint_dir
)


No checkpoint found. Starting from scratch.


In [60]:

# Train the model for the epochs that are still outstanding.
# NOTE(review): fit() numbers its checkpoints from 1 on every call and starts
# a fresh history, so a resumed run overwrites earlier checkpoint files and
# the returned history only covers the epochs trained in this call.
remaining_epochs = epochs - start_epoch
if remaining_epochs > 0:
    history = fit(
        remaining_epochs,
        model,
        train_loader,
        val_loader,
        criterion,
        optimizer,
        sched,
        checkpoint_dir=checkpoint_dir,
    )
else:
    # Nothing left to train; keep the history restored from the checkpoint.
    print(
        f"Checkpoint already covers {start_epoch} of {epochs} epochs; "
        "skipping training."
    )

# Save the final model
torch.save(model, "Unet-Mobilenet.pt")







[A[A[A[A[A






[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




100%|██████████| 20/20 [02:02<00:00,  6.14s/it]





[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




100%|██████████| 4/4 [00:18<00:00,  4.73s/it]


Loss Decreasing.. inf >> 5.935 
Epoch:1/4.. Train Loss: 2.741.. Val Loss: 5.935.. Train mIoU:0.048.. Val mIoU: 0.076.. Train Acc:0.272.. Val Acc:0.496.. Time: 2.36m







[A[A[A[A[A




  5%|▌         | 1/20 [00:11<03:41, 11.65s/it]


KeyboardInterrupt: 

In [None]:
# Plot training results: loss, mean IoU and pixel accuracy curves
for plot_history in (plot_loss, plot_score, plot_acc):
    plot_history(history)

# Evaluate on test set
test_miou, test_accuracy = evaluate_model(model, test_set)
print("Test Set mIoU:", test_miou)
print("Test Set Pixel Accuracy:", test_accuracy)

# Visualize predictions: one PDF page per test image
visualize_predictions(model, test_set, "test_predictions.pdf")