In [1]:
import sys
sys.path.append("/home/oldrain123/IMBALANCED_CLASSIFICATION/MOMs")

In [2]:
# ------------------------- 기본 라이브러리 ------------------------- #
import numpy as np, pandas as pd, torch
from collections import Counter
from itertools import product
from warnings import filterwarnings
filterwarnings("ignore")

# ML / imb‑learn
from sklearn.datasets import make_classification
from sklearn.model_selection import StratifiedKFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.base import clone
from imblearn.over_sampling import (
    RandomOverSampler, SMOTE, ADASYN, BorderlineSMOTE
)

# moms_* 모듈
from moms_generate import transform     

import numpy as np
import torch
from sklearn.utils import check_random_state
from sklearn.base import BaseEstimator
from moms_generate import transform   # ← 사용자 업로드 파일
torch.set_num_threads(1) 

In [3]:
# ctgan_wrapper.py  (혹은 스크립트 상단에 삽입)
from ctgan import CTGAN
import pandas as pd
import numpy as np

class CTGANOversampler:
    """
    imblearn‑style wrapper around CTGAN.
    - 모든 피처를 연속형으로 간주 (make_classification 기반 실험이므로 OK)
    - minority 샘플이 10 미만이면 RandomOverSampler 로 폴백
    """

    def __init__(self, epochs=200, random_state=None, min_samples=10):
        self.epochs = epochs
        self.random_state = random_state
        self.min_samples = min_samples

    def _ros(self, X, y):
        from imblearn.over_sampling import RandomOverSampler
        return RandomOverSampler(random_state=self.random_state).fit_resample(X, y)

    def fit_resample(self, X, y):
        X = np.asarray(X, dtype=np.float32)
        y = np.asarray(y, dtype=int)
        maj_mask, min_mask = (y == 0), (y == 1)
        X_min, X_maj = X[min_mask], X[maj_mask]

        n_to_gen = len(X_maj) - len(X_min)
        if n_to_gen <= 0:
            return X.copy(), y.copy()

        # minority 표본이 너무 적으면 ROS 폴백
        if len(X_min) < self.min_samples:
            return self._ros(X, y)

        # ---- CTGAN 학습 ----
        col_names = [f"f{i}" for i in range(X.shape[1])]   # 문자열 컬럼명
        df_min = pd.DataFrame(X_min, columns=col_names)

        ctgan = CTGAN(epochs=self.epochs,
                      verbose=False)
        try:
            ctgan.fit(df_min, discrete_columns=[])
        except Exception:          # 어떤 이유로든 실패 시 ROS 폴백
            return self._ros(X, y)

        synth = ctgan.sample(n_to_gen).values.astype(np.float32)
        X_res = np.vstack([X, synth])
        y_res = np.hstack([y, np.ones(n_to_gen, dtype=int)])
        return X_res, y_res

    def fit(self, X, y=None):
        return self


In [4]:
class TransportMMDSampler(BaseEstimator):
    """
    A scikit‑learn compatible wrapper for the Transport‑MMD oversampling
    algorithm defined in moms_* modules.

    Parameters
    ----------
    n_epochs : int
        #epochs for TransMap training.
    beta : float
        Regularization weight (triplet‑loss).
    kernel_type : str
        'gaussian' (default) | 'laplacian' | 'imq' | 'rq'.
    device : 'cpu' | 'cuda'
    random_state : int | None
    **kwargs : dict
        Extra hyper‑parameters forwarded to moms_generate.transform().
    """
    def __init__(self,
                 n_epochs: int = 1000,
                 beta: float = 0.0,
                 kernel_type: str = "gaussian",
                 device: str = "cpu",
                 random_state: int | None = None,
                 **kwargs):
        self.n_epochs = n_epochs
        self.beta = beta
        self.kernel_type = kernel_type
        self.device = device
        self.random_state = random_state
        self.kwargs = kwargs

    # ------------------------------------------------------ #
    def fit_resample(self, X, y):
        """
        Parameters
        ----------
        X : array‑like, shape (n_samples, n_features)
        y : array‑like, shape (n_samples,)   (binary: 0 / 1)

        Returns
        -------
        X_res, y_res : ndarray
            Resampled feature matrix / label vector.
        """
        rng = check_random_state(self.random_state)

        X = np.asarray(X, dtype=np.float32)
        y = np.asarray(y, dtype=int)

        maj_mask = (y == 0)
        min_mask = (y == 1)

        X_maj = X[maj_mask]
        X_min = X[min_mask]

        n_maj, n_min = len(X_maj), len(X_min)

        # minority가 2개 미만이면 oversampling 불가 → 원본 반환
        if n_min < 2 or n_maj <= n_min:
            return X.copy(), y.copy()

        # moms_generate.transform  호출
        X_maj_sel, X_min_copy, X_trans = transform(
            X_maj=X_maj,
            X_min=X_min,
            in_dim=X.shape[1],
            kernel_type=self.kernel_type,
            beta=self.beta,
            n_epochs=self.n_epochs,
            seed=rng.randint(0, 2**31 - 1),
            device=self.device,
            **self.kwargs
        )

        # 생성 샘플은 minority 라벨(1) 부여
        X_res = np.vstack([X, X_trans])
        y_res = np.hstack([y, np.ones(len(X_trans), dtype=int)])
        return X_res, y_res

    # imblearn API 호환을 위해 fit() 리턴 self
    def fit(self, X, y=None):
        return self

In [5]:
# ------------------ 안전 SMOTE / ADASYN / bSMOTE ------------------ #
SEED = 1203
def smote_safe(kind, y):
    cnt = Counter(y); n_min = cnt[1]
    if n_min < 2:
        return None
    k = max(1, min(5, n_min - 1))
    if kind == "smote":
        return SMOTE(k_neighbors=k, random_state=SEED)
    if kind == "adasyn":
        return ADASYN(n_neighbors=k, random_state=SEED)
    if kind == "borderline":
        m = max(1, min(10, cnt[0] - 1))
        return BorderlineSMOTE(kind="borderline-1",
                               k_neighbors=k, m_neighbors=m,
                               random_state=SEED)

# --------------------------- 실험 세팅 --------------------------- #
CV = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
BASE_CLF = RandomForestClassifier(
    n_estimators=200, random_state=SEED, n_jobs=-1
)

OurSampler = lambda **kw: TransportMMDSampler(
    n_epochs=1000, beta=0.0, kernel_type="gaussian",
    device="cpu", random_state=SEED, **kw
)

def evaluate_grid(d, n_min, n_maj_mult=30):
    """단일 (d, n_min) 설정에 대해 5‑fold AUROC 측정"""
    n_maj = n_min * n_maj_mult
    X, y = make_classification(
        n_samples=n_min + n_maj,
        n_features=d,
        n_informative=int(0.6 * d),
        n_redundant=int(0.2 * d),
        n_clusters_per_class=1,
        weights=[n_maj / (n_min + n_maj)],
        flip_y=0.01, class_sep=1.0, random_state=SEED
    )

    samplers = {
        "ROS"   : RandomOverSampler(random_state=SEED),
        "SMOTE" : None,
        "bSMOTE": None,
        "ADASYN": None,
        "CTGAN" : None,                # fold마다 새로 생성
        "OURS"  : OurSampler()
    }
    scores = {m: [] for m in samplers}

    for tr, te in CV.split(X, y):
        X_tr, y_tr = X[tr], y[tr]
        X_te, y_te = X[te], y[te]

        for name in samplers:
            if name == "SMOTE":
                sampler = smote_safe("smote", y_tr)
            elif name == "bSMOTE":
                sampler = smote_safe("borderline", y_tr)
            elif name == "ADASYN":
                sampler = smote_safe("adasyn", y_tr)
            elif name == "CTGAN":
                sampler = CTGANOversampler(
                    epochs=100)
            else:
                sampler = samplers[name]

            if sampler is None:
                X_bal, y_bal = X_tr, y_tr
            else:
                X_bal, y_bal = sampler.fit_resample(X_tr, y_tr)

            clf = clone(BASE_CLF).fit(X_bal, y_bal)
            prob = clf.predict_proba(X_te)[:, 1]
            scores[name].append(roc_auc_score(y_te, prob))

    means = {m: np.mean(v) for m, v in scores.items()}
    delta = means["OURS"] - means["bSMOTE"]
    return means, delta

In [6]:
# --------------------------- 메인 루프 --------------------------- #
Ds, Nmin = [5, 20, 50], [5, 10, 20, 50]
records = []

for d, n_m in product(Ds, Nmin):
    mean_scores, _ = evaluate_grid(d, n_m)

    Δ_bsm = mean_scores["OURS"] - mean_scores["bSMOTE"]
    Δ_ctg = mean_scores["OURS"] - mean_scores["CTGAN"]

    records.append({
        "d": d, "n_min": n_m,
        **mean_scores,                 # ROS, SMOTE, bSMOTE, ADASYN, CTGAN, OURS
        "Δ(OURS-bSMOTE)": Δ_bsm,
        "Δ(OURS-CTGAN)" : Δ_ctg
    })

    print(f"(d={d:2}, n_min={n_m:3})  "
          f"OURS={mean_scores['OURS']:.3f} | "
          f"bSMOTE={mean_scores['bSMOTE']:.3f} | "
          f"CTGAN={mean_scores['CTGAN']:.3f} | "
          f"Δ_bSMOTE={Δ_bsm:+.3f} | Δ_CTGAN={Δ_ctg:+.3f}")

df = pd.DataFrame(records).sort_values(["d", "n_min"])

print("\n=== Factorial Grid Summary (AUROC) ===")
print(df[["d", "n_min",
          "OURS", "bSMOTE", "CTGAN",
          "Δ(OURS-bSMOTE)", "Δ(OURS-CTGAN)"]]
      .to_string(index=False))

(d= 5, n_min=  5)  OURS=nan | bSMOTE=nan | CTGAN=nan | Δ_bSMOTE=+nan | Δ_CTGAN=+nan
(d= 5, n_min= 10)  OURS=0.928 | bSMOTE=0.918 | CTGAN=0.908 | Δ_bSMOTE=+0.010 | Δ_CTGAN=+0.020
(d= 5, n_min= 20)  OURS=0.946 | bSMOTE=0.947 | CTGAN=0.962 | Δ_bSMOTE=-0.001 | Δ_CTGAN=-0.016
(d= 5, n_min= 50)  OURS=0.934 | bSMOTE=0.925 | CTGAN=0.901 | Δ_bSMOTE=+0.008 | Δ_CTGAN=+0.033
(d=20, n_min=  5)  OURS=nan | bSMOTE=nan | CTGAN=nan | Δ_bSMOTE=+nan | Δ_CTGAN=+nan
(d=20, n_min= 10)  OURS=0.823 | bSMOTE=0.850 | CTGAN=0.832 | Δ_bSMOTE=-0.027 | Δ_CTGAN=-0.010
(d=20, n_min= 20)  OURS=0.918 | bSMOTE=0.934 | CTGAN=0.783 | Δ_bSMOTE=-0.016 | Δ_CTGAN=+0.135
(d=20, n_min= 50)  OURS=0.929 | bSMOTE=0.931 | CTGAN=0.925 | Δ_bSMOTE=-0.002 | Δ_CTGAN=+0.004
(d=50, n_min=  5)  OURS=0.920 | bSMOTE=0.783 | CTGAN=0.767 | Δ_bSMOTE=+0.137 | Δ_CTGAN=+0.153
(d=50, n_min= 10)  OURS=0.781 | bSMOTE=0.690 | CTGAN=0.664 | Δ_bSMOTE=+0.091 | Δ_CTGAN=+0.117
(d=50, n_min= 20)  OURS=0.851 | bSMOTE=0.849 | CTGAN=0.794 | Δ_bSMOTE=+0.003 | Δ