In [None]:
import os
import sys
from pathlib import Path
import warnings
import json

import scanpy as sc
import scgpt as scg
import anndata
import mygene
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
import scipy.sparse
from scipy.sparse import csr_matrix

# Configuration
%load_ext autoreload
%autoreload 2
sys.path.insert(0, "../")
plt.style.context('default')
warnings.simplefilter("ignore", ResourceWarning)

In [None]:
# Read the experiment configuration from disk.
with open('config.json', 'r') as f:
    config = json.load(f)

# Resolve the entry of the dataset selected in the config.
selected_ds = config['selected_dataset']
ds_config = config["datasets"][selected_ds]

# `ds_params` and `params` are two names for the same parameter dictionary.
ds_params = params = ds_config["parameters"]
data_path = ds_config["path"]

# Per-dataset settings consumed by the cells below.
target_percentage = params["target_percentage"]
# JSON object keys are strings; convert them to float so the seed lists can be
# looked up with the numeric rates from `target_percentage`.
seeds_per_rate = {
    float(rate_key): seed_list
    for rate_key, seed_list in ds_params["seeds_per_rate"].items()
}
N_HVG = params["N_HVG"]
batch_size = params["batch_size"]
model_dir = Path(config["model_dir"])

print(f"Ready to process {selected_ds} with {N_HVG} HVGs and seeds {seeds_per_rate}")

In [None]:
def total_per_zero(X):
    """Print and return the percentage of zero-valued entries in a torch tensor.

    The count is done with torch operations, so the tensor may live on any
    device and may be tracking gradients (the previous NumPy round-trip failed
    in both cases).

    Args:
        X: A torch tensor, sparse or dense. It is densified for counting.

    Returns:
        float: Percentage of entries equal to zero, or ``nan`` when the tensor
        has no elements.
    """
    dense = X.detach().to_dense()
    total_elements = dense.numel()

    if total_elements == 0:
        # Nothing to count; report nan instead of dividing by zero.
        percentage_zeros = float("nan")
    else:
        total_zeros = int((dense == 0).sum())
        percentage_zeros = (total_zeros / total_elements) * 100

    print(f"Percentage of zeros: {percentage_zeros:.2f}%")
    return percentage_zeros
def drop_data(data_t, rate, rng=None):
    """Simulate dropout by zeroing a random fraction of the non-zero entries.

    Args:
        data_t: AnnData-like object. ``data_t.X`` is read but never modified.
            When ``rate`` is non-zero, ``data_t.uns['drop_index']`` is set to
            the non-zero coordinates (``i``, ``j``) and the chosen positions
            (``ix``), and, if ``data_t.raw`` exists with the same shape as
            ``X``, the same entries of ``data_t.raw.X`` are zeroed in place.
        rate: Fraction in [0, 1] of the non-zero entries to set to zero
            (the count is rounded down).
        rng: Optional NumPy ``Generator`` or ``RandomState`` used for sampling.
            Defaults to the global ``np.random`` state, so results stay
            reproducible through ``np.random.seed``.

    Returns:
        A new AnnData whose ``X`` is the masked matrix in CSR format, with
        copies of ``data_t.obs`` and ``data_t.var``.

    Raises:
        ValueError: If ``rate`` is outside [0, 1].
    """
    if not 0.0 <= rate <= 1.0:
        raise ValueError(f"rate must be within [0, 1], got {rate!r}")

    X = data_t.X
    # `toarray()` already allocates a fresh dense array, so only an input that
    # is already dense needs an explicit copy.
    X_train = X.toarray() if scipy.sparse.issparse(X) else np.array(X, copy=True)

    if rate != 0.0:
        i, j = np.nonzero(X_train)
        n_drop = int(np.floor(rate * len(i)))
        sampler = np.random if rng is None else rng
        ix = sampler.choice(len(i), n_drop, replace=False)
        X_train[i[ix], j[ix]] = 0.0

        data_t.uns['drop_index'] = {'i': i, 'j': j, 'ix': ix}

        # The coordinates index `X`; they are only meaningful for `raw` when it
        # exists and has the same shape.
        raw = data_t.raw
        if raw is not None:
            if raw.shape == X_train.shape:
                raw.X[i[ix], j[ix]] = 0.0
            else:
                warnings.warn(
                    "data_t.raw has a different shape than data_t.X; "
                    "leaving raw unmasked.",
                    stacklevel=2,
                )

    # Return sparse matrix to save memory
    X_sparse = csr_matrix(X_train)
    adata_drop = anndata.AnnData(X=X_sparse, obs=data_t.obs.copy(), var=data_t.var.copy())
    return adata_drop

In [None]:
# Read the dataset (the .h5ad file at `data_path`, taken from config.json) into `adata`.
adata = sc.read_h5ad(data_path)

In [None]:
# Genes in the 'Human_Innate_T_Cell' dataset are identified by Ensembl IDs
# (e.g., ENSG000001), while scGPT's pre-trained vocabulary and matching logic
# work on official gene symbols (e.g., GAPDH). For that dataset the IDs are
# translated through MyGene.info; every other dataset is left untouched.

if selected_ds != "Human_Innate_T_Cell":
    print(f"Skipping conversion for {selected_ds}.")
else:
    print(f"Running Ensembl to Symbol conversion for {selected_ds}...")

    # Strip version suffixes so the IDs are in their bare form.
    adata.var_names = [ensembl_id.split('.')[0] for ensembl_id in adata.var_names]

    # Look up the gene symbol of every Ensembl ID.
    mg = mygene.MyGeneInfo()
    query = mg.querymany(
        adata.var_names.tolist(),
        scopes='ensembl.gene',
        fields='symbol',
        species='human',
    )

    # Keep only hits that carry a symbol; when an ID appears in several hits
    # the last one wins.
    id_map = dict(
        (hit['query'], hit['symbol']) for hit in query if 'symbol' in hit
    )
    mapped_genes = [id_map.get(ensembl_id) for ensembl_id in adata.var_names]
    mask = [symbol is not None for symbol in mapped_genes]

    # Drop genes without a symbol, rename the rest, and de-duplicate the names.
    adata = adata[:, mask].copy()
    adata.var_names = [symbol for symbol in mapped_genes if symbol is not None]
    adata.var_names_make_unique()

    print(f"Conversion complete. Final gene count: {adata.n_vars}")

In [None]:
# Keep an untouched snapshot of the data in `.raw`.
adata.raw = adata.copy()

# Dense, independent copy of the expression matrix used for the zero statistics.
original_X = adata.X.toarray() if scipy.sparse.issparse(adata.X) else adata.X.copy()

# Share of entries that are already zero before any artificial dropout.
total_elements = original_X.size
initial_zeros = np.sum(original_X == 0)
initial_zero_rate = initial_zeros / total_elements
print(f"Initial zero percentage: {initial_zero_rate*100:.2f}%")

In [None]:
# Name of the `.var` column holding gene names; the embedding loop fills it from
# the var index and passes it to scGPT as `gene_col`.
gene_col = "gene_name"
# NOTE(review): presumably the `.obs` columns with cell-type labels and
# batch/sample ids; neither is referenced in the cells shown here — confirm
# against downstream use.
cell_type_key = "celltype"
batch_key = "sample"

In [None]:
# NOTE(review): `results` only ever receives an empty dict per target
# percentage in this cell — confirm whether later cells are meant to fill it.
results = {}

# Create the output folder up front so the save cannot fail after the
# expensive embedding step.
Path("Embeddings").mkdir(parents=True, exist_ok=True)

# Number of non-zero entries before any dropout; constant for the whole sweep.
n_nonzero_original = int(np.count_nonzero(original_X))

# Embedding generation
for percentage in target_percentage:
    results[percentage] = {}

    # Fraction of the non-zero entries that must be dropped to reach the target
    # zero percentage. It depends on the target only, not on the seed.
    target_zero_percentage = percentage
    needed_zeros = int(target_zero_percentage * total_elements)
    needed_drops = needed_zeros - initial_zeros
    if n_nonzero_original > 0:
        rate = max(0.0, min(1.0, needed_drops / n_nonzero_original))
    else:
        rate = 0.0
    if needed_drops < 0:
        warnings.warn(
            f"Target zero fraction {percentage} is below the initial zero rate "
            f"({initial_zero_rate:.4f}); no dropout is applied for this target."
        )

    for seed in seeds_per_rate[percentage]:
        # drop_data samples from the global NumPy state by default.
        np.random.seed(seed)

        # Reset adata from the full clean version
        adata_work = adata.copy()
        adata_work.raw = adata_work.copy()

        # Apply dropout on a throw-away copy (drop_data writes to its input's
        # `.uns` and `.raw`).
        adata_dropped = drop_data(adata_work.copy(), rate)

        # Report the achieved sparsity without overwriting the global
        # `total_elements` that the rate computation above relies on.
        num_nonzero = adata_dropped.X.count_nonzero()
        n_entries = adata_dropped.shape[0] * adata_dropped.shape[1]
        actual_zero_percentage = 100 * (1 - num_nonzero / n_entries)
        print(f"Actual zero percentage in adata_drop.X: {actual_zero_percentage:.2f}%")

        # HVG selection; copy the subset so the column assignment below works
        # on a real object instead of a view.
        tmp = adata_dropped.copy()
        sc.pp.highly_variable_genes(tmp, n_top_genes=N_HVG, flavor='seurat_v3')
        tmp = tmp[:, tmp.var['highly_variable']].copy()
        tmp.var[gene_col] = tmp.var.index

        # Embedding
        embed_adata = scg.tasks.embed_data(
            tmp,
            model_dir,
            gene_col=gene_col,
            batch_size=batch_size,
        )

        # Round away float error before truncating: 0.29 * 100 evaluates to
        # 28.999999999999996, which a bare int() would label as 28.
        percent_label = int(round(percentage * 100, 6))
        filename = f'Embeddings/{selected_ds}_Embeddings_{percent_label}_{seed}.h5ad'

        # Save the embedding
        embed_adata.write(filename)
        print(f"Successfully saved embedding to: {filename}")