In [1]:
import logging
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (roc_curve, auc, confusion_matrix,
                             classification_report)
from sklearn.model_selection import (train_test_split, StratifiedKFold,
                                     GridSearchCV, cross_val_score)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

In [2]:
class RegionalMLAnalysis:
    """
    Region-by-region binary classification built on a RegionalAnalysis instance.

    For every region, the per-patient mean feature vectors found in
    ``region_analysis.hup_features`` (label 1) and
    ``region_analysis.mni_features`` (label 0) are stacked into one matrix and
    three model families (logistic regression, random forest, SVM) are tuned
    with a cross-validated grid search scored by ROC AUC.
    """

    def __init__(self, region_analysis, min_samples: int = 5,
                 n_splits: int = 5, random_state: int = 42):
        """
        Initialize with previously created RegionalAnalysis instance

        Args:
            region_analysis: object exposing ``feature_columns``,
                ``hup_features``, ``mni_features`` and ``common_regions``.
            min_samples: minimum number of patients required in *each* group
                for a region to be analysed.
            n_splits: number of stratified cross-validation folds. Each group
                needs at least this many patients, otherwise the fold splitter
                raises and the region is skipped with a warning.
            random_state: seed shared by the fold splitter and the stochastic
                estimators so that repeated runs give the same numbers.
        """
        self.logger = logging.getLogger(__name__)
        self.region_analysis = region_analysis
        self.feature_columns = region_analysis.feature_columns
        self.min_samples = min_samples
        self.n_splits = n_splits
        self.random_state = random_state
        self.results = {}

    def prepare_region_data(self, region: str) -> Tuple[np.ndarray, np.ndarray]:
        """
        Prepare data for a specific region for ML analysis

        Args:
            region: value of the ``roi`` column to select.

        Returns:
            ``(X, y)`` where ``X`` has one row per patient (HUP rows first,
            then MNI rows) and one column per entry of ``feature_columns``,
            and ``y`` holds 1.0 for HUP rows and 0.0 for MNI rows.

        Raises:
            ValueError: if either group has fewer than ``min_samples`` patients.
        """
        # Get data for specific region
        hup_features = self.region_analysis.hup_features
        mni_features = self.region_analysis.mni_features
        hup_region = hup_features[hup_features['roi'] == region]
        mni_region = mni_features[mni_features['roi'] == region]

        # Group by patient to get mean values (one row per patient)
        hup_data = hup_region.groupby('patient_id')[self.feature_columns].mean()
        mni_data = mni_region.groupby('patient_id')[self.feature_columns].mean()

        # Check if enough samples
        if len(hup_data) < self.min_samples or len(mni_data) < self.min_samples:
            raise ValueError(f"Insufficient samples for region {region} "
                             f"(HUP: {len(hup_data)}, MNI: {len(mni_data)})")

        # Create feature matrix and labels
        X = np.vstack([hup_data.values, mni_data.values])
        y = np.hstack([np.ones(len(hup_data)), np.zeros(len(mni_data))])

        return X, y

    def _build_models(self) -> Dict[str, Tuple[object, Dict[str, list]]]:
        """Return fresh candidate estimators paired with their parameter grids."""
        seed = self.random_state
        return {
            'logistic': (LogisticRegression(), {
                'C': [0.1, 1.0, 10.0],
                'class_weight': ['balanced', None]
            }),
            # Seeded: forest construction is stochastic.
            'rf': (RandomForestClassifier(random_state=seed), {
                'n_estimators': [100, 200],
                'max_depth': [3, 5, None],
                'class_weight': ['balanced', None]
            }),
            # Seeded: probability calibration shuffles data internally.
            'svm': (SVC(probability=True, random_state=seed), {
                'C': [0.1, 1.0, 10.0],
                'kernel': ['linear', 'rbf'],
                'class_weight': ['balanced', None]
            })
        }

    def train_evaluate_region(self, region: str) -> Optional[Dict]:
        """
        Train and evaluate models for a specific region

        Each candidate estimator is wrapped in a ``StandardScaler -> model``
        pipeline so that scaling statistics are learned from the training part
        of every fold only (scaling the full matrix up front would leak
        held-out information into the cross-validation scores).

        Returns:
            A dict with keys ``n_hup``, ``n_mni`` and ``models``; ``models``
            maps model name to ``best_params``, ``best_cv_score``,
            ``cv_scores_mean``, ``cv_scores_std`` and, for ``logistic`` and
            ``rf``, ``feature_importances``. Returns ``None`` (after logging a
            warning) if the region could not be processed.

        Note:
            The reported scores come from the same folds that were used to
            pick the hyper-parameters, so they are optimistic estimates.
        """
        try:
            # Prepare data
            X, y = self.prepare_region_data(region)

            # Plain ints keep the results easy to serialise.
            region_results = {
                'n_hup': int(np.sum(y == 1)),
                'n_mni': int(np.sum(y == 0)),
                'models': {}
            }

            # Cross-validation setup
            cv = StratifiedKFold(n_splits=self.n_splits, shuffle=True,
                                 random_state=self.random_state)

            # Train and evaluate each model type
            for model_name, (model, param_grid) in self._build_models().items():
                pipeline = Pipeline([
                    ('scaler', StandardScaler()),
                    ('clf', model)
                ])
                # Route every grid entry to the classifier step.
                prefixed_grid = {
                    f'clf__{name}': values
                    for name, values in param_grid.items()
                }

                grid_search = GridSearchCV(
                    pipeline, prefixed_grid,
                    cv=cv, scoring='roc_auc',
                    n_jobs=-1
                )
                grid_search.fit(X, y)

                # The grid search already evaluated the winning configuration
                # on every fold; reuse those scores instead of refitting.
                best_index = grid_search.best_index_
                cv_results = grid_search.cv_results_

                model_results = {
                    # Strip the pipeline prefix so keys match the raw grid.
                    'best_params': {
                        name.split('__', 1)[1]: value
                        for name, value in grid_search.best_params_.items()
                    },
                    'best_cv_score': float(grid_search.best_score_),
                    'cv_scores_mean': float(cv_results['mean_test_score'][best_index]),
                    'cv_scores_std': float(cv_results['std_test_score'][best_index])
                }

                # Get feature importances for interpretable models
                fitted_model = grid_search.best_estimator_.named_steps['clf']
                if model_name == 'logistic':
                    raw_importances = fitted_model.coef_[0]
                elif model_name == 'rf':
                    raw_importances = fitted_model.feature_importances_
                else:
                    raw_importances = None

                if raw_importances is not None:
                    importances = pd.Series(
                        raw_importances,
                        index=self.feature_columns
                    ).sort_values(ascending=False)
                    model_results['feature_importances'] = importances.to_dict()

                region_results['models'][model_name] = model_results

            return region_results

        except Exception as e:
            # Best effort: one bad region must not abort the whole sweep.
            self.logger.warning(f"Error processing region {region}: {str(e)}")
            return None

    def analyze_all_regions(self):
        """
        Perform ML analysis for all regions with sufficient data

        Iterates over ``region_analysis.common_regions`` and stores every
        successful result in ``self.results`` keyed by region.
        """
        for region in self.region_analysis.common_regions:
            self.logger.info(f"Analyzing region: {region}")

            results = self.train_evaluate_region(region)
            if results is not None:
                self.results[region] = results

    def summarize_results(self):
        """
        Print summary of ML analysis results

        Shows the ten best regions by mean cross-validated ROC AUC (with the
        three largest-magnitude feature importances where available), overall
        counts, and the mean/std score per model type. Safe to call before any
        region has been analysed.
        """
        print("\nRegional ML Analysis Summary")
        print("=" * 50)

        # Find top performing regions
        region_performances = []
        for region, results in self.results.items():
            if not results['models']:
                # Nothing to rank for this region.
                continue
            best_model, best_model_results = max(
                results['models'].items(),
                key=lambda item: item[1]['cv_scores_mean']
            )
            region_performances.append({
                'region': region,
                'best_score': best_model_results['cv_scores_mean'],
                'best_model': best_model,
                'n_hup': results['n_hup'],
                'n_mni': results['n_mni']
            })

        # Sort by performance
        region_performances.sort(key=lambda x: x['best_score'], reverse=True)

        # Print top 10 regions
        print("\nTop 10 regions by classification performance:")
        for i, perf in enumerate(region_performances[:10], 1):
            print(f"\n{i}. {perf['region']}")
            print(f"   Best model: {perf['best_model']}")
            print(f"   ROC AUC: {perf['best_score']:.3f}")
            print(f"   Samples: HUP={perf['n_hup']}, MNI={perf['n_mni']}")

            # Print feature importances if available
            region_results = self.results[perf['region']]
            best_model_results = region_results['models'][perf['best_model']]
            if 'feature_importances' in best_model_results:
                print("   Top features:")
                importances = best_model_results['feature_importances']
                # Rank by magnitude: logistic coefficients can be negative.
                ranked = sorted(
                    importances.items(),
                    key=lambda x: abs(x[1]),
                    reverse=True
                )
                for feature, importance in ranked[:3]:
                    print(f"    - {feature}: {importance:.3f}")

        # Calculate overall statistics
        print("\nOverall Statistics:")
        print(f"Total regions analyzed: {len(self.results)}")
        good_regions = sum(1 for perf in region_performances if perf['best_score'] > 0.7)
        print(f"Regions with ROC AUC > 0.7: {good_regions}")

        # Model performance comparison
        model_scores = {model: [] for model in ['logistic', 'rf', 'svm']}
        for results in self.results.values():
            for model_name, model_results in results['models'].items():
                # setdefault tolerates model names beyond the built-in three.
                model_scores.setdefault(model_name, []).append(
                    model_results['cv_scores_mean']
                )

        print("\nAverage performance by model type:")
        for model_name, scores in model_scores.items():
            if not scores:
                # Avoid numpy's empty-mean warning and a misleading "nan".
                print(f"{model_name}: no results")
                continue
            mean_score = np.mean(scores)
            std_score = np.std(scores)
            print(f"{model_name}: {mean_score:.3f} ± {std_score:.3f}")