In [None]:
import anndata
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scanpy
import scvi
from scipy.stats import spearmanr
from scvi.data import cortex, smfish
from scvi.external import GIMVI
import os
import random
import scanpy as sc

import torch
import anndata as ad

In [None]:
# load dataset
data_id = "ctx_hipp_hvg"

data_root = os.path.join("./data", data_id)
seq_data = scanpy.read_h5ad(f"{data_root}_sc.h5ad")
spatial_data = scanpy.read_h5ad(f"{data_root}_st.h5ad")

spatial_data.obsm['spatial'] = np.array(spatial_data.obs[['x', 'y']])

spatial_data.var_names = [x.lower() for x in spatial_data.var_names]
seq_data.var_names = [x.lower() for x in seq_data.var_names]

spatial_data.var_names_make_unique()
seq_data.var_names_make_unique()

# subset spatial data into shared genes
gene_names = np.intersect1d(spatial_data.var_names, seq_data.var_names)

## Cross validation util fun

In [None]:
from tqdm import tqdm
from sklearn.model_selection import LeaveOneOut
from sklearn.model_selection import KFold

def cv_data_gen(genelist, cv_mode="CV", kfold=5):
    """ Generates pair of training/test gene indexes cross validation datasets

    Args:
        genelist (list): list of all shared genes by adata_sc and adata_sp
        mode (str): Optional. support 'loo' and '10fold'. Default is 'loo'.

    Yields:
        tuple: list of train_genes, list of test_genes
    """

    #genes_array = np.array(adata_sp.uns["training_genes"])
    genes_array = np.array(genelist)

    if cv_mode == "loo":
        cv = LeaveOneOut()
    elif cv_mode == "CV":
        cv = KFold(n_splits=kfold)

    for train_idx, test_idx in cv.split(genes_array):
        train_genes = list(genes_array[train_idx])
        test_genes = list(genes_array[test_idx])
        yield train_genes, test_genes

In [None]:
from scvi import REGISTRY_KEYS, settings

def _unpack_tensors(tensors):
    x = tensors[REGISTRY_KEYS.X_KEY].squeeze_(0)
    batch_index = tensors[REGISTRY_KEYS.BATCH_KEY].squeeze_(0)
    y = tensors[REGISTRY_KEYS.LABELS_KEY].squeeze_(0)
    return x, batch_index, y


def get_px_para(model, symbol):

    # symbol is one of the values in the following:
    #px_scale: normalized gene expression frequency
    #px_rate: px_scale * exp(library)
    #px_r: dispersion parameter
    #px_dropout: dropout rate

    model.module.eval()
    
    scdls = model._make_scvi_dls(model.adatas, batch_size=128)
    
    retrive_values = []
    for mode, scdl in enumerate(scdls):
        retrive_value = []
        for tensors in scdl:
            (
                sample_batch,
                batch_index,
                label,
                *_,
            ) = _unpack_tensors(tensors)
            retrive_value.append(
                model.module._run_forward(
                    sample_batch,
                    mode,
                    batch_index,
                    label,
                    deterministic=True,
                    decode_mode=None,
                )[symbol]
            )
    
        retrive_value = torch.cat(retrive_value).cpu().detach().numpy()
        retrive_values.append(retrive_value)

    return (retrive_values)

## Preparing the data and run the model

In [None]:
# only use genes in both datasets
seq_data = seq_data[:, gene_names].copy()
spatial_data = spatial_data[:, gene_names].copy()

seq_gene_names = seq_data.var_names
n_genes = seq_data.n_vars

# randomly permute all the shared genes
np.random.seed(seed=0)
rand_gene_idx = np.random.choice(range(n_genes), n_genes, replace=False)

ST_scale = []
ST_mu = []
ST_theta = []
ST_zero_prob = []
test_gene_list = []

n_fold = 5

for train_genes, test_genes in tqdm(
    cv_data_gen(rand_gene_idx, kfold=n_fold), total=n_fold
):

    # spatial_data_partial has a subset of the genes to train on
    spatial_data_partial = spatial_data[:, train_genes].copy()

    GIMVI.setup_anndata(spatial_data_partial)
    GIMVI.setup_anndata(seq_data)
    
    # spatial_data should use the same cells as our training data
    # cells may have been removed by scanpy.pp.filter_cells()
    
    # create our model
    model = GIMVI(seq_data, spatial_data_partial)
    
    # train for 200 epochs
    model.train(200)

    # Get non-normalized imputed gene array and theta from ST

    sc_imputed, st_imputed = get_px_para(model, "px_rate")
    sc_theta, st_theta = get_px_para(model, "px_r")

    sc_scale, st_scale = get_px_para(model, "px_scale")
    
    nb_zero_prob = np.power((st_theta/(st_theta + st_imputed)), st_theta)

    test_gene_list.append(test_genes)

    ST_scale.append(st_scale[:,test_genes])
    
    ST_mu.append(st_imputed[:,test_genes])
    ST_theta.append(st_theta[:,test_genes])
    ST_zero_prob.append(nb_zero_prob[:,test_genes])

In [None]:
test_gene_ind = np.concatenate(test_gene_list)

In [None]:
adata_ge_cv = scanpy.AnnData(
    X=np.hstack(ST_mu),
    obs=spatial_data.obs.copy(),
)
adata_ge_cv.var_names = gene_names[test_gene_ind.tolist()]

adata_ge_cv.obsm["rou"] = np.hstack(ST_scale)
adata_ge_cv.obsm["theta"] = np.hstack(ST_theta)
adata_ge_cv.obsm["zeroProb"] = np.hstack(ST_zero_prob)

In [None]:
def binary(gene):
    #ind = np.where(gene==0.0)
    gene_copy = gene.copy()
    gene[~(gene_copy==0.0)] = 0
    gene[(gene_copy==0.0)] = 1
    return gene

#spatial_copy = spatial_data[:,test_gene_ind].copy()
spatial_copy = spatial_data[:,adata_ge_cv.var_names].copy()
spatial_copy.X = np.apply_along_axis(binary, 0, spatial_copy.X)

In [None]:
from sklearn.calibration import calibration_curve, CalibrationDisplay
import sys

def get_coef(X, y):
    from sklearn.linear_model import LinearRegression
    reg = LinearRegression().fit(X.reshape(-1,1), y)
    return reg.coef_[0], reg.intercept_

sys.path.append("../")

utils.config_rc(dpi=300, font_size=10)
centimeter = utils.centimeter

fig, ax = plt.subplots(nrows=1, ncols=1,figsize=(15*centimeter, 15*centimeter), sharex=True, sharey=True)
#gene="impl2"
coef_genes = []
inter_genes = []
for gene in range(len(gene_names)): # rand_test_gene_idx
    y_test = spatial_copy[:,gene].X
    y_prob = adata_ge_cv.obsm["zeroProb"][:,gene]
    #y_prob = ad_ge[:,gene].X
    prob_true, prob_pred = calibration_curve(y_test, y_prob, n_bins=5)
    coef_, intercept_ = get_coef(prob_pred, prob_true)
    coef_genes.append(coef_)
    inter_genes.append(intercept_)
    disp = CalibrationDisplay(prob_true, prob_pred, y_prob)
    disp.plot(ax=ax)

In [None]:
from sklearn.metrics import roc_auc_score, mean_squared_error
from scipy import stats

auc = []
pearson = []
spearman = []
kendall= []
RMSE = []

for v1, v2 in zip(spatial_copy.X.T, adata_ge_cv.obsm["zeroProb"].T):  #iterate every col of G and G_predicted
    auc.append(roc_auc_score(v1, v2))

#spatial_data = spatial_data[:, gene_names].copy()
#spatial_data = spatial_data[:,test_gene_ind].copy()
spatial_data = spatial_data[:,adata_ge_cv.var_names].copy()

for v1, v2 in zip(spatial_data.X.T, adata_ge_cv.X.T):
    personR = stats.pearsonr(v1.reshape(-1), v2.reshape(-1))
    spearmanR = stats.spearmanr(v1.reshape(-1), v2.reshape(-1))
    kendallTau = stats.kendalltau(v1.reshape(-1), v2.reshape(-1))
    rmse = mean_squared_error(v1, v2, squared=False)

    pearson.append(personR[0])
    spearman.append(spearmanR[0])
    kendall.append(kendallTau[0])
    RMSE.append(rmse)

norm_raw = stats.zscore(spatial_data.X, axis=0)
norm_imputed = stats.zscore(adata_ge_cv.X, axis=0)

norm_rmse = []
for v1, v2 in zip(norm_raw.T, norm_imputed.T):
    rmse = mean_squared_error(v1, v2, squared=False)
    norm_rmse.append(rmse)

In [None]:
df_sc = pd.DataFrame({"auc":auc, "calib_slope": coef_genes, "calib_inter": inter_genes, "Pearson": pearson, "Spearman": spearman, "Kendalltou":kendall, "norm_RMSE": norm_rmse,"RMSE":RMSE})
df_sc.mean()

In [None]:
adata_ge_cv.var["AUC"] = auc
adata_ge_cv.var["calib_slope"] = coef_genes
adata_ge_cv.var["calib_intercept"] = inter_genes
adata_ge_cv.var["Pearson"] = pearson
adata_ge_cv.var["Spearman"] = spearman
adata_ge_cv.var["Kendall_tau"] = kendall
adata_ge_cv.var["norm_RMSE"] = norm_rmse
adata_ge_cv.var["RMSE"] = RMSE