# Stage 1.5 — Latent Separability Audit

**Projeto:** Controle Explícito de Sotaque Regional em pt-BR  
**Objetivo:** Verificar se representações internas do Qwen3-TTS codificam informação suficiente de sotaque regional para classificação acima de chance, com leakage controlado.  
**Backbone:** Qwen3-TTS 1.7B-CustomVoice (frozen)  
**Dataset:** CORAA-MUPE (speaker-disjoint splits)  

Este notebook é a **camada de orquestração**. Toda lógica está em `src/` (testável, auditável).  
O notebook apenas: instala deps → configura ambiente → chama módulos → exibe resultados.

## 0. Setup do Ambiente

In [1]:
import os, subprocess, sys

# Location of the project checkout; every later relative path in this
# notebook (configs/, data/, reports/) is resolved from here after the chdir.
REPO_DIR = '/content/TCC'

# 1. Clone repo (idempotent — skips if already cloned).
#    The presence of `.git` is the marker of a usable checkout; when it is
#    missing, whatever is left at REPO_DIR is removed before cloning afresh.
if not os.path.exists(os.path.join(REPO_DIR, '.git')):
    !rm -rf {REPO_DIR}
    !git clone https://github.com/paulohenriquevn/tcc.git {REPO_DIR}

os.chdir(REPO_DIR)
!pip install -r requirements.txt -q

# 2. NumPy ABI check — Colab pre-loads numpy 2.x in memory, but
#    requirements.txt pins 1.26.4. After pip downgrades the on-disk
#    files, stale C-extensions cause:
#      "numpy.dtype size changed, may indicate binary incompatibility"
#    Fix: restart runtime ONCE. After restart both match → no loop.
#
# Ask a *fresh* interpreter for the numpy version: it reads the files that
# pip just wrote, independent of whatever this kernel already has imported.
_installed_np = subprocess.check_output(
    [sys.executable, '-c', 'import numpy; print(numpy.__version__)'],
    text=True,
).strip()

# Version of numpy as seen by THIS kernel process. Any import failure is
# recorded as None, which never equals the installed version string and
# therefore also takes the restart branch below.
try:
    import numpy as _np
    _loaded_np = _np.__version__
except Exception:
    _loaded_np = None

if _loaded_np != _installed_np:
    print(f'\nNumPy ABI mismatch: loaded={_loaded_np}, installed={_installed_np}')
    print('Restarting runtime... After restart, re-run this cell (no second restart).')
    # Signal 9 terminates the current kernel process immediately.
    # NOTE(review): relies on the hosting environment bringing up a new
    # runtime afterwards, as the message above states — confirm on Colab.
    os.kill(os.getpid(), 9)
else:
    print(f'\nEnvironment OK (numpy=={_installed_np})')

[31mERROR: Ignored the following versions that require a different python version: 1.10.0 Requires-Python <3.12,>=3.8; 1.10.0rc1 Requires-Python <3.12,>=3.8; 1.10.0rc2 Requires-Python <3.12,>=3.8; 1.10.1 Requires-Python <3.12,>=3.8; 1.21.2 Requires-Python >=3.7,<3.11; 1.21.3 Requires-Python >=3.7,<3.11; 1.21.4 Requires-Python >=3.7,<3.11; 1.21.5 Requires-Python >=3.7,<3.11; 1.21.6 Requires-Python >=3.7,<3.11; 1.6.2 Requires-Python >=3.7,<3.10; 1.6.3 Requires-Python >=3.7,<3.10; 1.7.0 Requires-Python >=3.7,<3.10; 1.7.1 Requires-Python >=3.7,<3.10; 1.7.2 Requires-Python >=3.7,<3.11; 1.7.3 Requires-Python >=3.7,<3.11; 1.8.0 Requires-Python >=3.8,<3.11; 1.8.0rc1 Requires-Python >=3.8,<3.11; 1.8.0rc2 Requires-Python >=3.8,<3.11; 1.8.0rc3 Requires-Python >=3.8,<3.11; 1.8.0rc4 Requires-Python >=3.8,<3.11; 1.8.1 Requires-Python >=3.8,<3.11; 1.9.0 Requires-Python >=3.8,<3.12; 1.9.0rc1 Requires-Python >=3.8,<3.12; 1.9.0rc2 Requires-Python >=3.8,<3.12; 1.9.0rc3 Requires-Python >=3.8,<3.12; 1.9.1

In [None]:
# Seeds and determinism — MANDATORY before any other operation.
from src.utils.seed import set_global_seed

# Single global seed reused by every probe and bootstrap call below.
SEED = 42
# The return value is kept as `generator`.
# NOTE(review): presumably an RNG generator object for downstream use —
# confirm against src/utils/seed.py.
generator = set_global_seed(SEED)
print(f'Seed global configurado: {SEED}')

In [None]:
# Check the GPU and report library versions so the run can be reproduced.
import torch
import sys

print(f'Python: {sys.version}')
print(f'PyTorch: {torch.__version__}')
print(f'CUDA available: {torch.cuda.is_available()}')
if torch.cuda.is_available():
    print(f'CUDA device: {torch.cuda.get_device_name(0)}')
    print(f'CUDA version: {torch.version.cuda}')
    # The device-properties attribute is `total_memory` (in bytes).
    # `total_mem` does not exist and raised AttributeError on GPU runtimes.
    print(f'VRAM total: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB')

# Device string handed to every feature extractor below.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'\nUsando device: {DEVICE}')

In [None]:
# Load the experiment configuration that drives every later cell.
import yaml
from pathlib import Path

CONFIG_PATH = Path('configs/stage1_5.yaml')
# Decode explicitly as UTF-8 instead of the platform's locale encoding, so
# any non-ASCII text in the config is read the same way on every machine.
with open(CONFIG_PATH, encoding='utf-8') as f:
    config = yaml.safe_load(f)

# Echo the key identifiers so the notebook output records which
# experiment, seed and dataset this run used.
print(f"Experiment: {config['experiment']['name']}")
print(f"Seed: {config['seed']['global']}")
print(f"Dataset: {config['dataset']['name']}")

## 1. Download e Build Manifest

Baixa o CORAA-MUPE-ASR do HuggingFace, aplica filtros e constrói o manifest JSONL.  

**Filtros aplicados:**  
- `speaker_type='R'` (apenas entrevistados, não entrevistadores)  
- Duração: 3–15s  
- `birth_state` válido → macro-região IBGE (N, NE, CO, SE, S)  
- Gênero: M ou F  

**Nota:** O download inicial é ~42 GB. Runs subsequentes usam o cache do HuggingFace.

In [None]:
from datasets import load_dataset, concatenate_datasets

# Tell the user up front that the first run is a large download.
print('Downloading CORAA-MUPE-ASR from HuggingFace...')
print('(~42 GB na primeira vez ‚Äî usa cache nas pr√≥ximas execu√ß√µes)\n')

ds = load_dataset("nilc-nlp/CORAA-MUPE-ASR")

# Report the splits shipped with the dataset and their sizes.
available_splits = list(ds.keys())
print(f'Splits dispon√≠veis: {available_splits}')
for name in available_splits:
    print(f'  {name}: {len(ds[name]):,} rows')

# Merge every upstream split into one pool — the notebook later builds its
# own speaker-disjoint splits from it.
split_parts = [ds[name] for name in available_splits]
all_data = concatenate_datasets(split_parts)
print(f'\nTotal concatenado: {len(all_data):,} rows')
print(f'Colunas: {all_data.column_names}')

In [None]:
from src.data.manifest_builder import build_manifest_from_hf_dataset

# Output locations: extracted audio files and the JSONL manifest.
AUDIO_DIR = Path('data/audio/')
MANIFEST_PATH = Path(config['dataset']['manifest_path'])

# All filter settings come from the `dataset.filters` section of the config.
_filter_cfg = config['dataset']['filters']
entries, build_stats = build_manifest_from_hf_dataset(
    dataset=all_data,
    audio_output_dir=AUDIO_DIR,
    manifest_output_path=MANIFEST_PATH,
    speaker_type_filter=_filter_cfg['speaker_type'],
    min_duration_s=_filter_cfg['min_duration_s'],
    max_duration_s=_filter_cfg['max_duration_s'],
    min_speakers_per_region=_filter_cfg['min_speakers_per_region'],
)

# Summary: manifest size and hash, per-filter counters, per-region counts.
print(f"\nManifest: {len(entries):,} entries")
print(f"SHA-256: {build_stats['manifest_sha256']}")

print("\nFilter stats:")
for filter_name, n_items in build_stats['filter_stats'].items():
    print(f"  {filter_name}: {n_items:,}")

print("\nRegi√µes:")
for region_name, region_info in build_stats['regions'].items():
    n_spk = region_info['n_speakers']
    n_utt = region_info['n_utterances']
    print(f"  {region_name}: {n_spk} speakers, {n_utt:,} utterances")

## 2. Speaker-Disjoint Splits

In [None]:
from src.data.splits import (
    generate_speaker_disjoint_splits,
    generate_stratified_splits,
    save_splits,
    save_stratified_splits,
    assign_entries_to_splits,
    assign_entries_to_stratified_splits,
)

# Every split setting lives under the `splits` section of the config.
_split_cfg = config['splits']

# Speaker-disjoint split: ratios and seed are taken from the config.
split_info = generate_speaker_disjoint_splits(
    entries,
    train_ratio=_split_cfg['ratios']['train'],
    val_ratio=_split_cfg['ratios']['val'],
    test_ratio=_split_cfg['ratios']['test'],
    seed=_split_cfg['seed'],
)

# Persist the splits
split_path = save_splits(split_info, Path(_split_cfg['output_dir']))
print(f"Splits salvos em: {split_path}")
# The labels carry their own padding so the three lines stay column-aligned.
for label, key in (("Train:", 'train'), ("Val:  ", 'val'), ("Test: ", 'test')):
    n_speakers = len(getattr(split_info, f'{key}_speakers'))
    n_utts = split_info.utterances_per_split[key]
    print(f"{label} {n_speakers} speakers, {n_utts} utts")

# Assign entries (speaker-disjoint)
split_entries = assign_entries_to_splits(entries, split_info)

# Generate stratified split for leakage A→speaker probes
stratified_split_info = generate_stratified_splits(
    entries,
    train_ratio=_split_cfg['ratios']['train'],
    seed=_split_cfg['seed'],
)
stratified_split_path = save_stratified_splits(
    stratified_split_info,
    Path(_split_cfg['output_dir']),
)
stratified_entries = assign_entries_to_stratified_splits(entries, stratified_split_info)

_strat_counts = stratified_split_info.utterances_per_split
print(f"\nStratified splits salvos em: {stratified_split_path}")
print(f"Stratified Train: {_strat_counts['train']} utts")
print(f"Stratified Test:  {_strat_counts['test']} utts")
print(f"Speakers in common: {stratified_split_info.speakers_in_common}")

## 3. Análise de Confounds

**Sanity checks obrigatórios** (recomendação do mentor):  
- Tabela accent × gender com chi-quadrado + Cramer's V  
- Histograma de duração por região + Kruskal-Wallis

In [None]:
from src.analysis.confounds import run_all_confound_checks
import pandas as pd

# Run every confound check with the thresholds declared in the config.
_confound_cfg = config['evaluation']['confounds']
confound_results = run_all_confound_checks(
    entries,
    gender_blocking_threshold=_confound_cfg['accent_x_gender']['threshold_blocker'],
    duration_practical_diff_s=_confound_cfg['accent_x_duration']['practical_diff_s'],
    snr_practical_diff_db=_confound_cfg['accent_x_snr']['practical_diff_db'],
)

print("=== CONFOUND ANALYSIS ===")
for result in confound_results:
    # Severity marker: blocking takes precedence over merely significant.
    # The glyphs and the multiplication sign were mojibake (UTF-8 bytes
    # decoded with the wrong charset) and printed as garbage.
    if result.is_blocking:
        status = '🔴 BLOCKING'
    elif result.is_significant:
        status = '🟡 SIGNIFICANT'
    else:
        status = '🟢 OK'
    print(f"\n{result.variable_a} × {result.variable_b}: {status}")
    print(f"  Test: {result.test_name}")
    print(f"  Statistic: {result.statistic:.4f}")
    print(f"  p-value: {result.p_value:.6f}")
    print(f"  Effect size ({result.effect_size_name}): {result.effect_size:.4f}")
    print(f"  Interpretation: {result.interpretation}")

# Accent x gender contingency table, with row/column totals.
gender_table = pd.crosstab(
    [e.accent for e in entries],
    [e.gender for e in entries],
    margins=True,
)
print("\n=== ACCENT × GENDER TABLE ===")
print(gender_table)

In [None]:
# Distribution of utterance duration per region (boxplot).
import matplotlib.pyplot as plt
import seaborn as sns

durations_df = pd.DataFrame([
    {'accent': e.accent, 'duration_s': e.duration_s}
    for e in entries
])

fig, ax = plt.subplots(figsize=(10, 5))
# Fixed alphabetical region order keeps the figure comparable across runs.
sns.boxplot(data=durations_df, x='accent', y='duration_s', ax=ax,
            order=sorted(durations_df['accent'].unique()))
ax.set_title('Duration by Accent Region')
ax.set_xlabel('Macro-Region (IBGE)')
ax.set_ylabel('Duration (s)')
plt.tight_layout()

# This is the first cell that writes a figure; the output directory is only
# created further down the notebook, so savefig failed on a clean checkout.
_figures_dir = Path('reports/figures')
_figures_dir.mkdir(parents=True, exist_ok=True)
plt.savefig(_figures_dir / 'duration_by_accent.png', dpi=150)
plt.show()

## 4. Feature Extraction

Extrai features de 4 fontes:
1. **Acústicas** (MFCC, pitch, energy, speech rate) — CPU
2. **ECAPA-TDNN** (speaker embeddings, 192-dim) — CPU/GPU
3. **WavLM** (SSL features, 5 camadas) — GPU recomendado
4. **Qwen3-TTS** (backbone features, 8 camadas) — GPU obrigatório

In [None]:
import numpy as np
from tqdm.auto import tqdm
from src.features.acoustic import extract_acoustic_features, features_to_vector
from src.features.ecapa import extract_ecapa_embedding

# 4.1 Acoustic features (CPU, fast)
print('=== Extracting acoustic features ===')

# One flat feature vector per utterance, keyed by utterance id.
acoustic_vectors = {}
for entry in tqdm(entries, desc='Acoustic'):
    wav_path = Path(entry.audio_path)
    raw_feats = extract_acoustic_features(
        wav_path,
        entry.utt_id,
        n_mfcc=config['features']['acoustic']['n_mfcc'],
    )
    acoustic_vectors[entry.utt_id] = features_to_vector(raw_feats)

print(f'Extracted {len(acoustic_vectors)} acoustic feature vectors')
# Shape of the first stored vector, shown as a quick sanity check.
_first_acoustic = next(iter(acoustic_vectors.values()))
print(f'Dimension: {_first_acoustic.shape}')

In [None]:
# 4.2 ECAPA-TDNN speaker embeddings
print('=== Extracting ECAPA embeddings ===')

# One embedding per utterance, keyed by utterance id.
ecapa_embeddings = {}
for entry in tqdm(entries, desc='ECAPA'):
    wav_path = Path(entry.audio_path)
    ecapa_embeddings[entry.utt_id] = extract_ecapa_embedding(wav_path, device=DEVICE)

print(f'Extracted {len(ecapa_embeddings)} ECAPA embeddings')
# Shape of the first stored embedding, shown as a quick sanity check.
_first_ecapa = next(iter(ecapa_embeddings.values()))
print(f'Dimension: {_first_ecapa.shape}')

In [None]:
# 4.3 WavLM SSL features (layer-wise)
from src.features.ssl import extract_ssl_features

SSL_LAYERS = config['features']['ssl']['layers']
print(f'=== Extracting WavLM features (layers {SSL_LAYERS}) ===')

# Nested mapping: {layer index: {utt_id: feature vector}}.
ssl_features = {layer: {} for layer in SSL_LAYERS}
for entry in tqdm(entries, desc='WavLM'):
    # One call per utterance returns the requested layers together.
    layer_feats = extract_ssl_features(
        Path(entry.audio_path),
        layers=SSL_LAYERS,
        device=DEVICE,
    )
    for layer_idx, feat_vec in layer_feats.items():
        ssl_features[layer_idx][entry.utt_id] = feat_vec

print('WavLM extraction complete')
for layer in SSL_LAYERS:
    # Skip layers with no vectors, as the backbone cell does: calling
    # next() on an empty dict view raised StopIteration here.
    if ssl_features[layer]:
        dim = next(iter(ssl_features[layer].values())).shape
        print(f'  Layer {layer}: {len(ssl_features[layer])} vectors, dim={dim}')

In [None]:
# 4.4 Qwen3-TTS backbone features (layer-wise) — GPU required
from src.features.backbone import extract_backbone_features

BACKBONE_LAYERS = config['features']['backbone']['layers']
print(f'=== Extracting backbone features (layers {BACKBONE_LAYERS}) ===')

# Nested mapping: {layer index: {utt_id: feature vector}}.
backbone_features = {layer: {} for layer in BACKBONE_LAYERS}
# Note: backbone needs text input. Use a fixed neutral text for all utterances
# since we're probing the audio representation, not text-conditioned generation.
# The literal was mojibake (UTF-8 bytes decoded with the wrong charset), so
# the model received garbage characters instead of Portuguese text.
NEUTRAL_TEXT = 'Este é um texto neutro para extração de features.'

for entry in tqdm(entries, desc='Backbone'):
    layer_feats = extract_backbone_features(
        Path(entry.audio_path),
        text=NEUTRAL_TEXT,
        layers=BACKBONE_LAYERS,
        device=DEVICE,
    )
    for layer_idx, feat_vec in layer_feats.items():
        backbone_features[layer_idx][entry.utt_id] = feat_vec

print('Backbone extraction complete')
for layer in BACKBONE_LAYERS:
    # Layers for which nothing was extracted are skipped silently.
    if backbone_features[layer]:
        dim = next(iter(backbone_features[layer].values())).shape
        print(f'  Layer {layer}: {len(backbone_features[layer])} vectors, dim={dim}')

# Free GPU memory
torch.cuda.empty_cache()

## 5. Baseline ECAPA Speaker Similarity

Mede similaridade intra-speaker (mesmo speaker, utterances diferentes) e inter-speaker no áudio real.  
Este baseline é referência obrigatória para Stage 2 (preservação de identidade com LoRA).

In [None]:
from src.features.ecapa import compute_speaker_similarity_baseline
from src.evaluation.bootstrap_ci import bootstrap_cosine_similarity

# Bucket the ECAPA embeddings by speaker id.
speaker_embs = {}
for entry in entries:
    bucket = speaker_embs.setdefault(entry.speaker_id, [])
    bucket.append(ecapa_embeddings[entry.utt_id])

sim_baseline = compute_speaker_similarity_baseline(speaker_embs)

# Bootstrap confidence intervals for the intra- and inter-speaker values.
_intra = sim_baseline['intra']
intra_ci = bootstrap_cosine_similarity(np.array(_intra['values']), seed=SEED)
_inter = sim_baseline['inter']
inter_ci = bootstrap_cosine_similarity(np.array(_inter['values']), seed=SEED)

print('=== SPEAKER SIMILARITY BASELINE (ECAPA-TDNN, 192-dim) ===')
print(f"Intra-speaker: {_intra['mean']:.4f} ¬± {_intra['std']:.4f}")
print(f"  CI 95%: [{intra_ci.ci_lower:.4f}, {intra_ci.ci_upper:.4f}]")
print(f"  N pairs: {_intra['n_pairs']}")
print(f"\nInter-speaker: {_inter['mean']:.4f} ¬± {_inter['std']:.4f}")
print(f"  CI 95%: [{inter_ci.ci_lower:.4f}, {inter_ci.ci_upper:.4f}]")
print(f"  N pairs: {_inter['n_pairs']}")
# Gap between the mean same-speaker and mean different-speaker similarity.
_separation = _intra['mean'] - _inter['mean']
print(f"\nSeparation: {_separation:.4f}")

## 6. Linear Probes

Probe architecture: **Logistic Regression** (linear only — protocol requirement).  

Split assignments (corrected — Achado 1 da auditoria):  
- Accent probe: **speaker-disjoint** split  
- Speaker probe: **stratified** split  
- Leakage A→speaker: **stratified** split (same speakers in train/test)  
- Leakage S→accent: **speaker-disjoint** split (different speakers in test)

In [None]:
from src.evaluation.probes import (
    train_linear_probe,
    evaluate_probe_against_thresholds,
    sweep_regularization,
    train_selectivity_control,
)
from src.evaluation.confusion import plot_confusion_matrix

# Helper: turn a {utt_id: vector} mapping plus manifest entries into arrays.
def build_probe_data(feature_dict, entry_list, target_field):
    """Assemble the feature matrix and label vector for a probe.

    Entries whose ``utt_id`` has no vector in ``feature_dict`` are dropped.
    The kept rows follow the order of ``entry_list``; the label of each row
    is the entry attribute named by ``target_field``.

    Returns:
        A ``(X, y)`` pair of NumPy arrays built from the kept entries.
    """
    covered = [entry for entry in entry_list if entry.utt_id in feature_dict]
    features = np.array([feature_dict[entry.utt_id] for entry in covered])
    labels = np.array([getattr(entry, target_field) for entry in covered])
    return features, labels

# Accumulators shared by every probe cell below.
all_probe_results = []
all_selectivity_results = []

In [None]:
# 6.1 Accent Probe (per layer, speaker-disjoint split)
print('=== ACCENT PROBES ===')

# Build train/test for speaker-disjoint
train_entries = split_entries['train']
test_entries = split_entries['test']
# Validation speakers are used ONLY to choose the regularization strength.
# Picking C on the test split and then reporting accuracy on that same
# split biases the reported number upwards.
try:
    val_entries = split_entries['val']
except KeyError:
    val_entries = []

# Every feature source that gets an accent probe, in reporting order:
# acoustic, ECAPA, WavLM layers, then non-empty backbone layers.
feature_sources = {
    'acoustic': acoustic_vectors,
    'ecapa': ecapa_embeddings,
}
for layer in SSL_LAYERS:
    feature_sources[f'wavlm_layer_{layer}'] = ssl_features[layer]
for layer in BACKBONE_LAYERS:
    if backbone_features[layer]:
        feature_sources[f'backbone_layer_{layer}'] = backbone_features[layer]

C_values = config['probes']['regularization_C']

for source_name, feat_dict in feature_sources.items():
    X_train, y_train = build_probe_data(feat_dict, train_entries, 'accent')
    X_test, y_test = build_probe_data(feat_dict, test_entries, 'accent')

    if len(X_train) == 0 or len(X_test) == 0:
        print(f'  {source_name}: SKIPPED (no data)')
        continue

    # Model-selection data: validation split when it has rows for this
    # source; otherwise fall back to the previous behaviour (test split)
    # and say so, because the resulting estimate is optimistic.
    X_select, y_select = build_probe_data(feat_dict, val_entries, 'accent')
    if len(X_select) == 0:
        print(f'  {source_name}: WARNING no validation data, '
              f'selecting C on the test split (optimistic estimate)')
        X_select, y_select = X_test, y_test

    # Sweep regularization to find best C
    sweep_results = sweep_regularization(
        X_train, y_train, X_select, y_select,
        C_values=C_values,
        probe_name=f'accent_{source_name}',
        feature_source=source_name,
        target='accent',
        split_type='speaker_disjoint',
        seed=SEED,
    )
    best_sweep = max(sweep_results, key=lambda r: r.balanced_accuracy)
    best_C = best_sweep.regularization_C

    # Re-train with best C and full CI; this is the only place where the
    # held-out test speakers are scored.
    result = train_linear_probe(
        X_train, y_train, X_test, y_test,
        probe_name=f'accent_{source_name}',
        feature_source=source_name,
        target='accent',
        split_type='speaker_disjoint',
        C=best_C,
        seed=SEED,
    )
    all_probe_results.append(result)

    decision = evaluate_probe_against_thresholds(
        result, config['thresholds']['accent_probe']
    )
    print(f'  {source_name}: bal_acc={result.balanced_accuracy:.4f} '
          f'CI=[{result.ci.ci_lower:.4f}, {result.ci.ci_upper:.4f}] '
          f'delta={result.delta_pp:+.1f}pp C={best_C} ‚Üí {decision}')

# Selectivity control for accent probes: the control is trained on the same
# data and with the same C as the real probe it is compared against.
print('\n=== SELECTIVITY CONTROL (accent probes) ===')
accent_results = [r for r in all_probe_results if r.target == 'accent' and 'leakage' not in r.probe_name]
for result in accent_results:
    feat_dict = feature_sources[result.feature_source]
    X_train, y_train = build_probe_data(feat_dict, train_entries, 'accent')
    X_test, y_test = build_probe_data(feat_dict, test_entries, 'accent')

    sel = train_selectivity_control(
        X_train, y_train, X_test, y_test,
        real_result=result,
        seed=SEED,
        C=result.regularization_C,
    )
    # Tag the control with the probe it belongs to, for the final report.
    sel['probe_name'] = result.probe_name
    sel['feature_source'] = result.feature_source
    all_selectivity_results.append(sel)

    print(f'  {result.feature_source}: real={sel["real_bal_acc"]:.4f} '
          f'permuted={sel["permuted_bal_acc_mean"]:.4f}¬±{sel["permuted_bal_acc_std"]:.4f} '
          f'selectivity={sel["selectivity_pp"]:+.1f}pp')

In [None]:
# 6.2 Leakage Probes
print('\n=== LEAKAGE PROBES ===')

# --- Leakage A→speaker: Do accent features contain speaker identity? ---
# Uses STRATIFIED split (same speakers in train/test — we need known speakers)
print('Leakage A‚Üíspeaker (accent feature sources, stratified split):')

strat_train_entries = stratified_entries['train']
strat_test_entries = stratified_entries['test']

# Accent feature sources: WavLM layers, backbone layers, acoustic
# (NOT ECAPA — those are speaker embeddings, not accent features)
leakage_a2s_sources = {
    f'wavlm_layer_{layer}': ssl_features[layer] for layer in SSL_LAYERS
}
leakage_a2s_sources.update({
    f'backbone_layer_{layer}': backbone_features[layer]
    for layer in BACKBONE_LAYERS
    if backbone_features[layer]
})
leakage_a2s_sources['acoustic'] = acoustic_vectors

leakage_a2s_results = []
for source_name, feat_dict in leakage_a2s_sources.items():
    X_train, y_train = build_probe_data(feat_dict, strat_train_entries, 'speaker_id')
    X_test, y_test = build_probe_data(feat_dict, strat_test_entries, 'speaker_id')

    # Sources with no rows on either side cannot be probed.
    has_data = len(X_train) != 0 and len(X_test) != 0
    if not has_data:
        print(f'  {source_name}: SKIPPED (no data)')
        continue

    # Speaker-identity probe at the config's default regularization.
    result = train_linear_probe(
        X_train, y_train, X_test, y_test,
        probe_name=f'leakage_a2s_{source_name}',
        feature_source=source_name,
        target='speaker_id',
        split_type='stratified',
        C=config['probes']['default_C'],
        seed=SEED,
    )
    # Recorded twice: locally for the selectivity pass, globally for the report.
    leakage_a2s_results.append(result)
    all_probe_results.append(result)

    leak_decision = evaluate_probe_against_thresholds(
        result, config['thresholds']['leakage']
    )
    summary = (
        f'  {source_name}: bal_acc={result.balanced_accuracy:.4f} '
        f'chance={result.chance_level:.4f} '
        f'delta={result.delta_pp:+.1f}pp ‚Üí {leak_decision}'
    )
    print(summary)

# Selectivity control for A→speaker leakage
print('\n=== SELECTIVITY CONTROL (leakage A‚Üíspeaker) ===')
for result in leakage_a2s_results:
    # Rebuild the same stratified data the real probe was trained on.
    source_feats = leakage_a2s_sources[result.feature_source]
    X_train, y_train = build_probe_data(source_feats, strat_train_entries, 'speaker_id')
    X_test, y_test = build_probe_data(source_feats, strat_test_entries, 'speaker_id')

    # The control reuses the seed and C of the real probe.
    sel = train_selectivity_control(
        X_train,
        y_train,
        X_test,
        y_test,
        real_result=result,
        seed=SEED,
        C=result.regularization_C,
    )
    # Tag the control with the probe it belongs to, for the final report.
    sel['probe_name'] = result.probe_name
    sel['feature_source'] = result.feature_source
    all_selectivity_results.append(sel)

    summary = (
        f'  {result.feature_source}: real={sel["real_bal_acc"]:.4f} '
        f'permuted={sel["permuted_bal_acc_mean"]:.4f}¬±{sel["permuted_bal_acc_std"]:.4f} '
        f'selectivity={sel["selectivity_pp"]:+.1f}pp'
    )
    print(summary)

# --- Leakage S→accent: Do speaker features contain accent info? ---
# Uses SPEAKER-DISJOINT split (different speakers in test — tests generalization)
print('\nLeakage S‚Üíaccent (ECAPA embeddings, speaker-disjoint split):')
X_train, y_train = build_probe_data(ecapa_embeddings, train_entries, 'accent')
X_test, y_test = build_probe_data(ecapa_embeddings, test_entries, 'accent')

# Accent probe on the speaker embeddings at the config's default C.
leakage_s2a = train_linear_probe(
    X_train,
    y_train,
    X_test,
    y_test,
    probe_name='leakage_s2a_ecapa',
    feature_source='ecapa',
    target='accent',
    split_type='speaker_disjoint',
    C=config['probes']['default_C'],
    seed=SEED,
)
all_probe_results.append(leakage_s2a)

leak_decision = evaluate_probe_against_thresholds(
    leakage_s2a, config['thresholds']['leakage']
)
probe_line = (
    f'  bal_acc={leakage_s2a.balanced_accuracy:.4f} '
    f'chance={leakage_s2a.chance_level:.4f} '
    f'delta={leakage_s2a.delta_pp:+.1f}pp ‚Üí {leak_decision}'
)
print(probe_line)

# Selectivity control for S→accent leakage, on the same data and C.
sel_s2a = train_selectivity_control(
    X_train,
    y_train,
    X_test,
    y_test,
    real_result=leakage_s2a,
    seed=SEED,
    C=leakage_s2a.regularization_C,
)
# Tag the control with the probe it belongs to, for the final report.
sel_s2a['probe_name'] = leakage_s2a.probe_name
sel_s2a['feature_source'] = leakage_s2a.feature_source
all_selectivity_results.append(sel_s2a)
selectivity_line = (
    f'  selectivity: real={sel_s2a["real_bal_acc"]:.4f} '
    f'permuted={sel_s2a["permuted_bal_acc_mean"]:.4f}¬±{sel_s2a["permuted_bal_acc_std"]:.4f} '
    f'selectivity={sel_s2a["selectivity_pp"]:+.1f}pp'
)
print(selectivity_line)

In [None]:
# 6.3 Confusion Matrices (best accent probe)
# Genuine accent probes only: leakage probes that also target accent are
# excluded by name.
accent_results = [
    probe
    for probe in all_probe_results
    if probe.target == 'accent' and 'leakage' not in probe.probe_name
]
if accent_results:
    # Rank by balanced accuracy; `best` is reused by the robustness cell.
    best = max(accent_results, key=lambda probe: probe.balanced_accuracy)
    print(f'Best accent probe: {best.feature_source} (bal_acc={best.balanced_accuracy:.4f})')

    if best.confusion_matrix is not None:
        figure_path = Path('reports/figures/confusion_matrix_accent.png')
        # Make sure the figures directory exists before plotting into it.
        figure_path.parent.mkdir(parents=True, exist_ok=True)
        plot_confusion_matrix(
            best.confusion_matrix,
            best.confusion_labels,
            title=f'Accent Confusion Matrix ({best.feature_source})',
            output_path=figure_path,
        )
        print('Confusion matrix saved to reports/figures/confusion_matrix_accent.png')

## 7. Robustness (Multiple Seeds)

Repete o melhor probe com 3 seeds para reportar média e desvio.

In [None]:
# Seeds for the repeat runs come from the config.
ROBUSTNESS_SEEDS = config['seed']['robustness_seeds']
print(f'=== ROBUSTNESS CHECK (seeds: {ROBUSTNESS_SEEDS}) ===')

# `accent_results` and `best` are defined by the confusion-matrix cell.
if accent_results:
    best_source = best.feature_source
    best_features = feature_sources[best_source]

    # The data does not depend on the seed, so it is built once instead of
    # being rebuilt identically on every iteration.
    X_tr, y_tr = build_probe_data(best_features, train_entries, 'accent')
    X_te, y_te = build_probe_data(best_features, test_entries, 'accent')

    seed_results = []
    for s in ROBUSTNESS_SEEDS:
        set_global_seed(s)

        # Re-run the SAME probe: the regularization strength selected for
        # the best probe is passed explicitly. Without it the function's
        # default C was used, so a different model was being evaluated.
        r = train_linear_probe(
            X_tr, y_tr, X_te, y_te,
            probe_name=f'accent_{best_source}_seed{s}',
            feature_source=best_source,
            target='accent',
            split_type='speaker_disjoint',
            C=best.regularization_C,
            seed=s,
            compute_ci=True,
        )
        seed_results.append(r)
        print(f'  Seed {s}: bal_acc={r.balanced_accuracy:.4f} CI=[{r.ci.ci_lower:.4f}, {r.ci.ci_upper:.4f}]')

    # Mean and (population) standard deviation across the seeds.
    accs = [r.balanced_accuracy for r in seed_results]
    print(f'\n  Mean: {np.mean(accs):.4f} ¬± {np.std(accs):.4f}')

    # Restore original seed
    set_global_seed(SEED)

## 8. Gate Decision

Avaliação automática contra os thresholds do protocolo.

In [None]:
_rule = '=' * 60
print(_rule)
print('STAGE 1.5 ‚Äî GATE DECISION')
print(_rule)

# Every individual verdict is collected here and combined at the end.
all_decisions = []

# Confounds come first: a single blocking confound fails the gate.
blocking_confounds = [c for c in confound_results if c.is_blocking]
if not blocking_confounds:
    print('\nConfound check: PASS (no blocking confounds)')
else:
    print('\nCONFOUND BLOCKING:')
    for confound in blocking_confounds:
        pair = f'{confound.variable_a} x {confound.variable_b}'
        print(f'  {pair}: {confound.effect_size_name}={confound.effect_size:.4f}')
        print(f'  Interpretation: {confound.interpretation}')
    all_decisions.append('FAIL')
    print('  Decision: FAIL (confound blocking)')

# Best accent probe (leakage probes that target accent are excluded by name).
accent_results = [
    probe
    for probe in all_probe_results
    if probe.target == 'accent' and 'leakage' not in probe.probe_name
]
if accent_results:
    best = max(accent_results, key=lambda probe: probe.balanced_accuracy)
    accent_decision = evaluate_probe_against_thresholds(
        best, config['thresholds']['accent_probe']
    )
    all_decisions.append(accent_decision)
    print(f'\nAccent probe ({best.feature_source}):')
    print(f'  bal_acc = {best.balanced_accuracy:.4f}')
    print(f'  CI 95% = [{best.ci.ci_lower:.4f}, {best.ci.ci_upper:.4f}]')
    print(f'  Chance = {best.chance_level:.4f}')
    print(f'  C = {best.regularization_C}')
    print(f'  Decision: {accent_decision}')

# Leakage: every leakage probe is judged against the leakage thresholds.
leakage_results = [probe for probe in all_probe_results if 'leakage' in probe.probe_name]
for leak_probe in leakage_results:
    leak_verdict = evaluate_probe_against_thresholds(
        leak_probe, config['thresholds']['leakage']
    )
    all_decisions.append(leak_verdict)
    print(f'\nLeakage {leak_probe.probe_name}:')
    print(f'  bal_acc = {leak_probe.balanced_accuracy:.4f}')
    print(f'  Chance = {leak_probe.chance_level:.4f}')
    print(f'  Delta = {leak_probe.delta_pp:+.1f}pp')
    print(f'  Decision: {leak_verdict}')

# Overall: the worst individual verdict wins (FAIL, then GO_CONDITIONAL);
# with no verdicts at all the gate is reported as not evaluated.
print('\n' + _rule)
if not all_decisions:
    overall = 'NOT_EVALUATED'
else:
    overall = next(
        (verdict for verdict in ('FAIL', 'GO_CONDITIONAL') if verdict in all_decisions),
        'GO',
    )
print(f'OVERALL GATE DECISION: {overall}')
print(_rule)

## 9. Save Report

Gera `stage1_5_report.json` com todos os resultados para auditoria.

In [None]:
import json
import subprocess
from datetime import datetime

# Get git commit hash. Only the failures git can actually produce are
# handled: a non-zero exit (not a repository) or a missing git executable.
# Anything else is a real bug and is allowed to surface.
try:
    commit_hash = subprocess.check_output(
        ['git', 'rev-parse', 'HEAD'], text=True
    ).strip()
except (subprocess.CalledProcessError, OSError):
    commit_hash = 'unknown'

_cuda_ok = torch.cuda.is_available()

report = {
    'experiment': config['experiment']['name'],
    'date': datetime.now().isoformat(),
    'commit_hash': commit_hash,
    'seed': SEED,
    'environment': {
        'cuda_version': torch.version.cuda if _cuda_ok else None,
        'cudnn_version': torch.backends.cudnn.version() if _cuda_ok else None,
        'torch_version': torch.__version__,
        # A commit declared in the config still wins; when the config has
        # none, this field was always null, so it now falls back to the
        # hash read from git above.
        # NOTE(review): confirm the config value is meant as an override.
        'commit_hash': config.get('experiment', {}).get('commit_hash') or commit_hash,
    },
    'dataset': {
        'name': config['dataset']['name'],
        'manifest_sha256': build_stats.get('manifest_sha256'),
        'total_entries': len(entries),
        'regions': build_stats.get('regions'),
    },
    'splits': split_info.to_dict(),
    'stratified_splits': stratified_split_info.to_dict(),
    # One record per confound test.
    'confounds': [
        {
            'test': r.test_name,
            'variables': f'{r.variable_a} x {r.variable_b}',
            'statistic': r.statistic,
            'p_value': r.p_value,
            'effect_size': r.effect_size,
            'is_blocking': r.is_blocking,
            'interpretation': r.interpretation,
        }
        for r in confound_results
    ],
    'speaker_similarity_baseline': {
        'intra': {
            'mean': sim_baseline['intra']['mean'],
            'std': sim_baseline['intra']['std'],
            'ci_lower': intra_ci.ci_lower,
            'ci_upper': intra_ci.ci_upper,
            'n_pairs': sim_baseline['intra']['n_pairs'],
        },
        'inter': {
            'mean': sim_baseline['inter']['mean'],
            'std': sim_baseline['inter']['std'],
            'ci_lower': inter_ci.ci_lower,
            'ci_upper': inter_ci.ci_upper,
            'n_pairs': sim_baseline['inter']['n_pairs'],
        },
    },
    # One record per trained probe (accent and leakage alike); probes
    # without a confidence interval store null bounds.
    'probes': [
        {
            'name': r.probe_name,
            'feature_source': r.feature_source,
            'target': r.target,
            'split_type': r.split_type,
            'balanced_accuracy': r.balanced_accuracy,
            'f1_macro': r.f1_macro,
            'chance_level': r.chance_level,
            'delta_pp': r.delta_pp,
            'ci_lower': r.ci.ci_lower if r.ci else None,
            'ci_upper': r.ci.ci_upper if r.ci else None,
            'n_train': r.n_train,
            'n_test': r.n_test,
            'n_classes': r.n_classes,
            'C': r.regularization_C,
        }
        for r in all_probe_results
    ],
    'selectivity_controls': all_selectivity_results,
    # `overall` exists only if the gate-decision cell was run.
    'gate_decision': overall if 'overall' in dir() else 'NOT_EVALUATED',
}

Path('reports').mkdir(exist_ok=True)
report_path = Path('reports/stage1_5_report.json')
# Explicit encoding keeps the written file independent of the locale.
# `default=str` stringifies any value json cannot serialize natively.
with open(report_path, 'w', encoding='utf-8') as f:
    json.dump(report, f, indent=2, default=str)

print(f'Report saved to {report_path}')
print(f'Total probe results: {len(all_probe_results)}')
print(f'Total selectivity controls: {len(all_selectivity_results)}')
print(f'Gate decision: {report["gate_decision"]}')