In [1]:
import warnings

import matplotlib.pyplot
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import torch
import torch.nn as nn
from IPython.display import clear_output
from PIL import Image
from matplotlib import cm
from time import perf_counter
from torch.utils.data import DataLoader
from tqdm import tqdm
import torch.optim as optim
from torchsummary import summary
#from skimage import io, transform
import os
from torchvision.transforms import Compose, PILToTensor, Normalize
import torchvision.transforms as T
from torch.utils.data import random_split
import torch.nn.functional as F
import torchshow as ts

In [2]:
# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

In [5]:
from torch import nn


class Model(nn.Module):
    """CNN classifier: five convolutional stages followed by a fully-connected head.

    Each convolutional stage is Conv2d(3x3, padding=1) -> BatchNorm2d -> ReLU ->
    MaxPool2d(2): the convolution keeps the spatial size and the pooling halves it.
    The first Linear layer expects 8*8*256 features, which corresponds to a
    1 x 256 x 256 input (256 -> 128 -> 64 -> 32 -> 16 -> 8). The network outputs
    10 raw scores per sample.

    All layers live in one flat ``nn.Sequential`` called ``net``. Their positions
    define the ``state_dict`` keys (``net.0.weight`` ...), so the layer order
    below must stay exactly as it is for saved checkpoints to keep loading.
    """

    def __init__(self):
        super().__init__()

        # Channel counts at the boundaries of the convolutional stages.
        conv_widths = (1, 16, 32, 64, 128, 256)
        # Feature counts at the boundaries of the hidden fully-connected layers.
        dense_widths = (8 * 8 * 256, 1024, 256, 64)

        layers = []

        # Convolutional stages: 256x256x16 -> 128x128x32 -> ... -> 8x8x256 after pooling.
        for c_in, c_out in zip(conv_widths[:-1], conv_widths[1:]):
            layers += [
                nn.Conv2d(in_channels=c_in, out_channels=c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
                nn.MaxPool2d(2),
            ]

        layers.append(nn.Flatten())

        # Hidden fully-connected layers, each with batch norm, ReLU and dropout.
        for f_in, f_out in zip(dense_widths[:-1], dense_widths[1:]):
            layers += [
                nn.Linear(in_features=f_in, out_features=f_out),
                nn.BatchNorm1d(f_out),
                nn.ReLU(),
                nn.Dropout(0.5),
            ]

        # Output layer: one score per class, no activation.
        layers.append(nn.Linear(in_features=64, out_features=10))

        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the whole stack and return the raw class scores."""
        scores = self.net(x)
        return scores


In [7]:
class Block(nn.Module):
    """Two unpadded 3x3 convolutions, each followed by a ReLU.

    Neither convolution pads its input, so every call shrinks the height and
    width of the feature map by 4 pixels in total.

    Args:
        in_ch: number of input channels of the first convolution.
        out_ch: number of output channels of both convolutions.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, kernel_size=3)
        self.act1 = nn.ReLU()
        self.conv2 = nn.Conv2d(out_ch, out_ch, kernel_size=3)
        self.act2 = nn.ReLU()

    def forward(self, x):
        """Apply conv1 -> ReLU -> conv2 -> ReLU and return the result."""
        hidden = self.act1(self.conv1(x))
        return self.act2(self.conv2(hidden))
class Encoder(nn.Module):
    """Contracting path: a chain of ``Block``s with 2x2 max-pooling between them.

    Args:
        chs: channel counts; block ``i`` maps ``chs[i]`` to ``chs[i+1]`` channels.
    """

    def __init__(self, chs=(1,64,128,256,512,1024)):
        super().__init__()
        self.enc_blocks = nn.ModuleList([Block(chs[i], chs[i+1]) for i in range(len(chs)-1)])
        self.pool       = nn.MaxPool2d(2)

    def forward(self, x):
        """Return the output of every block, shallowest first.

        Pooling is applied only between consecutive blocks. The pooled result
        after the last block was never used, so computing it only wasted work
        and raised an error whenever the deepest feature map was smaller
        than 2x2.

        Args:
            x: input tensor for the first block.

        Returns:
            list with one feature map per block, in the order the blocks ran.
        """
        ftrs = []
        last_idx = len(self.enc_blocks) - 1
        for idx, block in enumerate(self.enc_blocks):
            x = block(x)
            ftrs.append(x)
            # Downsample only when another block is going to consume the result.
            if idx != last_idx:
                x = self.pool(x)
        return ftrs


class Decoder(nn.Module):
    """Expanding path: up-convolve, concatenate a cropped skip feature, refine with a ``Block``.

    Args:
        chs: channel counts; stage ``i`` up-convolves from ``chs[i]`` to
            ``chs[i+1]`` channels and its ``Block`` takes ``chs[i]`` input
            channels, so the concatenated tensor must have ``chs[i]`` channels.
    """

    def __init__(self, chs=(1024, 512, 256, 128, 64)):
        super().__init__()
        self.chs = chs
        # (in, out) channel pair for every decoder stage.
        stage_chs = list(zip(chs[:-1], chs[1:]))
        self.upconvs = nn.ModuleList(
            [nn.ConvTranspose2d(c_in, c_out, 2, 2) for c_in, c_out in stage_chs]
        )
        self.dec_blocks = nn.ModuleList(
            [Block(c_in, c_out) for c_in, c_out in stage_chs]
        )

    def forward(self, x, encoder_features):
        """Decode ``x`` stage by stage, using ``encoder_features[i]`` as the skip input of stage ``i``."""
        for stage in range(len(self.chs) - 1):
            x = self.upconvs[stage](x)
            # The skip feature is cropped to the spatial size of the upsampled tensor.
            skip = self.crop(encoder_features[stage], x)
            merged = torch.cat([x, skip], dim=1)
            x = self.dec_blocks[stage](merged)
        return x

    def crop(self, enc_ftrs, x):
        """Center-crop ``enc_ftrs`` to the height and width of the 4-D tensor ``x``."""
        _, _, height, width = x.shape
        center_crop = T.CenterCrop([height, width])
        return center_crop(enc_ftrs)


class UNet(nn.Module):
    """Encoder/decoder segmentation network with a 1x1 convolution head.

    Args:
        enc_chs: channel counts passed to ``Encoder``.
        dec_chs: channel counts passed to ``Decoder``.
        num_class: number of output channels produced by the head.
        retain_dim: when true, the output is resized to ``out_sz``.
        out_sz: target (height, width) used when ``retain_dim`` is true.
    """

    def __init__(self, enc_chs=(1,64,128,256,512,1024), dec_chs=(1024, 512, 256, 128, 64), num_class=1, retain_dim=False, out_sz=(572,572)):
        super().__init__()
        self.encoder = Encoder(enc_chs)
        self.decoder = Decoder(dec_chs)
        # 1x1 convolution mapping the last decoder channels to the output channels.
        self.head = nn.Conv2d(dec_chs[-1], num_class, 1)
        self.retain_dim = retain_dim
        self.out_sz = out_sz

    def forward(self, x):
        """Encode ``x``, decode from the deepest feature map, and apply the head.

        The head output is returned as-is (no activation); it is interpolated
        to ``out_sz`` first when ``retain_dim`` is set.
        """
        # Deepest feature first; the remaining ones serve as skip connections.
        deepest_first = self.encoder(x)[::-1]
        bottleneck = deepest_first[0]
        skips = deepest_first[1:]

        out = self.head(self.decoder(bottleneck, skips))
        if not self.retain_dim:
            return out
        return F.interpolate(out, self.out_sz)

In [8]:
# Restore the trained weights of the segmentation and classification networks.
#
# map_location=device lets checkpoints that were saved from GPU tensors load on a
# CPU-only machine too, matching the CPU fallback used when `device` is chosen.
# Raw strings keep the Windows backslashes literal instead of relying on Python
# passing through unknown escapes such as "\m" and "\c" (the path values are unchanged).
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source; newer PyTorch releases offer weights_only=True for plain state dicts.
model_weights = torch.load(r'D:\ml_yandex-project\checkpoints\model_10.pt', map_location=device)
#optimizer_weights = torch.load('D:\ml_yandex-project\checkpoints\optimizer_segment.pt')

unet_model = UNet(enc_chs=(1,64,128,256,512), dec_chs=(512, 256, 128, 64), retain_dim=True, out_sz=(256,256)).to(device)
# The classifier is created on the CPU here; load_state_dict copies the loaded
# values into its existing parameters.
classification_model = Model()

unet_model.load_state_dict(model_weights)
classification_model.load_state_dict(torch.load(r'D:\ml_yandex-project\checkpoints\model_clsf.pt', map_location=device))

#optimizer_unet = torch.optim.Adam(unet_model.parameters())
#optimizer_classification = torch.optim.Adam(classification_model.parameters())

#optimizer_unet.load_state_dict(torch.load('D:\ml_yandex-project\checkpoints\optimizer_segment.pt'))
#optimizer_classification.load_state_dict(torch.load('D:\ml_yandex-project\checkpoints\optimizer_clsf.pt'))

<All keys matched successfully>

In [16]:
import os
import torch
from torchvision import transforms
from PIL import Image

# Run the segmentation U-Net over every test image and save each predicted mask as a PNG.
image_folder = 'D:/ml_yandex-project/data/test_images/'

mask_folder = 'D:/ml_yandex-project/data/test_lung_masks/'

# exist_ok makes a separate existence check unnecessary.
os.makedirs(mask_folder, exist_ok=True)

transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])
to_pil = transforms.ToPILImage()

image_names = os.listdir(image_folder)
# Use the real number of files for the progress percentage instead of a fixed count.
total_images = len(image_names)

# Evaluation mode only has to be set once, not on every iteration.
unet_model.eval()

count = 0
for image_name in image_names:
    image_path = os.path.join(image_folder, image_name)
    # Close the file handle as soon as the grayscale copy has been made.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert('L')

    # Add the batch dimension expected by the network.
    image_tensor = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        mask = unet_model(image_tensor)

    # NOTE(review): the U-Net head is a plain Conv2d with no activation, so `mask`
    # may contain values outside [0, 1]; confirm whether a sigmoid/threshold is
    # intended before converting to an 8-bit image.
    mask_image = to_pil(mask.squeeze(0).cpu())

    # splitext removes only the final extension, so names containing extra dots
    # are not truncated (which could make different images share one mask file).
    mask_name = os.path.splitext(image_name)[0] + '.png'
    mask_path = os.path.join(mask_folder, mask_name)
    mask_image.save(mask_path)
    if count % 100 == 0:
        clear_output()
        print(int(count / total_images * 100), "%")
    count += 1

print("Маски успешно созданы и сохранены.")

99 %
Маски успешно созданы и сохранены.


In [9]:
import numpy as np
import pandas as pd
import os
from PIL import Image

# Classify every test image and collect (id, predicted class) pairs for the submission.
images = []  # not used in this cell; kept so the notebook namespace stays the same

answers_dir = "data/test_images"


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
classification_model.to(device)
classification_model.eval()

# Inference preprocessing must be deterministic: only resize and convert to a tensor.
# The random flips, rotations, colour jitter, sharpness, perspective and grayscale
# steps are training-time augmentations; applying them here made the predictions
# change from run to run.
transform = T.Compose([
    T.Resize((256, 256)),
    T.ToTensor(),
])
names = []
predictions = []

# Sort the listing so that the id assigned to each file does not depend on the
# arbitrary order returned by os.listdir.
# NOTE(review): ids are still positional (0, 1, 2, ...) rather than derived from
# the file names - confirm this matches the id scheme the submission expects.
filenames = sorted(os.listdir(answers_dir))
count = 0

with torch.no_grad():
    for filename in filenames:
        img_path = os.path.join(answers_dir, filename)
        # The classifier's first convolution takes one channel, so force grayscale
        # (same conversion as the mask-generation cell) and close the file promptly.
        with Image.open(img_path) as raw_img:
            gray_img = raw_img.convert('L')
        img = transform(gray_img).unsqueeze(0).to(device)

        output = classification_model(img)
        predicted_class = torch.argmax(output, dim=1)
        predictions.append(predicted_class.item())
        names.append(count)
        count += 1

print(len(predictions), len(filenames), len(names))

6920 6920 6920


In [10]:
# Assemble the submission table: one row per prediction, with its id and predicted class.
df = pd.DataFrame(data={"id": names, "target_feature": predictions})

# Write it comma-separated and without the pandas index column.
df.to_csv('answers.csv', sep=',', index=False)