In [1]:
import os
from pathlib import Path
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as tr
from PIL import Image
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [2]:
# Dataset folders are resolved relative to the notebook's working directory.
TRAIN_DATASET_PATH = Path().absolute() / "Train"
TEST_DATASET_PATH = Path().absolute() / "Test"

SEED = 42                # one seed for the split and for the RNGs below
IMAGE_SIZE = (128, 128)  # (height, width) every image and mask is resized to

# Seed NumPy and PyTorch as well as the split, so that weight initialisation
# and batch shuffling are repeatable under Restart & Run All.
np.random.seed(SEED)
torch.manual_seed(SEED)

# markup.csv is read as a header-less, UTF-16 encoded table.
df = pd.read_csv(TRAIN_DATASET_PATH / 'markup.csv', encoding='utf-16', header=None)
train_df, val_df = train_test_split(df, test_size=0.2, random_state=SEED)

# Shared preprocessing: array -> CHW tensor, then resize to the network input size.
transform = tr.Compose([tr.ToTensor(), tr.Resize(IMAGE_SIZE)])

In [3]:
class BarcodeDataset(Dataset):
    """In-memory dataset of (image, mask) tensor pairs for barcode segmentation.

    All rows of ``df`` are loaded eagerly in ``__init__``. For each row the
    image named in column 0 is read from ``path / 'Images'`` and a binary mask
    is rasterised from the eight values in columns 2..9, interpreted as four
    (x, y) polygon vertices. Both are passed through the notebook-level
    ``transform`` and stored as float tensors.

    Rows that fail to load (missing file, malformed coordinates, ...) are
    reported and skipped, so ``len(dataset)`` may be smaller than ``len(df)``.

    Args:
        df: markup table; column 0 is the file name, columns 2..9 the polygon.
        path: dataset root that contains the ``Images`` folder.
    """

    def __init__(self, df, path):
        self.items = []
        for item in df.itertuples(index=False):
            try:
                # Force three channels so grayscale / RGBA files still match the
                # network input; the context manager closes the file handle.
                with Image.open(path / 'Images' / item[0]) as img:
                    x = np.array(img.convert('RGB')) / 255.
                # OpenCV drawing functions require int32 point coordinates;
                # NumPy's default integer type is platform dependent.
                polygon = np.array(item[2:10], dtype=np.int32).reshape(-1, 2)
                y = cv2.fillConvexPoly(np.zeros(x.shape[:2]), polygon, 1)
                y = np.expand_dims(y, axis=-1)
                self.items.append((transform(x).float(), transform(y).float()))
            except Exception as exc:
                # Best-effort loading: name the offending file and keep going.
                print(f'Skipping {item[0]!r}: {exc}')

    def __len__(self):
        """Return the number of successfully loaded samples."""
        return len(self.items)

    def __getitem__(self, idx):
        """Return the ``(image, mask)`` tensor pair stored at position ``idx``."""
        x, y = self.items[idx]
        return x, y

In [4]:
# Both splits come from the training folder; only the markup rows differ.
train_dataset = BarcodeDataset(df=train_df, path=TRAIN_DATASET_PATH)
valid_dataset = BarcodeDataset(df=val_df, path=TRAIN_DATASET_PATH)

train_loader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    drop_last=True,  # a trailing incomplete batch is discarded
)
val_loader = DataLoader(
    valid_dataset,
    batch_size=4,
    shuffle=False,
)

In [5]:
class UNet(nn.Module):
    """Four-level U-Net: encoder, bottleneck and decoder with skip connections.

    Each encoder stage is followed by a 2x max-pool; each decoder stage first
    upsamples 2x with a transposed convolution and then concatenates the
    matching encoder output along the channel axis. The final 1x1 convolution
    returns raw logits (no activation is applied here).

    Input height and width should be multiples of 16 so that the upsampled
    feature maps line up with the skip connections.

    Args:
        in_channels: number of channels of the input image.
        out_channels: number of channels of the output map.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        # Contracting path: 64 -> 128 -> 256 -> 512 channels.
        self.encoder1 = self.conv_block(in_channels, 64)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.encoder2 = self.conv_block(64, 128)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.encoder3 = self.conv_block(128, 256)
        self.pool3 = nn.MaxPool2d(2, 2)
        self.encoder4 = self.conv_block(256, 512)
        self.pool4 = nn.MaxPool2d(2, 2)

        self.bottleneck = self.conv_block(512, 1024)

        # Expanding path: each decoder takes 2x its output width because the
        # upsampled map is concatenated with the skip connection.
        self.upconv4 = nn.ConvTranspose2d(1024, 512, 2, stride=2)
        self.decoder4 = self.conv_block(1024, 512)
        self.upconv3 = nn.ConvTranspose2d(512, 256, 2, stride=2)
        self.decoder3 = self.conv_block(512, 256)
        self.upconv2 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.decoder2 = self.conv_block(256, 128)
        self.upconv1 = nn.ConvTranspose2d(128, 64, 2, stride=2)
        self.decoder1 = self.conv_block(128, 64)

        self.outconv = nn.Conv2d(64, out_channels, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        """Build two consecutive (3x3 conv -> batch norm -> ReLU) stages."""
        layers = []
        for stage_in in (in_channels, out_channels):
            layers.append(nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1))
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU(inplace=True))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a ``(N, in_channels, H, W)`` batch to ``(N, out_channels, H, W)`` logits."""
        # Encoder: keep every stage's output for the skip connections.
        skip1 = self.encoder1(x)
        skip2 = self.encoder2(self.pool1(skip1))
        skip3 = self.encoder3(self.pool2(skip2))
        skip4 = self.encoder4(self.pool3(skip3))

        features = self.bottleneck(self.pool4(skip4))

        # Decoder: upsample, prepend the skip features, then convolve.
        features = self.decoder4(torch.cat((skip4, self.upconv4(features)), dim=1))
        features = self.decoder3(torch.cat((skip3, self.upconv3(features)), dim=1))
        features = self.decoder2(torch.cat((skip2, self.upconv2(features)), dim=1))
        features = self.decoder1(torch.cat((skip1, self.upconv1(features)), dim=1))

        return self.outconv(features)

In [6]:
class DiceLoss(nn.Module):
    """Soft Dice loss computed over all elements of the batch at once.

    ``loss = 1 - (2 * sum(p * t) + smooth) / (sum(p) + sum(t) + smooth)``

    Args:
        smooth: constant added to numerator and denominator; it keeps the
            ratio defined when both prediction and target are all zeros.
    """

    def __init__(self, smooth=1.):
        super().__init__()
        self.smooth = smooth

    def forward(self, y_pred, y_true):
        """Return the scalar Dice loss.

        Args:
            y_pred: predicted probabilities (the caller applies the sigmoid).
            y_true: target mask with the same number of elements as ``y_pred``.
        """
        # reshape() rather than view(): view() raises on non-contiguous
        # tensors (e.g. after a transpose or a slice), reshape() copies if needed.
        y_pred_flat = y_pred.reshape(-1)
        y_true_flat = y_true.reshape(-1)
        intersection = torch.sum(y_true_flat * y_pred_flat)
        denominator = torch.sum(y_true_flat) + torch.sum(y_pred_flat) + self.smooth
        dice_coefficient = (2. * intersection + self.smooth) / denominator
        return 1 - dice_coefficient

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Three input channels, one output channel.
model = UNet(in_channels=3, out_channels=1)
model = model.to(device)

loss_fn = DiceLoss()
optimizer = optim.Adam(params=model.parameters())  # Adam with its default settings

In [7]:
def _run_epoch(model, loader, loss_fn, run_device, desc, optimizer=None):
    """Run one pass over ``loader`` and return the mean per-batch loss.

    Weights are updated only when ``optimizer`` is given; otherwise the pass
    runs with gradient tracking disabled.
    """
    training = optimizer is not None
    total_loss = 0.0
    with torch.set_grad_enabled(training):
        for inputs, targets in tqdm(loader, desc=desc):
            inputs, targets = inputs.to(run_device), targets.to(run_device)
            if training:
                optimizer.zero_grad()
            # The network returns logits; the loss expects probabilities.
            outputs = torch.sigmoid(model(inputs))
            loss = loss_fn(outputs, targets)
            if training:
                loss.backward()
                optimizer.step()
            total_loss += loss.item()
    return total_loss / len(loader)


def train_model(model, train_loader, val_loader, loss_fn, optimizer, num_epochs=10,
                patience=2, checkpoint_path='best.pth'):
    """Train ``model`` with early stopping and keep the best validation weights.

    After every epoch the mean validation loss is compared with the best one
    seen so far. An improvement writes ``model.state_dict()`` to
    ``checkpoint_path``; ``patience`` consecutive epochs without improvement
    end training. On exit the best checkpoint is loaded back into ``model``,
    so the caller does not continue with the weights of a worse, later epoch.

    Args:
        model: network returning logits; a sigmoid is applied before the loss.
        train_loader: sized iterable of ``(inputs, targets)`` training batches.
        val_loader: sized iterable of ``(inputs, targets)`` validation batches.
        loss_fn: callable ``loss_fn(probabilities, targets) -> scalar tensor``.
        optimizer: optimizer already bound to ``model``'s parameters.
        num_epochs: maximum number of epochs.
        patience: non-improving epochs tolerated before stopping (>= 1).
        checkpoint_path: file the best ``state_dict`` is written to.

    Raises:
        ValueError: if either loader yields no batches.
    """
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError('train_loader and val_loader must each yield at least one batch')

    # Use the device the model already lives on, not a notebook global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    best_val_loss = float('inf')
    patience_counter = 0
    checkpoint_written = False

    for epoch in range(num_epochs):
        model.train()
        train_loss = _run_epoch(
            model, train_loader, loss_fn, run_device,
            desc=f'Training Epoch {epoch+1}/{num_epochs}', optimizer=optimizer,
        )
        print(f'loss on train: {train_loss:.4f}')

        model.eval()
        val_loss = _run_epoch(
            model, val_loader, loss_fn, run_device,
            desc=f'Validation Epoch {epoch+1}/{num_epochs}',
        )
        print(f'loss on val: {val_loss:.4f}')

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_written = True
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print('stopped')
            break

    # Nothing is written if the loss never improved (e.g. it was NaN
    # throughout); in that case the model is left as it is.
    if checkpoint_written:
        model.load_state_dict(torch.load(checkpoint_path, map_location=run_device))

# Train for at most 10 epochs; early stopping may end the run sooner.
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    loss_fn=loss_fn,
    optimizer=optimizer,
    num_epochs=10,
)

Training Epoch 1/10: 100%|██████████| [24:36<00:00,  1.11it/s]


loss on train: 0.0679


Validation Epoch 1/10: 100%|█████████| [01:51<00:11,  3.34it/s]


loss on val: 0.1354


Training Epoch 2/10: 100%|██████████| [24:36<00:00,  1.11it/s]


loss on train: 0.0453


Validation Epoch 2/10: 100%|█████████| [01:51<00:11,  3.34it/s]


loss on val: 0.1256


Training Epoch 3/10: 100%|██████████| [24:36<00:00,  1.11it/s]


loss on train: 0.0367


Validation Epoch 3/10: 100%|█████████| [01:51<00:11,  3.34it/s]


loss on val: 0.1123


Training Epoch 4/10: 100%|██████████| [24:36<00:00,  1.11it/s]


loss on train: 0.0312


Validation Epoch 4/10: 100%|█████████| [01:51<00:11,  3.34it/s]


loss on val: 0.1056


stopped


In [None]:
def extract_corners(prediction):
    """Turn a logit map into the corners of the largest predicted region.

    The logits are passed through a sigmoid and thresholded at 0.5. The
    axis-aligned bounding box of the largest external contour is returned as
    a flat list of eight integers
    ``[x, y + h, x, y, x + w, y, x + w, y + h]``, i.e. bottom-left, top-left,
    top-right, bottom-right in image coordinates (y grows downwards).

    Args:
        prediction: tensor of raw logits; expected to be a single 2-D
            ``(H, W)`` map, since it is handed to ``cv2.findContours``.

    Returns:
        List of eight ints. If no pixel exceeds the threshold there is nothing
        to outline and eight zeros are returned instead of raising.
    """
    mask = (torch.sigmoid(prediction).cpu().detach().numpy() > 0.5).astype(np.uint8)
    if not mask.any():
        # Empty mask: max() over an empty contour list would raise ValueError.
        return [0] * 8
    # OpenCV 3.x returns (image, contours, hierarchy), 4.x returns
    # (contours, hierarchy); taking the last two values works with both.
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2:]
    largest_contour = max(contours, key=cv2.contourArea)
    x, y, w, h = cv2.boundingRect(largest_contour)
    return [x, y + h, x, y, x + w, y, x + w, y + h]

test_result = []

# Predict with the best validation checkpoint written during training (when
# present) rather than with whatever weights the last epoch left behind.
best_weights_path = Path('best.pth')
if best_weights_path.exists():
    model.load_state_dict(torch.load(best_weights_path, map_location=device))

# Inference mode: BatchNorm uses its running statistics, and no autograd
# graph is built for the forward passes.
model.eval()
with torch.no_grad():
    # sorted() makes the row order of the answer file deterministic.
    for file in sorted(os.listdir(TEST_DATASET_PATH / 'Images')):
        # Force three channels so grayscale / RGBA files match the network input.
        with Image.open(TEST_DATASET_PATH / 'Images' / file) as pil_image:
            img = np.array(pil_image.convert('RGB')) / 255.
        img_tensor = transform(img).unsqueeze(0).float().to(device)
        prediction = model(img_tensor)
        # Resize the logits back to the original (height, width) so the
        # corners are expressed in the source image's pixel coordinates.
        prediction = tr.Resize(img.shape[:2])(prediction)[0, 0]
        corners = extract_corners(prediction)
        test_result.append([file, '-', *corners, '-'])

In [None]:
# Write the predictions header-less and UTF-16 encoded, the same settings
# markup.csv is read with.
test_result_df = pd.DataFrame(data=test_result)
test_result_df.to_csv(
    'answer.csv',
    index=False,
    header=False,
    encoding='utf-16',
)