<p style="align: center;"><img style="align: center; border-radius: 10px;" align=center src="https://sun9-32.userapi.com/impf/MmYC-5xp1ziyMkUdVkKVul8c2lCvmY7b4VTjtw/EekxnCmkVGM.jpg?size=1818x606&quality=95&crop=0,140,960,320&sign=7e803728a35685a6605e03a5da104e34&type=cover_group" width=1200 height=480></p>
<h1 style="text-align: center;"><b>Segmentation part of AGNI CV summer 2023</b></h1>




In [1]:
!pip install torchsummary



In [2]:
!pip install -U segmentation-models-pytorch albumentations --user



In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

import os
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import ImageFolder
import torchvision.transforms as tt
import torchvision
import torch
import torch.nn as nn
from torchsummary import summary
import cv2
from tqdm.notebook import tqdm

import numpy as np
%matplotlib inline

from torch.utils.data import Dataset
import multiprocessing as mp
from PIL import Image

import albumentations 
from albumentations.pytorch import ToTensorV2
from torchvision.transforms import ToTensor

import segmentation_models_pytorch as smp
import segmentation_models_pytorch.utils.metrics

## Dataset

In [4]:
class GladeDataset(Dataset):
    """In-memory dataset of (image, mask) pairs for glade segmentation.

    Every ``<name>.jpg`` found in ``path`` is paired with ``<name>_mask.png``
    from the same directory. All pairs are decoded and transformed once, in a
    process pool, when the dataset is constructed and are then served from
    memory. Consequently any random augmentation inside ``input_transform`` is
    sampled once per image at load time, not once per epoch.

    Each item is ``(image, mask)``: ``image`` is a float tensor divided by 255,
    ``mask`` is a float tensor holding the mask values as stored on disk.
    Pairs with a missing image or mask file are reported and skipped.
    """

    def __init__(self, path, input_transform=None):
        """
        :param path: directory with ``*.jpg`` images and ``*_mask.png`` masks
        :param input_transform: optional callable invoked as
            ``input_transform(image=<ndarray>, mask=<ndarray>)`` that returns a
            mapping with ``"image"`` and ``"mask"`` entries (albumentations style)
        """
        super().__init__()

        self.path = path
        self.input_transform = input_transform
        # Kept so existing code that reads this attribute keeps working;
        # the dataset itself does not use it.
        self.to_tensor = ToTensor()

        # os.listdir order is arbitrary; sort so sample indices are reproducible.
        self.names_of_files = sorted(
            file_name.rsplit(".", 1)[0]
            for file_name in os.listdir(self.path)
            if file_name.endswith(".jpg")
        )

        self.data = []
        self._read_images()

    @staticmethod
    def _image_to_tensor(image):
        """Return ``image`` as a channel-first tensor; tensors pass through untouched."""
        if torch.is_tensor(image):
            return image

        tensor = torch.from_numpy(np.ascontiguousarray(image))
        if tensor.ndim == 2:
            # Single-channel image without a channel axis: add one in front.
            return tensor.unsqueeze(0)
        return tensor.permute(2, 0, 1)  # HWC -> CHW

    @staticmethod
    def _mask_to_tensor(mask):
        """Return ``mask`` as a tensor without changing its layout."""
        if torch.is_tensor(mask):
            return mask
        return torch.from_numpy(np.ascontiguousarray(mask))

    def _read_image(self, image_name):
        """Load one image/mask pair and apply ``input_transform`` if there is one.

        :param image_name: file name of the image without the ``.jpg`` extension
        :returns: ``(image, mask)`` float tensors, or ``None`` when either file
            is missing (the two expected paths are printed in that case)
        """
        image_path = os.path.join(self.path, image_name + ".jpg")
        mask_path = os.path.join(self.path, image_name + "_mask.png")

        try:
            # PIL opens lazily; materialise the pixels while the files are open
            # so the handles can be closed right away.
            with Image.open(image_path) as image_file, Image.open(mask_path) as mask_file:
                image = np.array(image_file)
                mask = np.array(mask_file)
        except FileNotFoundError:
            print(image_path, mask_path, end="\n")
            return None

        if self.input_transform:
            transformed = self.input_transform(image=image, mask=mask)
            image = transformed["image"]
            mask = transformed["mask"]

        # Works both when the transform already produced tensors (e.g. it ends
        # with ToTensorV2) and when it returned arrays or was not given at all.
        image = self._image_to_tensor(image).float() / 255
        mask = self._mask_to_tensor(mask).float()

        return image, mask

    def _read_images(self):
        """Load every pair listed in ``names_of_files`` using a process pool."""

        with mp.Pool(processes=mp.cpu_count()) as pool:
            # imap (ordered) keeps self.data aligned with the sorted file names.
            samples = list(
                tqdm(
                    pool.imap(self._read_image, self.names_of_files),
                    total=len(self.names_of_files)
                )
            )

        # Drop pairs that could not be read; a None entry would break batching.
        self.data = [sample for sample in samples if sample is not None]

    def __getitem__(self, idx):
        """Return the preloaded ``(image, mask)`` pair at position ``idx``."""
        return self.data[idx]

    def __len__(self):
        """Return the number of successfully loaded pairs."""

        return len(self.data)

In [5]:
def to_device(data, device):
    """
    Move a tensor, or a (possibly nested) list/tuple of tensors, to ``device``.

    :param data: object exposing ``.to(device, non_blocking=...)`` or a
        list/tuple of such objects
    :param device: target device passed through to ``.to``
    :returns: the moved object; list and tuple inputs both come back as lists
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

In [6]:
class DeviceDataLoader:
    """Wrap a dataloader so every (image, mask) batch is moved to ``device``."""

    def __init__(self, dataloader, device):
        self.device = device
        self.dataloader = dataloader

    def __len__(self):
        # Number of batches, delegated to the wrapped dataloader.
        return len(self.dataloader)

    def __iter__(self):
        for batch in self.dataloader:
            # Each batch is expected to unpack into exactly (images, masks).
            images, masks = batch
            yield to_device(images, self.device), to_device(masks, self.device)

In [7]:
def get_dataloader(dataset, device, batch_size, shuffle=False, drop_last=True):
    """
    Wrap ``dataset`` in a batching DataLoader whose batches are moved to ``device``.

    :param dataset: dataset yielding ``(image, mask)`` pairs
    :param device: device name or ``torch.device`` the batches are moved to
    :param batch_size: number of samples per batch
    :param shuffle: reshuffle the samples every epoch (default ``False``,
        the previous hard-coded behaviour)
    :param drop_last: discard the final incomplete batch (default ``True``,
        the previous hard-coded behaviour); pass ``False`` for evaluation so
        that no sample is skipped
    :returns: DeviceDataLoader wrapping the configured DataLoader
    """

    return DeviceDataLoader(
        DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last),
        torch.device(device)
    )

## Configuration and data loading

In [8]:
# --- Model ---
ENCODER = "mobilenet_v2"      # encoder backbone name passed to the smp model
ENCODER_WEIGHTS = "imagenet"  # pretrained weights for the encoder
CLASSES = ["Glade"]           # one entry per output channel
ACTIVATION = "sigmoid"

# --- Data ---
RESIZE_SHAPE = 256            # images and masks are resized to RESIZE_SHAPE x RESIZE_SHAPE
BATCH_SIZE = 16

# --- Training ---
LOSS_NAME = "DICE"            # label used for the loss in logs and in the experiment folder name
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# Training preprocessing (albumentations): random horizontal flip, resize to a
# RESIZE_SHAPE x RESIZE_SHAPE square, then conversion to tensors.
input_transform = albumentations.Compose(
    [
        albumentations.HorizontalFlip(p=0.5),
        albumentations.Resize(height=RESIZE_SHAPE, width=RESIZE_SHAPE),
        # albumentations.Normalize(),
        ToTensorV2(),
    ]
)

# torchvision variant of the preprocessing; nothing in the visible cells uses it.
input_transform_torch = tt.Compose(
    [
        tt.Resize(RESIZE_SHAPE),
        tt.ToTensor(),
        tt.ConvertImageDtype(torch.float),
    ]
)

train_path = "semantic-seg/train"
train_dataset = GladeDataset(train_path, input_transform)
train_dataloader = get_dataloader(train_dataset, DEVICE, BATCH_SIZE)

  0%|          | 0/517 [00:00<?, ?it/s]

In [None]:
valid_path = "semantic-seg/valid"

# Validation must be deterministic: resize and tensor conversion only, without
# the random flip that the training transform applies.
valid_transform = albumentations.Compose([
    albumentations.Resize(height=RESIZE_SHAPE, width=RESIZE_SHAPE),
    ToTensorV2()
])

valid_dataset = GladeDataset(valid_path, valid_transform)

# NOTE: get_dataloader drops the last incomplete batch by default, so up to
# BATCH_SIZE - 1 validation images do not contribute to the validation metrics.
valid_dataloader = get_dataloader(valid_dataset, DEVICE, BATCH_SIZE)

In [None]:
test_path = "semantic-seg/test"

# Test data must be deterministic: resize and tensor conversion only, without
# the random flip that the training transform applies.
test_transform = albumentations.Compose([
    albumentations.Resize(height=RESIZE_SHAPE, width=RESIZE_SHAPE),
    ToTensorV2()
])

test_dataset = GladeDataset(test_path, test_transform)

# NOTE: get_dataloader drops the last incomplete batch by default, so up to
# BATCH_SIZE - 1 test images are never yielded by this loader.
test_dataloader = get_dataloader(test_dataset, DEVICE, BATCH_SIZE)

In [None]:
import matplotlib.pyplot as plt
from IPython.display import clear_output

# Top row: the first six training images; bottom row: their masks.
fig, axes = plt.subplots(2, 6, figsize=(18, 6))

for i in range(6):
    sample_image, sample_mask = train_dataset[i]

    image_ax, mask_ax = axes[0, i], axes[1, i]
    image_ax.axis("off")
    mask_ax.axis("off")

    # Images are stored channel-first; imshow expects channel-last.
    image_ax.imshow(sample_image.permute(1, 2, 0).cpu().numpy())
    mask_ax.imshow(sample_mask.numpy())

plt.show()


## Model

In [None]:
exp_folder = f"experiments/checkpoints_{RESIZE_SHAPE}x{RESIZE_SHAPE}_{ENCODER}_{ACTIVATION}_{LOSS_NAME}_unet"

# Idempotent: reuse the folder when it already exists. The previous rmdir-and-recreate
# raised OSError as soon as the folder contained a checkpoint from an earlier run.
os.makedirs(exp_folder, exist_ok=True)

In [None]:
# ACTIVATION is applied inside the model so that its outputs are probabilities.
# The IoU metric (default threshold 0.5) and the `pred > 0.5` thresholding used
# later in the notebook both assume probabilities, not raw logits.
model = smp.UnetPlusPlus(
    encoder_name=ENCODER,
    encoder_weights=ENCODER_WEIGHTS,
    in_channels=3,
    classes=len(CLASSES),
    activation=ACTIVATION,
)

# The model already applies the sigmoid, so the loss must not apply it again.
loss = smp.losses.DiceLoss(mode="binary", from_logits=False)
loss.__name__ = LOSS_NAME
# Alternative: smp.losses.SoftBCEWithLogitsLoss() expects raw logits, so it can
# only be used with a model built without the activation.
# loss = smp.losses.SoftBCEWithLogitsLoss()
# loss.__name__ = LOSS_NAME

# NOTE(review): dataset masks have no channel axis, so mask batches are
# presumably (B, H, W) while predictions are (B, 1, H, W). Confirm that the IoU
# metric does not broadcast the two against each other; if it does, add a
# channel axis to the masks before computing metrics.
metrics = [smp.utils.metrics.IoU()]

optimizer = torch.optim.Adam([
        dict(params=model.parameters(), lr=0.0001),
    ])

In [None]:
# Use the configured device and input size instead of hard-coded "cuda" / 256,
# so the cell also runs on CPU-only machines and follows RESIZE_SHAPE.
model.to(DEVICE)
summary(model, (3, RESIZE_SHAPE, RESIZE_SHAPE), device=DEVICE)

## Train

In [None]:
# One runner per phase; both share the same model, loss and metrics.
train_epoch = smp.utils.train.TrainEpoch(
    model, loss=loss, metrics=metrics, optimizer=optimizer, device=DEVICE, verbose=True,
)
valid_epoch = smp.utils.train.ValidEpoch(
    model, loss=loss, metrics=metrics, device=DEVICE, verbose=True,
)

max_score = 0
for i in range(50):
    print(f"\nEpoch: {i}")

    train_logs = train_epoch.run(train_dataloader)
    valid_logs = valid_epoch.run(valid_dataloader)

    # Save a new checkpoint every time the validation IoU improves.
    if valid_logs["iou_score"] > max_score:
        max_score = valid_logs["iou_score"]
        checkpoint_name = f"best_model_{max_score}_{i}-epoch.pth"
        torch.save(model, os.path.join(exp_folder, checkpoint_name))
        print("Model saved!")

    # One-off learning-rate drop after epoch 12. The optimizer has a single
    # parameter group covering the whole model, so this affects every
    # parameter, not only the decoder as the message suggests.
    if i == 12:
        optimizer.param_groups[0]["lr"] = 1e-5
        print("Decrease decoder learning rate to 1e-5!")

## Test

In [None]:
def predict(model, X_batch):
    """
    Run ``model`` on ``X_batch`` in inference mode.

    :param model: torch module; it is switched to eval mode as a side effect
    :param X_batch: input batch, already on the same device as the model
    :returns: raw model output, detached from the autograd graph
    """
    model.eval()  # testing mode

    # No gradients are needed for inference; skipping the autograd graph saves memory.
    with torch.no_grad():
        y_pred = model(X_batch)

    return y_pred

In [None]:
# Rows: input image, ground-truth mask, thresholded prediction.
# Columns: the first three test samples.
fig, axes = plt.subplots(3, 3, figsize=(18, 18))

# Only the first test batch is predicted; the loader does not shuffle, so
# pred[k] corresponds to test_dataset[k].
pred = None
for image, mask in test_dataloader:
    pred = torch.where(predict(model=model, X_batch=image) > 0.5, 1, 0)
    break

for i in range(3):
    sample_image, sample_mask = test_dataset[i]
    panels = (
        sample_image.permute(1, 2, 0).cpu().numpy(),
        sample_mask.numpy(),
        pred[i].permute(1, 2, 0).detach().cpu().numpy(),
    )
    for row, panel in enumerate(panels):
        axes[row, i].axis("off")
        axes[row, i].imshow(panel)

fig.subplots_adjust(wspace=0, hspace=0.1)
plt.show()

## Inference handle

In [None]:
class GladeSegment:
    """Inference wrapper around a saved segmentation model.

    Loads a whole-model checkpoint, as written by ``torch.save(model, path)``,
    and turns a single image array into a binary mask.
    """

    def __init__(self, model_path):
        """
        :param model_path: path to a checkpoint created with ``torch.save(model, path)``
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # SECURITY: a whole-model checkpoint is a pickle, and unpickling can
        # execute arbitrary code. Only load checkpoints from a trusted source.
        self.model = torch.load(model_path, map_location=self.device, weights_only=False)
        self.model.to(self.device)
        self.model.eval()

    def predict(self, image: np.ndarray, threshold: float = 0.5):
        """
        Segment one image.

        :param image: image array in (H, W, C) layout with values in 0..255;
            a 2-D (H, W) array is treated as a single channel.
            NOTE(review): the image is not resized here; presumably it must
            already have the size and channel count used for training. Confirm.
        :param threshold: model outputs above this value are labelled 1.
            NOTE(review): this assumes the model outputs probabilities (sigmoid
            applied inside the model). Confirm for the checkpoint in use.
        :returns: integer array of 0/1 with the model's output shape, batch axis included
        """
        self.model.eval()

        batch = torch.from_numpy(np.ascontiguousarray(image))
        if batch.ndim == 2:
            batch = batch.unsqueeze(-1)
        # HWC -> 1xCxHxW, scaled by 1/255 as GladeDataset does for training images.
        batch = batch.permute(2, 0, 1).unsqueeze(0).float() / 255
        batch = batch.to(self.device)

        # A single forward pass without autograd bookkeeping.
        with torch.no_grad():
            pred = self.model(batch)

        pred = torch.where(pred > threshold, 1, 0)

        return pred.cpu().numpy()

In [None]:
# Debugging aid: list the Kaggle working directory (presumably to locate saved checkpoints).
!ls /kaggle/working/

In [None]:
# GladeSegment needs a checkpoint path. The training loop writes a checkpoint
# only when the validation IoU improves, so the most recently modified file in
# exp_folder is the best model.
best_checkpoint = max(
    (os.path.join(exp_folder, name) for name in os.listdir(exp_folder)),
    key=os.path.getmtime,
)
test_model = GladeSegment(best_checkpoint)

image_path = "/kaggle/input/hello-world/test/0301435ff901dc5a3bb4ab900fa5e4c3b9c27632_JPG.rf.3503e2b93cb2eabf78c50230369c4dc1.jpg"