# Uploading libraries

In [104]:
# Fundamental libraries

import os
import random
import time
import copy

# Working libraries
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import wandb
from torch.autograd import Variable
from sklearn.metrics import classification_report
import torch.utils.data as data
import torchvision
from torchvision import transforms
from PIL import Image
from transformers import AutoImageProcessor, ViTForImageClassification, ViTConfig, ResNetForImageClassification, ResNetConfig
import torchvision.transforms.functional as TF
from torchvision.io import read_image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets, models
import torch.optim as optim
from torch.optim import lr_scheduler
from sklearn.metrics import f1_score
from importlib import reload
import utility
reload(utility)
from utility import load_data, plot_confusion_matrix, plot_average_f1_scores, train_model, get_classification_details, get_hard_disk_path, show_samples, plot_features_importance, visualize_correlation, get_shap

# Evaluation
from sklearn.metrics import f1_score, confusion_matrix, ConfusionMatrixDisplay

# Model Recommendations
VGG-16: Despite being relatively deep, VGG-16 can still be effective on smaller datasets, especially when used with transfer learning. Its simplicity and well-understood architecture make it a good starting point.

ResNet-50: ResNet models, especially the shallower ones like ResNet-50, are known for their ability to avoid overfitting through the use of residual connections. This can be beneficial for learning from small datasets.

MobileNet: Designed for mobile and resource-constrained environments, MobileNets are lightweight and efficient, which can be advantageous when training data is limited.

SqueezeNet: This network achieves AlexNet-level accuracy with significantly fewer parameters. Its compact architecture makes it suitable for small datasets and limited computational resources.

# FILE PATHS on local environment

#### Each folder path represent class (label) :

**Folder name - calss name : description**

0 - 0 class : dead

1 - 1 class : empty

2 - 2 class : keep0

3 - 3 class : keep1

4 - 4 class : keep2

5 - 5 class : reseed0

6 - 6 class : reseed1

7 -  7 class : split

In [105]:
# paths for data upload
FILE_PATH = get_hard_disk_path("DL")[:-1] + "_CV/"
TRAIN_FEATURES_PATH_0 = FILE_PATH + 'dead' 
TRAIN_FEATURES_PATH_1 = FILE_PATH + 'empty'
TRAIN_FEATURES_PATH_2 = FILE_PATH + 'keep0'
TRAIN_FEATURES_PATH_3 = FILE_PATH + 'keep1'
TRAIN_FEATURES_PATH_4 = FILE_PATH + 'keep2'
TRAIN_FEATURES_PATH_5 = FILE_PATH + 'reseed0'
TRAIN_FEATURES_PATH_6 = FILE_PATH + 'reseed1'
TRAIN_FEATURES_PATH_7 = FILE_PATH + 'split'

# list with pathe
PATHES_LIST = [TRAIN_FEATURES_PATH_0,TRAIN_FEATURES_PATH_1,TRAIN_FEATURES_PATH_2,TRAIN_FEATURES_PATH_3,TRAIN_FEATURES_PATH_4, TRAIN_FEATURES_PATH_5, TRAIN_FEATURES_PATH_6, TRAIN_FEATURES_PATH_7]

Successfully loaded data from D:/data_for_DL/


# Data set : train & test

The data was divided into training and testing sets for each class, with a split of 80% for training and 20% for testing, maintaining the same ratio for each class. However, the dataset exhibits an imbalance issue, with one class having a significantly larger number of samples compared to the other class.

In [106]:
labels = ['dead', 'empty', 'keep0', 'keep1', 'keep2', 'reseed0', 'reseed1', 'split']
for i, path in enumerate(PATHES_LIST):
    print(path)

D:/data_for_DL_CV/dead
D:/data_for_DL_CV/empty
D:/data_for_DL_CV/keep0
D:/data_for_DL_CV/keep1
D:/data_for_DL_CV/keep2
D:/data_for_DL_CV/reseed0
D:/data_for_DL_CV/reseed1
D:/data_for_DL_CV/split


In [107]:
class CustomImageDataset_train(Dataset):
    def __init__(self, img_paths, img_labels, transform=None, model_name=None):
        self.img_sort = img_paths
        self.img_labels = img_labels
        self.transform = transform
        self.processor = None

        if model_name is not None:
            self.processor = AutoImageProcessor.from_pretrained(model_name)

        self.len = len(self.img_sort)

    def __len__(self):
        return self.len

    def __getitem__(self, idx):
        img_path = self.img_sort[idx]
        try:
            image = read_image(img_path).float()
        except Exception as e:
            print(f"Error reading image {img_path}: {e}")
            # Handle error (e.g., by skipping this sample or using a default image)

        label = torch.tensor(self.img_labels[idx], dtype=torch.long)

        if image.shape[0] == 1:
            image = image.repeat(3, 1, 1)

        if self.transform:
            image = self.transform(image)

        image = self.augmentation(image)

        if self.processor:
            image = self.processor(images=image, return_tensors="pt").pixel_values.squeeze()

        return image, label
        
    def augmentation(self, image):
        # Random horizontal flipping
        if random.random() > 0.5:
            image = TF.hflip(image)

        # Random vertical flipping
        if random.random() > 0.5:
            image = TF.vflip(image)

        # Random rotation by 0, 90, 180, or 270 degrees
        degree = random.choice([0, 90, 180, 270])
        image = TF.rotate(image, degree)

        return image

### Separate data to train and test data with ratio 0.8 in test data

To utilize the CustomImageDataset_test and CustomImageDataset_train, it is necessary to specify the desired transformations for each case, such as Normalize and CenterCrop.

The following code demonstrates how to define the transformations:

For test data transformation using CenterCrop:

In [112]:
class CustomImageDataset_test(Dataset):
    def __init__(self, img_paths, img_labels, transform=None, model_name=None):
        self.img_sort = img_paths   # List of image paths
        self.img_labels = img_labels # Corresponding labels for each image
        self.transform = transform
        self.processor = None

        # Load the processor if a model name is given
        if model_name is not None:
            self.processor = AutoImageProcessor.from_pretrained(model_name)
        
        self.len = len(self.img_sort)

    def __len__(self):
        return self.len

    def __getitem__(self, idx):
        img_path = self.img_sort[idx]
        try:
            image = read_image(img_path).float()
        except Exception as e:
            print(f"Error reading image {img_path}: {e}")
            # Handle error (e.g., by skipping this sample or using a default image)

        label = torch.tensor(self.img_labels[idx], dtype=torch.long)

        if image.shape[0] == 1:
            image = image.repeat(3, 1, 1)

        if self.transform:
            image = self.transform(image)

        if self.processor:
            image = self.processor(images=image, return_tensors="pt").pixel_values.squeeze()

        return image, label

In [113]:
import torch
import wandb
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader
from sklearn.model_selection import StratifiedKFold

# Initialize a new wandb run
wandb.init(project="organoid_classification", entity="laurent-gurtler")

all_labels_list = ['dead', 'empty', 'keep0', 'keep1', 'keep2', 'reseed0', 'reseed1', 'split']

model_name = "microsoft/resnet-50"
num_labels = len(all_labels_list)  # The number of unique labels/classes in your dataset
print(num_labels)

config = ResNetConfig.from_pretrained(model_name, num_labels=num_labels)
# Instantiate the model with the new configuration
model = ResNetForImageClassification(config)

criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=5e-5, weight_decay=1e-5)

# Assuming PATHES_LIST contains paths to your different class directories
img_labels = []
img_sort = []
for i, img_dir in enumerate(PATHES_LIST):
    img_files = sorted([f for f in os.listdir(img_dir) if f.endswith(".jpg")])
    img_labels.extend([i] * len(img_files))  # Labels for each image
    img_sort.extend([os.path.join(img_dir, f) for f in img_files])  # Paths for each image

# Convert img_sort to a numpy array for easier indexing
img_sort = np.array(img_sort)

# Number of folds
k_folds = 5

# Initialize StratifiedKFold
kf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

# Convert img_labels to numpy array for StratifiedKFold
img_labels = np.array(img_labels)

# Loop through each fold
for fold, (train_idx, val_idx) in enumerate(kf.split(img_sort, img_labels)):
    print(f"Training on fold {fold+1}/{k_folds}")

    # Split data into training and validation for this fold
    train_data = img_sort[train_idx]
    val_data = img_sort[val_idx]

    # Create datasets for this fold
    train_dataset = CustomImageDataset_train(train_data, img_labels[train_idx], 
                                             transform=False, 
                                             model_name="microsoft/resnet-50")
    test_dataset = CustomImageDataset_test(val_data, img_labels[val_idx], 
                                          transform=False, 
                                          model_name="microsoft/resnet-50")
    

    test_loader = DataLoader(test_dataset, shuffle=False)
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)

    # Configurations (hyperparameters and model architecture)
    config = wandb.config
    config.learning_rate = 0.00001
    config.batch_size = 64

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    wandb.watch(model, criterion, log="all", log_freq=10)

    epoch_f1_scores_dict_list = []

    num_epochs = 20  # Number of epochs to train for

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)  # Convert labels to torch.long
            optimizer.zero_grad()
            outputs = model(images).logits
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        # Logging the training loss
        wandb.log({"epoch": epoch, "train_loss": running_loss/len(train_loader)})

        # Validation phase
        model.eval()
        all_labels = []
        all_predictions = []
        validation_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for images, labels in test_loader:
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images).logits
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                loss = criterion(outputs, labels)
                validation_loss += loss.item()

                # Move the labels and predictions to CPU for sklearn metrics
                all_labels.extend(labels.cpu().numpy())
                all_predictions.extend(predicted.cpu().numpy())
                print(all_labels, all_predictions)

        # Calculate accuracy and F1 score
        accuracy = 100 * correct / total
        f1_scores = f1_score(all_labels, all_predictions, average=None, zero_division=0)  # This will give you an array of F1 scores per class
        # Check if the length of F1 scores matches the number of labels
        if len(f1_scores) != len(all_labels_list):
            print(f"Warning: Number of F1 scores ({len(f1_scores)}) does not match number of classes ({len(all_labels_list)})")
            # Trim the list of F1 scores to match the number of classes, if necessary
            f1_scores = f1_scores[:len(all_labels_list)]

        # Create a dictionary of class labels and their corresponding F1 scores
        f1_scores_dict = {all_labels_list[idx]: f1 for idx, f1 in enumerate(f1_scores)}
        epoch_f1_scores_dict_list.append(f1_scores_dict)

        # It might be useful to also get the classification report for all metrics
        classification_rep = classification_report(all_labels, all_predictions, output_dict=True, zero_division=0)

        # Logging the individual F1 scores
        for idx, f1 in enumerate(f1_scores):
            wandb.log({f"class_{idx}_f1_score": f1})

        # Logging the classification report
        wandb.log({"classification_report": classification_rep})

        wandb.log({"epoch": epoch, "validation_loss": validation_loss/len(test_loader)})

        # Print statistics for each class
        print(f"Epoch {epoch+1}, Train Loss: {running_loss/len(train_loader)}, "
            f"Validation Loss: {validation_loss/len(test_loader)}, Accuracy: {accuracy} %")
        for idx, f1 in enumerate(f1_scores):
            print(f"Class {idx} F1 Score: {f1}")

    if len(epoch_f1_scores_dict_list) >= 5:
        # Get the last 5 epochs' F1-scores for each class
        last_5_epochs_f1_scores = epoch_f1_scores_dict_list[-5:]

        # Initialize a dictionary to hold the cumulative F1-scores for the last 5 epochs
        cumulative_f1_scores = {label: 0 for label in all_labels_list}

        # Sum the F1-scores for each label across the last 5 epochs
        for epoch_dict in last_5_epochs_f1_scores:
            for label, score in epoch_dict.items():
                cumulative_f1_scores[label] += score

        # Calculate the average F1-scores for each label
        average_f1_scores = {label: score / 5 for label, score in cumulative_f1_scores.items()}

        plot_average_f1_scores(all_labels_list, average_f1_scores)


    # After the loop, you may want to log the overall performance
    wandb.log({
        "final_accuracy": accuracy,
        "final_f1_scores": f1_scores.tolist(),  # Convert to list if necessary
        "final_classification_report": classification_rep
    })

    print('Finished Training')

    # Close the wandb run
    wandb.finish()


VBox(children=(Label(value='0.001 MB of 0.001 MB uploaded (0.000 MB deduped)\r'), FloatProgress(value=1.0, max…

VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.011111111111111112, max=1.0…

8
Training on fold 1/5


Could not find image processor class in the image processor config or the model config. Loading based on pattern matching with the model's feature extractor configuration.
Could not find image processor class in the image processor config or the model config. Loading based on pattern matching with the model's feature extractor configuration.


[0] [1]
[0, 0] [1, 1]
[0, 0, 0] [1, 1, 1]
[0, 0, 0, 0] [1, 1, 1, 1]
[0, 0, 0, 0, 1] [1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1] [1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1] [1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2] [1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2, 2] [1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2, 2, 2] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2, 2, 2, 2] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3,

KeyboardInterrupt: 