In [None]:
# make google drive available inside the colab runtime
import sys

from google.colab import drive

drive.mount('/content/drive')

# put the project folder stored on drive at the front of the import path
sys.path.insert(0, '/content/drive/MyDrive/pd-interpretability')

In [None]:
# install dependencies
!pip install -q transformers datasets librosa scipy scikit-learn tqdm

In [None]:
import numpy as np
import pandas as pd
import json
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

# publication-quality figure defaults: whitegrid theme, explicit font sizes, 150 dpi
plt.style.use('seaborn-v0_8-whitegrid')

plt.rcParams['font.size'] = 12
plt.rcParams['axes.titlesize'] = 14
plt.rcParams['axes.labelsize'] = 12
plt.rcParams['xtick.labelsize'] = 10
plt.rcParams['ytick.labelsize'] = 10
plt.rcParams['legend.fontsize'] = 10
plt.rcParams['figure.dpi'] = 150

# seed numpy's global generator so later random draws repeat across runs
np.random.seed(42)

## 1. Configuration and Data Loading

In [None]:
# notebook-wide settings; every path points into the project folder on the mounted drive
CONFIG = dict(
    data_path='/content/drive/MyDrive/pd-interpretability/data',
    activations_path='/content/drive/MyDrive/pd-interpretability/data/activations',
    output_path='/content/drive/MyDrive/pd-interpretability/results/probing',
    random_seed=42,
)

# directory that receives the figures and the results json; created (with parents) if absent
output_path = Path(CONFIG['output_path'])
output_path.mkdir(parents=True, exist_ok=True)
print(f"output directory: {output_path}")

In [None]:
# load pre-extracted activations together with their metadata
activations_path = Path(CONFIG['activations_path'])

activations_file = activations_path / 'activations.npy'
metadata_file = activations_path / 'metadata.json'

# both files are required below, so a missing metadata file is reported the
# same way as missing activations instead of failing inside open()
missing_files = [str(p) for p in (activations_file, metadata_file) if not p.exists()]

if not missing_files:
    activations = np.load(activations_file)
    with open(metadata_file, 'r', encoding='utf-8') as f:
        metadata = json.load(f)

    # later cells read axis 1 as the layer axis and axis 2 as the hidden axis,
    # so anything that is not 3-d cannot be probed
    if activations.ndim != 3:
        raise ValueError(
            f"expected 3-d activations (samples, layers, hidden), got shape {activations.shape}"
        )

    print(f"loaded activations: {activations.shape}")
    print(f"samples: {metadata.get('n_samples', len(metadata.get('labels', [])))}")
    print(f"layers: {activations.shape[1]}")
    print(f"hidden size: {activations.shape[2]}")
else:
    print("activations not found, need to run extraction first")
    print(f"  missing: {', '.join(missing_files)}")
    activations = None
    metadata = None

In [None]:
# extract labels and subject ids from metadata
if metadata:
    labels = np.array(metadata['labels'])
    subject_ids = np.array(metadata['subject_ids'])

    # labels, subject ids and activations are passed to the probes together and
    # matched by position, so fail early with a clear message if the counts differ
    if len(labels) != len(subject_ids):
        raise ValueError(
            f"metadata mismatch: {len(labels)} labels vs {len(subject_ids)} subject ids"
        )
    if activations is not None and len(labels) != activations.shape[0]:
        raise ValueError(
            f"metadata mismatch: {len(labels)} labels vs {activations.shape[0]} activation rows"
        )

    print(f"label distribution: PD={np.sum(labels == 1)}, HC={np.sum(labels == 0)}")
    print(f"unique subjects: {len(np.unique(subject_ids))}")

## 2. Layer-wise PD Classification Probing

For each transformer layer, train a linear classifier to predict PD vs HC.
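Probes are evaluated with leave-one-subject-out cross-validation (folds grouped by subject id), so recordings from the same subject never appear in both the training and the test fold.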

In [None]:
from src.models.probes import LayerwiseProber

# one classification probe per layer; subject ids are handed over as `groups`
# (presumably used by the prober to build the subject-wise folds — implementation
# lives in src.models.probes, confirm there)
prober = LayerwiseProber(task='classification', regularization=1.0)

print("running layer-wise pd classification probing...")
print("(using leave-one-subject-out cross-validation)\n")

probing_results = prober.probe_all_layers(activations, labels, groups=subject_ids)

# report mean ± std of the per-layer scores in ascending layer order
print("\nlayer-wise probing accuracy:")
print("-" * 50)
for layer_idx in sorted(probing_results):
    layer_stats = probing_results[layer_idx]
    print(f"layer {layer_idx:2d}: {layer_stats['mean']:.3f} ± {layer_stats['std']:.3f}")

In [None]:
from src.utils.visualization import plot_layerwise_probing

# draw the per-layer accuracy figure and write it next to the other results
fig = plot_layerwise_probing(
    probing_results,
    chance_level=0.5,
    save_path=str(output_path / 'layerwise_probing.png'),
    title="layer-wise pd classification probing accuracy",
)
plt.show()

# layer with the highest mean probing score (the first such layer wins on ties)
best_layer = max(probing_results, key=lambda layer: probing_results[layer]['mean'])
best_acc = probing_results[best_layer]['mean']

print(f"\nbest probing layer: {best_layer} (accuracy = {best_acc:.3f})")

In [None]:
# statistical analysis: is best layer significantly better than chance?
from scipy.stats import ttest_1samp

# chance accuracy assumed for the binary pd/hc task
CHANCE_LEVEL = 0.5

# per-fold scores of the layer that was selected as best
# NOTE(review): the layer was chosen because it has the highest mean score, so
# this p-value is optimistic with respect to that selection
best_scores = probing_results[best_layer]['scores']
t_stat, p_value = ttest_1samp(best_scores, CHANCE_LEVEL)

print(f"\nstatistical test (layer {best_layer} vs chance):")
print(f"  t-statistic: {t_stat:.3f}")
print(f"  p-value: {p_value:.4e}")
print(f"  significant at α=0.05: {p_value < 0.05}")

# effect size: cohen's d is defined with the sample standard deviation (ddof=1),
# the same estimator the t-test uses; np.std defaults to the population form
# (ddof=0), which overstates d for a small number of folds
cohens_d = (np.mean(best_scores) - CHANCE_LEVEL) / np.std(best_scores, ddof=1)
print(f"  cohen's d: {cohens_d:.3f}")

## 3. Clinical Feature Probing

Probe each layer for clinical voice biomarkers:
- Jitter (pitch perturbation)
- Shimmer (amplitude perturbation)
- HNR (harmonics-to-noise ratio)
- F0 statistics (fundamental frequency)

This reveals WHERE clinical features are encoded in the model.

In [None]:
# load clinical features
clinical_path = Path(CONFIG['data_path']) / 'clinical_features' / 'italian_pvs_features.csv'

if clinical_path.exists():
    clinical_df = pd.read_csv(clinical_path)
    print(f"loaded clinical features: {clinical_df.shape}")
    print(f"features: {list(clinical_df.columns)}")
else:
    # nothing in this notebook extracts the features, so report what actually
    # happens: every clinical-probing cell below is skipped when this is None
    print(f"clinical features not found at {clinical_path}; skipping clinical feature probing")
    clinical_df = None

In [None]:
# candidate clinical features to probe, grouped by the kind of measure
feature_names = [
    # pitch perturbation
    'jitter_local', 'jitter_rap',
    # amplitude perturbation
    'shimmer_local', 'shimmer_apq3',
    # harmonics-to-noise ratio
    'hnr',
    # fundamental frequency statistics
    'f0_mean', 'f0_std',
]

# keep only the candidates that exist as columns in the loaded table
if clinical_df is not None:
    available_features = [name for name in feature_names if name in clinical_df.columns]
    print(f"probing features: {available_features}")

In [None]:
from src.models.probes import MultiFeatureProber

if clinical_df is not None and len(available_features) > 0:
    # build feature matrix: one column per probed feature
    feature_matrix = clinical_df[available_features].values

    # features are matched to activations by row position
    # (assumes the csv rows are in the same sample order — TODO confirm);
    # at the very least the row counts have to agree
    if len(feature_matrix) != len(activations):
        raise ValueError(
            f"clinical features have {len(feature_matrix)} rows but activations have "
            f"{len(activations)} samples"
        )

    # run multi-feature probing
    multi_prober = MultiFeatureProber(
        feature_names=available_features,
        task='regression',
        regularization=1.0
    )

    print("running clinical feature probing...\n")

    clinical_results = multi_prober.probe_all_features(
        activations,
        feature_matrix,
        groups=subject_ids
    )

    # print results
    for feat_name, layer_results in clinical_results.items():
        if layer_results:
            # use feature-specific names here: `best_layer` holds the best
            # pd-classification layer and is read again by later cells, so it
            # must not be overwritten with a per-feature regression peak
            feat_best_layer = max(layer_results, key=lambda layer: layer_results[layer]['mean'])
            feat_best_r2 = layer_results[feat_best_layer]['mean']
            print(f"{feat_name}: best layer = {feat_best_layer}, r² = {feat_best_r2:.3f}")

In [None]:
from src.utils.visualization import plot_clinical_feature_heatmap

# `clinical_results` is only created when the table was loaded AND at least one
# requested feature was found in it, so use that same condition here; checking
# `clinical_df` alone raises NameError when no feature is available
if clinical_df is not None and len(available_features) > 0:
    # create heatmap of the per-layer mean score for every probed feature
    fig = plot_clinical_feature_heatmap(
        clinical_results,
        feature_names=available_features,
        metric='mean',
        title="clinical feature encoding across layers (r²)",
        save_path=str(output_path / 'clinical_feature_heatmap.png'),
        cmap='viridis',
        annot=True
    )

    plt.show()

## 4. Control Task Probing

Validate that probes learn meaningful features, not spurious correlations.
Control tasks should NOT be decodable from the activations. Two controls are used here: the per-sample segment index and randomly assigned binary labels.

In [None]:
from src.models.probes import ControlTaskProber

# create control labels (should not be predictable)
control_labels = {
    'segment_index': np.arange(len(labels)),  # should not be predictable
    'random_label': np.random.randint(0, 2, len(labels))  # definitely not predictable
}

# probe best layer with control tasks
control_prober = ControlTaskProber(regularization=1.0)

# derive the best pd-classification layer directly from probing_results rather
# than trusting the mutable global `best_layer`, which other cells can reassign;
# the target task here is pd/hc, so this is the layer the controls must use
control_layer = max(probing_results, key=lambda layer: probing_results[layer]['mean'])
best_layer_acts = activations[:, control_layer, :]

control_results = control_prober.fit_with_controls(
    best_layer_acts,
    labels,
    control_labels,
    groups=subject_ids
)

print("control task analysis (layer {}):".format(control_layer))
print("-" * 50)
print(f"target (pd/hc): {control_results['target']['mean']:.3f} ± {control_results['target']['std']:.3f}")

for ctrl_name, result in control_results.items():
    if ctrl_name != 'target' and 'mean' in result:
        print(f"control ({ctrl_name}): {result['mean']:.3f} ± {result['std']:.3f}")

# compute selectivity: target score minus the random-label control score
# (falls back to 0.5 when the random-label control reported no mean)
selectivity = control_results['target']['mean'] - control_results.get('random_label', {}).get('mean', 0.5)
print(f"\nselectivity score: {selectivity:.3f}")

## 5. Probing Dynamics Analysis

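Analyze how PD-classification probing accuracy changes from one layer to the next, and locate the layer transition with the steepest improvement.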

In [None]:
# mean probing accuracy per layer, in ascending layer order
layers = sorted(probing_results)
accuracies = [probing_results[layer]['mean'] for layer in layers]

# change in accuracy between consecutive layers and the position of the largest gain
improvements = np.diff(accuracies)
steepest_idx = np.argmax(improvements)

print(f"layer-wise accuracy progression:")
print("-" * 50)
# the first layer has no predecessor, so it is printed without a delta
print(f"layer {layers[0]:2d}: {accuracies[0]:.3f}")
for prev_acc, layer, acc in zip(accuracies, layers[1:], accuracies[1:]):
    print(f"layer {layer:2d}: {acc:.3f} (Δ = {acc - prev_acc:+.3f})")

print(f"\nsteepest improvement: layer {layers[steepest_idx]} → {layers[steepest_idx+1]} ({improvements[steepest_idx]:+.3f})")

In [None]:
# visualize probing dynamics: per-layer accuracy (left) and its layer-to-layer change (right)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
ax1, ax2 = axes

# left: layer-wise accuracy, bars coloured along a red-to-green gradient by position
colors = plt.cm.RdYlGn(np.linspace(0.2, 0.8, len(layers)))
ax1.bar(layers, accuracies, color=colors, edgecolor='black', alpha=0.8)
ax1.axhline(y=0.5, color='gray', linestyle='--', linewidth=2, label='chance')
ax1.set_xlabel('layer', fontweight='bold')
ax1.set_ylabel('probing accuracy', fontweight='bold')
ax1.set_title('layer-wise pd classification', fontweight='bold')
ax1.set_ylim([0.4, max(accuracies) + 0.1])
# the chance line carries a label, so draw the legend that actually displays it
ax1.legend()

# right: layer-to-layer improvement, green for gains and red for drops or no change
bar_colors = ['green' if x > 0 else 'red' for x in improvements]
ax2.bar(layers[1:], improvements, color=bar_colors, edgecolor='black', alpha=0.8)
ax2.axhline(y=0, color='black', linewidth=1)
ax2.set_xlabel('layer', fontweight='bold')
ax2.set_ylabel('accuracy change', fontweight='bold')
ax2.set_title('layer-to-layer improvement', fontweight='bold')

plt.tight_layout()
plt.savefig(output_path / 'probing_dynamics.png', dpi=300, bbox_inches='tight')
plt.show()

## 6. Hypothesis Testing

Test Hypothesis 1: Clinical features are encoded in specific layers.

In [None]:
# hypothesis 1 testing
print("HYPOTHESIS 1 EVALUATION")
print("=" * 60)
print("\nclaim: clinical voice biomarkers are linearly decodable from")
print("specific transformer layers, with prosodic features in middle")
print("layers (5-8) and phonatory features in early layers (2-4).")
print("\n" + "=" * 60)

# group features by type
phonatory_features = ['jitter_local', 'jitter_rap', 'shimmer_local', 'shimmer_apq3']
prosodic_features = ['f0_mean', 'f0_std']

# inclusive layer ranges taken from the claim printed above; the evaluation
# below has to use exactly these bounds to match the text it prints
EARLY_LAYERS = (2, 4)
MIDDLE_LAYERS = (5, 8)

# `clinical_results` exists only when at least one feature was available to probe
if clinical_df is not None and len(available_features) > 0:
    print("\npeak encoding layers by feature type:")
    print("-" * 40)

    phonatory_peaks = []
    prosodic_peaks = []

    for feat_name, layer_results in clinical_results.items():
        if not layer_results:
            continue

        # feature-specific name: `best_layer` is the best pd-classification
        # layer and is still needed when results are saved and summarised
        peak_layer = max(layer_results, key=lambda layer: layer_results[layer]['mean'])

        if feat_name in phonatory_features:
            phonatory_peaks.append(peak_layer)
            print(f"  {feat_name} (phonatory): layer {peak_layer}")
        elif feat_name in prosodic_features:
            prosodic_peaks.append(peak_layer)
            print(f"  {feat_name} (prosodic): layer {peak_layer}")

    print("\nsummary:")
    if phonatory_peaks:
        phonatory_mean = np.mean(phonatory_peaks)
        print(f"  phonatory features peak at: mean layer {phonatory_mean:.1f}")
        # previously `<= 5`, which accepted layers 0, 1 and 5 although the claim is 2-4
        hypothesis_early = EARLY_LAYERS[0] <= phonatory_mean <= EARLY_LAYERS[1]
        print(f"  hypothesis (early layers 2-4): {'SUPPORTED' if hypothesis_early else 'NOT SUPPORTED'}")

    if prosodic_peaks:
        prosodic_mean = np.mean(prosodic_peaks)
        print(f"  prosodic features peak at: mean layer {prosodic_mean:.1f}")
        hypothesis_middle = MIDDLE_LAYERS[0] <= prosodic_mean <= MIDDLE_LAYERS[1]
        print(f"  hypothesis (middle layers 5-8): {'SUPPORTED' if hypothesis_middle else 'NOT SUPPORTED'}")

## 7. Save Results

In [None]:
# compile all results
# the best pd-classification layer is derived from probing_results here instead
# of read from the mutable global `best_layer`, which other cells can reassign
saved_best_layer = max(probing_results, key=lambda layer: probing_results[layer]['mean'])

# every number is converted to a builtin float/int/list first: json cannot
# serialise numpy arrays or non-float64 numpy scalars
full_results = {
    'config': CONFIG,
    'layerwise_probing': {
        str(layer): {
            'mean': float(layer_stats['mean']),
            'std': float(layer_stats['std']),
            'scores': np.asarray(layer_stats['scores'], dtype=float).tolist()
        } for layer, layer_stats in probing_results.items()
    },
    'best_layer': int(saved_best_layer),
    'best_accuracy': float(probing_results[saved_best_layer]['mean']),
    'statistical_test': {
        't_statistic': float(t_stat),
        'p_value': float(p_value),
        'cohens_d': float(cohens_d)
    }
}

# add clinical probing if available (`clinical_results` exists only when at
# least one requested feature was found in the clinical table)
if clinical_df is not None and len(available_features) > 0:
    full_results['clinical_probing'] = {
        feat: {
            str(layer): {
                'mean': float(layer_stats['mean']),
                'std': float(layer_stats['std'])
            } for layer, layer_stats in layer_results.items()
        } for feat, layer_results in clinical_results.items()
    }

# save to json
results_path = output_path / 'probing_results.json'
with open(results_path, 'w') as f:
    json.dump(full_results, f, indent=2)

print(f"results saved to {results_path}")

In [None]:
# summary
# report the best pd-classification layer as derived from probing_results; the
# global `best_layer` is mutable and may no longer refer to the classification probe
summary_layer = max(probing_results, key=lambda layer: probing_results[layer]['mean'])
summary_stats = probing_results[summary_layer]

print("\n" + "=" * 60)
print("PROBING EXPERIMENTS SUMMARY")
print("=" * 60)
print(f"\nsamples analyzed: {len(labels)}")
print(f"unique subjects: {len(np.unique(subject_ids))}")
print(f"\npd classification probing:")
print(f"  best layer: {summary_layer}")
print(f"  accuracy: {summary_stats['mean']:.3f} ± {summary_stats['std']:.3f}")
print(f"  significance: p = {p_value:.2e}")
print(f"  effect size: d = {cohens_d:.2f}")

# `clinical_results` exists only when at least one feature was available to probe
if clinical_df is not None and len(available_features) > 0:
    print(f"\nclinical feature probing:")
    for feat_name, layer_results in clinical_results.items():
        if layer_results:
            best_l = max(layer_results, key=lambda layer: layer_results[layer]['mean'])
            print(f"  {feat_name}: layer {best_l} (r² = {layer_results[best_l]['mean']:.3f})")

print("\n" + "=" * 60)