In [8]:
#######################################################################
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
# All rights reserved.
#
# This source code is licensed under a BSD-style license (found in the
# LICENSE file in the root directory of this source tree)
#######################################################################

# Benchmark to evaluate expressions with numba and NDArray instances as operands.
# As numba takes a while to compile the first time, we use cached functions, so
# make sure to run the script at least a couple of times.

from time import time

import numba as nb
import numexpr as ne
import numpy as np

import blosc2
%load_ext cython

In [9]:
shape = (5000, 10_000)
chunks = [500, 10_000]
blocks = [4, 10_000]
dtype = np.float32

# Expression to evaluate
exprs = ("x < .5",
         "(x**2 + y**2) <= (2 * x * y + 1)",
         "(sin(x)**3 + cos(y)**2) >= (cos(x) * sin(y) + z)",
         )

In [10]:
# Prepare the operands
npx = np.linspace(0, 1, np.prod(shape), dtype=dtype).reshape(shape)
npy = np.linspace(-1, 1, np.prod(shape), dtype=dtype).reshape(shape)
npz = np.linspace(0, 10, np.prod(shape), dtype=dtype).reshape(shape)
vardict = {"x": npx, "y": npy, "z": npz, "np": np}
x = blosc2.asarray(npx, chunks=chunks, blocks=blocks)
y = blosc2.asarray(npy, chunks=chunks, blocks=blocks)
z = blosc2.asarray(npz, chunks=chunks, blocks=blocks)
b2vardict = {"x": x, "y": y, "z": z, "blosc2": blosc2}

In [11]:
# Define the functions to evaluate the expressions

# The numba+blosc2 version using an udf
@nb.jit(parallel=True, cache=True)
def udf_numba(inputs, output, offset):
    icount = len(inputs)
    x = inputs[0]
    if icount == 1:
        for i in nb.prange(x.shape[0]):
            for j in nb.prange(x.shape[1]):
                output[i, j] = x[i, j] < .5
    elif icount == 2:
        y = inputs[1]
        for i in nb.prange(x.shape[0]):
            for j in nb.prange(x.shape[1]):
                output[i, j] = x[i, j]**2 + y[i, j]**2 <= 2 * x[i, j] * y[i, j] + 1
    elif icount == 3:
        y = inputs[1]
        z = inputs[2]
        for i in nb.prange(x.shape[0]):
            for j in nb.prange(x.shape[1]):
                output[i, j] = (np.sin(x[i, j])**3 + np.cos(y[i, j])**2) >= (np.cos(x[i, j]) * np.sin(y[i, j]) + z[i, j])
    return

In [12]:
# Evaluate expressions
for n, expr in enumerate(exprs):
    print(f"*** Evaluating expression: {expr} ...")

    # Evaluate the expression with NumPy/numexpr
    npexpr = expr.replace("sin", "np.sin").replace("cos", "np.cos")
    t0 = time()
    npres = eval(npexpr, vardict)
    print("NumPy took %.3f s" % (time() - t0))
    # ne.set_num_threads(1)
    # nb.set_num_threads(1)  # this does not work that well; better use the NUMBA_NUM_THREADS env var
    output = npres.copy()
    t0 = time()
    ne.evaluate(expr, vardict, out=output)
    print("NumExpr took %.3f s" % (time() - t0))
    np.testing.assert_equal(output, npres)

    # Evaluate the expression with Blosc2+numexpr
    blosc2.cparams_dflts["codec"] = blosc2.Codec.LZ4
    blosc2.cparams_dflts["clevel"] = 5
    b2expr = expr.replace("sin", "blosc2.sin").replace("cos", "blosc2.cos")
    c = eval(b2expr, b2vardict)
    t0 = time()
    d = c.eval()
    print("LazyExpr+eval took %.3f s" % (time() - t0))
    # Check
    np.testing.assert_equal(d[:], npres)
    t0 = time()
    d = c[:]
    print("LazyExpr+getitem took %.3f s" % (time() - t0))
    # Check
    np.testing.assert_equal(d[:], npres)

    inputs, npinputs = (x,), (npx,)
    if n == 1:
        inputs, npinputs = (x, y), (npx, npy)
    elif n == 2:
        inputs, npinputs = (x, y, z), (npx, npy, npz)

    t0 = time()
    udf_numba(npinputs, output, offset=None)
    print("Numba took %.3f s" % (time() - t0))
    np.testing.assert_equal(output, npres)

    expr_ = blosc2.lazyudf(udf_numba, inputs, np.bool_, chunked_eval=False,
                           chunks=chunks, blocks=blocks)
    # actual benchmark
    # eval() uses the udf function as a prefilter
    t0 = time()
    res = expr_.eval()
    print("LazyUDF+eval took %.3f s" % (time() - t0))
    np.testing.assert_equal(res[...], npres)
    # getitem uses the same compiled function but as a postfilter
    t0 = time()
    res = expr_[:]
    print("LazyUDF+getitem took %.3f s" % (time() - t0))
    np.testing.assert_equal(res[...], npres)

    expr_ = blosc2.lazyudf(udf_numba, inputs, np.bool_, chunked_eval=True,
                           chunks=chunks, blocks=blocks)
    # getitem but using chunked evaluation
    t0 = time()
    res = expr_.eval()
    print("LazyUDF+chunked_eval took %.3f s" % (time() - t0))
    np.testing.assert_equal(res[...], npres)
    t0 = time()
    res = expr_[:]
    print("LazyUDF+getitem+chunked_eval took %.3f s" % (time() - t0))
    np.testing.assert_equal(res[...], npres)


In [13]:
%%cython
# The cython+blosc2 version using an udf
import numpy as np
cimport numpy as np
cimport cython
from cython.parallel cimport parallel, prange
from libc.math cimport sinf, cosf
#from cpython cimport bool
@cython.boundscheck(False)  # Deactivate bounds checking
@cython.wraparound(False)   # Deactivate negative indexing.
#def udf_cython(inputs, np.ndarray[np.npy_bool, ndim=2] output, object offset):
def udf_cython(inputs, np.npy_bool[:, ::1] output, object offset) -> None:
    cdef int icount = len(inputs)
    #print(f"*** icount: {icount}")
    cdef np.npy_float32[:, ::1] x, y, z
    x = inputs[0]
    cdef long shape0, shape1
    shape0 = x.shape[0]
    shape1 = x.shape[1]
    cdef int i, j
    if icount == 1:
        with nogil, parallel():
            for i in prange(shape0):
                for j in prange(shape1):
                    output[i, j] = x[i, j] < .5
    elif icount == 2:
        y = inputs[1]
        with nogil, parallel():
            for i in prange(shape0):
                for j in prange(shape1):
                    output[i, j] = x[i, j]**2 + y[i, j]**2 <= 2 * x[i, j] * y[i, j] + 1
    elif icount == 3:
        y = inputs[1]
        z = inputs[2]
        with nogil, parallel():
            for i in prange(shape0):
                for j in prange(shape1):
                    output[i, j] = (sinf(x[i, j])**3 + cosf(y[i, j])**2) >= (cosf(x[i, j]) * sinf(y[i, j]) + z[i, j])
    return

In [14]:
# Evaluate expressions for cython
for n, expr in enumerate(exprs):
    print(f"*** Evaluating expression: {expr} ...")
    npres = np.empty_like(npx, dtype=np.bool_)
    ne.evaluate(expr, vardict, out=npres)

    inputs, npinputs = (x,), (npx,)
    if n == 1:
        inputs, npinputs = (x, y), (npx, npy)
    elif n == 2:
        inputs, npinputs = (x, y, z), (npx, npy, npz)

    expr_ = blosc2.lazyudf(udf_cython, inputs, np.bool_, chunked_eval=False,
                           chunks=chunks, blocks=blocks)
    # actual benchmark
    # eval() uses the udf function as a prefilter
    t0 = time()
    res = expr_.eval()
    print("LazyUDF+eval+cython took %.3f s" % (time() - t0))
    np.testing.assert_equal(res[...], npres)
    # getitem uses the same compiled function but as a postfilter
    t0 = time()
    res = expr_[:]
    print("LazyUDF+getitem+cython took %.3f s" % (time() - t0))
    np.testing.assert_equal(res[...], npres)

    expr_ = blosc2.lazyudf(udf_cython, inputs, np.bool_, chunked_eval=True,
                           chunks=chunks, blocks=blocks)
    # getitem but using chunked evaluation
    t0 = time()
    res = expr_.eval()
    print("LazyUDF+chunked_eval+cython took %.3f s" % (time() - t0))
    np.testing.assert_equal(res[...], npres)
    t0 = time()
    res = expr_[:]
    print("LazyUDF+getitem+chunked_eval+cython took %.3f s" % (time() - t0))
    np.testing.assert_equal(res[...], npres)
