In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
import anndata
import pandas as pd
import numpy as np
import scanpy as sc
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import adjusted_rand_score
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min





# Assuming you have your gene expression data in 'X_train' (with cells as rows, genes as columns)







In [None]:
import sys
import os

# Folder that holds 'e17_expr.csv' and 'e17_labels.csv'.
# The default is the path this notebook was originally written against; set the
# ROTATION3_DATA_DIR environment variable to run it elsewhere without editing code.
DATA_DIR = os.environ.get('ROTATION3_DATA_DIR', 'C:/Users/jccarado/Downloads/rotation_3')
os.chdir(DATA_DIR)  # kept: relative paths used after this cell resolve against the data folder

# Both files use their first column as the row index; the label file has no header row.
X_train = pd.read_csv('e17_expr.csv', index_col=0)
cell_labels = pd.read_csv('e17_labels.csv', index_col=0, header=None)
print(X_train)

In [None]:
def mse_loss(reconstructed, original):
    """Mean squared error between two tensors.

    Both arguments are cast to ``tf.float32`` before they are subtracted so
    that inputs of different dtypes can be compared. The squared differences
    are averaged over every element, giving a single value.
    """
    residual = tf.cast(reconstructed, tf.float32) - tf.cast(original, tf.float32)
    squared_error = tf.square(residual)
    return tf.reduce_mean(squared_error)


def kl_divergence(p, q):
    """KL divergence ``sum(p * (log p - log q))`` taken over the last axis.

    Every entry of ``p`` and ``q`` is first clipped into ``[1e-10, 1.0]`` so
    that the logarithm never receives zero.
    """
    floor = 1e-10  # lower clip bound; keeps log() away from 0
    p_safe = tf.clip_by_value(p, floor, 1.0)
    q_safe = tf.clip_by_value(q, floor, 1.0)
    log_ratio = tf.math.log(p_safe) - tf.math.log(q_safe)
    return tf.reduce_sum(p_safe * log_ratio, axis=-1)

def combined_loss(reconstructed, original, p_cluster, q_cluster, alpha=1.0, beta=1.0):
    """Weighted sum of a reconstruction term and a clustering term.

    Returns ``alpha * mse_loss(reconstructed, original)
    + beta * kl_divergence(p_cluster, q_cluster)``, with both terms cast to
    ``tf.float32`` before they are combined. The KL term keeps whatever shape
    ``kl_divergence`` returns; the MSE term is added to it by broadcasting.
    """
    reconstruction_term = tf.cast(mse_loss(reconstructed, original), tf.float32)
    clustering_term = tf.cast(kl_divergence(p_cluster, q_cluster), tf.float32)
    return alpha * reconstruction_term + beta * clustering_term

In [None]:

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.models import Model

def create_stacked_autoencoder(input_shape, num_clusters):
    """Build a dense autoencoder with an additional soft-clustering head.

    Layout::

        input -> Dense(256, relu) -> Dense(128, tanh)   [latent]
        latent -> Dense(256, relu) -> Dense(n_features, relu)   'reconstruction'
        latent -> Dense(num_clusters, softmax)                  'clusters'

    Args:
        input_shape: number of input features, given either as an int or as a
            one-element tuple/list such as ``(n_features,)``.
        num_clusters: number of units of the softmax clustering output.

    Returns:
        A Keras ``Model`` whose outputs are ``[reconstruction, cluster_probs]``.

    Raises:
        ValueError: if ``input_shape`` is a sequence with a length other than 1.
    """
    # Dense needs the feature count as a plain int, while Input wants a shape
    # tuple (an int shape is rejected by newer Keras releases), so derive both
    # from whichever form the caller passed.
    if isinstance(input_shape, (tuple, list)):
        if len(input_shape) != 1:
            raise ValueError(
                f"input_shape must be an int or a 1-element sequence, got {input_shape!r}"
            )
        n_features = int(input_shape[0])
    else:
        n_features = int(input_shape)

    # Encoder
    input_data = layers.Input(shape=(n_features,), name='input_layer')
    hidden = layers.Dense(256, activation='relu')(input_data)
    latent_space = layers.Dense(128, activation='tanh')(hidden)  # bottleneck

    # Decoder. It has to start from the bottleneck: the previous version fed it
    # the 256-unit encoder layer, so the reconstruction never passed through
    # the latent representation.
    decoded = layers.Dense(256, activation='relu')(latent_space)
    # NOTE(review): the relu output can only produce non-negative values —
    # confirm the input matrix is non-negative (e.g. not z-scored).
    reconstruction = layers.Dense(n_features, activation='relu', name='reconstruction')(decoded)

    # Soft cluster assignment computed from the latent representation
    cluster_probs = layers.Dense(num_clusters, activation='softmax', name='clusters')(latent_space)

    return models.Model(inputs=input_data, outputs=[reconstruction, cluster_probs])

In [None]:


class KLLossCallBack(tf.keras.callbacks.Callback):
    """Keras callback that records a KL score between Louvain clusters and reference labels.

    At the end of every epoch the first output of ``encoder.predict`` is
    clustered with scanpy (PCA -> neighbours -> Louvain). Cluster ids and
    reference labels are one-hot encoded and the mean ``kl_divergence``
    between the two encodings is appended to ``kl_values``.

    NOTE(review): Louvain cluster ids are arbitrary, so this score depends on
    how the ids happen to line up with the reference labels; a
    permutation-invariant score (ARI/NMI) may be what is wanted — confirm.
    """

    def __init__(self, X_train, cell_labels_numeric, encoder):
        """Store the data and model used for the per-epoch evaluation.

        Args:
            X_train: input matrix passed to ``encoder.predict`` every epoch.
            cell_labels_numeric: integer reference label for each row of ``X_train``.
            encoder: model whose first ``predict`` output is clustered.
        """
        super(KLLossCallBack, self).__init__()
        self.X_train = X_train
        self.cell_labels_numeric = cell_labels_numeric
        self.encoder = encoder
        self.kl_values = []       # one float per finished epoch
        self.p_dist = []          # one-hot Louvain assignments of the latest epoch
        self.q_dist = []          # one-hot reference labels of the latest epoch
        self.cluster_labels = []  # integer Louvain labels of the latest epoch
        self.num_classes = len(np.unique(self.cell_labels_numeric))
        self.num_classes_q = 0    # number of Louvain clusters; none found before the first epoch

    def on_epoch_end(self, epoch, logs=None):
        """Re-cluster the model output and record the KL score for this epoch."""
        # NOTE(review): index 0 is the first model output. For the two-output
        # model built in this notebook that is the 'reconstruction' head, not
        # the latent layer — confirm this is the intended embedding.
        encoded_data = self.encoder.predict(self.X_train)[0]
        adata = anndata.AnnData(encoded_data)
        sc.pp.pca(adata)
        sc.pp.neighbors(adata, n_neighbors=15, n_pcs=50)
        sc.tl.louvain(adata)  # Run Louvain clustering on the encoded data

        # The 'louvain' column is converted to int (as the notebook already
        # does for the baseline clustering) because tf.one_hot needs integer
        # indices.
        self.cluster_labels = adata.obs['louvain'].astype(int)
        self.num_classes_q = len(np.unique(self.cluster_labels))

        # Size the one-hot encoding from the labels actually present. With a
        # fixed depth, any label >= depth would silently become an all-zero row.
        cluster_ids = self.cluster_labels.to_numpy()
        reference_ids = np.asarray(self.cell_labels_numeric).astype(int)
        depth = int(max(cluster_ids.max(), reference_ids.max())) + 1

        self.p_dist = tf.one_hot(cluster_ids, depth=depth, dtype=tf.float32)
        self.q_dist = tf.one_hot(reference_ids, depth=depth, dtype=tf.float32)

        # Store a plain float so the history can be plotted and printed directly.
        kl_score = float(tf.reduce_mean(kl_divergence(self.p_dist, self.q_dist)))
        self.kl_values.append(kl_score)
        print(kl_score)

    def plot_kl(self):
        """Plot the recorded KL score against the (1-based) epoch number."""
        plt.plot(range(1, len(self.kl_values) + 1), self.kl_values)
        plt.xlabel('Epochs')
        plt.ylabel('KL Loss')
        plt.title('KL Loss through Epochs')
        plt.show()
    

In [None]:
from sklearn.mixture import GaussianMixture
from keras.callbacks import EarlyStopping
import scanpy as sc
import desc

# --- Model ------------------------------------------------------------------
scaler = StandardScaler()  # created here but not applied to the data in this cell
input_dim = X_train.shape[1]  # number of columns of the expression matrix
encoder = create_stacked_autoencoder(input_dim, 14)

# --- Baseline Louvain clustering of the raw matrix --------------------------
adata = sc.AnnData(X_train)
sc.pp.pca(adata, n_comps=50)
sc.pp.neighbors(adata, n_neighbors=15, n_pcs=50)
sc.tl.louvain(adata)
cluster_labels = adata.obs['louvain'].astype(int)

# --- Class sizes: reference labels vs. Louvain clusters ---------------------
cell_type_counts = cell_labels.value_counts()
sorted_cell_types = cell_type_counts.sort_values(ascending=False)
print("GT: ", sorted_cell_types)

sorted_cluster_labels = cluster_labels.value_counts().sort_values(ascending=False)
print("Prediction", sorted_cluster_labels)

# Number the cell types by descending frequency (most frequent type -> 0).
# Each index entry of the counts is unpacked with [0] to obtain the bare label.
cell_type_to_numeric = {
    cell_type[0]: idx
    for idx, cell_type in enumerate(sorted_cell_types.index)
}


In [None]:
# Model.summary() prints the layer table itself and returns None, so it is
# called directly; wrapping it in print() appended a stray "None" to the output.
encoder.summary()

In [None]:

def getdims(x=(10000,200)):
    """
    Choose the layer widths of the network from the shape of the data.

    `x` must have exactly two entries. The result is a list that starts with
    the last entry of `x` and continues with the hidden-layer widths selected
    by the first entry (the sample count).
    """
    assert len(x)==2
    n_sample, n_input = x[0], x[-1]
    # (exclusive lower bound on n_sample, hidden widths) — checked from the top
    size_rules = (
        (20000, [128, 32]),
        (10000, [64, 32]),
        (5000, [32, 16]),
        (2000, [128]),
        (500, [64]),
    )
    for lower_bound, hidden_widths in size_rules:
        if n_sample > lower_bound:
            return [n_input] + hidden_widths
    # 500 samples or fewer
    return [n_input, 16]
# Layer widths chosen for this dataset, printed above the shape they were derived from.
dims = getdims(adata.shape)
print(dims, adata.shape, sep='\n')

In [None]:

# Replace every cell-type name in the first label column by its integer code.
cell_labels_numeric = (
    cell_labels
    .iloc[:, 0]
    .map(cell_type_to_numeric)
)
# Class sizes of the integer codes, largest class first.
sorted_cell_types = (
    cell_labels_numeric
    .value_counts()
    .sort_values(ascending=False)
)
print(cell_labels_numeric)

In [None]:
import keras
import tensorflow as tf
from sklearn.metrics import adjusted_rand_score
from keras import layers, models
from sklearn.mixture import GaussianMixture
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from tensorflow.keras.optimizers import SGD
%matplotlib inline


# One-hot encodings with depth 14, matching the 14-unit cluster head of the model.
cluster_labels_tf = tf.convert_to_tensor(cluster_labels)
p_cluster = tf.one_hot(cluster_labels, depth=14, dtype=tf.float32)        # Louvain clusters
q_cluster = tf.one_hot(cell_labels_numeric, depth=14, dtype=tf.float32)   # reference labels

early_stopping = EarlyStopping(monitor='loss', patience=10, restore_best_weights=True)
kl_callback = KLLossCallBack(X_train, cell_labels_numeric, encoder=encoder)

# One loss per model output, in output order [reconstruction, clusters].
# The previous single lambda added kl_divergence(p_cluster, q_cluster) to the
# MSE of *both* outputs. Both arguments were fixed tensors, so that term was a
# constant: it had no gradient and was never compared with the model's cluster
# probabilities. Here the KL term is computed between the cluster targets
# passed to fit() and the predicted cluster probabilities, and weighted with
# the same alpha = beta = 1.0 as before.
encoder.compile(
    optimizer=SGD(0.001, momentum=0.9),
    loss=[
        lambda y_true, y_pred: mse_loss(y_pred, y_true),        # reconstruction
        lambda y_true, y_pred: kl_divergence(y_true, y_pred),   # clusters
    ],
    loss_weights=[1.0, 1.0],
)
#encoder.compile(optimizer=SGD(0.01, momentum=0.9), loss='mse')



In [None]:
# Train with the input itself as the target of the first output and the
# one-hot Louvain labels as the target of the second, then plot the KL history
# collected by the callback.
encoder.fit(
    x=X_train,
    y=[X_train, p_cluster],
    batch_size=32,
    epochs=100,
    callbacks=[kl_callback, early_stopping],
)
kl_callback.plot_kl()

In [None]:
from sklearn.cluster import KMeans
from sklearn.metrics import normalized_mutual_info_score
# K-means with as many clusters as there are distinct reference labels.
n_clusters = np.unique(cell_labels_numeric).shape[0]
# Fixed seed so the reported NMI is the same on every run (K-means
# initialisation is random otherwise).
km = KMeans(n_clusters, n_init=20, random_state=42)
# NOTE(review): index 0 of predict() is the model's first output; for the
# two-output model in this notebook that is the reconstruction, not the latent
# layer — confirm these are the intended "extracted features".
y_pred = km.fit_predict(encoder.predict(X_train)[0])
y = cell_labels_numeric
print(tf.keras.losses.KLDivergence()(kl_callback.p_dist, kl_callback.q_dist))
print('K-means clustering result on extracted features: NMI =', normalized_mutual_info_score(y, y_pred))

In [None]:

# Keras KL divergence between the callback's latest one-hot cluster
# assignments (as y_true) and one-hot reference labels (as y_pred).
print(
    tf.keras.losses.KLDivergence()(
        y_true=kl_callback.p_dist,
        y_pred=kl_callback.q_dist,
    )
)

In [None]:
# `kmeans` is only assigned further down in the notebook, so on a fresh
# top-to-bottom run this line raised NameError. The estimator fitted in the
# K-means evaluation cell is `km`.
print(km)

In [None]:
from sklearn.metrics import normalized_mutual_info_score
from sklearn.metrics import adjusted_mutual_info_score
# Agreement between the reference labels and the callback's most recent
# Louvain clustering.
nmi_score = normalized_mutual_info_score(
    labels_true=kl_callback.cell_labels_numeric,
    labels_pred=kl_callback.cluster_labels,
)
ami_score = adjusted_mutual_info_score(
    labels_true=kl_callback.cell_labels_numeric,
    labels_pred=kl_callback.cluster_labels,
)
print(f"Normalized Mutual Info: {nmi_score}")
print(f"Adjusted Mutual Information: {ami_score}")

In [None]:
import umap
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.preprocessing import LabelEncoder
%matplotlib inline


# Reduce the input matrix to 50 principal components, then embed those in 2-D with UMAP.
pca = PCA(n_components=50)
X_pca = pca.fit_transform(X_train)

umap_model = umap.UMAP(n_components=2)
X_umap = umap_model.fit_transform(X_pca)

# Integer category codes of the callback's most recent cluster labels.
cluster_labels = kl_callback.cluster_labels.astype('category').cat.codes
print(np.unique(cluster_labels))

# Same embedding drawn twice: coloured by reference label, then by cluster.
for point_colors, panel_title, colorbar_label in (
    (cell_labels_numeric, 'UMAP of Gene Expression Data (Ground Truth)', 'Cell Type Label'),
    (cluster_labels, 'UMAP of Gene Expression Data', 'Cluster Label'),
):
    plt.figure(figsize=(10, 8))
    plt.scatter(X_umap[:, 0], X_umap[:, 1], c=point_colors, cmap='tab20', s=10)
    plt.title(panel_title)
    plt.xlabel('UMAP 1')
    plt.ylabel('UMAP 2')
    plt.colorbar(label=colorbar_label)
plt.show()

In [None]:

# `encoded_data` was never defined at notebook level (it only existed as a local
# variable inside the callback), so this cell raised NameError on a fresh run.
# Use the same model output that the callback and the K-means evaluation use.
encoded_data = encoder.predict(X_train)[0]

cluster_counts = range(1, 50)  # k = 1 ... 49
inertias = []
for n in cluster_counts:
    kmeans = KMeans(n_clusters=n, random_state=42)
    kmeans.fit(encoded_data)
    inertias.append(kmeans.inertia_)

plt.plot(cluster_counts, inertias)
plt.xlabel('Number of clusters')
plt.ylabel('Inertia')
plt.title('Elbow Method for Optimal K')
plt.show()


In [None]:
# silhouette_score is not imported in any of the visible cells, so import it here.
from sklearn.metrics import silhouette_score

# Sweep the Louvain resolution over 0.1, 0.2, ..., 2.0 and score each partition.
# Note: every iteration overwrites adata.obs['louvain'] and `cluster_labels`.
silhouette_scores = []
for n in np.arange(0.1, 2.1, 0.1):
    sc.tl.louvain(adata, resolution=n)
    cluster_labels = adata.obs['louvain'].astype(int)
    n_found = cluster_labels.nunique()
    if not 2 <= n_found < len(cluster_labels):
        # silhouette_score raises ValueError unless 2 <= n_labels <= n_samples - 1
        # (e.g. a low resolution that merges everything into one cluster);
        # record NaN so the list stays aligned with the resolutions.
        silhouette_scores.append(np.nan)
        continue
    silhouette_scores.append(silhouette_score(X_train, cluster_labels))





In [None]:
# Plot the scores against the resolutions they were computed for. Plotting the
# bare list put the points at x = 0, 1, 2, ... while the axis was limited to
# 0.1-2.1 and labelled as the resolution, so almost all points were cut off.
resolutions = np.arange(0.1, 2.1, 0.1)  # same grid as the sweep that filled silhouette_scores
plt.plot(resolutions, silhouette_scores)
plt.xlim(0.1,2.1)
plt.xlabel('Resolution Parameter')
plt.ylabel('Silhouette Score')
plt.title('Silhouette Scores for Different resolutions')
plt.show()

print(silhouette_scores)