  # scXpand Tutorial: Example of Data Preparation for Model Training and Optimization







  This tutorial demonstrates how to prepare paired scRNA/TCR-seq data as an input for model training and optimization.







  ## Example Dataset







  We use a publicly available paired scRNA/TCR-seq dataset of patients with brain tumors from:



  - **Study**: Wang et al. 2024



  - **Source**: https://zenodo.org/records/10672442


  - **Data type**: Paired scRNA/TCR-seq



  - **Cancer type**: Brain



  - **Tissue**: Tumor and Blood samples







  ## Tutorial Structure







  - **Data Loading and Initial Processing**



  - **Quality Control and Filtering**



  - **Data Preparation for scXpand**

## **Important note (!):**

This tutorial was made available to **examplify the process** we conducted to prepare the data as an input for model training, using just one dataset as an example.

It is highly recommended to directly use our pre-trained models rather than training them 'De-Novo' by yourselves.

---

## Data Loading and Initial Processing

### Loading scRNA-seq Data

In [None]:
# Check required packages
try:
    import magic  # noqa: F401
    import muon  # noqa: F401
except ImportError as e:
    missing = str(e).split("'")[1]
    print(f"✗ Missing package: {missing}")
    print("Install with: pip install muon magic-impute")
    print(
        "Docs: https://muon.readthedocs.io | https://github.com/KrishnaswamyLab/MAGIC"
    )

In [None]:
# Import required libraries for data analysis, visualization, and single-cell processing
import os
import sys
from pathlib import Path

import anndata as ad
import matplotlib.pyplot as plt
import muon as mu
import numpy as np
import pandas as pd
import scanpy as sc
import scirpy as ir
import seaborn as sns
from scipy.signal import argrelextrema, find_peaks

# Set matplotlib backend for Jupyter notebooks
%matplotlib inline

# Plotting settings
plt.rcParams["font.sans-serif"] = ["Arial"]
plt.rcParams["axes.axisbelow"] = True
sns.set_style("whitegrid")

# Setup project paths
project_root = Path.cwd().parent.parent
print(f"Project root: {project_root}")
sys.path.insert(0, str(project_root))

In [None]:
# Download the brain tumor dataset from Zenodo and extract to demo directory
import pooch

demo_path = project_root / "data" / "demo"
cache_dir = demo_path / ".zenodo_cache"

# Create directories
cache_dir.mkdir(exist_ok=True, parents=True)
demo_path.mkdir(exist_ok=True, parents=True)

# Construct Zenodo download URL
zenodo_record_id = "10672442"
filename = "Raw_files.zip"
zenodo_url = f"https://zenodo.org/records/{zenodo_record_id}/files/{filename}"

# Download and extract using Pooch
downloaded_files = pooch.retrieve(
    url=zenodo_url,
    known_hash=None,  # Let Pooch compute hash automatically
    path=str(cache_dir),
    progressbar=True,
    processor=pooch.Unzip(extract_dir=str(demo_path)),
)

source_path = demo_path / "Raw_files"

In [None]:
# Identify all sample directories in the dataset (excluding files and 'Grex' folder)
samples = [
    s for s in os.listdir(source_path) if (source_path / s).is_dir() and s != "Grex"
]

print(f"Found {len(samples)} sample directories: {samples}")

In [None]:
# Define helper function to load 10X data and initialize AnnData object with first sample
def load_sample_data(source_path: Path, sample: str) -> ad.AnnData:
    """Load and prepare 10X data for a single sample (optionally subset to first n_cells)."""
    adata = sc.read_10x_mtx(source_path / sample / "filtered_feature_bc_matrix")

    adata.obs["sample"] = sample
    adata.obs_names = [f"{name.split('-')[0]}_{sample}" for name in adata.obs_names]
    return adata


# Load first sample:
adata = load_sample_data(source_path, samples[0])
adata

In [None]:
# Concatenate all remaining samples into a single AnnData object
for sample in samples[1:]:
    tmp = load_sample_data(source_path, sample)
    adata = ad.concat([adata, tmp])

In [None]:
# Ensure consistent gene metadata across all samples
adata.var = tmp.var.loc[adata.var_names]

In [None]:
# Display the combined dataset
adata

In [None]:
# Mark mitochondrial genes for quality control
adata.var["mt"] = adata.var_names.str.startswith("MT-")

In [None]:
# Calculate quality control metrics including mitochondrial gene percentage
sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], inplace=True)

In [None]:
# Visualize distribution of mitochondrial gene percentage across cells
sc.pl.violin(adata, "pct_counts_mt", jitter=0.1)

In [None]:
# Filter cells with high mitochondrial content (<10%), low gene count, and rare genes
adata = adata[adata.obs.pct_counts_mt < 10, :]
sc.pp.filter_cells(adata, min_genes=200)
sc.pp.filter_genes(adata, min_cells=3)
adata

In [None]:
# Run Scrublet to detect doublets (two cells captured as one)
sc.pp.scrublet(adata, expected_doublet_rate=0.05, random_state=42)

In [None]:
# Visualize doublet score distribution
sc.pl.scrublet_score_distribution(adata)

In [None]:
# Remove cells with high doublet scores (threshold = 0.3)
doublet_threshold = 0.3
adata = adata[adata.obs["doublet_score"] < doublet_threshold]
print(f"Remaining cells after doublet filtering: {adata.n_obs}")

In [None]:
# Store raw counts in a separate layer before normalization
adata.layers["counts"] = adata.X.copy()

In [None]:
# Assign tissue type based on sample name (PBMC = Blood, others = Tumor)
adata.obs["tissue_type"] = np.where(
    adata.obs["sample"].str.contains("PBMC"), "Blood", "Tumor"
)

---

## TCR-seq Data Processing

### Loading and Quality Control of TCR-seq Data

In [None]:
# Define helper function to load TCR-seq data and load first sample
def load_tcr_sample(source_path: Path, sample: str) -> ad.AnnData | None:
    """Load and prepare 10X VDJ data for a single sample (returns None if file doesn't exist)."""
    tcr_path = source_path / sample / "filtered_contig_annotations.csv"
    if not tcr_path.exists():
        return None
    adata_tcr = ir.io.read_10x_vdj(tcr_path)
    adata_tcr.obs_names = [
        f"{name.split('-')[0]}_{sample}" for name in adata_tcr.obs_names
    ]
    return adata_tcr


# Load first sample:
adata_tcr = load_tcr_sample(source_path, samples[0])
adata_tcr

In [None]:
# Concatenate TCR data from all remaining samples
for sample in samples[1:]:
    tmp = load_tcr_sample(source_path, sample)
    if tmp is not None:
        adata_tcr = ad.concat([adata_tcr, tmp])

In [None]:
# Index TCR chains and perform quality control
ir.pp.index_chains(adata_tcr)
ir.tl.chain_qc(adata_tcr)

In [None]:
# Visualize abundance of different receptor subtypes (TCR, BCR, etc.)
ir.pl.group_abundance(adata_tcr, groupby="receptor_subtype")

In [None]:
# Display counts of cells per receptor subtype
adata_tcr.obs.groupby("receptor_subtype").size()

In [None]:
# Calculate and display the fraction of cells with multiple TCR pairs
multichain_types = ["extra VJ", "extra VDJ", "two full chains", "multichain"]
multichain_fraction = adata_tcr.obs["chain_pairing"].isin(multichain_types).mean()
print(f"Fraction of cells with more than one pair of TCRs: {multichain_fraction:.2f}")

In [None]:
# Filter out cells with multichain TCRs (ambiguous pairing)
mu.pp.filter_obs(adata_tcr, "chain_pairing", lambda x: x != "multichain")
adata_tcr

In [None]:
# Remove cells with orphan chains (incomplete TCR pairs)
mu.pp.filter_obs(
    adata_tcr, "chain_pairing", lambda x: ~np.isin(x, ["orphan VDJ", "orphan VJ"])
)
adata_tcr

In [None]:
# Filter out cells with no immune receptor detected
adata_tcr = adata_tcr[adata_tcr.obs["receptor_subtype"] != "no IR"]
adata_tcr

In [None]:
# Calculate TCR sequence distance matrix for clonotype definition
ir.pp.ir_dist(adata_tcr)

In [None]:
# Define clonotypes (cells with identical TCR sequences)
ir.tl.define_clonotypes(adata_tcr, receptor_arms="all", dual_ir="primary_only")

In [None]:
# Keep only cells that have both scRNA-seq and TCR-seq data
adata = adata[adata.obs_names.isin(adata_tcr.obs_names)].copy()

---

## Cell Type Classification Using MAGIC Imputation

### Important Note

In our manuscript, MAGIC was applied on the integrated data using all single-cell studies together for better robustness, and not for each study separately. Here, we demonstrate the process on a single dataset for tutorial purposes.


### Processing Tumor Samples

First, we'll process the tumor-infiltrating T cells:

In [None]:
# Subset data to tumor samples only
adata_tumor = adata[adata.obs["tissue_type"] == "Tumor"]

In [None]:
# Normalize and log-transform tumor data (target sum = 10,000, log base 2)
sc.pp.normalize_total(adata_tumor, target_sum=1e4)
sc.pp.log1p(adata_tumor, base=2)

In [None]:
# Apply MAGIC imputation to denoise and impute T cell marker genes
adata_magic_tumor = sc.external.pp.magic(
    adata_tumor,
    name_list=["CD8A", "CD8B", "CD4", "FOXP3", "CD3E", "PTPRC"],
    random_state=42,
)

In [None]:
# Extract original (non-imputed) gene expression values
non_imputed_df = sc.get.obs_df(adata_tumor, list(adata_magic_tumor.var_names))

In [None]:
# Combine MAGIC-imputed and original expression values into one dataframe
magic_df_tumor = sc.get.obs_df(adata_magic_tumor, list(adata_magic_tumor.var_names))
magic_df_tumor.columns = [f"{col}_Imputed" for col in magic_df_tumor.columns]
magic_df_tumor = magic_df_tumor.join(non_imputed_df)

In [None]:
# Save tumor MAGIC labels to compressed CSV
magic_df_tumor.to_csv(
    source_path / "Brain_zenodo_MAGIC_labels_tumor.csv.gz", compression="gzip"
)

In [None]:
# Define function to classify T cells as CD8, CD4, Double Positive, or Double Negative
def classify_cell_type(is_cd8a: bool, is_cd8b: bool, is_cd4: bool) -> str:
    """Classify as CD8/CD4/Double_Positive/Double_Negative based on marker expression."""
    has_cd8 = is_cd8a | is_cd8b
    if has_cd8 & ~is_cd4:
        return "is_CD8"
    elif ~has_cd8 & is_cd4:
        return "is_CD4"
    elif has_cd8 & is_cd4:
        return "Double_Positive"
    else:
        return "Double_Negative"

In [None]:
# Define function to reconcile original and MAGIC-imputed cell type classifications
def reconcile_cell_types(original: str, magic: str) -> str:
    """
    Reconcile original and MAGIC classifications using priority rules:
    - Double_Positive takes precedence
    - Original CD8/CD4 retained if MAGIC agrees or is Double_Negative
    - MAGIC result used if original is Double_Negative
    """
    if original == "Double_Positive":
        return "Double_Positive"
    elif original == "is_CD8" and magic in ["is_CD4", "Double_Negative", "is_CD8"]:
        return "is_CD8"
    elif original == "is_CD4" and magic in ["is_CD4", "Double_Negative", "is_CD8"]:
        return "is_CD4"
    elif original in ["is_CD4", "is_CD8"] and magic == "Double_Positive":
        return "Double_Positive"
    elif original == "Double_Negative":
        return magic
    return original

In [None]:
# Define function to identify regulatory T cells (Treg) based on FOXP3 expression
def identify_treg_cells(row: pd.Series) -> str:
    """Mark CD4+ cells with FOXP3 > 1 as Treg."""
    if row["final_type"] == "is_CD4" and (row["FOXP3"] > 1 or row["FOXP3_Imputed"] > 1):
        return "is_Treg"
    return row["final_type"]

In [None]:
# Define complete vectorized pipeline for cell type classification with cutoffs
def classify_tissue_cells(
    df: pd.DataFrame, cd8a_cutoff: float, cd8b_cutoff: float, cd4_cutoff: float
) -> pd.DataFrame:
    """
    Complete classification pipeline using vectorized operations:
    1. Classify based on imputed expression (MAGIC)
    2. Classify based on original expression
    3. Reconcile both classifications
    4. Identify Treg cells
    """
    # Vectorized classification based on imputed expression:
    is_cd8a_imp = df["CD8A_Imputed"] > cd8a_cutoff
    is_cd8b_imp = df["CD8B_Imputed"] > cd8b_cutoff
    is_cd4_imp = df["CD4_Imputed"] > cd4_cutoff
    has_cd8_imp = is_cd8a_imp | is_cd8b_imp

    # Initialize and assign MAGIC cell types:
    df["MAGIC_Cell_Type"] = "Double_Negative"
    df.loc[has_cd8_imp & ~is_cd4_imp, "MAGIC_Cell_Type"] = "is_CD8"
    df.loc[~has_cd8_imp & is_cd4_imp, "MAGIC_Cell_Type"] = "is_CD4"
    df.loc[has_cd8_imp & is_cd4_imp, "MAGIC_Cell_Type"] = "Double_Positive"

    # Vectorized classification based on original expression:
    is_cd8a_orig = df["CD8A"] > 1
    is_cd8b_orig = df["CD8B"] > 1
    is_cd4_orig = df["CD4"] > 1
    has_cd8_orig = is_cd8a_orig | is_cd8b_orig

    # Initialize and assign original cell types:
    df["type_original"] = "Double_Negative"
    df.loc[has_cd8_orig & ~is_cd4_orig, "type_original"] = "is_CD8"
    df.loc[~has_cd8_orig & is_cd4_orig, "type_original"] = "is_CD4"
    df.loc[has_cd8_orig & is_cd4_orig, "type_original"] = "Double_Positive"

    # Vectorized reconciliation of classifications:
    df["final_type"] = df["type_original"].copy()

    # Apply reconciliation rules using boolean masks:
    cd8_mask = (df["type_original"] == "is_CD8") & df["MAGIC_Cell_Type"].isin(
        ["is_CD4", "Double_Negative", "is_CD8"]
    )
    df.loc[cd8_mask, "final_type"] = "is_CD8"

    cd4_mask = (df["type_original"] == "is_CD4") & df["MAGIC_Cell_Type"].isin(
        ["is_CD4", "Double_Negative", "is_CD8"]
    )
    df.loc[cd4_mask, "final_type"] = "is_CD4"

    double_pos_mask = df["type_original"].isin(["is_CD4", "is_CD8"]) & (
        df["MAGIC_Cell_Type"] == "Double_Positive"
    )
    df.loc[double_pos_mask, "final_type"] = "Double_Positive"

    double_neg_mask = df["type_original"] == "Double_Negative"
    df.loc[double_neg_mask, "final_type"] = df.loc[double_neg_mask, "MAGIC_Cell_Type"]

    # Vectorized Treg identification:
    treg_mask = (df["final_type"] == "is_CD4") & (
        (df["FOXP3"] > 1) | (df["FOXP3_Imputed"] > 1)
    )
    df.loc[treg_mask, "final_type"] = "is_Treg"

    return df

In [None]:
# Save tumor MAGIC labels again (redundant save - cell 41 already saves this)
magic_df_tumor.to_csv(
    os.path.join(source_path, "Brain_zenodo_MAGIC_labels_tumor.csv.gz"),
    compression="gzip",
)

## Determining Expression Cutoffs

### Automatic Cutoff Determination Pipeline

Our pipeline automatically determines optimal expression thresholds using a three-step approach:

#### 1. **Automatic Clip Range Determination**
- Uses 1st and 99th percentiles of expression values with a 5% buffer
- Ensures the KDE plot captures the relevant expression range without manual tuning

#### 2. **Local Minimum Detection**
- Finds all local minima (troughs) in the KDE density curve using `scipy.signal.argrelextrema`
- Filters out edge artifacts (outer 5% of the distribution)
- Selects the **first (leftmost) interior local minimum** as the cutoff
- This naturally identifies the trough between negative and positive cell populations

#### 3. **Visual Validation**
- Plots show:
  - **Green circles (P1, P2, ...)**: Detected peaks (for reference)
  - **Blue circles (M1, M2, ...)**: Detected local minima
  - **Blue star with ✓**: Selected cutoff (first local minimum)
  - **Red dashed line**: Final cutoff value

In [None]:
# Define helper functions for automatic cutoff determination


def determine_clip_range(
    data: pd.Series, lower_percentile: float = 1, upper_percentile: float = 99
) -> list[float]:
    """
    Automatically determine clip range for KDE plotting based on data percentiles.

    Args:
        data: Series of gene expression values
        lower_percentile: Lower percentile for clipping (default 1st percentile)
        upper_percentile: Upper percentile for clipping (default 99th percentile)

    Returns:
        [lower_bound, upper_bound] for clipping
    """
    lower = np.percentile(data, lower_percentile)
    upper = np.percentile(data, upper_percentile)

    # Add small buffer (5% of range) to ensure we don't cut off important data
    data_range = upper - lower
    buffer = data_range * 0.05

    return [max(0, lower - buffer), upper + buffer]


def plot_expression_density_with_cutoff(
    data: pd.DataFrame, gene_name: str, clip_range: list[float] | None = None, ax=None
) -> float:
    """Plot expression density and return cutoff value at first local minimum (trough) between populations."""

    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=(10, 6))

    # Set random seed for reproducible KDE calculation
    np.random.seed(42)

    # Auto-determine clip range if not provided
    if clip_range is None:
        clip_range = determine_clip_range(data[gene_name])
        print(
            f"  Auto-determined clip range for {gene_name}: [{clip_range[0]:.3f}, {clip_range[1]:.3f}]"
        )

    # Generate KDE plot
    sns.kdeplot(data[gene_name], clip=clip_range, ax=ax)

    line = ax.lines[0]
    x, y = line.get_data()

    # Find all local minima in the density curve
    local_minima = argrelextrema(y, np.less)[0]

    # Filter out edge minima (first and last 5% of points)
    edge_threshold = int(len(y) * 0.05)
    interior_minima = local_minima[
        (local_minima > edge_threshold) & (local_minima < len(y) - edge_threshold)
    ]

    if len(interior_minima) == 0:
        raise ValueError(
            f"No local minimum found for {gene_name} (excluding edges). "
            f"The distribution may not be bimodal. Please check the expression distribution "
            f"or set cutoffs manually."
        )

    # Use the first local minimum (leftmost) as the cutoff
    # This should be the trough between negative and positive populations
    min_idx = interior_minima[0]
    cutoff_x = x[min_idx]
    cutoff_y = y[min_idx]

    # Find peaks for visualization (optional, but helpful for validation)
    peaks, _ = find_peaks(y, prominence=0.005)

    # Mark all local minima for visualization
    ax.plot(
        x[interior_minima],
        y[interior_minima],
        "bo",
        markersize=8,
        label="Local minima",
        zorder=5,
        alpha=0.6,
    )

    # Highlight the selected cutoff (first local minimum)
    ax.plot(
        x[min_idx],
        y[min_idx],
        "b*",
        markersize=15,
        markeredgecolor="darkblue",
        markeredgewidth=1.5,
        label="Selected cutoff",
        zorder=7,
    )

    # Mark peaks if detected (for reference)
    if len(peaks) > 0:
        ax.plot(
            x[peaks],
            y[peaks],
            "go",
            markersize=8,
            label="Detected peaks",
            zorder=5,
            alpha=0.6,
        )

        # Add numbered labels to peaks (sorted by x-position)
        sorted_peaks = peaks[np.argsort(x[peaks])]
        for peak_num, peak_idx in enumerate(
            sorted_peaks[:3], start=1
        ):  # Show first 3 peaks
            ax.annotate(
                f"P{peak_num}",
                xy=(x[peak_idx], y[peak_idx]),
                xytext=(0, 8),
                textcoords="offset points",
                ha="center",
                fontsize=10,
                fontweight="bold",
                color="darkgreen",
                bbox={
                    "boxstyle": "circle,pad=0.2",
                    "facecolor": "white",
                    "edgecolor": "darkgreen",
                    "linewidth": 1,
                },
            )

    # Add numbered labels to local minima
    for min_num, min_idx_i in enumerate(
        interior_minima[:3], start=1
    ):  # Show first 3 minima
        label_text = f"M{min_num}" if min_idx_i != min_idx else "M1✓"
        ax.annotate(
            label_text,
            xy=(x[min_idx_i], y[min_idx_i]),
            xytext=(0, -12),
            textcoords="offset points",
            ha="center",
            fontsize=10,
            fontweight="bold",
            color="darkblue",
            bbox={
                "boxstyle": "circle,pad=0.2",
                "facecolor": "white",
                "edgecolor": "darkblue",
                "linewidth": 1,
            },
        )

    ax.axvline(
        x=cutoff_x, color="r", linestyle="--", lw=2, label=f"Cutoff: {cutoff_x:.4f}"
    )
    ax.axhline(y=cutoff_y, color="r", linestyle="--", lw=2, alpha=0.5)
    ax.set_title(f"{gene_name}")
    ax.set_xlabel("Gene Expression")
    ax.set_ylabel("Density")
    ax.legend(loc="best", fontsize=9)

    return cutoff_x


def plot_cutoffs_for_tissue(
    data: pd.DataFrame, tissue_name: str, genes_dict: dict[str, list[float] | None]
):
    """
    Plot expression density and cutoffs for multiple genes in subplots.

    Args:
        data: DataFrame with gene expression data
        tissue_name: Name of tissue type (e.g., "Tumor", "Blood")
        genes_dict: Dictionary mapping gene names to clip ranges
                If clip range is None, it will be auto-determined
    """
    # Set random seed for reproducibility across all cutoff calculations
    np.random.seed(42)

    fig, axes = plt.subplots(1, 3, figsize=(15, 4))
    fig.suptitle(f"{tissue_name} - Gene Expression Cutoffs", fontsize=14, y=1.02)

    cutoffs = {}
    for idx, (gene, clip_range) in enumerate(genes_dict.items()):
        cutoff = plot_expression_density_with_cutoff(
            data, gene, clip_range, ax=axes[idx]
        )
        cutoffs[gene] = cutoff

    plt.tight_layout()
    plt.show()

    return cutoffs

In [None]:
# Determine optimal expression cutoffs for CD8A, CD8B, and CD4 in tumor samples
# Set clip ranges to None for automatic determination based on data percentiles
tumor_genes = {
    "CD8A_Imputed": None,  # Auto-determine from 1st-99th percentile
    "CD8B_Imputed": None,  # Auto-determine from 1st-99th percentile
    "CD4_Imputed": None,  # Auto-determine from 1st-99th percentile
}

cutoffs_tumor = plot_cutoffs_for_tissue(magic_df_tumor, "Tumor", tumor_genes)
cd8a_cutoff_tumor = cutoffs_tumor["CD8A_Imputed"]
cd8b_cutoff_tumor = cutoffs_tumor["CD8B_Imputed"]
cd4_cutoff_tumor = cutoffs_tumor["CD4_Imputed"]

In [None]:
# Print calculated tumor cutoffs for verification
print("Calculated Tumor Cutoffs:")
print(f"  CD8A: {cd8a_cutoff_tumor:.8f}")
print(f"  CD8B: {cd8b_cutoff_tumor:.8f}")
print(f"  CD4:  {cd4_cutoff_tumor:.8f}")

In [None]:
# Apply cell type classification to tumor samples using determined cutoffs
magic_df_tumor = classify_tissue_cells(
    magic_df_tumor,
    cd8a_cutoff=cd8a_cutoff_tumor,
    cd8b_cutoff=cd8b_cutoff_tumor,
    cd4_cutoff=cd4_cutoff_tumor,
)

In [None]:
# Save final classified tumor labels to compressed CSV
magic_df_tumor.to_csv(
    source_path / "Brain_zenodo_final_tumor_MAGIC_labels.csv.gz", compression="gzip"
)

---

### Processing Blood (PBMC) Samples

Now we'll apply the same process to peripheral blood mononuclear cells:

In [None]:
# Subset data to blood (PBMC) samples only
adata_blood = adata[adata.obs["tissue_type"] == "Blood"]

In [None]:
# Normalize and log-transform blood data (target sum = 10,000, log base 2)
sc.pp.normalize_total(adata_blood, target_sum=1e4)
sc.pp.log1p(adata_blood, base=2)

In [None]:
# Apply MAGIC imputation to denoise and impute T cell marker genes in blood
adata_magic_blood = sc.external.pp.magic(
    adata_blood,
    name_list=["CD8A", "CD8B", "CD4", "FOXP3", "CD3E", "PTPRC"],
    random_state=42,
)

In [None]:
# Combine MAGIC-imputed and original expression values for blood samples
non_imputed_df = sc.get.obs_df(adata_blood, list(adata_magic_blood.var_names))
magic_df_blood = sc.get.obs_df(adata_magic_blood, list(adata_magic_blood.var_names))
magic_df_blood.columns = [f"{col}_Imputed" for col in magic_df_blood.columns]
magic_df_blood = magic_df_blood.join(non_imputed_df)

In [None]:
# Save blood MAGIC labels to compressed CSV
magic_df_blood.to_csv(
    source_path / "Brain_zenodo_MAGIC_labels_blood.csv.gz", compression="gzip"
)

In [None]:
# Determine optimal expression cutoffs for CD8A, CD8B, and CD4 in blood samples
# Set clip ranges to None for automatic determination based on data percentiles
blood_genes = {
    "CD8A_Imputed": None,  # Auto-determine from 1st-99th percentile
    "CD8B_Imputed": None,  # Auto-determine from 1st-99th percentile
    "CD4_Imputed": None,  # Auto-determine from 1st-99th percentile
}

cutoffs_blood = plot_cutoffs_for_tissue(magic_df_blood, "Blood", blood_genes)
cd8a_cutoff_blood = cutoffs_blood["CD8A_Imputed"]
cd8b_cutoff_blood = cutoffs_blood["CD8B_Imputed"]
cd4_cutoff_blood = cutoffs_blood["CD4_Imputed"]

In [None]:
# Print calculated blood cutoffs for verification
print("Calculated Blood Cutoffs:")
print(f"  CD8A: {cd8a_cutoff_blood:.8f}")
print(f"  CD8B: {cd8b_cutoff_blood:.8f}")
print(f"  CD4:  {cd4_cutoff_blood:.8f}")

In [None]:
# Apply cell type classification to blood samples using determined cutoffs
magic_df_blood = classify_tissue_cells(
    magic_df_blood,
    cd8a_cutoff=cd8a_cutoff_blood,
    cd8b_cutoff=cd8b_cutoff_blood,
    cd4_cutoff=cd4_cutoff_blood,
)

In [None]:
# Save final classified blood labels to compressed CSV
magic_df_blood.to_csv(
    source_path / "Brain_zenodo_final_blood_MAGIC_labels.csv.gz", compression="gzip"
)

In [None]:
# Add TCR clonotype and receptor type information to the main AnnData object
adata.obs["clone_id"] = adata_tcr.obs["clone_id"]
adata.obs["receptor_type"] = adata_tcr.obs["receptor_type"]

In [None]:
# Add study metadata: cancer type and publication reference
adata.obs["cancer_type"] = "Brain Tumors"
adata.obs["study"] = "Wang et al. 2024 (Brain)"

In [None]:
# Combine tumor and blood cell type classifications
magic_labels = pd.concat([magic_df_tumor, magic_df_blood])

In [None]:
# Add final cell type labels to the main AnnData object
adata.obs["imputed_labels"] = magic_labels["final_type"]

In [None]:
# Extract patient ID from sample names and ensure string types
adata.obs["patient"] = adata.obs["sample"].str.split("_").str[0]
adata.obs["sample"] = adata.obs["sample"].astype(str)
adata.obs["tissue_type"] = adata.obs["tissue_type"].astype(str)

In [None]:
# Create unique clone IDs incorporating patient, clonotype, and cell type; calculate clone sizes
adata.obs["clone_id"] = (
    adata.obs["patient"].astype(str)
    + "_"
    + adata.obs["clone_id"].astype(str)
    + "_"
    + adata.obs["imputed_labels"].astype(str)
)
adata.obs["clone_id_size"] = adata.obs.groupby(["sample", "clone_id"]).transform("size")

In [None]:
# Classify clones as expanded or non-expanded using Shiao et al. criteria (>1.5x median)
# Calculate median clone size per sample
clone_sizes = (
    adata[adata.obs["clone_id_size"] > 1]
    .obs.groupby(["sample", "clone_id"])
    .size()
    .reset_index(name="clone_size")[["sample", "clone_size"]]
)
median_clone_sizes = clone_sizes.groupby("sample").median()
median_clone_sizes["1.5x_median"] = median_clone_sizes["clone_size"] * 1.5
median_clone_sizes = median_clone_sizes.reset_index()

# Merge back to adata
tmp = (
    adata.obs.reset_index()
    .merge(median_clone_sizes, on="sample", how="left")
    .set_index("index")
)
adata.obs["median_clone_size"] = tmp["clone_size"]
adata.obs["1.5x_median_clone_size"] = tmp["1.5x_median"]

# Vectorized expansion classification:
adata.obs["expansion"] = np.where(
    adata.obs["1.5x_median_clone_size"] < adata.obs["clone_id_size"],
    "expanded",
    "non_expanded",
)

In [None]:
# Display counts of expanded vs non-expanded cells by tissue type and sample
adata.obs.groupby(["tissue_type", "sample", "expansion"]).size()

In [None]:
# Reorganize gene metadata to use gene IDs as index with gene names as column
adata.var = adata.var.rename_axis("gene_name").reset_index().set_index("gene_ids")

In [None]:
# Save the fully processed AnnData object (commented out - uncomment to save)
# adata.write(os.path.join(source_path, "scXpand_preprocessed_data_Brain_zenodo.h5ad"), compression = 'gzip')