# Xarray apply_ufunc
Author: Loic Bachelot loic.bachelot@ifremer.fr\
\
In this notebook, you will find a simple example on how to use the xarray apply_ufunc method (http://xarray.pydata.org/en/stable/generated/xarray.apply_ufunc.html)\
The example is: From a dataset with one dimention x and a time dimention, we want o have the most frequent label over time for each coordinate x.\
This example can easily be modified to do any operation over a specific dimention (mean, sum, min, max, etc...)

In [18]:
import numpy as np
import xarray as xr
import time
from numba import float64, guvectorize
# import dask

## Definition of the core funtion
in this function, we define the transformation to apply over the specific dimension. 
The input x is the array we do the operation over, in our case, the time dimension of one x coordinate\
We use time.sleep(1) to add some precessing time and showcase the parallel execution

In [2]:
def core_funct(x):
    time.sleep(1)
    res = np.nan
    if np.count_nonzero(~np.isnan(x)) == len(x):
        res = np.argmax(np.bincount(x.astype('int')))
    return res

In [3]:
def core_funct(Travel, arrivals):
    dt = arrivals - Travel
    dt = dt[~np.isnan(dt)] #remove nan value
    dt = dt - np.mean(dt)
    return np.mean(np.abs(dt)) # error

## Creating an example Xarray datarray
We purposely add np.nan values in it as it can happen in real life problems and need to be handled. Further explanation next cell.\
Note that we also chunk our array on the x dimension, the one we will iterate over. Feel free to change this and experience for yourself the impact!

In [4]:
array = xr.DataArray([[np.nan, np.nan, 10], [5, 5, 10], [3, 3, 10]], coords=[("x", [0.1, 0.2, 0.3]), ('time', [10, 11, 12])]).chunk({'x':1})
array

Unnamed: 0,Array,Chunk
Bytes,72 B,24 B
Shape,"(3, 3)","(1, 3)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 72 B 24 B Shape (3, 3) (1, 3) Dask graph 3 chunks in 1 graph layer Data type float64 numpy.ndarray",3  3,

Unnamed: 0,Array,Chunk
Bytes,72 B,24 B
Shape,"(3, 3)","(1, 3)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


# Apply_ufunc
Let's look at the attributes one by one:
- core_funct: the funtion we defined earlier, that will do the operation over each x coordinate array
- array: our input xarray data array
- dask="parallelized": explicitely say dask will handle the parallelization
- input_core_dims=[['time']]: core dimention to pass for processing, thet should no be broadcast
- vectorize=True: precise that core_funct only takes arrays defined over core dimensions as input and vectorize it automatically with numpy.vectorize(). In our case, arrays defined over 'time'
- output_dtypes=np.float: type of output, needed to handle np.nan in case of chunked array
\
\
Full documentation here: http://xarray.pydata.org/en/stable/generated/xarray.apply_ufunc.html

In [5]:
Travel = np.load("../Travel.npy",allow_pickle=True)
Travel = Travel.item()
grid_loc = list(Travel['T'].keys()) # 1,024,800 grid nodes to be searched i.e. #Shape of lon,lat,dep=(140, 120, 61)


In [6]:
# coords = np.array([np.array(coord) for coord in Travel['T'].keys()])
coords = np.array([coord for coord in Travel['T'].keys()])
TT = np.array([Travel['T'][coord] for coord in Travel['T'].keys()])


In [7]:
TT.shape

(1024800, 28)

In [8]:
array = xr.DataArray(TT, coords=[("x", np.arange(len(coords))), ("sta", np.arange(TT.shape[-1]))  ]).chunk({'x':200})
array

Unnamed: 0,Array,Chunk
Bytes,218.92 MiB,43.75 kiB
Shape,"(1024800, 28)","(200, 28)"
Dask graph,5124 chunks in 1 graph layer,5124 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 218.92 MiB 43.75 kiB Shape (1024800, 28) (200, 28) Dask graph 5124 chunks in 1 graph layer Data type float64 numpy.ndarray",28  1024800,

Unnamed: 0,Array,Chunk
Bytes,218.92 MiB,43.75 kiB
Shape,"(1024800, 28)","(200, 28)"
Dask graph,5124 chunks in 1 graph layer,5124 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [9]:
TT.shape

(1024800, 28)

In [10]:
coords.shape

(1024800, 3)

In [11]:
ds_var = xr.Dataset(
            data_vars=dict(
                TT=(["idx", "sta"], TT,
                   dict(description="arrival time", units="s")),
                lon=(["idx"], coords[:,0],
                   dict(description="Longitude", units="degree")),
                lat=(["idx"], coords[:,1],
                   dict(description="Latitude", units="degree")),
                dep=(["idx"], coords[:,2],
                   dict(description="depth", units="km")),
            ),
            coords=dict(
                idx=(["idx"], np.arange(TT.shape[0])),
                sta=(["sta"], np.arange(TT.shape[1])),
            )
    
).chunk({'idx':100})
ds_var

Unnamed: 0,Array,Chunk
Bytes,218.92 MiB,21.88 kiB
Shape,"(1024800, 28)","(100, 28)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 218.92 MiB 21.88 kiB Shape (1024800, 28) (100, 28) Dask graph 10248 chunks in 1 graph layer Data type float64 numpy.ndarray",28  1024800,

Unnamed: 0,Array,Chunk
Bytes,218.92 MiB,21.88 kiB
Shape,"(1024800, 28)","(100, 28)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.82 MiB 800 B Shape (1024800,) (100,) Dask graph 10248 chunks in 1 graph layer Data type float64 numpy.ndarray",1024800  1,

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.82 MiB 800 B Shape (1024800,) (100,) Dask graph 10248 chunks in 1 graph layer Data type float64 numpy.ndarray",1024800  1,

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.82 MiB 800 B Shape (1024800,) (100,) Dask graph 10248 chunks in 1 graph layer Data type float64 numpy.ndarray",1024800  1,

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [12]:
ds_var.isel(idx=0)

Unnamed: 0,Array,Chunk
Bytes,224 B,224 B
Shape,"(28,)","(28,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 224 B 224 B Shape (28,) (28,) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",28  1,

Unnamed: 0,Array,Chunk
Bytes,224 B,224 B
Shape,"(28,)","(28,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
Array Chunk Bytes 8 B 8 B Shape () () Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
Array Chunk Bytes 8 B 8 B Shape () () Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
Array Chunk Bytes 8 B 8 B Shape () () Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [13]:
arrivals = np.random.randn(28)
arrivals.shape

(28,)

In [14]:
ds_var.TT

Unnamed: 0,Array,Chunk
Bytes,218.92 MiB,21.88 kiB
Shape,"(1024800, 28)","(100, 28)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 218.92 MiB 21.88 kiB Shape (1024800, 28) (100, 28) Dask graph 10248 chunks in 1 graph layer Data type float64 numpy.ndarray",28  1024800,

Unnamed: 0,Array,Chunk
Bytes,218.92 MiB,21.88 kiB
Shape,"(1024800, 28)","(100, 28)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [19]:
@guvectorize(
    "(float64[:], float64[:], float64[:])",
    "(n), (n) -> ()"
)
def core_funct_v2(travel, arrivals, res):
    dt = arrivals - travel
    dt = dt[~np.isnan(dt)] #remove nan value
    dt = dt - np.mean(dt)
    res[0] = np.mean(np.abs(dt)) # error

In [21]:
res = xr.apply_ufunc(core_funct_v2, ds_var.TT.load(), arrivals, input_core_dims=[["sta"], ["sta"]], output_dtypes=float)


In [29]:
ds_var.assign(variables={"result": (('idx'), res.data)})

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.82 MiB 800 B Shape (1024800,) (100,) Dask graph 10248 chunks in 1 graph layer Data type float64 numpy.ndarray",1024800  1,

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.82 MiB 800 B Shape (1024800,) (100,) Dask graph 10248 chunks in 1 graph layer Data type float64 numpy.ndarray",1024800  1,

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.82 MiB 800 B Shape (1024800,) (100,) Dask graph 10248 chunks in 1 graph layer Data type float64 numpy.ndarray",1024800  1,

Unnamed: 0,Array,Chunk
Bytes,7.82 MiB,800 B
Shape,"(1024800,)","(100,)"
Dask graph,10248 chunks in 1 graph layer,10248 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [16]:
%%time
a = xr.apply_ufunc(core_funct, ds_var.TT, arrivals, dask="parallelized", input_core_dims=[['sta']], vectorize=True, output_dtypes=np.float)
a.compute()

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations


ValueError: input_core_dims must be None or a tuple with the length same to the number of arguments. Given 1 input_core_dims: [['sta']],  but number of args is 2.

# Without apply_ufunc
Here is an exaple without using apply_ufunc:\
- We define a function to loop over the x dimension and call our core_funct on each x array defined over 'time'
- append results in array and cast to Xarray data array

In [None]:
def get_most_freq_loop(ds):
    mpblab = []
    for i in ds['x']:
        res = core_funct(ds.sel(x=i))
        mpblab.append(res)
    return xr.DataArray(np.array(mpblab))

In [None]:
%%time
b = get_most_freq_loop(array)
b

# Results
The method using apply_ufunc is taking about 1 second total which is the sleep(1) of the core_funct. We are not going over the sleep(1) only one time but 3 times in parallel so only takes 1 second overall.\
The execution using the loop is taking 3 seconds, and this is because we have to wait 1 second before starting the next computation.\
\
Using this approach helped us archive a speedup of about 15x for a plot on the same 8 core machine. Of course it depends on the number of core, and the chunks defined.
I invite you to read dask best practice guide for more information on the chunks: https://docs.dask.org/en/latest/array-best-practices.html 