In [1]:
#!g1.1
import torch
import matplotlib.pyplot as plt


In [2]:
#!g1.1
import torchvision.transforms as T
from torchvision.datasets import OxfordIIITPet

# Oxford-IIIT Pet with segmentation targets, stored under ./data
# (download=True asks torchvision to fetch the archive when needed).
dataset = OxfordIIITPet(root='data', target_types='segmentation', download=True)

In [3]:
#!g1.1
# Spatial size every image and mask is resized to before batching.
IMAGE_SIZE = (256, 256)


def mask_to_class_index(mask):
    """Convert a 1-based label mask into 0-based int64 class indices.

    The cast happens before the subtraction so that a uint8 mask (what
    ``PILToTensor`` yields) cannot wrap around to 255 on a zero label.
    Assumes the dataset stores labels starting at 1 - TODO confirm against
    the dataset documentation.

    A named module-level function is used instead of a lambda so the
    transform stays picklable, which DataLoader worker processes require
    on platforms that spawn their workers.

    :param mask: integer tensor of per-pixel labels
    :return: int64 tensor with every label shifted down by one
    """
    return mask.long() - 1


transform = T.Compose(
    [
        T.Resize(IMAGE_SIZE),
        T.ToTensor(),
    ]
)

target_transform = T.Compose(
    [
        # Nearest-neighbour keeps every pixel an original class id; the default
        # bilinear mode would blend neighbouring labels along object borders.
        T.Resize(IMAGE_SIZE, interpolation=T.InterpolationMode.NEAREST),
        T.PILToTensor(),
        T.Lambda(mask_to_class_index),
    ]
)

train_dataset = OxfordIIITPet('data', transform=transform, target_transform=target_transform, target_types='segmentation')
valid_dataset = OxfordIIITPet('data', transform=transform, split='test', target_transform=target_transform, target_types='segmentation')

In [4]:
#!g1.1
from torch.utils.data import DataLoader

# Mini-batch size shared by both loaders; lower it if the GPU runs out of memory.
BATCH_SIZE = 64

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# Evaluation does not need a random order; a fixed order keeps the
# per-batch loss average reproducible from run to run.
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [5]:
#!g1.1
from tqdm import tqdm


def train(model) -> tuple[float, float]:
    """Run one training epoch and report its loss and pixel accuracy.

    Uses the module-level ``train_loader``, ``optimizer``, ``loss_fn`` and
    ``device`` objects.

    :param model: network returning per-pixel class logits with the class
        axis at dim 1
    :return: ``(mean of the per-batch losses, fraction of correctly
        classified pixels)``
    """
    model.train()

    train_loss = 0.0
    total = 0
    correct = 0

    for x, y in tqdm(train_loader, desc='Train'):
        # The target carries a singleton axis at dim 1; drop it to get class ids.
        x, y = x.to(device), y.squeeze(1).to(device)

        optimizer.zero_grad()

        output = model(x)

        # Collapse everything after the class axis so the loss sees
        # (batch, classes, pixels) logits against (batch, pixels) targets,
        # whatever the number of classes is.
        loss = loss_fn(output.flatten(2), y.flatten(1))

        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        y_pred = output.argmax(dim=1)
        total += y.numel()
        correct += (y == y_pred).sum().item()

    train_loss /= len(train_loader)
    accuracy = correct / total

    return train_loss, accuracy

In [6]:
#!g1.1
@torch.inference_mode()
def evaluate(model, loader) -> tuple[float, float]:
    """Measure loss and pixel accuracy of ``model`` over ``loader``.

    Runs in eval mode under ``torch.inference_mode`` and uses the
    module-level ``loss_fn`` and ``device`` objects.

    :param model: network returning per-pixel class logits with the class
        axis at dim 1
    :param loader: iterable of ``(images, masks)`` batches that supports ``len``
    :return: ``(mean of the per-batch losses, fraction of correctly
        classified pixels)``
    """
    model.eval()

    total_loss = 0.0
    total = 0
    correct = 0

    for x, y in tqdm(loader, desc='Evaluation'):
        # The target carries a singleton axis at dim 1; drop it to get class ids.
        x, y = x.to(device), y.squeeze(1).to(device)

        output = model(x)

        # Collapse everything after the class axis so the loss sees
        # (batch, classes, pixels) logits against (batch, pixels) targets,
        # whatever the number of classes is.
        loss = loss_fn(output.flatten(2), y.flatten(1))

        total_loss += loss.item()

        y_pred = output.argmax(dim=1)
        total += y.numel()
        correct += (y == y_pred).sum().item()

    total_loss /= len(loader)
    accuracy = correct / total

    return total_loss, accuracy

In [21]:
#!g1.1
import numpy as np
from PIL import Image

In [7]:
import torch.nn as nn


def conv_plus_conv(in_channels: int, out_channels: int):
    """
    Builds one UNet stage: two 3x3 convolutions (stride 1, padding 1), each
    followed by batch normalisation and LeakyReLU(0.2).
    :param in_channels: channels of the incoming feature map
    :param out_channels: channels produced by both convolutions
    :return: nn.Sequential holding the six layers in order
    """
    layers = []
    # The first convolution changes the channel count, the second keeps it.
    for conv_in in (in_channels, out_channels):
        layers.append(nn.Conv2d(conv_in, out_channels, kernel_size=3, stride=1, padding=1))
        layers.append(nn.BatchNorm2d(out_channels))
        layers.append(nn.LeakyReLU(0.2))
    return nn.Sequential(*layers)


class UNET(nn.Module):
    """U-Net style encoder-decoder that outputs per-pixel class logits.

    The encoder applies five ``conv_plus_conv`` stages, each followed by a
    2x2 max-pool; a bottleneck stage follows; the decoder then upsamples
    five times, concatenating the matching encoder output (skip connection)
    before each ``conv_plus_conv`` stage. A final 1x1 convolution maps the
    features to ``num_classes`` channels.

    :param in_channels: channels of the input image (default 3)
    :param num_classes: channels of the output logits (default 3)
    :param base_channels: width of the first stage; deeper stages use
        multiples of it (default 32)
    """

    def __init__(self, in_channels: int = 3, num_classes: int = 3, base_channels: int = 32):
        super().__init__()

        # Layers are created in a fixed order (encoder, decoder, bottleneck,
        # head) so that seeded initialisation and state_dict ordering are stable.
        self.down1 = conv_plus_conv(in_channels, base_channels)
        self.down2 = conv_plus_conv(base_channels, base_channels * 2)
        self.down3 = conv_plus_conv(base_channels * 2, base_channels * 4)
        self.down4 = conv_plus_conv(base_channels * 4, base_channels * 8)
        self.down5 = conv_plus_conv(base_channels * 8, base_channels * 16)

        # Each decoder stage receives the upsampled features concatenated
        # with the skip tensor, hence the doubled input widths.
        self.up1 = conv_plus_conv(base_channels * 2, base_channels)
        self.up2 = conv_plus_conv(base_channels * 4, base_channels)
        self.up3 = conv_plus_conv(base_channels * 8, base_channels * 2)
        self.up4 = conv_plus_conv(base_channels * 16, base_channels * 4)
        self.up5 = conv_plus_conv(base_channels * 32, base_channels * 8)

        self.bottleneck = conv_plus_conv(base_channels * 16, base_channels * 16)

        self.out = nn.Conv2d(in_channels=base_channels, out_channels=num_classes, kernel_size=1)

        self.downsample = nn.MaxPool2d(kernel_size=2, stride=2)

    @staticmethod
    def _upsample_and_concat(x, skip):
        """Resize ``x`` to the spatial size of ``skip`` and stack both along channels."""
        # Targeting the skip tensor's size (rather than a fixed 2x factor)
        # also works when a pooled dimension was odd and got floored.
        x = nn.functional.interpolate(x, size=skip.shape[-2:])
        return torch.cat((x, skip), dim=1)

    def forward(self, x):
        """Map an image batch to logits with the same spatial size.

        :param x: tensor of shape (batch, in_channels, H, W); H and W must be
            large enough to survive five 2x poolings (at least 32)
        :return: tensor of shape (batch, num_classes, H, W)
        """
        residual1 = self.down1(x)
        x = self.downsample(residual1)

        residual2 = self.down2(x)
        x = self.downsample(residual2)

        residual3 = self.down3(x)
        x = self.downsample(residual3)

        residual4 = self.down4(x)
        x = self.downsample(residual4)

        residual5 = self.down5(x)
        x = self.downsample(residual5)

        x = self.bottleneck(x)

        x = self.up5(self._upsample_and_concat(x, residual5))
        x = self.up4(self._upsample_and_concat(x, residual4))
        x = self.up3(self._upsample_and_concat(x, residual3))
        x = self.up2(self._upsample_and_concat(x, residual2))
        x = self.up1(self._upsample_and_concat(x, residual1))

        x = self.out(x)

        return x

In [8]:
# Run on the first CUDA GPU when one is visible, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(device)

model = UNET().to(device)

from torch.optim import Adam
optimizer = Adam(params=model.parameters(), lr=1e-3)
# Learning-rate scheduling is left disabled:
#scheduler = StepLR(optimizer, step_size=25)

# Per-pixel classification loss over the class logits.
loss_fn = nn.CrossEntropyLoss()

cuda:0


In [9]:
# One entry per finished epoch in each history list.
train_loss_history, valid_loss_history = [], []
train_accuracy_history, valid_accuracy_history = [], []

best_valid_accuracy = 0

num_epochs = 35

for epoch in range(num_epochs):
    train_loss, train_accuracy = train(model)
    valid_loss, valid_accuracy = evaluate(model, valid_loader)

    # Record the training metrics, then the validation metrics.
    train_loss_history.append(train_loss)
    train_accuracy_history.append(train_accuracy)
    valid_loss_history.append(valid_loss)
    valid_accuracy_history.append(valid_accuracy)

    best_valid_accuracy = max(valid_accuracy, best_valid_accuracy)

    print(f'epoch = {epoch+1} with valid_accuracy = {valid_accuracy*100}')
    print(f'epoch = {epoch+1} with best_valid_accuracy = {best_valid_accuracy*100}')

    # Stop early once validation pixel accuracy reaches 88.5 %.
    if valid_accuracy >= 0.885:
        break
    

Train:   0%|          | 0/58 [00:21<?, ?it/s]


OutOfMemoryError: CUDA out of memory. Tried to allocate 512.00 MiB. GPU 0 has a total capacity of 4.00 GiB of which 0 bytes is free. Of the allocated memory 10.32 GiB is allocated by PyTorch, and 59.22 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
@torch.inference_mode()
def predict(model: nn.Module, loader: DataLoader, device: torch.device) -> torch.Tensor:
    """Return the arg-max class prediction for every sample in ``loader``.

    :param model: network returning class logits with the class axis at dim 1
    :param loader: iterable of ``(inputs, target)`` batches; targets are ignored
    :param device: device the forward pass runs on
    :return: CPU tensor of predicted class indices, batches concatenated along
        dim 0; an empty int64 tensor when ``loader`` yields nothing
    """
    model.eval()
    predictions = []
    for x, _ in loader:
        outputs = model(x.to(device))
        # Move each batch to the CPU right away so accelerator memory is not
        # held by accumulated results and the output can be saved/loaded
        # without a GPU.
        predictions.append(torch.argmax(outputs, dim=1).cpu())

    if not predictions:
        # torch.cat rejects an empty list; hand back an empty result instead.
        return torch.empty(0, dtype=torch.long)

    return torch.cat(predictions)

In [18]:
# Seed NumPy's global RNG so the same indices are drawn on every run.
np.random.seed(100)
# 200 indices into the validation set; randint samples with replacement,
# so an index may occur more than once.
idx = np.random.randint(len(valid_dataset), size=200)

# Materialise the selected samples once; the resulting list is batched like a dataset.
test_dataset = [valid_dataset[i] for i in idx]
test_loader = DataLoader(test_dataset, batch_size=64)

# Predicted class index per pixel for every sampled image.
predictions = predict(model, test_loader, device)


In [1]:
# Display the shape after inserting a singleton axis at dim 1.
predictions.unsqueeze(1).size()

In [22]:
# torch.save(predictions.unsqueeze(1), 'predictions.pth')

In [27]:
# Insert a singleton axis at dim 1, store the class ids as uint8 and keep the
# tensor on the CPU so the saved file can be loaded on a machine without a GPU.
predictions_uint8 = predictions.unsqueeze(1).to(torch.uint8).cpu()

In [2]:
# Display the shape of the uint8 predictions that get saved.
predictions_uint8.size()

In [29]:
# Serialise the uint8 predictions next to the notebook.
torch.save(obj=predictions_uint8, f='predictions_uint8.pth')