In [None]:
import os
import zipfile
import numpy as np
import pandas as pd
import scipy.io
import scipy.stats
from scipy.optimize import curve_fit
from sklearn.svm import SVR
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.impute import SimpleImputer
from joblib import Parallel, delayed
from tqdm import tqdm
import warnings

# Silence every warning for the whole process.
# NOTE(review): this is a blanket filter, so numerical/convergence warnings
# from the fitting code are hidden too — confirm that is intended.
warnings.filterwarnings("ignore")

# Number of seeded evaluation rounds per feature set (used as range(NUM_ITERATIONS)).
NUM_ITERATIONS = 500
# Passed to joblib.Parallel(n_jobs=...); -1 asks joblib to use all available cores.
N_JOBS = -1

def get_dataset_info(prompt_text):
    """Interactively ask which dataset to use and where its feature ZIP lives.

    Args:
        prompt_text: Heading printed above the menu.

    Returns:
        A ``(name, mos_file, zip_name)`` tuple, or ``(None, None, None)`` when
        either the MOS spreadsheet or the ZIP file does not exist on disk.
        An unrecognised menu choice falls back to the UID dataset.
    """
    # Menu key -> (dataset name, MOS spreadsheet expected in the working dir).
    known = {
        '1': ("UID", "mos_UID.xlsx"),
        '2': ("SAUD", "SAUD_MOS.xlsx"),
    }

    print(f"\n--- {prompt_text} ---")
    print("1. UID (Expects 'mos_UID.xlsx')")
    print("2. SAUD (Expects 'SAUD_MOS.xlsx')")

    selection = input("Enter 1 or 2: ").strip()
    if selection not in known:
        print("Invalid selection. Defaulting to UID.")
        selection = '1'
    name, mos_file = known[selection]

    # The spreadsheet is checked before the ZIP prompt so the user is not
    # asked for a second path when the first prerequisite is already missing.
    if not os.path.exists(mos_file):
        print(f"[ERROR] '{mos_file}' not found. Please upload it.")
        return None, None, None

    zip_name = input(f"Enter the features ZIP file for {name}: ").strip()
    if os.path.exists(zip_name):
        return name, mos_file, zip_name

    print(f"[ERROR] '{zip_name}' not found.")
    return None, None, None

def unzip_features(zip_name, extract_to):
    """Extract ``zip_name`` into a freshly created ``extract_to`` directory.

    Any existing directory at ``extract_to`` is deleted first, so files from a
    previous extraction never survive into the new one.

    Args:
        zip_name: Path of the ZIP archive to read.
        extract_to: Destination directory (removed and recreated).
    """
    import shutil

    already_there = os.path.exists(extract_to)
    if already_there:
        shutil.rmtree(extract_to)
    os.makedirs(extract_to)

    print(f"Unzipping {zip_name} into {extract_to}...")
    with zipfile.ZipFile(zip_name, 'r') as archive:
        archive.extractall(extract_to)

def load_features_from_folder(feature_folder, df_mos, name_col, mos_col):
    """
    Load one feature vector per MOS row and return features aligned with scores.

    The folder is walked recursively and every ``.mat`` file (extension matched
    case-insensitively) is indexed by its lower-cased file name and by its
    lower-cased stem. Each row of ``df_mos`` is then matched on the basename of
    ``row[name_col]``, first with its extension and then without it.

    Args:
        feature_folder: Directory searched recursively for ``.mat`` files.
        df_mos: DataFrame holding image names and scores.
        name_col: Column of ``df_mos`` containing the image name/path.
        mos_col: Column of ``df_mos`` containing the score.

    Returns:
        ``(features, scores)`` as NumPy arrays of equal length. Rows with no
        matching file, or whose file/score cannot be read, are left out.
    """
    # 1. Map Files
    file_map = {}
    for root, _, files in os.walk(feature_folder):
        for f in files:
            lowered = f.lower()
            # Compare on the lower-cased name so 'IMG.MAT' is indexed as well.
            if lowered.endswith('.mat'):
                full_path = os.path.join(root, f)
                file_map[lowered] = full_path
                file_map[os.path.splitext(lowered)[0]] = full_path

    features = []
    scores = []
    skipped = 0

    # 2. Align
    for _, row in df_mos.iterrows():
        fname = str(row[name_col]).strip()
        key = os.path.basename(fname).lower()
        path = file_map.get(key) or file_map.get(os.path.splitext(key)[0])
        if not path:
            continue

        try:
            mat = scipy.io.loadmat(path)
            # First variable that is not loadmat metadata (__header__, ...).
            var_name = next(k for k in mat if not k.startswith('__'))
            feat_vec = np.array(mat[var_name]).flatten()
            score = row[mos_col]
        except Exception:
            # Best effort: one unreadable row must not abort the whole load.
            # Only Exception is caught, so Ctrl-C still interrupts the loop.
            skipped += 1
            continue

        # Append both values only after both were read successfully, so the
        # two lists can never get out of step.
        features.append(feat_vec)
        scores.append(score)

    if skipped:
        print(f"[WARN] Skipped {skipped} row(s) in '{feature_folder}' "
              f"whose feature file or score could not be read.")

    return np.array(features), np.array(scores)

def logistic_func(X, b1, b2, b3, b4):
    """Evaluate the four-parameter logistic curve fitted by ``curve_fit``.

    Args:
        X: Input value(s).
        b1: Value the curve approaches as ``X`` grows large.
        b2: Value the curve approaches as ``X`` grows very negative.
        b3: Midpoint of the transition.
        b4: Transition width; only its absolute value is used.

    Returns:
        ``b2 + (b1 - b2) / (1 + exp(-(X - b3) / |b4|))``.
    """
    width = np.abs(b4)
    exponent = -(X - b3) / width
    denominator = 1 + np.exp(exponent)
    return b2 + (b1 - b2) / denominator

def compute_metrics(y_pred, y):
    """Score predictions against ground truth with rank and linear metrics.

    SRCC and KRCC are computed on the raw predictions. PLCC and RMSE are
    computed after mapping the predictions through a four-parameter logistic
    fitted with ``curve_fit``; if the fit fails the raw predictions are used.

    Args:
        y_pred: Predicted scores.
        y: Ground-truth scores.

    Returns:
        ``[SRCC, KRCC, PLCC, RMSE]``.
    """
    try:
        beta_init = [np.max(y), np.min(y), np.mean(y_pred), 0.5]
        popt, _ = curve_fit(logistic_func, y_pred, y, p0=beta_init, maxfev=int(1e8))
        y_pred_logistic = logistic_func(y_pred, *popt)
    except Exception:
        # Best-effort fallback when the fit fails. Catching Exception rather
        # than using a bare except keeps Ctrl-C able to stop a long fit.
        y_pred_logistic = y_pred

    SRCC = scipy.stats.spearmanr(y, y_pred)[0]
    try:
        KRCC = scipy.stats.kendalltau(y, y_pred)[0]
    except Exception:
        # Retry with the asymptotic p-value method if the default one fails.
        KRCC = scipy.stats.kendalltau(y, y_pred, method='asymptotic')[0]

    PLCC = scipy.stats.pearsonr(y, y_pred_logistic)[0]
    RMSE = np.sqrt(mean_squared_error(y, y_pred_logistic))
    return [SRCC, KRCC, PLCC, RMSE]

def run_evaluation(iter_idx, X_train_full, y_train_full, X_test_full, y_test_full, is_same_dataset):
    """Run one seeded split / grid-search / SVR fit and score it on the test set.

    Args:
        iter_idx: Iteration number; seeds NumPy's global RNG for the shuffle.
        X_train_full: Feature matrix of the training dataset.
        y_train_full: Scores of the training dataset.
        X_test_full: Feature matrix of the test dataset (cross-dataset only).
        y_test_full: Scores of the test dataset (cross-dataset only).
        is_same_dataset: True for a 70/10/20 split of the training dataset;
            False for a 90/10 train/validation split tested on the full
            test dataset.

    Returns:
        ``[SRCC, KRCC, PLCC, RMSE]`` from ``compute_metrics``.
    """
    np.random.seed(iter_idx)

    # Both modes shuffle the training dataset the same way; only the cut
    # points and the origin of the test rows differ.
    total = len(y_train_full)
    order = np.random.permutation(total)

    if is_same_dataset:
        # 70 % fit / 10 % validation / remainder test, all from one dataset.
        fit_end = int(total * 0.7)
        val_end = fit_end + int(total * 0.1)
        fit_rows = order[:fit_end]
        val_rows = order[fit_end:val_end]
        test_rows = order[val_end:]
        X_te, y_te = X_train_full[test_rows], y_train_full[test_rows]
    else:
        # 90 % fit / 10 % validation on dataset A, test on all of dataset B.
        fit_end = int(total * 0.9)
        fit_rows = order[:fit_end]
        val_rows = order[fit_end:]
        X_te, y_te = X_test_full, y_test_full

    X_tr, y_tr = X_train_full[fit_rows], y_train_full[fit_rows]
    X_val, y_val = X_train_full[val_rows], y_train_full[val_rows]

    # The scaler is fitted on the fit split only and reused for the others.
    scaler = MinMaxScaler((-1, 1))
    X_tr = scaler.fit_transform(X_tr)
    X_val = scaler.transform(X_val)
    X_te = scaler.transform(X_te)

    # Small grid search, selecting on validation SRCC. The defaults are kept
    # when no candidate scores above -1 (for example, every SRCC is NaN).
    top_srcc = -1
    top_c, top_gamma = 10, 0.1
    candidates = [(c, g) for c in [1, 10, 100] for g in [0.01, 0.1, 1]]
    for c, g in candidates:
        trial = SVR(C=c, gamma=g)
        trial.fit(X_tr, y_tr)
        val_srcc = scipy.stats.spearmanr(y_val, trial.predict(X_val))[0]
        if val_srcc > top_srcc:
            top_srcc = val_srcc
            top_c, top_gamma = c, g

    # Refit on fit + validation rows with the selected hyper-parameters.
    X_final = np.vstack((X_tr, X_val))
    y_final = np.concatenate((y_tr, y_val))
    final_m = SVR(C=top_c, gamma=top_gamma)
    final_m.fit(X_final, y_final)

    return compute_metrics(final_m.predict(X_te), y_te)

# Interactive driver: pick train/test datasets, unzip their feature archives,
# discover the feature-set folders, evaluate each with repeated SVR runs and
# print/save a table of mean metrics.
if __name__ == "__main__":
    print("=== Feature Combination Analysis (Supports H/S/V/HS Modes) ===")

    # 1. Inputs
    t_name, t_mos_file, t_zip = get_dataset_info("SELECT TRAINING DATASET")
    if not t_name:
        exit()

    test_name, test_mos_file, test_zip = get_dataset_info("SELECT TESTING DATASET")
    if not test_name:
        exit()

    # Same dataset AND same archive -> intra-dataset split; anything else is
    # treated as a cross-dataset experiment.
    is_same = (t_name == test_name) and (t_zip == test_zip)
    if is_same:
        print(f"\n[Mode] Same Dataset Evaluation (70/10/20 on {t_name})")
    else:
        print(f"\n[Mode] Cross Dataset Evaluation (Train {t_name} -> Test {test_name})")

    # 2. Unzip
    train_extract_path = "./train_feats_extracted"
    test_extract_path = "./test_feats_extracted"

    unzip_features(t_zip, train_extract_path)
    if not is_same:
        unzip_features(test_zip, test_extract_path)
    else:
        test_extract_path = train_extract_path

    # 3. Load MOS Dataframes
    df_train = pd.read_excel(t_mos_file)
    df_train.columns = [c.strip() for c in df_train.columns]

    df_test = pd.read_excel(test_mos_file)
    df_test.columns = [c.strip() for c in df_test.columns]

    # Detect MOS Columns
    def get_cols(df):
        """Return (image-name column, score column); the score column is always 'MOS'."""
        if 'Image' in df.columns:
            name = 'Image'
        elif 'image_name' in df.columns:
            name = 'image_name'
        else:
            name = df.columns[0]
        return name, 'MOS'

    t_name_col, t_mos_col = get_cols(df_train)
    test_name_col, test_mos_col = get_cols(df_test)

    # 4. Find Feature Subfolders (Handling New Mode Structure)
    # -------------------------------------------------------------------------
    # Handle case where zip creates a single wrapper folder
    root_items = [f.path for f in os.scandir(train_extract_path) if f.is_dir()]
    if len(root_items) == 1:
        # Descend only if the single folder is NOT itself a mode or config folder.
        base = os.path.basename(root_items[0])
        if base not in ['H', 'S', 'V', 'HS'] and not base.startswith('S1'):
            train_extract_path = root_items[0]
            if not is_same:
                # Align test path
                test_items = [f.path for f in os.scandir(test_extract_path) if f.is_dir()]
                if len(test_items) == 1:
                    test_extract_path = test_items[0]

    # Detect Structure: Modes (H/S/V/HS) vs Flat Configs (S1_FA...)
    potential_modes = ['H', 'S', 'V', 'HS']
    items = [f.name for f in os.scandir(train_extract_path) if f.is_dir()]

    # Check if we see the Color Mode folders
    has_modes = any(m in items for m in potential_modes)

    experiments = []  # List of dicts {label, train_path, test_path}

    if has_modes:
        print(f"Detected Color Mode Structure (H, S, V, HS)...")
        for mode in items:
            mode_path = os.path.join(train_extract_path, mode)

            # Find configs inside the mode folder (S1_FA, S12_FAB...)
            configs = [f.name for f in os.scandir(mode_path) if f.is_dir()]

            for cfg in configs:
                experiments.append({
                    'label': f"{mode} - {cfg}",  # Example: H - S1_FAB
                    'train_path': os.path.join(train_extract_path, mode, cfg),
                    'test_path': os.path.join(test_extract_path, mode, cfg) if test_extract_path else None
                })
    else:
        print(f"Detected Flat Configuration Structure...")
        for cfg in items:
            experiments.append({
                'label': cfg,
                'train_path': os.path.join(train_extract_path, cfg),
                'test_path': os.path.join(test_extract_path, cfg) if test_extract_path else None
            })

    if not experiments:
        print("Error: No feature subfolders found in zip.")
        exit()

    # Sort for cleaner output
    experiments.sort(key=lambda x: x['label'])

    # 5. Iterate Over All Feature Sets
    results_table = []
    print(f"\nEvaluating {len(experiments)} feature sets...")

    for exp in tqdm(experiments, desc="Evaluations"):
        config_label = exp['label']
        t_folder = exp['train_path']
        test_folder = exp['test_path']

        # Load Data
        X_train, y_train = load_features_from_folder(t_folder, df_train, t_name_col, t_mos_col)

        if is_same:
            X_test, y_test = X_train, y_train
        else:
            # Skip feature sets that have no counterpart in the test archive.
            if not test_folder or not os.path.exists(test_folder):
                continue
            X_test, y_test = load_features_from_folder(test_folder, df_test, test_name_col, test_mos_col)

        if len(X_train) == 0 or len(X_test) == 0:
            continue

        # Impute. The imputer is fitted on the training data whenever EITHER
        # matrix has NaNs, so a clean training set no longer lets test-set
        # NaNs reach SVR (which rejects them).
        train_has_nan = np.isnan(X_train).any()
        test_has_nan = (not is_same) and np.isnan(X_test).any()
        if train_has_nan or test_has_nan:
            imp = SimpleImputer(strategy='mean')
            X_train = imp.fit_transform(X_train)
            if is_same:
                X_test = X_train
            else:
                # Always pass the test matrix through the same fitted imputer,
                # so any column it drops from train (an all-NaN column) is
                # dropped from test too and the widths stay equal.
                X_test = imp.transform(X_test)

        # Run Parallel Iterations
        metrics = Parallel(n_jobs=N_JOBS)(delayed(run_evaluation)
                                          (i, X_train, y_train, X_test, y_test, is_same)
                                          for i in range(NUM_ITERATIONS))

        metrics = np.array(metrics)
        means = np.mean(metrics, axis=0)

        # Add to Table
        n_dim = X_train.shape[1]
        results_table.append({
            "Configuration": f"{config_label} ({n_dim})",
            "SRCC": means[0],
            "KRCC": means[1],
            "PLCC": means[2],
            "RMSE": means[3]
        })

    # 6. Display Results
    if results_table:
        res_df = pd.DataFrame(results_table)
        res_df = res_df.sort_values(by="SRCC", ascending=False)

        print("\n" + "="*70)
        print(f"RESULTS TABLE ({NUM_ITERATIONS} Iterations)")
        print(f"Train: {t_name} | Test: {test_name}")
        print("="*70)
        print(res_df.to_string(index=False, float_format="%.4f"))
        print("="*70)

        csv_name = f"Results_{t_name}_vs_{test_name}.csv"
        res_df.to_csv(csv_name, index=False)
        print(f"Saved to {csv_name}")
    else:
        print("No valid results computed.")

In [None]:
pip install xgboost

In [None]:
import os
import zipfile
import numpy as np
import pandas as pd
import scipy.io
import scipy.stats
from scipy.optimize import curve_fit
from sklearn.metrics import mean_squared_error
from sklearn.impute import SimpleImputer
from joblib import Parallel, delayed
from tqdm import tqdm
import xgboost as xgb
import warnings

# Silence every warning for the whole process.
# NOTE(review): blanket filter — library warnings are hidden as well; confirm
# that is intended.
warnings.filterwarnings("ignore")

# ==========================================
# CONFIGURATION
# ==========================================
# Number of seeded evaluation rounds per feature set (used as range(NUM_ITERATIONS)).
NUM_ITERATIONS = 500
# Passed to joblib.Parallel(n_jobs=...); -1 asks joblib to use all available cores.
N_JOBS = -1

# XGBoost Params
# 'random_state' and 'early_stopping_rounds' are deliberately absent:
# run_evaluation_xgboost passes both as explicit keywords next to
# **XGB_PARAMS, and repeating a keyword there would raise a TypeError.
XGB_PARAMS = {
    'n_estimators': 500,
    'max_depth': 4,
    'learning_rate': 0.05,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'objective': 'reg:squarederror',
    'n_jobs': 1, # Keep this 1 to allow outer parallelization
}

# ==========================================
# HELPER FUNCTIONS
# ==========================================

def get_dataset_info(prompt_text):
    """Ask which dataset to use and for the path of its feature ZIP.

    Args:
        prompt_text: Heading printed above the menu.

    Returns:
        ``(name, mos_file, zip_name)`` for a valid choice, otherwise
        ``(None, None, None)``. No file-existence checks are made here.
    """
    # Menu key -> (dataset name, MOS spreadsheet file name).
    catalog = {
        '1': ("UID", "mos_UID.xlsx"),
        '2': ("SAUD", "SAUD_MOS.xlsx"),
    }

    print(f"\n--- {prompt_text} ---")
    print("1. UID (Expects 'mos_UID.xlsx')")
    print("2. SAUD (Expects 'SAUD_MOS.xlsx')")

    entry = catalog.get(input("Enter 1 or 2: ").strip())
    if entry is None:
        return None, None, None

    label, mos_file = entry
    zip_path = input(f"Enter {label} ZIP: ").strip()
    return label, mos_file, zip_path

def unzip_features(zip_name, extract_to):
    """Extract ``zip_name`` into a freshly created ``extract_to`` directory.

    An existing ``extract_to`` is removed first so that nothing from an
    earlier extraction is left behind.

    Args:
        zip_name: Path of the ZIP archive to read.
        extract_to: Destination directory (removed and recreated).
    """
    import shutil

    stale = os.path.exists(extract_to)
    if stale:
        shutil.rmtree(extract_to)
    os.makedirs(extract_to)

    with zipfile.ZipFile(zip_name, 'r') as archive:
        archive.extractall(extract_to)

def load_features_from_folder(feature_folder, df_mos, name_col, mos_col):
    """Load one feature vector per MOS row, aligned with its score.

    Every ``.mat`` file under ``feature_folder`` (extension matched
    case-insensitively) is indexed by lower-cased file name and lower-cased
    stem; each row of ``df_mos`` is matched on the basename of
    ``row[name_col]``, with and then without its extension.

    Args:
        feature_folder: Directory searched recursively for ``.mat`` files.
        df_mos: DataFrame holding image names and scores.
        name_col: Column of ``df_mos`` containing the image name/path.
        mos_col: Column of ``df_mos`` containing the score.

    Returns:
        ``(features, scores)`` as NumPy arrays of equal length. Rows with no
        matching file, or whose file/score cannot be read, are left out.
    """
    file_map = {}
    for root, _, files in os.walk(feature_folder):
        for f in files:
            lowered = f.lower()
            # Compare on the lower-cased name so 'IMG.MAT' is indexed as well.
            if lowered.endswith('.mat'):
                full_path = os.path.join(root, f)
                file_map[lowered] = full_path
                file_map[os.path.splitext(lowered)[0]] = full_path

    features, scores = [], []
    skipped = 0
    for _, row in df_mos.iterrows():
        fname = str(row[name_col]).strip()
        key = os.path.basename(fname).lower()
        path = file_map.get(key) or file_map.get(os.path.splitext(key)[0])
        if not path:
            continue
        try:
            mat = scipy.io.loadmat(path)
            # First variable that is not loadmat metadata (__header__, ...).
            var_name = next(k for k in mat if not k.startswith('__'))
            feat_vec = np.array(mat[var_name]).flatten()
            score = row[mos_col]
        except Exception:
            # Best effort: skip the row, but count it. Only Exception is
            # caught, so Ctrl-C still interrupts the loop.
            skipped += 1
            continue
        # Append both only after both were read, keeping the lists in step.
        features.append(feat_vec)
        scores.append(score)

    if skipped:
        print(f"[WARN] Skipped {skipped} row(s) in '{feature_folder}' "
              f"whose feature file or score could not be read.")
    return np.array(features), np.array(scores)

def logistic_func(X, b1, b2, b3, b4):
    """Evaluate the four-parameter logistic curve fitted by ``curve_fit``.

    Returns ``b2 + (b1 - b2) / (1 + exp(-(X - b3) / |b4|))``; only the
    absolute value of ``b4`` is used.
    """
    spread = np.abs(b4)
    exponent = -(X - b3) / spread
    denominator = 1 + np.exp(exponent)
    yhat = b2 + (b1 - b2) / denominator
    return yhat

def compute_metrics(y_pred, y):
    """Score predictions against ground truth with rank and linear metrics.

    SRCC and KRCC use the raw predictions. PLCC and RMSE use the predictions
    after a four-parameter logistic mapping fitted with ``curve_fit``; if the
    fit fails the raw predictions are used instead.

    Args:
        y_pred: Predicted scores.
        y: Ground-truth scores.

    Returns:
        ``[SRCC, KRCC, PLCC, RMSE]``; KRCC is 0 if ``kendalltau`` raises.
    """
    try:
        beta_init = [np.max(y), np.min(y), np.mean(y_pred), 0.5]
        popt, _ = curve_fit(logistic_func, y_pred, y, p0=beta_init, maxfev=int(1e4))
        y_pred_logistic = logistic_func(y_pred, *popt)
    except Exception:
        # Best-effort fallback when the fit fails. Catching Exception rather
        # than using a bare except keeps Ctrl-C/SystemExit from being swallowed.
        y_pred_logistic = y_pred

    SRCC = scipy.stats.spearmanr(y, y_pred)[0]
    try:
        KRCC = scipy.stats.kendalltau(y, y_pred)[0]
    except Exception:
        KRCC = 0
    PLCC = scipy.stats.pearsonr(y, y_pred_logistic)[0]
    RMSE = np.sqrt(mean_squared_error(y, y_pred_logistic))
    return [SRCC, KRCC, PLCC, RMSE]

# ==========================================
# CORE EVALUATION (FIXED FOR XGBOOST 2.0+)
# ==========================================

def run_evaluation_xgboost(iter_idx, X_train_full, y_train_full, X_test_full, y_test_full, is_same_dataset):
    """Run one seeded split, fit an XGBoost regressor and score it.

    Args:
        iter_idx: Iteration number; seeds NumPy's global RNG for the shuffle
            and is also passed to the model as ``random_state``.
        X_train_full: Feature matrix of the training dataset.
        y_train_full: Scores of the training dataset.
        X_test_full: Feature matrix of the test dataset (cross-dataset only).
        y_test_full: Scores of the test dataset (cross-dataset only).
        is_same_dataset: True for a 70/10/20 split of the training dataset;
            False for a 90/10 train/validation split tested on the full
            test dataset.

    Returns:
        ``[SRCC, KRCC, PLCC, RMSE]`` from ``compute_metrics``.
    """
    np.random.seed(iter_idx)

    # --- SPLIT LOGIC ---
    # The shuffle is shared by both modes; only the cut points and the origin
    # of the test rows differ.
    n_samples = len(y_train_full)
    shuffled = np.random.permutation(n_samples)

    if is_same_dataset:
        # INTRA: 70% Train, 10% Val, 20% Test
        fit_end = int(n_samples * 0.7)
        val_end = fit_end + int(n_samples * 0.1)
        fit_rows = shuffled[:fit_end]
        val_rows = shuffled[fit_end:val_end]
        held_out = shuffled[val_end:]
        X_te, y_te = X_train_full[held_out], y_train_full[held_out]
    else:
        # INTER: 90% Train, 10% Val -> Test on B
        fit_end = int(n_samples * 0.9)
        fit_rows = shuffled[:fit_end]
        val_rows = shuffled[fit_end:]
        X_te, y_te = X_test_full, y_test_full

    X_tr, y_tr = X_train_full[fit_rows], y_train_full[fit_rows]
    X_val, y_val = X_train_full[val_rows], y_train_full[val_rows]

    # --- MODEL TRAINING ---
    # early_stopping_rounds is given to the constructor, not to fit(); the
    # validation split is supplied to fit() as the eval_set.
    regressor = xgb.XGBRegressor(
        early_stopping_rounds=20,
        random_state=iter_idx,
        **XGB_PARAMS
    )
    regressor.fit(X_tr, y_tr, eval_set=[(X_val, y_val)], verbose=False)

    return compute_metrics(regressor.predict(X_te), y_te)

# ==========================================
# MAIN
# ==========================================

# Interactive driver: pick train/test datasets, unzip their feature archives,
# discover the feature-set folders, evaluate each with repeated XGBoost runs
# and print/save a table of mean metrics.
if __name__ == "__main__":
    print("=== XGBoost Analysis (70/10/20 Intra & 90/10 Inter) ===")

    # 1. Inputs
    # get_dataset_info returns (None, None, None) for an invalid menu choice.
    t_name, t_mos_file, t_zip = get_dataset_info("SELECT TRAINING DATASET")
    if not t_name: exit()

    test_name, test_mos_file, test_zip = get_dataset_info("SELECT TESTING DATASET")
    if not test_name: exit()

    # Same dataset AND same archive -> intra-dataset split; anything else is
    # treated as a cross-dataset experiment.
    is_same = (t_name == test_name) and (t_zip == test_zip)

    if is_same:
        print(f"\n[Mode] Same Dataset -> Splitting {t_name}: 70% Train, 10% Val, 20% Test")
    else:
        print(f"\n[Mode] Cross Dataset -> {t_name}: 90% Train, 10% Val | Test on {test_name}")

    # 2. Unzip
    train_extract_path = "./train_feats_extracted"
    test_extract_path = "./test_feats_extracted"

    unzip_features(t_zip, train_extract_path)
    if not is_same:
        unzip_features(test_zip, test_extract_path)
    else:
        # Intra-dataset mode reuses the single extraction for both roles.
        test_extract_path = train_extract_path

    # Load MOS
    # Column headers are stripped of surrounding whitespace before lookup.
    df_train = pd.read_excel(t_mos_file)
    df_train.columns = [c.strip() for c in df_train.columns]
    df_test = pd.read_excel(test_mos_file)
    df_test.columns = [c.strip() for c in df_test.columns]

    # Pick the image-name column ('Image', then 'image_name', else the first
    # column); the score column is always assumed to be named 'MOS'.
    def get_cols(df):
        if 'Image' in df.columns: name = 'Image'
        elif 'image_name' in df.columns: name = 'image_name'
        else: name = df.columns[0]
        return name, 'MOS'

    t_name_col, t_mos_col = get_cols(df_train)
    test_name_col, test_mos_col = get_cols(df_test)

    # 3. Detect Folder Structure
    # If the archive unpacked into a single wrapper folder that is not itself
    # a mode folder, descend into it (and likewise for the test extraction).
    root_items = [f.path for f in os.scandir(train_extract_path) if f.is_dir()]
    if len(root_items) == 1 and os.path.basename(root_items[0]) not in ['H','S','V','HS']:
         train_extract_path = root_items[0]
         if not is_same:
             test_items = [f.path for f in os.scandir(test_extract_path) if f.is_dir()]
             if len(test_items) == 1: test_extract_path = test_items[0]

    items = [f.name for f in os.scandir(train_extract_path) if f.is_dir()]
    potential_modes = ['H', 'S', 'V', 'HS']
    has_modes = any(m in items for m in potential_modes)
    # Each entry: {'label', 'train_path', 'test_path'} for one feature set.
    experiments = []

    if has_modes:
        # Mode layout: <mode>/<config>/...; folders that are not a known mode
        # are ignored.
        for mode in items:
            if mode not in potential_modes: continue
            mode_path = os.path.join(train_extract_path, mode)
            for cfg in [f.name for f in os.scandir(mode_path) if f.is_dir()]:
                experiments.append({
                    'label': f"{mode} - {cfg}",
                    'train_path': os.path.join(train_extract_path, mode, cfg),
                    'test_path': os.path.join(test_extract_path, mode, cfg) if test_extract_path else None
                })
    else:
        # Flat layout: every sub-folder is one configuration.
        for cfg in items:
            experiments.append({
                'label': cfg,
                'train_path': os.path.join(train_extract_path, cfg),
                'test_path': os.path.join(test_extract_path, cfg) if test_extract_path else None
            })

    experiments.sort(key=lambda x: x['label'])

    # 4. Run Loop
    results_table = []
    print(f"\nEvaluating {len(experiments)} feature sets...")

    for exp in tqdm(experiments):
        t_folder = exp['train_path']
        test_folder = exp['test_path']

        X_train, y_train = load_features_from_folder(t_folder, df_train, t_name_col, t_mos_col)

        if is_same:
            X_test, y_test = X_train, y_train
        else:
            # Skip feature sets that have no counterpart in the test archive.
            if not test_folder or not os.path.exists(test_folder): continue
            X_test, y_test = load_features_from_folder(test_folder, df_test, test_name_col, test_mos_col)

        # Nothing matched the MOS sheet for this feature set -> skip it.
        if len(X_train) == 0 or len(X_test) == 0: continue

        # Impute
        # Mean imputation is fitted on the training matrix and applied only
        # when the training matrix contains NaNs; otherwise both matrices
        # are passed on unchanged.
        if np.isnan(X_train).any():
            imp = SimpleImputer(strategy='mean')
            X_train = imp.fit_transform(X_train)
            X_test = X_train if is_same else imp.transform(X_test)

        # One seeded evaluation per iteration index, run across joblib workers.
        metrics = Parallel(n_jobs=N_JOBS)(
            delayed(run_evaluation_xgboost)(
                i, X_train, y_train, X_test, y_test, is_same
            )
            for i in range(NUM_ITERATIONS)
        )

        # Column-wise mean over iterations: [SRCC, KRCC, PLCC, RMSE].
        means = np.mean(metrics, axis=0)
        results_table.append({
            "Configuration": exp['label'],
            "SRCC": means[0],
            "KRCC": means[1],
            "PLCC": means[2],
            "RMSE": means[3]
        })

    # 5. Save
    # Nothing is printed or written when no feature set produced results.
    if results_table:
        res_df = pd.DataFrame(results_table).sort_values(by="SRCC", ascending=False)
        print("\n" + res_df.to_string(index=False, float_format="%.4f"))
        res_df.to_csv(f"Results_XGBoost_{t_name}_{test_name}.csv", index=False)
        print("Done.")