In [2]:
import os
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, log_loss, confusion_matrix, classification_report, roc_curve, auc, roc_auc_score
from sklearn.linear_model import LogisticRegression

def prepare_data_for_vae_eval(selected_genes_nt_subsample_path, selected_genes_path):
    # Read in the data
    selected_genes_nt_subsample = pd.read_csv(selected_genes_nt_subsample_path)
    selected_genes = pd.read_csv(selected_genes_path)

    # Concatenate the dataframes
    selected_genes_all = pd.concat([selected_genes_nt_subsample, selected_genes])

    # Get index of nucleus_dapi_int
    nucleus_dapi_int_index = selected_genes_all.columns.get_loc('nucleus_dapi_int')

    # Get feature columns
    feature_columns = selected_genes_all.columns[nucleus_dapi_int_index:]

    # Create a list of columns to keep
    columns_to_keep = ['gene_symbol_0'] + list(feature_columns)

    # Keep only the selected columns
    selected_genes_all_cleaned = selected_genes_all[columns_to_keep]

    # Reset index 
    selected_genes_all_cleaned.reset_index(drop=True, inplace=True)

    # Split the data into metadata (gene_symbol_0) and features
    metadata = selected_genes_all_cleaned['gene_symbol_0']
    cp_df = selected_genes_all_cleaned.drop('gene_symbol_0', axis=1)

    # Replace Inf with NaN
    cp_df.replace([np.inf, -np.inf], np.nan, inplace=True)

    # Impute missing values with the mean of the column
    imputer = SimpleImputer(strategy='mean')
    cp_df = imputer.fit_transform(cp_df)

    # Scale the data
    scaler = StandardScaler()
    cp_df_scaled = scaler.fit_transform(cp_df)

    # Split into training, validation, and test sets
    X_train_val, X_test, y_train_val, y_test = train_test_split(cp_df_scaled, metadata, test_size=0.7, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.5, random_state=42)

    # Create DataFrames for each split
    train_df = pd.DataFrame(X_train, columns=feature_columns)
    train_df['gene_symbol_0'] = y_train.values
    
    val_df = pd.DataFrame(X_val, columns=feature_columns)
    val_df['gene_symbol_0'] = y_val.values
    
    test_df = pd.DataFrame(X_test, columns=feature_columns)
    test_df['gene_symbol_0'] = y_test.values

    # Create a dictionary to mimic the structure expected by VAE evaluation
    results = {
        'train': {
            'latent_df': train_df.drop('gene_symbol_0', axis=1),
            'metadata': pd.DataFrame({'gene': train_df['gene_symbol_0']})
        },
        'val': {
            'latent_df': val_df.drop('gene_symbol_0', axis=1),
            'metadata': pd.DataFrame({'gene': val_df['gene_symbol_0']})
        },
        'test': {
            'latent_df': test_df.drop('gene_symbol_0', axis=1),
            'metadata': pd.DataFrame({'gene': test_df['gene_symbol_0']})
        }
    }

    return results

def plot_clustermap(data, metadata, output_path):
    # Check for infinite or NaN values
    if np.any(np.isinf(data)) or np.any(np.isnan(data)):
        raise ValueError("Data contains infinite or NaN values")

    # Get unique gene symbols and assign colors
    gene_symbols = metadata['gene'].unique()
    n_colors = len(gene_symbols)
    colors = plt.cm.rainbow(np.linspace(0, 1, n_colors))
    color_map = dict(zip(gene_symbols, colors))
    
    # Create a color list for each row
    row_colors = metadata['gene'].map(color_map).tolist()
    
    # Create a clustermap with adjusted parameters
    g = sns.clustermap(
        data,
        row_cluster=True,
        col_cluster=True,
        cmap='viridis',
        figsize=(20, 20),
        cbar_kws={'label': 'Standardized Value'},
        xticklabels=False,
        yticklabels=False,
        row_colors=row_colors
    )
    
    # Adjust colorbar limits to show more variation
    vmin, vmax = np.percentile(data.values, [5, 95])
    g.ax_heatmap.collections[0].set_clim(vmin, vmax)
    
    # Add title
    plt.suptitle("Feature Clustermap", fontsize=16, y=1.02)
    
    # Create a custom legend for gene symbols
    legend_elements = [plt.Rectangle((0, 0), 1, 1, fc=color_map[gene], label=gene) for gene in gene_symbols]
    plt.legend(handles=legend_elements, title='Gene Symbols', 
               loc='center left', bbox_to_anchor=(2, 0.5), ncol=1)
    
    # Save the figure
    plt.savefig(output_path, dpi=300, bbox_inches='tight')
    plt.close()
    
def train_classifier(train_latents, train_labels):
    class_names = np.unique(train_labels)
    classifier = LogisticRegression(
        C=1.0,
        class_weight='balanced',
        random_state=42,
        max_iter=1000
    )
    classifier.fit(train_latents, train_labels)
    print(f"Convergence status: {'Converged' if classifier.n_iter_ < classifier.max_iter else 'Did not converge'}")
    print(f"Number of iterations: {classifier.n_iter_}")
    return classifier, class_names

def evaluate_classifier(classifier, latents, labels, class_names):
    y_pred = classifier.predict(latents)
    y_prob = classifier.predict_proba(latents)
    
    accuracy = accuracy_score(labels, y_pred)
    loss = log_loss(labels, y_prob)
    
    class_report = classification_report(labels, y_pred, target_names=class_names)
    conf_matrix = confusion_matrix(labels, y_pred)
    
    # Calculate AUC for each class
    auc_scores = {}
    for i, class_name in enumerate(class_names):
        auc_scores[class_name] = roc_auc_score((np.array(labels) == class_name).astype(int), 
                                               [prob[i] for prob in y_prob])
    
    # Calculate macro-average AUC
    macro_avg_auc = np.mean(list(auc_scores.values()))
    
    return {
        'loss': loss,
        'accuracy': accuracy,
        'predictions': y_pred,
        'true_labels': labels,
        'probabilities': y_prob,
        'classification_report': class_report,
        'confusion_matrix': conf_matrix,
        'auc_scores': auc_scores,
        'macro_avg_auc': macro_avg_auc
    }

def plot_confusion_matrix(conf_matrix, class_names, output_path):
    conf_matrix_percent = conf_matrix.astype('float') / conf_matrix.sum(axis=1)[:, np.newaxis] * 100
    
    plt.figure(figsize=(12, 10))
    sns.heatmap(conf_matrix_percent, annot=True, fmt='.1f', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix (%)')
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    
    cbar = plt.gca().collections[0].colorbar
    cbar.set_label('Percentage (%)')
    
    plt.tight_layout()
    plt.savefig(output_path, dpi=300, bbox_inches='tight')
    plt.close()

def plot_roc_curve(true_labels, probabilities, class_names, output_path):
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    for i, class_name in enumerate(class_names):
        fpr[i], tpr[i], _ = roc_curve((np.array(true_labels) == class_name).astype(int), 
                                    [prob[i] for prob in probabilities])
        roc_auc[i] = auc(fpr[i], tpr[i])
    
    plt.figure(figsize=(10, 8))
    for i, class_name in enumerate(class_names):
        plt.plot(fpr[i], tpr[i], label=f'{class_name} (AUC = {roc_auc[i]:.2f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.savefig(output_path, dpi=300, bbox_inches='tight')
    plt.close()

def save_results(results, output_path, class_names):
    summary = pd.DataFrame({
        'loss': [results['loss']],
        'accuracy': [results['accuracy']],
        **{f'auc_{class_name}': [results['auc_scores'][class_name]] for class_name in class_names},
        'macro_avg_auc': [results['macro_avg_auc']]
    })
    summary.to_csv(output_path, index=False)

    with open(output_path.replace('.csv', '_report.txt'), 'w') as f:
        f.write(results['classification_report'])


In [3]:
# make a directory called cct2_nontargeting_diane

output_path = 'cct2_nontargeting_diane'
os.makedirs(output_path, exist_ok=True)

results = prepare_data_for_vae_eval('cct2.csv', 'nontargeting_subsample_diane.csv')
classifier, class_names = train_classifier(results['train']['latent_df'], results['train']['metadata']['gene'])

for split in ['train', 'val', 'test']:
    plot_clustermap(results[split]['latent_df'], results[split]['metadata'], f'{output_path}/{split}_heatmap.png')
    split_results = evaluate_classifier(classifier, results[split]['latent_df'], results[split]['metadata']['gene'], class_names)
    plot_confusion_matrix(split_results['confusion_matrix'], class_names, f'{output_path}/{split}_confusion_matrix.png')
    plot_roc_curve(split_results['true_labels'], split_results['probabilities'], class_names, f'{output_path}/{split}_roc_curve.png')
    save_results(split_results, f'{output_path}/{split}_results.csv', class_names)



Clustermap saved to cct2_nontargeting_diane/train_heatmap.png




Clustermap saved to cct2_nontargeting_diane/val_heatmap.png




Clustermap saved to cct2_nontargeting_diane/test_heatmap.png
