In [None]:
import copy
import json

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.ensemble import (
    RandomForestClassifier,
    AdaBoostClassifier,
    GradientBoostingClassifier,
    ExtraTreesClassifier,
    BaggingClassifier
)
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
import xgboost as xgb
import pickle
# ! pip install --user scikit-misc
import warnings
from datetime import datetime

import xgboost as xgb
from matplotlib.pyplot import title
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# Silence every warning category for the rest of the session.
# NOTE(review): this also hides deprecation and convergence warnings from the models below.
warnings.simplefilter('ignore')
import pandas as pd
import scanpy as sc
import anndata as ad
import seaborn as sns
import maxfuse as mf
import anndata
import hdbscan
from scipy.cluster.hierarchy import cut_tree
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score, f1_score
from sklearn.mixture import GaussianMixture
from sklearn.metrics import adjusted_mutual_info_score
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import mmread
from scipy import sparse
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Keep DataFrame previews compact: show at most 10 rows and 5 columns
pd.options.display.max_rows = 10
pd.options.display.max_columns = 5

# setup and load datasets (only run once)
## CODEX

In [None]:
# Load the CODEX and RNA embeddings and compare the number of RNA cells vs CODEX cells.
# `anndata.read` is only a deprecated alias of `read_h5ad`, so call the reader directly.
codex_embedding = anndata.read_h5ad('codex_embedding.h5ad')
rna_embedding = anndata.read_h5ad('rna_embedding.h5ad')
print(rna_embedding.shape)
print(codex_embedding.shape)


In [None]:

# Prepare the CODEX data for training: the embedding matrix is the feature set
# and the 'CN' annotation, encoded as integer category codes, is the target.
features = codex_embedding.X
labels = codex_embedding.obs['CN'].astype('category').cat.codes.to_numpy(copy=True)

random_state = 42
sample_fraction = 0.7  # fraction of the training split that is actually used for fitting

# Build X / y on the default RangeIndex. Calling reset_index() without drop=True
# would insert an extra 'index' column: in X that leaks the row position into the
# features (and makes the feature count differ from the raw embedding), in y it
# produces a two-column, i.e. multi-output, target that the F1 metric rejects.
X = pd.DataFrame(features)
y = pd.Series(labels, name='CN')

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=random_state)

# Sub-sample the training split; the matching targets are looked up by index label
X_train = X_train.sample(frac=sample_fraction, random_state=random_state)
y_train = y_train.loc[X_train.index]
X_train = X_train.values
y_train = y_train.values
from sklearn.decomposition import PCA




In [None]:

import gc
from sklearn.ensemble import HistGradientBoostingClassifier

# Candidate classifiers, keyed by the display name used in the score cache and the plots.
# NOTE(review): 'Stochastic Gradient Descent' and 'SGD Classifier' configure the same
# estimator (max_iter=1000 and tol=1e-3 are the SGDClassifier defaults); both are kept
# so existing entries in f1_scores.json stay valid.
models = {
    'Random Forest': RandomForestClassifier(n_estimators=50, max_depth=5, random_state=random_state),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=50, max_depth=3, random_state=random_state),
    'Extra Trees': ExtraTreesClassifier(random_state=random_state),
    'AdaBoost': AdaBoostClassifier(random_state=random_state),
    'Support Vector Machine': SVC(kernel='linear', max_iter=1000, random_state=random_state),
    'K-Nearest Neighbors': KNeighborsClassifier(),
    'Stochastic Gradient Descent': SGDClassifier(random_state=random_state),
    'Neural Network': MLPClassifier(max_iter=1000, random_state=random_state),
    'Linear Discriminant Analysis': LinearDiscriminantAnalysis(),
    'Quadratic Discriminant Analysis': QuadraticDiscriminantAnalysis(),
    'Logistic Regression': LogisticRegression(max_iter=200, tol=1e-2, random_state=random_state),
    'Naive Bayes': GaussianNB(),
    'Decision Tree': DecisionTreeClassifier(max_depth=5, random_state=random_state),
    'SGD Classifier': SGDClassifier(max_iter=1000, tol=1e-3, random_state=random_state),
    'Hist Gradient Boosting': HistGradientBoostingClassifier(max_iter=100, random_state=random_state),
    'XGBoost': xgb.XGBClassifier(use_label_encoder=False, eval_metric='mlogloss', n_estimators=50, max_depth=3, random_state=random_state),
}

# Load previously computed scores once, so a re-run skips models that were already
# evaluated. Only a missing or corrupt cache file is treated as "no cache"; any
# other error is allowed to surface instead of being silently swallowed.
try:
    with open('f1_scores.json', 'r') as f:
        f1_scores = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
    f1_scores = {}

for name, model in models.items():
    print(f'Training {name}...')
    if name in f1_scores:
        print(f'{name} already trained. F1 Score: {f1_scores[name]:.4f}\n')
        continue

    try:
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        score = f1_score(y_test, y_pred, average='weighted')
        f1_scores[name] = score
        print(f'{name} F1 Score: {score:.4f}')
        # Write the cache after every model so an interrupted run keeps its progress
        with open('f1_scores.json', 'w') as f:
            json.dump(f1_scores, f)
    except Exception as e:
        # Best effort: report the failure and continue with the next candidate
        print(f'{name} could not be trained. Error: {e}\n')
    finally:
        # The loop variable is intentionally not deleted: the estimator is still
        # referenced by `models`, so `del model` would free nothing, and it would
        # leave `model` undefined for the later cells that call model.predict().
        gc.collect()


# Display the F1 scores
print('\nModel Performance Comparison:')
for name, score in f1_scores.items():
    print(f'{name}: F1 Score = {score:.4f}')

# Snapshot of the scores obtained with the original features
original_f1_scores = dict(f1_scores)

In [None]:



def plot_f1_scores(scores_dict, title, filename=None):
    """Draw a bar chart of F1 scores, highest-scoring model first.

    Parameters
    ----------
    scores_dict : dict
        Maps a model name to its F1 score.
    title : str
        Title shown above the chart.
    filename : str, optional
        When truthy, the figure is additionally saved to this path at 300 dpi.
    """
    plt.figure(figsize=(12, 6))
    sns.set_theme(style="whitegrid")

    # One row per model, ordered from the highest to the lowest score
    ranked = pd.DataFrame(list(scores_dict.items()), columns=['Model', 'F1 Score'])
    ranked = ranked.sort_values('F1 Score', ascending=False)

    bar_axes = sns.barplot(data=ranked, x='Model', y='F1 Score', palette='Blues_d')

    plt.title(title, fontsize=14)
    plt.ylabel('Weighted F1 Score', fontsize=12)
    plt.xticks(rotation=45, ha='right', fontsize=10)
    plt.yticks(fontsize=10)
    plt.ylim(0, 1)

    # Write each bar's value slightly above its top edge, centred horizontally
    for bar in bar_axes.patches:
        bar_top = bar.get_height()
        bar_center = bar.get_x() + bar.get_width() / 2.
        bar_axes.text(bar_center, bar_top + 0.01, f'{bar_top:.2f}', ha="center", fontsize=10)

    plt.tight_layout()

    # Saving is optional and happens before the figure is shown
    if filename:
        plt.savefig(filename, dpi=300, bbox_inches='tight')

    plt.show()

# Persist the baseline scores as JSON next to the notebook
with open('original_f1_scores.json', 'w') as scores_file:
    scores_file.write(json.dumps(original_f1_scores))

# Bar chart of the baseline scores
plot_f1_scores(
    original_f1_scores,
    'Model Performance with Original Features',
)


In [None]:

# Confusion matrix of the best-scoring classifier on the held-out CODEX test split.
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted

# Do not rely on whatever `model` happens to be bound to after the training loop:
# pick the classifier explicitly from the recorded scores.
candidate_scores = {name: score for name, score in original_f1_scores.items() if name in models}
if not candidate_scores:
    raise RuntimeError('No F1 scores available - train at least one model before plotting the confusion matrix.')
best_model_name = max(candidate_scores, key=candidate_scores.get)
model = models[best_model_name]
try:
    check_is_fitted(model)
except NotFittedError:
    # The score was read from the cache, so this estimator was not fitted in this session
    model.fit(X_train, y_train)
print(f'Using {best_model_name} (weighted F1 = {candidate_scores[best_model_name]:.4f})')

y_pred = model.predict(X_test)
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.title(f'Confusion matrix: {best_model_name}')
plt.show()

In [None]:

# Transfer CN labels to the RNA cells with the trained classifier, then score how
# well the predicted CNs separate inside each annotated cell type ('Cluster').
# Expects `model` to be a fitted classifier defined by an earlier cell.
features = rna_embedding.X
labels = None  # no ground-truth CN labels are used for the RNA cells
predicted_RNA_CN = model.predict(features)
rna_embedding = anndata.AnnData(rna_embedding)
# add the predicted CN labels to the RNA embedding
rna_embedding.obs['predicted_CN'] = pd.Categorical(predicted_RNA_CN)
sc.tl.pca(rna_embedding)
# t-SNE view coloured by the predicted CN and by the annotated cell type
sc.pl.tsne(rna_embedding, color=['predicted_CN', 'Cluster'], title='Predicted CN labels on RNA-seq data')

score_columns = ['silhouette_score', 'davies_bouldin_score', 'calinski_harabasz_score']
cell_types = rna_embedding.obs['Cluster'].unique()
silhouette_score_per_cell_type = {}
davies_bouldin_score_per_cell_type = {}
calinski_harabasz_score_per_cell_type = {}
# Float NaN placeholders keep the score columns numeric (a list of None would make them object dtype)
for score_column in score_columns:
    rna_embedding.obs[score_column] = np.nan

for curr_cell_type in cell_types:
    # get the scores for each cell type cluster
    curr_cell_type_indexes = rna_embedding.obs['Cluster'] == curr_cell_type
    curr_cell_type_data = rna_embedding[curr_cell_type_indexes].X
    curr_predicted_CN = rna_embedding.obs.loc[curr_cell_type_indexes, 'predicted_CN']

    # The clustering metrics are only defined for 2 <= n_labels <= n_samples - 1;
    # cell types outside that range get NaN instead of aborting the whole loop.
    n_cells = int(curr_cell_type_indexes.sum())
    n_predicted_labels = curr_predicted_CN.nunique()
    if 2 <= n_predicted_labels <= n_cells - 1:
        curr_silhouette = silhouette_score(curr_cell_type_data, curr_predicted_CN)
        # negated so that, like the other two metrics, higher means better
        curr_davies_bouldin = -davies_bouldin_score(curr_cell_type_data, curr_predicted_CN)
        curr_calinski_harabasz = calinski_harabasz_score(curr_cell_type_data, curr_predicted_CN)
    else:
        print(f'Skipping {curr_cell_type}: {n_predicted_labels} predicted CN label(s) over {n_cells} cell(s)')
        curr_silhouette = curr_davies_bouldin = curr_calinski_harabasz = np.nan

    silhouette_score_per_cell_type[curr_cell_type] = curr_silhouette
    davies_bouldin_score_per_cell_type[curr_cell_type] = curr_davies_bouldin
    calinski_harabasz_score_per_cell_type[curr_cell_type] = curr_calinski_harabasz

    # Single .loc assignment: chained indexing (obs[col][mask] = v) may write to a
    # temporary copy and is a silent no-op when pandas copy-on-write is active.
    rna_embedding.obs.loc[curr_cell_type_indexes, 'silhouette_score'] = curr_silhouette
    rna_embedding.obs.loc[curr_cell_type_indexes, 'davies_bouldin_score'] = curr_davies_bouldin
    rna_embedding.obs.loc[curr_cell_type_indexes, 'calinski_harabasz_score'] = curr_calinski_harabasz

# Min-max normalise every score to [0, 1]; ravel() turns the (n, 1) scaler output into a 1-D column
scaler = MinMaxScaler()
for score_column in score_columns:
    rna_embedding.obs[f'norm_{score_column}'] = scaler.fit_transform(
        rna_embedding.obs[[score_column]].to_numpy(dtype=float)).ravel()

# The final score is the mean of the three normalised scores
rna_embedding.obs['final_score'] = (rna_embedding.obs['norm_silhouette_score'] + rna_embedding.obs[
    'norm_davies_bouldin_score'] + rna_embedding.obs['norm_calinski_harabasz_score']) / 3
sns.barplot(x='Cluster', y='final_score', data=rna_embedding.obs)
plt.title('Final Clustering Score on RNA-seq data')
plt.show()


In [None]:

# t-SNE overviews: annotated cell types, each clustering metric, and the combined score.
# Continuous values are drawn with the 'plasma' colormap.
per_metric_columns = ['silhouette_score', 'davies_bouldin_score', 'calinski_harabasz_score']
sc.pl.tsne(rna_embedding, color='Cluster', title='cell types')
sc.pl.tsne(rna_embedding, color=per_metric_columns, cmap='plasma')
sc.pl.tsne(rna_embedding, color=['final_score'], cmap='plasma', title='final clustering Score on RNA-seq data')
# Note: the combined score needs "higher means better" for every metric, so the
# Davies-Bouldin score has to be inverted and all scores normalised before merging.


In [None]:


# Rank the cell types by silhouette score (higher is better) and keep the best three.
# Previously the first three unique cell types were plotted regardless of score.
# Cell types without a score (NaN) are left out of the ranking.
silhouette_ranking = pd.Series(silhouette_score_per_cell_type, dtype=float).dropna().sort_values(ascending=False)
truncated_cell_types = list(silhouette_ranking.index[:3])

for curr_cell_type in truncated_cell_types:
    subset_data = rna_embedding[rna_embedding.obs['Cluster'] == curr_cell_type]
    sc.pl.tsne(subset_data, color='predicted_CN', title=f'Predicted CN labels for {curr_cell_type}')

# Unsupervised baseline: a Gaussian mixture with as many components as there are CN classes
num_clusters = len(np.unique(codex_embedding.obs['CN']))
gmm = GaussianMixture(n_components=num_clusters, random_state=0)
gmm_labels = gmm.fit_predict(rna_embedding.X)
ami_score = adjusted_mutual_info_score(rna_embedding.obs['predicted_CN'], gmm_labels)
rna_embedding.obs['GMM'] = pd.Categorical(gmm_labels)
print('Adjusted Mutual Information Score:', ami_score)
# plot the RNA embedding with the GMM labels vs the predicted CN labels
sc.pl.tsne(rna_embedding, color=['GMM', 'predicted_CN'], title='GMM vs Predicted CN labels on RNA-seq data')


In [None]:

# Hierarchical baseline: fit HDBSCAN, export its single-linkage tree and cut it
# into as many flat clusters as there are distinct CN values.
num_clusters = len(np.unique(codex_embedding.obs['CN']))
clusterer = hdbscan.HDBSCAN(min_cluster_size=2, gen_min_span_tree=True)
clusterer.fit(rna_embedding.X)
hierarchy = clusterer.single_linkage_tree_.to_numpy()
selected_clusters = cut_tree(hierarchy, n_clusters=num_clusters).ravel()
rna_embedding.obs['HDBSCAN_Cut'] = pd.Categorical(selected_clusters)

# Agreement between the predicted CN labels and the flat HDBSCAN clusters
predicted_cn_labels = rna_embedding.obs['predicted_CN']
hdbscan_cut_labels = rna_embedding.obs['HDBSCAN_Cut']
ami_score = adjusted_mutual_info_score(predicted_cn_labels, hdbscan_cut_labels)
print('Adjusted Mutual Information Score:', ami_score)

# PCA view of both labelings side by side
sc.pl.pca(
    rna_embedding,
    color=['HDBSCAN_Cut', 'predicted_CN'],
    title='HDBSCAN vs Predicted CN labels on RNA-seq data',
)

