In [10]:
import numpy as np
import pandas as pd

# --- toy 데이터 만들기 ---
# 10개 샘플, 예측 확률 proba
proba = np.array([0.9, 0.8, 0.7, 0.6, 0.4,
                  0.3, 0.2, 0.1, 0.05, 0.95])

# 민감속성 S (여기서는 하나: sex)
# 앞 5명=남자(1), 뒤 5명=여자(0)
S = pd.DataFrame({"sex": [1,1,0,1,0, 1,1,0,0,0]})


In [12]:
# --- 실행 ---
print("전체 positive rate =", (proba>=0.5).mean())
print("worst SPD =", worst_group_spd(proba, S))
print("mean SPD  =", mean_group_spd(proba, S))

전체 positive rate = 0.5
worst SPD = 0.0
mean SPD  = 0.09999999999999998


In [33]:

def _as_mask_list_from_V(V, n):
    """
    subgroup subset 이 input으로 들어오면 각 샘플이 해당 ss에 들어가는지 여부 반환
    subgroup subset 10개 들어오면 각 샘플의 10개 bool mask 출력
    V: list of group specs. 각 원소는
       - bool mask (shape (n,))
       - 정수 인덱스 배열
    return: list[np.ndarray(bool, shape (n,))]
    """
    masks = []
    for g in V:
        if isinstance(g, np.ndarray) and g.dtype == bool:
            m = g
        else:
            idx = np.asarray(g, dtype=int).reshape(-1)
            m = np.zeros(n, dtype=bool)
            m[idx] = True
        masks.append(m)
    return masks


def _rate(pred):
    return float(np.mean(pred)) if len(pred)>0 else np.nan


## [ ] 1) mmd_statistic(P,Q): 두 분포의 MMD.
## [ ] 3) sup_mmd: 전체 vs 각 subgroup 분포 MMD의 최댓값.


def _pairwise_rbf_sum(x, sigma):
    """
    ∑_{i<j} k(x_i,x_j) with RBF. (self-term 제외)
    E[k(x,x')] 계산하는애
    """
    x = x.reshape(-1, 1)
    # (x_i - x_j)^2 = (x^2) + (x^2)^T - 2 x x^T
    x2 = (x ** 2).sum(axis=1, keepdims=True)
    d2 = x2 + x2.T - 2.0 * (x @ x.T)
    K = np.exp(-d2 / (2.0 * sigma * sigma))
    # 대각을 0으로
    np.fill_diagonal(K, 0.0)
    # 상삼각 합
    return float(K.sum() / 2.0)

def _mmd2_unbiased(x, y, sigma):
    """
    Unbiased MMD^2 for RBF kernel (Gretton et al.).
    MMD^2 = E[k(x,x')] + E[k(y,y')] - 2E[k(x,y)]
    """
    x = np.asarray(x).reshape(-1)
    y = np.asarray(y).reshape(-1)
    nx, ny = x.size, y.size
    if nx < 2 or ny < 2:
        return np.nan
    k_xx = _pairwise_rbf_sum(x, sigma)
    k_yy = _pairwise_rbf_sum(y, sigma)
    # 교항: 모든 (i in x, j in y)
    x_ = x.reshape(-1, 1)
    y_ = y.reshape(1, -1)
    d2 = (x_ ** 2) + (y_ ** 2) - 2.0 * (x_ @ y_)
    k_xy = np.exp(-d2 / (2.0 * sigma * sigma)).sum()
    mmd2 = (2.0 / (nx * (nx - 1))) * k_xx \
         + (2.0 / (ny * (ny - 1))) * k_yy \
         - (2.0 / (nx * ny)) * k_xy
    return float(max(mmd2, 0.0))

def _median_heuristic_sigma(z):
    """
    median heuristic for 1D scores.
    """
    z = np.asarray(z).reshape(-1)
    if z.size < 2:
        return 1.0
    zz = np.sort(z)
    d = np.abs(zz[1:] - zz[:-1])
    med = np.median(d)
    return float(max(med, 1e-3))

## [ ] 1) mmd_statistic(P,Q): 두 분포의 MMD.
def sup_mmd_dfcols(proba, S_df, sigma=None, min_support=5):
    """
    각 S_df[col]을 그룹으로 보고 sup MMD(그룹 vs 보완) 계산.
    """
    p = np.asarray(proba).reshape(-1)
    n = p.size
    if sigma is None:
        sigma = _median_heuristic_sigma(p)
    best = 0.0
    for c in S_df.columns:
        m = S_df[c].values.astype(int) == 1
        ns, nsc = int(m.sum()), int((~m).sum())
        if ns < min_support or nsc < min_support:
            continue
        v = _mmd2_unbiased(p[m], p[~m], sigma) ** 0.5  # MMD (not squared)
        if v > best:
            best = v
    return float(best)

## [ ] 3) sup_mmd: 전체 vs 각 subgroup 분포 MMD의 최댓값.
def sup_mmd_over_V(proba, V, min_support=5, sigma=None):
    """
    𝒱(list of groups) 위에서 sup MMD(그룹 vs 보완) 계산.
    V 원소: bool mask (n,) 또는 정수 인덱스 배열.
    """
    p = np.asarray(proba).reshape(-1)
    n = p.size
    masks = _as_mask_list_from_V(V, n)
    if sigma is None:
        sigma = _median_heuristic_sigma(p)
    best = 0.0
    for m in masks:
        ns, nsc = int(m.sum()), int((~m).sum())  # ns = ss에 들어간 샘플 수, nsc = ss complement에 들어간 샘플 수
        if ns < min_support or nsc < min_support:
            continue
        v = _mmd2_unbiased(p[m], p[~m], sigma) ** 0.5
        if v > best:
            best = v
    return float(best)



## [ ] 2) wasserstein_statistic(P,Q): 두 분포의 WD.
## [ ] 4) sup_wd: 전체 vs 각 subgroup 분포 WD의 최댓값.


def _cdf_1d(values):
    """정렬·중복 포함한 값과 CDF 값 쌍 반환. """
    v = np.sort(values.reshape(-1))
    # 고유 지점에서의 CDF (계단함수 좌연속)
    uniq, counts = np.unique(v, return_counts=True)
    cdf = np.cumsum(counts) / float(v.size)
    return uniq, cdf

def _w1_1d(x, y):
    """
    Wasserstein-1 (1D) for empirical distributions.
    수치 적분: CDF 차이의 L1 적분.
    W1(P,Q) = ∫ |F_P(t) - F_Q(t)| dt
    W1(\hat{P}_n, \hat{Q}_m) = \sum |F_P(t_i) - F_Q(t_i)| * (t_{i+1}-t_i)
    (t_i: 적분을 summation으로 근사할때 구간 잘리는 샘플들, 원래 smoothing cdf인데 empirical cdf라 계단식)
    """
    x = np.asarray(x).reshape(-1)
    y = np.asarray(y).reshape(-1)
    if x.size == 0 or y.size == 0:
        return np.nan
    ux, Fx = _cdf_1d(x)
    uy, Fy = _cdf_1d(y)
    # 공통 격자에서 CDF 보간(계단함수)
    grid = np.unique(np.concatenate([ux, uy]))
    # Fx(grid), Fy(grid)
    Fx_on = np.interp(grid, ux, Fx, left=0.0, right=1.0)
    Fy_on = np.interp(grid, uy, Fy, left=0.0, right=1.0)
    # 적분: sum |ΔF| * Δx
    dx = np.diff(grid)
    dF = np.abs(Fx_on[:-1] - Fy_on[:-1])
    return float(np.sum(dF * dx))


## [ ] 2) wasserstein_statistic(P,Q): 두 분포의 WD.
def sup_w1_dfcols(proba, S_df, min_support=5):
    """
    각 S_df[col]을 그룹으로 보고 sup W1(그룹 vs 보완) 계산.
    """
    p = np.asarray(proba).reshape(-1)
    best = 0.0
    for c in S_df.columns:
        m = S_df[c].values.astype(int) == 1
        ns, nsc = int(m.sum()), int((~m).sum())
        if ns < min_support or nsc < min_support:
            continue
        v = _w1_1d(p[m], p[~m])
        if v == v and v > best:
            best = v
    return float(best)

## [ ] 4) sup_wd: 전체 vs 각 subgroup 분포 WD의 최댓값.
def sup_w1_over_V(proba, V, min_support=5):
    """
    𝒱(list of groups) 위에서 sup W1(그룹 vs 보완) 계산.
    """
    p = np.asarray(proba).reshape(-1)
    n = p.size
    masks = _as_mask_list_from_V(V, n)
    best = 0.0
    for m in masks:
        ns, nsc = int(m.sum()), int((~m).sum()) # ns = ss에 들어간 샘플 수, nsc = ss complement에 들어간 샘플 수
        if ns < min_support or nsc < min_support:
            continue
        v = _w1_1d(p[m], p[~m])
        if v == v and v > best:
            best = v
    return float(best)



## [v] 5,6) worst/mean subgroup_spd: SPD.

def worst_group_spd(proba, S_or_list):
    """max_g |Pr(h=1|g) - Pr(h=1)|; h는 proba>=0.5 대체 (측정 시에만)"""
    arr = (proba>=0.5).astype(int)
    overall = _rate(arr)
    worst = 0.0
    if isinstance(S_or_list, list):
        keys = list(S_or_list[0].keys())
        m_all = {k: np.array([int(s[k]) for s in S_or_list]) for k in keys}
        for k,m in m_all.items():
            a = arr[m==1]; 
            if len(a)>5: worst = max(worst, abs(_rate(a)-overall))
        return float(worst)
    else:
        for c in S_or_list.columns:
            m = S_or_list[c].values.astype(int)
            a = arr[m==1]
            if len(a)>5: worst = max(worst, abs(_rate(a)-overall))
        return float(worst)


def mean_group_spd(proba, S_or_list, thr=0.5, min_support=5):
    """
    평균 SPD gap: mean_g |Pr(h=1|g) - Pr(h=1)|  (측정시에만 h = 1(proba>=thr))
    - tabular: S_or_list = DataFrame (0/1 컬럼)
    - image:   S_or_list = list[dict] (각 dict의 키가 민감 속성)
    """
    arr = (proba.reshape(-1) >= thr).astype(int)
    overall = float(arr.mean()) if len(arr) > 0 else np.nan
    diffs = []
    if isinstance(S_or_list, list):  # image 스타일
        keys = list(S_or_list[0].keys())
        m_all = {k: np.array([int(s[k]) for s in S_or_list]) for k in keys}
        for k, m in m_all.items():
            a = arr[m == 1]
            if a.size >= min_support:
                diffs.append(abs(float(a.mean()) - overall))
    else:  # DataFrame
        for c in S_or_list.columns:
            m = S_or_list[c].values.astype(int)
            a = arr[m == 1]
            if a.size >= min_support:
                diffs.append(abs(float(a.mean()) - overall))
    return float(np.mean(diffs)) if len(diffs) > 0 else 0.0



## [ ] 7,8) worst/mean weighted_subgroup_spd: 그룹 빈도로 가중 평균된 SPD.


def worst_weighted_group_spd(proba, S_or_list, thr=0.5, min_support=5):
    """
    가중 최악 SPD: max_g  w_g * |Pr(h=1|g) - Pr(h=1)|
    w_g = |g| / n   (그룹 빈도)
    """
    arr = (np.asarray(proba).reshape(-1) >= float(thr)).astype(int)
    n = arr.size
    if n == 0:
        return np.nan
    overall = float(arr.mean())
    best = 0.0
    if isinstance(S_or_list, list):
        keys = list(S_or_list[0].keys())
        m_all = {k: np.array([int(s[k]) for s in S_or_list]) for k in keys}
        for k, m in m_all.items():
            m = (m == 1)
            ns = int(m.sum())
            if ns < min_support:
                continue
            w = ns / float(n)
            gap = abs(float(arr[m].mean()) - overall)
            best = max(best, w * gap)
    else:
        for c in S_or_list.columns:
            m = (S_or_list[c].values.astype(int) == 1)
            ns = int(m.sum())
            if ns < min_support:
                continue
            w = ns / float(n)
            gap = abs(float(arr[m].mean()) - overall)
            best = max(best, w * gap)
    return float(best)


def mean_weighted_group_spd(proba, S_or_list, thr=0.5, min_support=5):
    """
    가중 평균 SPD: (∑_g w_g * |Pr(h=1|g) - Pr(h=1)|) / (∑_g w_g)
    w_g = |g| / n   (그룹 빈도)
    """
    arr = (np.asarray(proba).reshape(-1) >= float(thr)).astype(int)
    n = arr.size
    if n == 0:
        return np.nan
    overall = float(arr.mean())
    num = 0.0
    den = 0.0
    if isinstance(S_or_list, list):  # image 스타일
        keys = list(S_or_list[0].keys())
        m_all = {k: np.array([int(s[k]) for s in S_or_list]) for k in keys}
        for k, m in m_all.items():
            m = (m == 1)
            ns = int(m.sum())
            if ns < min_support:
                continue
            w = ns / float(n)
            num += w * abs(float(arr[m].mean()) - overall)
            den += w
    else:  # DataFrame
        for c in S_or_list.columns:
            m = (S_or_list[c].values.astype(int) == 1)
            ns = int(m.sum())
            if ns < min_support:
                continue
            w = ns / float(n)
            num += w * abs(float(arr[m].mean()) - overall)
            den += w
    if den == 0.0:
        return 0.0
    return float(num / den)



## [v] 9,10) worst/mean subgroup_subset_spd / dp_sup_V,dp_mean_V : SPD.

def worst_spd_over_V(proba, V, thr=0.5, min_support=5):
    """
    𝒱 위에서 DP sup: max_G |Pr(h=1|G) - Pr(h=1)|.
    """
    arr = (np.asarray(proba).reshape(-1) >= float(thr)).astype(int)
    n = arr.size
    overall = float(arr.mean()) if n > 0 else np.nan
    masks = _as_mask_list_from_V(V, n)
    best = 0.0
    for m in masks:
        ns = int(m.sum())
        if ns < min_support:
            continue
        gap = abs(float(arr[m].mean()) - overall)
        if gap > best:
            best = gap
    return float(best)

def mean_spd_over_V(proba, V, thr=0.5, min_support=5):
    """
    𝒱 위에서 DP 평균 격차.
    """
    arr = (np.asarray(proba).reshape(-1) >= float(thr)).astype(int)
    n = arr.size
    overall = float(arr.mean()) if n > 0 else np.nan
    masks = _as_mask_list_from_V(V, n)
    diffs = []
    for m in masks:
        ns = int(m.sum())
        if ns < min_support:
            continue
        diffs.append(abs(float(arr[m].mean()) - overall))
    return float(np.mean(diffs)) if len(diffs) > 0 else 0.0





## [v] 11,12) worst/mean weighted_subgroup_subset_spd: 그룹 빈도로 가중 평균된 SPD

def worst_weighted_spd_over_V(proba, V, thr=0.5, min_support=5):
    """
    가중 최악 SPD over 𝒱: max_G  w_G * |Pr(h=1|G) - Pr(h=1)|
    w_G = |G| / n
    """
    arr = (np.asarray(proba).reshape(-1) >= float(thr)).astype(int)
    n = arr.size
    if n == 0:
        return np.nan
    overall = float(arr.mean())
    masks = _as_mask_list_from_V(V, n)
    best = 0.0
    for m in masks:
        ns = int(m.sum())
        if ns < min_support:
            continue
        w = ns / float(n)
        gap = abs(float(arr[m].mean()) - overall)
        best = max(best, w * gap)
    return float(best)


def mean_weighted_spd_over_V(proba, V, thr=0.5, min_support=5):
    """
    가중 평균 SPD over 𝒱: (∑_G w_G * |Pr(h=1|G) - Pr(h=1)|) / (∑_G w_G)
    w_G = |G| / n
    """
    arr = (np.asarray(proba).reshape(-1) >= float(thr)).astype(int)
    n = arr.size
    if n == 0:
        return np.nan
    overall = float(arr.mean())
    masks = _as_mask_list_from_V(V, n)
    num = 0.0
    den = 0.0
    for m in masks:
        ns = int(m.sum())
        if ns < min_support:
            continue
        w = ns / float(n)
        num += w * abs(float(arr[m].mean()) - overall)
        den += w
    if den == 0.0:
        return 0.0
    return float(num / den)



In [38]:
# === 간단 토이 ===
import numpy as np, pandas as pd
from scipy.special import expit as sigmoid

# 필요 지표 임포트 (네 파일 기준; 네가 추가한 함수명과 맞춰 사용)
# from fairbench.metrics.subgroup_measures import (
#     worst_group_spd, mean_group_spd,
#     dp_sup_over_V, dp_mean_over_V, dp_weighted_sum,
#     sup_mmd_dfcols, sup_w1_dfcols,
#     sup_mmd_over_V, sup_w1_over_V
# )

# 1) 데이터 생성 (n 작게, q=3)
rng = np.random.default_rng(0)
n, q = 600, 3
S = rng.integers(0, 2, size=(n, q))  # 민감속성 (a,b,c)
a, b, c = S[:,0], S[:,1], S[:,2]

# 점수 만들기: 베이스 + 특정 서브그룹 바이어스 (a=1 & b=0 & c=1에 불리한 편향)
z = rng.normal(size=n) * 0.5
bias = (-1.5) * ((a==1) & (b==0) & (c==1)).astype(float)
z = z + 0.3*a - 0.2*b + 0.1*c + bias
proba = sigmoid(z)

# 라벨 샘플 + 이진 예측
y = (rng.uniform(size=n) < proba).astype(int)
yhat = (proba >= 0.5).astype(int)

# 2) 싱글톤 그룹(DataFrame) — 각 속성의 1인 쪽만 간단히 사용
S_df = pd.DataFrame({
    "a_1": (a==1).astype(int),
    "b_1": (b==1).astype(int),
    "c_1": (c==1).astype(int),
})

# 3) 𝒱(서브큐브 몇 개만) — bool mask 리스트
V = []
V.append((a==1))                 # a=1
V.append((b==1))                 # b=1
V.append((c==1))                 # c=1
V.append((a==1) & (b==0))        # a=1 & b=0
V.append((a==1) & (c==1))        # a=1 & c=1
V.append((a==1) & (b==0) & (c==1))  # 문제의 바이어스 그룹

# 4) 지표 계산
acc = float((yhat == y).mean())

# (A) supIPM — 싱글톤 기준
sup_mmd_single = sup_mmd_dfcols(proba, S_df, sigma=None, min_support=5)
sup_w1_single  = sup_w1_dfcols(proba, S_df, min_support=5)

# (B) supIPM — 𝒱 기준
sup_mmd_V = sup_mmd_over_V(proba, V, min_support=5, sigma=None)
sup_w1_V  = sup_w1_over_V(proba, V, min_support=5)

# (C) DP(통계적 패리티) — 싱글톤과 𝒱 버전
spd_worst_single = worst_group_spd(proba, S_df)
spd_mean_single  = mean_group_spd(proba, S_df)
dp_sup_V  = worst_spd_over_V(proba, V, thr=0.5, min_support=5)    # = worst over V
dp_mean_V = mean_spd_over_V(proba, V, thr=0.5, min_support=5)  # = mean over V

# (D) 그룹 빈도 가중 SPD — 싱글톤 & 𝒱
spd_weighted_single_worst = worst_weighted_group_spd(proba, S_df, thr=0.5, min_support=5)
spd_weighted_single_mean  = mean_weighted_group_spd(proba, S_df,  thr=0.5, min_support=5)

spd_weighted_V_worst = worst_weighted_spd_over_V(proba, V, thr=0.5, min_support=5)
spd_weighted_V_mean  = mean_weighted_spd_over_V(proba, V,  thr=0.5, min_support=5)

# # (E) 기타: SPD worst/mean (싱글톤), FPR worst/mean (싱글톤)
# df_avg = mean_group_spd(proba, S_df)
# df_worst = worst_group_spd(proba, S_df)


# 5) 출력(짧게)
print(f"Acc: {acc:.3f}")
print(f"1supMMD(singleton): {sup_mmd_single:.4f} | 2supW1(singleton): {sup_w1_single:.4f}")
print(f"3supMMD(V): {sup_mmd_V:.4f}         | 4supW1(V): {sup_w1_V:.4f}")

print(f"5SPD_worst(singleton): {spd_worst_single:.4f} | 6SPD_mean(singleton): {spd_mean_single:.4f}")
print(f"7DP_weighted(avg): {spd_weighted_single_worst:.4f} | 8DP_weighted(sum): {spd_weighted_single_mean:.4f}")

print(f"9DP_sup(V): {dp_sup_V:.4f} | 10DP_mean(V): {dp_mean_V:.4f}")
print(f"11DP_weighted(avg): {spd_weighted_V_worst:.4f} | 12DP_weighted(sum): {spd_weighted_V_mean:.4f}")



Acc: 0.633
1supMMD(singleton): 0.0367 | 2supW1(singleton): 0.0456
3supMMD(V): 0.0906         | 4supW1(V): 0.2531
5SPD_worst(singleton): 0.0424 | 6SPD_mean(singleton): 0.0257
7DP_weighted(avg): 0.0215 | 8DP_weighted(sum): 0.0255
9DP_sup(V): 0.4694 | 10DP_mean(V): 0.1254
11DP_weighted(avg): 0.0501 | 12DP_weighted(sum): 0.0655


In [35]:
sigma = None
min_support = 5
p = np.asarray(proba).reshape(-1)
n = p.size
masks = _as_mask_list_from_V(V, n)
if sigma is None:
    sigma = _median_heuristic_sigma(p)
best = 0.0
for m in masks:
    ns, nsc = int(m.sum()), int((~m).sum())
    if ns < min_support or nsc < min_support:
        continue
    v = _mmd2_unbiased(p[m], p[~m], sigma) ** 0.5
    if v > best:
        best = v

In [None]:
def _as_mask_list_from_V(V, n):
    """
    V: list of group specs. 각 원소는
       - bool mask (shape (n,))
       - 정수 인덱스 배열
    return: list[np.ndarray(bool, shape (n,))]
    """
    masks = []
    for g in V:
        if isinstance(g, np.ndarray) and g.dtype == bool:
            m = g
        else:
            idx = np.asarray(g, dtype=int).reshape(-1)
            m = np.zeros(n, dtype=bool)
            m[idx] = True
        masks.append(m)
    return masks

In [27]:
v

0.09055801069486036

In [23]:
masks

[array([ True, False, False,  True,  True,  True,  True,  True, False,
        False,  True,  True, False, False, False,  True, False, False,
         True,  True,  True, False,  True,  True, False,  True,  True,
         True,  True, False, False, False, False,  True, False,  True,
         True, False,  True, False,  True, False,  True, False,  True,
         True,  True,  True, False,  True, False,  True,  True, False,
        False,  True, False,  True,  True, False,  True,  True, False,
         True, False,  True, False, False,  True,  True, False, False,
         True,  True,  True,  True, False,  True, False,  True,  True,
        False, False, False, False, False,  True,  True,  True,  True,
        False,  True,  True, False, False,  True,  True,  True,  True,
        False, False, False, False, False,  True, False, False, False,
        False,  True, False, False,  True,  True,  True, False, False,
         True,  True,  True,  True, False, False,  True,  True,  True,
      

In [None]:
x,y,sigma = p[m], p[~m], sigma
"""
Unbiased MMD^2 for RBF kernel (Gretton et al.).
"""
x = np.asarray(x).reshape(-1)
y = np.asarray(y).reshape(-1)
nx, ny = x.size, y.size
if nx < 2 or ny < 2:
   print(np.nan)
k_xx = _pairwise_rbf_sum(x, sigma)
k_yy = _pairwise_rbf_sum(y, sigma)
# 교항: 모든 (i in x, j in y)
x_ = x.reshape(-1, 1)
y_ = y.reshape(1, -1)
d2 = (x_ ** 2) + (y_ ** 2) - 2.0 * (x_ @ y_)
k_xy = np.exp(-d2 / (2.0 * sigma * sigma)).sum()
mmd2 = (2.0 / (nx * (nx - 1))) * k_xx \
        + (2.0 / (ny * (ny - 1))) * k_yy \
        - (2.0 / (nx * ny)) * k_xy
float(max(mmd2, 0.0))

0.008200753301010444