In [2]:
import cv2
import json
import numpy as np
import os
import ssl
import torch
import torch.nn as nn
import torch.optim as optim
import torch
import torchvision.models as models
from PIL import Image
from sklearn.metrics import accuracy_score, recall_score
from torch.hub import load_state_dict_from_url
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import transforms
from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights
from torchvision.models._api import WeightsEnum
# Turn of SSL verification
ssl._create_default_https_context = ssl._create_unverified_context

In [3]:
"""
Custom transform for PyTorch
"""
class GreyscaleContrast(torch.nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, img):
        img = np.array(img)                          # Convert image from PIL to numpy array
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  # Convert image to greyscale
        img = cv2.equalizeHist(img)                  # Apply histogram equalization
        img = Image.fromarray(img)                   # Create PIL image from numpy array
        return img

In [4]:
"""
Custom dataset for PyTorch
"""
class CustomDataset(Dataset):
    def __init__(self, data_dir, transform=None, threshold=True):
        """
        Initialises the Dataset object
        """
        self.data_dir = data_dir                    # Directory with training data
        self.transform = transform                  # Data transforms
        self.threshold = threshold                  # Threshold is true if the task is binary classification, false otherwise
        self.image_paths = [                        # Construct a list with image filenames
            os.path.join(data_dir, filename)
            for filename in os.listdir(data_dir)
            if filename.endswith(".jpg")
        ]

    def __len__(self):
        """
        Returns the number of data samples 
        """
        return len(self.image_paths)

    def __getitem__(self, idx):
        """
        Returns the image and label of specified sample
        """
        image_path = self.image_paths[idx]         
        image_name = os.path.splitext(os.path.basename(image_path))[0]
        label_path = os.path.join(self.data_dir, f"{image_name}.json")
        image = Image.open(image_path).convert("RGB")
        with open(label_path, "r") as f:
            label_data = json.load(f)
        colonies_number = label_data["colonies_number"]
        if self.threshold:
            colonies_number = min(colonies_number, 1)
        if self.transform:
            image = self.transform(image)
        return image, colonies_number

In [5]:
"""
Specify the loaction of training data
Select the task: binary classification or regression
Define training data transformations, random image pre-processing
Create dataset
"""
data_directory = "train_data_resized"
binary_classification = True
transform = transforms.Compose(
    [
        transforms.Resize((256, 256)),
        transforms.RandomHorizontalFlip(0.5),
        transforms.RandomVerticalFlip(0.5),
        transforms.ToTensor(),
    ]
)
dataset = CustomDataset(data_dir=data_directory, transform=transform, threshold=binary_classification)

In [6]:
"""
Select a sample size to not use the whole dataset for training
Calculate a size of training and validation dataset
Set a batch size 
Create a dataloader for training and validation dataset
"""
sample_size = 500 if 500 < len(dataset) else len(dataset)
train_size = int(0.9 * sample_size)
val_size = sample_size - train_size
train_dataset, val_dataset, _ = random_split(dataset, [train_size, val_size, (len(dataset) - train_size - val_size)])
batch_size = 64
train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=False
)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [7]:
"""
Use the whole dataset for training
"""
dataset_size = len(dataset)
train_size = int(0.98 * dataset_size)
val_size = dataset_size - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
batch_size = 32
train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True
)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [8]:
"""
Specify number of epochs and learning rate
Select a model,
Specify the number of input and output features for the last layer, and the function to map real value to binary output
Select the device for training, loss function and optimizer
"""
num_epochs = 10
learning_rate = 0.001
def get_state_dict(self, *args, **kwargs):
    kwargs.pop("check_hash")
    return load_state_dict_from_url(self.url, *args, **kwargs)
WeightsEnum.get_state_dict = get_state_dict
model = efficientnet_b0(weights=EfficientNet_B0_Weights.IMAGENET1K_V1)
model = efficientnet_b0(weights="DEFAULT")
num_ftrs = 1280
model.classifier = nn.Sequential(nn.Linear(num_ftrs, 1), nn.Sigmoid())
device = torch.device("cpu")  # ("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [9]:
"""
Metric that we will be evaluated on for competition
"""
def eval(y_true, y_pred):
    score = accuracy_score(y_true, y_pred) * recall_score(y_true, y_pred)
    return score

In [10]:
"""
Training loop
"""
best_val = 0
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    train_correct = 0
    train_total = 0

    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        labels = labels.unsqueeze(1)
        loss = criterion(outputs.float(), labels.float())
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * images.size(0)

        threshold = 0.5
        predicted = (outputs >= threshold).int()

        
        train_correct += (predicted == labels).sum().item()
        train_total += labels.size(0)

    avg_train_loss = train_loss / train_total
    train_accuracy = train_correct / train_total
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0
    official_score = 0

    for images, labels in val_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        labels = labels.unsqueeze(1)
        loss = criterion(outputs.float(), labels.float())
        val_loss += loss.item() * images.size(0)
       
        threshold = 0.5
        predicted = (outputs >= threshold).int()

        val_correct += (predicted == labels).sum().item()
        val_total += labels.size(0)
        official_score += eval(labels.flatten().cpu(), predicted.flatten().cpu())

    avg_val_loss = val_loss / val_total
    avg_official_score = official_score / len(val_loader)
    val_accuracy = val_correct / val_total

    print(
        f"Epoch [{epoch+1}/{num_epochs}], Training Loss: {avg_train_loss:.4f}, Training Accuracy: {train_accuracy:.4f}, Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}, Official score: {avg_official_score:.4f}"
    )
    if avg_official_score > best_val:
        best_val = avg_official_score
        torch.save(model.state_dict(), "agar3000.pth")