In [None]:
!pip install -q imantics
!pip install -q segmentation_models_pytorch

In [None]:
import os
import random
import json

from tqdm import tqdm
import imantics
import numpy as np
import cv2
import matplotlib.pyplot as plt
import torch
from torch.utils.data import DataLoader, Dataset
from torch.nn import BCEWithLogitsLoss
from torch.optim import Adam
from sklearn.model_selection import train_test_split
import segmentation_models_pytorch as smp
from segmentation_models_pytorch.encoders import get_preprocessing_fn
import albumentations as A

In [None]:
# Number of slots preallocated in the `images` and `masks` arrays.
N_IMAGES = 512
# Side length (pixels) that every image and mask is resized to before training.
TRAIN_IMAGE_SIZE = 512
# Size at which annotation polygons are rasterised (unpacked into Polygons.mask);
# presumably (width, height) of the original frames -- confirm against the dataset.
INPUT_IMAGE_SIZE = (1920, 1080)

In [None]:
# Parse the dataset's annotation JSON; later cells read its "images" and
# "annotations" entries.
with open(
    "../input/football-player-segmentation/annotations/instances_default.json"
) as annotations_file:
    annotations = json.load(annotations_file)

In [None]:
# Lookup table: image id -> image file name, built from the "images" entries.
map_id_filename = {
    image_info["id"]: image_info["file_name"] for image_info in annotations["images"]
}

In [None]:
# One RGB slot per image; the image with id `image_id` is stored at slot
# `image_id - 1`.
images = np.zeros((N_IMAGES, TRAIN_IMAGE_SIZE, TRAIN_IMAGE_SIZE, 3), dtype=np.uint8)

for image_id, image_filename in map_id_filename.items():
    image_path = f"../input/football-player-segmentation/images/{image_filename}"
    cur_image = cv2.imread(image_path)
    # cv2.imread reports an unreadable/missing file by returning None instead
    # of raising, so fail here with the offending path rather than with an
    # opaque error inside cvtColor.
    if cur_image is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")

    # OpenCV decodes to BGR; convert to RGB before resizing to the training size.
    cur_image = cv2.cvtColor(cur_image, cv2.COLOR_BGR2RGB)
    cur_image = cv2.resize(cur_image, (TRAIN_IMAGE_SIZE, TRAIN_IMAGE_SIZE))

    images[image_id - 1] = cur_image

In [None]:
# One boolean mask per image slot; every annotation belonging to an image is
# OR-ed into that image's mask.
masks = np.zeros((N_IMAGES, TRAIN_IMAGE_SIZE, TRAIN_IMAGE_SIZE), dtype=bool)

for annotation in annotations["annotations"]:
    slot = annotation["image_id"] - 1

    # Rasterise the polygons at INPUT_IMAGE_SIZE, shrink to the training size
    # as floats, then threshold back to a boolean mask.
    polygon_mask = imantics.Polygons(annotation["segmentation"]).mask(*INPUT_IMAGE_SIZE).array
    resized_mask = cv2.resize(
        polygon_mask.astype(float), (TRAIN_IMAGE_SIZE, TRAIN_IMAGE_SIZE)
    )

    masks[slot] |= resized_mask >= 0.5

In [None]:
# Sanity check: draw the first image with its mask overlaid at 50% opacity.
preview_ax = plt.gca()
preview_ax.imshow(images[0])
preview_ax.imshow(masks[0], alpha=0.5)

In [None]:
class CFG:
    """Hyper-parameters and runtime settings shared by the notebook cells."""

    seed = 42  # value passed to seed_everything
    train_size = 0.8  # fraction of the image indexes used for training
    batch_size = 4  # batch size of both dataloaders
    lr = 0.001  # learning rate handed to Adam
    n_epochs = 5  # number of passes of the training loop
    # Use the GPU when one is present, otherwise fall back to the CPU so the
    # notebook still runs (slowly) on machines without CUDA.
    device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
def seed_everything(seed: int) -> None:
    """Seed the Python, NumPy and PyTorch random number generators.

    Also exports ``PYTHONHASHSEED`` and, when CUDA is available, switches
    cuDNN to deterministic mode with benchmarking disabled.

    Args:
        seed: the value every generator is seeded with.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)

    seeders = (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed)
    for apply_seed in seeders:
        apply_seed(seed)

    # The cuDNN flags are only touched when a CUDA device is present.
    if not torch.cuda.is_available():
        return
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# Apply the configured seed before the cells below build the augmentations,
# the model and the shuffled dataloader.
seed_everything(CFG.seed)

In [None]:
# Sequential (unshuffled) split: the leading `train_size` fraction of the
# slots is used for training, the remainder for validation.
n_train = int(N_IMAGES * CFG.train_size)
indexes = list(range(N_IMAGES))
train_indexes, valid_indexes = indexes[:n_train], indexes[n_train:]

In [None]:
def get_transforms():
    """Build the training-time augmentation pipeline.

    Returns:
        An albumentations ``Compose`` that always applies a
        hue/saturation/value shift within the configured limits and flips
        horizontally with probability 0.5.
    """
    colour_shift = A.HueSaturationValue(  # Change colors
        hue_shift_limit=(-20, 20),
        sat_shift_limit=(-30, 30),
        val_shift_limit=(-20, 20),
        p=1.0,
    )
    horizontal_flip = A.HorizontalFlip(p=0.5)
    return A.Compose([colour_shift, horizontal_flip], p=1.0)


In [None]:
class CustomDataset(Dataset):
    """Dataset yielding ``{"image": ..., "mask": ...}`` float tensors.

    Args:
        indexes: positions into the image/mask arrays served by this dataset.
        transform: optional callable invoked as
            ``transform(image=..., mask=...)`` that returns a mapping with
            ``"image"`` and ``"mask"`` entries (e.g. an albumentations Compose).
        preprocess: optional callable applied to the (possibly augmented)
            image only.
        images: optional array of images indexed by the values in ``indexes``.
            When omitted, the notebook-level ``images`` array is used.
        masks: optional array of masks indexed by the values in ``indexes``.
            When omitted, the notebook-level ``masks`` array is used.
    """

    def __init__(self, indexes, transform=None, preprocess=None, images=None, masks=None):
        self.indexes = indexes
        self.transform = transform
        self.preprocess = preprocess
        # Explicit data sources; None means "look up the notebook globals".
        self._images = images
        self._masks = masks

    def __len__(self):
        """Return the number of samples served by this dataset."""
        return len(self.indexes)

    def __getitem__(self, index):
        """Return the sample at position ``index`` as a dict of float tensors.

        The image has its last axis moved to the front and the mask gains a
        leading axis of length one.
        """
        _index = self.indexes[index]

        # The globals are resolved at access time (as before) and only when no
        # explicit arrays were supplied to the constructor.
        image_source = images if self._images is None else self._images
        mask_source = masks if self._masks is None else self._masks
        image = image_source[_index]
        mask = mask_source[_index]

        # Compare against None instead of relying on the truthiness of the
        # callable objects.
        if self.transform is not None:
            augmented = self.transform(image=image, mask=mask)
            image, mask = augmented["image"], augmented["mask"]

        if self.preprocess is not None:
            image = self.preprocess(image)

        image = torch.tensor(image, dtype=torch.float)
        mask = torch.tensor(mask, dtype=torch.float)

        # Channels-last -> channels-first for the image; add a channel axis to
        # the mask.
        image = image.permute(2, 0, 1)
        mask = mask.unsqueeze(0)

        return {"image": image, "mask": mask}

In [None]:
# U-Net with a ResNet-34 encoder initialised from ImageNet weights, taking
# 3-channel input and configured with a single output class.
model = smp.Unet("resnet34", encoder_weights="imagenet", in_channels=3, classes=1)

In [None]:
# Input preprocessing function matching the encoder/weights chosen for the model.
preprocess_input = get_preprocessing_fn(encoder_name="resnet34", pretrained="imagenet")

In [None]:
# Training data: augmented, and reshuffled by the loader.
train_dataset = CustomDataset(
    indexes=train_indexes,
    transform=get_transforms(),
    preprocess=preprocess_input,
)
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=CFG.batch_size,
    shuffle=True,
)

# Validation data: no augmentation, fixed order.
valid_dataset = CustomDataset(indexes=valid_indexes, preprocess=preprocess_input)
valid_dataloader = DataLoader(
    dataset=valid_dataset,
    batch_size=CFG.batch_size,
    shuffle=False,
)

In [None]:
# Move the network to the configured device, then set up the loss (binary
# cross-entropy on raw logits) and the Adam optimizer over its parameters.
model = model.to(CFG.device)

criterion = BCEWithLogitsLoss()
optimizer = Adam(params=model.parameters(), lr=CFG.lr)

In [None]:
def iou(outputs, labels) -> float:
    """Mean intersection-over-union of a batch of binary masks.

    Args:
        outputs: 4-D array of predicted masks; non-zero/True marks foreground.
        labels: 4-D array of target masks with the same shape as ``outputs``.

    Returns:
        The IoU of each sample (reduced over axes 1, 2 and 3) averaged over
        the first axis. A sample whose prediction and target are both empty
        has an undefined 0/0 ratio; it is scored as 1.0 (perfect agreement)
        instead of producing NaN, which would otherwise poison the batch mean.
    """
    intersection = np.logical_and(outputs, labels).sum(axis=(1, 2, 3))
    union = np.logical_or(outputs, labels).sum(axis=(1, 2, 3))

    # Pre-fill with 1.0 and divide only where the union is non-empty.
    per_sample = np.ones(union.shape, dtype=float)
    np.divide(intersection, union, out=per_sample, where=union > 0)
    return float(np.mean(per_sample))

In [None]:
# Per-epoch averages (mean over batches), collected for the plots further down.
train_loss_history = []
val_loss_history = []
train_iou_history = []
val_iou_history = []

for epoch in range(CFG.n_epochs):
    # ---- training pass ----
    train_loss = 0
    train_iou = 0
    model.train()
    # Wrap the dataloader itself (not enumerate(...)) so tqdm can read its
    # length and display a total / ETA.
    for batch in tqdm(train_dataloader, desc=f"train {epoch + 1}/{CFG.n_epochs}"):
        inputs = batch["image"].to(CFG.device)
        labels = batch["mask"].to(CFG.device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        train_loss += loss.item()

        loss.backward()
        optimizer.step()

        # The model emits logits: logit >= 0 is the same as sigmoid >= 0.5.
        _iou = iou(outputs.detach().cpu().numpy() >= 0, labels.detach().cpu().numpy())
        train_iou += _iou

    train_loss /= len(train_dataloader)
    train_iou /= len(train_dataloader)
    train_loss_history.append(train_loss)
    train_iou_history.append(train_iou)

    # ---- validation pass (no gradient tracking, eval mode) ----
    val_loss = 0
    val_iou = 0
    model.eval()
    with torch.no_grad():
        for batch in tqdm(valid_dataloader, desc=f"valid {epoch + 1}/{CFG.n_epochs}"):
            inputs = batch["image"].to(CFG.device)
            labels = batch["mask"].to(CFG.device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _iou = iou(outputs.detach().cpu().numpy() >= 0, labels.detach().cpu().numpy())
            val_iou += _iou

    val_loss /= len(valid_dataloader)
    val_iou /= len(valid_dataloader)
    val_loss_history.append(val_loss)
    val_iou_history.append(val_iou)

    print(
        "Epoch [{}/{}], Train Loss: {:.4f}, Val Loss: {:.4f}, Train IOU: {:.4f}, Val IOU: {:.4f}".format(
            epoch + 1, CFG.n_epochs, train_loss, val_loss, train_iou, val_iou
        )
    )


In [None]:
# Persist only the model weights, as they are at the time this cell runs.
checkpoint = {"model_state_dict": model.state_dict()}
torch.save(checkpoint, "last_model.pt")

In [None]:
# Per-epoch IoU curves for the training and validation sets.
iou_fig, iou_ax = plt.subplots(figsize=(16, 5))
epoch_axis = range(CFG.n_epochs)
iou_ax.plot(epoch_axis, train_iou_history, label="train")
iou_ax.plot(epoch_axis, val_iou_history, label="valid")

iou_ax.set_title("Train and Valid Score", fontsize=16)
iou_ax.legend(fontsize=15)
iou_ax.set_ylabel("iou", fontsize=14)
iou_ax.set_xlabel("epoch", fontsize=14)
iou_ax.grid()
plt.show()

In [None]:
# Per-epoch loss curves for the training and validation sets.
plt.figure(figsize=(16, 5))
plt.plot(range(CFG.n_epochs), train_loss_history, label="train")
plt.plot(range(CFG.n_epochs), val_loss_history, label="valid")

plt.title("Train and Valid Loss", fontsize=16)
plt.legend(fontsize=15)
# This figure shows the loss, so label the y-axis accordingly (it previously
# read "iou", copied from the score plot).
plt.ylabel("loss", fontsize=14)
plt.xlabel("epoch", fontsize=14)
plt.grid()
plt.show()

In [None]:
# Show target vs. predicted masks for the first validation batch only.
# Put the model in eval mode explicitly instead of relying on the training
# cell having left it that way.
model.eval()
with torch.no_grad():
    for batch in valid_dataloader:
        inputs = batch["image"].to(CFG.device)

        outputs = model(inputs)

        # The masks are only plotted, so they are read straight from the batch.
        np_labels = batch["mask"].detach().cpu().numpy()
        np_outputs = outputs.detach().cpu().numpy()

        for sample_idx in range(len(np_labels)):
            plt.figure(figsize=(16, 6))
            plt.subplot(1, 2, 1)
            plt.imshow(np_labels[sample_idx][0])
            plt.title("Target")
            plt.subplot(1, 2, 2)
            # Logit >= 0 is the same decision as sigmoid >= 0.5.
            plt.imshow(np_outputs[sample_idx][0] >= 0)
            plt.title("Predict")
            plt.show()

        # Only the first batch is visualised.
        break