# Minimum Permutations Analysis for ML Models

This notebook determines the minimum number of permutations needed for ML models (from **Notebook 4**) to accurately learn empirical edge frequency distributions.

## Methodology

1. **Progressive Training**: Train models using 1, 2, 3, 5, 7, 10, ... up to max_permutations
2. **Models Tested**: All models from Notebook 4:
   - Simple Neural Network
   - Random Forest
   - Logistic Regression
   - Polynomial Logistic Regression
3. **Evaluation**: Compare predictions against 200-permutation empirical frequencies
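4. **Convergence Detection**: Flag a model as converged when the change in the target metric between consecutive N values falls below `convergence_threshold` (absolute change for correlation, relative change for MAE/RMSE), i.e. when additional permutations provide diminishing returns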

## Usage with Papermill

```bash
papermill 6_minimum_permutations_analysis.ipynb output.ipynb \
  -p edge_type "CtD" \
  -p max_permutations 50 \
  -p convergence_threshold 0.02 \
  -p target_metric "correlation" \
  -p min_metric_value 0.90
```

## Output

- Minimum N for each model to achieve target performance
- Convergence curves showing learning progression
- Model comparison: which learns fastest from limited data?
- Saved results for cross-edge-type analysis in Notebook 7

In [None]:
# Papermill parameters: the defaults below are overridden by `-p <name> <value>` on the papermill command line
edge_type = "CtD"  # Edge type to analyze; selects '<edge_type>.sparse.npz' and the matching empirical CSV
max_permutations = 50  # Maximum number of permutations to test (capped at the number found on disk)
convergence_threshold = 0.02  # Convergence cutoff: absolute change for correlation, relative change for mae/rmse
target_metric = "correlation"  # Metric to optimize: 'correlation', 'mae', or 'rmse'
min_metric_value = 0.90  # Target: minimum acceptable correlation, or maximum acceptable MAE/RMSE
random_seed = 42  # Seeds NumPy and is passed as random_state to ModelCollection / ModelTrainer

In [None]:
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.sparse as sp
import json
import warnings
from typing import Dict, List, Tuple
warnings.filterwarnings('ignore')

# Setup paths
# The repository root is taken to be the parent of the current working
# directory, i.e. the notebook is expected to be launched from a direct
# subdirectory of the repository.
repo_dir = Path.cwd().parent
src_dir = repo_dir / 'src'
data_dir = repo_dir / 'data'
# Make the modules under src/ importable for the project imports that follow.
sys.path.append(str(src_dir))

# Import from notebook 4 pipeline
from model_comparison import ModelCollection, prepare_edge_features_and_labels, filter_zero_degree_nodes
from model_training import ModelTrainer, predict_with_model
from model_evaluation import ModelEvaluator

# Fail fast on an unsupported metric: later cells build result-column names
# from target_metric, so a typo would otherwise only surface as a KeyError in
# the convergence check, after training rounds have already been run.
if target_metric not in ('correlation', 'mae', 'rmse'):
    raise ValueError(
        f"Unsupported target_metric {target_metric!r}; expected 'correlation', 'mae', or 'rmse'"
    )

print(f"Repository directory: {repo_dir}")
print(f"Edge type: {edge_type}")
print(f"Max permutations: {max_permutations}")
# MAE/RMSE are error metrics, so min_metric_value is an upper bound for them;
# for correlation it is a lower bound.
print(f"Target: {target_metric} {'<=' if target_metric in ['mae', 'rmse'] else '>='} {min_metric_value}")

# Set random seed (NumPy's legacy global generator)
np.random.seed(random_seed)

# Setup output directory; one folder per edge type, created if absent
output_dir = repo_dir / 'results' / 'minimum_permutations_ml' / f'{edge_type}_results'
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Output directory: {output_dir}")

## 1. Load Empirical Frequencies (200-perm Gold Standard)

In [None]:
# Locate the per-degree empirical edge frequencies written by notebook 3.
empirical_file = (
    repo_dir
    / 'results'
    / 'empirical_edge_frequencies'
    / f'edge_frequency_by_degree_{edge_type}.csv'
)

# Read the table when it is present; otherwise point the user at the
# notebook that produces it.
if empirical_file.exists():
    empirical_df = pd.read_csv(empirical_file)
else:
    raise FileNotFoundError(f"Empirical frequency file not found: {empirical_file}\nRun notebook 3 first.")

# Quick sanity summary of what was loaded.
print(f"Loaded {len(empirical_df)} empirical frequency records")
print(
    "Degree ranges: "
    f"source {empirical_df['source_degree'].min()}-{empirical_df['source_degree'].max()}, "
    f"target {empirical_df['target_degree'].min()}-{empirical_df['target_degree'].max()}"
)
print(
    f"Frequency range: {empirical_df['frequency'].min():.6f} - "
    f"{empirical_df['frequency'].max():.6f}"
)

## 2. Discover Available Permutations

In [None]:
# Find all permutation directories
permutations_dir = data_dir / 'permutations'
if not permutations_dir.is_dir():
    raise FileNotFoundError(f"Permutations directory not found: {permutations_dir}")

# Note: the sort is lexicographic on the path, so e.g. '10.hetmat' orders
# before '2.hetmat'. The order only needs to be stable across runs.
discovered_dirs = sorted(
    p for p in permutations_dir.iterdir()
    if p.is_dir() and p.name.endswith('.hetmat')
)
print(f"Found {len(discovered_dirs)} permutation directories")

# Keep only permutations that actually contain this edge type. Otherwise the
# loader skips them silently and "N permutations" would mean fewer than N
# loaded matrices in every downstream result.
permutation_dirs = [
    p for p in discovered_dirs
    if (p / 'edges' / f'{edge_type}.sparse.npz').exists()
]
n_without_edge_file = len(discovered_dirs) - len(permutation_dirs)
if n_without_edge_file:
    print(f"WARNING: {n_without_edge_file} permutation(s) have no {edge_type}.sparse.npz and were excluded")
if not permutation_dirs:
    raise FileNotFoundError(
        f"No permutation under {permutations_dir} contains edges/{edge_type}.sparse.npz"
    )

# Cap the requested maximum at what is usable on disk.
if len(permutation_dirs) < max_permutations:
    print(f"WARNING: Only {len(permutation_dirs)} available, but max_permutations={max_permutations}")
    print(f"Will test up to {len(permutation_dirs)} permutations")
    max_permutations = min(max_permutations, len(permutation_dirs))

# Verify edge files exist (spot-check of the first few discovered directories)
for perm_dir in discovered_dirs[:3]:
    edge_file = perm_dir / 'edges' / f'{edge_type}.sparse.npz'
    print(f"  {perm_dir.name}: {'✓' if edge_file.exists() else '✗'}")

## 3. Progressive Training Function

In [None]:
def load_and_combine_permutations(edge_type: str, num_permutations: int, permutation_dirs: List[Path]) -> Tuple[np.ndarray, np.ndarray]:
    """
    Load and combine data from the first N permutations.

    For each of the first ``num_permutations`` entries of ``permutation_dirs``
    the file ``edges/<edge_type>.sparse.npz`` is passed to
    ``prepare_edge_features_and_labels``; the per-permutation outputs are then
    stacked row-wise. Permutations whose edge file is missing are skipped
    with a warning.

    Args:
        edge_type: Edge type abbreviation used to build the edge file name.
        num_permutations: How many leading entries of ``permutation_dirs`` to load.
        permutation_dirs: Permutation directories, in the order they should be used.

    Returns:
        features: Combined features from all loaded permutations
        labels: Combined labels from all loaded permutations

    Raises:
        IndexError: If ``num_permutations`` exceeds ``len(permutation_dirs)``.
        ValueError: If no permutation could be loaded (nothing requested, or
            every edge file was missing).
    """
    # Fail before any (slow) loading rather than after the last valid index.
    if num_permutations > len(permutation_dirs):
        raise IndexError(
            f"Requested {num_permutations} permutations but only "
            f"{len(permutation_dirs)} directories were provided"
        )

    all_features = []
    all_labels = []
    
    print(f"Loading {num_permutations} permutations...")
    
    for i in range(num_permutations):
        perm_dir = permutation_dirs[i]
        edge_file = perm_dir / 'edges' / f'{edge_type}.sparse.npz'
        
        if not edge_file.exists():
            print(f"  WARNING: {edge_file} not found, skipping")
            continue
        
        # Load edge matrix and prepare features
        features, labels = prepare_edge_features_and_labels(
            str(edge_file),
            sample_ratio=0.01,  # Use same sampling as notebook 4
            adaptive_sampling=True,
            enhanced_features=False  # Use basic 2D features
        )
        
        all_features.append(features)
        all_labels.append(labels)
        print(f"  Loaded {perm_dir.name}: {len(features)} samples")
    
    # np.vstack / np.concatenate reject an empty list with a message that says
    # nothing about the cause, so report the actual problem instead.
    if not all_features:
        raise ValueError(
            f"No edge data loaded for edge type {edge_type!r}: none of the first "
            f"{num_permutations} permutation(s) contains edges/{edge_type}.sparse.npz"
        )
    
    # Combine all permutations
    combined_features = np.vstack(all_features)
    combined_labels = np.concatenate(all_labels)
    
    print(f"Combined: {len(combined_features)} total samples")
    print(f"  Positive: {np.sum(combined_labels)} ({np.mean(combined_labels):.1%})")
    
    return combined_features, combined_labels


def train_models_with_n_permutations(edge_type: str, num_permutations: int, permutation_dirs: List[Path]) -> Dict:
    """
    Fit the notebook-4 model set on data pooled from N permutations.

    The pooled features/labels come from ``load_and_combine_permutations``;
    models are built by ``ModelCollection.create_models`` and fitted by
    ``ModelTrainer.train_all_models``, both seeded with the notebook-level
    ``random_seed``.

    Returns:
        Dictionary with keys ``'num_permutations'``, ``'training_results'``
        (whatever ``train_all_models`` returned) and ``'n_samples'`` (number
        of pooled feature rows).
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print(f"Training with {num_permutations} permutation(s)")
    print(banner)

    # Pool the training data from the requested permutations.
    features, labels = load_and_combine_permutations(edge_type, num_permutations, permutation_dirs)

    # The first permutation's edge file is handed to the model factory as the
    # reference for parameter adaptation.
    ref_edge_file = permutation_dirs[0] / 'edges' / f'{edge_type}.sparse.npz'

    # Build the models (same configuration as notebook 4).
    models = ModelCollection(random_state=random_seed).create_models(
        use_class_weights=True,
        input_dim=features.shape[1],
        edge_file_path=str(ref_edge_file),
    )

    # Fit every model on the pooled data.
    training_results = ModelTrainer(random_state=random_seed).train_all_models(
        models,
        features,
        labels,
        test_size=0.2,
        val_size=0.1,
    )

    return {
        'num_permutations': num_permutations,
        'training_results': training_results,
        'n_samples': len(features),
    }


def evaluate_against_empirical(training_results: Dict, empirical_df: pd.DataFrame) -> Dict:
    """
    Score each trained model against the 200-perm empirical frequencies.

    Every entry of ``training_results`` except the ``'data_splits'`` key is
    treated as a model record holding ``'model'`` and ``'training_result'``
    (from which an optional ``'scaler'`` is read). Each model predicts on the
    (source_degree, target_degree) pairs of ``empirical_df`` and is compared
    with the ``frequency`` column.

    Returns:
        Dictionary mapping model name to ``'mae'``, ``'rmse'``,
        ``'correlation'`` (NaN is reported as 0.0) and ``'predictions'``.
    """
    degree_pairs = empirical_df[['source_degree', 'target_degree']].values
    observed = empirical_df['frequency'].values

    per_model = {}
    model_records = (
        (name, record)
        for name, record in training_results.items()
        if name != 'data_splits'  # bookkeeping entry, not a model
    )

    for model_name, model_result in model_records:
        fitted_model = model_result['model']
        scaler = model_result['training_result'].get('scaler')

        # Predicted frequency for every empirical degree pair.
        predictions = predict_with_model(fitted_model, degree_pairs, model_name, scaler)

        # Error metrics share the same residual vector.
        residuals = predictions - observed
        mae = np.mean(np.abs(residuals))
        rmse = np.sqrt(np.mean(residuals ** 2))

        # Pearson correlation; an undefined value (NaN) is reported as 0.0.
        correlation = np.corrcoef(predictions, observed)[0, 1]
        correlation = 0.0 if np.isnan(correlation) else correlation

        per_model[model_name] = {
            'mae': mae,
            'rmse': rmse,
            'correlation': correlation,
            'predictions': predictions,
        }

        print(f"{model_name:30} - MAE: {mae:.4f}, RMSE: {rmse:.4f}, Corr: {correlation:.4f}")

    return per_model

# Confirms that the helper definitions in this cell were executed.
print("Functions defined")

## 4. Run Progressive Analysis

In [None]:
# Define N values to test: a fixed, roughly geometric grid capped at the
# number of permutations that is both requested and available.
N_candidates = [1, 2, 3, 5, 7, 10, 15, 20, 30, 40, 50]
N_candidates = [n for n in N_candidates if n <= max_permutations]

# Always evaluate the ceiling itself. Without this, a max_permutations that is
# off the grid (e.g. 45, or anything above 50) would never actually be tested.
if max_permutations >= 1 and max_permutations not in N_candidates:
    N_candidates.append(max_permutations)

# An empty grid would only fail later, and obscurely, when the (empty) results
# table is indexed by column.
if not N_candidates:
    raise ValueError(f"No permutation counts to test (max_permutations={max_permutations})")

print(f"Testing N values: {N_candidates}")

# One record per tested N; model names are captured from the first evaluation.
all_results = []
model_names = None

for N in N_candidates:
    # Fit all models on the first N permutations.
    training_output = train_models_with_n_permutations(edge_type, N, permutation_dirs)

    # Score them against the empirical reference.
    print("\nEvaluating against 200-perm empirical frequencies:")
    eval_results = evaluate_against_empirical(training_output['training_results'], empirical_df)

    if model_names is None:
        model_names = list(eval_results)

    # Flatten the per-model metrics into one wide row: '<model>_<metric>'.
    result_entry = {'N': N, 'n_samples': training_output['n_samples']}
    for model_name, metrics in eval_results.items():
        result_entry.update(
            {f'{model_name}_{name}': metrics[name] for name in ('mae', 'rmse', 'correlation')}
        )
    all_results.append(result_entry)

    # From the second N onward, report how much the target metric moved
    # relative to the previous N.
    if len(all_results) > 1:
        print("\nConvergence check:")
        prev_result, curr_result = all_results[-2:]

        for model_name in model_names:
            metric_key = f'{model_name}_{target_metric}'
            improvement = abs(curr_result[metric_key] - prev_result[metric_key])

            # Error metrics are compared as a fraction of the previous value
            # (epsilon avoids division by zero); correlation as an absolute change.
            improvement_pct = (
                improvement / (prev_result[metric_key] + 1e-10)
                if target_metric in ['mae', 'rmse']
                else improvement
            )

            if improvement_pct < convergence_threshold:
                status = "✓ CONVERGED"
            else:
                status = "  continuing"
            print(f"  {model_name:30} improvement: {improvement:.4f} ({improvement_pct:.1%}) {status}")

    print("\n" + "="*60)

# Collect all rows into the table used by the analysis cells below.
results_df = pd.DataFrame(all_results)
print("\nProgressive analysis complete!")
print(f"Tested N = {results_df['N'].tolist()}")

## 5. Analyze Convergence and Find N_min

In [None]:
def find_N_min_for_model(results_df: pd.DataFrame, model_name: str, target_metric: str, min_value: float) -> Dict:
    """
    Find the smallest N at which a model reaches the target performance.

    The metric is read from column ``'<model_name>_<target_metric>'``. For
    'mae' and 'rmse' the target is met when the value is <= ``min_value``;
    for any other metric (correlation) when it is >= ``min_value``.

    Returns:
        Dictionary with ``'N_min'`` (int), ``'achieved'`` (metric value at
        that N) and ``'target_met'`` (bool). When no row meets the target,
        ``'N_min'``/``'achieved'`` describe the best-scoring row instead and
        ``'target_met'`` is False.
    """
    metric_col = f'{model_name}_{target_metric}'
    scores = results_df[metric_col]
    lower_is_better = target_metric in ['mae', 'rmse']

    meets_target = (scores <= min_value) if lower_is_better else (scores >= min_value)
    passing = results_df[meets_target]

    if passing.empty:
        # Target never reached: fall back to the best value observed.
        best_idx = scores.idxmin() if lower_is_better else scores.idxmax()
        return {
            'N_min': int(results_df.loc[best_idx, 'N']),
            'achieved': results_df.loc[best_idx, metric_col],
            'target_met': False,
        }

    # Smallest N among the rows that satisfy the target.
    N_min = passing['N'].min()
    first_hit = passing[passing['N'] == N_min].iloc[0]
    return {
        'N_min': int(N_min),
        'achieved': first_hit[metric_col],
        'target_met': True,
    }

# Find N_min for each model
convergence_summary = {}

print(f"\nFinding N_min for each model (target: {target_metric} {'≤' if target_metric in ['mae', 'rmse'] else '≥'} {min_metric_value}):\n")

for model_name in model_names:
    result = find_N_min_for_model(results_df, model_name, target_metric, min_metric_value)
    convergence_summary[model_name] = result
    
    status = "✓ TARGET MET" if result['target_met'] else "✗ Target not met"
    print(f"{model_name:30} N_min = {result['N_min']:3d}  ({target_metric} = {result['achieved']:.4f})  {status}")

# Overall minimum N (most data-efficient model).
# For a model that never met the target, 'N_min' is only the N of its best
# score, not an N at which the target was reached. Such models must not
# compete with models that did meet the target, otherwise a model that peaked
# (below target) at N=1 would be reported as the most data-efficient.
models_meeting_target = {m: r for m, r in convergence_summary.items() if r['target_met']}
if models_meeting_target:
    ranking_pool = models_meeting_target
else:
    print("\nWARNING: no model met the target; ranking by the N at which each model scored best")
    ranking_pool = convergence_summary

min_N_overall = min(r['N_min'] for r in ranking_pool.values())
best_models = [m for m, r in ranking_pool.items() if r['N_min'] == min_N_overall]

print(f"\nMost data-efficient model(s): {', '.join(best_models)} (N_min = {min_N_overall})")

## 6. Visualizations

In [None]:
# Convergence curves: one panel per metric, one line per model.
panels = [
    ('mae', 'MAE vs N Permutations'),
    ('rmse', 'RMSE vs N Permutations'),
    ('correlation', 'Correlation vs N Permutations'),
]
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

for ax, (metric, title) in zip(axes, panels):
    for model_name in model_names:
        ax.plot(
            results_df['N'],
            results_df[f'{model_name}_{metric}'],
            marker='o',
            label=model_name,
            linewidth=2,
        )

    # Only the panel of the optimized metric gets the target reference line.
    if metric == target_metric:
        ax.axhline(
            min_metric_value,
            color='red',
            linestyle='--',
            linewidth=2,
            label=f'Target ({min_metric_value})',
            alpha=0.7,
        )

    ax.set_xlabel('Number of Permutations', fontsize=12)
    ax.set_ylabel(metric.upper(), fontsize=12)
    ax.set_title(title, fontsize=14, fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3)
    ax.set_xscale('log')  # N values are spaced roughly geometrically

plt.tight_layout()
plt.savefig(output_dir / 'convergence_curves.png', dpi=300, bbox_inches='tight')
plt.show()
print("Saved convergence curves")

In [None]:
# Bar chart of N_min per model, colored by whether the target was met.
from matplotlib.patches import Patch

fig, ax = plt.subplots(figsize=(10, 6))

bar_positions = range(len(model_names))
n_mins = [convergence_summary[name]['N_min'] for name in model_names]
colors = [
    'green' if convergence_summary[name]['target_met'] else 'orange'
    for name in model_names
]

bars = ax.bar(bar_positions, n_mins, color=colors, alpha=0.7, edgecolor='black')
ax.set_xticks(bar_positions)
ax.set_xticklabels(model_names, rotation=45, ha='right')
ax.set_ylabel('Minimum N Permutations', fontsize=12)
ax.set_title(
    f'N_min by Model (Target: {target_metric} {"≤" if target_metric in ["mae", "rmse"] else "≥"} {min_metric_value})',
    fontsize=14,
    fontweight='bold',
)
ax.grid(axis='y', alpha=0.3)

# Print each bar's value just above its top edge.
for bar, n_min in zip(bars, n_mins):
    label_x = bar.get_x() + bar.get_width()/2
    label_y = bar.get_height() + 0.5
    ax.text(label_x, label_y, str(n_min), ha='center', va='bottom', fontweight='bold')

# Manual legend explaining the two bar colors.
ax.legend(handles=[
    Patch(facecolor='green', alpha=0.7, label='Target Met'),
    Patch(facecolor='orange', alpha=0.7, label='Target Not Met'),
])

plt.tight_layout()
plt.savefig(output_dir / 'N_min_comparison.png', dpi=300, bbox_inches='tight')
plt.show()
print("Saved N_min comparison")

## 7. Save Results

In [None]:
# Save detailed results (one row per tested N, one column per model/metric)
results_df.to_csv(output_dir / f'{edge_type}_convergence_data.csv', index=False)
print(f"Saved convergence data: {output_dir / f'{edge_type}_convergence_data.csv'}")

# Save convergence summary
summary_data = {
    'edge_type': edge_type,
    'target_metric': target_metric,
    'min_metric_value': min_metric_value,
    'convergence_threshold': convergence_threshold,
    'N_tested': results_df['N'].tolist(),
    'models': {}
}

for model_name, result in convergence_summary.items():
    summary_data['models'][model_name] = {
        'N_min': result['N_min'],
        # Plain float so the value stays JSON-encodable whatever NumPy scalar
        # type the metric happens to be.
        'achieved_value': float(result['achieved']),
        'target_met': result['target_met']
    }

# Explicit UTF-8: without it open() falls back to the locale's encoding, so
# output files would differ between machines.
with open(output_dir / f'{edge_type}_summary.json', 'w', encoding='utf-8') as f:
    json.dump(summary_data, f, indent=2)
print(f"Saved summary: {output_dir / f'{edge_type}_summary.json'}")

# Save text report
# The report contains '✓', '✗', '≤' and '≥'. With the locale default encoding
# (e.g. cp1252) these cannot be encoded and the write raises
# UnicodeEncodeError, so UTF-8 is required here.
with open(output_dir / f'{edge_type}_report.txt', 'w', encoding='utf-8') as f:
    f.write(f"Minimum Permutations Analysis - {edge_type}\n")
    f.write("="*60 + "\n\n")
    f.write(f"Target: {target_metric} {'≤' if target_metric in ['mae', 'rmse'] else '≥'} {min_metric_value}\n")
    f.write(f"Convergence threshold: {convergence_threshold}\n\n")
    
    f.write("Results by Model:\n")
    f.write("-"*60 + "\n")
    for model_name, result in convergence_summary.items():
        status = "✓ TARGET MET" if result['target_met'] else "✗ Target not met"
        f.write(f"{model_name:30} N_min = {result['N_min']:3d}  "
                f"({target_metric} = {result['achieved']:.4f})  {status}\n")
    
    f.write("\n" + "="*60 + "\n")
    f.write(f"Most data-efficient: {', '.join(best_models)} (N_min = {min_N_overall})\n")

print(f"Saved report: {output_dir / f'{edge_type}_report.txt'}")
print("\nAll results saved successfully!")

## 8. Summary

In [None]:
# Closing recap of the run: target, tested grid, per-model N_min and winner.
rule = "=" * 80
print("\n" + rule)
print(f"MINIMUM PERMUTATIONS ANALYSIS - {edge_type}")
print(rule)

print(f"\nTarget: {target_metric} {'≤' if target_metric in ['mae', 'rmse'] else '≥'} {min_metric_value}")
print(f"N values tested: {results_df['N'].tolist()}")

print("\nResults:")
for model_name, result in convergence_summary.items():
    if result['target_met']:
        status = "✓"
    else:
        status = "✗"
    print(f"  {status} {model_name:30} N_min = {result['N_min']:3d}  ({target_metric} = {result['achieved']:.4f})")

print(f"\nMost data-efficient model: {', '.join(best_models)} (N_min = {min_N_overall})")
print(f"\nOutput directory: {output_dir}")
print(rule)