In [None]:
"""
PLS-SEM variable screening for CAI (Type IV HOC, formative–formative, two-stage)

Stage 1:
  - Per-eco-scheme LOC (ES1..ES5) as formative constructs from proxy indicators
  - Compute outer weights via PLS-PM (pyplspm)
  - Bootstrap outer weights for significance (p-values)
  - Multicollinearity screening via VIF per block (VIF > 5 -> flag)
  - Drop indicators that fail significance and/or VIF threshold
  - Refit Stage 1 and export final LOC scores

Stage 2:
  - Build HOC from LOC scores (single-indicator constructs for LOCs)
  - Structural model: LOCs -> HOC
  - Report R²(HOC), f² effects, and Q² via K-fold CV (Stone–Geisser)

Outputs:
  - reports/loc_vif_report.csv
  - reports/outer_weights_bootstrap.csv
  - reports/outer_weights_final.csv
  - reports/stage2_paths_r2_f2.csv
  - reports/stage2_q2_summary.csv
  - reports/stage2_q2_folds.csv
  - data/loc_scores_stage1.csv
"""

from __future__ import annotations

import itertools
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

# deps: pip install plspm statsmodels scikit-learn
from plspm import Plspm
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.model_selection import KFold
from sklearn.linear_model import LinearRegression
from scipy import stats
from pathlib import Path

# -------------------- CONFIG --------------------
DATA_CSV = "data/processed/normalised_with_cai.csv"  # input table read by main()

# Either (A) let the script auto-discover by suffix; or (B) specify explicit lists:
AUTO_DISCOVER_BY_SUFFIX = True
# Construct name -> column-name suffix; a column belongs to a construct when
# its name ends with that suffix (see discover_blocks_by_suffix).
SUFFIXES = {
    "ES1": "_es1",  # Eco-scheme 1
    "ES2": "_es2",
    "ES3": "_es3",
    "ES4": "_es4",
    "ES5": "_es5",
}
# If AUTO_DISCOVER_BY_SUFFIX is False, set blocks explicitly:
EXPLICIT_BLOCKS = {
    # "ES1": ["varA_es1", "varB_es1", ...],
    # ...
}

# Columns passed to DataFrame.groupby in main(); when empty, Stage 1 runs once
# on the pooled data.
GROUP_COLS = []    
# Columns copied next to the LOC scores in the Stage 1 score export; when
# empty, only the scores are exported.
ID_COLS    = []    

# Screening thresholds
VIF_CUTOFF = 5.0       # indicators with VIF above this are dropped
ALPHA      = 0.05      # indicators with bootstrap p above this are dropped
N_BOOT     = 1000      # bootstrap resamples for outer weight p-values
RANDOM_SEED = 12345    # seeds NumPy, the bootstrap generator and the K-fold shuffle
N_SPLITS_Q2 = 10       # K-fold for Q²

# Output dirs
# Created at import time so the to_csv calls in main() have a target directory.
Path("reports").mkdir(exist_ok=True)
Path("data").mkdir(exist_ok=True)


# -------------------- HELPERS --------------------
def discover_blocks_by_suffix(df: pd.DataFrame, suffix_map: Dict[str, str]) -> Dict[str, List[str]]:
    """Map each construct name to the columns of ``df`` ending with its suffix.

    Columns are listed in ``df.columns`` order. A construct whose suffix
    matches no column maps to an empty list.
    """
    column_names = list(df.columns)
    return {
        construct: [name for name in column_names if name.endswith(suffix)]
        for construct, suffix in suffix_map.items()
    }

def make_path_matrix_stage1(locs: List[str]) -> pd.DataFrame:
    """Return an all-zero integer square matrix labelled by ``locs``.

    Stage 1 has no structural paths between the constructs, so the path
    matrix handed to the PLS fit contains only zeros.
    """
    size = len(locs)
    return pd.DataFrame(np.zeros((size, size), dtype=int), index=locs, columns=locs)

def make_path_matrix_stage2(locs: List[str], hoc: str = "HOC") -> pd.DataFrame:
    """Return the Stage 2 path matrix with one path from every LOC to the HOC.

    The matrix is square over ``locs + [hoc]``. The row labelled ``hoc``
    holds a 1 in each LOC column; every other cell is 0.
    """
    labels = locs + [hoc]
    matrix = pd.DataFrame(0, index=labels, columns=labels, dtype=int)
    for source in locs:
        matrix.at[hoc, source] = 1
    return matrix

def block_modes(n_blocks: int, mode: str = "B") -> List[str]:
    """Return ``mode`` repeated ``n_blocks`` times.

    "B" denotes a formative block and "A" a reflective one.
    """
    return [mode for _ in range(n_blocks)]

def compute_vif_table(X: pd.DataFrame) -> pd.DataFrame:
    """Compute the variance inflation factor of every column in ``X``.

    Rows containing NaN or +/-inf are dropped first. Each column is then
    regressed on the remaining columns plus an intercept.

    Parameters
    ----------
    X : indicator columns of one block.

    Returns
    -------
    DataFrame with columns ``variable`` and ``VIF``. ``VIF`` is NaN for every
    column when fewer than 2 columns or fewer than 5 complete rows are
    available.
    """
    clean = X.replace([np.inf, -np.inf], np.nan).dropna()
    if clean.shape[1] < 2 or clean.shape[0] < 5:
        return pd.DataFrame({"variable": X.columns, "VIF": np.nan})

    # variance_inflation_factor fits OLS on exactly the matrix it is given and
    # does not add an intercept. Without a constant column the R² is
    # uncentered and the VIFs of non-centered indicators come out inflated.
    design = np.column_stack([np.ones(len(clean)), clean.to_numpy(dtype=float)])

    # Column 0 is the constant, so the indicators sit at positions 1..k.
    vif_vals = [
        variance_inflation_factor(design, j) for j in range(1, design.shape[1])
    ]
    return pd.DataFrame({"variable": clean.columns, "VIF": vif_vals})

def bootstrap_outer_weights(
    df_block: pd.DataFrame,
    construct_name: str,
    n_boot: int = 1000,
    random_state: int = 12345,
) -> pd.DataFrame:
    """Bootstrap the outer weights of one formative block.

    A one-construct PLS model (mode "B", no inner paths) is fitted on the
    original sample and on ``n_boot`` row resamples drawn with replacement.

    Parameters
    ----------
    df_block : indicator columns of the block; every column is an indicator.
    construct_name : label used for the construct in the model and the output.
    n_boot : number of bootstrap resamples.
    random_state : seed for the resampling generator.

    Returns
    -------
    DataFrame with columns ``construct``, ``indicator``, ``weight_mean``,
    ``weight_se``, ``t`` and ``p``. It is empty when ``df_block`` has no
    columns. When every resample fails, ``weight_mean`` holds the
    original-sample weights and the other statistics are NaN.

    Warns
    -----
    RuntimeWarning when one or more resamples could not be fitted and were
    skipped.
    """
    # Local import: the file's top-level import block does not provide it.
    import warnings

    out_cols = ["construct", "indicator", "weight_mean", "weight_se", "t", "p"]
    rng = np.random.default_rng(random_state)
    items = list(df_block.columns)
    if len(items) == 0:
        return pd.DataFrame(columns=out_cols)

    # Prepare a trivial PLS model: one construct measured by formative indicators
    P = pd.DataFrame(0, index=[construct_name], columns=[construct_name], dtype=int)
    blocks = {construct_name: items}
    modes = ["B"]

    # First run (original sample) for reference weights.
    # NOTE(review): the Plspm call signature and the outer_model_ attribute
    # are used as-is; confirm they match the installed plspm release.
    model_ref = Plspm(df_block, P, blocks, modes, scheme="path", bootval=False)
    w_ref = model_ref.outer_model_[["name", "weight"]].copy().rename(columns={"name": "indicator"})
    w_ref["construct"] = construct_name

    # Bootstrap
    W = []
    n_failed = 0
    n = len(df_block)
    for _ in range(n_boot):
        idx = rng.integers(0, n, size=n)
        sample = df_block.iloc[idx].reset_index(drop=True)
        try:
            m = Plspm(sample, P, blocks, modes, scheme="path", bootval=False)
            ow = m.outer_model_[["name", "weight"]].copy()
        except Exception:
            # A resample may be unfittable (e.g. singular). Skip it, but keep
            # count so the loss of replicates is reported below.
            n_failed += 1
            continue
        ow["construct"] = construct_name
        W.append(ow.rename(columns={"name": "indicator"}))

    if n_failed:
        warnings.warn(
            f"{construct_name}: {n_failed} of {n_boot} bootstrap resamples "
            "failed to fit and were skipped",
            RuntimeWarning,
            stacklevel=2,
        )

    if not W:
        # Fallback: return reference weights only
        w_ref["weight_mean"] = w_ref["weight"]
        w_ref["weight_se"] = np.nan
        w_ref["t"] = np.nan
        w_ref["p"] = np.nan
        return w_ref[out_cols]

    Wdf = pd.concat(W, ignore_index=True)
    stats_tbl = (
        Wdf.groupby(["construct", "indicator"])["weight"]
           .agg(["mean", "std", "count"])
           .reset_index()
           .rename(columns={"mean": "weight_mean", "std": "weight_se"})
    )

    # t-stats vs 0, p-values (two-sided).
    # NOTE(review): t uses the bootstrap mean, not the original-sample weight,
    # as the numerator; confirm this is the intended convention.
    stats_tbl["t"] = stats_tbl["weight_mean"] / stats_tbl["weight_se"]
    stats_tbl["p"] = 2 * stats.t.sf(np.abs(stats_tbl["t"]), df=stats_tbl["count"] - 1)

    return stats_tbl[out_cols]

def compute_f2_effects(df: pd.DataFrame, y: str, X_full: List[str]) -> pd.DataFrame:
    """Compute Cohen's f² for each predictor of ``y``.

    f² = (R²_included - R²_excluded) / (1 - R²_included). Both R² values
    come from in-sample OLS fits with an intercept.

    Parameters
    ----------
    df : data containing ``y`` and every column in ``X_full``.
    y : name of the outcome column.
    X_full : names of all predictor columns.

    Returns
    -------
    DataFrame with one row per predictor and columns ``predictor``,
    ``R2_full``, ``R2_reduced`` and ``f2``. It is empty when ``X_full`` is
    empty.
    """
    out_cols = ["predictor", "R2_full", "R2_reduced", "f2"]
    if not X_full:
        # No predictors means no effect sizes; fitting on zero columns would raise.
        return pd.DataFrame(columns=out_cols)

    lr = LinearRegression()
    # Full model R2
    lr.fit(df[X_full], df[y])
    r2_full = lr.score(df[X_full], df[y])

    rows = []
    for x in X_full:
        X_red = [v for v in X_full if v != x]
        if X_red:
            lr.fit(df[X_red], df[y])
            r2_red = lr.score(df[X_red], df[y])
        else:
            # Dropping the only predictor leaves the intercept-only model,
            # whose in-sample R² is 0. The estimator cannot be fitted on an
            # empty design matrix.
            r2_red = 0.0
        # The small epsilon keeps the ratio finite when R²_full is exactly 1.
        f2 = (r2_full - r2_red) / (1 - r2_full + 1e-12)
        rows.append({"predictor": x, "R2_full": r2_full, "R2_reduced": r2_red, "f2": f2})
    return pd.DataFrame(rows, columns=out_cols)

def compute_q2(df: pd.DataFrame, y: str, X: List[str], n_splits: int = 10, seed: int = 12345) -> Tuple[float, pd.DataFrame]:
    """Estimate Q² = 1 - PRESS/SST for ``y`` by shuffled K-fold cross-validation.

    A linear regression of ``y`` on ``X`` is fitted on each training split
    and used to predict the held-out rows. PRESS sums the squared held-out
    errors. SST is taken around the mean of all held-out observations.

    Returns the Q² value and a frame with columns ``fold``, ``y_true`` and
    ``y_pred`` holding every held-out prediction.
    """
    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    model = LinearRegression()

    per_fold = []
    for fold_no, (train_idx, test_idx) in enumerate(splitter.split(df), 1):
        train_part = df.iloc[train_idx]
        test_part = df.iloc[test_idx]
        model.fit(train_part[X], train_part[y])
        predicted = model.predict(test_part[X])
        per_fold.append(
            pd.DataFrame({"fold": fold_no, "y_true": test_part[y].values, "y_pred": predicted})
        )

    q2_df = pd.concat(per_fold, ignore_index=True)
    observed = q2_df["y_true"].to_numpy()
    fitted = q2_df["y_pred"].to_numpy()

    press = np.sum((observed - fitted) ** 2)
    sst = np.sum((observed - np.mean(observed)) ** 2)
    # The small epsilon guards against division by zero for a constant outcome.
    q2 = 1 - press / (sst + 1e-12)
    return q2, q2_df


# -------------------- MAIN PIPELINE --------------------
def main():
    """Run the two-stage screening pipeline and write all report CSVs.

    Stage 1 runs once per group (or once on the pooled data). For each LOC it
    computes VIFs and bootstrapped outer weights, keeps the indicators with
    p <= ALPHA and VIF <= VIF_CUTOFF (or the single lowest-p indicator when
    none pass), refits the model on the kept indicators and collects the LOC
    scores.

    Stage 2 builds ``HOC_indicator`` as the row mean of the LOC scores, fits
    the LOCs -> HOC model and reports path coefficients, R², f² and K-fold Q².

    NOTE(review): the Plspm call signature and the outer_model_ / scores_ /
    path_coefficients_ / r2_ attributes are used as-is; confirm they match
    the installed plspm release.

    Raises
    ------
    ValueError
        If a LOC has no indicator columns.
    """
    np.random.seed(RANDOM_SEED)
    df = pd.read_csv(DATA_CSV)

    # Build blocks (LOC -> indicators)
    if AUTO_DISCOVER_BY_SUFFIX:
        blocks = discover_blocks_by_suffix(df, SUFFIXES)
    else:
        blocks = EXPLICIT_BLOCKS

    # Partial missingness is allowed: a row is dropped only when *all*
    # indicators of some LOC are NA.
    for loc, cols in blocks.items():
        if not cols:
            raise ValueError(f"No indicators found for {loc}. Provide explicit lists or adjust suffixes.")
        df = df[df[cols].notna().any(axis=1)]

    # Optional: run per-group
    if GROUP_COLS:
        groups = df.groupby(GROUP_COLS, dropna=False)
    else:
        groups = [((), df)]  # single pooled group

    loc_scores_all = []
    outer_boot_all = []
    vif_all = []
    outer_final_all = []

    for gkey, dfg in groups:
        # --- Stage 1: per LOC ---
        locs = list(blocks.keys())

        # VIF screening & bootstrap per LOC
        kept_blocks = {}
        for loc, cols in blocks.items():
            X = dfg[cols].copy()
            vif_tbl = compute_vif_table(X)
            vif_tbl.insert(0, "group", str(gkey))
            vif_tbl.insert(1, "construct", loc)
            vif_all.append(vif_tbl)

            # Bootstrap outer weights
            boot_tbl = bootstrap_outer_weights(X, loc, n_boot=N_BOOT, random_state=RANDOM_SEED)
            boot_tbl.insert(0, "group", str(gkey))
            outer_boot_all.append(boot_tbl)

            # Apply screening: keep indicators with p<=alpha and VIF<=cutoff (if VIF available)
            merge_scr = boot_tbl.merge(vif_tbl.rename(columns={"variable":"indicator"}), on=["construct","indicator"], how="left")
            keep_mask = (merge_scr["p"] <= ALPHA) & ((merge_scr["VIF"].isna()) | (merge_scr["VIF"] <= VIF_CUTOFF))
            keep_inds = merge_scr.loc[keep_mask, "indicator"].tolist()

            # If all dropped, fallback: keep the strongest (lowest p) to avoid empty block
            if len(keep_inds) == 0 and len(cols) > 0:
                best = merge_scr.sort_values("p", ascending=True).iloc[0]["indicator"]
                keep_inds = [best]

            kept_blocks[loc] = keep_inds

        # Refit Stage 1 with kept indicators
        P1_refit = make_path_matrix_stage1(locs)
        modes1_refit = block_modes(len(locs), "B")

        model1 = Plspm(
            dfg,
            P1_refit,
            kept_blocks,
            modes1_refit,
            scheme="path",
            bootval=False
        )

        # Save final outer weights (after screening)
        om = model1.outer_model_[["name","block","weight"]].rename(columns={"name":"indicator","block":"construct"})
        om.insert(0, "group", str(gkey))
        outer_final_all.append(om)

        # Extract LOC scores
        scores = model1.scores_.copy()
        if ID_COLS:
            # NOTE(review): this positional concat assumes scores_ has one row
            # per row of dfg, in the same order; confirm against plspm.
            scores = pd.concat([dfg[ID_COLS].reset_index(drop=True), scores.reset_index(drop=True)], axis=1)
        scores.insert(0, "group", str(gkey))
        loc_scores_all.append(scores)

    # Concatenate and save Stage 1 reports
    pd.concat(vif_all, ignore_index=True).to_csv("reports/loc_vif_report.csv", index=False)
    pd.concat(outer_boot_all, ignore_index=True).to_csv("reports/outer_weights_bootstrap.csv", index=False)
    pd.concat(outer_final_all, ignore_index=True).to_csv("reports/outer_weights_final.csv", index=False)
    loc_scores = pd.concat(loc_scores_all, ignore_index=True)
    loc_scores.to_csv("data/loc_scores_stage1.csv", index=False)

    # --- Stage 2: LOC scores -> HOC ---
    # Build a synthetic single-indicator for HOC as the mean of LOC scores (for measurement anchor).
    # NOTE(review): HOC_indicator is an exact linear combination of the LOC
    # scores it is regressed on below, so the OLS-based f² and Q² (and any R²
    # of that regression) are degenerate by construction: R² is 1 and f² is
    # dominated by the 1e-12 epsilon. Confirm that this anchor is intended.
    loc_cols = list(blocks.keys())
    df2 = loc_scores.copy()
    df2["HOC_indicator"] = df2[loc_cols].mean(axis=1)

    # Build stage-2 path matrix: LOCs -> HOC
    P2 = make_path_matrix_stage2(loc_cols, hoc="HOC")

    # Measurement model:
    #  - Each LOC is a single-indicator construct (reflective "A") using its own score column.
    #  - HOC is a single-indicator construct measured by HOC_indicator (mode can be "B" or "A"—single item).
    blocks2 = {loc: [loc] for loc in loc_cols}
    blocks2["HOC"] = ["HOC_indicator"]
    modes2 = ["A"] * len(loc_cols) + ["B"]

    model2 = Plspm(
        df2,
        P2,
        blocks2,
        modes2,
        scheme="path",
        bootval=False
    )

    # Paths & R2
    inner = model2.path_coefficients_.reset_index().rename(columns={"index":"to"})
    r2 = model2.r2_.reset_index().rename(columns={"index":"construct", 0:"R2"})

    # f² effects (OLS on HOC_indicator ~ LOCs)
    f2_tbl = compute_f2_effects(df2, y="HOC_indicator", X_full=loc_cols)

    # Q² via K-fold CV (predict HOC_indicator from LOCs)
    q2, q2_folds = compute_q2(df2, y="HOC_indicator", X=loc_cols, n_splits=N_SPLITS_Q2, seed=RANDOM_SEED)

    # Save stage-2 reports
    stage2_paths = inner.melt(id_vars="to", var_name="from", value_name="beta")
    stage2_paths = stage2_paths[stage2_paths["to"] == "HOC"][["from","to","beta"]]
    stage2_paths = stage2_paths.merge(f2_tbl.rename(columns={"predictor":"from"}), on="from", how="left")

    # Attach R2(HOC). The scalar is extracted explicitly because calling
    # float() on a one-element Series is deprecated in recent pandas.
    r2_hoc = float(r2.loc[r2["construct"]=="HOC","R2"].iloc[0])
    stage2_paths["R2_HOC"] = r2_hoc

    stage2_paths.to_csv("reports/stage2_paths_r2_f2.csv", index=False)

    pd.DataFrame({"metric":["Q2_HOC"], "value":[q2]}).to_csv("reports/stage2_q2_summary.csv", index=False)
    q2_folds.to_csv("reports/stage2_q2_folds.csv", index=False)

    print("Done.")
    print(f"R²(HOC) = {r2_hoc:.3f} | Q²(HOC) = {q2:.3f}")
    print("Key files written to ./reports and ./data")

if __name__ == "__main__":
    main()


In [None]:
"""
Export SEM results to CSV tables for visualization
This script reads the PLS-SEM results and formats them into the CSV tables
expected by the visualization script (table1_formative.csv and table2_structural.csv)
"""

import pandas as pd
from pathlib import Path

def export_sem_results_to_csv():
    """Convert the pipeline reports into the two tables the diagram reads.

    Reads ``reports/outer_weights_final.csv`` and
    ``reports/stage2_paths_r2_f2.csv``, writes ``table1_formative.csv``
    (Construct, Indicator, Weight) and ``table2_structural.csv``
    (Path, β_std, R2_full, f2), and prints a short summary.
    """
    weights_in = pd.read_csv("reports/outer_weights_final.csv")
    stage2_paths = pd.read_csv("reports/stage2_paths_r2_f2.csv")

    # ---- Table 1: formative measurement model, strongest weight first ----
    table1_formatted = pd.DataFrame({
        'Construct': weights_in['construct'],
        'Indicator': weights_in['indicator'],
        'Weight': weights_in['weight'].round(3),
    }).sort_values(['Construct', 'Weight'], ascending=[True, False])
    table1_formatted.to_csv("table1_formative.csv", index=False)
    print(f"✓ Saved table1_formative.csv with {len(table1_formatted)} indicators")

    # ---- Table 2: structural model, one row per path into CAI ----
    # The 'R2_full' column is filled from the report's R2_HOC column.
    table2_formatted = pd.DataFrame({
        'Path': stage2_paths['from'] + ' → CAI',
        'β_std': stage2_paths['beta'].round(3),
        'R2_full': stage2_paths['R2_HOC'].round(3),
        'f2': stage2_paths['f2'].round(3),
    })
    table2_formatted.to_csv("table2_structural.csv", index=False)
    print(f"✓ Saved table2_structural.csv with {len(table2_formatted)} paths")

    # ---- Console summary ----
    rule = "=" * 60
    print("\n" + rule)
    print("SEM RESULTS SUMMARY")
    print(rule)

    construct_col = table1_formatted['Construct']
    print("\nFORMATIVE MEASUREMENT MODEL:")
    print(f"Total constructs: {construct_col.nunique()}")
    for construct in construct_col.unique():
        members = table1_formatted[construct_col == construct]
        n_indicators = len(members)
        avg_weight = members['Weight'].mean()
        print(f"  {construct}: {n_indicators} indicators (avg weight: {avg_weight:.3f})")

    print("\nSTRUCTURAL MODEL:")
    if len(stage2_paths) > 0:
        hoc_r2 = stage2_paths['R2_HOC'].iloc[0]
    else:
        hoc_r2 = 0
    print(f"  R² (Higher-Order Construct): {hoc_r2:.3f}")
    print(f"  Number of paths to CAI: {len(table2_formatted)}")

    # One line per path coefficient
    for _, path_row in table2_formatted.iterrows():
        print(f"    {path_row['Path']}: β = {path_row['β_std']:.3f} (f² = {path_row['f2']:.3f})")

    print("\n✓ Files ready for visualization script")


def validate_csv_for_visualization():
    """Check that both exported tables carry the columns the diagram needs.

    ``table1_formative.csv`` must have ``Construct`` and ``Indicator`` plus a
    ``Weight`` or ``Loading`` column. ``table2_structural.csv`` must have one
    of the recognised beta columns plus a ``Path`` or ``Hypothesis`` column.
    Each finding is printed.

    Returns
    -------
    bool
        True only when every required column is present. False when a column
        is missing, a file is absent, or reading fails.
    """
    try:
        ok = True

        # Check table1
        t1 = pd.read_csv("table1_formative.csv")
        required_cols_t1 = ['Construct', 'Indicator']
        missing_t1 = [c for c in required_cols_t1 if c not in t1.columns]
        if missing_t1:
            print(f"⚠️  Warning: Missing required column(s) in table1: {', '.join(missing_t1)}")
            ok = False

        weight_col = "Weight" if "Weight" in t1.columns else ("Loading" if "Loading" in t1.columns else None)

        if weight_col is None:
            print("⚠️  Warning: No Weight or Loading column found in table1")
            ok = False
        else:
            print(f"✓ Table1 validated: {len(t1)} rows, weight column: '{weight_col}'")

        # Check table2
        t2 = pd.read_csv("table2_structural.csv")
        beta_candidates = ["β_std", "beta_std", "Std. Beta", "Std. β", "β", "beta", "Path Coefficient"]
        beta_col = next((c for c in beta_candidates if c in t2.columns), None)

        if beta_col is None:
            print("⚠️  Warning: No beta coefficient column found in table2")
            ok = False
        else:
            print(f"✓ Table2 validated: {len(t2)} rows, beta column: '{beta_col}'")

        if "Path" in t2.columns:
            print("✓ Path column found in table2")
        elif "Hypothesis" in t2.columns:
            print("✓ Hypothesis column found in table2 (will be used as Path)")
        else:
            print("⚠️  Warning: No Path or Hypothesis column in table2")
            ok = False

        # A table that triggered a warning above is not usable by the
        # diagram, so it must not be reported as valid.
        return ok

    except FileNotFoundError as e:
        print(f"❌ File not found: {e}")
        return False
    except Exception as e:
        # Best-effort validator: any other read or parse problem is reported
        # as a failed validation and is not propagated.
        print(f"❌ Validation error: {e}")
        return False


if __name__ == "__main__":
    # The exporter needs both pipeline reports; collect whichever are absent.
    required_files = [
        "reports/outer_weights_final.csv",
        "reports/stage2_paths_r2_f2.csv"
    ]
    missing_files = []
    for candidate in required_files:
        if not Path(candidate).exists():
            missing_files.append(candidate)

    if not missing_files:
        print("✓ Found required input files")
        export_sem_results_to_csv()
        print("\n" + "="*60)
        validate_csv_for_visualization()
    else:
        # Name every absent report, then point at the step that creates them.
        print("❌ Missing required input files:")
        for absent in missing_files:
            print(f"   {absent}")
        print("\n💡 Run PLS-SEM pipeline")

In [None]:
# Generate diagram with indicator labels colored to match arrow/construct

import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, FancyArrowPatch, Wedge

# reload data
t1 = pd.read_csv("table1_formative.csv")
t2 = pd.read_csv("table2_structural.csv")

# Table 1 may name its weight column either "Weight" or "Loading".
weight_col = "Weight" if "Weight" in t1.columns else ("Loading" if "Loading" in t1.columns else None)
if weight_col is None:
    # Fail here with a clear message; otherwise the row loop below dies with
    # an opaque "KeyError: None".
    raise ValueError("table1_formative.csv needs a 'Weight' or 'Loading' column")

# blocks: construct name -> list of {"indicator", "weight"} dicts, in file order.
blocks = {}
for _, r in t1.iterrows():
    c = str(r["Construct"]).strip()
    ind = str(r["Indicator"]).strip()
    w = float(r[weight_col])
    blocks.setdefault(c, []).append({"indicator": ind, "weight": w})

# The first of these names present in Table 2 is used as the beta column.
beta_candidates = ["β_std", "beta_std", "Std. Beta", "Std. β", "β", "beta", "Path Coefficient"]
beta_col = next((c for c in beta_candidates if c in t2.columns), None)
if beta_col is None:
    # Same reasoning as for the weight column: later code indexes rows by beta_col.
    raise ValueError(
        "table2_structural.csv needs one of these beta columns: "
        + ", ".join(beta_candidates)
    )

def to_cai(path_str: str) -> bool:
    """Return True when ``path_str`` is an arrow path whose target starts with "CAI".

    Both "->" and "→" are accepted as the arrow. The target is the text after
    the last arrow, compared case-insensitively after stripping whitespace.
    Non-string input returns False.
    """
    if not isinstance(path_str, str):
        return False
    _, arrow, target = path_str.replace("->", "→").rpartition("→")
    return bool(arrow) and target.strip().upper().startswith("CAI")

# Rows of Table 2 that describe a path into CAI. Without a "Path" column the
# whole table is kept and the "Hypothesis" column is consulted below.
if "Path" in t2.columns:
    beta_rows = t2[t2["Path"].apply(to_cai)]
else:
    beta_rows = t2.copy()

# betas: source construct -> path coefficient into CAI
betas = {}
if not beta_rows.empty and "Path" in beta_rows.columns:
    for _, r in beta_rows.iterrows():
        label = str(r["Path"]).replace("->", "→")
        source = label.partition("→")[0].strip()
        betas[source] = float(r[beta_col])
elif "Hypothesis" in t2.columns:
    for _, r in t2.iterrows():
        label = str(r["Hypothesis"]).replace("->", "→")
        if not to_cai(label):
            continue
        source = label.partition("→")[0].strip()
        betas[source] = float(r[beta_col])

# Draw order: the five eco-schemes first (when present in both tables), then
# any other construct that has both indicators and a beta.
preferred_order = ["ES1", "ES2", "ES3", "ES4", "ES5"]
constructs = [k for k in preferred_order if k in blocks and k in betas]
extra_constructs = [k for k in blocks if k in betas and k not in constructs]
constructs = constructs + extra_constructs

# styling
# Colour per construct; constructs not listed here fall back to "#444" where
# the colour is looked up with COL.get.
COL = {
    "ES1": "#2A6FDB",
    "ES2": "#2B7A2A",
    "ES3": "#BB6B00",
    "ES4": "#6F3DB2",
    "ES5": "#B53A3A",
}
SECTOR_ALPHA = 0.06    # opacity of the coloured wedge behind each construct

R_ES    = 2.2          # distance of the construct nodes from the centre
R_RING  = 3.5          # radius of the ring that carries the indicator dots
R_LABEL = 4.3          # radius at which the indicator labels are anchored
ES_NODE_R = 0.32       # radius of a construct node circle
CAI_NODE_R = 0.45      # radius of the central CAI node circle
SECTOR_WIDTH_DEG = 65  # angular width of each construct's wedge, in degrees

def pol2cart(r, theta):
    """Convert polar coordinates (radius ``r``, angle ``theta`` in radians) to (x, y)."""
    x = r * np.cos(theta)
    y = r * np.sin(theta)
    return x, y

def nice_angle_text_rotation(theta):
    """Return (rotation in degrees, horizontal alignment) for a label at angle ``theta``.

    Angles strictly between 90° and 270° get the rotation shifted by 180° and
    "right" alignment; all other angles keep their rotation and get "left".
    """
    degrees = np.degrees(theta) % 360
    on_left_half = 90 < degrees < 270
    if not on_left_half:
        return degrees, "left"
    return degrees + 180, "right"

def arrow_with_text(ax, start, end, text, color="#444", lw=2.0):
    """Draw a straight arrow from ``start`` to ``end`` on ``ax`` with a text label.

    The label sits 60% of the way from ``start`` to ``end`` on a
    semi-transparent white box.
    """
    ax.add_patch(
        FancyArrowPatch(
            start,
            end,
            arrowstyle="-|>",
            mutation_scale=13,
            lw=lw,
            color=color,
            connectionstyle="arc3,rad=0",
        )
    )
    label_x = (0.6*end[0] + 0.4*start[0])
    label_y = (0.6*end[1] + 0.4*start[1])
    label_box = dict(facecolor="white", edgecolor="none", pad=0.2, alpha=0.7)
    ax.text(label_x, label_y, text, fontsize=9, color=color,
            ha="center", va="center", bbox=label_box)

def arrow_touching_circle(ax, start_xy, circle_center, circle_radius, color="#444", lw=1.6, rad=0.2):
    """Draw a curved arrow from ``start_xy`` that ends on a circle's edge.

    The arrow head is placed ``circle_radius`` short of ``circle_center``
    along the straight line from the start point. ``rad`` is the curvature
    passed to the "arc3" connection style.
    """
    start_x, start_y = start_xy
    center_x, center_y = circle_center
    dx, dy = center_x - start_x, center_y - start_y
    # The small epsilon avoids dividing by zero when start and centre coincide.
    length = (dx*dx + dy*dy) ** 0.5 + 1e-12
    unit_x, unit_y = dx/length, dy/length
    tip = (center_x - unit_x*circle_radius, center_y - unit_y*circle_radius)
    ax.add_patch(
        FancyArrowPatch(
            (start_x, start_y),
            tip,
            connectionstyle=f"arc3,rad={rad}",
            arrowstyle="-|>",
            mutation_scale=11,
            lw=lw,
            color=color,
        )
    )

def point_toward(p_from, p_to, delta):
    """Return the point ``delta`` away from ``p_from`` in the direction of ``p_to``."""
    origin_x, origin_y = p_from[0], p_from[1]
    dx, dy = p_to[0] - origin_x, p_to[1] - origin_y
    # The small epsilon keeps the division defined when both points coincide.
    length = (dx*dx + dy*dy) ** 0.5 + 1e-12
    unit_x, unit_y = dx/length, dy/length
    return (origin_x + unit_x*delta, origin_y + unit_y*delta)

# figure
if not constructs:
    # Without this guard the angle computation below divides by N == 0.
    raise ValueError(
        "No construct appears in both table1_formative.csv and "
        "table2_structural.csv; nothing to draw."
    )

fig, ax = plt.subplots(figsize=(13, 13))
ax.set_aspect("equal")
ax.axis("off")

# Outer ring that carries the indicator dots
theta_grid = np.linspace(0, 2*np.pi, 720)
x, y = pol2cart(R_RING, theta_grid)
ax.plot(x, y, lw=1.0, color="#888")

# Central CAI node
cai = Circle((0,0), CAI_NODE_R, facecolor="#EEEEEE", edgecolor="#333", lw=2)
ax.add_patch(cai)
ax.text(0, 0, "Overall Adoption\n(CAI)", ha="center", va="center", fontsize=12, color="#111")

# Constructs are spread evenly around the circle, starting at 90° and
# stepping by -2π/N per construct.
N = len(constructs)
base_angles = np.linspace(np.radians(90), np.radians(90) - 2*np.pi + 2*np.pi/N, N)
pos_ES = {c: pol2cart(R_ES, base_angles[i]) for i, c in enumerate(constructs)}

for i, c in enumerate(constructs):
    theta = base_angles[i]
    cx, cy = pos_ES[c]
    color = COL.get(c, "#444")

    # Faint coloured wedge marking the construct's sector on the ring
    half = np.radians(SECTOR_WIDTH_DEG/2)
    sector = Wedge(center=(0,0), r=R_RING+0.05,
                   theta1=np.degrees(theta-half), theta2=np.degrees(theta+half),
                   width=0.50, facecolor=color, alpha=SECTOR_ALPHA, edgecolor=None)
    ax.add_patch(sector)

    # Construct node
    node = Circle((cx, cy), ES_NODE_R, facecolor="#FFFFFF", edgecolor=color, lw=2.2)
    ax.add_patch(node)
    ax.text(cx, cy, c, ha="center", va="center", fontsize=12, color=color)

    # Beta arrow from the edge of the construct node to the edge of the CAI node
    start_beta = point_toward((cx,cy), (0,0), ES_NODE_R)
    end_beta   = point_toward((0,0), (cx,cy), CAI_NODE_R)
    arrow_with_text(ax, start_beta, end_beta, f"β={float(betas[c]):.2f}", color=color, lw=2.3)

    inds = blocks.get(c, [])
    if not inds:
        continue
    K = len(inds)
    if K == 1:
        thetas = np.array([theta])
    else:
        # Spread the indicators across the full width of the sector
        thetas = np.linspace(theta - half*1.0, theta + half*1.0, K)

    for j, info in enumerate(inds):
        t = thetas[j]
        ix, iy = pol2cart(R_RING, t)
        ax.plot([ix], [iy], marker="o", ms=3.0, color=color, zorder=3)

        # Label coloured like its construct, rotated to follow the radius
        rot_deg, align = nice_angle_text_rotation(t)
        tx, ty = pol2cart(R_LABEL, t)
        ax.text(tx, ty, f"ω={info['weight']:.2f} · {info['indicator']}",
                rotation=rot_deg, ha=align, va="center", fontsize=8.8, color=color)

        # Leader line from the dot to its label
        ax.plot([ix, tx], [iy, ty], lw=0.8, color=color, alpha=0.6)

        # Curved arrow from the indicator dot to the construct node; the
        # curvature sign follows the label alignment.
        rad = 0.25 if align == "left" else -0.25
        arrow_touching_circle(ax, (ix,iy), (cx,cy), ES_NODE_R, color=color, lw=1.4, rad=rad)

pad = R_LABEL + 1.6
ax.set_xlim(-pad, pad); ax.set_ylim(-pad, pad)

import os  # needed only for creating the output directory below

out_png = "/mnt/data/pls_phylo_ring_v5.png"
out_pdf = "/mnt/data/pls_phylo_ring_v5.pdf"
# savefig does not create missing directories, so make sure the target exists.
os.makedirs(os.path.dirname(out_png), exist_ok=True)
fig.savefig(out_png, dpi=300, bbox_inches="tight")
fig.savefig(out_pdf, bbox_inches="tight")
out_png, out_pdf
