# **CNN model for recognizing Christmas Eve dishes**

*Hackaton - Noc Sztucznej Inteligencji*

**Authors:**

Jakub Zdancewicz

Wiktor Niedźwiedzki

In [None]:
# Model trenowany na google colab
# from google.colab import drive
# drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip3 install torch torchaudio torchvision torchtext torchdata



# Wczytanie własnego data setu

In [None]:
# !rm -rf /content/data
# !unzip -qq "data.zip"

# Uczenie modelu

In [None]:
import random
import os
import shutil

from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

if torch.cuda.is_available():
    torch.cuda.set_device(torch.device("cuda:0"))

In [None]:
# Hyperparameters
lr = 0.0001 # learning rate
batch_size = 128
num_epochs = 11
lr_decay = 0.1 # decay rate
num_classes = 8

def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    random.seed(seed)
set_seed(42)

In [None]:
class_labels = {
    'barszcz czerwony': 1,
    'bigos': 2,
    'kutia': 3,
    'makowiec': 4,
    'pierniki': 5,
    'pierogi': 6,
    'sernik': 7,
    'zupa grzybowa': 8
}

In [None]:
data_transforms = {
        'train': transforms.Compose([
            transforms.Resize((254, 254)), # Resnet needs 224 x 224 images
            transforms.ToTensor(),
            #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]),
        'val': transforms.Compose([
            transforms.Resize((254, 254)),
            transforms.ToTensor(),
            #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]),
        'test': transforms.Compose([
            transforms.Resize((254, 254)),
            transforms.ToTensor(),
            #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]),
}

In [None]:
def prepare_data(file_path, output_dir, validation_split=0.2, test_split=0.1, batch_size=32):
    # create output directory if doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # create train, val, test folders
    train_dir = os.path.join(output_dir, 'train')
    val_dir = os.path.join(output_dir, 'val')
    test_dir = os.path.join(output_dir, 'test')

    for subset_dir in [train_dir, val_dir, test_dir]:
        if not os.path.exists(subset_dir):
            os.makedirs(subset_dir)


    # split images into train, val and test sets
    for class_name in os.listdir(file_path):
        class_path = os.path.join(file_path, class_name)
        if not os.path.isdir(class_path):
            continue

        images = os.listdir(class_path)


        train_images, temp_images = train_test_split(images, test_size=(validation_split + test_split), random_state=42)
        val_images, test_images = train_test_split(temp_images, test_size=test_split / (validation_split + test_split), random_state=42)

        for image in train_images + val_images + test_images:
            src = os.path.join(class_path, image)
            try:
                with Image.open(src) as img:
                    # Convert to RGB to prevent PIL errors
                    img = img.convert('RGBA' if img.mode == 'P' and 'transparency' in img.info else 'RGB')

                    label = class_labels.get(class_name, None)

                    # put images in correct folders
                    if image in train_images:
                        dst_dir = os.path.join(train_dir, str(label))
                    elif image in val_images:
                        dst_dir = os.path.join(val_dir, str(label))
                    else:
                        dst_dir = os.path.join(test_dir, str(label))

                    os.makedirs(dst_dir, exist_ok=True)
                    dst = os.path.join(dst_dir, image)

                    img.save(dst)
            except Exception as e:
                print(f"Błąd podczas przetwarzania obrazu '{image}': {e}") # Few images are weird

    # ImageFolder dataloader
    image_datasets = {
        'train': datasets.ImageFolder(train_dir, data_transforms['train']),
        'val': datasets.ImageFolder(val_dir, data_transforms['val']),
        'test': datasets.ImageFolder(test_dir, data_transforms['test']),
    }

    # create data loaders
    dataloaders = {
        'train': DataLoader(image_datasets['train'], batch_size=batch_size, shuffle=True, num_workers=2),
        'val': DataLoader(image_datasets['val'], batch_size=batch_size, shuffle=False, num_workers=2),
        'test': DataLoader(image_datasets['test'], batch_size=batch_size, shuffle=False, num_workers=2),
    }

    return image_datasets, dataloaders

In [None]:
!rm -rf /content/m

In [None]:
database_path = "/content/data"
splitted_data_path = "/content/m"
image_datasets, dataloaders = prepare_data(database_path, splitted_data_path, validation_split=0.2, batch_size=batch_size)

Błąd podczas przetwarzania obrazu 'pierniki_118.jpg': cannot write mode RGBA as JPEG


In [None]:
def train_model(model, dataloaders, image_datasets, device, loss_fn, optimizer, lr_scheduler=None, num_epochs=10):
    for epoch in range(1, num_epochs + 1):
        print(f'Epoch {epoch}/{num_epochs}')
        print('-' * 10)

        # lr decay
        if lr_scheduler:
            lr_scheduler(optimizer, epoch)

        # train mode
        model.train()
        running_loss = 0.0
        all_preds = []
        all_labels = []

        # Iterate through batches
        for inputs, labels in dataloaders['train']:

            # CUDA things (or cpu if CUDA is unavailable)
            inputs = inputs.to(device)
            labels = labels.to(device)

            # clean gradients
            optimizer.zero_grad()

            with torch.set_grad_enabled(True):
                # forward pass
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = loss_fn(outputs, labels)

                # backward pass
                loss.backward()
                optimizer.step()

            # get total loss for a batch
            running_loss += loss.item() * inputs.size(0)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

        epoch_loss = running_loss / len(image_datasets['train'])
        epoch_f1 = f1_score(all_labels, all_preds, average='weighted')
        print(f'Train Loss: {epoch_loss:.4f} F1-score: {epoch_f1:.4f}')

        model.eval()
        running_loss = 0.0
        all_preds = []
        all_labels = []

        # Val loss
        for inputs, labels in dataloaders['val']:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # prevent from calculating gradient
            with torch.set_grad_enabled(False):
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = loss_fn(outputs, labels)

            running_loss += loss.item() * inputs.size(0)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

        epoch_loss = running_loss / len(image_datasets['val'])
        epoch_f1 = f1_score(all_labels, all_preds, average='weighted')
        print(f'Validation Loss: {epoch_loss:.4f} F1-score: {epoch_f1:.4f}')

    return model

In [None]:
def exp_lr_scheduler(optimizer, epoch, init_lr=0.0001, lr_decay_epoch=5, decay_weight=0.1):
    # Calculate lr for current epoch
    lr = init_lr * (decay_weight**(epoch // lr_decay_epoch))

    if epoch % lr_decay_epoch == 0:
        print('LR is set to {:.6f}'.format(lr))

    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

    return optimizer

In [None]:
# Finetune ResNet
model_ft = models.resnet50(weights='DEFAULT')
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(num_ftrs, num_classes)

# Loss
loss_fn = nn.CrossEntropyLoss()

if device.type == 'cuda':
    criterion = loss_fn.cuda()
    model_ft = model_ft.cuda()

# Optimizer
optimizer_ft = optim.Adam(model_ft.parameters(), lr=lr)

model_ft = train_model(model_ft, dataloaders, image_datasets, device, loss_fn, optimizer_ft,
                       lambda opt, epoch: exp_lr_scheduler(opt, epoch, init_lr=lr, lr_decay_epoch=5, decay_weight=lr_decay),
                       num_epochs=num_epochs)

Epoch 1/11
----------
Train Loss: 1.7747 F1-score: 0.5877
Validation Loss: 1.1571 F1-score: 0.8634
Epoch 2/11
----------
Train Loss: 0.6700 F1-score: 0.9540
Validation Loss: 0.2266 F1-score: 0.9542
Epoch 3/11
----------
Train Loss: 0.1521 F1-score: 0.9809
Validation Loss: 0.0979 F1-score: 0.9687
Epoch 4/11
----------
Train Loss: 0.0443 F1-score: 0.9922
Validation Loss: 0.0832 F1-score: 0.9750
Epoch 5/11
----------
LR is set to 0.000010
Train Loss: 0.0191 F1-score: 0.9988
Validation Loss: 0.0826 F1-score: 0.9791
Epoch 6/11
----------
Train Loss: 0.0222 F1-score: 0.9976
Validation Loss: 0.0924 F1-score: 0.9771
Epoch 7/11
----------
Train Loss: 0.0174 F1-score: 0.9994
Validation Loss: 0.0919 F1-score: 0.9708
Epoch 8/11
----------
Train Loss: 0.0183 F1-score: 0.9982
Validation Loss: 0.0924 F1-score: 0.9687
Epoch 9/11
----------
Train Loss: 0.0163 F1-score: 0.9994
Validation Loss: 0.0792 F1-score: 0.9750
Epoch 10/11
----------
LR is set to 0.000001
Train Loss: 0.0155 F1-score: 0.9988
Valida

In [None]:
# Get test set loss
model_ft.eval()

running_loss = 0.0
all_preds = []
all_labels = []

for inputs, labels in dataloaders['test']:
    inputs = inputs.to(device)
    labels = labels.to(device)

    with torch.no_grad():
        outputs = model_ft(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)

    running_loss += loss.item() * inputs.size(0)

    all_preds.extend(preds.cpu().numpy())
    all_labels.extend(labels.cpu().numpy())

epoch_loss = running_loss / len(image_datasets['test'])
epoch_f1 = f1_score(all_labels, all_preds, average='weighted')

print(f'Test Loss: {epoch_loss:.4f} F1-score: {epoch_f1:.4f}')

Test Loss: 0.0921 F1-score: 0.9835


In [None]:
torch.save(model_ft.state_dict(), "model_weights.pth")

# Testowanie modelu

In [None]:
# Wczytanie modelu
num_classes = 8
model_ft = models.resnet50(weights=None)

num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(num_ftrs, num_classes)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_ft.load_state_dict(torch.load("model_weights.pth", map_location=torch.device(device)))

print("Model loaded successfully!")

model_ft = model_ft.to(device)

  model_ft.load_state_dict(torch.load("model_weights.pth", map_location=torch.device(device)))


Model loaded successfully!


In [None]:
# Define model image classification
def classify_image(model, image_path, class_labels, device):

    # transform input Image
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # Changing classes indexes in train_model() causes CUDA errors so changing them during inference
    try:
        image = Image.open(image_path).convert('RGB')

        input_tensor = transform(image).unsqueeze(0) # Add batch dimension

        input_tensor = input_tensor.to(device)

        model.eval()

        with torch.no_grad():
            outputs = model(input_tensor)
            _, predicted_class = torch.max(outputs, 1)

        class_index = predicted_class.item()
        return predicted_class.item() + 1

    except Exception as e:
        print(f"Error processing image '{image_path}': {e}")
        return None

class_labels = {
    'barszcz czerwony': 1,
    'bigos': 2,
    'kutia': 3,
    'makowiec': 4,
    'pierniki': 5,
    'pierogi': 6,
    'sernik': 7,
    'zupa grzybowa': 8
}

In [None]:
!rm -rf /content/test
!unzip -qq "test.zip"

In [None]:
folder_path = "/content/test/"
files = [os.path.join(folder_path, file) for file in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, file))]

class_labels_reverse = {v: k for k, v in class_labels.items()}

for image_path in files:
    file_name = os.path.basename(image_path)
    predicted_label_index = classify_image(model_ft, image_path, class_labels, device)
    predicted_label = class_labels_reverse.get(predicted_label_index, "Unknown")
    print(f"{file_name} -> {predicted_label}")

zupa z grzybami_20.jpg -> zupa grzybowa
zupa barszcz_2.jpg -> zupa grzybowa
zupa z grzybami_13.jpg -> zupa grzybowa
polskie pierogi_6.jpg -> pierogi
sernik z rodzynkami_9.jpg -> sernik
zupa barszcz_21.jpg -> barszcz czerwony
super bigos_5.jpg -> bigos
super bigos_20.jpg -> bigos
super bigos_11.jpg -> bigos
ciasto makowe_5.jpg -> makowiec
sernik z rodzynkami_6.jpg -> sernik
zupa z grzybami_5.jpg -> zupa grzybowa
ciasto makowe_14.jpg -> makowiec
ciasto makowe_8.jpg -> makowiec
sernik z rodzynkami_11.jpg -> sernik
ciasto makowe_18.jpg -> makowiec
zupa z grzybami_10.jpg -> zupa grzybowa
sernik z rodzynkami_4.jpg -> sernik
super bigos_19.jpg -> bigos
polskie pierogi_21.jpg -> pierogi
zupa barszcz_17.jpg -> barszcz czerwony
polskie pierogi_10.jpg -> pierogi
zupa z grzybami_4.jpg -> zupa grzybowa
polskie pierogi_11.jpg -> pierogi
super bigos_6.jpg -> bigos
polskie pierogi_23.jpg -> pierogi
ciasto makowe_21.jpg -> makowiec
super bigos_16.jpg -> bigos
zupa z grzybami_8.jpg -> zupa grzybowa
pols