In [None]:
!pip install catboost

Collecting catboost
  Downloading catboost-1.2.8-cp311-cp311-manylinux2014_x86_64.whl.metadata (1.2 kB)
Downloading catboost-1.2.8-cp311-cp311-manylinux2014_x86_64.whl (99.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.2/99.2 MB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: catboost
Successfully installed catboost-1.2.8


In [None]:
from google.colab import userdata
userdata.get('GITHUB_TOKEN')

In [None]:
# Replace YOUR_USERNAME and YOUR_TOKEN with your actual GitHub username and token.
from getpass import getpass
import os
from google.colab import userdata

!git clone https://{userdata.get('GITHUB_TOKEN')}@github.com/rzsoli/Evaluations_on_Robotic_Choreographies.git

Cloning into 'Evaluations_on_Robotic_Choreographies'...
remote: Enumerating objects: 716, done.[K
remote: Counting objects: 100% (155/155), done.[K
remote: Compressing objects: 100% (147/147), done.[K
remote: Total 716 (delta 57), reused 8 (delta 8), pack-reused 561 (from 2)[K
Receiving objects: 100% (716/716), 62.59 MiB | 8.62 MiB/s, done.
Resolving deltas: 100% (291/291), done.
Updating files: 100% (357/357), done.


# SHAP Analysis for Classification and Regression

In [None]:
import shap
import pandas as pd
import numpy as np
import joblib
import os
import matplotlib.pyplot as plt
from xgboost import XGBClassifier
from catboost import CatBoostClassifier
from sklearn.pipeline import Pipeline
from sklearn.impute import KNNImputer
# Add any other specific model classes if they appear as 'best models' later

## Classification SHAP Analysis

In [None]:
classification_target_names = [
    "HumanCharacterization",
    "HumanReproducibility",
    "MovementTechnique",
    "PublicInvolvement",
    "Rhythm",
    "SpaceUse",
    "StoryTelling"
]

phase1

In [None]:
import os
import json
import joblib
import numpy as np
import pandas as pd
import shap
import yaml
import logging
from sklearn.pipeline import Pipeline
from sklearn.model_selection import StratifiedShuffleSplit
from xgboost import XGBClassifier
from catboost import CatBoostClassifier
import warnings
warnings.filterwarnings("ignore", message=".*please export the model by calling `Booster.save_model`.*")

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

# Load settings
with open('settings.yaml') as f:
    settings = yaml.safe_load(f)

RND = settings.get('random_seed', 42)
MODEL_DIR = "/content/Evaluations_on_Robotic_Choreographies/classification/saved_overall_best_models/"
DATA_DIR = "/content/Evaluations_on_Robotic_Choreographies/classification/classification_models_data/"
OUTPUT_DIR = "/content/Evaluations_on_Robotic_Choreographies/shap/classification/"

os.makedirs(OUTPUT_DIR, exist_ok=True)

def get_targets(dirpath):
    return [d.replace('EvaluationChoreography', '')
            for d in os.listdir(dirpath)
            if d.startswith('EvaluationChoreography')]

def unwrap_pipeline(model):
    preproc = None
    actual = model
    if isinstance(model, Pipeline):
        actual = model.steps[-1][1]
        if len(model.steps) > 1:
            preproc = Pipeline(model.steps[:-1])
    return actual, preproc

for target in get_targets(DATA_DIR):
    logger.info(f"Starting Phase1 for target: {target}")
    try:
        # Paths
        subdir = os.path.join(DATA_DIR, f"EvaluationChoreography{target}")
        train_path = os.path.join(subdir, "X_train.csv")
        hold_path = os.path.join(subdir, "X_holdout.csv")
        y_path = os.path.join(subdir, "y_train.csv")
        if not all(os.path.exists(p) for p in (train_path, hold_path, y_path)):
            logger.error(f"Missing data files for {target}, skipping Phase1.")
            continue
        X_train = pd.read_csv(train_path)
        # Assuming y_train is the first (and likely only) column in y_train.csv
        y_train = pd.read_csv(y_path).iloc[:, 0]
        X_hold = pd.read_csv(hold_path)

        # Load model
        model_file = next((f for f in os.listdir(MODEL_DIR)
                           if target in f and f.startswith(('OVERALL_BEST_XGBoost_GridCV','OVERALL_BEST_CatBoost_GridCV'))), None)
        if not model_file:
            logger.warning(f"No model found for {target}, skipping.")
            continue
        model = joblib.load(os.path.join(MODEL_DIR, model_file))

        # Unwrap pipeline
        actual_model, preproc = unwrap_pipeline(model)

        # Determine and save feature names
        if preproc:
            try:
                feat_names = preproc.get_feature_names_out(X_train.columns)
            except Exception as e:
                logger.error(f"get_feature_names_out failed for {target}: {e}")
                raise
        else:
            feat_names = X_train.columns.tolist()
        out_dir = os.path.join(OUTPUT_DIR, target)
        os.makedirs(out_dir, exist_ok=True)
        fn_path = os.path.join(out_dir, f"feature_names_{target}.txt")
        pd.Series(feat_names).to_csv(fn_path, index=False, header=False)
        assert os.path.exists(fn_path), "Feature names file not saved"

        # Preprocess data
        def apply_proc(df):
            if preproc:
                arr = preproc.transform(df)
                return pd.DataFrame(arr, columns=feat_names, index=df.index)
            return df.copy()
        X_train_p = apply_proc(X_train)
        X_hold_p = apply_proc(X_hold)

        # Background sampling
        bcfg = settings['background']
        desired = bcfg['sample_size']
        actual_n = min(desired, len(X_train_p))
        actual_n = max(actual_n, bcfg['min_size'])

        # --- Handle single-class y_train for stratification ---
        if bcfg['stratify']:
            if y_train.nunique() < 2:
                logger.warning(f"Skipping stratification for {target}: y_train has only one unique class. Falling back to random sampling.")
                background = X_train_p.sample(actual_n, random_state=RND)
            elif len(y_train) < actual_n: # Ensure enough samples for stratification based on actual_n
                 logger.warning(f"Not enough samples in y_train ({len(y_train)}) for desired background size ({actual_n}) with stratification for {target}. Falling back to random sampling.")
                 background = X_train_p.sample(actual_n, random_state=RND)
            else:
                sss = StratifiedShuffleSplit(n_splits=1, train_size=actual_n, random_state=RND)
                idx, _ = next(sss.split(X_train_p, y_train))
                background = X_train_p.iloc[idx]
        else: # If stratification is explicitly turned off in settings
            background = X_train_p.sample(actual_n, random_state=RND)

        bg_path = os.path.join(out_dir, 'background.csv')
        background.to_csv(bg_path, index=False)
        assert os.path.exists(bg_path) and len(background) >= bcfg['min_size'], "Background sample invalid"

        # Instantiate explainer
        if isinstance(actual_model, (XGBClassifier, CatBoostClassifier)):
            explainer = shap.TreeExplainer(actual_model, background)
        else:
            explainer = shap.KernelExplainer(actual_model.predict_proba, background)
        ev = np.atleast_1d(explainer.expected_value)
        logger.info(f"Explainer expected_value shape for {target}: {ev.shape}")

        # Compute raw SHAP values
        raw = explainer.shap_values(X_hold_p)
        shap_vals = raw[1] if isinstance(raw, list) and len(raw) >= 2 else np.array(raw)
        assert shap_vals.shape == X_hold_p.shape, f"SHAP shape mismatch: {shap_vals.shape} vs {X_hold_p.shape}"
        raw_path = os.path.join(out_dir, f"raw_{target}.npy")
        np.save(raw_path, shap_vals)
        assert os.path.exists(raw_path), "Raw SHAP file not saved"

        # Save baseline
        baseline = ev[1] if ev.size >= 2 else ev[0]
        base_path = os.path.join(out_dir, 'baseline.txt')
        with open(base_path, 'w') as f:
            f.write(str(baseline))
        assert os.path.exists(base_path), "Baseline file not saved"

        # Compute interactions (Tree only)
        # int_path needs to be defined for config even if interactions are skipped
        int_path = None
        if isinstance(explainer, shap.TreeExplainer):
            topn = settings['interaction']['top_n_features']
            subsz = settings['interaction']['subsample_size']
            # select top features
            means = np.abs(shap_vals).mean(axis=0)
            idx = np.argsort(means)[-topn:][::-1]
            subset = X_hold_p.iloc[:, idx]
            # subsample rows
            n_rows = min(subsz, len(subset))
            subp = subset.sample(n_rows, random_state=RND)
            inter_vals = explainer.shap_interaction_values(subp)
            int_path = os.path.join(out_dir, 'interaction.npy')
            np.save(int_path, inter_vals)
            assert os.path.exists(int_path), "Interaction file not saved"
        else:
            logger.warning(f"Skipping interactions for {target}: not a TreeExplainer.")

        # Persist config & validate
        cfg = {
            'model_path': os.path.join(MODEL_DIR, model_file),
            'background_path': bg_path,
            'eval_data_path': hold_path,
            'feature_names_path': fn_path,
            'raw_shap_path': raw_path,
            'baseline_path': base_path,
            'interaction_path': int_path, # Now int_path is always defined
            'explainer_type': type(explainer).__name__,
            'explainer_params': {
                'feature_perturbation': getattr(explainer, 'feature_perturbation', None),
                'nsamples': getattr(explainer, 'nsamples', None)
            }
        }
        cfg_path = os.path.join(out_dir, 'config.json')
        with open(cfg_path, 'w') as f:
            json.dump(cfg, f, indent=2)
        # validate all paths
        for key, p in cfg.items():
            if key.endswith('_path') and p: # Only check if path is not None
                assert os.path.exists(p), f"Config path missing: {key}"

        logger.info(f"Phase1 complete for {target}")
    except Exception as e:
        logger.error(f"Error in Phase1 for {target}: {e}")
        continue

logger.info("All classification Phase1 done.")


import os
import json
import joblib
import numpy as np
import pandas as pd
import shap
import yaml
import logging
from sklearn.pipeline import Pipeline
from xgboost import XGBRegressor
from catboost import CatBoostRegressor

# Optional: enable if using NN models
try:
    import tensorflow as tf
except ImportError:
    tf = None

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    force=True
)

logger = logging.getLogger(__name__)

# Load settings
with open('/content/settings.yaml') as f:
    settings = yaml.safe_load(f)

RND = settings.get('random_seed', 42)
PROJECT_ROOT = "/content/Evaluations_on_Robotic_Choreographies/"

# --- MODIFICATION START ---
# Path for datasets remains the same.
DATA_DIR = os.path.join(PROJECT_ROOT, 'data', 'regression')
# New path for loading the tuned regression models.
MODEL_DIR_REG = os.path.join(PROJECT_ROOT, 'regression', 'Tuned Models')
# --- MODIFICATION END ---

SHAP_OUT = os.path.join(PROJECT_ROOT, 'shap', 'regression')
os.makedirs(SHAP_OUT, exist_ok=True)

# Dictionary to map specific targets to specific regression methods
TARGET_METHOD_MAP = {
    "Storytelling": "XGBoost",
    "SpaceUse": "XGBoost",
    "Rhythm": "CatBoost",
    "PublicInvolvement": "XGBoost",
    "MovementTechnique": "XGBoost",
    "HumanReproducibility": "XGBoost",
    "HumanCharacterization": "CatBoost"
}

# Utility: unwrap pipelines
def unwrap_pipeline(model):
    preproc = None
    actual = model
    if isinstance(model, Pipeline):
        actual = model.steps[-1][1]
        if len(model.steps) > 1:
            preproc = Pipeline(model.steps[:-1])
    return actual, preproc

# Utility: apply preprocessing and enforce feature names
def apply_preproc(df, preproc, feat_names):
    if preproc:
        arr = preproc.transform(df)
        return pd.DataFrame(arr, columns=feat_names, index=df.index)
    else:
        return df.copy()

# Utility: top-N features by mean abs shap
def get_top_n_features(shap_vals, feat_names, n=10):
    means = np.abs(shap_vals).mean(axis=0)
    idx = np.argsort(means)[-n:][::-1]
    return [feat_names[i] for i in idx]

# Main Phase1 loop
def run_regression_phase1():
    # Iterate through the target-method mapping
    for target, method in TARGET_METHOD_MAP.items():
        logger.info(f"Starting Phase1 for {method}/{target}")
        try:
            # Paths for train/test (Uses DATA_DIR - UNCHANGED)
            low = target.lower()
            train_path = os.path.join(DATA_DIR, 'Data Splits', target, f'X_train_{low}.csv')
            test_path  = os.path.join(DATA_DIR, 'Data Splits', target, f'X_test_{low}.csv')
            if not os.path.exists(train_path) or not os.path.exists(test_path):
                logger.warning(f"Missing train or test for {target}, skipping.")
                continue
            X_train = pd.read_csv(train_path)
            X_test  = pd.read_csv(test_path)

            # Load model and optional scaler (Uses MODEL_DIR_REG - CHANGED)
            model = None
            pipeline_pre = None
            model_path = None
            if method == "CatBoost" and CatBoostRegressor is not None:
                model_path = os.path.join(MODEL_DIR_REG, 'CatBoost', f'catboost_{low}.cbm')
                if os.path.exists(model_path):
                    model = CatBoostRegressor()
                    model.load_model(model_path)
            elif method == "XGBoost" and XGBRegressor is not None:
                model_path = os.path.join(MODEL_DIR_REG, 'XGBoost', f'xgb_model_{low}.json')
                if os.path.exists(model_path):
                    model = XGBRegressor()
                    model.load_model(model_path)
            elif method == "Ridge Pipeline":
                model_path = os.path.join(MODEL_DIR_REG, 'Ridge Pipeline', f'ridge_pipeline_{low}.pkl')
                if os.path.exists(model_path):
                    model = joblib.load(model_path)
            elif method == "NN Model" and tf is not None:
                mpath = os.path.join(MODEL_DIR_REG, 'NN Model', f'mlp_best_{low}.keras')
                spath = os.path.join(MODEL_DIR_REG, 'NN Model', f'scaler_{low}.pkl')
                if os.path.exists(mpath) and os.path.exists(spath):
                    model = tf.keras.models.load_model(mpath, compile=False)
                    pipeline_pre = joblib.load(spath)
                model_path = mpath

            if model is None:
                logger.warning(f"Model file not found for {method}/{target}, skipping.")
                continue

            # Unwrap any sklearn Pipeline
            actual_model, preproc = unwrap_pipeline(model)
            # Override for NN
            if method == "NN Model":
                preproc = pipeline_pre

            # Determine feature names
            if preproc is not None:
                try:
                    feat_names = preproc.get_feature_names_out(X_train.columns)
                except Exception as e:
                    logger.error(f"get_feature_names_out failed for {method}/{target}: {e}")
                    raise
            else:
                feat_names = X_train.columns.tolist()

            # Save feature names to the new directory structure
            out_dir = os.path.join(SHAP_OUT, "result", target)
            os.makedirs(out_dir, exist_ok=True)
            fn_path = os.path.join(out_dir, f'feature_names_{target}.txt')
            pd.Series(feat_names).to_csv(fn_path, index=False, header=False)

            # Preprocess datasets
            X_train_p = apply_preproc(X_train, preproc, feat_names)
            X_test_p  = apply_preproc(X_test,  preproc, feat_names)

            # Background sampling (regression random)
            bcfg = settings['background']
            desired = bcfg['sample_size']
            actual_n = min(desired, len(X_train_p))
            actual_n = max(actual_n, bcfg['min_size'])
            background = X_train_p.sample(actual_n, random_state=RND)
            bg_path = os.path.join(out_dir, 'background.csv')
            background.to_csv(bg_path, index=False)

            # Instantiate SHAP explainer
            if isinstance(actual_model, (CatBoostRegressor, XGBRegressor)):
                expl = shap.TreeExplainer(actual_model, background)
            else:
                expl = shap.KernelExplainer(actual_model.predict, background)
            ev = np.atleast_1d(expl.expected_value)
            logger.info(f"Expected value shape for {method}/{target}: {ev.shape}")

            # Compute and save raw SHAP values
            raw = expl.shap_values(X_test_p)
            shap_vals = raw if not isinstance(raw, list) else raw[0] # For regression, shap_values returns array directly
            raw_path = os.path.join(out_dir, f"raw_{target}.npy")
            np.save(raw_path, shap_vals)

            # Save baseline
            baseline = ev[0] # For regression, expected_value is typically a single scalar
            base_path = os.path.join(out_dir, 'baseline.txt')
            with open(base_path, 'w') as f:
                f.write(str(baseline))

            # Compute interactions (Tree only) with subsample
            if isinstance(expl, shap.TreeExplainer):
                topn = settings['interaction']['top_n_features']
                subsz = settings['interaction']['subsample_size']
                feats = get_top_n_features(shap_vals, feat_names, n=topn)
                subset = X_test_p[feats]
                n_rows = min(subsz, len(subset))
                subp = subset.sample(n_rows, random_state=RND)
                iv = expl.shap_interaction_values(subp)
                int_path = os.path.join(out_dir, 'interaction.npy')
                np.save(int_path, iv)
            else:
                int_path = None # Ensure int_path is defined even if not used

            # Persist config
            cfg = {
                'model_path': model_path,
                'background_path': bg_path,
                'eval_data_path': test_path,
                'feature_names_path': fn_path,
                'raw_shap_path': raw_path,
                'baseline_path': base_path,
                'interaction_path': int_path,
                'explainer_type': type(expl).__name__,
                'explainer_params': {
                    'feature_perturbation': getattr(expl, 'feature_perturbation', None),
                    'nsamples': getattr(expl, 'nsamples', None)
                }
            }
            cfg_path = os.path.join(out_dir, 'config.json')
            with open(cfg_path, 'w') as f:
                json.dump(cfg, f, indent=2)
            # Add assertion for config.json
            assert os.path.exists(cfg_path), f"Phase1 failed to write config.json for {method}/{target}"

            logger.info(f"Phase1 complete for {method}/{target}")

        except Exception as e:
            logger.error(f"Error in Phase1 for {method}/{target}: {e}")
            continue

    logger.info("All regression Phase1 done.")

if __name__ == '__main__':
    run_regression_phase1()

configuration generated by an older version of XGBoost, please export the model by calling
`Booster.save_model` from that version first, then load it back in current version. See:

    https://xgboost.readthedocs.io/en/stable/tutorials/saving_model.html

for more details about differences between saving model and serializing.

2025-07-06 20:31:10,773 [INFO] Starting Phase1 for XGBoost/Storytelling
2025-07-06 20:31:10,952 [INFO] Expected value shape for XGBoost/Storytelling: (1,)
2025-07-06 20:31:27,763 [INFO] Starting Phase1 for XGBoost/SpaceUse
2025-07-06 20:31:27,865 [INFO] Expected value shape for XGBoost/SpaceUse: (1,)
2025-07-06 20:31:29,459 [INFO] Phase1 complete for XGBoost/SpaceUse
2025-07-06 20:31:29,460 [INFO] Starting Phase1 for CatBoost/Rhythm
2025-07-06 20:31:29,514 [INFO] Expected value shape for CatBoost/Rhythm: (1,)
2025-07-06 20:31:33,224 [INFO] Phase1 complete for CatBoost/Rhythm
2025-07-06 20:31:33,225 [INFO] Starting Phase1 for XGBoost/PublicInvolvement
2025-07-06 2

newest and lets be final try on phase2

In [None]:
import os
import json
import yaml
import logging
import joblib
import numpy as np
import pandas as pd
import shap
import matplotlib
matplotlib.use('Agg') # Set the Matplotlib backend to 'Agg' for headless plot generation
import matplotlib.pyplot as plt
from sklearn.pipeline import Pipeline
from sklearn.inspection import PartialDependenceDisplay
from xgboost import XGBClassifier, XGBRegressor
from catboost import CatBoostClassifier, CatBoostRegressor

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    force=True
)
logger = logging.getLogger(__name__)

# Load settings
with open('settings.yaml') as f:
    settings = yaml.safe_load(f)
RND = settings.get('random_seed', 42)
PROJECT_ROOT = "/content/Evaluations_on_Robotic_Choreographies/"
SHAP_ROOT = os.path.join(PROJECT_ROOT, 'shap')

# Helper: unwrap pipeline

def unwrap_pipeline(model):
    preproc = None
    actual = model
    if isinstance(model, Pipeline):
        actual = model.steps[-1][1]
        if len(model.steps) > 1:
            preproc = Pipeline(model.steps[:-1])
    return actual, preproc

# Helper: apply preprocessing

def apply_preproc(df, preproc, feat_names):
    """Applies preprocessing if a preproc pipeline is provided."""
    if preproc is not None:
        # Ensure the preprocessor can handle DataFrame or convert to numpy array if needed
        arr = preproc.transform(df)
        return pd.DataFrame(arr, columns=feat_names, index=df.index)
    # If no preprocessor, return a copy of the DataFrame, ensuring column names are set if feat_names is available
    if feat_names is not None:
        return pd.DataFrame(df.values, columns=feat_names, index=df.index)
    return df.copy()

# Helper: top-N features by mean abs shap

def get_top_n_features(shap_vals, feat_names, n=10):
    means = np.abs(shap_vals).mean(axis=0)
    idx = np.argsort(means)[-n:][::-1]
    return [feat_names[i] for i in idx]

# Main Phase2
for domain in ['classification', 'regression']:
    base_dir = os.path.join(SHAP_ROOT, domain)
    if domain == 'classification':
           # This classification block remains unchanged.
        # one folder per target
        for target in os.listdir(base_dir):
            out_dir = os.path.join(base_dir, target)
            if not os.path.isdir(out_dir):
                logger.info(f"Skipping non-directory entry in classification: {out_dir}")
                continue
            cfg_path = os.path.join(out_dir, 'config.json')
            if not os.path.exists(cfg_path):
                logger.warning(f"Config file not found: {cfg_path}. Skipping {domain}/{target}.")
                continue
            cfg = json.load(open(cfg_path))
            model_path = cfg['model_path']
            bg_path = cfg['background_path']
            eval_path = cfg['eval_data_path']
            fn_path = cfg['feature_names_path']
            shap_path = cfg['raw_shap_path']
            expl_type = cfg['explainer_type']
            X_raw = pd.read_csv(eval_path)
            feat_names = pd.read_csv(fn_path, header=None).iloc[:,0].tolist()
            shap_vals = np.load(shap_path)
            background = pd.read_csv(bg_path)
            model = joblib.load(model_path)
            actual_model, preproc = unwrap_pipeline(model)
            X_proc = apply_preproc(X_raw, preproc, feat_names)
            if preproc is not None:
                estimator = Pipeline([('pre', preproc), ('model', actual_model)])
            else:
                estimator = actual_model
            pdp_cfg = settings['pdp_ice']
            features_for_pdp = feat_names[:pdp_cfg['top_n_features']]
            for feat in features_for_pdp:
                fig, ax = plt.subplots()
                try:
                    # Added check for constant feature
                    if X_raw[feat].nunique() < 2:
                        logger.warning(f"Skipping PDP/ICE for {domain}/{target}/{feat}: only {X_raw[feat].nunique()} unique value(s). Feature is constant or has insufficient variation for plotting.")
                        plt.close(fig) # Ensure any implicitly created figure is closed if the plot is skipped
                        continue # Skip to the next feature

                    PartialDependenceDisplay.from_estimator(
                         estimator, X_raw, [feat],
                        grid_resolution=pdp_cfg['grid_points'],
                        kind='both',
                        ice_lines_kw={'alpha': 0.3},
                        ax=ax,
                        target=1
                    )
                    plt.tight_layout()
                    savep = os.path.join(out_dir, f'plots_pdp_ice_{feat}.png')
                    fig.savefig(savep, bbox_inches='tight')
                    logger.info(f"Saved PDP/ICE for {domain}/{target}/{feat}")
                except Exception as e:
                    logger.warning(f"Could not plot PDP/ICE for {domain}/{target}/{feat}: {e}")
                finally:
                    plt.close(fig) # Ensure figure is closed even if an error occurred

            dep_n = settings['dependence']['top_n_features']
            features_for_dep_plot = feat_names[:dep_n]
            for feat in features_for_dep_plot:
                # Removed: fig = plt.figure() - Let SHAP manage figure creation initially
                try:
                    shap.dependence_plot(feat, shap_vals, X_proc,
                                          interaction_index='auto', show=False)
                    fig = plt.gcf() # Get the current figure that shap.dependence_plot has drawn on
                    savep = os.path.join(out_dir, f'dep_int_{feat}.png')
                    fig.savefig(savep, bbox_inches='tight')
                    logger.info(f"Saved dependence plot for {domain}/{target}/{feat}")
                except Exception as e:
                    logger.warning(f"Could not plot dependence plot for {domain}/{target}/{feat}: {e}")
                finally:
                    plt.close(fig) # Close figure after saving

            if expl_type == 'TreeExplainer':
                inter_cfg = settings['interaction']
                top_feats = get_top_n_features(shap_vals, feat_names, n=inter_cfg['top_n_features'])
                subset = X_proc[top_feats]
                rows = min(inter_cfg['subsample_size'], len(subset))
                subset_samp = subset.sample(rows, random_state=RND)
                explainer = shap.TreeExplainer(actual_model, background)
                inter_vals = explainer.shap_interaction_values(subset_samp)

                # --- FIX 2: HANDLE CLASSIFICATION LIST OUTPUT ---
                if isinstance(inter_vals, list):
                    # for binary/multiclass, take the “positive‐class” interaction matrix
                    inter_vals = inter_vals[1]

                mi = np.abs(inter_vals).mean(axis=0)
                df = pd.DataFrame(mi, index=top_feats, columns=top_feats)

                # --- FIX 1: EMIT A LONG "feature1,feature2,mean_interaction" TABLE ---
                # keep only upper triangle, stack into long form
                pairs = (
                    df.where(np.triu(np.ones(df.shape), k=1).astype(bool))
                      .stack()
                      .reset_index()
                      .rename(columns={
                          'level_0': 'feature1',
                          'level_1': 'feature2',
                          0:         'mean_interaction'
                      })
                )
                savef = os.path.join(out_dir, 'interaction_summary.csv')
                pairs.to_csv(savef, index=False)
                logger.info(f"Saved interaction summary for {domain}/{target}")

    # ----------------------------------------------------------------------------------
    # MODIFIED REGRESSION BLOCK STARTS HERE
    # ----------------------------------------------------------------------------------
    else: # regression
        logger.info("Starting modified regression analysis.")

        # MODIFIED: Changed keys to PascalCase to match phase1's output directories
        target_to_method_map = {
            "Storytelling": "XGBoost",
            "SpaceUse": "XGBoost",
            "Rhythm": "catboost",
            "PublicInvolvement": "XGBoost",
            "MovementTechnique": "XGBoost",
            "HumanReproducibility": "XGBoost",
            "HumanCharacterization": "catboost"
        }

        method_folder_map = {
            "catboost": "CatBoost",
            "ridge": "Ridge Pipeline",
            "XGBoost": "XGBoost"
        }

        for target, method_short_name in target_to_method_map.items():

            if method_short_name not in method_folder_map:
                logger.warning(f"Method '{method_short_name}' for target '{target}' is not defined in method_folder_map. Skipping.")
                continue

            method_folder_name = method_folder_map[method_short_name]

            # --- CHANGE: Define a single directory for both input and output ---
            # All files will be READ from and SAVED to this location.
            # This 'target' is now PascalCase and directly matches the folders created by phase1
            io_dir = os.path.join(base_dir, "result", target)
            os.makedirs(io_dir, exist_ok=True)

            logger.info(f"Processing target: '{target}' with method: '{method_folder_name}'")
            logger.info(f"  > Reading from and saving to: {io_dir}")

            # The config file is now expected inside the 'result' directory
            cfg_path = os.path.join(io_dir, 'config.json')
            if not os.path.exists(cfg_path):
                logger.warning(f"Config file not found: {cfg_path}. Did a previous step save it here? Skipping.")
                continue

            cfg = json.load(open(cfg_path))
            model_path = cfg['model_path']
            bg_path = cfg['background_path']
            eval_path = cfg['eval_data_path']
            fn_path = cfg['feature_names_path']
            raw_path = cfg['raw_shap_path']
            expl_type = cfg['explainer_type']
            X_raw = pd.read_csv(eval_path)
            feat_names = pd.read_csv(fn_path, header=None).iloc[:,0].tolist()
            shap_vals = np.load(raw_path)
            background = pd.read_csv(bg_path)
            model = None
            if method_folder_name == 'Ridge Pipeline':
                model = joblib.load(model_path)
            elif method_folder_name == 'CatBoost':
                model = CatBoostRegressor()
                model.load_model(model_path)
            elif method_folder_name == 'XGBoost':
                model = XGBRegressor()
                model.load_model(model_path)
            else:
                logger.warning(f"Unsupported regression method '{method_folder_name}' for {target}. Skipping.")
                continue
            actual_model, preproc = unwrap_pipeline(model)
            X_proc = apply_preproc(X_raw, preproc, feat_names)
            if preproc is not None:
                estimator = Pipeline([('pre', preproc), ('model', actual_model)])
            else:
                estimator = actual_model
            pdp_cfg = settings['pdp_ice']
            features_for_pdp = feat_names[:pdp_cfg['top_n_features']]
            for feat_orig in features_for_pdp:
                fig, ax = plt.subplots()
                try:
                    # Added check for constant feature
                    if X_raw[feat_orig].nunique() < 2:
                        logger.warning(f"Skipping PDP/ICE for {method_folder_name}/{target}/{feat_orig}: only {X_raw[feat_orig].nunique()} unique value(s). Feature is constant or has insufficient variation for plotting.")
                        plt.close(fig) # Ensure any implicitly created figure is closed if the plot is skipped
                        continue # Skip to the next feature

                    if method_folder_name == 'Ridge Pipeline':
                        feat_for_pdp = f'scale_all__{feat_orig}'
                    else:
                        feat_for_pdp = feat_orig
                    PartialDependenceDisplay.from_estimator(
                        estimator, X_raw, [feat_for_pdp],
                        grid_resolution=pdp_cfg['grid_points'],
                        kind='both',
                        ice_lines_kw={'alpha':0.3},
                        ax=ax
                       )
                    # Save plot to the unified I/O directory
                    savep = os.path.join(io_dir, f'plots_pdp_ice_{feat_orig}.png')
                    fig.savefig(savep, bbox_inches='tight')
                    logger.info(f"Saved PDP/ICE for {method_folder_name}/{target}/{feat_orig}")
                except Exception as e:
                    logger.warning(f"Could not plot PDP/ICE for {method_folder_name}/{target}/{feat_orig}: {e}")
                finally:
                    plt.close(fig) # Close figure after saving

            dep_n = settings['dependence']['top_n_features']
            features_for_dep_plot = feat_names[:dep_n]
            for feat in features_for_dep_plot:
                # Removed: fig = plt.figure() - Let SHAP manage figure creation initially
                try:
                    shap.dependence_plot(feat, shap_vals, X_proc,
                                          interaction_index='auto', show=False)
                    fig = plt.gcf() # Get the current figure that shap.dependence_plot has drawn on
                    savep = os.path.join(io_dir, f'dep_int_{feat}.png')
                    fig.savefig(savep, bbox_inches='tight')
                    logger.info(f"Saved dependence plot for {method_folder_name}/{target}/{feat}")
                except Exception as e:
                    logger.warning(f"Could not plot dependence plot for {method_folder_name}/{target}/{feat}: {e}")
                finally:
                    plt.close(fig) # Close figure after saving

            if expl_type == 'TreeExplainer':
                inter_cfg = settings['interaction']
                top_feats = get_top_n_features(shap_vals, feat_names, n=inter_cfg['top_n_features'])
                subset = X_proc[top_feats]
                rows = min(inter_cfg['subsample_size'], len(subset))
                subset_samp = subset.sample(rows, random_state=RND)
                explainer = shap.TreeExplainer(actual_model, background)
                inter_vals = explainer.shap_interaction_values(subset_samp)

                # Regression does not return a list, so no list handling is needed here.

                mi = np.abs(inter_vals).mean(axis=0)
                df = pd.DataFrame(mi, index=top_feats, columns=top_feats)

                # --- FIX 1: EMIT A LONG "feature1,feature2,mean_interaction" TABLE ---
                # keep only upper triangle, stack into long form
                pairs = (
                    df.where(np.triu(np.ones(df.shape), k=1).astype(bool))
                      .stack()
                      .reset_index()
                      .rename(columns={
                          'level_0': 'feature1',
                          'level_1': 'feature2',
                          0:         'mean_interaction'
                      })
                )
                savef = os.path.join(io_dir, 'interaction_summary.csv')
                pairs.to_csv(savef, index=False)
                logger.info(f"Saved interaction summary for {method_folder_name}/{target}")

2025-07-06 20:32:11,461 [INFO] Saved PDP/ICE for classification/HumanCharacterization/timeDuration
2025-07-06 20:32:13,068 [INFO] Saved PDP/ICE for classification/HumanCharacterization/nMovements
2025-07-06 20:32:15,972 [INFO] Saved PDP/ICE for classification/HumanCharacterization/movementsDifficulty
2025-07-06 20:32:17,587 [INFO] Saved PDP/ICE for classification/HumanCharacterization/acrobaticMovements
2025-07-06 20:32:18,073 [INFO] Saved dependence plot for classification/HumanCharacterization/timeDuration
2025-07-06 20:32:18,584 [INFO] Saved dependence plot for classification/HumanCharacterization/nMovements
2025-07-06 20:32:19,170 [INFO] Saved dependence plot for classification/HumanCharacterization/movementsDifficulty
2025-07-06 20:32:19,532 [INFO] Saved dependence plot for classification/HumanCharacterization/robotSpeech
2025-07-06 20:32:20,017 [INFO] Saved dependence plot for classification/HumanCharacterization/acrobaticMovements
2025-07-06 20:32:20,085 [INFO] Saved interaction

hope last try on phase3 phase-3

In [None]:
import os
import json
import yaml
import logging
import joblib
import numpy as np
import pandas as pd
import shap
from sklearn.pipeline import Pipeline
from sklearn.utils import resample # For bootstrap sampling
from scipy.stats import spearmanr # For Spearman's rank correlation (though not used for per-feature stability directly now)
from xgboost import XGBClassifier, XGBRegressor
from catboost import CatBoostClassifier, CatBoostRegressor
# import matplotlib.pyplot as plt # Commented out as no plots are generated in Phase 3 as per current spec

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    force=True
)
logger = logging.getLogger(__name__)

# Load settings
try:
    with open('settings.yaml') as f:
        settings = yaml.safe_load(f)
except FileNotFoundError:
    logger.error("settings.yaml not found. Please ensure it's in the current directory.")
    exit(1) # Exit with an error code

RND = settings.get('random_seed', 42)
PROJECT_ROOT = "/content/Evaluations_on_Robotic_Choreographies/"
SHAP_ROOT = os.path.join(PROJECT_ROOT, 'shap')

# Validate essential settings for Phase 3
if 'stability' not in settings:
    logger.error("Missing 'stability' section in settings.yaml. Please add it with 'bootstrap_samples' and 'random_seed_offset'.")
    exit(1)
if 'bootstrap_samples' not in settings['stability']:
    logger.error("Missing 'stability.bootstrap_samples' in settings.yaml. Please add it.")
    exit(1)
if 'random_seed_offset' not in settings['stability']:
    logger.error("Missing 'stability.random_seed_offset' in settings.yaml. Please add it.")
    exit(1)
if 'subgroup_analysis' not in settings:
    logger.warning("Missing 'subgroup_analysis' section in settings.yaml. Subgroup analysis will be skipped.")
    settings['subgroup_analysis'] = {'min_group_size': 0, 'categorical_features': []}

# Helper: unwrap pipeline
def unwrap_pipeline(model):
    preproc = None
    actual = model
    if isinstance(model, Pipeline):
        actual = model.steps[-1][1]
        if len(model.steps) > 1:
            preproc = Pipeline(model.steps[:-1])
    return actual, preproc

# Helper: apply preprocessing
def apply_preproc(df, preproc, feat_names):
    """Applies preprocessing if a preproc pipeline is provided."""
    if preproc is not None:
        arr = preproc.transform(df)
        return pd.DataFrame(arr, columns=feat_names, index=df.index)
    if feat_names is not None:
        if not isinstance(df, pd.DataFrame):
            df = pd.DataFrame(df, columns=feat_names, index=df.index if hasattr(df, 'index') else None)
        return df.copy()
    return df.copy()

# Helper: top-N features by mean abs shap
def get_top_n_features(shap_vals, feat_names, n=None):
    """
    Returns feature names ranked by mean absolute SHAP values. If n is None, returns all features ranked.
    """
    if shap_vals.ndim > 1:
        means = np.abs(shap_vals).mean(axis=0)
    else:
        means = np.abs(shap_vals)

    if n is None or n >= len(feat_names):
        idx = np.argsort(means)[::-1]
    else:
        actual_n = min(n, len(feat_names))
        idx = np.argsort(means)[-actual_n:][::-1]

    return [feat_names[i] for i in idx]

# New helper function to load models based on file type
def load_model(model_path):
    """Loads a model from the given path."""
    if model_path.endswith('.pkl') or model_path.endswith('.joblib'):
        return joblib.load(model_path)
    elif model_path.endswith('.cbm'):
        try:
            return CatBoostRegressor().load_model(model_path)
        except:
            return CatBoostClassifier().load_model(model_path)
    elif model_path.endswith(('.json', '.json.bst')):
        import xgboost as xgb
        bst = xgb.Booster()
        bst.load_model(model_path)
        return bst
    elif model_path.endswith(('.keras', '.h5')):
        from tensorflow import keras
        return keras.models.load_model(model_path)
    else:
        raise ValueError(f"Unsupported model file extension: {model_path}")

# Main Phase3 loop
logger.info("Starting Phase 3: Stability & Subgroup Analysis")

for domain in ['classification', 'regression']:
    base_dir = os.path.join(SHAP_ROOT, domain)
    if not os.path.exists(base_dir):
        logger.warning(f"Base directory not found for {domain}: {base_dir}. Skipping.")
        continue

    if domain == 'classification':
        target_dirs = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))]
    else: # regression
        regression_result_dir = os.path.join(base_dir, 'result')
        if not os.path.exists(regression_result_dir):
            logger.warning(f"Regression result directory not found: {regression_result_dir}. Skipping.")
            continue
        target_dirs = [d for d in os.listdir(regression_result_dir) if os.path.isdir(os.path.join(regression_result_dir, d))]

    if not target_dirs:
        logger.info(f"No targets found in {base_dir} for {domain}. Skipping.")
        continue

    for target in target_dirs:
        if domain == 'classification':
            io_dir = os.path.join(base_dir, target)
        else: # regression
            io_dir = os.path.join(regression_result_dir, target)

        logger.info(f"Processing {domain}/{target}")

        cfg_path = os.path.join(io_dir, 'config.json')
        if not os.path.exists(cfg_path):
            logger.warning(f"Config file not found: {cfg_path}. Skipping {domain}/{target}.")
            continue

        try:
            cfg = json.load(open(cfg_path))
            model_path = cfg['model_path']
            bg_path = cfg['background_path']
            eval_data_path = cfg['eval_data_path']
            fn_path = cfg['feature_names_path']
            shap_path = cfg['raw_shap_path']
            expl_type = cfg['explainer_type']

            X_raw = pd.read_csv(eval_data_path)
            feat_names = pd.read_csv(fn_path, header=None).iloc[:,0].tolist()
            shap_vals = np.load(shap_path)

            if shap_vals.shape[1] != len(feat_names):
                logger.error(f"Mismatch: SHAP values (columns: {shap_vals.shape[1]}) do not match number of feature names ({len(feat_names)}) for {domain}/{target}. Skipping.")
                continue

            background = pd.read_csv(bg_path)

            model = load_model(model_path)
            logger.info(f"Loaded model from {model_path}")

            actual_model, preproc = unwrap_pipeline(model)
            X_proc = apply_preproc(X_raw, preproc, feat_names)

            explainer = None
            if expl_type == 'TreeExplainer':
                explainer = shap.TreeExplainer(actual_model, background)
            elif expl_type == 'KernelExplainer':
                if domain == 'classification':
                    explainer = shap.KernelExplainer(actual_model.predict_proba, background)
                else: # regression
                    explainer = shap.KernelExplainer(actual_model.predict, background)
            elif expl_type == 'LinearExplainer':
                explainer = shap.LinearExplainer(actual_model, background)

            if explainer is None:
                logger.error(f"Unsupported explainer type '{expl_type}' for {domain}/{target}. Skipping.")
                continue

            # --- Bootstrap Stability Analysis ---
            stability_cfg = settings['stability']
            n_bootstrap_samples = stability_cfg['bootstrap_samples']
            random_seed_offset = stability_cfg['random_seed_offset']
            stability_top_n_features = stability_cfg.get('top_n_features', len(feat_names)) # Default to all features

            logger.info(f"Starting bootstrap stability analysis for {domain}/{target} with {n_bootstrap_samples} samples.")

            bootstrap_feature_ranks = []
            bootstrap_mean_abs_shaps = []

            for i in range(n_bootstrap_samples):
                current_seed = RND + random_seed_offset + i

                if X_proc.empty:
                    logger.warning(f"X_proc is empty for {domain}/{target}. Skipping bootstrap sample {i}.")
                    continue

                n_samples_to_take = len(X_proc)

                bootstrap_indices = resample(
                    np.arange(len(X_proc)),
                    replace=True,
                    n_samples=n_samples_to_take,
                    random_state=current_seed
                )

                X_bootstrap = X_proc.iloc[bootstrap_indices]
                # shap_bootstrap = shap_vals[bootstrap_indices] # No longer directly using pre-computed shap_vals

                bootstrap_shap_values = explainer.shap_values(X_bootstrap)

                if isinstance(bootstrap_shap_values, list):
                    bootstrap_shap_values_combined = np.sum([np.abs(s) for s in bootstrap_shap_values], axis=0)
                else:
                    bootstrap_shap_values_combined = np.abs(bootstrap_shap_values)

                current_mean_abs_shaps = np.mean(bootstrap_shap_values_combined, axis=0)
                bootstrap_mean_abs_shaps.append(current_mean_abs_shaps)

                current_ranks = pd.Series(current_mean_abs_shaps, index=feat_names).rank(ascending=False)
                bootstrap_feature_ranks.append(current_ranks)

            if not bootstrap_feature_ranks:
                logger.warning(f"No bootstrap samples were processed for {domain}/{target}. Skipping stability analysis.")
                continue

            # Convert list of ranks to DataFrame for stability calculation
            # Each column is a feature, each row is a bootstrap sample's rank for that feature
            rank_df = pd.DataFrame(bootstrap_feature_ranks, columns=feat_names)

            # --- CORRECTED PER-FEATURE STABILITY CALCULATION ---
            # Calculate the standard deviation of ranks for each feature across bootstrap samples
            rank_stds = rank_df.std(axis=0)

            # Normalize the standard deviation to get a stability score (0 to 1, 1 being most stable)
            # Max possible std for ranks 1 to N is roughly N / sqrt(12) for uniform distribution
            # A simpler normalization: 1 - (std / max_possible_std_deviation)
            # Max possible std deviation for ranks 1 to N is when half are rank 1 and half are rank N.
            # For N features, ranks range from 1 to N.
            # The maximum possible standard deviation of ranks for a feature is roughly (N_features - 1) / 2
            # when ranks are perfectly split between 1 and N_features.
            num_features = len(feat_names)
            if num_features > 1:
                max_possible_rank_std = np.std(np.arange(1, num_features + 1)) # Std of perfectly ordered ranks
            else:
                max_possible_rank_std = 1.0 # If only one feature, it's perfectly stable

            # Avoid division by zero if max_possible_rank_std is 0 (e.g., if num_features is 0 or 1)
            mean_bootstrap_rho = 1 - (rank_stds / max_possible_rank_std) if max_possible_rank_std > 0 else pd.Series(1.0, index=feat_names)

            # Ensure stability scores are not negative (can happen if std > max_possible_std due to approximations/sampling)
            mean_bootstrap_rho = mean_bootstrap_rho.clip(lower=0, upper=1)

            # Save mean bootstrap rho per feature (now representing actual stability)
            mean_rho_save_path = os.path.join(io_dir, f'mean_bootstrap_rho_{domain}_{target}.csv')
            mean_bootstrap_rho.to_csv(mean_rho_save_path, header=['stability_rho'])
            logger.info(f"Saved mean bootstrap rho (stability) per feature to {mean_rho_save_path}")

            # Optionally, save the full feature-feature correlation matrix (if still desired for co-ranking analysis)
            # This is the original calculation pointed out, which is valid for co-ranking, just not for per-feature stability
            feature_rho_matrix = rank_df.corr(method='spearman')
            stability_matrix_save_path = os.path.join(io_dir, f'bootstrap_stability_features_{domain}_{target}.csv')
            feature_rho_matrix.to_csv(stability_matrix_save_path)
            logger.info(f"Saved feature-feature bootstrap correlation matrix to {stability_matrix_save_path}")


            # --- Subgroup Analysis (remains unchanged) ---
            subgroup_cfg = settings['subgroup_analysis']
            min_group_size = subgroup_cfg['min_group_size']
            categorical_features = subgroup_cfg['categorical_features']

            if not categorical_features:
                logger.info(f"No categorical features defined for subgroup analysis in settings.yaml for {domain}/{target}. Skipping subgroup analysis.")
            else:
                logger.info(f"Starting subgroup analysis for {domain}/{target}.")
                subgroup_shap_means = {}

                for cat_feat in categorical_features:
                    if cat_feat not in X_raw.columns:
                        logger.warning(f"Categorical feature '{cat_feat}' not found in X_raw for {domain}/{target}. Skipping.")
                        continue

                    categories = X_raw[cat_feat].unique()
                    for category in categories:
                        subgroup_indices = X_raw[X_raw[cat_feat] == category].index

                        if len(subgroup_indices) < min_group_size:
                            logger.info(f"Subgroup '{category}' for feature '{cat_feat}' has {len(subgroup_indices)} samples, which is less than min_group_size ({min_group_size}). Skipping.")
                            continue

                        logger.info(f"Analyzing subgroups for feature: {cat_feat} - Category: {category} with {len(subgroup_indices)} samples.")

                        shap_subgroup = shap_vals[subgroup_indices]

                        if shap_subgroup.ndim == 3:
                            mean_abs_shap = np.mean(np.abs(shap_subgroup), axis=(0, 1))
                        elif shap_subgroup.ndim == 2:
                            mean_abs_shap = np.mean(np.abs(shap_subgroup), axis=0)
                        else:
                            mean_abs_shap = np.abs(shap_subgroup)
                            if mean_abs_shap.ndim > 1:
                                mean_abs_shap = mean_abs_shap.flatten()

                        col_name = f"subgroup_{cat_feat}_{category}_mean_abs_shap"
                        subgroup_shap_means[col_name] = pd.Series(mean_abs_shap, index=feat_names[:len(mean_abs_shap)])

                if subgroup_shap_means:
                    subgroup_df = pd.DataFrame(subgroup_shap_means)
                    subgroup_save_path = os.path.join(io_dir, f'subgroup_mean_abs_shap_{domain}_{target}.csv')
                    subgroup_df.to_csv(subgroup_save_path)
                    logger.info(f"Saved subgroup mean absolute SHAP values to {subgroup_save_path}")
                else:
                    logger.info(f"No subgroups met the minimum size requirement or no categorical features were processed successfully for {domain}/{target}. No subgroup analysis CSV saved.")

        except Exception as e:
            logger.error(f"An unexpected error occurred while processing {domain}/{target}: {e}", exc_info=True)
            continue

logger.info("Phase 3: Stability & Subgroup Analysis - Complete.")


2025-07-06 20:35:07,449 [INFO] Starting Phase 3: Stability & Subgroup Analysis
2025-07-06 20:35:07,453 [INFO] Processing classification/HumanCharacterization
2025-07-06 20:35:07,477 [INFO] Loaded model from /content/Evaluations_on_Robotic_Choreographies/classification/saved_overall_best_models/OVERALL_BEST_XGBoost_GridCV_HumanCharacterization.joblib
2025-07-06 20:35:07,508 [INFO] Starting bootstrap stability analysis for classification/HumanCharacterization with 100 samples.
2025-07-06 20:38:29,769 [INFO] Saved mean bootstrap rho (stability) per feature to /content/Evaluations_on_Robotic_Choreographies/shap/classification/HumanCharacterization/mean_bootstrap_rho_classification_HumanCharacterization.csv
2025-07-06 20:38:29,777 [INFO] Saved feature-feature bootstrap correlation matrix to /content/Evaluations_on_Robotic_Choreographies/shap/classification/HumanCharacterization/bootstrap_stability_features_classification_HumanCharacterization.csv
2025-07-06 20:38:29,778 [INFO] Starting subg

phase 4

In [None]:
import pandas as pd
import numpy as np
import shap
import json
import os
import glob
import logging
import joblib
import yaml
import matplotlib.pyplot as plt
from types import SimpleNamespace
import xgboost as xgb
from catboost import CatBoostClassifier, CatBoostRegressor
from sklearn.pipeline import Pipeline

# --- Setup Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Configuration (Load from settings.yaml) ---
class Settings:
    def __init__(self, config_file='settings.yaml'):
        try:
            with open(config_file, 'r') as f:
                settings_data = yaml.safe_load(f)
        except FileNotFoundError:
            logger.error(f"Configuration file '{config_file}' not found. Please ensure it's in the current directory.")
            exit(1)

        self.random_seed = settings_data.get('random_seed', 42)
        self.background = self._get_nested_setting(settings_data, 'background', {'min_size': 100})
        self.prototypes = self._get_nested_setting(settings_data, 'prototypes', {'n_high': 1, 'n_low': 1})
        self.report = self._get_nested_setting(settings_data, 'report', {'stability_rho_threshold': 0.8})

    def _get_nested_setting(self, data, key, default):
        cfg = default.copy()
        if key in data and isinstance(data[key], dict):
            cfg.update(data[key])
        return SimpleNamespace(**cfg)

settings = Settings()

# --- Global Paths ---
PROJECT_ROOT = "/content/Evaluations_on_Robotic_Choreographies/"
BASE_SHAP_DIR = os.path.join(PROJECT_ROOT, "shap")
BASE_PLOTS_DIR = os.path.join(PROJECT_ROOT, "plots")
BASE_REPORTS_DIR = os.path.join(PROJECT_ROOT, "reports")

os.makedirs(BASE_PLOTS_DIR, exist_ok=True)
os.makedirs(BASE_REPORTS_DIR, exist_ok=True)

# --- Helper Functions ---

def unwrap_pipeline(model):
    """Utility to get the final estimator and preprocessor from a scikit-learn pipeline."""
    preproc = None
    actual_model = model
    if isinstance(model, Pipeline):
        actual_model = model.steps[-1][1]
        if len(model.steps) > 1:
            preproc = Pipeline(model.steps[:-1])
    return actual_model, preproc

def load_model_intelligently(model_path, domain):
    """Loads a model based on its file extension."""
    if not os.path.exists(model_path):
        logger.error(f"Model file not found: {model_path}")
        return None

    logger.info(f"Loading model from: {model_path}")
    if model_path.endswith((".pkl", ".joblib")):
        return joblib.load(model_path)
    elif model_path.endswith(".cbm"):
        if domain == 'classification':
            model = CatBoostClassifier()
        else:
            model = CatBoostRegressor()
        model.load_model(model_path)
        return model
    elif model_path.endswith(".json"):
        if domain == 'classification':
            model = xgb.XGBClassifier()
        else:
            model = xgb.XGBRegressor()
        model.load_model(model_path)
        return model
    else:
        logger.error(f"Unsupported model file extension for: {model_path}")
        return None

def get_explainer_and_data(config_path):
    """Loads explainer, model, and data based on the config."""
    try:
        with open(config_path, 'r') as f:
            config = json.load(f)

        # Correctly determine domain from path
        io_dir = os.path.dirname(config_path)
        domain_dir = os.path.dirname(io_dir)
        if os.path.basename(domain_dir) == 'result': # Handle regression's nested structure
            domain_dir = os.path.dirname(domain_dir)
        domain = os.path.basename(domain_dir)

        # Load essential components
        model_path = config["model_path"]
        background_path = config["background_path"]
        eval_data_path = config["eval_data_path"]
        feature_names_path = config["feature_names_path"]

        # Use the intelligent loader
        model = load_model_intelligently(model_path, domain)
        if model is None: return None, None, None, None, None

        actual_model, preproc = unwrap_pipeline(model)

        background_data = pd.read_csv(background_path)
        X_eval_raw = pd.read_csv(eval_data_path)

        with open(feature_names_path, 'r') as f:
            feature_names = [line.strip() for line in f if line.strip()]

        # Apply preprocessing if it exists
        if preproc:
            X_eval_proc = pd.DataFrame(preproc.transform(X_eval_raw), columns=feature_names, index=X_eval_raw.index)
        else:
            X_eval_proc = X_eval_raw.copy()
            X_eval_proc.columns = feature_names

        # Instantiate explainer with the ACTUAL model
        explainer = shap.TreeExplainer(actual_model, background_data)

        return explainer, actual_model, X_eval_proc, feature_names, domain

    except Exception as e:
        logger.error(f"Failed to load components for {config_path}: {e}", exc_info=True)
        return None, None, None, None, None

# --- Main Phase 4 Execution ---
def run_phase4():
    logger.info("Starting Phase 4: Synthesize the Narrative")

    # --- 2. Prototypical Decision Plots ---
    logger.info("Generating Prototypical Decision Plots.")
    all_config_files = glob.glob(os.path.join(BASE_SHAP_DIR, "**", "config.json"), recursive=True)

    for config_path in all_config_files:
        try:
            explainer, model, X_proc, feature_names, domain = get_explainer_and_data(config_path)
            if not all([explainer, model, X_proc is not None, feature_names]):
                logger.warning(f"Skipping decision plots for {config_path} due to loading errors.")
                continue

            io_dir = os.path.dirname(config_path)
            target = os.path.basename(io_dir)
            model_filename = os.path.basename(json.load(open(config_path))["model_path"])
            method = model_filename.split('_')[2] if 'OVERALL_BEST' in model_filename else model_filename.split('_')[0]

            raw_shap_path = json.load(open(config_path))["raw_shap_path"]
            shap_vals = np.load(raw_shap_path, allow_pickle=True)

            # For classification, SHAP values can be a list of arrays. We typically want the one for the positive class.
            shap_vals_for_plot = shap_vals[1] if isinstance(shap_vals, list) else shap_vals

            # Get model predictions to find high/low cases
            # NO MORE DMatrix. The sklearn wrappers handle it.
            if domain == 'classification':
                predictions = model.predict_proba(X_proc)[:, 1] # Probability of positive class
            else: # Regression
                predictions = model.predict(X_proc)

            sorted_indices = np.argsort(predictions)
            high_indices = sorted_indices[-settings.prototypes.n_high:]
            low_indices = sorted_indices[:settings.prototypes.n_low]

            expected_value = explainer.expected_value
            # Handle case where expected_value is a list (for classification)
            if isinstance(expected_value, (list, np.ndarray)) and len(expected_value) > 1:
                expected_value = expected_value[1]

            # Generate and save plots
            for i, idx in enumerate(high_indices):
                plot_path = os.path.join(BASE_PLOTS_DIR, f"decision_{method}_{target}_high_{i+1}.png")
                shap.decision_plot(expected_value, shap_vals_for_plot[idx], X_proc.iloc[[idx]], show=False)
                plt.savefig(plot_path, bbox_inches='tight')
                plt.close()

            for i, idx in enumerate(low_indices):
                plot_path = os.path.join(BASE_PLOTS_DIR, f"decision_{method}_{target}_low_{i+1}.png")
                shap.decision_plot(expected_value, shap_vals_for_plot[idx], X_proc.iloc[[idx]], show=False)
                plt.savefig(plot_path, bbox_inches='tight')
                plt.close()

            logger.info(f"Generated decision plots for {method}-{target}")

        except Exception as e:
            logger.error(f"Error generating decision plots for {config_path}: {e}", exc_info=True)
            continue

    logger.info("Phase 4: Synthesize the Narrative - Complete.")

if __name__ == "__main__":
    plt.ioff()
    run_phase4()



2025-07-06 23:07:22,137 [INFO] Starting Phase 4: Synthesize the Narrative
2025-07-06 23:07:22,138 [INFO] Generating Prototypical Decision Plots.
2025-07-06 23:07:22,143 [INFO] Loading model from: /content/Evaluations_on_Robotic_Choreographies/regression/Tuned Models/CatBoost/catboost_humancharacterization.cbm
2025-07-06 23:07:23,358 [INFO] Generated decision plots for catboost-HumanCharacterization
2025-07-06 23:07:23,359 [INFO] Loading model from: /content/Evaluations_on_Robotic_Choreographies/regression/Tuned Models/XGBoost/xgb_model_storytelling.json
2025-07-06 23:07:24,459 [INFO] Generated decision plots for xgb-Storytelling
2025-07-06 23:07:24,460 [INFO] Loading model from: /content/Evaluations_on_Robotic_Choreographies/regression/Tuned Models/XGBoost/xgb_model_movementtechnique.json
2025-07-06 23:07:25,820 [INFO] Generated decision plots for xgb-MovementTechnique
2025-07-06 23:07:25,821 [INFO] Loading model from: /content/Evaluations_on_Robotic_Choreographies/regression/Tuned Mod

In [None]:
import os
import json
import numpy as np
import pandas as pd
import logging
import yaml

# --- Setup Logging ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# --- Load Settings ---
def load_settings(config_file='settings.yaml'):
    try:
        with open(config_file, 'r') as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        logger.error(f"Settings file '{config_file}' not found.")
        raise
    return data

# --- Compute mean absolute SHAP values ---
def compute_mean_abs_shap(shap_values, feature_names):
    if isinstance(shap_values, list):
        arr = shap_values[1] if len(shap_values) > 1 else shap_values[0]
    else:
        arr = shap_values
    arr = np.asarray(arr)
    if arr.ndim != 2:
        logger.error(f"Expected 2D SHAP array, got shape {arr.shape}")
        return pd.Series([], dtype=float)
    # align feature names
    if arr.shape[1] != len(feature_names):
        n = min(arr.shape[1], len(feature_names))
        feature_names = feature_names[:n]
        arr = arr[:, :n]
    mean_abs = np.abs(arr).mean(axis=0)
    return pd.Series(mean_abs, index=feature_names)

# --- Generate Combined Markdown Snippet ---
def generate_combined_snippet(df_summary, out_file):
    os.makedirs(os.path.dirname(out_file) or '.', exist_ok=True)
    with open(out_file, 'w') as f:
        f.write(f"# Automated SHAP Report Snippets\n\n")
        f.write("This document synthesizes the top features across all methods and targets.\n\n---\n\n")
        for (method, target), group in df_summary.groupby(['method', 'target']):
            f.write(f"## {method} on {target}\n\n")
            f.write("**Key Findings**\n\n")
            top_feats = group.nlargest(3, 'mean_shap')
            for _, row in top_feats.iterrows():
                f.write(f"- **{row['feature']}**: mean SHAP = {row['mean_shap']:.4f}\n")
            f.write("\n---\n\n")
    logger.info(f"Combined snippet written to {out_file}")

# --- Main Execution ---
def run_phase4_summary(shap_root='shap', output_csv='final_summary.csv', snippet_file='report_snippets.md'):
    entries = []
    # scan for configs
    for root, _, files in os.walk(shap_root):
        if 'config.json' not in files:
            continue
        cfg_path = os.path.join(root, 'config.json')
        try:
            with open(cfg_path, 'r') as f:
                cfg = json.load(f)
            # derive domain/target
            rel = os.path.relpath(root, shap_root).split(os.sep)
            if rel[0] == 'regression' and rel[1] == 'result':
                domain, target = rel[0], rel[2]
            else:
                domain, target = rel[0], rel[1]
            # method
            model_file = os.path.basename(cfg.get('model_path',''))
            method = os.path.splitext(model_file)[0]
            # load names & values
            fn_path = cfg.get('feature_names_path')
            raw_path = cfg.get('raw_shap_path')
            if not os.path.exists(fn_path) or not os.path.exists(raw_path):
                logger.error(f"Missing files for {method}-{target}")
                continue
            feature_names = [ln.strip() for ln in open(fn_path) if ln.strip()]
            shap_vals = np.load(raw_path, allow_pickle=True)
            mean_shap = compute_mean_abs_shap(shap_vals, feature_names)
            for feat, val in mean_shap.items():
                entries.append({
                    'method': method,
                    'target': target,
                    'feature': feat,
                    'mean_shap': val
                })
        except Exception as e:
            logger.error(f"Error at {cfg_path}: {e}")
            continue
    if not entries:
        logger.warning("No summary entries found.")
        return
    # assemble DataFrame
    df = pd.DataFrame(entries)
    df = df.sort_values(['method','target','mean_shap'], ascending=[True, True, False])
    df.to_csv(output_csv, index=False)
    logger.info(f"Final summary CSV: {output_csv}")
    # generate combined markdown snippet
    generate_combined_snippet(df, snippet_file)

if __name__ == '__main__':
    settings = load_settings()
    shap_root = "/content/Evaluations_on_Robotic_Choreographies/shap"
    output_csv = settings.get('final_summary_file','final_summary.csv')
    snippet_file = settings.get('snippet_file','report_snippets.md')
    run_phase4_summary(shap_root, output_csv, snippet_file)


2025-07-06 23:07:35,447 [INFO] Final summary CSV: final_summary.csv
2025-07-06 23:07:35,480 [INFO] Combined snippet written to report_snippets.md


In [None]:
import os
from zipfile import ZipFile, ZIP_DEFLATED
from google.colab import files

def zip_folder_zipfile(folder_path, output_zip_path):
    """
    Zips folder_path (including its contents) into output_zip_path.
    Preserves directory structure.
    """
    with ZipFile(output_zip_path, 'w', compression=ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(folder_path):
            for file in files:
                full_path = os.path.join(root, file)
                # Store paths relative to the root folder
                rel_path = os.path.relpath(full_path, start=os.path.dirname(folder_path))
                zipf.write(full_path, arcname=rel_path)
    print(f"Created: {output_zip_path}")
zip_folder_zipfile('/content/Evaluations_on_Robotic_Choreographies/shap', '/content/shap.zip')

# Replace 'your_file.zip' with the actual name of your zip file
files.download('/content/shap.zip')

Created: /content/shap.zip


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>