# Supervised

In [None]:
# Import necessary libraries
import pandas as pd
import plotly.graph_objects as go
from sklearn.model_selection import train_test_split
from lazypredict.Supervised import LazyClassifier
import importlib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import cross_val_score, cross_val_predict, KFold
import joblib
import eli5

In [None]:
# Read the training table, then summarise its label column
data = pd.read_csv("Data_Train_Toy.csv")
print("Labels:", data["Class"].unique())  # distinct class labels
print("Number of samples:", data["Class"].value_counts())  # rows per class label
data  # trailing expression: shown as the cell output

In [None]:
# Define a function to plot average spectra for each class
def plot_average_spectra(data, class_column='Class', threshold=None, colors=None):
    """Draw the mean spectrum of every class as one line per class.

    Parameters
    ----------
    data : pandas.DataFrame
        Table containing the label column ``class_column``; all remaining
        numeric columns are averaged per class and their names are used as
        x positions.
    class_column : str
        Name of the label column.
    threshold : optional
        Currently unused; accepted only so existing calls keep working.
    colors : dict, optional
        Mapping ``label -> colour string``. Labels missing from the mapping
        are drawn in ``'blue'``. When ``None`` a colour per class is generated.

    Returns
    -------
    plotly.graph_objects.Figure
        The figure with one trace per class (not shown yet).
    """
    fig = go.Figure()
    unique_classes = data[class_column].unique()
    if colors is None:
        # Clamp every channel to 0-255: the unclamped formula produces
        # negative / oversized channel values once there are more than a
        # handful of classes, which is not a valid rgb() colour.
        colors = {
            class_label: f'rgb({min(255, i * 10)}, {max(0, 255 - i * 40)}, {min(255, i * 20)})'
            for i, class_label in enumerate(unique_classes)
        }

    features = data.drop(columns=class_column)
    for class_label in unique_classes:
        class_rows = features[data[class_column] == class_label]
        # numeric_only keeps a stray text column from breaking the average
        mean_spectrum = class_rows.mean(numeric_only=True)
        fig.add_trace(go.Scatter(x=mean_spectrum.index, y=mean_spectrum.values,
                                 mode='lines', name=f'Class {class_label}',
                                 line=dict(color=colors.get(class_label, 'blue'))))

    fig.update_layout(width=1000, xaxis_title='m/z', yaxis_title='Relative Intensities')
    fig.update_xaxes(tickangle=45, tickfont=dict(size=10))

    return fig

In [None]:
# Line colour assigned to each class label in the figure below
custom_colors = dict(Tumor='red', Necrosis='black', Benign='green')

# Build the class-averaged spectra figure, then render it
plot = plot_average_spectra(
    data,
    class_column='Class',
    threshold=None,
    colors=custom_colors,
)
plot.show()

In [None]:
# Plot a specific sample spectrum
# 'Class' is still a column of `data` at this point, so it is removed first;
# otherwise the label itself is plotted as a data point and 'Class' shows up
# on the m/z axis. errors='ignore' keeps the cell usable if the column is
# already gone.
sample_spectrum = data.drop(columns='Class', errors='ignore').iloc[20]
fig = go.Figure()
fig.add_trace(go.Scatter(x=sample_spectrum.index, y=sample_spectrum.values, mode='lines', line=dict(color='orange')))
fig.update_layout(width=1000, xaxis_title='m/z', yaxis_title='relative intensities', showlegend=False)
fig.show()

In [None]:
# Train different ML models
# Select the label column instead of popping it: `data` stays intact, so this
# cell (and the plotting cells above, which need 'Class') can be re-run
# without a KeyError.
y = data['Class']
X = data.drop(columns='Class')
# Stratified 80/20 split with a fixed seed
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1, shuffle=True, stratify=y)
clf = LazyClassifier(verbose=0, ignore_warnings=True, custom_metric=None)
# `models` is the leaderboard table consumed by find_and_build_best_model
models, predictions = clf.fit(X_train, X_test, y_train, y_test)
models

In [None]:
# Modules searched, in order, for the estimator class named in the leaderboard.
# The first three keep the original lookup order; the rest cover the other
# estimator families plus the optional lightgbm / xgboost backends.
_ESTIMATOR_MODULES = (
    'sklearn.linear_model',
    'sklearn.ensemble',
    'sklearn.svm',
    'sklearn.tree',
    'sklearn.neighbors',
    'sklearn.naive_bayes',
    'sklearn.discriminant_analysis',
    'sklearn.calibration',
    'sklearn.semi_supervised',
    'sklearn.dummy',
    'sklearn.neural_network',
    'lightgbm',
    'xgboost',
)


def _locate_estimator_class(class_name):
    """Return the class named *class_name* from the first module defining it, else None."""
    for module_name in _ESTIMATOR_MODULES:
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            # Backend not installed: keep searching the remaining modules
            continue
        candidate = getattr(module, class_name, None)
        # Only accept real classes, not same-named submodules or functions
        if isinstance(candidate, type):
            return candidate
    return None


# Function to find and build the best model based on F1 score
def find_and_build_best_model(models, X_train, y_train, specific_model=None):
    """Fit a ``StandardScaler`` + classifier pipeline for the selected model.

    Parameters
    ----------
    models : pandas.DataFrame
        Leaderboard indexed by estimator class name, with an ``'F1 Score'``
        column.
    X_train, y_train
        Training features and labels passed to ``Pipeline.fit``.
    specific_model : str, optional
        Estimator class name to build instead of the F1 leader. ``None``
        (default) selects the row with the highest F1 score; ties go to the
        first such row.

    Returns
    -------
    tuple
        ``(model_name, fitted_pipeline)``, or ``(None, None)`` when no model
        can be selected or its class cannot be located.
    """
    if specific_model is not None:
        best_model_name = specific_model
        print("Selected Classifier:", best_model_name)
    else:
        if len(models.index) == 0:
            print("Best Classifier not found.")
            return None, None
        # Rows without a score cannot win the comparison
        f1_scores = models['F1 Score'].dropna()
        if f1_scores.empty:
            print("Best Classifier not found.")
            return None, None
        best_model_name = f1_scores.idxmax()
        print("Best Classifier:", best_model_name)

    estimator_class = _locate_estimator_class(best_model_name)
    if estimator_class is None:
        print("Best Classifier not found.")
        return None, None

    # The classifier step is named after its class, as before
    pipeline = Pipeline([('scaler', StandardScaler()), (best_model_name, estimator_class())])
    pipeline.fit(X_train, y_train)
    return best_model_name, pipeline

In [None]:
# Pick the classifier with the highest 'F1 Score' in `models` and fit it,
# wrapped in a scaling pipeline, on the training split.
# NOTE(review): both names are None when no classifier could be built; the
# cells below assume a fitted pipeline.
best_model_name, best_model_pipeline = find_and_build_best_model(models, X_train, y_train, specific_model=None)

In [None]:
# Function to display confusion matrix, scores, and classification report
def confusion_matrix_scores_classification_report(pipeline, X_test, y_test):
    """Print accuracy and a classification report, then show the confusion matrix.

    Parameters
    ----------
    pipeline
        Fitted estimator exposing ``predict`` and ``score``.
    X_test
        Feature table to predict on.
    y_test
        True labels for ``X_test``.

    Returns
    -------
    None
        Output is printed and a matplotlib figure is shown.
    """
    y_pred = pipeline.predict(X_test)
    score = pipeline.score(X_test, y_test)
    print('Accuracy:', score)
    print(classification_report(y_test, y_pred))
    # Size the figure when it is created. Assigning rcParams after drawing
    # never resized this figure and silently changed every later one.
    fig, ax = plt.subplots(figsize=(10, 15))
    # Reuse the predictions computed above instead of predicting again
    ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)
    plt.show()

In [None]:
# Function for cross-validation and reporting results
def cross_validate_and_report(pipeline, X, y):
    """Run shuffled 5-fold cross-validation and report scores and a confusion matrix.

    Prints the per-fold scores with their mean and standard deviation, prints
    a classification report built from the out-of-fold predictions, and shows
    the matching confusion matrix as a heat map.

    Parameters
    ----------
    pipeline
        Estimator handed to ``cross_val_score`` / ``cross_val_predict``.
    X
        Feature table.
    y
        True labels for ``X``.

    Returns
    -------
    None
    """
    kfold = KFold(n_splits=5, shuffle=True, random_state=1)
    cv_scores = cross_val_score(pipeline, X, y, cv=kfold)
    print('CV Scores:', cv_scores)
    print('Mean CV Score:', cv_scores.mean())
    print('Std CV Score:', cv_scores.std())

    y_pred = cross_val_predict(pipeline, X, y, cv=kfold)
    print(classification_report(y, y_pred))

    # Take the labels from the data itself and pass them to confusion_matrix,
    # so tick labels and matrix rows/columns always agree and the label
    # lookup does not depend on `pipeline` having been fitted beforehand.
    class_names = np.unique(np.concatenate([np.asarray(y), np.asarray(y_pred)]))
    cm = confusion_matrix(y, y_pred, labels=class_names)

    # figsize is applied at creation; setting rcParams afterwards had no
    # effect on this figure.
    fig, ax = plt.subplots(figsize=(10, 15))
    im = ax.imshow(cm, interpolation='nearest', cmap='viridis')
    ax.figure.colorbar(im, ax=ax)
    ax.set(
        xticks=np.arange(cm.shape[1]),
        yticks=np.arange(cm.shape[0]),
        xticklabels=class_names,
        yticklabels=class_names,
        title='Confusion matrix',
        ylabel='True label',
        xlabel='Predicted label',
    )
    plt.setp(ax.get_xticklabels(), rotation=0, ha="right", rotation_mode="anchor")

    # Black text on cells above half the maximum count, yellow text otherwise
    thresh = cm.max() / 2.
    for i, j in np.ndindex(cm.shape):
        ax.text(j, i, format(cm[i, j], 'd'), ha="center", va="center",
                color="black" if cm[i, j] > thresh else "yellow")
    fig.tight_layout()
    plt.show()

In [None]:
# Report accuracy, the classification report and the confusion matrix for the
# fitted pipeline on the held-out split (X_test / y_test) from train_test_split
confusion_matrix_scores_classification_report(best_model_pipeline, X_test, y_test)

In [None]:
# Cross-validate the selected pipeline on the full feature table X with
# labels y and print/plot the results
cross_validate_and_report(best_model_pipeline, X, y)

In [None]:
# Persist the fitted pipeline (scaler + classifier) to "X_model.pkl" with joblib
joblib.dump(best_model_pipeline, "X_model.pkl")

In [None]:
# Read the external test table and predict its labels with the in-memory
# fitted pipeline (the saved model file is not reloaded here)
val_id = pd.read_csv("data_test_toy.csv")  # full table, keeps the true 'Class' column
val = val_id.drop(columns=["Class"])  # feature columns only
validation = best_model_pipeline.predict(val)
validation

In [None]:
# Put the predictions next to the true labels of the external test table
df = pd.DataFrame(validation).rename(columns={0: "Predicted Labels"})
df["True Labels"] = val_id["Class"]
df

In [None]:
# Report accuracy, the classification report and the confusion matrix on the
# external test table: features in `val`, true labels in val_id["Class"]
confusion_matrix_scores_classification_report(best_model_pipeline, val, val_id["Class"])

# Prediction explanations and potential biomarkers

In [None]:
# Function to render the final estimator's feature weights with eli5
def eli5_feature_importance(pipeline, X_train, top_features=40):
    """Return eli5's ``show_weights`` output for the last step of *pipeline*.

    Parameters
    ----------
    pipeline
        Pipeline whose last step is the estimator to explain.
    X_train
        Table whose column names are used as feature names.
    top_features : int
        Passed to eli5 as ``top``.

    NOTE(review): the original comment mentioned LIME, but the call used here
    is ``eli5.show_weights`` on the estimator itself -- confirm the wording.
    """
    last_step_name = pipeline.steps[-1][0]
    estimator = pipeline.named_steps[last_step_name]
    column_names = X_train.columns.tolist()
    # feature_re '^.*$' is passed through to eli5 unchanged
    return eli5.show_weights(
        estimator,
        feature_names=column_names,
        top=top_features,
        feature_re='^.*$',
    )

In [None]:
# Compute the eli5 weight display for the fitted pipeline's final estimator
# (default top_features=40) and show it as the cell output
sample_contribution = eli5_feature_importance(best_model_pipeline, X_train)
sample_contribution

In [None]:
# Function to save feature contributions to CSV
def save_contributions(csv_name, pipeline, X_train):
    """Write the final estimator's eli5 feature weights to *csv_name*.

    The weights come from ``eli5.explain_weights_df`` on the estimator, which
    is not given any sample, so the table is computed and written exactly
    once. The previous per-row loop recomputed the same table for every
    training row and wrote that many identical copies.

    Parameters
    ----------
    csv_name : str
        Destination path of the CSV file.
    pipeline
        Pipeline whose last step is the estimator to explain.
    X_train
        Table whose column names are used as feature names.

    Raises
    ------
    ValueError
        If eli5 returns no weight table for the estimator.
    """
    model = pipeline.named_steps[pipeline.steps[-1][0]]
    weights_df = eli5.explain_weights_df(model, feature_names=X_train.columns.tolist(), feature_re='^.*$')
    if weights_df is None:
        # Fail with a clear message instead of an AttributeError on None
        raise ValueError(
            f"eli5 returned no feature weights for {type(model).__name__}; nothing written to {csv_name}"
        )
    weights_df.to_csv(csv_name, index=False)

In [None]:
# Write the eli5 feature-weight table of the fitted pipeline's final estimator
# to "X_contributions.csv"
save_contributions("X_contributions.csv", best_model_pipeline, X_train)