In [None]:
!pip install xlsxwriter

Collecting xlsxwriter
  Downloading XlsxWriter-3.2.3-py3-none-any.whl.metadata (2.7 kB)
Downloading XlsxWriter-3.2.3-py3-none-any.whl (169 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 169.4/169.4 kB 11.1 MB/s eta 0:00:00
Installing collected packages: xlsxwriter
Successfully installed xlsxwriter-3.2.3


In [None]:
# -*- coding: utf-8 -*-
"""Brain Tumor Analysis - Configurable Excel Report Version"""
import os
import warnings
import logging
from datetime import datetime

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.exceptions import ConvergenceWarning
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif, SelectFromModel
from sklearn.metrics import (
    accuracy_score, f1_score, recall_score, precision_score, roc_auc_score,
    confusion_matrix, roc_curve, precision_recall_curve, auc
)
from sklearn.ensemble import ExtraTreesClassifier, RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.base import clone

import xgboost as xgb
from imblearn.over_sampling import SMOTE
from google.colab import files
import joblib

# =============================================
# CONFIGURATION SECTION - EDIT THESE PARAMETERS
# =============================================
# Single source of truth for the run. Several functions below bind these
# values as default arguments at definition time, so CONFIG must be edited
# BEFORE this cell is executed for the change to take effect everywhere.
CONFIG = {
    'target_column': 'Contrast Label',  # Name of the binary target column
    'drop_columns': ['Patient ID', 'Source'],  # Columns excluded from the feature matrix
    'test_datasets': ['UPENN'],  # 'Source' values held out for testing
    'train_datasets': ['Africa', 'TCGA-LGG', 'UCSF-PDGM'],  # 'Source' values used for training
    'balance_ratio': 5,  # Max majority rows kept per minority row; also passed to XGBoost as scale_pos_weight
    'num_features': 5,  # k for the SelectKBest selectors
    'random_state': 42,  # Random seed for reproducibility
    'n_splits': 5,  # Number of folds for cross-validation
}
# =============================================

# Silence ALL UserWarnings and sklearn ConvergenceWarnings for the whole process
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ConvergenceWarning)

# Log to both a file (downloaded at the end of the run) and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler("brain_tumor_analysis.log"), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Global plotting / display defaults and the NumPy global seed
sns.set_theme(style="whitegrid", palette="husl")
pd.set_option('display.float_format', '{:.4f}'.format)
np.random.seed(CONFIG['random_state'])
plt.rcParams['font.family'] = 'DejaVu Sans'
plt.rcParams['axes.unicode_minus'] = False

# Data loading and preprocessing
def load_data(paths):
    """Read every Excel workbook in *paths* and stack them into one frame.

    Args:
        paths: Mapping of dataset name to the Excel file to read.

    Returns:
        One DataFrame holding the rows of all workbooks with a fresh
        0..n-1 index. Each row's ``Source`` column is set to the name of
        the dataset it was read from.
    """
    frames = []
    for source_name, file_path in paths.items():
        frame = pd.read_excel(file_path)
        frame['Source'] = source_name
        logger.info(f"Loaded {source_name}: {len(frame)} rows")
        frames.append(frame)
    combined = pd.concat(frames)
    return combined.reset_index(drop=True)

def balance_data(df, target_col=CONFIG['target_column'], ratio=CONFIG['balance_ratio']):
    """Down-sample the majority class to at most ``ratio`` rows per minority row.

    Args:
        df: Frame containing ``target_col``.
        target_col: Name of the binary label column.
        ratio: Maximum number of majority rows to keep for each minority row.

    Returns:
        All minority rows plus the sampled majority rows, shuffled with
        ``CONFIG['random_state']``. Original index labels are kept.

    Raises:
        ValueError: If ``target_col`` does not contain exactly two classes.
    """
    counts = df[target_col].value_counts()
    if len(counts) != 2:
        raise ValueError("Data must have exactly two classes")

    minor = counts.idxmin()
    # idxmin() and idxmax() both return the FIRST label on a tie, so with
    # equal class counts they would name the same class and the other class
    # would be silently dropped. Take "the other label" as the majority.
    maj = next(label for label in counts.index if label != minor)

    df_min = df[df[target_col] == minor]
    df_maj = df[df[target_col] == maj]

    # sample(n=...) needs an int; int() keeps a fractional ratio usable.
    n_maj = min(len(df_maj), int(ratio * len(df_min)))
    df_maj_sample = df_maj.sample(n=n_maj, random_state=CONFIG['random_state'])

    balanced = pd.concat([df_min, df_maj_sample])
    return balanced.sample(frac=1, random_state=CONFIG['random_state'])

def prepare_features(df, target=CONFIG['target_column'], drop_cols=CONFIG['drop_columns']):
    """Split *df* into a feature matrix and a label vector.

    Args:
        df: Frame holding the target column and every column in ``drop_cols``.
        target: Name of the label column.
        drop_cols: List of non-feature columns to remove alongside the target.

    Returns:
        ``(X, y)`` where ``X`` is *df* without the target and ``drop_cols``
        columns and ``y`` is the target column.
    """
    excluded = [target] + drop_cols
    features = df.drop(columns=excluded)
    labels = df[target]
    return features, labels

def get_feature_names(selector, feature_names):
    """Describe the features chosen by a fitted selector as one string.

    Args:
        selector: Object exposing ``get_support()`` (boolean mask) or, failing
            that, ``feature_importances_``.
        feature_names: Names of the candidate features, in column order.

    Returns:
        The selected names joined by ``", "``. For an importance-based
        selector these are the ``CONFIG['num_features']`` highest-importance
        names in ascending order of importance. A fixed placeholder string is
        returned when the selector exposes neither attribute or when any
        error occurs (the error is logged, never raised).
    """
    try:
        if hasattr(selector, 'get_support'):
            chosen = np.array(feature_names)[selector.get_support()]
        elif hasattr(selector, 'feature_importances_'):
            ranking = np.argsort(selector.feature_importances_)
            chosen = np.array(feature_names)[ranking[-CONFIG['num_features']:]]
        else:
            return "Feature names not available"
        return ', '.join(chosen)
    except Exception as e:
        logger.error(f"Error getting feature names: {e}")
        return "Error retrieving features"

# Evaluation function
def evaluate_model(model, X, y, n_splits=CONFIG['n_splits'], mode='train'):
    """Cross-validate *model* and summarise six metrics per fold.

    The model is refit in place on every fold of a shuffled stratified
    k-fold split seeded with ``CONFIG['random_state']``.

    Args:
        model: Estimator with ``fit``, ``predict`` and ``predict_proba``.
        X: Feature DataFrame (indexed positionally via ``.iloc``).
        y: Label Series aligned with ``X``.
        n_splits: Number of folds.
        mode: Prefix used for the keys of the returned dict.

    Returns:
        Dict with ``{mode}_mean_<metric>`` and ``{mode}_std_<metric>`` for
        accuracy, f1, recall, precision, roc_auc and pr_auc.
    """
    metric_names = ('accuracy', 'f1', 'recall', 'precision', 'roc_auc', 'pr_auc')
    fold_scores = {metric: [] for metric in metric_names}
    splitter = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=CONFIG['random_state'])

    for fit_idx, val_idx in splitter.split(X, y):
        model.fit(X.iloc[fit_idx], y.iloc[fit_idx])

        X_val = X.iloc[val_idx]
        y_val = y.iloc[val_idx]
        predicted = model.predict(X_val)
        positive_proba = model.predict_proba(X_val)[:, 1]

        fold_scores['accuracy'].append(accuracy_score(y_val, predicted))
        fold_scores['f1'].append(f1_score(y_val, predicted))
        fold_scores['recall'].append(recall_score(y_val, predicted))
        fold_scores['precision'].append(precision_score(y_val, predicted))
        fold_scores['roc_auc'].append(roc_auc_score(y_val, positive_proba))
        # PR-AUC is the area under the precision-recall curve (recall on x).
        precisions, recalls, _ = precision_recall_curve(y_val, positive_proba)
        fold_scores['pr_auc'].append(auc(recalls, precisions))

    summary = {}
    for metric in metric_names:
        summary[f'{mode}_mean_{metric}'] = np.mean(fold_scores[metric])
        summary[f'{mode}_std_{metric}'] = np.std(fold_scores[metric])
    return summary

# Training and evaluation of pipelines
def _format_class_ratio(counts):
    """Return the class-0 : class-1 ratio of *counts* as the string ``"1:<x>"``."""
    n_class0 = counts.get(0, 1)
    n_class1 = counts.get(1, 1)
    # Keep two decimals instead of rounding to an integer: an integer round
    # turns any ratio below 0.5 (e.g. a 5:1 imbalance) into the meaningless
    # "1:0". The 'g' format still prints whole ratios as "1:1", "1:5", ...
    return f"1:{round(float(n_class1) / float(n_class0), 2):g}"


def _score_holdout(pipe, X_test, y_test):
    """Score an already-fitted pipeline once on the held-out test set."""
    y_test_pred = pipe.predict(X_test)
    y_test_proba = pipe.predict_proba(X_test)[:, 1]
    p_test, r_test, _ = precision_recall_curve(y_test, y_test_proba)
    return {
        'test_mean_accuracy': accuracy_score(y_test, y_test_pred),
        'test_std_accuracy': 0,  # No std for single test set evaluation
        'test_mean_f1': f1_score(y_test, y_test_pred),
        'test_std_f1': 0,
        'test_mean_recall': recall_score(y_test, y_test_pred),
        'test_std_recall': 0,
        'test_mean_precision': precision_score(y_test, y_test_pred),
        'test_std_precision': 0,
        'test_mean_roc_auc': roc_auc_score(y_test, y_test_proba),
        'test_std_roc_auc': 0,
        'test_mean_pr_auc': auc(r_test, p_test),
        'test_std_pr_auc': 0
    }


def train_and_evaluate(X_train, y_train, X_test, y_test):
    """Evaluate every feature-selector x classifier pipeline.

    Each combination is wrapped in ``StandardScaler -> selector -> classifier``,
    cross-validated on the training data via ``evaluate_model``, then refit on
    the full training data and scored once on the test data.

    Args:
        X_train: Training feature DataFrame.
        y_train: Training labels (class counts are reported for labels 0 and 1).
        X_test: Test feature DataFrame with the same columns as ``X_train``.
        y_test: Test labels.

    Returns:
        DataFrame with one row per pipeline: identification columns, selected
        feature names, class counts/ratios, and ``train_*`` / ``test_*`` metrics.
    """
    # NOTE(review): 'FromModel_ET' keeps every feature above the mean
    # importance, so it is not limited to CONFIG['num_features'] -- confirm
    # whether that is intended.
    selectors = {
        'KBest_f': SelectKBest(f_classif, k=CONFIG['num_features']),
        'KBest_mi': SelectKBest(mutual_info_classif, k=CONFIG['num_features']),
        'FromModel_ET': SelectFromModel(
            ExtraTreesClassifier(n_estimators=100, random_state=CONFIG['random_state']),
            threshold='mean'
        )
    }

    classifiers = {
        'LR': LogisticRegression(class_weight='balanced', max_iter=1000, random_state=CONFIG['random_state']),
        'XGB': xgb.XGBClassifier(scale_pos_weight=CONFIG['balance_ratio'], eval_metric='logloss', verbosity=0, random_state=CONFIG['random_state']),
        'RF': RandomForestClassifier(class_weight='balanced', n_estimators=100, random_state=CONFIG['random_state']),
        'GB': GradientBoostingClassifier(n_estimators=100, random_state=CONFIG['random_state']),
        'SVC': SVC(class_weight='balanced', probability=True, random_state=CONFIG['random_state']),
        'KNN': KNeighborsClassifier(n_neighbors=5),
        'NB': GaussianNB(),
        'DT': DecisionTreeClassifier(class_weight='balanced', random_state=CONFIG['random_state']),
        'MLP': MLPClassifier(max_iter=500, random_state=CONFIG['random_state'])
    }

    feature_names = X_train.columns

    # The class distribution is the same for every pipeline: compute it once.
    train_counts = y_train.value_counts()
    test_counts = y_test.value_counts()
    class_summary = {
        'Train_Class_0': train_counts.get(0, 0),
        'Train_Class_1': train_counts.get(1, 0),
        'Train_Class_Ratio': _format_class_ratio(train_counts),
        'Test_Class_0': test_counts.get(0, 0),
        'Test_Class_1': test_counts.get(1, 0),
        'Test_Class_Ratio': _format_class_ratio(test_counts),
    }

    report_data = []
    for sname, sel in selectors.items():
        for cname, clf in classifiers.items():
            name = f"{cname}_{sname}"
            logger.info(f"Evaluating pipeline: {name}")

            # Clone so no fitted state leaks between pipelines.
            pipe = Pipeline([
                ('scaler', StandardScaler()),
                ('selector', clone(sel)),
                ('clf', clone(clf))
            ])

            # Cross-validated metrics on the training data.
            train_metrics = evaluate_model(pipe, X_train, y_train, mode='train')

            # Refit on all training data, then score the held-out test set.
            pipe.fit(X_train, y_train)
            test_metrics = _score_holdout(pipe, X_test, y_test)

            # get_feature_names logs and returns a placeholder on failure,
            # so no extra exception handling is needed here.
            selected_features = get_feature_names(pipe.named_steps['selector'], feature_names)

            report_data.append({
                'Model_Name': name,
                'Classifier_Type': cname,
                'Feature_Selector': sname,
                'Selected_Features': selected_features,
                **class_summary,
                **train_metrics,
                **test_metrics
            })

    return pd.DataFrame(report_data)

# Generate Excel report
def generate_excel_report(report_df, train_sources, test_sources):
    """Write the model comparison to ``brain_tumor_analysis_report.xlsx``.

    The workbook has a ``Model_Performance`` sheet (styled header, sized
    columns, 3-colour scale on every ``mean_`` metric column) and a
    ``Summary`` sheet with the run configuration and best scores.

    Args:
        report_df: Result table; must contain the ``train_mean_*`` and
            ``test_mean_*`` accuracy, roc_auc and f1 columns.
        train_sources: Dataset names actually present in the training data.
        test_sources: Dataset names actually present in the test data.

    Returns:
        The path of the written Excel file.

    Raises:
        KeyError: If a metric column needed for the summary is missing.
    """
    excel_file = "brain_tumor_analysis_report.xlsx"
    max_excel_col_width = 255  # Excel's hard limit for a column width

    # Build the summary first so a missing metric column fails before any
    # file is created.
    summary_data = {
        'Configuration': [
            f"Target Column: {CONFIG['target_column']}",
            f"Test Datasets: {', '.join(CONFIG['test_datasets'])}",
            f"Train Datasets: {', '.join(CONFIG['train_datasets'])}",
            f"Balance Ratio: 1:{CONFIG['balance_ratio']}",
            f"Number of Features: {CONFIG['num_features']}",
            f"CV Folds: {CONFIG['n_splits']}",
            f"Random State: {CONFIG['random_state']}",
            # Appended last so the positions of the entries above are unchanged.
            f"Train Sources Present: {', '.join(map(str, train_sources))}",
            f"Test Sources Present: {', '.join(map(str, test_sources))}"
        ],
        'Performance Metrics': [
            f"Best Train Accuracy: {report_df['train_mean_accuracy'].max():.3f}",
            f"Best Test Accuracy: {report_df['test_mean_accuracy'].max():.3f}",
            f"Best Train AUC: {report_df['train_mean_roc_auc'].max():.3f}",
            f"Best Test AUC: {report_df['test_mean_roc_auc'].max():.3f}",
            f"Best Train F1: {report_df['train_mean_f1'].max():.3f}",
            f"Best Test F1: {report_df['test_mean_f1'].max():.3f}"
        ]
    }

    # The context manager closes (and thereby saves) the workbook even if a
    # formatting step raises, so the file handle is never leaked.
    with pd.ExcelWriter(excel_file, engine='xlsxwriter') as writer:
        report_df.to_excel(writer, sheet_name='Model_Performance', index=False)
        pd.DataFrame.from_dict(summary_data, orient='index').to_excel(
            writer, sheet_name='Summary', header=False)

        workbook = writer.book
        worksheet = writer.sheets['Model_Performance']

        header_format = workbook.add_format({
            'bold': True,
            'text_wrap': True,
            'valign': 'top',
            'fg_color': '#4472C4',
            'font_color': 'white',
            'border': 1
        })

        n_rows = len(report_df)
        for col_idx, column in enumerate(report_df.columns):
            # Re-write the header cell with the styled format.
            worksheet.write(0, col_idx, column, header_format)

            # Fit the column to its longest cell or header, within Excel's limit.
            longest_cell = int(report_df[column].astype(str).map(len).max()) if n_rows else 0
            column_width = max(longest_cell, len(str(column))) + 2
            worksheet.set_column(col_idx, col_idx, min(column_width, max_excel_col_width))

            # Red-yellow-green scale on every averaged metric column.
            if n_rows and 'mean_' in str(column):
                worksheet.conditional_format(1, col_idx, n_rows, col_idx, {
                    'type': '3_color_scale',
                    'min_color': "#F8696B",
                    'mid_color': "#FFEB84",
                    'max_color': "#63BE7B"
                })

    logger.info(f"Excel report generated: {excel_file}")
    return excel_file

# Main execution
if __name__ == "__main__":
    # Dataset name -> Excel file in the Colab working directory. The names
    # must match the entries of CONFIG['train_datasets'] / ['test_datasets'].
    paths = {
        "Africa": "/content/Africa.xlsx",
        "TCGA-LGG": "/content/TCGA-LGG.xlsx",
        "UCSF-PDGM": "/content/UCSF-PDGM.xlsx",
        "UPENN": "/content/UPENN.xlsx"
    }

    # Load everything, then split by 'Source' and down-sample the majority
    # class of each split.
    # NOTE(review): the TEST split is down-sampled too, so test metrics are
    # measured on a rebalanced set, not the original distribution -- confirm
    # this is intended.
    df = load_data(paths)
    train_df = balance_data(df[df['Source'].isin(CONFIG['train_datasets'])])
    test_df = balance_data(df[df['Source'].isin(CONFIG['test_datasets'])])

    # Source names actually present after filtering (passed to the report)
    train_sources = list(train_df['Source'].unique())
    test_sources = list(test_df['Source'].unique())

    X_train, y_train = prepare_features(train_df)
    X_test, y_test = prepare_features(test_df)

    # Oversample the minority class of the training data with SMOTE.
    # NOTE(review): the resampled data is what evaluate_model later splits
    # into CV folds, so synthetic rows built from a validation fold's
    # samples can land in the corresponding training fold; the 'train_*' CV
    # metrics may therefore be optimistic.
    X_resampled_train, y_resampled_train = SMOTE(random_state=CONFIG['random_state']).fit_resample(X_train, y_train)

    # Train models and generate report
    report_df = train_and_evaluate(X_resampled_train, y_resampled_train, X_test, y_test)
    excel_file = generate_excel_report(report_df, train_sources, test_sources)

    # Persist the results table (the fitted models themselves are not
    # saved) and trigger a browser download of the Excel report.
    joblib.dump(report_df, 'model_results.pkl')
    files.download(excel_file)

    # Also offer the log file written by the logging FileHandler, if present
    log_file = 'brain_tumor_analysis.log'
    if os.path.exists(log_file):
        files.download(log_file)
    else:
        logger.warning(f"Log file not found, skipping download: {log_file}")

    logger.info("✅ Analysis completed successfully!")
    # To change which datasets are used for training and testing, edit the
    # CONFIG dictionary at the top of this cell and re-run it.

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>