In [3]:
# moduleA_define_subjective_targets.ipynb
"""
Module A：主観ターゲット（情動軸・カテゴリ）の確定【再現性最優先・運用手順書どおり】

出力（主要）:
- master_sound_level_with_PC.csv（48音：PC1-3 + emo_* + valence classes + ambiguous flag）
- master_participant_sound_level_with_targets.csv（被験者×音：sound-levelターゲットを付与 + 任意で個人ターゲット）
- figures/ : 図A1〜A4
- tables/  : 表A1〜A12、manifest、metadata_moduleA.json

注意:
- PCAの符号は proxy 相関で固定（PC1>=0 with proxy_arousal, PC2>=0 with proxy_approach）
- valenceはPCAに無理に割当てず proxy_valence をz化して emo_valence とする
- 3分類は (1) 既存カテゴリ列があれば優先、なければ (2) emo_valence の三分位で生成（閾値固定）
"""

from __future__ import annotations

from pathlib import Path
from datetime import datetime
import json
import shutil

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA


# =========================================================
# 0) ユーザー設定
# =========================================================
# Single seed shared by every seeded call in this module.
SEED = 42
# Module-level generator derived from SEED (no use of it is visible in this part of the file).
RNG = np.random.default_rng(SEED)

# ----- Inputs (adjust ROOT_DIR to the local environment) -----
ROOT_DIR = Path("/Users/shunsuke/EEG_48sounds")
MASTER_TABLE_DIR = ROOT_DIR / "derivatives" / "master_tables"
IN_SOUND = MASTER_TABLE_DIR / "master_sound_level.csv"
IN_PSUB = MASTER_TABLE_DIR / "master_participant_sound_level.csv"

# ----- Outputs: everything is collected under <ROOT_DIR>/moduleA_outputs -----
OUT_DIR = ROOT_DIR / "moduleA_outputs"
FIG_DIR, TAB_DIR, LOG_DIR = (OUT_DIR / sub for sub in ("figures", "tables", "logs"))

# Create the output tree up front; parents=True also creates OUT_DIR itself.
for _out_subdir in (FIG_DIR, TAB_DIR, LOG_DIR):
    _out_subdir.mkdir(parents=True, exist_ok=True)

for _tag, _location in (
    ("[OUT_DIR ]", OUT_DIR),
    ("[FIG_DIR ]", FIG_DIR),
    ("[TAB_DIR ]", TAB_DIR),
    ("[LOG_DIR ]", LOG_DIR),
):
    print(_tag, _location)


# ----- Analysis constants -----
FIG_DPI = 300        # resolution used by savefig()
N_COMPONENTS = 3     # number of principal components kept
N_PERM = 500         # permutations for the proxy-correlation / loading-similarity p-values

EXPECTED_N_PARTICIPANTS = 12  # participants expected per sound; change if the cohort differs

# Japanese-capable font (macOS is assumed) and a plain ASCII minus sign on axes.
mpl.rcParams.update({
    "font.family": "Hiragino Sans",
    "axes.unicode_minus": False,
})


# =========================================================
# 0.1) 便利関数
# =========================================================
# In-memory registries of every figure / table written by this module.
# They are dumped to CSV manifests at the end of the script.
FIG_MANIFEST: list[dict] = []
TAB_MANIFEST: list[dict] = []


def add_fig(fig_id: str, filename: str, title: str, what: str, how_to_read: str, key_result: str):
    """Register one figure in ``FIG_MANIFEST``.

    Args:
        fig_id: Figure identifier (stored as ``figure_id``).
        filename: File name of the saved figure.
        title: Human-readable title.
        what: What the figure shows.
        how_to_read: Reading guidance.
        key_result: Main take-away.
    """
    entry = dict(
        figure_id=fig_id,
        filename=filename,
        title=title,
        what=what,
        how_to_read=how_to_read,
        key_result=key_result,
    )
    FIG_MANIFEST.append(entry)


def add_tab(tab_id: str, filename: str, title: str, what: str, columns: str):
    """Register one table in ``TAB_MANIFEST``.

    Args:
        tab_id: Table identifier (stored as ``table_id``).
        filename: File name of the saved table.
        title: Human-readable title.
        what: What the table contains.
        columns: Free-text description of the table columns.
    """
    entry = dict(
        table_id=tab_id,
        filename=filename,
        title=title,
        what=what,
        columns=columns,
    )
    TAB_MANIFEST.append(entry)

def savefig(path: Path):
    """Write the current pyplot figure to ``path`` and close it.

    The layout is tightened first; the file is written at ``FIG_DPI`` with a
    tight bounding box.
    """
    plt.tight_layout()
    save_options = {"dpi": FIG_DPI, "bbox_inches": "tight"}
    plt.savefig(path, **save_options)
    plt.close()

def find_first_column(df: pd.DataFrame, include_keywords: list[str], prefer_keywords: list[str] | None = None) -> str | None:
    """Return the best column whose name contains every keyword in ``include_keywords``.

    Matching is a case-insensitive substring test on ``str(column)``.
    Among the matching columns, the one containing the most ``prefer_keywords``
    wins; ties are broken by the shortest name, then by column order.

    Args:
        df: Frame whose columns are searched.
        include_keywords: Substrings that must all be present in the column name.
        prefer_keywords: Optional substrings that make a candidate more preferable.

    Returns:
        The selected column label, or ``None`` when no column matches.
    """
    required = [kw.lower() for kw in include_keywords]
    preferred = [kw.lower() for kw in (prefer_keywords or [])]

    matches = [
        col for col in df.columns
        if all(kw in str(col).lower() for kw in required)
    ]
    if not matches:
        return None

    def rank(col):
        name = str(col)
        lowered = name.lower()
        hits = sum(1 for kw in preferred if kw in lowered)
        # More preferred hits first, then the shorter name.
        return (-hits, len(name))

    # min() returns the first minimal element, i.e. the head of a stable sort.
    return min(matches, key=rank)

def perm_pvalue_corr(x: np.ndarray, y: np.ndarray, n_perm: int = 500, seed: int = 42) -> tuple[float, float, int]:
    """Pearson r with a two-sided permutation p-value (no SciPy required).

    Pairs in which either value is non-finite are dropped before anything is
    computed. The p-value is ``(count + 1) / (n_perm + 1)``, where ``count``
    is the number of permutations of ``y`` whose ``|r|`` is at least the
    observed ``|r|``.

    Args:
        x: First variable (array-like, converted to float).
        y: Second variable (array-like, converted to float), same length as ``x``.
        n_perm: Number of random permutations of ``y``.
        seed: Seed for the local random generator, so results are repeatable.

    Returns:
        ``(r_obs, p, n)`` where ``n`` is the number of finite pairs used.
        ``r_obs`` and ``p`` are both NaN when ``n < 5`` or when the correlation
        is undefined (one of the inputs is constant).
    """
    rng = np.random.default_rng(seed)
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    m = np.isfinite(x) & np.isfinite(y)
    x = x[m]
    y = y[m]
    n = len(x)
    if n < 5:
        return np.nan, np.nan, n

    # A constant input has no defined correlation. Without this guard r_obs is
    # NaN, every comparison below is False, and the function would report the
    # smallest possible p-value (1 / (n_perm + 1)) for a meaningless test.
    if x.min() == x.max() or y.min() == y.max():
        return float("nan"), float("nan"), int(n)

    r_obs = np.corrcoef(x, y)[0, 1]
    if not np.isfinite(r_obs):
        return float("nan"), float("nan"), int(n)

    # Kept as an explicit loop so the sequence of random draws (and therefore
    # the p-value for a given seed) is unchanged.
    cnt = 0
    for _ in range(n_perm):
        yp = rng.permutation(y)
        r = np.corrcoef(x, yp)[0, 1]
        if abs(r) >= abs(r_obs):
            cnt += 1
    p = (cnt + 1) / (n_perm + 1)
    return float(r_obs), float(p), int(n)

def zscore(s: pd.Series) -> pd.Series:
    """Standardise ``s`` with the population SD (``ddof=0``).

    A tiny constant (1e-12) is added to the SD so a constant series yields
    zeros instead of a division by zero.
    """
    centered = s - s.mean()
    spread = s.std(ddof=0) + 1e-12
    return centered / spread

def safe_mean(df: pd.DataFrame, cols: list[str]) -> pd.Series:
    """Row-wise mean over those ``cols`` that actually exist in ``df``.

    Names not present in ``df.columns`` are ignored. When none of them is
    present, an all-NaN series aligned to ``df.index`` is returned.
    """
    present = [name for name in cols if name in df.columns]
    if not present:
        return pd.Series(np.nan, index=df.index)
    return df[present].mean(axis="columns")

def sign_fix(pc_scores: np.ndarray, loading_vec: np.ndarray, proxy: np.ndarray) -> tuple[np.ndarray, np.ndarray, float]:
    """Flip a principal component so that ``corr(PC, proxy) >= 0``.

    The Pearson correlation is computed on the pairs where both values are
    finite. With fewer than 5 such pairs the correlation is reported as NaN
    and nothing is flipped.

    Args:
        pc_scores: Scores of one component.
        loading_vec: Loadings of the same component.
        proxy: Reference variable the component should correlate positively with.

    Returns:
        ``(scores, loadings, r)``: the possibly negated scores and loadings, and
        the correlation after the fix (non-negative unless it is NaN).
    """
    # Only r is needed here, so it is computed directly rather than through a
    # permutation test whose p-value would be thrown away.
    scores = np.asarray(pc_scores, dtype=float)
    target = np.asarray(proxy, dtype=float)
    valid = np.isfinite(scores) & np.isfinite(target)
    if int(valid.sum()) < 5:
        r = float("nan")
    else:
        r = float(np.corrcoef(scores[valid], target[valid])[0, 1])

    if np.isfinite(r) and r < 0:
        return -pc_scores, -loading_vec, -r
    return pc_scores, loading_vec, r


# =========================================================
# 1) 入力読み込み
# =========================================================
# The sound-level table is mandatory: abort immediately when it is missing.
if not IN_SOUND.exists():
    raise FileNotFoundError(f"master_sound_level.csv が見つかりません: {IN_SOUND}")

df_sound = pd.read_csv(IN_SOUND)
print("[load] sound-level:", df_sound.shape, "from", IN_SOUND)

# The participant-level table is optional; None disables all participant-level outputs.
df_psub = pd.read_csv(IN_PSUB) if IN_PSUB.exists() else None
if df_psub is None:
    print("[warn] participant-level が見つからないため、個人レベルの出力はスキップします。")
else:
    print("[load] participant-level:", df_psub.shape, "from", IN_PSUB)


# =========================================================
# 2) 列マッピング（主観8項目）
# =========================================================
# 音レベル（*_mean）
# Label of each subjective item -> keywords that must all occur in the
# sound-level column name (the sound-level columns are the "*_mean" ones).
SUBJECTIVE_SOUND_PATTERNS = {
    "驚き": ["驚き", "mean"],
    "緊急感": ["緊急感", "mean"],
    "脅威感": ["脅威感", "mean"],
    "圧倒感": ["圧倒", "mean"],
    "接近": ["接近", "mean"],
    "興味": ["興味", "mean"],
    "没入": ["没入", "mean"],
    "退屈": ["退屈", "mean"],
}

# labels[i] and sound_cols[i] always describe the same item.
labels = list(SUBJECTIVE_SOUND_PATTERNS)
sound_cols = []
for item_label in labels:
    item_keys = SUBJECTIVE_SOUND_PATTERNS[item_label]
    matched_col = find_first_column(df_sound, item_keys, prefer_keywords=["_mean", "mean"])
    if matched_col is None:
        # Every item is required; a missing one is fatal.
        raise ValueError(f"[fatal] 音レベル主観列が見つかりません: {item_label} (keywords={item_keys})")
    sound_cols.append(matched_col)

print("\n[subjective sound cols]")
for item_label, matched_col in zip(labels, sound_cols):
    print(f"  {item_label}: {matched_col}")

# 個人レベル（*_meanなし想定）：存在すれば検出
# Participant-level column mapping. All three stay None when the
# participant-level table was not loaded.
psub_cols = None
pid_col = None
sound_id_col = None

if df_psub is not None:
    # First column (in column order) whose lower-cased name contains any keyword.
    # NOTE(review): this is a plain substring test, so short keywords such as
    # "sid" / "pid" / "sound" can hit unrelated columns in a wide table; the
    # detected columns are printed below so the choice can be audited.
    pid_col = next((c for c in df_psub.columns if any(k in str(c).lower() for k in ["participant", "subject", "sub_id", "sid", "pid"])), None)
    sound_id_col = next((c for c in df_psub.columns if any(k in str(c).lower() for k in ["sound_id", "stim_id", "stimulus_id", "sound", "number"])), None)
    if pid_col is None or sound_id_col is None:
        raise ValueError("[fatal] participant-level の participant列 or sound_id列 の自動検出に失敗しました。")
    print("[participant id cols] pid_col:", pid_col, "| sound_id_col:", sound_id_col)

    # One participant-level column per subjective item, in the order of `labels`.
    # The label itself is the search keyword, so an alias such as
    # "接近したい気持ち" is already found by the substring match; the former
    # retry for "接近" repeated the identical search and is gone.
    tmp = []
    for lab in labels:
        c = find_first_column(df_psub, [lab], prefer_keywords=None)
        if c is None:
            raise ValueError(f"[fatal] participant-level 主観列が見つかりません: {lab}")
        tmp.append(c)
    # Publish the list only once every item has been resolved.
    psub_cols = tmp

    print("\n[subjective participant cols]")
    for lab, c in zip(labels, psub_cols):
        print(f"  {lab}: {c}")

# join key（音ID）
# Sound-ID column of the sound-level table: first exact match among the known names.
join_key = next(
    (name for name in ("number", "sound_id", "SoundID", "stim_id") if name in df_sound.columns),
    None,
)
if join_key is None:
    raise ValueError("[fatal] sound-level の音ID列（number/sound_id等）が見つかりません。")

print("\n[join_key sound-level]:", join_key)

# Category column (optional): exact names first, then a substring match.
category_col = next(
    (name for name in ("カテゴリー", "カテゴリ", "category", "Category") if name in df_sound.columns),
    None,
)
if category_col is None:
    category_col = next(
        (col for col in df_sound.columns if "カテゴリ" in str(col) or "category" in str(col).lower()),
        None,
    )
print("[category_col]:", category_col)

# Ambiguity-flag column (optional): the canonical name wins, otherwise a substring match.
if "is_ambiguous_approach_sd_top10" in df_sound.columns:
    ambig_col = "is_ambiguous_approach_sd_top10"
else:
    ambig_col = next(
        (col for col in df_sound.columns if "曖昧" in str(col) or "ambig" in str(col).lower()),
        None,
    )
print("[ambig_col]:", ambig_col)


# =========================================================
# 3) 入力QC（欠損・人数）
# =========================================================
# Sound level: missing ratio of each subjective column.
qc_rows = [
    {"level": "sound", "unit": "column", "name": col_name, "na_ratio": ratio}
    for col_name, ratio in df_sound[sound_cols].isna().mean().to_dict().items()
]

# Sound level: missing ratio of each sound (row) across the subjective columns.
na_ratio_per_sound = df_sound[sound_cols].isna().mean(axis=1)
qc_rows.extend(
    {"level": "sound", "unit": "sound", "name": int(sound_id), "na_ratio": float(ratio)}
    for sound_id, ratio in zip(df_sound[join_key], na_ratio_per_sound)
)

# Participant level: how many distinct participants rated each sound.
if df_psub is not None:
    n_subj = df_psub[pid_col].nunique()
    n_by_sound = df_psub.groupby(sound_id_col)[pid_col].nunique()
    summary_rows = (
        ("n_participants", "unique_participants", n_subj),
        ("n_by_sound_min", "min", n_by_sound.min()),
        ("n_by_sound_max", "max", n_by_sound.max()),
    )
    for unit, name, value in summary_rows:
        qc_rows.append({"level": "participant", "unit": unit, "name": name, "na_ratio": np.nan, "value": int(value)})

    # Sounds whose participant count differs from the expected number are
    # logged one by one (they are recorded only, not treated as an error).
    bad = n_by_sound[n_by_sound != EXPECTED_N_PARTICIPANTS]
    for sid, v in bad.items():
        qc_rows.append({"level": "participant", "unit": "bad_sound", "name": int(sid), "na_ratio": np.nan, "value": int(v)})

qc_df = pd.DataFrame(qc_rows)
qc_path = TAB_DIR / "A_table_input_qc.csv"
qc_df.to_csv(qc_path, index=False, encoding="utf-8-sig")
add_tab("A0", qc_path.name, "入力QC（欠損率・人数整合）", "解析前提の健全性チェックログ。", "level, unit, name, na_ratio, value")


# =========================================================
# 4) 主観相関（図A1 + 表A1）
# =========================================================
# Pearson correlation between the subjective items at sound level.
corr_subj = df_sound[sound_cols].corr()

# Figure A1: annotated heatmap, tick labels replaced by the short item labels.
heatmap_options = dict(
    annot=True, fmt=".2f", square=True,
    vmin=-1, vmax=1, center=0,
    cbar_kws={"label": "Pearson r"},
    xticklabels=labels, yticklabels=labels,
)
plt.figure(figsize=(8, 6))
sns.heatmap(corr_subj, **heatmap_options)
plt.title("図A1 主観評価項目の相関行列（音レベル, n=48）")
fig_path = FIG_DIR / "phaseA_subjective_corr_heatmap.png"
savefig(fig_path)

# Table A1: the matrix behind the figure.
corr_path = TAB_DIR / "phaseA_subjective_corr_matrix.csv"
corr_subj.to_csv(corr_path, encoding="utf-8-sig")

add_fig(
    "A1",
    fig_path.name,
    "主観評価項目の相関行列（音レベル）",
    "48音の音平均主観8項目の相関（r）。",
    "強度系のクラスター・退屈の逆相関・接近/興味/没入のまとまりを見る。",
    "強度系（驚き/緊急/脅威/圧倒）が同方向、退屈が反対側に位置しやすい。",
)
add_tab(
    "A1",
    corr_path.name,
    "主観評価項目の相関行列（音レベル）",
    "図A1の元データ（相関係数）。",
    "8×8（主観項目）",
)


# =========================================================
# 5) PCA（音レベル） + 寄与率/ローディング保存
# =========================================================
X_sound = df_sound[sound_cols].copy()

# Policy: a sound with any missing subjective value is excluded (strict drop).
mask_complete = ~X_sound.isna().any(axis=1)
excluded_sounds = df_sound.loc[~mask_complete, join_key].tolist()
df_sound_ok = df_sound.loc[mask_complete].copy()
X_sound_ok = X_sound.loc[mask_complete].copy()

if len(df_sound_ok) < 10:
    raise RuntimeError("[fatal] 欠損除外後の音数が少なすぎます。入力QCを修正してください。")

# Standardise each item, then fit the PCA on the standardised matrix.
scaler = StandardScaler()
Xz = scaler.fit_transform(X_sound_ok.values)

pca = PCA(n_components=N_COMPONENTS, random_state=SEED)
S = pca.fit_transform(Xz)
explained = pca.explained_variance_ratio_

pc_names = [f"PC{k}" for k in range(1, N_COMPONENTS + 1)]

# Loadings: one row per subjective column, one column per component, plus the item label.
loadings = pd.DataFrame(pca.components_.T, index=sound_cols, columns=pc_names)
loadings["label"] = labels

# Attach the component scores to the complete-case sound table.
for k, pc_name in enumerate(pc_names):
    df_sound_ok[f"{pc_name}_emotion"] = S[:, k]

# Explained-variance table.
var_table = pd.DataFrame({
    "PC": pc_names,
    "explained_variance_ratio": explained,
    "cumulative": np.cumsum(explained),
})
var_path = TAB_DIR / "phaseA_PCA_explained_variance.csv"
var_table.to_csv(var_path, index=False, encoding="utf-8-sig")
add_tab(
    "A2",
    var_path.name,
    "主観PCA寄与率（音レベル）",
    "PC1〜PC3の寄与率と累積寄与率。",
    "PC, explained_variance_ratio, cumulative",
)


# =========================================================
# 6) proxy作成 + PC符号固定（最重要）
# =========================================================
# label -> col
# Item label -> sound-level column name.
lab2col = dict(zip(labels, sound_cols))

# Proxies built directly from the raw item means:
#   arousal  = mean of the four intensity items
#   approach = approach - boredom
#   valence  = (interest + immersion) - boredom
_intensity_items = ("緊急感", "脅威感", "驚き", "圧倒感")
proxy_arousal = safe_mean(df_sound_ok, [lab2col.get(item) for item in _intensity_items])
_boredom = df_sound_ok[lab2col.get("退屈")]
proxy_approach = df_sound_ok[lab2col.get("接近")] - _boredom
proxy_valence = (df_sound_ok[lab2col.get("興味")] + df_sound_ok[lab2col.get("没入")]) - _boredom

for _proxy_name, _proxy_values in (
    ("proxy_arousal", proxy_arousal),
    ("proxy_approach", proxy_approach),
    ("proxy_valence", proxy_valence),
):
    df_sound_ok[_proxy_name] = _proxy_values

# Fix the sign of PC1 / PC2 (scores and loadings together) against their proxies.
pc1, l1, r1 = sign_fix(df_sound_ok["PC1_emotion"].to_numpy(), loadings["PC1"].to_numpy(), proxy_arousal.to_numpy())
pc2, l2, r2 = sign_fix(df_sound_ok["PC2_emotion"].to_numpy(), loadings["PC2"].to_numpy(), proxy_approach.to_numpy())

df_sound_ok["PC1_emotion"], df_sound_ok["PC2_emotion"] = pc1, pc2
loadings["PC1"], loadings["PC2"] = l1, l2

print(f"[sign-fix] corr(PC1, proxy_arousal)={r1:.3f}  (>=0)")
print(f"[sign-fix] corr(PC2, proxy_approach)={r2:.3f}  (>=0)")

# Correlation of every proxy with every PC (after sign fixing), with permutation p.
rows = []
for pr in ("proxy_arousal", "proxy_valence", "proxy_approach"):
    proxy_values = df_sound_ok[pr].to_numpy()
    for pc in ("PC1_emotion", "PC2_emotion", "PC3_emotion"):
        r, p, n = perm_pvalue_corr(df_sound_ok[pc].to_numpy(), proxy_values, n_perm=N_PERM, seed=SEED)
        rows.append(dict(proxy=pr, pc=pc, r=r, p_perm=p, n=n))
corr_perm = pd.DataFrame(rows)
corr_perm_path = TAB_DIR / "phaseA_proxy_vs_PC_corr_with_perm_p.csv"
corr_perm.to_csv(corr_perm_path, index=False, encoding="utf-8-sig")
add_tab(
    "A4",
    corr_perm_path.name,
    "proxy↔PC相関（Permutation p付き）",
    "PC解釈の根拠（符号固定後）。",
    "proxy, pc, r, p_perm, n",
)

# Loadings are saved after sign fixing; the index (column names) is not written,
# the "label" column identifies each row.
load_path = TAB_DIR / "phaseA_PCA_loadings_subjective.csv"
loadings.to_csv(load_path, index=False, encoding="utf-8-sig")
add_tab(
    "A3",
    load_path.name,
    "主観PCAローディング（音レベル）",
    "主観8項目→PC寄与。PC1/PC2は符号固定済み。",
    "PC1, PC2, PC3, label",
)


# =========================================================
# 7) 図A2（Biplot）・図A3（散布図）
# =========================================================
# Axis labels carry the explained variance of each component.
pc1_lab = f"PC1_emotion（覚醒/情動強度; {explained[0]*100:.1f}%）"
pc2_lab = f"PC2_emotion（接近/関与; {explained[1]*100:.1f}%）"

# Categories are used as colours in the scatter plots.
if category_col is not None and category_col in df_sound_ok.columns:
    observed_cats = df_sound_ok[category_col].dropna().unique().tolist()
    # Put the usual three classes first, in this order; anything else follows.
    preferred_order = ["快", "中間", "不快"]
    cats = [c for c in preferred_order if c in observed_cats] + [c for c in observed_cats if c not in preferred_order]
else:
    # No usable category column: plot everything as one group. Note that
    # category_col is rebound to the placeholder column name from here on.
    df_sound_ok["_category_tmp"] = "all"
    category_col = "_category_tmp"
    cats = ["all"]

palette = sns.color_palette("Set1", n_colors=len(cats))
cat2color = dict(zip(cats, palette))

# Without an ambiguity flag column, use an all-False placeholder for plotting
# (the real flag may be generated further down).
if ambig_col is None or ambig_col not in df_sound_ok.columns:
    df_sound_ok["_ambig_tmp"] = False
    ambig_col = "_ambig_tmp"

# Figure A3: scatter of the sounds in the PC1 x PC2 plane.
# Colour = category, marker = ambiguity flag (circle: not ambiguous, square: ambiguous).
plt.figure(figsize=(8, 6))
marker_by_flag = ((False, "o"), (True, "s"))
for c in cats:
    in_category = df_sound_ok[category_col] == c
    for is_ambig, marker in marker_by_flag:
        sub = df_sound_ok[in_category & (df_sound_ok[ambig_col] == is_ambig)]
        if sub.empty:
            continue
        flag_text = "曖昧" if is_ambig else "非曖昧"
        plt.scatter(
            sub["PC1_emotion"], sub["PC2_emotion"],
            s=60, alpha=0.85, marker=marker,
            color=cat2color.get(c, "C0"),
            edgecolor="k", linewidth=0.5,
            label=f"{c}（{flag_text}）",
        )
plt.axhline(0, color="gray", linestyle="--", linewidth=0.8)
plt.axvline(0, color="gray", linestyle="--", linewidth=0.8)
plt.xlabel(pc1_lab)
plt.ylabel(pc2_lab)
plt.title("図A3 48音の情動空間配置（PC1×PC2）")
plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
fig_path = FIG_DIR / "phaseA_PC1_PC2_scatter.png"
savefig(fig_path)
add_fig(
    "A3",
    fig_path.name,
    "48音の情動空間配置（PC1×PC2）",
    "各点=各音（音レベル）。色=カテゴリ、形=曖昧フラグ。",
    "PC1方向の勾配（強度）と、同カテゴリ内のPC2分散を見る。",
    "カテゴリ勾配＋カテゴリ内差（接近/関与）が残ることが多い。",
)

# Figure A2: biplot = sounds as points plus one loading arrow per subjective item.
plt.figure(figsize=(8, 6))
for c in cats:
    sub = df_sound_ok.loc[df_sound_ok[category_col] == c]
    plt.scatter(
        sub["PC1_emotion"], sub["PC2_emotion"],
        s=40, alpha=0.45,
        color=cat2color.get(c, "C0"), label=str(c),
    )

# Scale the arrows to 80% of the smaller 95th-percentile |score| of the two axes.
xspan, yspan = (
    np.nanpercentile(np.abs(df_sound_ok[axis_name]), 95)
    for axis_name in ("PC1_emotion", "PC2_emotion")
)
arrow_scale = 0.8 * min(xspan, yspan)

loadings_by_label = loadings.set_index("label")
for lab in labels:
    x = float(loadings_by_label.loc[lab, "PC1"]) * arrow_scale
    y = float(loadings_by_label.loc[lab, "PC2"]) * arrow_scale
    plt.arrow(0, 0, x, y, head_width=0.06, head_length=0.06,
              length_includes_head=True, color="black", alpha=0.85)
    # Place the item label slightly beyond the arrow tip.
    plt.text(x * 1.12, y * 1.12, lab, fontsize=10, ha="center", va="center")

plt.axhline(0, color="gray", linestyle="--", linewidth=0.8)
plt.axvline(0, color="gray", linestyle="--", linewidth=0.8)
plt.xlabel(pc1_lab)
plt.ylabel(pc2_lab)
plt.title("図A2 情動空間Biplot（PC1×PC2＋主観ローディング）")
plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
fig_path = FIG_DIR / "phaseA_biplot_PC1_PC2.png"
savefig(fig_path)
add_fig(
    "A2",
    fig_path.name,
    "情動空間Biplot（PC1×PC2＋主観ローディング）",
    "点=各音、矢印=主観項目ローディング。",
    "矢印方向（増加方向）と点の位置関係からPCの意味を読む。",
    "PC1は強度系↔退屈、PC2は接近/関与の成分になりやすい。",
)


# =========================================================
# 8) 情動軸（emo_*）確定 + valence分類ターゲット確定
# =========================================================
# Final emotion axes: arousal / approach are the sign-fixed PCs, valence is the
# z-scored valence proxy (it is not taken from the PCA).
df_sound_ok["emo_arousal"] = df_sound_ok["PC1_emotion"]
df_sound_ok["emo_approach"] = df_sound_ok["PC2_emotion"]
df_sound_ok["emo_valence"] = zscore(df_sound_ok["proxy_valence"])

# Classification target: reuse the existing category column when it contains
# both "快" and "不快"; otherwise derive three classes from valence tertiles.
use_existing_category = (
    category_col is not None
    and category_col in df_sound_ok.columns
    and set(df_sound_ok[category_col].unique()) >= set(["快", "不快"])
)
if use_existing_category:
    df_sound_ok["valence_3class"] = df_sound_ok[category_col].astype(str)
    valence_rule = {"type": "existing_category_column", "column": category_col}
else:
    # The thresholds are stored in valence_rule so the split can be reproduced.
    q33 = float(df_sound_ok["emo_valence"].quantile(1/3))
    q67 = float(df_sound_ok["emo_valence"].quantile(2/3))
    is_low = df_sound_ok["emo_valence"] <= q33
    is_high = df_sound_ok["emo_valence"] >= q67
    df_sound_ok["valence_3class"] = np.where(is_low, "不快", np.where(is_high, "快", "中間"))
    valence_rule = {"type": "tertiles_of_emo_valence", "q33": q33, "q67": q67}

# Optional binary target: 1 when the z-scored valence is non-negative.
df_sound_ok["valence_binary"] = (df_sound_ok["emo_valence"] >= 0.0).astype(int)

# Audit log: correlations between the PCs and the emo_* axes.
axis_cols = ["PC1_emotion", "PC2_emotion", "PC3_emotion", "emo_arousal", "emo_approach", "emo_valence"]
axis_corr = df_sound_ok[axis_cols].corr()
axis_path = TAB_DIR / "phaseA_axis_corr_check.csv"
axis_corr.to_csv(axis_path, encoding="utf-8-sig")
add_tab(
    "A5",
    axis_path.name,
    "情動軸監査（PC↔emo_*相関）",
    "軸の対応関係の監査ログ。",
    "相関行列",
)


# =========================================================
# 9) 曖昧音フラグ（無ければ計算して付与）
# =========================================================
# Ambiguity flag "is_ambiguous_approach_sd_top10":
#   1) keep the column when the sound table already has it;
#   2) otherwise derive it from the participant-level table: the 10 sounds with
#      the largest between-participant SD of the approach rating are flagged;
#   3) otherwise (no participant-level data) set it to False everywhere.
# ambig_rule records which of the three applied (it goes into the metadata).
if "is_ambiguous_approach_sd_top10" in df_sound_ok.columns:
    ambig_rule = {"source": "existing"}
elif df_psub is not None and psub_cols is not None:
    # psub_cols is ordered like `labels`, so the approach column shares its index.
    approach_lab_idx = labels.index("接近")
    col_approach_psub = psub_cols[approach_lab_idx]
    sd_by_sound = df_psub.groupby(sound_id_col)[col_approach_psub].std(ddof=0)
    top10 = sd_by_sound.sort_values(ascending=False).head(10).index.tolist()

    df_sound_ok["is_ambiguous_approach_sd_top10"] = df_sound_ok[join_key].isin(top10)
    ambig_rule = {"source": "computed_from_participant_SD", "column": col_approach_psub, "topk": 10}
else:
    df_sound_ok["is_ambiguous_approach_sd_top10"] = False
    ambig_rule = {"source": "not_available_set_false"}

# Unified boolean flag used downstream. A bare astype(bool) turns a missing
# value (NaN) into True, which would mark sounds without a flag as ambiguous;
# missing entries are therefore forced to False.
_ambig_raw = df_sound_ok["is_ambiguous_approach_sd_top10"]
df_sound_ok["is_ambiguous"] = _ambig_raw.astype(bool) & _ambig_raw.notna()


# =========================================================
# 10) sound-level テーブル保存（with_PC）
# =========================================================
# 元df_soundに戻す：欠損除外音は残すならNaNで埋める、ここでは “解析対象48音のうち欠損除外音は落とす” 仕様
# Only the complete-case sounds are written (sounds excluded for missing values
# are dropped, not NaN-filled).
# The plotting placeholders "_category_tmp" / "_ambig_tmp" are internal: if they
# were written out, they would end up in master_sound_level.csv, and on the next
# run "_category_tmp" would be picked up by the substring search for a category
# column. drop() returns a new frame, so df_sound_ok itself is left untouched.
out_sound = df_sound_ok.drop(columns=["_category_tmp", "_ambig_tmp"], errors="ignore")

OUT_SOUND_WITH_PC = MASTER_TABLE_DIR / "master_sound_level_with_PC.csv"
OUT_SOUND_MASTER  = MASTER_TABLE_DIR / "master_sound_level.csv"

# master_sound_level.csv is overwritten below, so keep a timestamped copy of the
# current file first.
# NOTE(review): OUT_SOUND_MASTER is the same path as the input IN_SOUND; after
# this step the input contains the added target columns and no longer contains
# any excluded sounds. Confirm that downstream modules rely on this.
if OUT_SOUND_MASTER.exists():
    bak = MASTER_TABLE_DIR / f"master_sound_level__bak_{datetime.now():%Y%m%d_%H%M%S}.csv"
    shutil.copy2(OUT_SOUND_MASTER, bak)
    print("[backup]", bak)

out_sound.to_csv(OUT_SOUND_WITH_PC, index=False, encoding="utf-8-sig")
out_sound.to_csv(OUT_SOUND_MASTER,  index=False, encoding="utf-8-sig")
print("[save] sound-level with targets:", OUT_SOUND_WITH_PC)


# =========================================================
# 11) participant-level へのターゲット付与（EEG実務用）
# =========================================================
OUT_PSUB_WITH = MASTER_TABLE_DIR / "master_participant_sound_level_with_targets.csv"

# Stay at these defaults when no participant-level table was loaded.
psub_out_path = None
participant_targets_info = {"generated": False}

if df_psub is not None:
    # Attach the sound-level targets to every participant x sound row.
    merge_cols = [join_key, "emo_arousal", "emo_approach", "emo_valence",
                  "valence_3class", "valence_binary", "is_ambiguous"]
    sound_targets = out_sound[merge_cols].copy()

    # The merge key is always join_key. If the participant table lacks that
    # column, its detected sound-ID column is renamed to it; if it already has
    # a join_key column, that column is used as is.
    df_psub_work = df_psub.copy()
    if sound_id_col != join_key and join_key not in df_psub_work.columns:
        df_psub_work = df_psub_work.rename(columns={sound_id_col: join_key})

    # validate="many_to_one" makes pandas raise if a sound ID is duplicated on
    # the sound-level side.
    df_psub_merged = df_psub_work.merge(sound_targets, on=join_key, how="left", validate="many_to_one")

    # Structure check: does the sound-level factor structure also appear at the
    # participant level? No extra per-participant target columns are added to
    # df_psub_merged here; only loading tables and a figure are produced.
    if psub_cols is not None:
        # Within-participant z-score (removes individual rating habits);
        # values that are still missing afterwards are set to 0.
        Xp_wz = df_psub_merged.groupby(pid_col)[psub_cols].transform(
            lambda x: (x - x.mean()) / (x.std(ddof=0) + 1e-12)
        ).fillna(0.0)

        # Participant-level PCA on the within-participant z-scores.
        # NOTE(review): the signs of these components are not fixed, so the
        # sign of each similarity below is arbitrary.
        pca_psub = PCA(n_components=N_COMPONENTS, random_state=SEED)
        pca_psub.fit(Xp_wz.values)

        load_psub = pd.DataFrame(
            pca_psub.components_.T,
            index=labels,
            columns=[f"PC{i}_psub" for i in range(1, N_COMPONENTS+1)]
        )
        load_psub_path = TAB_DIR / "phaseA_PCA_loadings_subjective_participant.csv"
        load_psub.to_csv(load_psub_path, encoding="utf-8-sig")
        add_tab("A10", load_psub_path.name, "主観PCAローディング（個人レベル）",
                "参加者内z後の個人レベルPCAローディング。", "PC1_psub, PC2_psub, PC3_psub")

        # Similarity of sound-level and participant-level loadings: Pearson r
        # over the items for every pair of components, with a permutation p.
        # Both results come from a single pass over the component pairs.
        load_sound_for_sim = loadings.set_index("label")[[f"PC{i}" for i in range(1, N_COMPONENTS+1)]].loc[labels]
        sim = pd.DataFrame(index=[f"PC{i}_sound" for i in range(1, N_COMPONENTS+1)],
                           columns=[f"PC{j}_psub" for j in range(1, N_COMPONENTS+1)],
                           dtype=float)
        perm_rows = []
        for i in range(N_COMPONENTS):
            v1 = load_sound_for_sim.iloc[:, i].to_numpy()
            for j in range(N_COMPONENTS):
                v2 = load_psub.iloc[:, j].to_numpy()
                sim.iloc[i, j] = np.corrcoef(v1, v2)[0, 1]
                r, p, n = perm_pvalue_corr(v1, v2, n_perm=N_PERM, seed=SEED)
                perm_rows.append({"sound_pc": f"PC{i+1}_sound", "psub_pc": f"PC{j+1}_psub", "r": r, "p_perm": p, "n": n})

        sim_path = TAB_DIR / "phaseA_PCA_loading_similarity_sound_vs_participant.csv"
        sim.to_csv(sim_path, encoding="utf-8-sig")
        add_tab("A11", sim_path.name, "ローディング類似度（音レベル vs 個人レベル）",
                "音レベルPCAと個人レベルPCAのローディング相関。", "PC*_sound × PC*_psub（r）")

        # Figure A4: heatmap of the similarity matrix.
        plt.figure(figsize=(5, 4))
        sns.heatmap(sim.astype(float), annot=True, fmt=".2f", vmin=-1, vmax=1, center=0, square=True,
                    cbar_kws={"label": "Pearson r"})
        plt.title("図A4 ローディング類似度（音レベル vs 個人レベル）")
        fig_path = FIG_DIR / "phaseA_loading_similarity_sound_vs_participant.png"
        savefig(fig_path)
        add_fig("A4", fig_path.name, "ローディング類似度（音レベル vs 個人レベル）",
                "音平均の因子構造が個人レベルでも再現されるかのチェック。",
                "対角が高い（PC1↔PC1など）か・非対角が低いかを見る。",
                "構造が一致すれば、音レベル情動軸が個人レベルでも妥当と言える。")

        # Table A12: the same similarities with their permutation p-values.
        sim_p = pd.DataFrame(perm_rows)
        sim_p_path = TAB_DIR / "phaseA_loading_similarity_with_perm_p.csv"
        sim_p.to_csv(sim_p_path, index=False, encoding="utf-8-sig")
        add_tab("A12", sim_p_path.name, "ローディング類似度Permutation p",
                "ローディング相関の偶然性をPermutationで評価。", "sound_pc, psub_pc, r, p_perm, n")

        participant_targets_info = {
            "generated": True,
            "pid_col": pid_col,
            "sound_id_col_used": join_key,
            "psub_columns": dict(zip(labels, psub_cols)),
            "participant_level_PCA": "within-subject z then PCA (for structure check)",
        }

    # Save the merged participant-level table.
    df_psub_merged.to_csv(OUT_PSUB_WITH, index=False, encoding="utf-8-sig")
    psub_out_path = OUT_PSUB_WITH
    print("[save] participant-level with targets:", psub_out_path)


# =========================================================
# 12) manifest / metadata 出力
# =========================================================
# Dump the figure / table registries collected during the run.
fig_manifest_path = TAB_DIR / "moduleA_figure_manifest.csv"
tab_manifest_path = TAB_DIR / "moduleA_table_manifest.csv"
for _manifest_rows, _manifest_path in ((FIG_MANIFEST, fig_manifest_path), (TAB_MANIFEST, tab_manifest_path)):
    pd.DataFrame(_manifest_rows).to_csv(_manifest_path, index=False, encoding="utf-8-sig")

# Run metadata, assembled section by section (key order is the order written to the JSON).
_meta_paths = {
    "root_dir": str(ROOT_DIR),
    "in_sound": str(IN_SOUND),
    "in_psub": str(IN_PSUB),
    "out_dir": str(OUT_DIR),
    "fig_dir": str(FIG_DIR),
    "tab_dir": str(TAB_DIR),
    "out_sound_with_pc": str(OUT_SOUND_WITH_PC),
    "out_psub_with_targets": str(psub_out_path) if psub_out_path else None,
}
_meta_columns = {
    "join_key_sound": join_key,
    "category_col": category_col,
    "ambig_col_initial": ambig_col,
    "subjective_sound": dict(zip(labels, sound_cols)),
    "subjective_participant": dict(zip(labels, psub_cols)) if psub_cols is not None else None,
    "pid_col": pid_col,
    "sound_id_col_psub_original": sound_id_col,
}
_meta_qc = {
    "expected_n_participants": EXPECTED_N_PARTICIPANTS,
    "excluded_sounds_due_to_missing_sound_level": excluded_sounds,
    "n_sounds_used": int(len(df_sound_ok)),
}
_meta_pca = {
    "n_components": N_COMPONENTS,
    "explained_variance_ratio": [float(x) for x in explained],
    "sign_fix_rule": {
        "PC1": "corr(PC1, proxy_arousal) >= 0",
        "PC2": "corr(PC2, proxy_approach) >= 0",
        "corr_after_fix": {"PC1_proxy_arousal_r": float(r1), "PC2_proxy_approach_r": float(r2)}
    }
}
_meta_targets = {
    "emo_arousal": "PC1_emotion",
    "emo_approach": "PC2_emotion",
    "emo_valence": "zscore(proxy_valence)",
    "valence_3class_rule": valence_rule,
    "valence_binary_rule": "emo_valence >= 0 (z-scored)",
    "ambiguous_rule": ambig_rule,
}

metadata = {
    "module": "A",
    "created_at": datetime.now().isoformat(timespec="seconds"),
    "seed": SEED,
    "paths": _meta_paths,
    "columns": _meta_columns,
    "qc": _meta_qc,
    "pca": _meta_pca,
    "targets": _meta_targets,
    "participant_level": participant_targets_info,
}

# ensure_ascii=False keeps the Japanese labels readable in the JSON file.
meta_path = TAB_DIR / "metadata_moduleA.json"
with open(meta_path, "w", encoding="utf-8") as f:
    json.dump(metadata, f, ensure_ascii=False, indent=2)

print("[save] manifest:", fig_manifest_path, tab_manifest_path)
print("[save] metadata:", meta_path)

print("\n[Module A DONE]")
print(" - sound-level targets saved ->", OUT_SOUND_WITH_PC)
if psub_out_path:
    print(" - participant-level targets saved ->", psub_out_path)


[OUT_DIR ] /Users/shunsuke/EEG_48sounds/moduleA_outputs
[FIG_DIR ] /Users/shunsuke/EEG_48sounds/moduleA_outputs/figures
[TAB_DIR ] /Users/shunsuke/EEG_48sounds/moduleA_outputs/tables
[LOG_DIR ] /Users/shunsuke/EEG_48sounds/moduleA_outputs/logs
[load] sound-level: (48, 12725) from /Users/shunsuke/EEG_48sounds/derivatives/master_tables/master_sound_level.csv
[load] participant-level: (576, 11392) from /Users/shunsuke/EEG_48sounds/derivatives/master_tables/master_participant_sound_level.csv

[subjective sound cols]
  驚き: 驚き_mean
  緊急感: 緊急感_mean
  脅威感: 脅威感_mean
  圧倒感: 圧倒感_mean
  接近: 接近したい気持ち_mean
  興味: 興味_mean
  没入: 没入_mean
  退屈: 退屈_mean

[subjective participant cols]
  驚き: 驚き
  緊急感: 緊急感
  脅威感: 脅威感
  圧倒感: 圧倒感
  接近: 接近したい気持ち
  興味: 興味
  没入: 没入
  退屈: 退屈

[join_key sound-level]: number
[category_col]: カテゴリー
[ambig_col]: is_ambiguous_approach_sd_top10
[sign-fix] corr(PC1, proxy_arousal)=0.929  (>=0)
[sign-fix] corr(PC2, proxy_approach)=0.772  (>=0)
[backup] /Users/shunsuke/EEG_48sounds/derivati