In [1]:
import cv2 as cv
import pickle
import csv

import os

import shutil
import warnings
import platform
from glob import glob
from itertools import chain
from collections import Counter
from dataclasses import dataclass
 
import numpy as np
import pandas as pd
import seaborn as sns
from PIL import Image
import matplotlib.pyplot as plt
# from sklearn.utils.class_weight import compute_class_weight
 
# Next, we have our usual torch and torchvision imports.
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
 
import torchvision
import torchvision.transforms as TF
from torchvision.utils import make_grid
from torchvision.ops import sigmoid_focal_loss
 
# Importing lighting along with a built-in callback it provides.
import lightning.pytorch as pl
from lightning.pytorch.callbacks import LearningRateMonitor, ModelCheckpoint, EarlyStopping
 
# Importing torchmetrics modular and functional evaluation implementations.
from torchmetrics import MeanMetric
from torchmetrics.classification import MultilabelF1Score
from torchmetrics.classification import MultilabelAccuracy
 
# To print model summary.
from torchinfo import summary

import json

from tqdm import tqdm



In [2]:
class DatasetConfig:
    IMAGE_SIZE: tuple = (224, 224) # (W, H)
    CHANNELS: int = 3
    NUM_CLASSES: int = 59
    VALID_PCT: float = 0.1

    # Pre-defined Mean & Std. Dev. of the Imagenet Trained model
    MEAN: tuple = (0.485, 0.456, 0.406)
    STD: tuple = (0.229, 0.224, 0.225)

    # dataset file and folder paths
    IMG_DIR: str = "/home/noeyiue/cpe/ICSEC/annotation/dataset"

class TrainingConfig:
    BATCH_SIZE: int = 32
    NUM_EPOCHS: int = 30
    INIT_LR: float = 1e-4
    NUM_WORKERS: int = os.cpu_count()
    OPTIMIZER_NAME: str = "Adam"
    WEIGHT_DECAY: float = 1e-4
    USE_SCHEDULER: bool = True
    SCHEDULER: str = "multi_step_lr"
    METRIC_THRESH: float = 0.4
    MODEL_NAME: str = "mobilenet_v3_small"
    FREEZE_BACKBONE: bool = False

In [3]:
class UPARModel(pl.LightningModule):
    def __init__(
        self,
        model_name: str,
        num_classes: int = 59,
        freeze_backbone: bool = False,
        init_lr: float = 1e-4,
        optimizer_name: str = "Adam",
        weight_decay: float = 1e-4,
        use_scheduler: bool = False,
        f1_metric_threshold: float = 0.4,
        metrics_file_path: str = "metrics.json",
        dropout_prob: float = 0.3
    ):
        super().__init__()

        # Save the arguments as hyperparameters.
        self.save_hyperparameters()

        # Loading model using the function defined above.
        self.model = get_model(
            model_name=self.hparams.model_name,
            num_classes=self.hparams.num_classes,
            freeze_backbone=self.hparams.freeze_backbone,
        )
        self.dropout = nn.Dropout(p=self.hparams.dropout_prob)

        # Initialize loss class.
        self.loss_fn = nn.BCEWithLogitsLoss()

        # Initializing the required metric objects.
        self.mean_train_loss = MeanMetric()
        self.mean_train_f1 = MultilabelF1Score(num_labels=self.hparams.num_classes, 
                                               average="macro", threshold=self.hparams.f1_metric_threshold)
        self.train_mA = MultilabelAccuracy(num_labels=self.hparams.num_classes, 
                                               average="macro", threshold=self.hparams.f1_metric_threshold)
        self.mean_valid_loss = MeanMetric()
        self.mean_valid_f1 = MultilabelF1Score(num_labels=self.hparams.num_classes, 
                                               average="macro", threshold=self.hparams.f1_metric_threshold)
        self.valid_mA = MultilabelAccuracy(num_labels=self.hparams.num_classes, 
                                               average="macro", threshold=self.hparams.f1_metric_threshold)

        self.metrics_file_path = metrics_file_path
        

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, *args, **kwargs):
        data, target = batch
        logits = self(data)
        loss = self.loss_fn(logits, target)
    
        # Update metric states
        self.mean_train_loss(loss, weight=data.shape[0])
        self.mean_train_f1(logits, target)
        self.train_mA(logits, target)
    
        # Log metrics
        self.log("train/batch_loss", self.mean_train_loss, prog_bar=True)
        self.log("train/batch_f1", self.mean_train_f1, prog_bar=True)
        self.log("train/batch_mA", self.train_mA, prog_bar=True)
    
        return loss

    def on_train_epoch_end(self):
        # Computing and logging the training mean loss & mean f1.
        self.log("train/loss", self.mean_train_loss, prog_bar=True)
        self.log("train/f1", self.mean_train_f1, prog_bar=True)
        self.log("train/mA", self.train_mA, prog_bar=True)
        self.log("step", self.current_epoch)
        self.save_metrics()

    def validation_step(self, batch, *args, **kwargs):
        data, target, img_paths = batch
        logits = self(data)
        loss = self.loss_fn(logits, target)

        self.mean_valid_loss.update(loss, weight=data.shape[0])
        self.mean_valid_f1.update(logits, target)
        self.valid_mA.update(logits, target)

    def on_validation_epoch_end(self):
        # Computing and logging the validation mean loss & mean f1.
        self.log("valid/loss", self.mean_valid_loss, prog_bar=True)
        self.log("valid/f1", self.mean_valid_f1, prog_bar=True)
        self.log("valid/mA", self.valid_mA, prog_bar=True)
        self.log("step", self.current_epoch)
        self.save_metrics()

        # Save predictions to file
        self.save_predictions()

    def configure_optimizers(self):
        optimizer = getattr(torch.optim, self.hparams.optimizer_name)(
            filter(lambda p: p.requires_grad, self.model.parameters()),
            lr=self.hparams.init_lr,
            weight_decay=self.hparams.weight_decay,
        )
 
        if self.hparams.use_scheduler:
            lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(
                optimizer,
                milestones=[self.trainer.max_epochs // 2,],
                gamma=0.1,
            )
 
            # The lr_scheduler_config is a dictionary that contains the scheduler
            # and its associated configuration.
            lr_scheduler_config = {
                "scheduler": lr_scheduler,
                "interval": "epoch",
                "name": "multi_step_lr",
            }
            return {"optimizer": optimizer, "lr_scheduler": lr_scheduler_config}
 
        else:
            return optimizer

    def save_metrics(self):
        metrics = {
            "epoch": self.current_epoch,
            "train_loss": self.mean_train_loss.compute().item(),
            "train_f1": self.mean_train_f1.compute().item(),
            "train_mA": self.train_mA.compute().item(),
            "valid_loss": self.mean_valid_loss.compute().item(),
            "valid_f1": self.mean_valid_f1.compute().item(),
            "valid_mA": self.valid_mA.compute().item()
        }
        with open(self.metrics_file_path, "a") as f:
            json.dump(metrics, f)
            f.write("\n")


In [4]:
import time
@torch.inference_mode()
def predict_prob(input_image_paths, model=None, preprocess_fn=None, device="cpu", idx2labels=None, output_csv="prob_predictions.csv"):
    predictions = []
    start_time = time.time()

    # Create a tqdm progress bar
    progress_bar = tqdm(input_image_paths, desc="Predicting", unit="image")

    for image_path in progress_bar:
        input_image = Image.open(image_path).convert("RGB")
        input_tensor = preprocess_fn(input_image)
        input_tensor = input_tensor.unsqueeze(0).to(device)

        # Generate predictions
        output = model(input_tensor).cpu()

        probabilities = torch.sigmoid(output)[0].numpy().tolist()

        predictions.append([image_path] + probabilities)

    end_time = time.time()
    total_time = end_time - start_time
    print(f"Total prediction time: {total_time:.2f} seconds")

    # Write predictions to CSV file
    with open(output_csv, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["image"] + list(idx2labels.values()))
        for prediction in predictions:
            writer.writerow(prediction)

In [5]:
idx2labels = {
     0: 'ethnicity_Caucasoid',
     1: 'ethnicity_Mongoloid',
     2: 'ethnicity_Negroid',
     3: 'ethnicity_Unsure',
     4: 'age_Adult',
     5: 'age_Old',
     6: 'age_Young',
     7: 'gender_Female',
     8: 'gender_Male',
     9: 'hair_length_Bald',
     10: 'hair_length_Long',
     11: 'hair_length_Short',
     12: 'upper_body_length_Long',
     13: 'upper_body_length_Short',
     14: 'upper_body_color_Black',
     15: 'upper_body_color_Blue',
     16: 'upper_body_color_Brown',
     17: 'upper_body_color_Green',
     18: 'upper_body_color_Grey',
     19: 'upper_body_color_Orange',
     20: 'upper_body_color_Other',
     21: 'upper_body_color_Pink',
     22: 'upper_body_color_Purple',
     23: 'upper_body_color_Red',
     24: 'upper_body_color_White',
     25: 'upper_body_color_Yellow',
     26: 'upper_body_type_Blouse',
     27: 'upper_body_type_Dress',
     28: 'upper_body_type_Jacket',
     29: 'upper_body_type_Other',
     30: 'upper_body_type_Polo',
     31: 'upper_body_type_Shirt',
     32: 'upper_body_type_Tanktop',
     33: 'upper_body_type_Tshirt',
     34: 'lower_body_length_Long',
     35: 'lower_body_length_Short',
     36: 'lower_body_color_Black',
     37: 'lower_body_color_Blue',
     38: 'lower_body_color_Brown',
     39: 'lower_body_color_Green',
     40: 'lower_body_color_Grey',
     41: 'lower_body_color_Orange',
     42: 'lower_body_color_Other',
     43: 'lower_body_color_Pink',
     44: 'lower_body_color_Purple',
     45: 'lower_body_color_Red',
     46: 'lower_body_color_White',
     47: 'lower_body_color_Yellow',
     48: 'lower_body_type_Skirt&Dress',
     49: 'lower_body_type_Trousers&Shorts',
     50: 'footwear_Sandals',
     51: 'footwear_Shoes',
     52: 'backpack_Yes',
     53: 'bag_Yes',
     54: 'glasses_No',
     55: 'glasses_Normal',
     56: 'glasses_Sun',
     57: 'hat_Yes',
     58: 'mask_Yes'
}

In [6]:
DEVICE = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
# Define the path to your .pth file
model_path = f"./{TrainingConfig.MODEL_NAME}_1.pth"

model = torch.load(model_path, map_location=DEVICE)
model.to(DEVICE)
model.eval()
preprocess = TF.Compose(
        [
            TF.Resize(size=DatasetConfig.IMAGE_SIZE[::-1]),
            TF.ToTensor(),
            TF.Normalize(DatasetConfig.MEAN, DatasetConfig.STD, inplace=True),
        ]
    )

annotation_path = '/home/noeyiue/cpe/ICSEC/annotation/test_data.csv'
test_data = pd.read_csv(annotation_path, index_col=0)
test_img_ids = test_data.index

IMG_DIR: str = "/home/noeyiue/cpe/ICSEC/annotation/dataset"
# Prepare input image paths
input_image_paths = [os.path.join(IMG_DIR, img_id ) for img_id in test_img_ids]

# Perform predictions and save to CSV
predict_prob(input_image_paths, model=model, preprocess_fn=preprocess, idx2labels=idx2labels, device=DEVICE)



Predicting: 100%|███████████████████████| 2437/2437 [00:06<00:00, 373.11image/s]

Total prediction time: 6.53 seconds



