# **Out-of-Core Single-Cell Analysis with RAPIDS-SingleCell & Dask**  
**Author:** [Severin Dicks](https://github.com/Intron7)
**Copyright** [scverse](https://scverse.org)

In this notebook, we demonstrate the **out-of-core computation** capabilities of **rapids-singlecell** using **Dask**.  
This approach allows us to analyze larger scale datasets, such as 1.3 million to **11 million cells** efficiently, even on relatively small hardware.  

By leveraging **Dask**, we can:  
- **Process large-scale single-cell datasets** without exceeding memory limits.  
- **Enable chunk-based execution**, loading only the necessary data into memory at any time.  

This method makes **large-scale single-cell analysis feasible** on standard hardware setups,  
removing barriers to working with massive datasets.


In [1]:
import dask
import time

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

In [2]:
import rmm
import cupy as cp

from rmm.allocators.cupy import rmm_cupy_allocator

## **Initializing a Dask Cluster for Out-of-Core Computation**  

To enable **out-of-core computation** and parallel processing,  
we initialize a **Dask CUDA cluster**, which distributes computations across available GPU resources.  

We create a **local GPU cluster** with the following configuration:  


In [3]:
%%time
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0", threads_per_worker=30)

client = Client(cluster)

client

CPU times: user 411 ms, sys: 77.6 ms, total: 489 ms
Wall time: 3.13 s


0,1
Connection method: Cluster object,Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 1
Total threads: 30,Total memory: 144.40 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:34119,Workers: 1
Dashboard: http://127.0.0.1:8787/status,Total threads: 30
Started: Just now,Total memory: 144.40 GiB

0,1
Comm: tcp://127.0.0.1:43497,Total threads: 30
Dashboard: http://127.0.0.1:36695/status,Memory: 144.40 GiB
Nanny: tcp://127.0.0.1:42029,
Local directory: /tmp/dask-scratch-space/worker-hptjfx_9,Local directory: /tmp/dask-scratch-space/worker-hptjfx_9


In [4]:
import rapids_singlecell as rsc
import anndata as ad

## **Loading Large Datasets into AnnData with Dask**  

To efficiently handle large-scale single-cell datasets, we load data directly from an **HDF5 (`h5`) or Zarr file**  
into an **AnnData object** using **Dask arrays**. This enables **lazy loading**, allowing data to be processed in chunks  
without exceeding memory limits.  

We achieve this using **`read_elem_as_dask`**, which loads the expression matrix (`X`) as a **Dask array**

Let's first download the data.  
- For the 1.3M cells dataset, `nvidia_1.3M`, this may take about 5 minutes, as it is 5.3 gigabytes to download and convert.  This is the default dataset for running the notebook.
- For the 11M cells dataset, `cell_atlas`, downloading can get considerable time, as it is over 43GB of data to download and convert.  This dataset is commented out by default.  You will have to uncomment the links below, as instructed, to download this dataset.

If you already have the data ready, it will skip the download

In [5]:
import wget
import os
from anndata.experimental import read_elem_as_dask
import h5py

data_dir = "./h5"

# 1.3M cells
url = 'https://rapids-single-cell-examples.s3.us-east-2.amazonaws.com/1M_brain_cells_10X.sparse.h5ad'
output = data_dir+'/nvidia_1.3M.h5ad'
final = './zarr/nvidia_1.3M.zarr'

# 11M Cells.  Please uncomment the 3 lines below to download this dataset
# url = 'https://datasets.cellxgene.cziscience.com/3817734b-0f82-433b-8c38-55b214200fff.h5ad'
# output = data_dir+'/cell_atlas.h5ad'
# final = './zarr/cell_atlas.zarr'

if not os.path.exists(final): # Check if zarr dataset directory exists
    if not os.path.exists(data_dir): # Check if h5 directory exists
        print('creating data directory')
        os.system('mkdir ./h5')
    else:
        print(f'{data_dir} directory found')

    if not os.path.isfile(output): # Check to see if we have our final output.  If it is there, get to the analysis!
        print(f'Downloading cell data into {output}...')
        wget.download(url, output)
    else:
        print(f'{output} dataset found')

    # Start to convert the h5ad file to zarr
    print(f'Converting {output} into {final}...')
    SPARSE_CHUNK_SIZE = 20_000
    
    f = h5py.File(output)
    X = f["X"]
    shape = X.attrs["shape"]
    adata = ad.AnnData(
        X = read_elem_as_dask(X, (SPARSE_CHUNK_SIZE, shape[1])),
        obs = ad.io.read_elem(f["obs"]),
        var = ad.io.read_elem(f["var"]))
    f.close()
    
    adata.write_zarr(final)
    print(f'{final} is ready for use!')
else:
    print(f'{final} zarr dataset directory found')

./h5 directory found
./h5/nvidia_1.3M.h5ad dataset found
Converting ./h5/nvidia_1.3M.h5ad into ./zarr/nvidia_1.3M.zarr...


  utils.warn_names_duplicates("var")


./zarr/nvidia_1.3M.zarr is ready for use!


Now, let's read in the zarr data.

In [6]:
import zarr

SPARSE_CHUNK_SIZE = 20_000

f = zarr.open(final)
X = f["X"]
shape = X.attrs["shape"]
adata = ad.AnnData(
    X = read_elem_as_dask(X, (SPARSE_CHUNK_SIZE, shape[1])),
    obs = ad.io.read_elem(f["obs"]),
    var = ad.io.read_elem(f["var"]))



  utils.warn_names_duplicates("var")


## **Transferring AnnData to GPU for Accelerated Computation**  

Once the dataset is loaded as a **Dask-backed AnnData object**,  
we transfer it to the **GPU** to leverage **RAPIDS-SingleCell’s** accelerated processing.  

We use **`rsc.get.anndata_to_GPU()`**, which efficiently moves the AnnData object to GPU memory:  


In [7]:
rsc.get.anndata_to_GPU(adata)

In [8]:
adata.X

Unnamed: 0,Array,Chunk
Shape,"(1306127, 27998)","(20000, 27998)"
Dask graph,66 chunks in 4 graph layers,66 chunks in 4 graph layers
Data type,float32 cupyx.scipy.sparse._csr.csr_matrix,float32 cupyx.scipy.sparse._csr.csr_matrix
"Array Chunk Shape (1306127, 27998) (20000, 27998) Dask graph 66 chunks in 4 graph layers Data type float32 cupyx.scipy.sparse._csr.csr_matrix",27998  1306127,

Unnamed: 0,Array,Chunk
Shape,"(1306127, 27998)","(20000, 27998)"
Dask graph,66 chunks in 4 graph layers,66 chunks in 4 graph layers
Data type,float32 cupyx.scipy.sparse._csr.csr_matrix,float32 cupyx.scipy.sparse._csr.csr_matrix


## **Quality Control (QC) Metrics Calculation (Requires Synchronization)**  

Before proceeding with further analysis, we compute **quality control (QC) metrics**  
to assess dataset quality and filter out low-quality cells or genes.  

We use **`rsc.pp.calculate_qc_metrics()`** to calculate key QC metrics

Although we are working with Dask-backed AnnData, this operation requires a synchronization step.
This means that Dask computations must be evaluated immediately,
so the process is not completely lazy like other out-of-core operations.

In [9]:
%%time
rsc.pp.calculate_qc_metrics(adata)

CPU times: user 1.29 s, sys: 323 ms, total: 1.61 s
Wall time: 27 s


## **Filtering Cells and Genes Without Additional Computation**  

Instead of using **`sc.pp.filter_cells`** and **`sc.pp.filter_genes`**,  
we apply filtering directly using boolean indexing to **avoid extra computation**.

**Why Use Direct Indexing Instead of Built-in Functions?**
* More Efficient with Dask → Avoids triggering additional computations.
* Preserves Lazy Execution → Filtering is applied without forcing full dataset evaluation.
* Copy is Essential → Using `.copy()` prevents views, which may not work reliably with Dask-backed AnnData.

In [10]:
adata = adata[(adata.obs["n_genes_by_counts"]<=10000) 
            & (adata.obs["n_genes_by_counts"]>=200)].copy()

  utils.warn_names_duplicates("var")


In [11]:
adata.X

Unnamed: 0,Array,Chunk
Shape,"(1292537, 27998)","(19789, 27998)"
Dask graph,66 chunks in 5 graph layers,66 chunks in 5 graph layers
Data type,float32 cupyx.scipy.sparse._csr.csr_matrix,float32 cupyx.scipy.sparse._csr.csr_matrix
"Array Chunk Shape (1292537, 27998) (19789, 27998) Dask graph 66 chunks in 5 graph layers Data type float32 cupyx.scipy.sparse._csr.csr_matrix",27998  1292537,

Unnamed: 0,Array,Chunk
Shape,"(1292537, 27998)","(19789, 27998)"
Dask graph,66 chunks in 5 graph layers,66 chunks in 5 graph layers
Data type,float32 cupyx.scipy.sparse._csr.csr_matrix,float32 cupyx.scipy.sparse._csr.csr_matrix


## **Log Normalization (Fully Lazy Execution)**  

Next, we apply **log normalization** to scale gene expression values.  
This step ensures that differences in sequencing depth across cells do not dominate downstream analysis.  

In [12]:
%%time
rsc.pp.normalize_total(adata,target_sum = 10000)
rsc.pp.log1p(adata)

CPU times: user 37.3 ms, sys: 4.38 ms, total: 41.7 ms
Wall time: 41.8 ms


## **Selecting Highly Variable Genes (Requires Synchronization)**  

To focus on the most informative features, we identify **highly variable genes (HVGs)**  
using the **Cell Ranger** method and subset the dataset accordingly.  

**Important Considerations:**
* Requires Synchronization → Computing highly variable genes triggers evaluation,
meaning this step is not fully lazy when using Dask.
* Copy is Essential → Using `.copy()` prevents views, ensuring the operation works properly with Dask-backed AnnData.


In [13]:
%%time
rsc.pp.highly_variable_genes(adata,n_top_genes=5000, flavor="cell_ranger")
adata = adata[:,adata.var.highly_variable].copy()

CPU times: user 1.26 s, sys: 387 ms, total: 1.65 s
Wall time: 22.1 s


  utils.warn_names_duplicates("var")


## **Scaling Gene Expression (Requires Synchronization)**  

To standardize gene expression values, we apply **feature scaling**,  
ensuring that all genes contribute equally to downstream analysis.

**Important Considerations:**
* Requires Synchronization → Since the input matrix is in CSR format (Compressed Sparse Row),
this step forces an immediate computation, meaning it is not fully lazy like earlier transformations.
* Scaling → Divides each gene’s expression values by its standard deviation.
* zero_center=False → Keeps the scaled values non-centered,
which is beneficial for sparse matrices and GPU acceleration.


In [14]:
%%time
rsc.pp.scale(adata, zero_center= False)

CPU times: user 871 ms, sys: 281 ms, total: 1.15 s
Wall time: 22.1 s


## **Principal Component Analysis (PCA) on GPU (Two-Step Synchronization Process)**  

To reduce dimensionality while preserving meaningful variation,  
we perform **Principal Component Analysis (PCA)** using **GPU acceleration**.

Understanding the Two-Step Synchronization in PCA:
1. Mandatory Synchronization for Covariance & Mean Calculation
    * PCA requires computing the covariance matrix and mean vector,
    which must be explicitly synchronized before proceeding.
    * This step is handled automatically within `rsc.pp.pca()`.

2. Finalizing the Transformation with `.compute()`
    * After computing the principal components, the data remains lazy (Dask CuPy array).
    * Calling `.compute()` on `adata.obsm["X_pca"]` performs the final transformation,
      projecting the data onto the computed PCs and materializing the result as a fully computed CuPy array.

**Why This Matters?**
* The first synchronization (**covariance & mean**) ensures the PCA model is ready.
* The second synchronization (`compute()`) ensures that the transformed data is fully realized
for downstream analyses like clustering and UMAP.

In [15]:
%%time
rsc.pp.pca(adata, n_comps = 100,mask_var=None)
adata.obsm["X_pca"]=adata.obsm["X_pca"].compute()

CPU times: user 2.34 s, sys: 1.3 s, total: 3.64 s
Wall time: 52.5 s


In [None]:
# These notebooks are very GPU memory intensive!
# In order to free up GPU memory, we'll kill this kernel prior to proceeding.  You will get a message.  This is expected.
# If you have a CUDA or an Out Of Memory (OOM) error, please kill all kernels to free up your GPU memory and try again!
# You can comment this out if you want to continue exploring the notebook.
# Please consult the README for more tips and tricks.

import IPython

IPython.Application.instance().kernel.do_shutdown(True)