# Kvikio demo

Requires
- [ ] https://github.com/pydata/xarray/pull/8100
- [ ] Some updates to `dask.array.core.getter`

In [1]:
%load_ext watermark
%xmode minimal

# These imports are currently unnecessary. I import them to show versions
# cupy_xarray registers the kvikio entrypoint on install.
# import cupy as cp
# import cudf
import cupy_xarray  # registers cupy accessor
import kvikio.zarr

import flox
import numpy_groupies
import numpy as np
import xarray as xr
import zarr

import dask
dask.config.set(scheduler="sync")

store = "./air-temperature.zarr"

%watermark -iv

Exception reporting mode: Minimal
kvikio        : 23.2.0
xarray        : 2022.6.1.dev458+g83c2919b2
numpy_groupies: 0.9.22+2.gd148074
json          : 2.0.9
numpy         : 1.24.4
flox          : 0.7.3.dev12+g796dcd2
zarr          : 2.16.1
dask          : 2023.8.1
cupy_xarray   : 0.1.1+21.gd2da1e4.dirty
sys           : 3.9.17 | packaged by conda-forge | (main, Aug 10 2023, 07:02:31) 
[GCC 12.3.0]



In [2]:
xr.backends.list_engines()

{'kvikio': <KvikioBackendEntrypoint>
   Open zarr files (.zarr) using Kvikio
   Learn more at https://docs.rapids.ai/api/kvikio/nightly/api.html#zarr,
 'store': <StoreBackendEntrypoint>
   Open AbstractDataStore instances in Xarray
   Learn more at https://docs.xarray.dev/en/stable/generated/xarray.backends.StoreBackendEntrypoint.html,
 'zarr': <ZarrBackendEntrypoint>
   Open zarr files (.zarr) using zarr in Xarray
   Learn more at https://docs.xarray.dev/en/stable/generated/xarray.backends.ZarrBackendEntrypoint.html}

In [3]:
%autoreload

# Consolidated must be False
ds = xr.open_dataset(store, engine="kvikio", consolidated=False)
print(ds.air._variable._data)
ds

> [0;32m/glade/u/home/dcherian/python/xarray/xarray/core/indexing.py[0m(485)[0;36m__array__[0;34m()[0m
[0;32m    484 [0;31m        [0;32mimport[0m [0mipdb[0m[0;34m;[0m [0mipdb[0m[0;34m.[0m[0mset_trace[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m--> 485 [0;31m        [0;32mreturn[0m [0mnp[0m[0;34m.[0m[0masarray[0m[0;34m([0m[0mself[0m[0;34m.[0m[0mget_duck_array[0m[0;34m([0m[0;34m)[0m[0;34m,[0m [0mdtype[0m[0;34m=[0m[0mdtype[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m    486 [0;31m[0;34m[0m[0m
[0m


ipdb>  c


> [0;32m/glade/u/home/dcherian/python/xarray/xarray/core/indexing.py[0m(485)[0;36m__array__[0;34m()[0m
[0;32m    484 [0;31m        [0;32mimport[0m [0mipdb[0m[0;34m;[0m [0mipdb[0m[0;34m.[0m[0mset_trace[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m--> 485 [0;31m        [0;32mreturn[0m [0mnp[0m[0;34m.[0m[0masarray[0m[0;34m([0m[0mself[0m[0;34m.[0m[0mget_duck_array[0m[0;34m([0m[0;34m)[0m[0;34m,[0m [0mdtype[0m[0;34m=[0m[0mdtype[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m    486 [0;31m[0;34m[0m[0m
[0m


ipdb>  c


MemoryCachedArray(array=CopyOnWriteArray(array=LazilyIndexedArray(array=_ElementwiseFunctionArray(LazilyIndexedArray(array=<cupy_xarray.kvikio.CupyZarrArrayWrapper object at 0x2b8c63c4fdb0>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None)))), func=functools.partial(<function _scale_offset_decoding at 0x2b8c4efd9d30>, scale_factor=0.01, add_offset=None, dtype=<class 'numpy.float32'>), dtype=dtype('float32')), key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None))))))


## Create example dataset

- cannot be compressed

In [None]:
airt = xr.tutorial.open_dataset("air_temperature", engine="netcdf4")
for var in airt.variables:
    airt[var].encoding["compressor"] = None
airt["scalar"] = 12.0
airt.to_zarr(store, mode="w", consolidated=True)

## Test opening

### Standard usage

In [4]:
xr.open_dataset(store, engine="zarr").air

### Now with kvikio!

 - must read with `consolidated=False` (https://github.com/rapidsai/kvikio/issues/119)
 - dask.from_zarr to GDSStore / open_mfdataset

In [5]:
# Consolidated must be False
ds = xr.open_dataset(store, engine="kvikio", consolidated=False)
print(ds.air._variable._data)
ds

MemoryCachedArray(array=CopyOnWriteArray(array=LazilyIndexedArray(array=_ElementwiseFunctionArray(LazilyIndexedArray(array=<cupy_xarray.kvikio.CupyZarrArrayWrapper object at 0x2aff2ffd0040>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None)))), func=functools.partial(<function _scale_offset_decoding at 0x2affd11f4670>, scale_factor=0.01, add_offset=None, dtype=<class 'numpy.float32'>), dtype=dtype('float32')), key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None))))))


In [6]:
ds.scalar

## Lazy reading

In [7]:
ds.air

## Data load for repr

In [8]:
ds["air"].isel(time=0, lat=10).load()

In [9]:
ds.scalar

In [10]:
ds.air

## CuPy array on load

In [11]:
ds["air"].isel(time=0, lat=10).variable._data

MemoryCachedArray(array=CopyOnWriteArray(array=LazilyIndexedArray(array=_ElementwiseFunctionArray(LazilyIndexedArray(array=<cupy_xarray.kvikio.CupyZarrArrayWrapper object at 0x2aff2ffd0040>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None)))), func=functools.partial(<function _scale_offset_decoding at 0x2affd11f4670>, scale_factor=0.01, add_offset=None, dtype=<class 'numpy.float32'>), dtype=dtype('float32')), key=BasicIndexer((0, 10, slice(None, None, None))))))

In [12]:
type(ds["air"].isel(time=0, lat=10).load().data)

cupy.ndarray

## Load to host

In [13]:
ds.air

In [14]:
type(ds.air.as_numpy().data)

numpy.ndarray

In [15]:
type(ds.air.data)

cupy.ndarray

In [16]:
type(ds.air.mean("time").load().data)

cupy.ndarray

## Doesn't work: Chunk with dask

`meta` is wrong

In [4]:
ds.chunk(time=10).air

Unnamed: 0,Array,Chunk
Bytes,14.76 MiB,51.76 kiB
Shape,"(2920, 25, 53)","(10, 25, 53)"
Dask graph,292 chunks in 2 graph layers,292 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 14.76 MiB 51.76 kiB Shape (2920, 25, 53) (10, 25, 53) Dask graph 292 chunks in 2 graph layers Data type float32 numpy.ndarray",53  25  2920,

Unnamed: 0,Array,Chunk
Bytes,14.76 MiB,51.76 kiB
Shape,"(2920, 25, 53)","(10, 25, 53)"
Dask graph,292 chunks in 2 graph layers,292 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


`dask.array.core.getter` calls `np.asarray` on each chunk.

This calls `ImplicitToExplicitIndexingAdapter.__array__` which calls `np.asarray(cupy.array)` which raises.

Xarray uses `.get_duck_array` internally to remove these adapters. We might need to add
```python
# handle xarray internal classes that might wrap cupy
if hasattr(c, "get_duck_array"):
    c = c.get_duck_array()
else:
    c = np.asarray(c)
```

In [16]:
from dask.utils import is_arraylike

data = ds.air.variable._data
is_arraylike(data)

False

In [22]:
from xarray.core.indexing import ImplicitToExplicitIndexingAdapter

In [None]:
ImplicitToExplicitIndexingAdapter(data).get_duck_array()

In [None]:
ds.chunk(time=10).air.compute()

### explicit meta

In [None]:
import cupy as cp

chunked = ds.chunk(time=10, from_array_kwargs={"meta": cp.array([])})
chunked.air

In [None]:
%autoreload

chunked.compute()

## GroupBy with flox

Requires

1. flox main branch?
2. https://github.com/ml31415/numpy-groupies/pull/63

In [None]:
ds.air.groupby("time.month").mean()