<a href="https://colab.research.google.com/github/poprygo/Master-thesis-KPI/blob/main/explosion_crater_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torch torchvision matplotlib Pillow imutils scikit-learn tqdm gdal numpy

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!unzip train_03.zip -d .

In [None]:
# import the necessary packages
from pathlib import Path

import torch

# base path of the dataset
DATASET_PATH = Path(".") / "train_03"

# define the path to the images and masks dataset
IMAGE_DATASET_PATH = (DATASET_PATH / "images").as_posix()
MASK_DATASET_PATH = (DATASET_PATH / "masks").as_posix()

# define the explosion-crater-detection split
TEST_SPLIT = 0.15

# determine the device to be used for training and evaluation
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# determine if we will be pinning memory during data loading
PIN_MEMORY = True if DEVICE == "cuda" else False

# define the number of channels in the input, number of classes,
# and number of levels in the U-Net model
NUM_CHANNELS = 1
NUM_CLASSES = 1
NUM_LEVELS = 3

# initialize learning rate, number of epochs to train for, and the
# batch size
INIT_LR = 1e-3
NUM_EPOCHS = 60
BATCH_SIZE = 512

# define the input image dimensions
INPUT_IMAGE_WIDTH = 64
INPUT_IMAGE_HEIGHT = 64

# define threshold to filter weak predictions
THRESHOLD = 0.5

# define the path to the base output directory
BASE_OUTPUT = Path("output")

BASE_OUTPUT.mkdir(exist_ok=True)
# define the path to the output serialized model, model training
# plot, and testing image paths
MODEL_PATH = (BASE_OUTPUT / "unet.pt").as_posix()
PLOT_PATH = (BASE_OUTPUT / "plot.png").as_posix()
TEST_PATHS = (BASE_OUTPUT / "test_paths.txt").as_posix()


In [None]:
# import the necessary packages
# import cv2
import numpy as np
from osgeo import gdal
from torch.utils.data import Dataset


class SegmentationDataset(Dataset):
    def __init__(self, image_paths, mask_paths, transforms):
        # store the image and mask filepaths, and augmentation
        # transforms
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transforms = transforms

    def __getitem__(self, index):
        # grab the image path from the current index
        image_path = self.image_paths[index]
        mask_paths = self.mask_paths[index]

        # load the image from disk, swap its channels from BGR to RGB,
        # and read the associated mask from disk in grayscale mode
        image_dataset = gdal.Open(image_path)
        mask_dataset = gdal.Open(mask_paths)

        # image = (image_dataset.ReadAsArray().transpose(1, 2, 0)).astype(np.uint8)
        image = image_dataset.ReadAsArray().transpose(1, 2, 0)
        image = (image / 3000).astype(np.float32)
        # image = (image / 18496).astype(np.float32)
        mask = mask_dataset.ReadAsArray()
        mask = (mask * 255).astype(np.uint8)
        # check to see if we are applying any transformations
        if self.transforms is not None:
            # apply the transformations to both image and its mask
            image = self.transforms(image)
            mask = self.transforms(mask)
        # return a tuple of the image and its mask
        return image, mask

    def __len__(self):
        # return the number of total samples contained in the dataset
        return len(self.image_paths)


In [None]:
from torch.nn import Module
import torch
from torch.nn import functional as F
from torchvision.ops.boxes import _box_inter_union


# https://www.kaggle.com/code/bigironsphere/loss-function-library-keras-pytorch/notebook

class IoULoss(Module):
    def __init__(self, weight=None, size_average=True):
        super(IoULoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        # intersection is equivalent to True Positive count
        # union is the mutually inclusive area of all labels & predictions
        intersection = (inputs * targets).sum()
        total = (inputs + targets).sum()
        union = total - intersection

        io_u = (intersection + smooth) / (union + smooth)

        return 1 - io_u


class DiceLoss(Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice = (2. * intersection + smooth) / (inputs.sum() + targets.sum() + smooth)

        return 1 - dice


class DiceBCELoss(Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceBCELoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice_loss = 1 - (2. * intersection + smooth) / (inputs.sum() + targets.sum() + smooth)
        bce = F.binary_cross_entropy(inputs, targets, reduction='mean')
        dice_bce = bce + dice_loss

        return dice_bce


ALPHA = 0.8
GAMMA = 2


class FocalLoss(Module):
    def __init__(self, weight=None, size_average=True):
        super(FocalLoss, self).__init__()

    def forward(self, inputs, targets, alpha=ALPHA, gamma=GAMMA, smooth=1):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        # first compute binary cross-entropy
        bce = F.binary_cross_entropy(inputs, targets, reduction='mean')
        bce_exp = torch.exp(-bce)
        focal_loss = alpha * (1 - bce_exp) ** gamma * bce

        return focal_loss


ALPHA = 0.5
BETA = 0.5


class TverskyLoss(Module):
    def __init__(self, weight=None, size_average=True):
        super(TverskyLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1, alpha=ALPHA, beta=BETA):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = F.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        # True Positives, False Positives & False Negatives
        tp = (inputs * targets).sum()
        fp = ((1 - targets) * inputs).sum()
        fn = (targets * (1 - inputs)).sum()

        tversky = (tp + smooth) / (tp + alpha * fp + beta * fn + smooth)

        return 1 - tversky


ALPHA = 0.5
BETA = 0.5
GAMMA = 1


class FocalTverskyLoss(Module):
    def __init__(self, weight=None, size_average=True):
        super(FocalTverskyLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1, alpha=ALPHA, beta=BETA, gamma=GAMMA):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        # True Positives, False Positives & False Negatives
        tp = (inputs * targets).sum()
        fp = ((1 - targets) * inputs).sum()
        fn = (targets * (1 - inputs)).sum()

        tversky = (tp + smooth) / (tp + alpha * fp + beta * fn + smooth)
        focal_tversky = (1 - tversky) ** gamma

        return focal_tversky


ALPHA = 0.5  # < 0.5 penalises FP more, > 0.5 penalises FN more
CE_RATIO = 0.5  # weighted contribution of modified CE loss compared to Dice loss


class ComboLoss(Module):
    def __init__(self, weight=None, size_average=True):
        super(ComboLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1, alpha=ALPHA, beta=BETA, eps=1e-9):
        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        # True Positives, False Positives & False Negatives
        intersection = (inputs * targets).sum()
        dice = (2. * intersection + smooth) / (inputs.sum() + targets.sum() + smooth)

        inputs = torch.clamp(inputs, eps, 1.0 - eps)
        out = - (ALPHA * ((targets * torch.log(inputs)) + ((1 - ALPHA) * (1.0 - targets) * torch.log(1.0 - inputs))))
        weighted_ce = out.mean(-1)
        combo = (CE_RATIO * weighted_ce) - ((1 - CE_RATIO) * dice)

        return combo


def dice_loss(pred, target, smooth=1.):
    pred = pred.contiguous()
    target = target.contiguous()

    intersection = (pred * target).sum(dim=2).sum(dim=2)

    loss = (1 - ((2. * intersection + smooth) / (pred.sum(dim=2).sum(dim=2) + target.sum(dim=2).sum(dim=2) + smooth)))

    return loss.mean()

In [None]:
# import the necessary packages
import torch
from torch.nn import BatchNorm2d
from torch.nn import Conv2d
from torch.nn import ConvTranspose2d
from torch.nn import MaxPool2d
from torch.nn import Module
from torch.nn import ModuleList
from torch.nn import ReLU
from torch.nn import functional as F
from torchvision.transforms import CenterCrop


class Block(Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        # store the convolution and RELU layers
        self.conv1 = Conv2d(in_channels, out_channels, 3)
        self.bn = BatchNorm2d(out_channels)
        self.relu = ReLU()
        self.conv2 = Conv2d(out_channels, out_channels, 3)

    def forward(self, x):
        # apply CONV => RELU => CONV block to the inputs and return it
        x = self.conv1(x)
        x = self.relu(self.bn(x))

        x = self.conv2(x)
        x = self.relu(self.bn(x))
        return x


class Encoder(Module):
    def __init__(self, channels=(4, 16, 32, 64)):
        super().__init__()
        # store the encoder blocks and maxpooling layer
        self.enc_blocks = ModuleList(
            [Block(channels[i], channels[i + 1])
             for i in range(len(channels) - 1)])
        self.pool = MaxPool2d(2)

    def forward(self, x):
        # initialize an empty list to store the intermediate outputs
        block_outputs = []

        # loop through the encoder blocks
        for block in self.enc_blocks:
            # pass the inputs through the current encoder block, store
            # the outputs, and then apply maxpooling on the output
            x = block(x)
            block_outputs.append(x)
            x = self.pool(x)

        # return the list containing the intermediate outputs
        return block_outputs


class Decoder(Module):
    def __init__(self, channels=(64, 32, 16)):
        super().__init__()
        # initialize the number of channels, upsampler blocks, and
        # decoder blocks
        self.channels = channels
        self.upconvs = ModuleList(
            [ConvTranspose2d(channels[i], channels[i + 1], 2, 2)
             for i in range(len(channels) - 1)])
        self.dec_blocks = ModuleList(
            [Block(channels[i], channels[i + 1])
             for i in range(len(channels) - 1)])

    def forward(self, x, enc_features):
        # loop through the number of channels
        for i in range(len(self.channels) - 1):
            # pass the inputs through the upsampler blocks
            x = self.upconvs[i](x)

            # crop the current features from the encoder blocks,
            # concatenate them with the current upsampled features,
            # and pass the concatenated output through the current
            # decoder block
            enc_feat = self.crop(enc_features[i], x)
            x = torch.cat([x, enc_feat], dim=1)
            x = self.dec_blocks[i](x)

        # return the final decoder output
        return x

    @staticmethod
    def crop(enc_features, x):
        # grab the dimensions of the inputs, and crop the encoder
        # features to match the dimensions
        (_, _, H, W) = x.shape
        enc_features = CenterCrop([H, W])(enc_features)

        # return the cropped features
        return enc_features


class UNet(Module):
    def __init__(self, enc_channels=(4, 16, 32, 64),
                 dec_channels=(64, 32, 16),
                 nb_classes=1, retain_dim=True,
                 out_size=(INPUT_IMAGE_HEIGHT, INPUT_IMAGE_WIDTH)):
        super().__init__()
        # initialize the encoder and decoder
        self.encoder = Encoder(enc_channels)
        self.decoder = Decoder(dec_channels)

        # initialize the regression head and store the class variables
        self.head = Conv2d(dec_channels[-1], nb_classes, 1)
        self.retain_dim = retain_dim
        self.out_size = out_size

    def forward(self, x):
        # grab the features from the encoder
        enc_features = self.encoder(x)
        # pass the encoder features through decoder making sure that
        # their dimensions are suited for concatenation
        dec_features = self.decoder(enc_features[::-1][0],
                                    enc_features[::-1][1:])
        # pass the decoder features through the regression head to
        # obtain the segmentation mask
        map_ = self.head(dec_features)
        # check to see if we are retaining the original output
        # dimensions and if so, then resize the output to match them
        if self.retain_dim:
            map_ = F.interpolate(map_, self.out_size)
        # return the segmentation map_
        return map_


In [None]:
# USAGE
# python train.py
# import the necessary packages
import logging
import os
import time

import matplotlib.pyplot as plt
import torch
from imutils import paths
from sklearn.model_selection import train_test_split
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss
from torch.optim import Adam, RMSprop
from torch.utils.data import DataLoader
from torchvision import transforms
from tqdm import tqdm


logging.basicConfig(
    filename="logs.log",
    filemode='a',
    format='%(asctime)s | %(levelname)s | %(message)s',
    level=logging.INFO
)

# load the image and mask filepaths in a sorted manner
image_paths = sorted(list(paths.list_images(IMAGE_DATASET_PATH)))
mask_paths = sorted(list(paths.list_images(MASK_DATASET_PATH)))
print(image_paths)
# partition the data into training and testing splits using 85% of
# the data for training and the remaining 15% for testing
train_images, test_images, train_masks, test_masks = train_test_split(
    image_paths, mask_paths, test_size=TEST_SPLIT, random_state=42)
# write the testing image paths to disk so that we can use then
# when evaluating/testing our model
logging.info("saving testing image paths...")
f = open(TEST_PATHS, "w")
f.write("\n".join(test_images))
f.close()

# define transformations
transforms = transforms.Compose([
    # transforms.ToPILImage(),
    # transforms.Resize((INPUT_IMAGE_HEIGHT,
    #                    INPUT_IMAGE_WIDTH)),
    transforms.ToTensor()
])

# create the train and explosion-crater-detection datasets
train_ds = SegmentationDataset(image_paths=train_images, mask_paths=train_masks,
                               transforms=transforms)
test_ds = SegmentationDataset(image_paths=test_images, mask_paths=test_masks,
                              transforms=transforms)

logging.info(f"found {len(train_ds)} examples in the training set...")
logging.info(f"found {len(test_ds)} examples in the testing set...")

# create the training and explosion-crater-detection data loaders
train_loader = DataLoader(train_ds, shuffle=True,
                          batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY,
                          num_workers=os.cpu_count())
test_loader = DataLoader(test_ds, shuffle=False,
                         batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY,
                         num_workers=os.cpu_count())

# initialize our UNet model
unet = UNet().to(DEVICE)

# initialize loss function and optimizer
loss_func = BCEWithLogitsLoss()
# loss_func = IoULoss()
opt = RMSprop(unet.parameters(), lr=INIT_LR)

# calculate steps per epoch for training and explosion-crater-detection set
train_steps = len(train_ds) // BATCH_SIZE
test_steps = len(test_ds) // BATCH_SIZE

# initialize a dictionary to store training history
H = {"train_loss": [], "test_loss": []}

# loop over epochs
logging.info("training the network...\n\n")
startTime = time.time()

for epoch in range(NUM_EPOCHS):
    # set the model in training mode
    unet.train()

    # initialize the total training and validation loss
    total_train_loss = 0
    total_test_loss = 0

    tqdm_data = tqdm(train_loader, desc=f'Training (epoch #{epoch})', total=int(len(train_loader)))
    # loop over the training set
    for (i, (x, y)) in enumerate(tqdm_data):
        # send the input to the device
        (x, y) = (x.to(DEVICE), y.to(DEVICE))

        # perform a forward pass and calculate the training loss
        pred = unet(x)
        loss = loss_func(pred, y)

        # first, zero out any previously accumulated gradients, then
        # perform backpropagation, and then update model parameters
        opt.zero_grad()
        loss.backward()
        opt.step()

        # add the loss to the total training loss so far
        total_train_loss += loss
        tqdm_data.set_postfix(loss=(total_train_loss / ((i + 1) * train_loader.batch_size)))

    # switch off autograd
    with torch.no_grad():
        # set the model in evaluation mode
        unet.eval()

        tqdm_data = tqdm(test_loader, desc=f'Validation (epoch #{epoch})', total=int(len(test_loader)))
        # loop over the validation set
        for i, (x, y) in enumerate(tqdm_data):
            # send the input to the device
            (x, y) = (x.to(DEVICE), y.to(DEVICE))

            # make the predictions and calculate the validation loss
            pred = unet(x)
            total_test_loss += loss_func(pred, y)
            tqdm_data.set_postfix(loss=(total_test_loss / ((i + 1) * test_loader.batch_size)))

    # calculate the average training and validation loss
    avg_train_loss = total_train_loss / train_steps
    avg_test_loss = total_test_loss / test_steps

    # update our training history
    H["train_loss"].append(avg_train_loss.cpu().detach().numpy())
    H["test_loss"].append(avg_test_loss.cpu().detach().numpy())

    # print the model training and validation information
    logging.info(f"Train loss: {avg_train_loss:.6f}, Test loss: {avg_test_loss:.4f}")

# display the total time needed to perform the training
end_time = time.time()
logging.info(f"total time taken to train the model: {end_time - startTime:.2f}s")

# plot the training loss
plt.style.use("ggplot")
plt.figure()
plt.plot(H["train_loss"], label="train_loss")
plt.plot(H["test_loss"], label="test_loss")
plt.title("Training Loss on Dataset")
plt.xlabel("Epoch #")
plt.ylabel("Loss")
plt.legend(loc="lower left")
plt.savefig(PLOT_PATH)

# serialize the model to disk
torch.save(unet, MODEL_PATH)


[]


ValueError: ignored

# New Section

In [None]:
# USAGE
# python predict.py

import logging
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import torch
from osgeo import gdal

# import the necessary packages

logging.basicConfig(
    filename="logs.log",
    filemode='a',
    format='%(asctime)s | %(levelname)s | %(message)s',
    level=logging.INFO
)


def prepare_plot(orig_image, orig_mask, pred_mask):
    # initialize our figure
    figure, ax = plt.subplots(nrows=1, ncols=3, figsize=(10, 10))

    # plot the original image, its mask, and the predicted mask
    ax[0].imshow(orig_image)
    ax[1].imshow(orig_mask)
    ax[2].imshow(pred_mask)

    # set the titles of the subplots
    ax[0].set_title("Image")
    ax[1].set_title("Original Mask")
    ax[2].set_title("Predicted Mask")

    # set the layout of the figure and display it
    figure.tight_layout()
    figure.show()


def make_predictions(model, image_path):
    # set model to evaluation mode
    model.eval()

    # turn off gradient tracking
    with torch.no_grad():
        # load the image from disk, swap its color channels, cast it
        # to float data type, and scale its pixel values
        image_dataset = gdal.Open(image_path)
        image = image_dataset.ReadAsArray().transpose(1, 2, 0)
        image = (image / 3000).astype(np.float32)

        # resize the image and make a copy of it for visualization
        orig = image.copy()

        # find the filename and generate the path to ground truth
        # mask
        filename = Path(image_path).name
        ground_truth_path = (Path(MASK_DATASET_PATH) / filename).as_posix()

        # load the ground-truth segmentation mask in grayscale mode
        # and resize it
        mask_dataset = gdal.Open(ground_truth_path)
        gt_mask = mask_dataset.ReadAsArray()
        gt_mask = (gt_mask * 255).astype(np.uint8)

        # make the channel axis to be the leading one, add a batch
        # dimension, create a PyTorch tensor, and flash it to the
        # current device
        image = np.transpose(image, (2, 0, 1))
        image = np.expand_dims(image, 0)
        image = torch.from_numpy(image).to(DEVICE)

        # make the prediction, pass the results through the sigmoid
        # function, and convert the result to a NumPy array
        pred_mask = model(image).squeeze()

        # pred_mask -= pred_mask.min(1, keepdim=True)[0]
        # pred_mask /= pred_mask.max(1, keepdim=True)[0]
        pred_mask = torch.sigmoid(pred_mask)
        pred_mask = pred_mask.detach().cpu().numpy()

        # filter out the weak predictions and convert them to integers
        pred_mask = (pred_mask > THRESHOLD) * 255
        pred_mask = pred_mask.astype(np.uint8)
        # prepare a plot for visualization
        prepare_plot(orig, gt_mask, pred_mask)


# load the image paths in our testing file and randomly select 10
# image paths
logging.info("loading up explosion-crater-detection image paths...")
image_paths = open(TEST_PATHS).read().strip().split("\n")
image_paths = np.random.choice(image_paths, size=20)

# load our model from disk and flash it to the current device
logging.info("load up model...")
unet = torch.load(MODEL_PATH).to(DEVICE)

# iterate over the randomly selected explosion-crater-detection image paths
for path in image_paths:
    # make predictions and visualize the results
    make_predictions(unet, path)


FileNotFoundError: ignored