# PicsArt AI Hackathon Online

## Детектирование фона на изображениях

In [1]:
%pylab inline

import os
import tqdm

import pandas as pd
from PIL import Image
from skimage.morphology import remove_small_objects, remove_small_holes

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models import vgg13

from utils import rle_encode

Populating the interactive namespace from numpy and matplotlib


In [2]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

Загрузим список фотографий из обучающей выборки.

In [3]:
path_images = list(map(
    lambda x: x.split('.')[0],
    filter(lambda x: x.endswith('.jpg'), os.listdir('data/train/'))))

Разделим на обучающую и валидационную выборки. Для ускорения оставим только 24 изображения для валидации.

In [4]:
train_images, val_images = path_images[:-24], path_images[-24:]

Опишем датасет. Предусмотрим загрузку масок из другой директории с теми же названиями файлов.

In [5]:
class FaceDataset(Dataset):
    def __init__(self, images_dir, images_name, target_dir=None,
                 transform=None):
        
        self.images_dir = images_dir
        self.target_dir = target_dir
        self.images_name = images_name
        self.transform = transform
        self.to_tensor = transforms.ToTensor()                   
        print('{} images'.format(len(self.images_name)))

    def __len__(self):
        return len(self.images_name)
               
    def __getitem__(self, idx):
        img_filename = os.path.join(
            self.images_dir, self.images_name[idx] + '.jpg')
        img = Image.open(img_filename)
        
        if self.target_dir:
            mask_filename = os.path.join(
                self.target_dir, self.images_name[idx] + '.png')
            mask = Image.open(mask_filename)
        else:
            mask = []
        
        if self.transform:
            img = self.transform(img)
            if mask:
                mask = transform.ToTensor()(mask)

        return {'img': self.to_tensor(img), 'mask': self.to_tensor(mask)}

В качестве трансформации возьмём только нормализацию с параметрами от ImageNet, так как будем использовать предобученный кодировщик.

In [6]:
image_transforms = transforms.Compose([
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])])

In [7]:
train_dataset = FaceDataset(
    images_dir='data/train/',
    images_name=train_images,
    target_dir='data/train_mask/',
    transform=image_transforms)

val_dataset = FaceDataset(
    images_dir='data/train/',
    images_name=val_images,
    target_dir='data/train_mask/',
    transform=image_transforms)

1467 images
24 images


Генераторы для обучения и валидации сети.

In [8]:
train_data_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_data_loader = DataLoader(val_dataset, batch_size=4)

Для решения задачи сегментации будем использовать UNet. Энкодер сети сделаем из первых блоков предобученного VGG13.

In [9]:
class VGG13Encoder(torch.nn.Module):
    def __init__(self, num_blocks, pretrained=True):
        super().__init__()
        self.num_blocks = num_blocks
        self.blocks = []
        feature_extractor = vgg13(pretrained=pretrained).features
        for i in range(self.num_blocks):
            self.blocks.append(
                torch.nn.Sequential(*[feature_extractor[j]
                                      for j in range(i * 5, i * 5 + 4)]))

    def forward(self, x):
        activations = []
        for i in range(self.num_blocks):
            x = self.blocks[i](x)
            activations.append(x)
            if i != self.num_blocks - 1:
                x = torch.functional.F.max_pool2d(x, kernel_size=2, stride=2)
        return activations

Опишем блок декодера.

In [10]:
class DecoderBlock(torch.nn.Module):
    def __init__(self, out_channels):
        super().__init__()

        self.upconv = torch.nn.Conv2d(
            in_channels=out_channels * 2, out_channels=out_channels,
            kernel_size=3, padding=1, dilation=1)
        self.conv1 = torch.nn.Conv2d(
            in_channels=out_channels * 2, out_channels=out_channels,
            kernel_size=3, padding=1, dilation=1)
        self.conv2 = torch.nn.Conv2d(
            in_channels=out_channels, out_channels=out_channels,
            kernel_size=3, padding=1, dilation=1)

    def forward(self, down, left):
        x = torch.nn.functional.interpolate(down, scale_factor=2)
        x = self.upconv(x)
        x = self.conv1(torch.cat([left, x], 1))
        x = self.conv2(x)
        return x

Сформируем весь декодер из блоков.

In [11]:
class Decoder(torch.nn.Module):
    def __init__(self, num_filters, num_blocks):
        super().__init__()
        self.blocks = []
        for i in range(num_blocks):
            self.blocks.append(DecoderBlock(num_filters * 2**(num_blocks-i-1)))

    def forward(self, activations):
        up = activations[-1]
        for i, left in enumerate(activations[-2::-1]):
            up = self.blocks[i](up, left)
        return up

А теперь и всю сеть целиком.

In [12]:
class UNet(torch.nn.Module):
    def __init__(self, num_classes=1, num_filters=64, num_blocks=4):
        super().__init__()
        self.encoder = VGG13Encoder(num_blocks=num_blocks).to(device)
        self.decoder = Decoder(num_filters=64, num_blocks=num_blocks - 1)
        self.final = torch.nn.Conv2d(
            in_channels=num_filters, out_channels=num_classes, kernel_size=1)

    def forward(self, x):
        acts = self.encoder(x)
        x = self.decoder(acts).to(device)
        x = self.final(x)
        return x.to(device)

In [13]:
unet = UNet()

In [14]:
unet.to(device)

UNet(
  (encoder): VGG13Encoder()
  (decoder): Decoder()
  (final): Conv2d(64, 1, kernel_size=(1, 1), stride=(1, 1))
)

Проверим размерность выхода.

In [15]:
for batch in train_data_loader:
    break

out = unet.forward(batch['img'])
print(batch['img'].shape)
print(out.shape)

TypeError: tensor is not a torch image.

Обучим сеть.

In [None]:
criterion = torch.nn.BCEWithLogitsLoss()
val_criterion = torch.nn.BCEWithLogitsLoss(reduction='sum')
optimizer = torch.optim.Adam(unet.parameters())

In [None]:
num_epoch = 10
steps = 0
with torch.cuda.device(0):
    for epoch in range(num_epoch):
        for i, batch in enumerate(train_data_loader):
            optimizer.zero_grad()
            output = unet(batch['img'].to(device))
            loss = criterion(output, batch['mask'].to(device))
            loss.backward()
            optimizer.step()
            steps += 1
        
            if steps % 10 == 0:
                val_loss = 0
                for i, batch in enumerate(val_data_loader):
                    output = unet(batch['img'])
                    val_loss += float(
                        val_criterion(output, batch['mask']).to(device).detach())
                val_loss = val_loss / len(val_dataset)
        
                print('steps: {},\ttrain loss: {},\tval loss: {}'.format(
                    steps, round(float(loss.detach()), 3), round(val_loss, 3)))

In [None]:
device

Подготовим итератор по тестовым изображениям.

In [None]:
path_images = list(map(
    lambda x: x.split('.')[0],
    filter(lambda x: x.endswith('.jpg'), os.listdir('data/test/'))))

In [None]:
test_data_loader = DataLoader(
    FaceDataset('data/test', path_images, transforms=image_transforms), batch_size=4)

Сделаем предсказания. К выходу сети применим сигмоиду (исходно выходы без нелинейности), сделаем отсечение по порогу и небольшой постобработку по удалению отдельных пикселей маски и закрашиванию дыр. Для кодирования масок в виде массива в формат Run-length encoding используем rle_encode.

In [None]:
threshold = 0.25
predictions = []

for batch in tqdm.tqdm_notebook(test_data_loader):
    output = torch.sigmoid(unet.forward(batch['img']))
    for i in range(output.shape[0]):
        img = output[i].detach().numpy()
        post_img = remove_small_holes(remove_small_objects(img > threshold))
        rle = rle_encode(post_img)
        predictions.append(rle)

In [None]:
df = pd.DataFrame.from_dict({'image': path_images, 'rle_mask': predictions})
df.to_csv('baseline_submission.csv', index=False)