In [None]:
import pandas as pd
import scanpy as sc
import pyranges as pr
import scglue
import snapatac2 as snap
import re
import numpy as np

## Load celltype annotation

In [None]:
# Load Annotation
anno = pd.read_csv("scatac-predicted-cell-type.csv", index_col=0)
map_dict = dict(zip(anno.index, anno.cell_type))

## Accessibility Analysis of Foundation Genes

We evaluated the accessibility of predefined foundation genes using Gene Activity scores. The ability of these genes to distinguish a target cell type was assessed using the area under the ROC curve (AUC).

1. Gene activity scores were calculated with `sc.tl.score_genes`.
2. AUC was computed using `roc_auc_score` based on binary cell type labels.
3. Statistical significance was estimated via 1,000 random permutations, generating a null AUC distribution to compute a p-value and Z-score.

**Example Output:**

```
AUC(real) = 0.842  
p_perm    = 0.001  
Z-score   = 3.27
```

These results indicate that the foundation genes show significantly higher accessibility in the target cell type.


In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score
from joblib import Parallel, delayed
from scipy.sparse import issparse

def compute_auc_distribution(
    X, var_names, labels, gene_pool, my_gene_count, n_perm=1000, n_jobs=-1, seed=None
):
    """
    Compute null distribution of AUCs using random gene sets.

    Parameters:
    - X: gene expression matrix (cells x genes), sparse or dense
    - var_names: list of gene names (len == X.shape[1])
    - labels: binary array (1 = target cell type, 0 = others)
    - gene_pool: list of gene names to sample from
    - my_gene_count: number of genes per random sample
    - n_perm: number of permutations
    - n_jobs: parallel jobs (use -1 for all cores)
    - seed: random seed (optional)

    Returns:
    - auc_null: array of AUCs from random gene sets
    """
    if issparse(X):
        X = X.toarray()

    rng = np.random.default_rng(seed)
    gene_idx_dict = {g: i for i, g in enumerate(var_names)}

    def single_perm(_):
        rnd_genes = rng.choice(gene_pool, my_gene_count, replace=False)
        idxs = [gene_idx_dict[g] for g in rnd_genes]
        mean_expr = X[:, idxs].mean(axis=1)
        return roc_auc_score(labels, mean_expr)

    auc_null = Parallel(n_jobs=n_jobs)(
        delayed(single_perm)(i) for i in range(n_perm)
    )
    return np.array(auc_null)


In [None]:
gene_matrix = sc.read_h5ad("gene_mat.h5ad")
gene_matrix.obs['cell_type'] = gene_matrix.obs_names.map(map_dict)

In [None]:
my_genes = [] # Foundational genes
target_cell_type = "" # Target cell type

In [None]:
labels   = (gene_matrix.obs['cell_type'] == target_cell_type).astype(int).values # 1=A, 0=其他

In [None]:
# analyze the discriminative ability of the scores calculated for the given genes
sc.tl.score_genes(gene_matrix, my_genes, score_name='my_score', ctrl_as_ref=False)
auc_true = roc_auc_score(labels, gene_matrix.obs['my_score'])

In [None]:
import scanpy as sc

X = gene_matrix.X
var_names = gene_matrix.var_names.tolist()
gene_pool = [g for g in var_names if g not in my_genes]  # Optional: avoid drawing the selected genes

auc_null = compute_auc_distribution(
    X=X,
    var_names=var_names,
    labels=labels,
    gene_pool=gene_pool,
    my_gene_count=len(my_genes),
    n_perm=1000,
    n_jobs=1,
    seed=42
)


In [None]:
p_value = (np.sum(auc_null >= auc_true) + 1) / (1000 + 1)  # Right-tailed test
z_score = (auc_true - auc_null.mean()) / auc_null.std()
print(f"AUC(real) = {auc_true:.3f}")
print(f"p_perm     = {p_value:.4g}")
print(f"Z-score    = {z_score:.2f}")


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

plt.figure(figsize=(6,4))
sns.kdeplot(auc_null, fill=True, alpha=0.6, label='Random gene sets')
plt.axvline(auc_true, color='red', lw=2, ls='--', label='Foundational gene sets')
plt.xlabel(f'AUC (target cell type vs others)')
plt.title('Smoothed distribution of AUC from random gene sets')
plt.legend()
plt.text(0.7, 0.8, f"p = {p_value:.4g}", transform=plt.gca().transAxes)
plt.tight_layout()
plt.show()


## Export coverage

Export the coverage in bigwig for IGV


In [None]:
data = snap.read("scatac")

In [None]:
data.obs['cell_type'] = [map_dict[x] for x in data.obs_names]

In [None]:
snap.ex.export_coverage(data, groupby='cell_type', out_dir="igv", suffix='.bw')


## Add peak annotation 

In [None]:
atac = sc.read_h5ad("scatac-pp.h5ad")
atac.obs['cell_type'] = atac.obs_names.map(map_dict)

In [None]:

##############################################
# 1. Process gene annotation GTF - extract TSS
##############################################
# Only keep features labeled as "gene"
gtf = scglue.genomics.read_gtf("at_genes.gtf.gz").query("feature == 'gene'").split_attribute()
genes = gtf.query("feature == 'gene'").copy()

# Parse gene_name from attribute field if not already present
def parse_gene_name(attr):
    m = re.search(r'gene_name "([^"]+)"', attr)
    return m.group(1) if m else np.nan

if "gene_name" not in genes.columns:
    genes["gene_name"] = genes["attribute"].apply(parse_gene_name)

# Calculate TSS: for + strand use start, for - strand use end-1 (0-based closed interval)
genes["tss_start"] = np.where(genes["strand"] == "+", genes["start"], genes["end"] - 1)
genes["tss_end"] = genes["tss_start"] + 1   # 1 bp for interval operations

# Create PyRanges object for genes with TSS information
genes_pr = pr.PyRanges(pd.DataFrame({
    "Chromosome": genes["seqname"],
    "Start": genes["tss_start"],
    "End": genes["tss_end"],
    "Strand": genes["strand"],
    "gene_id": genes["gene_id"],
    "gene_name": genes["gene_name"],
}))

In [None]:

##############################################
# 2. Process ATAC peaks - convert to intervals
##############################################
# Create PyRanges object for ATAC peaks
peaks_pr = pr.PyRanges(
    pd.DataFrame({
    'Chromosome': atac.var.chrom,
    'Start': atac.var.chromStart,
    'End': atac.var.chromEnd,
    'peak_id': atac.var_names
    })
)


In [None]:

##############################################
# 3. Find nearest TSS & calculate distances  
##############################################
# Find nearest gene TSS for each peak
nearest = peaks_pr.nearest(
    genes_pr,
    suffix="_gene",       # Add suffix to gene columns
)

# Rename distance column
ndf = nearest.df.rename(columns={"Distance": "abs_dist"})

# Automatically find the strand column
strand_col = "Strand_gene" if "Strand_gene" in ndf.columns else "Strand"

# Calculate signed distance (positive if peak is downstream of TSS, negative if upstream)
peak_center = ((ndf.End + ndf.Start) // 2).astype(int)
ndf["signed_dist"] = np.where(
    ndf[strand_col] == "+",
    peak_center - ndf.Start_gene,
    ndf.Start_gene - peak_center
)



In [None]:
# Extract annotation columns
anno_cols = ndf[["peak_id", "gene_id", "gene_name",
                 "tss_strand" if "tss_strand" in ndf.columns else strand_col,
                 "abs_dist", "signed_dist"]].set_index("peak_id")



# Add nearest gene information to ATAC object
atac.var = atac.var.join(anno_cols, how="left").rename(columns={
    "gene_id": "nearest_gene_id",
    "gene_name": "nearest_gene_name",
    strand_col: "tss_strand"
})