# ODAPIA : One Dimension Analysis Pipeline "Inteligence-Artificial"
version 1

In [None]:
import math
import pandas as pd
import numpy as np
import scipy.stats as stats
from scipy.stats import kruskal
from itertools import combinations
import seaborn as sns
from statannot import add_stat_annotation
import matplotlib.pyplot as plt
from scipy.signal import savgol_filter, find_peaks
from sklearn.model_selection import train_test_split, cross_val_score, cross_val_predict, KFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import RidgeClassifierCV
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from lazypredict.Supervised import LazyClassifier
from sklearn.linear_model import *
from sklearn.ensemble import *
import lightgbm as lgb
import xgboost as xgb
import importlib
import joblib
import plotly.graph_objects as go
import eli5
import warnings

# Ignore warnings
warnings.filterwarnings("ignore")

def import_data(data_file):
    """
    Imports data from a CSV.
    Args:
        data_file (str): Path to the CSV data file.
    Returns:
        pd.DataFrame:data.
    """
    data = pd.read_csv(data_file)
    return data

def plot_mean_spectra(data, class_column='Class', threshold=None, colors=None):
    """
    Plots the mean spectrum for each unique target class in the specified class column.

    Args:
        data (pd.DataFrame): The DataFrame containing the spectral data.
        class_column (str): The name of the column containing class labels.
        threshold (float): Optional threshold value for highlighting peaks.
        colors (dict): A dictionary specifying custom colors for each class.

    Returns:
        fig (go.Figure): The Plotly figure containing the mean spectra.
    """
    data = data.drop(["Start scan", "End scan", "Sum.", "File"], axis=1)
    fig = go.Figure()
    unique_classes = data[class_column].unique()

    # Create a color map for plotting different classes
    if colors is None:
        colors = {class_label: f'rgb({i * 10}, {255 - i * 40}, {i * 20})'
                  for i, class_label in enumerate(unique_classes)}

    for class_label in unique_classes:
        class_data = data[data[class_column] == class_label].drop(class_column, axis=1)
        mean_spectrum = class_data.mean()

        # Add a trace for the mean spectrum of each class
        fig.add_trace(go.Scatter(x=mean_spectrum.index, y=mean_spectrum.values,
                                 mode='lines', name=f'Class {class_label}',
                                 line=dict(color=colors.get(class_label, 'blue'))))

        if threshold:
            # Highlight peaks exceeding the threshold on the y-axis
            max_value = mean_spectrum.max()
            for mz, intensity in zip(mean_spectrum.index, mean_spectrum.values):
                if intensity > threshold:
                    fig.add_annotation(
                        go.layout.Annotation(
                            x=mz,
                            y=intensity,
                            text=mz,
                            showarrow=True,
                            arrowhead=3,
                            arrowsize=1,
                            arrowwidth=2,
                            arrowcolor='green',
                            ax=0,
                            ay=-30,
                            yref='y',
                            xref='x'
                        )
                    )

    # Update layout and axis titles
    fig.update_layout(width=1000, xaxis_title='m/z', yaxis_title='Relative Intensities')
    fig.update_xaxes(tickangle=45, tickfont=dict(size=10))  # Adjust the font size of x-axis ticks

    return fig


def process_data(data):
    """
    preprocess data.
    Args:
        pd.DataFrame: data.
    Returns:
        pd.DataFrame: Features (X).
        pd.Series: Target (y).
    """
    # Load the dataset

    train = pd.read_csv(data_file)

    # Data preprocessing
    train_id = train
    train = train.drop(["File", "Sum.", "Start scan", "End scan"], axis=1) # adapt this to your specific columns

    # Split data into features (X) and target (y)
    y = train.pop('Class')
    X = train

    return X, y

def lazy_predict(X_train, y_train, X_test, y_test):
    """
    Perform lazy prediction of multiple models.
    Args:
        X_train (pd.DataFrame): Training features.
        y_train (pd.Series): Training target.
        X_test (pd.DataFrame): Testing features.
        y_test (pd.Series): Testing target.
    Returns:
        pd.DataFrame: LazyClassifier results.
        pd.DataFrame: Model predictions.
    """
    
    # Initialize LazyClassifier
    clf = LazyClassifier(verbose=0, predictions=True)
    models, predictions = clf.fit(X_train, X_test, y_train, y_test)
    
    # Print the models
    print(models)

    return models, predictions

def find_and_build_best_model(models, X_train, y_train, specific_model=None):
    """
    Find and build the best model based on Lazy Predict results or a specific model if specified.
    Args:
        models (pd.DataFrame): LazyClassifier results.
        X_train (pd.DataFrame): Training features.
        y_train (pd.Series): Training target.
        specific_model (str, optional): If provided, build the specified model (e.g., 'RidgeClassifier').
    Returns:
        str: Best model name.
        Pipeline: Best model pipeline.
    """
    best_model_name = None
    best_f1_score = -1

    # If a specific model is specified, use it directly
    if specific_model:
        best_model_name = specific_model
    else:
        # Find the best model based on Lazy Predict results
        for model_name in models.index:
            f1_score = models.at[model_name, 'F1 Score']
            if f1_score > best_f1_score:
                best_f1_score = f1_score
                best_model_name = model_name

    if best_model_name:
        print("Best Classifier:", best_model_name)

        try:
            # If the specific model is RidgeClassifier, use it directly
            if best_model_name == 'RidgeClassifier':
                best_model = RidgeClassifier()
            else:
                model_module = importlib.import_module('sklearn.linear_model')
                if hasattr(model_module, best_model_name):
                    best_model = getattr(model_module, best_model_name)()
                else:
                    model_module = importlib.import_module('sklearn.ensemble')
                    if hasattr(model_module, best_model_name):
                        best_model = getattr(model_module, best_model_name)()
                    else:
                        model_module = importlib.import_module('sklearn.svm')
                        if hasattr(model_module, best_model_name):
                            best_model = getattr(model_module, best_model_name)()
                        else:
                            # Check if it's a LightGBM classifier
                            if best_model_name.startswith("LGBM"):
                                best_model = getattr(lgb, best_model_name)()
                            # Check if it's an XGBoost classifier
                            elif best_model_name.startswith("XGB"):
                                best_model = getattr(xgb, best_model_name)()
                            else:
                                print("Best Classifier not found.")
                                return None, None
            
            pipeline = Pipeline([('scaler', StandardScaler()), (best_model_name, best_model)])
            pipeline.fit(X_train, y_train)
            return best_model_name, pipeline
        except ImportError:
            print("Best Classifier not found.")
            return None, None
    else:
        print("Best Classifier not found.")
        return None, None

def confusion_matrix_scores_classification_report(pipeline, X_test, y_test):
    """
    Show scores, confusion matrix, and classification report.
    Args:
        pipeline (Pipeline): Trained model pipeline.
        X_test (pd.DataFrame): Testing features.
        y_test (pd.Series): Testing target.
    """
    y_pred = pipeline.predict(X_test)
    score = pipeline.score(X_test, y_test)
    print('Accuracy:', score)
    print(classification_report(y_test, y_pred))

    ConfusionMatrixDisplay.from_estimator(pipeline, X_test, y_test)
    plt.rcParams["figure.figsize"] = (10, 15)
    plt.show()

def cross_validate_and_report(pipeline, X, y):
    """
    K-fold cross-validation and show scores, classification report, and confusion matrix.
    Args:
        pipeline (Pipeline): Trained model pipeline.
        X (pd.DataFrame): Features.
        y (pd.Series): Target.
    """
    kfold = KFold(n_splits=5, shuffle=True, random_state=1)
    cv_scores = cross_val_score(pipeline, X, y, cv=kfold)
    print('CV Scores:', cv_scores)
    print('Mean CV Score:', cv_scores.mean())
    print('Std CV Score:', cv_scores.std())

    y_pred = cross_val_predict(pipeline, X, y, cv=kfold)
    print(classification_report(y, y_pred))

    class_names = pipeline.named_steps[pipeline.steps[-1][0]].classes_
    cm = confusion_matrix(y, y_pred)

    fig, ax = plt.subplots()
    im = ax.imshow(cm, interpolation='nearest', cmap='viridis')
    ax.figure.colorbar(im, ax=ax)
    ax.set(xticks=np.arange(cm.shape[1]), yticks=np.arange(cm.shape[0]), xticklabels=class_names, yticklabels=class_names, title='Confusion matrix', ylabel='True label', xlabel='Predicted label')
    plt.setp(ax.get_xticklabels(), rotation=0, ha="right", rotation_mode="anchor")

    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], 'd'), ha="center", va="center", color="black" if cm[i, j] > thresh else "yellow")
    fig.tight_layout()
    plt.rcParams["figure.figsize"] = (10, 15)
    plt.show()

def eli5_feature_importance(pipeline, X_train, top_features=40):
    """
    Show feature importances using Eli5 (LIME (contribution) or permutation(weights).
    Args:
        pipeline (Pipeline): Trained model pipeline.
        X_train (pd.DataFrame): Training features.
        top_features (int): Number of top features to display.
    """
    model = pipeline.named_steps[pipeline.steps[-1][0]]
    sample_contribution = eli5.show_weights(model, feature_names=X_train.columns.tolist(), top=top_features, feature_re='^.*$')
    return sample_contribution

def save_contributions(csv_name, pipeline, X_train):
    """
    Save contributions in CSV format for all samples in the train set.
    """
    model = pipeline.named_steps[pipeline.steps[-1][0]]
    sample_contributions = []

    for idx in range(len(X_train.index)):
        sample_contribution_df = eli5.explain_weights_df(model, feature_names=X_train.columns.tolist(), feature_re='^.*$')
        sample_contributions.append(sample_contribution_df)

    all_contributions_df = pd.concat(sample_contributions)
    all_contributions_df.to_csv(csv_name, index=False)
    

def peak_picking(ms_data, min_sn=10):
    """
    Performs peak picking on mass spectrometry data using S/N ratio and find_peaks.
    Args:
        ms_data (pd.DataFrame): Mass spectrometry data.
        min_sn (int): Minimum signal-to-noise ratio for peak picking.
    Returns:
        pd.DataFrame: Picked peaks data.
    """
    peaks_df_list = []
    ms_data = data.drop(["Class", "Start scan", "End scan", "Sum.", "File"], axis=1)
    for i in range(len(ms_data)):
        spectrum = ms_data.iloc[i].values

        # Calculate the noise level as the standard deviation of the spectrum
        noise_std = np.std(spectrum)

        # Calculate the threshold based on the noise level and min_sn
        threshold = noise_std * min_sn

        # Find peaks using the threshold
        peaks, _ = find_peaks(spectrum, height=threshold)

        peaks_df_i = pd.DataFrame({
            'spectrum_index': i,
            'm/z': ms_data.columns[peaks],
            'intensity': spectrum[peaks],
        })

        peaks_df_list.append(peaks_df_i)

    peaks_df = pd.concat(peaks_df_list, ignore_index=True)
    peaks_df = peaks_df.dropna(subset=['m/z'])
    peaks_df = peaks_df.pivot_table(index='spectrum_index', columns='m/z', values='intensity')
    data_pick_picked = pd.concat([peaks_df,data['Class']], axis=1)
    data_pick_picked = data_pick_picked.fillna(0)
    data_pick_picked
    
    return data_pick_picked 

def create_heatmap(data,cmap='viridis_r', distance_metric='cosine', z_score=0):
    """
    Create a heatmap with custom colors for over-expression (red) and under-expression (green) with black for no correlation.
    Args:
        data (pd.DataFrame): Data for creating the heatmap.
        distance_metric (str): Distance metric for clustering (e.g., 'cosine', 'euclidean').
        z_score (int): Z-score for data normalization.
    """
    # Create the heatmap using the custom colormap
    sns.clustermap(data.groupby('Class').mean().T, cmap=cmap, center=0, col_cluster=False, row_cluster=True, metric=distance_metric, z_score=z_score, cbar_kws={'label': ''}, cbar=True, xticklabels=True, yticklabels=False)

    plt.show()

def significant_features(data, alpha=0.05):
    """
    Identify significant features using the Kruskal-Wallis test.
    Args:
        data (pd.DataFrame): Data containing features and target.
        alpha (float): Significance level for the test.
    Returns:
        list: Significant feature column names.
    """
    x = 'Class'
    y_columns = data.columns.tolist()
    
    # Check if 'Class' is in the list of columns
    if x in y_columns:
        y_columns.remove(x)
    else:
        print("'Class' column not found in the data.")
        return None

    order = data[x].unique()
    significant_columns = []

    num_comparisons = len(y_columns)  # Number of comparisons (number of features)

    # Apply Bonferroni correction
    corrected_alpha = alpha / num_comparisons

    for col in y_columns:
        data_dict = {group: data[col][data[x] == group] for group in order}
        test_statistic, p_value = kruskal(*data_dict.values())
        
        if p_value <= corrected_alpha:
            significant_columns.append(col)

    return significant_columns


def boxplot_significant_features(data, mz_values, class_colors=None, test='Kruskal', loc='inside'):
    """
    Create box plots for significant features.
    Args:
        data (pd.DataFrame): Data containing features and target.
        significant_features (list): List of significant feature column names.
        class_colors (dict): Custom class colors.
        test (str): Statistical test to use ('Kruskal' by default).
    """
    # Define order and box pairs
    label = 'Class'
    order = sorted(data[label].unique())  # Sort the unique class labels
    # Generate box pairs using itertools.combinations
    box_pairs = list(combinations(order, 2))
    print("Class labels in dataset:", order)  # Add this line for debugging
    
    # Create a custom color palette mapping class labels to colors
    custom_palette = {class_label: class_colors.get(class_label, 'blue') for class_label in order}
    
    # Calculate the number of rows and columns for the grid layout
    num_mz_values = len(mz_values)
    num_cols = int(num_mz_values ** 0.5)  # Calculate the number of columns based on sqrt(num_mz_values)
    num_rows = (num_mz_values + num_cols - 1) // num_cols  # Calculate the number of rows
    
    # Calculate the figure size based on the number of box plots
    figsize_x = 16
    figsize_y = 5 * num_rows
    
    # Create a figure and axis grid for the boxplots
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(figsize_x, figsize_y), dpi=100)
    
    # Flatten the axes array if there's only one row
    if num_rows == 1:
        axes = axes.reshape(1, -1)
    
    for i, mz in enumerate(mz_values):
        x = "Class"
        y = mz

        # Select the current axis
        row = i // num_cols
        col = i % num_cols
        ax = axes[row, col]
        
        # Create a boxplot on the current axis with custom class color
        sns.boxplot(data=data, x=x, y=y, order=order, ax=ax, palette=custom_palette)
        ax.set_xticklabels(ax.get_xticklabels(), rotation=45)
        
        # Add simplified statistical annotations to the boxplot
        add_stat_annotation(ax, data=data, x=x, y=y, order=order, box_pairs=box_pairs,
                            test=test, text_format='star', loc='inside', verbose=0)
        
        # Set smaller font size for y-axis labels
        ax.tick_params(axis='y', labelsize=8)
    
    # Remove empty subplots
    for i in range(num_mz_values, num_rows * num_cols):
        fig.delaxes(axes.flatten()[i])
    
    # Adjust layout and spacing of subplots
    plt.tight_layout()
    
    # Display the final figure
    plt.show()
        

        
def violin_significant_features(data, mz_values, class_colors=None, test='Kruskal', loc='inside'):
    
    """
    Create violin plots for significant features.
    Args:
        data (pd.DataFrame): Data containing features and target.
        significant_features (list): List of significant feature column names.
        class_colors (dict): Custom class colors.
        test (str): Statistical test to use ('Kruskal' by default).
    """
    label = 'Class'
    order = sorted(data[label].unique())  # Sort the unique class labels
    box_pairs = list(combinations(order, 2))

    print("Class labels in dataset:", order)  # Add this line for debugging

    # Create a custom color palette mapping class labels to colors
    custom_palette = {class_label: class_colors.get(class_label, 'blue') for class_label in order}

    # Calculate the number of rows and columns for the grid layout
    num_mz_values = len(mz_values)
    num_cols = int(num_mz_values ** 0.5)  # Calculate the number of columns based on sqrt(num_mz_values)
    num_rows = (num_mz_values + num_cols - 1) // num_cols  # Calculate the number of rows

    # Calculate the figure size based on the number of violin plots
    figsize_x = 16
    figsize_y = 5 * num_rows

    # Create a figure and axis grid for the violin plots
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(figsize_x, figsize_y), dpi=100)

    # Flatten the axes array if there's only one row
    if num_rows == 1:
        axes = axes.reshape(1, -1)

    for i, mz in enumerate(mz_values):
        x = "Class"
        y = mz

        # Select the current axis
        row = i // num_cols
        col = i % num_cols
        ax = axes[row, col]

        # Create a violin plot on the current axis with custom class color
        sns.violinplot(data=data, x=x, y=y, order=order, inner="quart", ax=ax, palette=custom_palette)
        ax.set_xticklabels(ax.get_xticklabels(), rotation=45)

        # Add simplified statistical annotations to the violin plot
        add_stat_annotation(ax, data=data, x=x, y=y, order=order, box_pairs=box_pairs,
                            test=test, text_format='star', loc='inside', verbose=0)

        # Set smaller font size for y-axis labels
        ax.tick_params(axis='y', labelsize=8)

    # Remove empty subplots
    for i in range(num_mz_values, num_rows * num_cols):
        fig.delaxes(axes.flatten()[i])

    # Adjust layout and spacing of subplots
    plt.tight_layout()

    # Display the final figure
    plt.show()

# Usages

# 1- Supervised Learning

In [None]:
# Data file path
data_file ='data.csv'

# Import data
data = import_data(data_file)
print("Labels :",data.Class.unique())
data

In [None]:
# Display spectra 
custom_colors = {'Class1': 'green', 'Class2': 'pink', 'Class3'}  # Customize class colors
plot = plot_mean_spectra(data, class_column='Class', threshold=None, colors=custom_colors) 
plot.show()

In [None]:
# ML models
X, y =process_data(data)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1, shuffle=True, stratify=y)
models, predictions = lazy_predict(X_train, y_train, X_test, y_test)
models

In [None]:
# Find the best model or a specific model if specified
best_model_name, best_model_pipeline = find_and_build_best_model(models, X_train, y_train, specific_model=None) # specific_model='RidgeClassifierCV'

In [None]:
# Build the best model for 20% out validation 
confusion_matrix_scores_classification_report(best_model_pipeline, X_test, y_test)

In [None]:
# Cross validate the best model with 5-fold cv
cross_validate_and_report(best_model_pipeline, X, y)

In [None]:
# Save the best model for blind test
joblib.dump(best_model_pipeline, "Model_X.pkl") # put your model name

In [None]:
# Display feature contributions
sample_contribution = eli5_feature_importance(best_model_pipeline, X_train)
sample_contribution

In [None]:
save_contributions("LIME_X.csv", best_model_pipeline, X_train)  # put you csv name

# 2-Unsupervised Learning

In [None]:
# Peak picking 
data_peak_picked = peak_picking(data, min_sn=10) # specify the S/N 
data_peak_picked

In [None]:
# Create a heatmap
create_heatmap(data_peak_picked,cmap='viridis_r',distance_metric='cosine', z_score=0)

In [None]:
# Significant features
significant_mz_values = significant_features(data_peak_picked)
print(len(significant_mz_values))
significant_mz_values

In [None]:
# Create boxplots for significant features
boxplot_significant_features(data, significant_mz_values1, class_colors=custom_colors)

In [None]:
# Create violin plots for significant features
violin_significant_features(data, significant_mz_values, class_colors=custom_colors)