<a href="https://colab.research.google.com/github/kaixinrongzi/SaintGeorgeClassify/blob/main/SaintGeorgeClassify.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Load dataset

In [6]:
import os

from google.colab import drive
drive.mount('/content/drive/', force_remount=True)

!cp -r '/content/drive/' '/content/'

Mounted at /content/drive/
cp: '/content/drive/' and '/content/drive' are the same file


Global Variables

In [None]:
WORKSPACE = '/content/drive/MyDrive/SaintGeorgeClassify/'
DATA_DIR = os.path.join(WORKSPACE, "data")
POSITIVE_DIR = os.path.join(DATA_DIR, 'georges')
POSITIVE_DIR = os.path.join(DATA_DIR, 'non_georges')
RESULTS_DIR = os.path.join(WORKSPACE, "results")
MISCLASSIFIED_DIR = os.path.join(RESULTS_DIR, "misclassified")
LOG_PATH = os.path.join(RESULTS_DIR, "models_test_results.log")

package Entity

In [1]:
import os
from typing import Tuple

from PIL import Image
from torch.utils.data import DataLoader, Dataset

# 1. Data loading and Preprocessing
class CustomDataset(Dataset):
    def __init__(self, image_dir:str):
        self.image_dir = image_dir
        self.image_paths = []
        self.labels = []

        self.class_names = ['georges', 'non_georges']
        self.class_to_label = {'georges' : 1,
                               'non_georges' : 0}

        for class_name in self.class_names:
            class_dir = os.path.join(image_dir, class_name)
            print("class_dir=" + class_dir)
            for image_name in os.listdir(class_dir):
                image_path = os.path.join(class_dir, image_name)
                self.image_paths.append(image_path)
                self.labels.append(self.class_to_label[class_name])

    def __len__(self):
        return len(self.image_paths)


    def __getitem__(self, idx:int) -> Tuple[object, int]:
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert('RGB')  # Ensure consistent color format
        label = self.labels[idx]

        return image, label


class SubsetDataset(Dataset):

    def __init__(self, dataset: CustomDataset, indices:list[int], transform=None):
        self.dataset = dataset
        self.indices = indices
        self.transform = transform

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx: int):
        image, label = self.dataset[self.indices[idx]]
        if self.transform:
            image = self.transform(image)
        return image, label


package Service

In [12]:
import copy
import threading
import matplotlib.pyplot as plt
import torch
import logging
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score


class Trainer:

    def __init__(self):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.counter = 0
        self.patience = 50
        self.best_val_loss = float('inf')

        logging.basicConfig(filename=LOG_PATH,
                            level=logging.INFO,
                            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)



    # use different optimizers: Adam and AdamW
    def train_dispatch(self, model, training_dataset:Dataset, val_dataset:Dataset, criterion, num_epochs:int):
        threads = []
        for weight_decay in [1e-6, 1e-5]:
          for learning_rate in [0.001, 0.005, 0.01]:
              # model_copy = copy.deepcopy(model)
              # optimizer = optim.Adam(model_copy.parameters(), lr=learning_rate, weight_decay=weight_decay)
              # thread = threading.Thread(target=self.train, args=(model_copy, training_dataset, val_dataset, optimizer, criterion, num_epochs), name=f"Adam, lr={learning_rate}, weight_decay={weight_decay}")
              # threads.append(thread)
              # thread.start()

              model_copy = copy.deepcopy(model)
              optimizer = optim.AdamW(model_copy.parameters(), lr=learning_rate, weight_decay=weight_decay, betas=(0.9, 0.99), eps=1e-8, amsgrad=False)
              thread = threading.Thread(target=self.train, args=(model_copy, training_dataset, val_dataset, optimizer, criterion, num_epochs), name=f"AdamW, lr={learning_rate}, weight_decay={weight_decay}")
              threads.append(thread)
              thread.start()


        # Main thread waits for all of its child threads
        for thread in threads:
           thread.join()


    def train(self, model, training_dataset:Dataset, val_dataset:Dataset, optimizer, criterion, num_epochs:int):

        print(threading.current_thread().name)

        train_loader = DataLoader(training_dataset, batch_size=16, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=16, shuffle=True)

        train_losses = []
        train_accuracies = []
        for epoch in range(num_epochs):

            running_loss = 0.0
            correct = 0
            total = 0
            model.train()
            for inputs, labels in train_loader:
                inputs = inputs.to(self.device)  # 将inputs放到GPU上
                labels = labels.to(self.device)  # 将inputs放到GPU上

                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels.float().unsqueeze(1))
                loss.backward()   # calculate gradients
                optimizer.step()  # update parameters based on gradients: parameter -= learning_rate * gradient
                running_loss += loss.item()
                correct += ((outputs > 0.5).float() == labels.unsqueeze(1)).sum().item()
                total += len(inputs)

            average_train_loss = running_loss/len(train_loader)
            average_train_accuracy = correct / total
            print(f"{threading.current_thread().name}: Epoch {epoch + 1}, training loss: {average_train_loss}")
            print(f"{threading.current_thread().name}: Epoch {epoch + 1}, training accuracy: {average_train_accuracy}")
            train_losses.append(average_train_loss)
            train_accuracies.append(average_train_accuracy)

            model.eval()    # set the model to evaluation mode (mute training)
            val_loss = 0.0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs = inputs.to(self.device)  # 将inputs放到GPU上
                    labels = labels.to(self.device)  # 将inputs放到GPU上
                    outputs = model(inputs)
                    loss = criterion(outputs, labels.unsqueeze(1).float())
                    val_loss += loss.item()
            val_loss /= len(val_loader)
            print(f'{threading.current_thread().name}: Epoch {epoch + 1}, Validation Loss: {val_loss}')

            if val_loss < self.best_val_loss:
                self.best_val_loss = val_loss
                torch.save(model.state_dict(), os.path.join(RESULTS_DIR, f'{model.__class__.__name__}_best_model.pth'))    # save the best model
                self.counter = 0
            else:
                self.counter += 1
                if self.counter >= self.patience:
                    print("Early stopping trigger")
                    # model.load_state_dict(torch.load('best_model.pth'))   # load the best model
                    break    # Early stop

        print("Finished Training")
        fig, (ax1, ax2) = plt.subplots(1, 2)    # 1 row, 2 columns

        ax1.plot(range(1, num_epochs + 1), train_losses, label="Training Loss")
        ax1.set_xlabel('Epoch')
        ax1.set_ylabel('Training Loss')
        ax1.set_title(f"{threading.current_thread().name}: Loss of Train with Model")
        ax1.legend()

        ax2.plot(range(1, num_epochs + 1), train_accuracies, label="Training Accuracy")
        ax2.set_xlabel('Epoch')
        ax2.set_ylabel('Training Accuracy')
        ax2.set_title(f"{threading.current_thread().name}: Accuracy of Train with Model")
        ax2.legend()

        # plt.tight_layout()

        plt.show()


    def test(self, model, test_dataset: Dataset):
        model.eval()
        test_loader = DataLoader(test_dataset, batch_size = 16)

        all_predicted = []
        all_labels = []
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs = inputs.to(self.device)  # 将inputs放到GPU上
                labels = labels.to(self.device)  # 将inputs放到GPU上
                outputs = model(inputs)
                predicted = (outputs > 0.5).float()  # Threshold at 0.5 for binary classification

                # Move data to CPU and convert to numpy
                predicted_np = predicted.cpu().numpy()
                labels_np = labels.cpu().numpy()

                # Collect batch predictions and labels
                all_predicted.extend(predicted_np.flatten())
                all_labels.extend(labels_np)

                # Save miss-classified inputs
                self._save_miss_classification_input(inputs, labels_np, predicted_np)


        # Calculate metrics
        accuracy = accuracy_score(all_labels, all_predicted)
        precision = precision_score(all_labels, all_predicted)
        recall = recall_score(all_labels, all_predicted)
        f1 = f1_score(all_labels, all_predicted)
        print(f'Test Accuracy: {accuracy:.4f}')
        print(f'Test Precision: {precision:.4f}')
        print(f'Test Recall: {recall:.4f}')
        print(f'Test F1: {f1:.4f}')

        # Log those results as reference later on
        self.logger.error(f'Model Name: {model.__class__.__name__}')
        self.logger.error(f'Test Accuracy: {accuracy:.4f}')
        self.logger.error(f'Test Precision: {precision:.4f}')
        self.logger.error(f'Test Recall: {recall:.4f}')
        self.logger.error(f'Test F1: {f1:.4f}')
        self.logger.error("------------------------------------------------")


    def _save_miss_classification_input(inputs, labels_np, predicted_np):
       diff_indices = np.nonzero(predicted_np - labels_np)[0]
       for i in diff_indices:
          image = inputs[i].cpu()
          predicted_label = int(predicted_np[i])
          true_label = int(labels[i])

          image_fileName = f"missclassified_image_pred_{predicted_label}_true_{true_label}.png"
          image_path = os.path.join(MISCLASSIFIED_DIR, image_fileName)
          img = transforms.toPILImage()(image)
          img.save(image_path)


package Controller

In [10]:
import os
from pathlib import Path

import numpy as np
import torch.cuda
import torchvision
from sklearn.model_selection import train_test_split
from torch import nn
from torch.optim import Adam
import torchvision.models as models
from torchvision.models import ResNet18_Weights
from torchvision.models import EfficientNet_B0_Weights
from torchvision.transforms import transforms

class SaintGeorgeClassify:

    def __init__(self):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        resnet18 = models.resnet18(weights=ResNet18_Weights.DEFAULT)
        resnet18.fc = nn.Sequential(
            nn.Linear(resnet18.fc.in_features, 1), # Binary classification: output size = 1
            nn.Sigmoid()    # Sigmoid activation
        )
        resnet18.to(self.device)

        efficientNet = models.efficientnet_b0(weights=EfficientNet_B0_Weights.DEFAULT)
        efficientNet.classifier[1] = nn.Linear(efficientNet.classifier[1].in_features, 2)
        efficientNet.to(self.device)

        ViT_B_16 = models.vit_b_16(weights=ViT_B_16_Weights.DEFAULT)
        ViT_B_16.heads.head = nn.Linear(in_features=768, out_features=2)
        ViT_B_16.to(self.device)

        self.models = [resnet18, efficientNet, ViT_B_16]


    def run(self, dataset_dir):

        # data transformers to augment dataset
        train_transforms = transforms.Compose([
            transforms.RandomResizedCrop(size=(224, 224)),  # Randomly crop and resize to the specified size
            transforms.RandomHorizontalFlip(),  # Randomly flip the image horizontally
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  # Random color jitter
            transforms.RandomRotation(degrees=15),  # Randomly rotate the image by ±15 degrees
            transforms.CenterCrop(size=224),
            transforms.ToTensor(),  # Convert the image to a tensor
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
            # Normalization (standard example values if ImgNet, often used for pretrained models)
        ])

        # Note: Validation and test DO NOT need augmentations.
        valid_transform = transforms.Compose([
            transforms.Resize(size=(224, 224)),
            transforms.CenterCrop(size=224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        test_transform = transforms.Compose([
            transforms.Resize(size=(224, 2224)),
            transforms.CenterCrop(size=224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        # Split dataset into test, val, and test
        positive_images = os.listdir(os.path.join(dataset_dir, 'georges'))
        positive_labels = [1 for _ in range(len(positive_images))]
        negative_images = os.listdir(os.path.join(dataset_dir, 'non_georges'))
        negative_labels = [0 for _ in range(len(negative_images))]
        # all_images = positive_images + negative_images
        all_labels = np.array(positive_labels + negative_labels)

        all_indices = np.arange(len(all_labels))
        # First, split into train + val, and test
        train_and_val_indices, test_indices = train_test_split(all_indices,
                                                                    test_size=0.2,   # train+val/test = 80/20
                                                                    random_state=42,
                                                                    stratify=all_labels)
        # Second, split train + val into train and val
        train_and_val_labels = all_labels[train_and_val_indices]
        train_indices, val_indices = train_test_split(train_and_val_indices,
                                                      test_size = 0.25,      # 25% of 80 % = 10%
                                                      random_state=42,
                                                      stratify=train_and_val_labels)

        # Create datasets using indices and transforms
        dataset = CustomDataset(image_dir = dataset_dir)
        train_dataset = SubsetDataset(dataset, train_indices, train_transforms)
        val_dataset = SubsetDataset(dataset, val_indices, valid_transform)
        test_dataset = SubsetDataset(dataset, test_indices, test_transform)

        # get criterion
        criterion = nn.BCELoss()

        # Get appropriate model
        for model in self.models:

            # train model
            trainer = Trainer()
            trainer.train_dispatch(model, train_dataset, val_dataset, criterion, 200)

            # test model
            best_model_by_val = model.load_state_dict(torch.load(os.path.join(RESULTS_DIR, f'{model.__class__.__name__}_best_model.pth')))
            trainer.test(best_model_by_val, test_dataset)

        # model = torchvision.models.resnet18(weights=ResNet18_Weights.DEFAULT)
        # model.fc = nn.Sequential(
        #     nn.Linear(model.fc.in_features, 1), # Binary classification: output size = 1
        #     nn.Sigmoid()    # Sigmoid activation
        # )
        # model.to(self.device)

        # # Get appropriate optimizer
        # optimizer = Adam(model.parameters(), lr=0.001)



run <- entry point of the program

In [None]:
saint_george_classifier = SaintGeorgeClassify()

# current_dir = os.path.dirname(os.path.abspath(__file__))
# path = Path(current_dir).resolve().parent.resolve()

saint_george_classifier.run("/content/drive/MyDrive/SaintGeorgeClassify/data")

cuda:0
class_dir=/content/drive/MyDrive/SaintGeorgeClassify/data/georges
class_dir=/content/drive/MyDrive/SaintGeorgeClassify/data/non_georges
Adam, lr=0.001, weight_decay=1e-06
AdamW, lr=0.001, weight_decay=1e-06
Adam, lr=0.005, weight_decay=1e-06
AdamW, lr=0.005, weight_decay=1e-06
Adam, lr=0.01, weight_decay=1e-06
AdamW, lr=0.01, weight_decay=1e-06
Adam, lr=0.001, weight_decay=1e-05
AdamW, lr=0.001, weight_decay=1e-05
Adam, lr=0.005, weight_decay=1e-05
AdamW, lr=0.005, weight_decay=1e-05
Adam, lr=0.01, weight_decay=1e-05
AdamW, lr=0.01, weight_decay=1e-05
AdamW, lr=0.001, weight_decay=1e-06: Epoch 1, training loss: 0.6884125915364684
AdamW, lr=0.001, weight_decay=1e-06: Epoch 1, training accuracy: 0.6339181286549708
AdamW, lr=0.005, weight_decay=1e-05: Epoch 1, training loss: 0.7494132404572496
AdamW, lr=0.005, weight_decay=1e-05: Epoch 1, training accuracy: 0.5748538011695906
AdamW, lr=0.005, weight_decay=1e-06: Epoch 1, training loss: 0.7537833083177281
AdamW, lr=0.005, weight_dec