# Dask Array Support to AnnData

## Initalizing

First let's do our imports and initalize adata objects with the help of the `adata_with_dask` function defined below.

In [1]:
import dask
import dask.array as da
import numpy as np
import pandas as pd
import anndata as ad

dask.config.set({"visualization.engine": "graphviz"});

In [2]:
def adata_with_dask(M, N):
    adata_dict = {}
    adata_dict["X"] = da.random.random((M, N))
    adata_dict["dtype"] = np.float64
    adata_dict["obsm"] = dict(
        a=da.random.random((M, 100)),
    )
    adata_dict["layers"] = dict(
        a=da.random.random((M, N)),
    )
    adata_dict["obs"] = pd.DataFrame(
        {"batch": np.random.choice(["a", "b"], M)},
        index=[f"cell{i:03d}" for i in range(M)],
    )
    adata_dict["var"] = pd.DataFrame(index=[f"gene{i:03d}" for i in range(N)])
    
    
    return ad.AnnData(**adata_dict)

Here is how our adata looks like

In [3]:
adata = adata_with_dask(10000,10000)
adata

AnnData object with n_obs × n_vars = 10000 × 10000
    obs: 'batch'
    obsm: 'a'
    layers: 'a'

## Representation of Dask Arrays

Dask arrays consists of chunks that can be distributed in clusters. In the figure below, each small square represents a chunk that form a dask array. In principle these some of these chunks could be in different machines (clusters).

In [4]:
adata.X

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,47.68 MiB
Shape,"(10000, 10000)","(2500, 2500)"
Count,1 Graph Layer,16 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 762.94 MiB 47.68 MiB Shape (10000, 10000) (2500, 2500) Count 1 Graph Layer 16 Chunks Type float64 numpy.ndarray",10000  10000,

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,47.68 MiB
Shape,"(10000, 10000)","(2500, 2500)"
Count,1 Graph Layer,16 Chunks
Type,float64,numpy.ndarray


In [5]:
adata.obsm['a']

Unnamed: 0,Array,Chunk
Bytes,7.63 MiB,7.63 MiB
Shape,"(10000, 100)","(10000, 100)"
Count,1 Graph Layer,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.63 MiB 7.63 MiB Shape (10000, 100) (10000, 100) Count 1 Graph Layer 1 Chunks Type float64 numpy.ndarray",100  10000,

Unnamed: 0,Array,Chunk
Bytes,7.63 MiB,7.63 MiB
Shape,"(10000, 100)","(10000, 100)"
Count,1 Graph Layer,1 Chunks
Type,float64,numpy.ndarray


## The Computation Graph
The graph layer in the `Count` row refers to the layer of the computation graph of the chunks, i.e. which operations are applied to them. We have this because the operations done on Dask arrays aren't computed instantly. This way, Dask array can optimize the queries we issued to it. It also won't keep our resources occupied for the results we expect later. Below is a representation of the chunks we initially created.

In [6]:
adata.X.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

We now show an example for this computation graph on dask arrays to understand it better. This part is not technically relevant to AnnData.

In [7]:
xsum = adata.X.sum(axis=1) # do a sum on axis=1
xsum

Unnamed: 0,Array,Chunk
Bytes,78.12 kiB,19.53 kiB
Shape,"(10000,)","(2500,)"
Count,3 Graph Layers,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 78.12 kiB 19.53 kiB Shape (10000,) (2500,) Count 3 Graph Layers 4 Chunks Type float64 numpy.ndarray",10000  1,

Unnamed: 0,Array,Chunk
Bytes,78.12 kiB,19.53 kiB
Shape,"(10000,)","(2500,)"
Count,3 Graph Layers,4 Chunks
Type,float64,numpy.ndarray


Note that this computation isn't necessarily done yet, but rather saved to actually run it later.   If we investigate the computation graph of this result, we can see that for this operation some chunks aren't dependent on each other. This might give a hint to the Dask framework to store the chunks that depend on each other to the same cluster. For this simple exercise, all the chunks can be stored in one machine, but when it is impossible to store all the four chunks into one machine this will come in handy.

In [8]:
xsum.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

But coming back to our anndata tutorial we will see that nothing changed in our adata.

In [9]:
adata.X.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

## Concatenation

In this section we will cover how concatenation on anndata objects that use Dask arrays looks like. We first create another anndata object to concatenate with.

In [10]:
adata2 = adata_with_dask(10000,10000)
adata2

AnnData object with n_obs × n_vars = 10000 × 10000
    obs: 'batch'
    obsm: 'a'
    layers: 'a'

We can see that the X attribute of adata also consist of four chunks.

In [11]:
adata2.X

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,47.68 MiB
Shape,"(10000, 10000)","(2500, 2500)"
Count,1 Graph Layer,16 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 762.94 MiB 47.68 MiB Shape (10000, 10000) (2500, 2500) Count 1 Graph Layer 16 Chunks Type float64 numpy.ndarray",10000  10000,

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,47.68 MiB
Shape,"(10000, 10000)","(2500, 2500)"
Count,1 Graph Layer,16 Chunks
Type,float64,numpy.ndarray


In [12]:
adata_concat = ad.concat([adata,adata2],index_unique='id')

When we concatenate the whole object you can see that in the X of the result consists of eight chunks and they quite probably are just the source chunks aligned.

In [13]:
adata_concat.X

Unnamed: 0,Array,Chunk
Bytes,1.49 GiB,47.68 MiB
Shape,"(20000, 10000)","(2500, 2500)"
Count,3 Graph Layers,32 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.49 GiB 47.68 MiB Shape (20000, 10000) (2500, 2500) Count 3 Graph Layers 32 Chunks Type float64 numpy.ndarray",10000  20000,

Unnamed: 0,Array,Chunk
Bytes,1.49 GiB,47.68 MiB
Shape,"(20000, 10000)","(2500, 2500)"
Count,3 Graph Layers,32 Chunks
Type,float64,numpy.ndarray


To confirm this we look at the computation graph. We can confirm that this new object's X is just the chunks of source X's put together.

In [14]:
adata_concat.X.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

Here for the obsm we also this. The chunk from both are just stacked on top.

In [15]:
adata_concat.obsm['a']

Unnamed: 0,Array,Chunk
Bytes,15.26 MiB,7.63 MiB
Shape,"(20000, 100)","(10000, 100)"
Count,3 Graph Layers,2 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 15.26 MiB 7.63 MiB Shape (20000, 100) (10000, 100) Count 3 Graph Layers 2 Chunks Type float64 numpy.ndarray",100  20000,

Unnamed: 0,Array,Chunk
Bytes,15.26 MiB,7.63 MiB
Shape,"(20000, 100)","(10000, 100)"
Count,3 Graph Layers,2 Chunks
Type,float64,numpy.ndarray


In [16]:
adata_concat.obsm['a'].visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

## Views

Let's see how our views of anndata objects play with Dask arrays.

### Slice View



We take a slice of the concatenated adatas. This operation returns a view of the adata which means that the resulting adata holds a view of the source adata's Dask array, namely the `DaskArrayView` class,  which is a completely different object than Dask array.

In [17]:
adata_slice_view = adata_concat[:500, :][:, :500]

Below, you can see the values of the X attribute of the result. Which implies that the resulting object isn't "lazy" like Dask arrays. 

In [18]:
adata_slice_view.X, adata_slice_view.shape

(DaskArrayView([[0.81182637, 0.60832413, 0.63094608, ..., 0.25991546,
                 0.97130487, 0.65728839],
                [0.13260749, 0.74650301, 0.11485053, ..., 0.71921565,
                 0.92553613, 0.73383694],
                [0.32783528, 0.06200809, 0.94391329, ..., 0.81707524,
                 0.44381134, 0.07904699],
                ...,
                [0.31714581, 0.55801191, 0.93052054, ..., 0.6936699 ,
                 0.4174704 , 0.73246195],
                [0.09022853, 0.87645173, 0.96647298, ..., 0.20770134,
                 0.27631136, 0.0251467 ],
                [0.80847933, 0.75661258, 0.69358137, ..., 0.65941116,
                 0.50209536, 0.41314259]]),
 (500, 500))

But our original adata remains unchanged.

In [19]:
adata_concat.X

Unnamed: 0,Array,Chunk
Bytes,1.49 GiB,47.68 MiB
Shape,"(20000, 10000)","(2500, 2500)"
Count,3 Graph Layers,32 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.49 GiB 47.68 MiB Shape (20000, 10000) (2500, 2500) Count 3 Graph Layers 32 Chunks Type float64 numpy.ndarray",10000  20000,

Unnamed: 0,Array,Chunk
Bytes,1.49 GiB,47.68 MiB
Shape,"(20000, 10000)","(2500, 2500)"
Count,3 Graph Layers,32 Chunks
Type,float64,numpy.ndarray


### Index List View

In [20]:
small_view = adata_concat[[12,12,3,5,53],[1,2,5]]
small_view

View of AnnData object with n_obs × n_vars = 5 × 3
    obs: 'batch'
    obsm: 'a'
    layers: 'a'

In [21]:
small_view.X

DaskArrayView([[0.56433873, 0.21104419, 0.61432291],
               [0.56433873, 0.21104419, 0.61432291],
               [0.51771641, 0.95237847, 0.31383044],
               [0.50142335, 0.97339081, 0.9336738 ],
               [0.35927466, 0.94088   , 0.8781376 ]])

### View by Category

In [22]:
categ_view = adata_concat[adata_concat.obs['batch'] == 'b']

In [23]:
categ_view.X

DaskArrayView([[0.13260749, 0.74650301, 0.11485053, ..., 0.93270847,
                0.98403846, 0.9927202 ],
               [0.32783528, 0.06200809, 0.94391329, ..., 0.50178389,
                0.54705791, 0.75716067],
               [0.57973377, 0.30956408, 0.90240769, ..., 0.76048711,
                0.73042165, 0.3745892 ],
               ...,
               [0.47399902, 0.73394741, 0.43985274, ..., 0.54661086,
                0.3116454 , 0.43921658],
               [0.0305885 , 0.94680013, 0.11682959, ..., 0.44790366,
                0.75264702, 0.46293768],
               [0.47863471, 0.42154932, 0.51308936, ..., 0.3518507 ,
                0.20528009, 0.01369657]])

## To Memory

In [24]:
adata_concat.X

Unnamed: 0,Array,Chunk
Bytes,1.49 GiB,47.68 MiB
Shape,"(20000, 10000)","(2500, 2500)"
Count,3 Graph Layers,32 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.49 GiB 47.68 MiB Shape (20000, 10000) (2500, 2500) Count 3 Graph Layers 32 Chunks Type float64 numpy.ndarray",10000  20000,

Unnamed: 0,Array,Chunk
Bytes,1.49 GiB,47.68 MiB
Shape,"(20000, 10000)","(2500, 2500)"
Count,3 Graph Layers,32 Chunks
Type,float64,numpy.ndarray


Here no copies are made, only the result of the lazy object is asked to be materialized.

In [25]:
adata_mem = adata_concat.to_memory(copy=False)
adata_mem.X

array([[0.81182637, 0.60832413, 0.63094608, ..., 0.5919205 , 0.61510886,
        0.3511783 ],
       [0.13260749, 0.74650301, 0.11485053, ..., 0.93270847, 0.98403846,
        0.9927202 ],
       [0.32783528, 0.06200809, 0.94391329, ..., 0.50178389, 0.54705791,
        0.75716067],
       ...,
       [0.02126415, 0.20244217, 0.45168958, ..., 0.59297117, 0.53405757,
        0.63389122],
       [0.81320476, 0.03885505, 0.88772439, ..., 0.2266176 , 0.07520848,
        0.71671106],
       [0.58298023, 0.61520019, 0.68716917, ..., 0.87494478, 0.34212017,
        0.16569513]])

If you want to both materialize the result and copy.

In [26]:
del adata_mem

In [27]:
adata_mem = adata_concat.to_memory(copy=True)
adata_mem.X

array([[0.81182637, 0.60832413, 0.63094608, ..., 0.5919205 , 0.61510886,
        0.3511783 ],
       [0.13260749, 0.74650301, 0.11485053, ..., 0.93270847, 0.98403846,
        0.9927202 ],
       [0.32783528, 0.06200809, 0.94391329, ..., 0.50178389, 0.54705791,
        0.75716067],
       ...,
       [0.02126415, 0.20244217, 0.45168958, ..., 0.59297117, 0.53405757,
        0.63389122],
       [0.81320476, 0.03885505, 0.88772439, ..., 0.2266176 , 0.07520848,
        0.71671106],
       [0.58298023, 0.61520019, 0.68716917, ..., 0.87494478, 0.34212017,
        0.16569513]])

# IO operations

Read/Write operations on `h5ad` and `Zarr` are supported. One should note that the lazy objects are materialized when this is called. For now, the anndata loaded from file won't be loaded with dask arrays in it.

## Write h5ad

In [28]:
adata = adata_with_dask(100,100)

In [29]:
adata.write_h5ad('a1.h5ad')

In [30]:
adata.X

Unnamed: 0,Array,Chunk
Bytes,78.12 kiB,78.12 kiB
Shape,"(100, 100)","(100, 100)"
Count,1 Graph Layer,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 78.12 kiB 78.12 kiB Shape (100, 100) (100, 100) Count 1 Graph Layer 1 Chunks Type float64 numpy.ndarray",100  100,

Unnamed: 0,Array,Chunk
Bytes,78.12 kiB,78.12 kiB
Shape,"(100, 100)","(100, 100)"
Count,1 Graph Layer,1 Chunks
Type,float64,numpy.ndarray


## Read h5ad

In [31]:
h5ad_adata = ad.read_h5ad('a1.h5ad')

In [32]:
h5ad_adata.X

array([[0.87020048, 0.71785178, 0.60956666, ..., 0.77540814, 0.97385522,
        0.58240197],
       [0.18965275, 0.96108207, 0.00738146, ..., 0.02289613, 0.70454499,
        0.50157217],
       [0.21079103, 0.40237768, 0.89090548, ..., 0.92926245, 0.3746632 ,
        0.71671809],
       ...,
       [0.52292645, 0.46114164, 0.7615855 , ..., 0.19401272, 0.73427208,
        0.47392012],
       [0.33771419, 0.89763822, 0.093955  , ..., 0.77895972, 0.21971277,
        0.83693097],
       [0.35121118, 0.42795938, 0.28069129, ..., 0.1023317 , 0.76424995,
        0.94417626]])

## Write zarr

In [33]:
adata.write_zarr('a2.zarr')

In [34]:
adata.X

Unnamed: 0,Array,Chunk
Bytes,78.12 kiB,78.12 kiB
Shape,"(100, 100)","(100, 100)"
Count,1 Graph Layer,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 78.12 kiB 78.12 kiB Shape (100, 100) (100, 100) Count 1 Graph Layer 1 Chunks Type float64 numpy.ndarray",100  100,

Unnamed: 0,Array,Chunk
Bytes,78.12 kiB,78.12 kiB
Shape,"(100, 100)","(100, 100)"
Count,1 Graph Layer,1 Chunks
Type,float64,numpy.ndarray


## Read zarr

In [35]:
zarr_adata = ad.read_zarr('a2.zarr')

In [36]:
zarr_adata.X

array([[0.87020048, 0.71785178, 0.60956666, ..., 0.77540814, 0.97385522,
        0.58240197],
       [0.18965275, 0.96108207, 0.00738146, ..., 0.02289613, 0.70454499,
        0.50157217],
       [0.21079103, 0.40237768, 0.89090548, ..., 0.92926245, 0.3746632 ,
        0.71671809],
       ...,
       [0.52292645, 0.46114164, 0.7615855 , ..., 0.19401272, 0.73427208,
        0.47392012],
       [0.33771419, 0.89763822, 0.093955  , ..., 0.77895972, 0.21971277,
        0.83693097],
       [0.35121118, 0.42795938, 0.28069129, ..., 0.1023317 , 0.76424995,
        0.94417626]])

Notice how they are loaded as arrays rather than dask arrays.

## Dask Array Support for Other Fields

This is the list of operations and in which fields they are supported, although some might have not been covered in this tutorial.

The following work with operations anndata supported before are also supported now with Dask arrays:
- anndata.concat()
- Views
- copy()
- to_memory() (changed behaviour)
- read/write on h5ad/zarr

**X, obsm, varm, obsp, varp, layers, uns, and raw** attributes are all supported and tested.

Note: scipy.sparse array wrapped with dask array doesn't play well. This is mainly because of the inconsistent numpy api support of scipy.sparse. Even though not explicitly tested, a sparse array that supports the numpy api should theoretically work well.