> <h3 style="color: red;"><b>IMPORTANT</b></h3>
> 
> В очередной раз поторюсь о том, что данный проект лучше всего запускать в специализированной среде, чтобы не устанавливать лишних зависимостей. Берегите память своей железки!
>
> Также хочется отметить, что в данном проекте используется библиотека `Weights&Bias`, которая позволяют следить за процессом обучения нейронки.

## 0. Оглавление
<details>
<summary>Развернуть оглавление</summary>

1. Описываем задачу
2. Импортируем зависимости
3. Оформляем загрузку данных
4. EDA Анализ
5. Моделируем
6. SHAP Анализ
7. Производим оценку модели
</details>

## 1. Описываем задачу

`Задача: классификация рака легких`

Изображения в формате png или jpg.
Всего 4 класса: Adenocarcinoma,Large cell carcinoma, Squamous cell carcinoma and Normal.

<details>
<summary>Adenocarcinoma - Adenocarcinoma of the lung</summary>
Lung adenocarcinoma is the most common form of lung cancer accounting for 30 percent of all cases overall and about 40 percent of all non-small cell lung cancer occurrences. Adenocarcinomas are found in several common cancers, including breast, prostate and colorectal. Adenocarcinomas of the lung are found in the outer region of the lung in glands that secrete mucus and help us breathe. Symptoms include coughing, hoarseness, weight loss, and weakness.
</details>

<details>
<summary>Large cell carcinoma - Large-cell undifferentiated carcinoma</summary>
Large-cell undifferentiated carcinoma lung cancer grows and spreads quickly and can be found anywhere in the lung. This type of lung cancer usually accounts for 10 to 15 percent of all cases of NSCLC. Large-cell undifferentiated carcinoma tends to grow and spread quickly.
</details>

<details>
<summary>Squamous cell carcinoma - Squamous cell</summary>
This type of lung cancer is found centrally in the lung,
where the larger bronchi join the trachea to the lung, or in one of the main airway branches. Squamous cell lung cancer is responsible for about 30 percent of all non-small cell lung cancers and is generally linked to smoking.
</details>

Normal - everything is fine 👍

Что надо сделать:

1. Самим написать какую то небольшую сетку, посмотреть ее результаты
2. Взять предобученную любую архитектуру, которая на основе свёрток, всякие трансформеры пока не брать. Дообучить посмотреть результаты. Если дообучить целую архитектуру не получается, из-за скорости или ресурсов, зафризить начальные слои и обучать только те, что ближе к голове.

## 2. Импортируем зависимости

In [None]:
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
from PIL import Image

import torch
from torch import nn
from torch.utils.data import DataLoader, random_split, Dataset

import torchvision
from torchvision.transforms import v2
from torchmetrics.functional import accuracy, f1_score

import pytorch_lightning as pl
from pytorch_lightning.loggers import WandbLogger
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor

from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix

import wandb
import shap


def setDefaultSeabornParams():
    sns.set(
        palette="tab10",
        font_scale=1.2,
        rc={
            "figure.figsize": (10, 6),
            "axes.titlepad": 24,
            "axes.labelpad": 20,
            "axes.titlesize": 22,
            "figure.titlesize": 22,
            "figure.subplot.hspace": 0.3,
            "figure.subplot.wspace": 0.25,
        },
    )


def seedEverything(seed):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True


setDefaultSeabornParams()
seedEverything(42)
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


In [None]:
wandb.login()

## 3. Оформляем загрузку данных

`В связи с тем, что картинки поступают последовательно (ID результата = ID изображения), а в добавок еще мы получаем доп.данные, будем кастомить собственный класс.`

In [None]:
def findClasses(dir):
    classes = os.listdir(dir)
    classes.sort()
    class_to_idx = {classes[i]: i for i in range(len(classes))}
    return classes, class_to_idx


def makeDataset(dir, class_to_idx):
    images = []
    for target in os.listdir(dir):
        d = os.path.join(dir, target)
        if not os.path.isdir(d):
            continue
        files = sorted(os.listdir(d), key=lambda x: int(x.split(".")[0]))
        for filename in files:
            path = f"{target}/{filename}"
            item = (path, class_to_idx[target])
            images.append(item)
    return images


def defaultLoader(path):
    return Image.open(path).convert("RGB")


class ImageFolderLoader(Dataset):
    def __init__(self, root, transform=None, loader=defaultLoader, is_test=False):
        classes, class_to_idx = findClasses(root)
        imgs = makeDataset(root, class_to_idx)

        self.root = root
        self.imgs = imgs
        self.classes = classes
        self.class_to_idx = class_to_idx
        self.transform = transform
        self.loader = loader
        self.is_test = is_test

    def __getitem__(self, index):
        path, target = self.imgs[index]
        img = self.loader(os.path.join(self.root, path))
        if self.transform is not None:
            img = self.transform(img)
        if self.is_test:
            return img
        return img, target

    def __len__(self):
        return len(self.imgs)


def makeSubset(image_loader: ImageFolderLoader, indices: list):
    image_loader.imgs = [image_loader.imgs[i] for i in indices]
    return image_loader


`Для собственного удобства напишем пару обработчиков данных`

In [None]:
def loadDataset(path, transform, is_test):
    return ImageFolderLoader(path, transform=transform, is_test=is_test)


def trainTestSplit(dataset, test_size):
    train_size = 1.0 - test_size
    return random_split(dataset, [train_size, test_size])

`А тут обработаем изображения`

In [None]:
def getTransforms(
    resize: tuple, interpolation: v2.InterpolationMode, center_crop: int
):
    train_transform = v2.Compose(
        [
            v2.Resize(resize, interpolation, antialias=True),
            v2.CenterCrop(center_crop),
            v2.RandomVerticalFlip(0.3),
            v2.RandomHorizontalFlip(0.3),
            v2.RandomRotation(degrees=(-10, 10)),
            v2.ColorJitter(brightness=0.5),
            v2.ToImage(),
            v2.ToDtype(torch.float32, scale=True),
            v2.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
        ]
    )
    val_transform = v2.Compose(
        [
            v2.Resize(resize, interpolation, antialias=True),
            v2.CenterCrop(center_crop),
            v2.ToImage(),
            v2.ToDtype(torch.float32, scale=True),
            v2.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
        ]
    )

    return train_transform, val_transform


def getResnet18Transforms():
    return getTransforms((256, 256), v2.InterpolationMode.BILINEAR, 224)


def getEfficientnetB1Transforms():
    return getTransforms((255, 255), v2.InterpolationMode.BILINEAR, 240)


def getEfficientnetB3Transforms():
    return getTransforms((320, 320), v2.InterpolationMode.BICUBIC, 300)


def getRegnetY16gfTransforms():
    return getTransforms((256, 256), v2.InterpolationMode.BILINEAR, 300)


In [None]:
BATCH_SIZE = 32
NUM_WORKERS = 4
TEST_SIZE = 0.2  # если вам потребуется использовать весь датасет, то следует установить параметр на нуль (0)


def getDataloaders(train_transform, val_transform):
    TRAIN_PATH = f"./train/train"
    TEST_PATH = f"./test"

    # Создаем загрузчик данных

    dataset = loadDataset(TRAIN_PATH, train_transform, is_test=False)
    full_dataloader = DataLoader(
        dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, shuffle=True
    )

    # Создаем загрузчик данных train/val и в рандомном порядке разделяем папки для тренировки

    train_dataset = loadDataset(TRAIN_PATH, train_transform, is_test=False)
    val_dataset = loadDataset(TRAIN_PATH, val_transform, is_test=False)
    train_indices, val_indices = trainTestSplit(range(len(train_dataset)), TEST_SIZE)
    train_dataset = makeSubset(train_dataset, train_indices)
    val_dataset = makeSubset(val_dataset, val_indices)

    train_dataloader = DataLoader(
        train_dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, shuffle=True
    )
    val_dataloader = DataLoader(
        val_dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS
    )

    # Создаем загрузчик данных test

    test_dataset = loadDataset(TEST_PATH, val_transform, is_test=True)
    test_dataloader = DataLoader(
        test_dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS
    )

    return train_dataloader, val_dataloader, test_dataloader, full_dataloader


`Предлагаю взять для пример немалоизвестную ResNet18`

<details>
<summary>Краткая справка о ResNet18 для тех, кто не знает</summary>
ResNet18 — это самая маленькая нейронная сеть в семействе нейронных сетей, называемых остаточными нейронными сетями, разработанная MSR (Ге и соавт.). Вкратце, Ге обнаружил, что нейронная сеть (обозначенная как функция `f` с вводным модулем `x` и выводом `f(x)`) будет работать лучше с «остаточным соединением» `x + f(x)`. Это остаточное соединение используется повсеместно в самых современных нейронных сетях, даже сегодня. Например, FBNetV2, FBNetV3.
</details>

In [None]:
train_transform, val_transform = getResnet18Transforms()
train_dataloader, val_dataloader, test_dataloader, full_dataloader = getDataloaders(train_transform, val_transform)


In [None]:
class_names = ["adenocarcinoma", "large.cell.carcinoma", "normal", "squamous.cell.carcinoma"]


## 4. EDA Анализ

`Рассмотрим соотношение классов в наборе`

In [None]:
classes_idx = []
for batch in full_dataloader:
    _, classes = batch
    classes_idx.extend(classes.tolist())
print(f"Total number of images in train folder: {len(classes_idx)}\n")

df = pd.DataFrame(classes_idx, columns=["Class"])
fig, ax = plt.subplots()
p = sns.countplot(df, x="Class")
p.set_title("Classes distribution")
p.set_xticks(range(4), class_names)

percentage = (
    round(df.value_counts() / len(df) * 100).sort_index().astype(int).astype(str) + "%"
)
_ = ax.bar_label(ax.containers[0], labels=percentage, padding=3)


- `Мы могли бы иметь 4 сбалансированных класса в случае равномерного содержания объектов. Однако на картинке заметно большее смещение в сторону «аденокарциномы».`
- `Также можем заметить, что у нас в итоге не так уж и много изображений`

In [None]:
def plotImages(
    do_transform: bool, dataloader: ImageFolderLoader, batch: list[torch.Tensor] = None
):
    LIMIT_IMAGES = 16

    if not do_transform:
        actual_transform = dataloader.dataset.transform
        temp_transform = v2.Compose(
            [
                v2.Resize(
                    (224, 224),
                    interpolation=v2.InterpolationMode.BILINEAR,
                    antialias=True,
                ),
                v2.ToImage(),
                v2.ToDtype(torch.float32, scale=True),
            ]
        )
        dataloader.dataset.transform = temp_transform
    if batch is None:
        batch = next(iter(dataloader))
        images, class_nums = batch
    else:
        images, class_nums = batch
        images = dataloader.dataset.transform(images)
    sns.set(rc={"axes.titlepad": 8})
    fig, ax = plt.subplots(4, 4, figsize=(8, 8))
    fig.suptitle("Samples", fontsize=14)
    plt.grid(False)
    for i in range(4):
        for j in range(4):
            cur = i * 4 + j
            image = images[cur]
            target = class_nums[cur]
            ax[i][j].set_title(class_names[target], fontsize=10)
            ax[i][j].axis("off")
            ax[i][j].imshow(image.permute(1, 2, 0))
    plt.show()

    if not do_transform:
        dataloader.dataset.transform = actual_transform
    setDefaultSeabornParams()

    return batch


In [None]:
batch = plotImages(do_transform=False, dataloader=val_dataloader)
_ = plotImages(do_transform=True, dataloader=val_dataloader, batch=batch)

`Единственная примечательная мною деталь, которая ни как не влияет на дальнейшую работу, это яркость снимков. По факту, на это могут влиять разные аспекты проведения снимков: начиная от правильного положения пациента, заканчивая ношением имплантов. Но мы сюда не для этого пришли, так что продолжаем.`

## 5. Моделирование

`Для оптимизации задействуем потомка Pytorch 19-го года: Lightning. Также добавим параметры:`

- `Train loss`
- `Validation loss`
- `Validation accuracy`
- `Validation F1 Score`

In [None]:
class PytorchModel(pl.LightningModule):
    def __init__(self, model, lr=5e-3):
        super().__init__()
        self.model = model
        self.optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        self.criterion = torch.nn.CrossEntropyLoss()
        self.save_hyperparameters(ignore=["model"])

    def forwardResult(self, x):
        return self.model(x)

    def configureOptimizers(self):
        return [self.optimizer], [
            torch.optim.lr_scheduler.ExponentialLR(self.optimizer, gamma=0.95)
        ]

    def trainingStep(self, train_batch, batch_idx):
        x, y = train_batch
        predictions = self.forwardResult(x)
        loss = self.criterion(predictions, y)
        self.log("Train Loss", loss, prog_bar=True, on_epoch=True, sync_dist=True)
        return loss

    def validationStep(self, val_batch, batch_idx):
        x, y = val_batch
        predictions = self.forwardResult(x)
        loss = self.criterion(predictions, y)
        with torch.no_grad():
            acc = accuracy(predictions, y, task="multiclass", num_classes=4)
            f1 = f1_score(
                predictions, y, task="multiclass", average="macro", num_classes=4
            )
        self.log("Val Loss", loss, prog_bar=True, on_epoch=True, sync_dist=True)
        self.log("Val F1 (macro)", f1, prog_bar=True, on_epoch=True, sync_dist=True)
        self.log("Val Accuracy", acc, prog_bar=True, on_epoch=True, sync_dist=True)


In [None]:
def getPyTorchModel():
    model = nn.Sequential(
        nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, stride=2),
        nn.BatchNorm2d(64),
        nn.SELU(),
        nn.MaxPool2d(kernel_size=2),
        nn.Dropout2d(0.2),
        nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=2),
        nn.BatchNorm2d(128),
        nn.LeakyReLU(),
        nn.MaxPool2d(kernel_size=2),
        nn.Dropout2d(0.2),
        nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, stride=2),
        nn.BatchNorm2d(128),
        nn.SELU(),
        nn.MaxPool2d(kernel_size=2),
        nn.Flatten(),
        nn.Linear(in_features=1152, out_features=1024),
        nn.LeakyReLU(),
        nn.Linear(in_features=1024, out_features=4),
    )
    return model


`Каждая представляемая вам ниже предварительно обученная модель возвращает:`

1. `Model`
2. `Train dataloader`
3. `Validation dataloader`
4. `Test dataloader`

In [None]:
def prepareResnet18():
    train_transform, val_transform = getResnet18Transforms()
    train_dataloader, val_dataloader, test_dataloader, _ = getDataloaders(
        train_transform, val_transform
    )

    model = torchvision.models.resnet18(weights="IMAGENET1K_V1").to(DEVICE)
    model.fc = nn.Sequential(
        nn.Linear(in_features=512, out_features=256),
        nn.ReLU(),
        nn.Linear(in_features=256, out_features=4),
    )
    return model, train_dataloader, val_dataloader, test_dataloader


def prepareRegnetY16gf():
    train_transform, val_transform = getRegnetY16gfTransforms()
    train_dataloader, val_dataloader, test_dataloader, _ = getDataloaders(
        train_transform, val_transform
    )

    model = torchvision.models.regnet_y_16gf(weights="IMAGENET1K_SWAG_E2E_V1").to(
        DEVICE
    )
    model.fc = nn.Linear(in_features=3024, out_features=4)
    return model, train_dataloader, val_dataloader, test_dataloader


def prepareEfficientnetB1():
    train_transform, val_transform = getEfficientnetB1Transforms()
    train_dataloader, val_dataloader, test_dataloader, _ = getDataloaders(
        train_transform, val_transform
    )

    model = torchvision.models.efficientnet_b1(weights="IMAGENET1K_V1").to(DEVICE)
    model.classifier = nn.Sequential(
        nn.Dropout(0.2), nn.Linear(in_features=1280, out_features=4)
    )
    return model, train_dataloader, val_dataloader, test_dataloader


def prepareEfficientnetB3():
    train_transform, val_transform = getEfficientnetB3Transforms()
    train_dataloader, val_dataloader, test_dataloader, _ = getDataloaders(
        train_transform, val_transform
    )

    model = torchvision.models.efficientnet_b3(weights="IMAGENET1K_V1").to(DEVICE)
    model.classifier = nn.Sequential(
        nn.Dropout(0.3),
        nn.Linear(in_features=1536, out_features=1536),
        nn.LeakyReLU(),
        nn.Linear(in_features=1536, out_features=4),
    )
    return model, train_dataloader, val_dataloader, test_dataloader


def prepareCustomModel():
    train_transform, val_transform = getResnet18Transforms()
    train_dataloader, val_dataloader, test_dataloader, _ = getDataloaders(
        train_transform, val_transform
    )

    model = getPyTorchModel().to(DEVICE)
    return model, train_dataloader, val_dataloader, test_dataloader


In [None]:
def getTrainer(max_epochs):
    return pl.Trainer(
        accelerator="cuda",
        devices=1,
        log_every_n_steps=5,
        max_epochs=max_epochs,
        logger=WandbLogger(log_model="all"),
        callbacks=[
            ModelCheckpoint(
                save_weights_only=True, mode="max", monitor="Val F1 (macro)"
            ),
            LearningRateMonitor("epoch"),
        ],
    )


In [None]:
%%time
model, my_train_dataloader, my_val_dataloader, my_test_dataloader = prepareCustomModel()
my_module = PytorchModel(model, lr=1e-3).to(DEVICE)

trainer = getTrainer(max_epochs=50)
trainer.fit(my_module, train_dataloaders=my_train_dataloader, val_dataloaders=my_val_dataloader)
wandb.finish()


In [None]:
%%time
model, ef_train_dataloader, ef_val_dataloader, ef_test_dataloader = prepareEfficientnetB3()
ef_module = PytorchModel(model, lr=1e-3).to(DEVICE)

trainer = getTrainer(max_epochs=20)
trainer.fit(ef_module, train_dataloaders=ef_train_dataloader, val_dataloaders=ef_val_dataloader)
wandb.finish()


`(F1) ~0,82 VS ~0,95 … B1 слишком слаб по сравнению с B3`

## 6. SHAP Анализ

`Чтобы стать нейросетью, нужно думать как нейросеть. Для этого взглянем, на какие места модель обращает внимание при прогнозировании ответа`

In [None]:
N_IMAGES = 4


def getUniqueImages(images, classes):
    it = 0
    indices = []
    for i in range(len(images)):
        if classes[i] == it:
            indices.append(i)
            it += 1
        if it == len(class_names):
            break
    return images[indices], classes[indices]


batch = next(iter(ef_val_dataloader))
images, classes = batch
images = images.to(DEVICE)
ef_images, ef_classes = getUniqueImages(images, classes)

batch = next(iter(my_val_dataloader))
images, classes = batch
images = images.to(DEVICE)
my_images, my_classes = getUniqueImages(images, classes)


In [None]:
def explain(module, layer, images, classes, model_name):
    explainer = shap.GradientExplainer((module.to(DEVICE), layer), images.to(DEVICE))
    shap_values, indexes = explainer.shap_values(images, ranked_outputs=4, nsamples=200)

    # Изменим изображения, для отображения вероятности

    inverse_norm = v2.Compose(
        [
            v2.Normalize(mean=[0.0, 0.0, 0.0], std=[1 / 0.229, 1 / 0.224, 1 / 0.225]),
            v2.Normalize(mean=[-0.485, -0.456, -0.406], std=[1.0, 1.0, 1.0]),
        ]
    )
    images_backup = images.clone()
    images = inverse_norm(images)
    images = images.movedim(1, -1)

    # Получим имена классов

    index_names = np.vectorize(lambda x: class_names[x])(indexes.cpu())

    # Подготовим отчет

    shap_values = [np.swapaxes(np.swapaxes(s, 2, 3), 1, -1) for s in shap_values]
    sns.set(rc={"axes.titlepad": 2})
    shap.image_plot(shap_values, images.cpu().numpy(), index_names, show=False)
    plt.suptitle(model_name)

    with torch.no_grad():
        preds = [
            class_names[ans]
            for ans in torch.argmax(module.to(DEVICE)(images_backup), dim=1).tolist()
        ]
        formatted_targets = [str(i + 1) + ") " + y for i, y in enumerate(preds)]
        formatted_targets = "\n".join(formatted_targets)
        print(f"Predicted values:\n{formatted_targets}\n")
    preds = [class_names[ans] for ans in classes[:N_IMAGES]]
    formatted_targets = [str(i + 1) + ") " + y for i, y in enumerate(preds)]
    formatted_targets = "\n".join(formatted_targets)
    print(f"Ground truth:\n{formatted_targets}")

    plt.show()
    setDefaultSeabornParams()


In [None]:
my_module.eval()
ef_module.eval()

explain(my_module, my_module.model[8], my_images, my_classes, "My Custom Module")
explain(ef_module, ef_module.model.features[5], ef_images, ef_classes, "EfficientNetB3")


## 7. Производим оценку модели

`Нам необходимо вывести результаты в отдельный файл`

In [None]:
def evaluate(module, dataloader):
    module.eval()
    model_answers = []
    truth_ground = []
    for images in tqdm(dataloader):
        if isinstance(images, list):
            images, classes = images
            truth_ground.extend(classes)
        images = images.to(DEVICE)

        with torch.no_grad():
            model_answers.extend(torch.argmax(module.to(DEVICE)(images), dim=1).tolist())

    return model_answers, truth_ground

In [None]:
def plot_confusion_matrix(model_answers, truth_ground, model_name):
    cm = ConfusionMatrixDisplay(confusion_matrix(model_answers, truth_ground))
    cm.display_labels = class_names
    p = cm.plot(cmap="binary", xticks_rotation="vertical")
    plt.grid(False)
    plt.title(f"Confusion Matrix ({model_name})")

In [None]:
val_model_answers, val_truth_ground   = evaluate(my_module, my_val_dataloader)
test_model_answers, test_truth_ground = evaluate(ef_module, ef_test_dataloader)

plot_confusion_matrix(val_model_answers, val_truth_ground, "My Custom Model")

In [None]:
val_model_answers, val_truth_ground   = evaluate(ef_module, ef_val_dataloader)
test_model_answers, test_truth_ground = evaluate(ef_module, ef_test_dataloader)

plot_confusion_matrix(val_model_answers, val_truth_ground, "EfficientNetB3")

`Генерируем прогнозы для тестового набора и также записываем`

In [None]:
labels = [class_names[val] for val in test_model_answers]
with open("result_predictions.csv", "w") as file:
    file.write("ID,label\n")
    for i, val in enumerate(labels):
        file.write(f"{i},{val}\n")
