In [None]:
# !pip install numpy pandas scipy sklearn statsmodels pyreadr

In [1]:
import pyreadr
from sklearn.cluster import KMeans
import pandas as pd
import numpy as np
from statsmodels.discrete.discrete_model import NegativeBinomial
from scipy.stats import nbinom

In [2]:
cd C:\Users\paras\codebase\data

C:\Users\paras\codebase\data


In [8]:
def clr_normalize(df: pd.DataFrame) -> pd.DataFrame:
    f = np.exp(np.log1p(df).sum(axis=0) / len(df))
    return np.log1p(df / f)


def calc_cluster_labels(
    df: pd.DataFrame,
    n_centers: int | None = None,
    n_starts: int = 100
):
    if n_centers is None:
        n_centers = df.shape[1] + 1
    kmeans = KMeans(n_clusters=n_centers, init='random', n_init=n_starts)
    kmeans.fit(df)
    return kmeans.labels_


def calc_cluster_avg_exp(df: pd.DataFrame) -> (pd.Series, pd.DataFrame):
    df["cluster"] = calc_cluster_labels(df)
    return df["cluster"], df.groupby("cluster").mean()


def get_background_cutoff(vals: np.ndarray, quantile: float = 0.99) -> int:
    fit = NegativeBinomial(vals, np.ones_like(vals)).fit(start_params=[1, 1], disp=0)
    mu = np.exp(fit.params[0])
    p = 1/(1+np.exp(fit.params[0])*fit.params[1])
    n = np.exp(fit.params[0])*p/(1-p)
    dist = nbinom(n=n, p=p, loc=mu)
    return round(dist.ppf(quantile))


def discretize_counts(
    df: pd.DataFrame,
    clust_labels: pd.Series, 
    clust_exp: pd.DataFrame
) -> pd.DataFrame:
    min_clust = clust_exp.idxmin()
    cutoffs = {}
    for hto in df:
        bg_values = df[hto][clust_labels == min_clust[hto]].values
        cutoffs[hto] = get_background_cutoff(bg_values)
    cutoffs = pd.Series(cutoffs)
    return df > cutoffs


def identity_renamer(x: int):
    if x == 0:
        return "Negative"
    elif x == 1:
        return "Singlet"
    else:
        return "Doublet"


def demux_htos(
    df_counts: pd.DataFrame,
) -> pd.Series:
    cluster_labels, avg_exp = calc_cluster_avg_exp(clr_normalize(df_counts))
    # Seurat does the following check and hard stops the process if the assertion fails
    assert any(avg_exp.sum(axis=1) == 0) == False
    hto_discrete = discretize_counts(df_counts, cluster_labels, avg_exp)
    g_class = hto_discrete.sum(axis=1).apply(identity_renamer)
    singlet_ident = df_counts[g_class == "Singlet"].idxmax(axis=1)
    g_class[singlet_ident.index] = singlet_ident
    return g_class

In [None]:
# Download the data here:
# https://www.dropbox.com/sh/ntc33ium7cg1za1/AAD_8XIDmu4F7lJ-5sp-rGFYa?dl=0

In [4]:
hto_counts = list(pyreadr.read_r('pbmc_hto_mtx.rds').values())[0].T
hto_counts.shape

(16916, 8)

In [10]:
hto_labels = demux_htos(hto_counts)
hto_labels.value_counts()

Doublet     2446
HTO_B       2007
HTO_A       1908
HTO_C       1891
HTO_H       1843
HTO_D       1728
HTO_G       1544
HTO_E       1502
HTO_F       1427
Negative     620
Name: count, dtype: int64

In [None]:
##  Doublet Negative  Singlet 
##     2598      346    13972