# Introduction

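This notebook evaluates the performance of the baseline and proposed image quality assessment (IQA) models. The assessment considers memory footprint, runtime, and quality metrics — Pearson linear correlation coefficient (PLCC), Spearman rank-order correlation coefficient (SROCC), and root-mean-square error (RMSE) — at both the cross-validation and inference stages.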

The baseline model is described here: https://jiangliu5.github.io/imqac.github.io/

# Importing Libraries

In [None]:
import os
import gc
import torch
import joblib
import pickle
import torch.backends.cudnn as cudnn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from torchvision.transforms import v2
from pathlib import Path
from torch.utils.data import DataLoader
from torchvision.transforms import InterpolationMode

from sklearn.linear_model import LinearRegression
from sklearn.metrics import root_mean_squared_error
from sklearn.model_selection import KFold
from scipy.stats import pearsonr, spearmanr
from sklearn.decomposition import PCA

from memory_profiler import memory_usage
from models.iqa_module_baseline import mymodel
from utils.dataset import MyDataset_xinbo
from models.iqa_module_proposed import FuseBackbones, IQAModel
from utils.dataset_proposed import CustomDataset

import warnings
# Set the TORCH_USE_CUDA_DSA environment variable for this process.
# NOTE(review): presumably meant to enable CUDA device-side assertions — confirm
# that setting it at runtime (after torch was imported) has any effect.
os.environ['TORCH_USE_CUDA_DSA'] = "1"
# Hide UserWarning messages attributed to the torch.autograd.graph module.
warnings.filterwarnings("ignore", category=UserWarning, module="torch.autograd.graph")
# Hide FutureWarning messages attributed to the onnxscript.converter module.
warnings.filterwarnings("ignore", category=FutureWarning, module="onnxscript.converter")

In [None]:
import random
def seed_torch(seed):
    """Seed every random number generator the notebook relies on.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (``torch.manual_seed`` and ``torch.cuda.manual_seed``), exports
    ``PYTHONHASHSEED`` and turns on cuDNN's deterministic mode.

    Args:
        seed: Integer seed handed to every generator.
    """
    # Exported as an environment variable (stored as a string).
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Python and NumPy generators.
    for seeder in (random.seed, np.random.seed):
        seeder(seed)

    # PyTorch generators.
    for seeder in (torch.manual_seed, torch.cuda.manual_seed):
        seeder(seed)

    # Ask cuDNN for deterministic algorithms.
    torch.backends.cudnn.deterministic = True


seed_torch(seed=1)

# Setting Directories

In [None]:
# General run-time constants
NUM_WORKERS, AMOUNT_TO_GET, SEED = 0, 1.0, 42

# Dataset layout: everything lives below the VCIP root folder
BASELINE_NAME = "VCIP_IMQA/VCIP"
BASELINE = Path(BASELINE_NAME)
TARGET_DIR = BASELINE.joinpath("EQ420_image")
TARGET_LABEL = BASELINE.joinpath("Labels")
TARGET_BASE = BASELINE.joinpath("IMQA")

# Folders holding the trained models
MODEL_DIR = Path("trained_3072to128")
MODEL_DIR2 = Path("trained_3072to128_weights")

# Make sure the image folder and both model folders exist
# (missing parents are created, existing folders are left untouched)
for _directory in (TARGET_DIR, MODEL_DIR, MODEL_DIR2):
    _directory.mkdir(parents=True, exist_ok=True)

# Set seeds
def set_seeds(seed: int = 42) -> None:
    """Seed PyTorch's random number generators.

    Calls ``torch.manual_seed`` followed by ``torch.cuda.manual_seed``.

    Args:
        seed (int, optional): Random seed to set. Defaults to 42.
    """
    for seed_fn in (torch.manual_seed, torch.cuda.manual_seed):
        seed_fn(seed)
# Seed torch with the notebook-wide SEED.
# NOTE(review): this replaces the torch seed set by the earlier seed_torch(seed=1)
# call, while Python's `random` and NumPy keep the seed given there — confirm
# that mixing the two seeds is intended.
set_seeds(SEED)

# Switches guarding the memory-profiling cells of the proposed / baseline methods.
RUN_PROFILING_PROPOSED = True
RUN_PROFILING_BASELINE = False

# Classes and Functions

In [None]:
def apply_pca_train(df, n_components):
    """Fit a PCA on the training features and return the projected table.

    Every column except ``'fold'`` and ``'mos'`` is treated as a feature.

    Args:
        df: DataFrame containing the feature columns plus ``'fold'`` and ``'mos'``.
        n_components: Number of principal components to keep.

    Returns:
        tuple: ``(df_pca, pca)`` where ``df_pca`` has the columns
        ``PCA_0 .. PCA_{n_components-1}``, ``'fold'`` and ``'mos'`` (same index
        as ``df``) and ``pca`` is the fitted PCA object.
    """
    feature_matrix = df.drop(columns=['fold', 'mos'])

    # Seeded with the notebook-wide SEED.
    pca = PCA(n_components=n_components, random_state=SEED)
    projected = pca.fit_transform(feature_matrix)

    df_pca = pd.DataFrame(
        projected,
        columns=[f'PCA_{i}' for i in range(n_components)],
        index=df.index,
    )
    # Carry the bookkeeping columns over unchanged.
    for passthrough in ('fold', 'mos'):
        df_pca[passthrough] = df[passthrough]
    return df_pca, pca

def apply_pca_test(df, pca, n_components):
    """Project test features with an already fitted PCA.

    Every column except ``'image_name'`` and ``'fold'`` is treated as a feature.

    Args:
        df: DataFrame containing the feature columns plus ``'image_name'`` and ``'fold'``.
        pca: Fitted object exposing ``transform``.
        n_components: Number of output components; used to name the columns.

    Returns:
        DataFrame with the columns ``PCA_0 .. PCA_{n_components-1}``, ``'fold'``
        and ``'image_name'``, indexed like ``df``.
    """
    feature_matrix = df.drop(columns=['image_name', 'fold'])
    projected = pca.transform(feature_matrix)

    df_pca = pd.DataFrame(
        projected,
        columns=[f'PCA_{i}' for i in range(n_components)],
        index=df.index,
    )
    # Carry the bookkeeping columns over unchanged.
    for passthrough in ('fold', 'image_name'):
        df_pca[passthrough] = df[passthrough]
    return df_pca

# Specifying Target Device

In [None]:
# Activate cuda benchmark
# NOTE(review): seed_torch() earlier sets cudnn.deterministic = True; enabling
# benchmark mode here may work against that reproducibility goal — confirm intent.
cudnn.benchmark = True

# Set device
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device: {device}")

# Print the GPU status through the nvidia-smi shell command (IPython "!" escape).
if device == "cuda":
    !nvidia-smi

# Memory Profiling for the Proposed Method

In [None]:
if RUN_PROFILING_PROPOSED:

    # Settings used by the profiling cells of the proposed method
    IMG_SIZE, BATCH_SIZE = 512, 1
    N_SPLITS, PCA_COMPONENTS = 8, 128

    # Filename prefixes of the pickled models in the two model folders
    MODEL_ARCH = MODEL_DIR / "lgbm_model_fold"
    MODEL_ARCH2 = MODEL_DIR2 / "lgbm_model_fold"

    # Keep only the rows of the test CSV whose 'folds' value is 9 or 10
    test_vector = [9, 10]
    test_csv = pd.read_csv(TARGET_LABEL / 'mos_fold_test.csv')
    in_test_folds = test_csv['folds'].isin(test_vector)
    test_ids = test_csv[in_test_folds]

In [None]:
if RUN_PROFILING_PROPOSED:

    # --- Feature extractor footprint -------------------------------------
    # The state dict is written to a temporary file only to measure its size.
    temp_path = "temp_model.pth"
    model_list = ['swin_v2_s', 'efficientnet_b3', 'convnext_s']
    fuse_backbones = FuseBackbones(model_list=model_list, vector_size=None)
    try:
        torch.save(fuse_backbones.state_dict(), temp_path)

        # Get size in MB
        size_mb_bk = os.path.getsize(temp_path) / (1024 * 1024)
    finally:
        # Clean up even when saving or measuring fails, so no stray
        # checkpoint is left behind in the working directory.
        if os.path.exists(temp_path):
            os.remove(temp_path)
    print(f"Feature extractor: {size_mb_bk:.2f} MB")

    # --- Regressor ensemble footprint ------------------------------------
    # Collect models in a list (note: `model_list` now holds file paths)
    model_list = [
            f'{MODEL_ARCH2}_mse_1_8_6.pkl',
            f'{MODEL_ARCH}_mse_2_3_5.pkl',
            f'{MODEL_ARCH2}_r2_3_8_4.pkl',
            f'{MODEL_ARCH}_mse_4_3_6.pkl',
            f'{MODEL_ARCH}_mse_5_1_5.pkl',
            f'{MODEL_ARCH2}_r2_6_4_5.pkl',
            f'{MODEL_ARCH2}_r2_7_7_5.pkl',
            f'{MODEL_ARCH2}_r2_8_6_6.pkl',
    ]

    size_mb_lgb = 0

    for filename in model_list:
        size_bytes = os.path.getsize(filename)
        size_MB = size_bytes / (1024 ** 2)  # Convert to MB
        size_mb_lgb += size_MB
        print(f"{filename}: {size_MB:.2f} MB")

    print(f"\nEnsemble: {size_mb_lgb:.2f} MB")
    # Same unit and precision as the partial sizes printed above.
    print(f"Total network size: {size_mb_bk + size_mb_lgb:.2f} MB")

In [None]:
if RUN_PROFILING_PROPOSED:
    
    # Start from a clean CUDA memory state before measuring.
    # NOTE(review): these torch.cuda calls are unconditional although `device`
    # may be "cpu" — confirm this cell is only ever run on a CUDA machine.
    torch.cuda.empty_cache()
    torch.cuda.synchronize()
    torch.cuda.reset_peak_memory_stats()

    def run_inference():
        """Run the proposed pipeline once on the test images and report GPU memory.

        Extracts features with FuseBackbones, projects them with a PCA fitted on
        'train_features.csv', loads N_SPLITS pickled models and averages their
        predictions.

        Returns:
            One averaged prediction per selected test row (mean over the models).
        """

        # Pre-processing
        # NOTE: `(IMG_SIZE)` is a plain int, not a 1-tuple.
        transforms = v2.Compose([
            v2.Resize((IMG_SIZE), interpolation=InterpolationMode.BICUBIC),
            v2.ToImage(),
            v2.ToDtype(torch.float32, scale=True),
            v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        test_dataloader = DataLoader(
            dataset=CustomDataset(ids=test_ids, ref_dir=TARGET_DIR, transform=transforms),
            batch_size=BATCH_SIZE,
            shuffle=False,
            num_workers=NUM_WORKERS,
            pin_memory=True)

        # Create model
        fuse_backbones = FuseBackbones(model_list=['swin_v2_s', 'efficientnet_b3', 'convnext_s'], vector_size=None)
        fuse_backbones.to(device)
        fuse_backbones.eval()
        fuse_backbones = fuse_backbones.float()

        # Warm-up run (to load kernels, allocate caches, etc.)
        # The warm-up iterates over the whole dataloader; outputs are discarded.
        with torch.inference_mode():
            for img, _, _, _, _ in test_dataloader:        
                _ = fuse_backbones(img.to(device))

        peak      = torch.cuda.max_memory_allocated() / 1024**2
        print(f"[INFO] Warmup Max memory used: {peak:.2f} MB")

        # Reset the counters so the numbers below cover only the measured pass.
        torch.cuda.empty_cache()
        torch.cuda.synchronize()
        torch.cuda.reset_peak_memory_stats()

        # Extract features
        test_features = []
        test_fold = []
        test_names = []

        print(f"[INFO] Profiling started...")

        # Measured pass: features are moved to the CPU as soon as they are computed.
        for img, _, fold, name, _ in test_dataloader:
            with torch.inference_mode():
                features = fuse_backbones(img.to(device))
            test_features.append(features.cpu())
            test_fold.append(fold.cpu())
            test_names += list(name)

        torch.cuda.synchronize()

        # Convert the byte counts reported by torch.cuda to MB.
        allocated = torch.cuda.memory_allocated() / 1024**2
        reserved  = torch.cuda.memory_reserved() / 1024**2
        peak      = torch.cuda.max_memory_allocated() / 1024**2

        print(f"[INFO] Current allocated: {allocated:.2f} MB")
        print(f"[INFO] Memory reserved: {reserved:.2f} MB")
        print(f"[INFO] Max memory used: {peak:.2f} MB")
            
        # Tensor to numpy
        test_features_np = torch.cat(test_features).numpy()
        test_fold_np = torch.cat(test_fold).numpy()
        test_names_np = np.array(test_names).reshape(-1, 1)

        # Create column names
        num_features = test_features_np.shape[1]
        feature_columns = [f"f_{i}" for i in range(num_features)]
        columns = ['image_name'] + feature_columns + ['fold']

        # Combine into DataFrame
        # NOTE(review): np.hstack builds one array from names and numbers, so the
        # numeric columns presumably end up stored as strings — confirm that the
        # later PCA transform / astype(int) conversions are what is intended.
        features_df = pd.DataFrame(data=np.hstack([test_names_np, test_features_np, test_fold_np.reshape(-1, 1)]), columns=columns)

        # PCA for test set
        # Fit PCA
        # The PCA is re-fitted here on the training features, so reading the CSV
        # and fitting are part of what memory_usage() measures.
        train_df_orig = pd.read_csv('train_features.csv')
        pca = PCA(n_components=PCA_COMPONENTS, random_state=SEED)
        pca.fit(train_df_orig.drop(columns=['fold','mos']))

        # Apply PCA on the test set
        X_pca = pca.transform(features_df.drop(columns=['image_name', 'fold']))
        pca_columns = [f'PCA_{i}' for i in range(PCA_COMPONENTS)]    
        pca_df = pd.DataFrame(X_pca, columns=pca_columns, index=features_df.index)
        pca_df['fold'] = features_df['fold'].astype(int)
        pca_df['image_name'] = features_df['image_name']

        # Collect models in a list
        model_list = [
                f'{MODEL_ARCH2}_mse_1_8_6.pkl',
                f'{MODEL_ARCH}_mse_2_3_5.pkl',
                f'{MODEL_ARCH2}_r2_3_8_4.pkl',
                f'{MODEL_ARCH}_mse_4_3_6.pkl',
                f'{MODEL_ARCH}_mse_5_1_5.pkl',     
                f'{MODEL_ARCH2}_r2_6_4_5.pkl',
                f'{MODEL_ARCH2}_r2_7_7_5.pkl',
                f'{MODEL_ARCH2}_r2_8_6_6.pkl',
        ]

        # Load the first N_SPLITS pickled models from disk.
        models = []
        for i in range(N_SPLITS):
            model = joblib.load(model_list[i])
            models.append(model)

        # Keep only the rows belonging to the test folds and drop the bookkeeping columns.
        pca_ids = pca_df[pca_df['fold'].isin(test_vector)]
        X_test = pca_ids.drop(columns=['image_name', 'fold'])

        # One column of predictions per model; the ensemble output is the row-wise mean.
        preds_per_model = np.column_stack([model.predict(X_test) for model in models])
        return preds_per_model.mean(axis=1)

    # memory_usage() runs run_inference and returns memory samples; the maximum is reported.
    # NOTE(review): presumably these samples are host (process) memory in MiB — confirm.
    mem_usage = memory_usage(run_inference)
    print(f"Max memory usage: {max(mem_usage):.2f} MB")

# Memory Profiling for the Baseline Method

In [None]:
if RUN_PROFILING_BASELINE:

    # Size on disk of one baseline checkpoint, reported in MB.
    # NOTE(review): absolute, machine-specific path.
    file_path = r'H:\outputs\iqa_total_20250616_fold0mseadam.pth'
    size_bytes = os.path.getsize(file_path)
    size_mb = size_bytes / 2 ** 20

    print(f"Baseline network size: {size_mb:.2f} MB")

In [None]:
if RUN_PROFILING_BASELINE:
    
    # NOTE(review): this rebinds the notebook-wide TARGET_DIR to an absolute,
    # machine-specific string; cells executed afterwards that read TARGET_DIR
    # will see this value — confirm that is intended.
    TARGET_DIR = r'D:/Repos/ML_Projects/Image-Manipulation-Quality-Assessment/VCIP_IMQA/VCIP/EQ420_image/'
    fold_vector = [1, 2, 3, 4, 5, 6, 7, 8]
    # Results table; it is created here but not filled in by this cell.
    dfbase = pd.DataFrame(0, index=[f"folder {i}" for i in fold_vector] + ["average"] + ["global"], columns=['plcc base', 'srocc base', 'rmse base'])

    # Start from a clean CUDA memory state before measuring.
    torch.cuda.empty_cache()
    torch.cuda.synchronize()
    torch.cuda.reset_peak_memory_stats()

    # These two lists are created here but not filled in by this cell.
    all_pred_base = []
    all_true_base = []
    # Suffixes used to build the checkpoint file name.
    opt='adam'
    error='mse'

    def run_inference():
        """Run the baseline model on the test images and report GPU memory."""
        # Load models
        # Only one checkpoint (fold 0) is loaded: range(1).
        models = []
        for fold in range(1):
            print(f"Loading fold {fold + 1}...")
            model = mymodel()
            # NOTE(review): relative file name, whereas the size check in the
            # previous cell reads the checkpoint from an absolute path — confirm.
            model_name = f"iqa_total_20250616_fold{fold}{error}{opt}.pth"
            model.load_state_dict(torch.load(model_name))
            model = model.to(device)
            model.eval()
            models.append(model)
            
        fold_test = [9, 10]
        test_csv = pd.read_csv(TARGET_LABEL / 'mos_fold_test.csv') #.sample(frac=1)
        test_ids_base = test_csv[test_csv['folds'].isin(fold_test)]

        # Prepare the dataloader
        test_set = MyDataset_xinbo(ids=test_ids_base, ref_dir=TARGET_DIR)
        test_dataloader = {'test':DataLoader(test_set, batch_size=1,shuffle=False, num_workers=4)}

        # For each OOF image
        # Warm-up: forward the first 5 samples and discard the scores.
        cont = 0
        for sample in test_dataloader["test"]:
            cont += 1
            img = sample['ref']
            img = img.to(device, dtype=torch.float)
            with torch.set_grad_enabled(False):
                test_preds = []
                for model in models:
                    score  = model(img).squeeze().cpu().item() 
                
            if cont == 5:
                break

        peak      = torch.cuda.max_memory_allocated() / 1024**2
        print(f"[INFO] Warmup Max memory used: {peak:.2f} MB")

        # Reset the counters so the numbers below cover only the measured pass.
        torch.cuda.empty_cache()
        torch.cuda.synchronize()
        torch.cuda.reset_peak_memory_stats()

        print(f"[INFO] Profiling started...")

        # For each OOF image
        # Measured pass: the score of each image is the mean over the loaded models.
        mos_preds = []
        for sample in test_dataloader["test"]:
            img = sample['ref']
            img = img.to(device, dtype=torch.float)
            with torch.set_grad_enabled(False):
                test_preds = []
                for model in models:
                    score  = model(img).squeeze().cpu().item()       
                    test_preds.append(score)
                mos_preds.append(np.mean(test_preds))    

        torch.cuda.synchronize()

        # Convert the byte counts reported by torch.cuda to MB.
        allocated = torch.cuda.memory_allocated() / 1024**2
        reserved  = torch.cuda.memory_reserved() / 1024**2
        peak      = torch.cuda.max_memory_allocated() / 1024**2

        print(f"[INFO] Current allocated: {allocated:.2f} MB")
        print(f"[INFO] Memory reserved: {reserved:.2f} MB")
        print(f"[INFO] Max memory used: {peak:.2f} MB")

        # NOTE(review): test_ids_base is a filtered selection of test_csv and is
        # local to this function, so these two modifications are not visible to
        # the caller (nothing is returned) — confirm they are still needed.
        test_ids_base['mos'] = mos_preds
        test_ids_base.drop(['folds'], axis=1, inplace=True) 

    # memory_usage() runs run_inference and returns memory samples; the maximum is reported.
    mem_usage = memory_usage(run_inference)
    print(f"Max memory usage: {max(mem_usage):.2f} MB")

# Loading LightGBM Models

In [None]:
# Settings shared by the evaluation cells below
IMG_SIZE = 512
BATCH_SIZE = NUM_METRICS = 1
N_SPLITS = 8

# Fold identifiers: 1..8 for cross-validation, 9 and 10 for the test rows
fold_vector = list(range(1, N_SPLITS + 1))
fold_test = [9, 10]

# Filename prefixes of the pickled models in the two model folders
MODEL_ARCH = MODEL_DIR / "lgbm_model_fold"
MODEL_ARCH2 = MODEL_DIR2 / "lgbm_model_fold"

# Label tables (image list with fold assignment)
idx_csv = pd.read_csv(TARGET_LABEL / 'mos_fold_train.csv')
test_csv = pd.read_csv(TARGET_LABEL / 'mos_fold_test.csv')

# Pre-computed feature tables; the test table is expected to carry an
# 'image_name' column, which apply_pca_test() removes before projecting
train_df_orig = pd.read_csv('train_features.csv')
test_df_orig = pd.read_csv('test_features.csv')

In [None]:
# Display the training feature table read from 'train_features.csv'.
train_df_orig

In [None]:
# Reduce both feature tables to N_COMPONENTS principal components;
# the PCA is fitted on the training table only
N_COMPONENTS = 128
train_df_s, pca_s = apply_pca_train(train_df_orig, N_COMPONENTS)
test_df_s = apply_pca_test(test_df_orig, pca_s, N_COMPONENTS)

# Folder that stores the fitted PCA (created if it does not exist)
PCA_DIR = "pca"
Path(PCA_DIR).mkdir(parents=True, exist_ok=True)

# Persist the fitted PCA so that later cells can reload it
with open(os.path.join(PCA_DIR, "pca.pkl"), "wb") as f:
    pickle.dump(pca_s, f)


In [None]:
# Display the PCA-projected test feature table.
test_df_s

In [None]:
# Image pre-processing pipeline: resize, convert to a float32 image scaled by
# ToDtype(scale=True), then normalise with the given mean / std
_preprocessing_steps = [
    v2.Resize((IMG_SIZE), interpolation=InterpolationMode.BICUBIC),
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
manual_transforms = v2.Compose(_preprocessing_steps)

# One pickled model file per cross-validation split
_model_files = [
    f'{MODEL_ARCH2}_mse_1_8_6.pkl',
    f'{MODEL_ARCH}_mse_2_3_5.pkl',
    f'{MODEL_ARCH2}_r2_3_8_4.pkl',
    f'{MODEL_ARCH}_mse_4_3_6.pkl',
    f'{MODEL_ARCH}_mse_5_1_5.pkl',
    f'{MODEL_ARCH2}_r2_6_4_5.pkl',
    f'{MODEL_ARCH2}_r2_7_7_5.pkl',
    f'{MODEL_ARCH2}_r2_8_6_6.pkl',
]

# Every entry pairs a model file with the (shared) train and test feature tables
model_list = [(model_file, train_df_s, test_df_s) for model_file in _model_files]

# True: calibrate with one regressor per fold; False: use a single global one
APPLY_CALIBRATION_PER_FOLD = True

# Load the first N_SPLITS models; the files are trusted local artefacts
lgb_models = [
    (joblib.load(model_list[i][0]), model_list[i][1], model_list[i][2])
    for i in range(N_SPLITS)
]

In [None]:
# One row of selected hyper-parameters per loaded model
model_params = [
    {
        "Model": f"Model {position + 1}",
        "n_estimators": hyper.get("n_estimators"),
        "learning_rate": round(hyper.get("learning_rate", 0), 3),
        "max_depth": hyper.get("max_depth"),
        "num_leaves": hyper.get("num_leaves"),
        # Falls back to 255 when the model does not report max_bin
        "max_bin": hyper.get("max_bin", 255),
    }
    for position, hyper in enumerate(entry[0].get_params() for entry in lgb_models)
]

df_model_params = pd.DataFrame(model_params)

print("TABLE II. 	BEST TRAINING SETUP PER CROSS-VALIDATION FOLD")
display(df_model_params)

# Computing Metrics for the Proposed Method

In [None]:
# Create a dataframe to store the results.
# It is initialised with 0.0 (not 0) so that the metric columns are float from
# the start: writing floats into int64 columns is deprecated in pandas.
dfprop = pd.DataFrame(0.0, index=[f"folder {i}" for i in fold_vector] + ["average"] + ["global"], columns=['plcc prop', 'srocc prop', 'rmse prop'])

fold_xyz = []        # (raw predictions, ground truth) of every fold
all_pred_prop = []
all_true_prop = []
calibrators = []     # one linear calibrator per fold

# Define calibrator folder
CALIBRATOR_DIR = "calibrators"
os.makedirs(CALIBRATOR_DIR, exist_ok=True)  # Create if not exists

# Pass 1: predict every validation fold with its model and fit the calibrators.
# Each split of the 8 fold identifiers holds out exactly one fold (val_idx[0]);
# because of shuffle=True the held-out fold is not necessarily fold_vector[fold].
kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)
for fold, (_, val_idx) in enumerate(kf.split(fold_vector)):

    # Load data
    model = lgb_models[fold][0]
    train_df = lgb_models[fold][1]

    # K-fold preparation
    fold_val = fold_vector[val_idx[0]]
    val_ids = train_df.loc[train_df['fold'] == fold_val]
    X_val_fold = val_ids.drop(columns=['fold', 'mos'])
    y_true = val_ids['mos'].values

    # Predict using the model for this fold
    y_pred = model.predict(X_val_fold)

    # Per-fold linear regression mapping raw predictions to MOS.
    # NOTE(review): the calibrator is fitted on the same fold it is evaluated
    # on below, so the reported per-fold RMSE is presumably optimistic — confirm.
    lr_k = LinearRegression().fit(y_pred.reshape(-1, 1), y_true)
    calibrators.append(lr_k)

    # Save calibrator for this fold
    with open(f"{CALIBRATOR_DIR}/calibrator_fold{fold}.pkl", "wb") as f:
        pickle.dump(lr_k, f)

    # Accumulate all predictions and ground truths
    all_pred_prop.extend(y_pred)
    all_true_prop.extend(y_true)

    fold_xyz.append((y_pred, y_true))

# Fit global regressor on the raw predictions of all folds
all_pred_np = np.array(all_pred_prop).reshape(-1, 1)
all_true_np = np.array(all_true_prop).reshape(-1)

# Compute calibrator
lr = LinearRegression().fit(all_pred_np, all_true_np)

# Pass 2: calibrate and score. The raw predictions cached in fold_xyz are
# reused instead of running every model a second time.
all_pred_prop = []
all_true_prop = []

for fold, (y_pred, y_true) in enumerate(fold_xyz):

    # Calibrate the estimated MOS
    if APPLY_CALIBRATION_PER_FOLD:
        y_pred = calibrators[fold].predict(y_pred.reshape(-1, 1))
    else:
        y_pred = lr.predict(y_pred.reshape(-1, 1))

    # Accumulate all predictions and ground truths
    all_pred_prop.extend(y_pred)
    all_true_prop.extend(y_true)

    # Correlation metrics
    plcc, _ = pearsonr(y_pred, y_true)
    srocc, _ = spearmanr(y_pred, y_true)
    rmse = root_mean_squared_error(y_pred, y_true)

    # Store in dataframes (row position = split index)
    dfprop.iloc[fold] = [plcc, srocc, rmse]

# Compute average metrics across the per-fold rows and store in dataframes
dfprop.loc['average'] = [dfprop[column].iloc[:N_SPLITS].mean() for column in ('plcc prop', 'srocc prop', 'rmse prop')]

# Compute global metrics from all predictions and store in dataframes
global_plcc_prop, _ = pearsonr(all_pred_prop, all_true_prop)
global_srocc_prop, _ = spearmanr(all_pred_prop, all_true_prop)
global_rmse_prop = root_mean_squared_error(all_pred_prop, all_true_prop)
dfprop.loc['global'] = [global_plcc_prop, global_srocc_prop, global_rmse_prop]
dfprop = dfprop.round(3)

# Display dataframes
print("Results for the proposed method")
display(dfprop)

# Making Predictions on the Test Dataset Using the Proposed Method

In [None]:
# Settings for the test-set prediction (re-declared so the cell is self-describing)
N_SPLITS, IMG_SIZE, BATCH_SIZE, NUM_WORKERS = 8, 512, 1, 0
BASELINE_NAME = "VCIP_IMQA/VCIP"
BASELINE = Path(BASELINE_NAME)
TARGET_LABEL = BASELINE / "Labels"
CALIBRATOR_DIR = "calibrators"
PCA_DIR = 'pca'
MODEL_ARCH = MODEL_DIR / "lgbm_model_fold"
MODEL_ARCH2 = MODEL_DIR2 / "lgbm_model_fold"

# Rows of the test CSV whose 'folds' value is 9 or 10
test_vector = [9, 10]
test_csv = pd.read_csv(TARGET_LABEL / 'mos_fold_test.csv')
test_ids = test_csv[test_csv['folds'].isin(test_vector)]

# Pickled models, one file per split; the files are trusted local artefacts
model_list = [
    f'{MODEL_ARCH2}_mse_1_8_6.pkl',
    f'{MODEL_ARCH}_mse_2_3_5.pkl',
    f'{MODEL_ARCH2}_r2_3_8_4.pkl',
    f'{MODEL_ARCH}_mse_4_3_6.pkl',
    f'{MODEL_ARCH}_mse_5_1_5.pkl',
    f'{MODEL_ARCH2}_r2_6_4_5.pkl',
    f'{MODEL_ARCH2}_r2_7_7_5.pkl',
    f'{MODEL_ARCH2}_r2_8_6_6.pkl',
]
lgb_models = [joblib.load(model_list[i]) for i in range(N_SPLITS)]

# Calibrators, one pickle per split index
calibrators = []
for fold in range(N_SPLITS):
    with open(f"{CALIBRATOR_DIR}/calibrator_fold{fold}.pkl", "rb") as calibrator_file:
        calibrators.append(pickle.load(calibrator_file))

# Fitted PCA
with open(f"{PCA_DIR}/pca.pkl", "rb") as pca_file:
    pca_s = pickle.load(pca_file)

# Test dataset with the same pre-processing steps as before
_preprocessing_steps = [
    v2.Resize((IMG_SIZE), interpolation=InterpolationMode.BICUBIC),
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
manual_transforms = v2.Compose(_preprocessing_steps)
test_dataset = CustomDataset(ids=test_ids, ref_dir=TARGET_DIR, transform=manual_transforms)

# Instantiate the IQA model from the backbones, regressors, calibrators and PCA
_iqa_settings = dict(
    model_list=['swin_v2_s', 'efficientnet_b3', 'convnext_s'],
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    lgb_models=lgb_models,
    calibrators=calibrators,
    pca_model=pca_s,
    dl_device=device,
)
iqa_model = IQAModel(**_iqa_settings)

# Make predictions
df_preds = iqa_model.predict(test_dataset, len(test_dataset))

# Display profiling
# NOTE(review): the Device labels are hard-coded for seven rows and say "GPU"
# regardless of `device` — confirm they match the rows returned by profile().
results = iqa_model.profile().round(2)
results["Device"] = ["CPU", "GPU", "CPU", "CPU", "CPU/GPU", "-", "-"]
results

In [None]:
# Predicted MOS looked up by image name
_mos_by_image = df_preds.set_index('image_name')['mos']

# Test image list without the fold column, extended with the predicted MOS
test_ids_prop = test_csv.drop(columns=['folds'])
test_ids_prop['mos'] = test_ids_prop['image_name'].map(_mos_by_image)

In [None]:
# Display the test image list together with the predicted MOS.
test_ids_prop

In [None]:
# Report the range of the predicted MOS values
_predicted_mos = test_ids_prop['mos']
print(f"Max. prediction: {_predicted_mos.max()}")
print(f"Min. prediction: {_predicted_mos.min()}")

# Writing the predictions to the submission CSV is intentionally left disabled:
#test_ids_prop.to_csv(f"submitted_papers/Challenge Submission - Sergio-Sanz-Rodriguez_0717.csv",index=False)

# Computing Metrics for the Baseline Method

In [None]:
# These names are already imported in the first code cell of the notebook;
# they are repeated here for the baseline section.
import warnings
from models.iqa_module_baseline import mymodel
from utils.dataset import MyDataset_xinbo
# NOTE(review): from this point on every warning category is silenced for the
# rest of the session, which also hides pandas/torch deprecation warnings.
warnings.filterwarnings('ignore')
from torch.utils.data import DataLoader

In [None]:
# True: load previously stored baseline results; False: recompute them.
READ_CSV_BASE = True

# Load baseline results, its predictions have been previously made
if READ_CSV_BASE:
    # The CSV files can be produced from a recomputed run with, e.g.:
    #dfbase.to_csv("df_base.csv",index=False)
    #test_ids_base.to_csv("test_ids_base.csv", index=False)
    dfbase = pd.read_csv("df_base.csv")
    # The CSV carries no index column, so the row labels are restored here.
    dfbase.index = [f"folder {i+1}" for i in range(8)] + ["average", "global"]
    test_ids_base = pd.read_csv("test_ids_base.csv")

    # NOTE: pickle.load can execute arbitrary code; only load trusted files.
    with open("all_pred_base.pkl", "rb") as f:
        all_pred_base = pickle.load(f)

    with open("all_true_base.pkl", "rb") as f:
        all_true_base = pickle.load(f)
else:

    fold_vector = [1, 2, 3, 4, 5, 6, 7, 8]
    # Initialised with 0.0 (not 0) so that the metric columns are float from the
    # start: writing floats into int64 columns is deprecated in pandas.
    dfbase = pd.DataFrame(0.0, index=[f"folder {i}" for i in fold_vector] + ["average"] + ["global"], columns=['plcc base', 'srocc base', 'rmse base'])

    all_pred_base = []
    all_true_base = []
    opt='adam'
    error='mse'

    # Load models (one checkpoint per split)
    # NOTE(review): absolute, machine-specific checkpoint location.
    models = []
    for fold in range(N_SPLITS):
        print(f"Loading fold {fold + 1}...")
        model = mymodel()
        model_name = f"H:/outputs/iqa_total_20250616_fold{fold}{error}{opt}.pth"
        # map_location lets the checkpoint load on the selected device even if
        # it was saved from a different one.
        model.load_state_dict(torch.load(model_name, map_location=device))
        model = model.to(device)
        model.eval()
        models.append(model)

    # Execute K-folds; each split holds out exactly one fold (val_idx[0])
    kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)
    for fold, (_, val_idx) in enumerate(kf.split(fold_vector)):

        print(f"Processing fold {fold + 1}...")

        # K-fold preparation
        fold_val = fold_vector[val_idx[0]]
        val_ids = idx_csv.loc[idx_csv['folds'] == fold_val]

        # Prepare the dataloader
        val_set = MyDataset_xinbo(ids=val_ids, ref_dir=TARGET_DIR)
        val_dataloader = {'val':DataLoader(val_set, batch_size=1,shuffle=False, num_workers=4)}

        # Load model
        model = models[fold]

        # For each OOF image
        y_pred = []
        for sample_batched in val_dataloader["val"]:
            # Device-agnostic float32 conversion (works on CPU as well as CUDA);
            # the ground truth is taken from val_ids below, not from the batch.
            ref = sample_batched['ref'].to(device, dtype=torch.float)
            with torch.set_grad_enabled(False):
                pred_score = model(ref).squeeze().cpu().item()
            y_pred.append(pred_score)

        y_pred = np.array(y_pred).reshape(-1)
        y_true = np.array(val_ids["mos"].tolist()).reshape(-1)

        # Accumulate all predictions and ground truths
        all_pred_base.extend(y_pred)
        all_true_base.extend(y_true)

        # Correlation metrics
        plcc, _ = pearsonr(y_pred, y_true)
        srocc, _ = spearmanr(y_pred, y_true)
        rmse = root_mean_squared_error(y_pred, y_true)

        # Store in dataframes (row position = split index)
        dfbase.iloc[fold] = [plcc, srocc, rmse]

    # Compute average metrics across the per-fold rows and store in dataframes
    dfbase.loc['average'] = [dfbase[column].iloc[:N_SPLITS].mean() for column in ('plcc base', 'srocc base', 'rmse base')]

    # Release the reference to the last model and free cached GPU memory
    del model
    torch.cuda.empty_cache()
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.synchronize()

    # Compute global metrics from all predictions and store in dataframes
    global_plcc_base, _ = pearsonr(all_pred_base, all_true_base)
    global_srocc_base, _ = spearmanr(all_pred_base, all_true_base)
    global_rmse_base = root_mean_squared_error(all_pred_base, all_true_base)
    dfbase.loc['global'] = [global_plcc_base, global_srocc_base, global_rmse_base]
    dfbase = dfbase.round(3)

    # Display dataframes
    print("Results for the baseline method")
    display(dfbase)

# Making Predictions on the Test Dataset Using the Baseline Method

In [None]:
if not READ_CSV_BASE:
    fold_test = [9, 10]
    test_csv = pd.read_csv(TARGET_LABEL / 'mos_fold_test.csv')
    # .copy(): a column is added to this selection below, so it must be an
    # independent frame rather than a filtered view of test_csv.
    test_ids_base = test_csv[test_csv['folds'].isin(fold_test)].copy()

    # Prepare the dataloader
    test_set = MyDataset_xinbo(ids=test_ids_base, ref_dir=TARGET_DIR)
    test_dataloader = {'test':DataLoader(test_set, batch_size=1,shuffle=False, num_workers=4)}

    # For each test image: average the scores of all baseline models
    mos_preds = []
    for sample in test_dataloader["test"]:
        img = sample['ref'].to(device, dtype=torch.float)
        with torch.set_grad_enabled(False):
            test_preds = []
            # Explicit loop (not a comprehension): `model` must stay bound
            # because it is deleted together with `models` further down.
            for model in models:
                score = model(img).squeeze().cpu().item()
                test_preds.append(score)
        mos_preds.append(np.mean(test_preds))

    # Store the predictions and drop the fold column without in-place mutation
    test_ids_base['mos'] = mos_preds
    test_ids_base = test_ids_base.drop(columns=['folds'])

    print(f"Max. prediction: {test_ids_base['mos'].max()}")
    print(f"Min. prediction: {test_ids_base['mos'].min()}")

    # Release the models and free cached GPU memory
    del models, model, sample
    torch.cuda.empty_cache()
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.synchronize()

# Assessment Tables and Plots

In [None]:
# Side-by-side table: baseline columns first, proposed columns second
df = pd.concat([dfbase, dfprop], axis=1).round(3)

print("TABLE III. COMPARISON OF BASELINE AND PROPOSED METHODS ACROSS FOLDS USING PLCC, SROCC, AND RMSE METRICS")
display(df)

In [None]:
# Fit one straight line (true MOS as a function of predicted MOS) per method;
# the lines are drawn on top of the scatter plots.
# The baseline line is fitted against the baseline ground truth, i.e. the same
# values that are plotted in the baseline panel.
lrb = LinearRegression().fit(np.array(all_pred_base).reshape(-1, 1), np.array(all_true_base).reshape(-1))
ab, bb = lrb.intercept_, lrb.coef_[0]
lrp = LinearRegression().fit(np.array(all_pred_prop).reshape(-1, 1), all_true_prop)
ap, bp = lrp.intercept_, lrp.coef_[0]
print(ap, bp)

fig, axes = plt.subplots(1, 2, figsize=(6.5, 3.5))
axes = axes.flatten()

# Abscissa used to draw the fitted lines over the [0, 1] range
x_line = np.linspace(0, 1, 100)

# (axis, predictions, ground truth, colour, intercept, slope, title, column suffix)
panels = [
    (axes[0], all_pred_base, all_true_base, 'grey', ab, bb, "Baseline", "base"),
    (axes[1], all_pred_prop, all_true_prop, 'coral', ap, bp, "Proposal", "prop"),
]
for ax, preds, trues, color, intercept, slope, label, suffix in panels:
    ax.scatter(preds, trues, alpha=0.5, color=color)
    ax.plot(x_line, slope * x_line + intercept,
            color='black', linestyle='dashed', linewidth=1)

    # Global correlation figures come from the comparison table `df`
    plcc_global = df.loc['global', f'plcc {suffix}']
    srocc_global = df.loc['global', f'srocc {suffix}']
    ax.set_title(f"{label}\nPLCC = {plcc_global}, SROCC = {srocc_global}")

    ax.set_xlabel('Predicted MOS')
    ax.set_ylabel('True MOS')
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.grid(True)

plt.tight_layout()
#plt.savefig(r"paper2/correlation_coral2_july17.tiff", dpi=300, bbox_inches='tight')
print("Figure 4. Correlation between predicted and true MOS on the OOF sets for the baseline (left) and proposed (right) models")
plt.show()