<a href="https://colab.research.google.com/github/michaelharlam/Medical-NN/blob/main/project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
from torch import nn
from torchsummary import summary

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device # должен быть cuda

device(type='cpu')

# Хронология создания нашего проекта

Изначально мы решили сделать простейшую классификацию с помощью глубокой ResNet-модели, которая к сожалению, не сохранилась. Мы получили достаточно неплохой результат со значением F1-Score 0.84

После этого мы принялись за разработку более сложной структуры, включающей в себя 2 модели - сегментирующую и классификационную.
прочитав несколько статей о медецинской сфере, в качестве сегментирующей модели мы решили выбрать U-Net, а классификационную так и оставили ResNet'ом.

Изначально архитектура U-Net'а была сложнее и глубже, но после многочисленных тестов, мы пришли к выводу, что упрошенная архитектура с меньшей глубиной и обучаться будет в разы быстрее, и не сожмет исходное изображение слишком сильно. Нижепердставленная сегментирующая модель - наилучшая, из тех, что мы смогли придумать, по соотношению "время обучения / качество". На "бумаге" она показывает неплохие результаты MSE (в среденем ~0.018), а на практике достаточно точно выделяет легкие на изображениях.

ResNet стал для нас самой трудной задачей, поскольку в нашем случае он получался очень нестабильным (незначительное изменение learning rate быо способно испортить результат на ~30%). Опробовав достаточно большое кол-во архитектур, мы так и не смогли найти достаточно хорошую. И самый лучший F1-Score, получившийся в резульате работы системы из двух моделей на тестовой выборке - 0.73

Мы думаем, что главная проблема нашей команды - распределение задач. Размышляя о проделанной работе, мы понимаем, что могли опробовать больше вариантов классификационных моделей и / или создать более сложную систему классификации, что значительно улучшило бы итоговый результат.

# Сегментирующая модель U-Net (1 этап)

In [3]:
from torchvision.transforms.v2.functional import center_crop

class UNetConvBlock(nn.Module): # сверточный блок с задаваемым кол-вом входных/выходных каналов
    def __init__(self, in_channels, out_channels):
        super(UNetConvBlock, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),

            nn.Conv2d(out_channels, out_channels, 3),
            nn.BatchNorm2d(out_channels),
            nn.ReLU()
        )

    def forward(self, x):
        out = self.conv(x)
        return out


class UpConv(nn.Module): # реализация стрелки up-conv 2x2 с первой схемы
    def __init__(self, in_channels, out_channels):
        super(UpConv, self).__init__()
        self.upconv = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.ConvTranspose2d(in_channels, out_channels, 1),
            nn.ReLU()
        )

    def forward(self, x):
        out = self.upconv(x)
        return out


def stack(x_1: torch.Tensor, x_2: torch.Tensor): # функция принимающая 2 тензора и возвращающая более объемный тензор из них, идущих в нем по порядку (термина не знаю)
    x_1 = center_crop(x_1, x_2.shape[-2:])
    return torch.cat((x_1, x_2), -3)


class UNetModel(nn.Module): # Сама модель (цифры в названии переменных - "глубина" слоя)
    def __init__(self):
        super(UNetModel, self).__init__()

        self.conv1 = UNetConvBlock(1, 16)
        # по порядку тут maxpool
        self.conv2 = UNetConvBlock(16, 32)
        self.maxpool = nn.MaxPool2d(2)

        self.bridge = UNetConvBlock(32, 64)

        self.upconv2 = UpConv(64, 32)
        self.convt2 = UNetConvBlock(64, 32)
        self.upconv1 = UpConv(32, 16)
        self.convt1 = UNetConvBlock(32, 16)

        self.post_processing = nn.Sequential(
            nn.ConvTranspose2d(16, 1, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        d1 = self.conv1(x)
        out = self.maxpool(d1)

        d2 = self.conv2(out)
        out = self.maxpool(d2)

        out = self.bridge(out)

        out = self.upconv2(out)
        out = self.convt2(stack(d2, out))

        out = self.upconv1(out)
        out = self.convt1(stack(d1, out))

        out = self.post_processing(out)

        return out

unet = UNetModel().to(device)
summary(unet, (1, 256, 256))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 16, 254, 254]             160
       BatchNorm2d-2         [-1, 16, 254, 254]              32
              ReLU-3         [-1, 16, 254, 254]               0
            Conv2d-4         [-1, 16, 252, 252]           2,320
       BatchNorm2d-5         [-1, 16, 252, 252]              32
              ReLU-6         [-1, 16, 252, 252]               0
     UNetConvBlock-7         [-1, 16, 252, 252]               0
         MaxPool2d-8         [-1, 16, 126, 126]               0
            Conv2d-9         [-1, 32, 124, 124]           4,640
      BatchNorm2d-10         [-1, 32, 124, 124]              64
             ReLU-11         [-1, 32, 124, 124]               0
           Conv2d-12         [-1, 32, 122, 122]           9,248
      BatchNorm2d-13         [-1, 32, 122, 122]              64
             ReLU-14         [-1, 32, 1

# Классификационная модель ResNet 18 (2 этап)

In [4]:
class ResBlock(nn.Module): # блок со сложением входного слоя и конечного
    def __init__(self, channels):
        super(ResBlock, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(channels, channels, 3, padding=1),
            nn.BatchNorm2d(channels),
            nn.ReLU(),

            nn.Conv2d(channels, channels, 3, padding=1),
            nn.BatchNorm2d(channels)
        )
        self.relu = nn.ReLU()

    def forward(self, x):
        out = self.relu(x + self.conv(x))
        return out


class ResNetConvBlock(nn.Module): # сверточный блок с задаваемым кол-вом входных/выходных каналов
    def __init__(self, in_channels, out_channels):
        super(ResNetConvBlock, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, 2, 1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),

            nn.Conv2d(out_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU()
        )

    def forward(self, x):
        out = self.conv(x)
        return out


class ResNetModel(nn.Module):
    def __init__(self):
        super(ResNetModel, self).__init__()
        self.conv0 = nn.Sequential(
            nn.Conv2d(1, 8, 7, 2, 7),
            nn.BatchNorm2d(8),
            nn.ReLU(),
            nn.MaxPool2d(3, 2, 1)
        )
        self.layer1 = ResBlock(8)
        self.conv1 = ResNetConvBlock(8, 16)
        self.layer2 = ResBlock(16)
        self.conv2 = ResNetConvBlock(16, 32)
        self.layer3 = ResBlock(32)
        self.conv3 = ResNetConvBlock(32, 64)
        self.layer4 = ResBlock(64)
        self.post_processing = nn.Sequential(
            nn.AvgPool2d(7),
            nn.Flatten(),
            nn.Linear(64, 3),
            nn.Softmax(-1)
        )

    def forward(self, x):
        out = self.conv0(x)

        for i in range(2):
            out = self.layer1(out)
        out = self.conv1(out)

        for i in range(2):
            out = self.layer2(out)
        out = self.conv2(out)

        for i in range(2):
            out = self.layer3(out)
        out = self.conv3(out)

        for i in range(2):
            out = self.layer4(out)

        out = self.post_processing(out)

        return out

resnet = ResNetModel().to(device)
summary(resnet, (1, 216, 216))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 8, 112, 112]             400
       BatchNorm2d-2          [-1, 8, 112, 112]              16
              ReLU-3          [-1, 8, 112, 112]               0
         MaxPool2d-4            [-1, 8, 56, 56]               0
            Conv2d-5            [-1, 8, 56, 56]             584
       BatchNorm2d-6            [-1, 8, 56, 56]              16
              ReLU-7            [-1, 8, 56, 56]               0
            Conv2d-8            [-1, 8, 56, 56]             584
       BatchNorm2d-9            [-1, 8, 56, 56]              16
             ReLU-10            [-1, 8, 56, 56]               0
         ResBlock-11            [-1, 8, 56, 56]               0
           Conv2d-12            [-1, 8, 56, 56]             584
      BatchNorm2d-13            [-1, 8, 56, 56]              16
             ReLU-14            [-1, 8,

# Загрузка датасета

In [5]:
!kaggle competitions download -c  ml-intensive-yandex-autumn-2023

Traceback (most recent call last):
  File "/usr/local/bin/kaggle", line 5, in <module>
    from kaggle.cli import main
  File "/usr/local/lib/python3.10/dist-packages/kaggle/__init__.py", line 23, in <module>
    api.authenticate()
  File "/usr/local/lib/python3.10/dist-packages/kaggle/api/kaggle_api_extended.py", line 403, in authenticate
    raise IOError('Could not find {}. Make sure it\'s located in'
OSError: Could not find kaggle.json. Make sure it's located in /root/.kaggle. Or use the environment method.


In [6]:
import zipfile

with zipfile.ZipFile('ml-intensive-yandex-autumn-2023.zip') as zf:
    zf.extractall('.')

FileNotFoundError: [Errno 2] No such file or directory: 'ml-intensive-yandex-autumn-2023.zip'

# Обучение U-Net'а

In [None]:
from pathlib import Path
from PIL import Image
from torchvision.transforms.functional import to_tensor
from torchvision.transforms.v2.functional import resize
import os


class MySegmentationDataset(torch.utils.data.Dataset):
    def __init__(self, img_dir, mask_dir=None,
                 transform=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir

        self.transform = transform

    def __len__(self):
        return len(list(Path(self.img_dir).iterdir()))

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, "img_" + str(idx) + ".png")
        image = to_tensor(Image.open(img_name)) # загрузка картинки

        if self.mask_dir:
            mask_name = os.path.join(self.mask_dir, "img_" + str(idx) + ".png")
            mask = resize(to_tensor(Image.open(mask_name)), 216, antialias=True) # загрузка маски в случае тренировки

        if self.transform:
            image = self.transform(image)
            if self.mask_dir:
                mask = self.mask_transform(mask) # преобразование изображений

        if self.mask_dir:
            return image, mask
        else:
            return image

In [None]:
from torch.utils.data import random_split, DataLoader


s_model = unet
s_optimizer = torch.optim.Adam(s_model.parameters(), lr = 0.01)
s_criterion = nn.MSELoss()

s_dataset = MySegmentationDataset('data/train_images',
                                  'data/train_lung_masks')

s_train_dataset, s_valid_dataset = random_split(s_dataset, (0.8, 0.2))

s_train_loader = DataLoader(s_train_dataset, batch_size=16)
s_valid_loader = DataLoader(s_valid_dataset, batch_size=16)

In [None]:
from ignite.engine import create_supervised_trainer, create_supervised_evaluator
from ignite.metrics import Loss

s_trainer = create_supervised_trainer(s_model, s_optimizer, s_criterion, device)

s_metrics = {
    'Loss': Loss(s_criterion)
}

s_train_hist_loss, s_valid_hist_loss = [], []

s_train_evaluator = create_supervised_evaluator(s_model, s_metrics, device)
s_valid_evaluator = create_supervised_evaluator(s_model, s_metrics, device)

In [None]:
import matplotlib.pyplot as plt
from IPython.display import clear_output

def show_s_loss(engine, train_hist_loss, valid_hist_loss):
    clear_output()

    ln = len(train_hist_loss)

    plt.figure(figsize=(20, 10))

    plt.subplot(1, 1, 1)
    plt.title('MSE')
    plt.plot(torch.arange(ln), train_hist_loss)
    plt.plot(torch.arange(ln), valid_hist_loss)
    plt.yscale('log')
    plt.grid()

    plt.show()

In [None]:
from ignite.engine import Events

# тут можно добавить лог итераций при желании

def compute_s_epoch_results(engine):
    s_train_evaluator.run(s_train_loader)
    s_valid_evaluator.run(s_valid_loader)

s_trainer.add_event_handler(Events.EPOCH_COMPLETED, compute_s_epoch_results)

def log_s_epoch_results(engine, label):
    if label == 'Train':
        s_train_hist_loss.append(engine.state.metrics['Loss'])
    elif label == 'Valid':
        s_valid_hist_loss.append(engine.state.metrics['Loss'])


s_train_evaluator.add_event_handler(Events.EPOCH_COMPLETED, log_s_epoch_results, "Train")
s_valid_evaluator.add_event_handler(Events.EPOCH_COMPLETED, log_s_epoch_results, "Valid")
s_valid_evaluator.add_event_handler(Events.EPOCH_COMPLETED, show_s_loss,
                                    s_train_hist_loss, s_valid_hist_loss)

In [None]:
from ignite.handlers.param_scheduler import ReduceLROnPlateauScheduler

s_scheduler = ReduceLROnPlateauScheduler(
    s_optimizer,
    metric_name="Loss",
    factor=0.5,
    patience=1,
    threshold=0.05
)

def print_s_lr():
    for param_group in s_optimizer.param_groups:
        print(f"Learning rate = {param_group['lr']}")

s_valid_evaluator.add_event_handler(Events.COMPLETED, s_scheduler)
s_valid_evaluator.add_event_handler(Events.COMPLETED, print_s_lr)

In [None]:
s_trainer.run(s_train_loader, 15)

In [None]:
s_test_dataset = MySegmentationDataset('data/test_images')

s_model.eval()

plt.figure(figsize=(16, 8))

for idx, i in enumerate(range(15, 20)):
    img = resize(s_test_dataset[i], 216, antialias=True).cpu().permute(1, 2, 0).detach().numpy()
    with torch.no_grad():
        pred = s_model(s_test_dataset[i].unsqueeze(1).to(device))[0].cpu().permute(1, 2, 0).detach().numpy()

    plt.subplot(3, 5, idx + 1)
    plt.imshow(img, cmap="gray")
    plt.xticks([])
    plt.yticks([])

    plt.subplot(3, 5, idx + 6)
    plt.imshow(pred, cmap="gray")
    plt.xticks([])
    plt.yticks([])

    plt.subplot(3, 5, idx + 11)
    plt.imshow(img * pred, cmap="gray")
    plt.xticks([])
    plt.yticks([])

plt.show()


# Обучение ResNet'а

In [None]:
import pandas as pd

s_model.eval()

class MyClassificationDataset(torch.utils.data.Dataset):
    def __init__(self, img_dir, model, csv_file=None, transform=None):
        self.img_dir = img_dir

        self.model = model

        if csv_file:
            self.csv, self.answers = True, pd.read_csv(csv_file)
        else:
            self.csv = False

        self.transform = transform

    def __len__(self):
        return len(list(Path(self.img_dir).iterdir()))

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, "img_" + str(idx) + ".png")

        img = resize(to_tensor(Image.open(img_name)), 216, antialias=True).to(device)
        with torch.no_grad():
            mask = self.model(to_tensor(Image.open(img_name)).unsqueeze(1).to(device))[0]
        image = img * mask # получение сегментированных легких

        if self.transform:
            image = self.transform(image) # преобразования картинки

        if self.csv:
            answer = self.answers.iloc[idx, 1] # получение ответа в случае тренировки

        if self.csv:
            return image, answer
        else:
            return image

In [None]:
c_model = resnet
c_optimizer = torch.optim.Adam(c_model.parameters(), lr = 0.02)
c_criterion = torch.nn.CrossEntropyLoss()

c_dataset = MyClassificationDataset('data/train_images',
                                    s_model,
                                    'data/train_answers.csv')

c_train_dataset, c_valid_dataset = random_split(c_dataset, (0.8, 0.2))

c_train_loader = DataLoader(c_train_dataset, batch_size=16)
c_valid_loader = DataLoader(c_valid_dataset, batch_size=16)

In [None]:
from ignite.metrics import Recall, Precision, Fbeta

precision = Precision(average=False)
recall = Recall(average=False)

c_trainer = create_supervised_trainer(c_model, c_optimizer, c_criterion, device)

c_metrics = {
    'Loss': Loss(c_criterion),
    'F1' : Fbeta(1.0, precision=precision, recall=recall)
}

c_train_hist_loss, c_valid_hist_loss = [], []
c_train_hist_metric, c_valid_hist_metric = [], []

c_train_evaluator = create_supervised_evaluator(c_model, c_metrics, device)
c_valid_evaluator = create_supervised_evaluator(c_model, c_metrics, device)

In [None]:
def show_c_losses(engine, train_hist_loss, valid_hist_loss,
                  train_hist_metric, valid_hist_metric):
    clear_output()

    plt.figure(figsize=(20, 5))

    plt.subplot(1, 2, 1)
    plt.title('CE Loss')
    plt.plot(torch.arange(len(train_hist_loss)), train_hist_loss)
    plt.plot(torch.arange(len(valid_hist_loss)), valid_hist_loss)
    plt.yscale('log')
    plt.grid()

    plt.subplot(1, 2, 2)
    plt.title('F1')
    plt.plot(torch.arange(len(train_hist_metric)), train_hist_metric)
    plt.plot(torch.arange(len(valid_hist_metric)), valid_hist_metric)
    plt.yscale('log')
    plt.grid()

    plt.show()

In [None]:
# тут можно добавить лог итераций при желании

def compute_c_epoch_results(engine):
    c_train_evaluator.run(c_train_loader)
    c_valid_evaluator.run(c_valid_loader)

c_trainer.add_event_handler(Events.EPOCH_COMPLETED, compute_c_epoch_results)

def log_c_epoch_results(engine, label):
    if label == 'Train':
        c_train_hist_loss.append(engine.state.metrics['Loss'])
        c_train_hist_metric.append(engine.state.metrics['F1'])
    elif label == 'Valid':
        c_valid_hist_loss.append(engine.state.metrics['Loss'])
        c_valid_hist_metric.append(engine.state.metrics['F1'])


c_train_evaluator.add_event_handler(Events.EPOCH_COMPLETED, log_c_epoch_results, "Train")
c_valid_evaluator.add_event_handler(Events.EPOCH_COMPLETED, log_c_epoch_results, "Valid")
c_valid_evaluator.add_event_handler(Events.EPOCH_COMPLETED, show_c_losses,
                                    c_train_hist_loss, c_valid_hist_loss,
                                    c_train_hist_metric, c_valid_hist_metric)

In [None]:
c_scheduler = ReduceLROnPlateauScheduler(
    c_optimizer,
    metric_name="Loss",
    factor=0.5,
    patience=1,
    threshold=0.05
)

def print_c_lr():
    for param_group in c_optimizer.param_groups:
        print(f"Learning rate = {param_group['lr']}")

c_valid_evaluator.add_event_handler(Events.COMPLETED, c_scheduler)
c_valid_evaluator.add_event_handler(Events.COMPLETED, print_c_lr)

In [None]:
c_trainer.run(c_train_loader, 10)

In [None]:
c_model.eval()

test_dataset = MyClassificationDataset('data/test_images', s_model)

df = pd.DataFrame(columns=['id', 'target_feature'])
for i in range(len(test_dataset)):
    with torch.no_grad():
        pred = torch.argmax(c_model(test_dataset[i].unsqueeze(1).to(device))).cpu().detach().numpy()
    df = pd.concat([df, pd.DataFrame({'id': [i],'target_feature': pred})],
                   ignore_index=True)

df.to_csv('./preds.csv', index=False)