Reference impl. from Wikipedia:

In [1]:
from itertools import combinations
from math import log

def construct_templates(timeseries_data:list, m:int=2):
    num_windows = len(timeseries_data) - m + 1
    return [timeseries_data[x:x+m] for x in range(0, num_windows)]

def get_matches(templates:list, r:float):
    return len(list(filter(lambda x: is_match(x[0], x[1], r), combinations(templates, 2))))

def is_match(template_1:list, template_2:list, r:float):
    return all([abs(x - y) < r for (x, y) in zip(template_1, template_2)])

def sample_entropy(timeseries_data:list, window_size:int, r:float):
    B = get_matches(construct_templates(timeseries_data, window_size), r)
    A = get_matches(construct_templates(timeseries_data, window_size+1), r)
    return -log(A/B)

Naive port following the reference logic:

In [2]:
import numpy as np
np.__version__


'1.26.4'

In [3]:
import numba
import numpy as np

In [4]:
@numba.njit()
def construct_templates_(x, m=2):
    res = np.empty((x.size - m + 1, m), dtype=x.dtype.type)
    for i in range(res.shape[0]):
        res[i] = x[i:i+m]
    return res

@numba.njit()
def is_match_(a, b, r):
    return np.all(np.abs(a - b) < r)

@numba.njit()
def get_matches_(t, r):
    res = 0
    for i in range(t.shape[0] - 1):
        for j in range(i+1, t.shape[0]):
            if is_match_(t[i], t[j], r):
                res += 1
    return res

@numba.njit()
def sample_entropy_(x, w, r):
    B = get_matches_(construct_templates_(x, w), r)
    A = get_matches_(construct_templates_(x, w+1), r)
    return -np.log(A/B)

In [5]:
x_ = np.arange(1000) - 500
x = x_.tolist()

In [6]:
%time sample_entropy(x, 4, 2)

CPU times: user 2.45 s, sys: 413 µs, total: 2.45 s
Wall time: 2.5 s


0.0010045204260054762

In [7]:
sample_entropy_(x_, 4, 2)  # compile before timing
%time sample_entropy_(x_, 4, 2)

CPU times: user 116 ms, sys: 46 µs, total: 116 ms
Wall time: 116 ms


0.0010045204260054762

Time with longer vector:

In [8]:
x_ = np.random.randn(2000)
x = x_.tolist()

In [9]:
%time sample_entropy(x, 3, 1)

CPU times: user 8.18 s, sys: 19.6 ms, total: 8.2 s
Wall time: 8.67 s


0.665893330822177

In [10]:
sample_entropy_(x_, 3, 1)
%time sample_entropy_(x_, 3, 1)

CPU times: user 456 ms, sys: 25 µs, total: 456 ms
Wall time: 458 ms


0.665893330822177

OK, just compiling yields ~10x faster code.  

The next obvious optimisation is to return early in `is_match` (upon the first pair of template elements that do not meet the condition):

In [11]:
@numba.njit()
def construct_templates_(x, m=2):
    res = np.empty((x.size - m + 1, m), dtype=x.dtype.type)
    for i in range(res.shape[0]):
        res[i] = x[i:i+m]
    return res

@numba.njit()
def is_match_2(a, b, r):
    for i in range(a.size):
        if np.abs(a[i] - b[i]) >= r:
            return False
    return True

@numba.njit()
def get_matches_2(t, r):
    res = 0
    for i in range(t.shape[0] - 1):
        for j in range(i+1, t.shape[0]):
            if is_match_2(t[i], t[j], r):
                res += 1
    return res

@numba.njit()
def sample_entropy_2(x, w, r):
    B = get_matches_2(construct_templates_(x, w), r)
    A = get_matches_2(construct_templates_(x, w+1), r)
    return -np.log(A/B)

In [12]:
sample_entropy_(x_, 3, 1)
%time sample_entropy_(x_, 3, 1)

CPU times: user 481 ms, sys: 55 µs, total: 481 ms
Wall time: 486 ms


0.665893330822177

In [13]:
sample_entropy_2(x_, 3, 1)
%time sample_entropy_2(x_, 3, 1)

CPU times: user 46 ms, sys: 0 ns, total: 46 ms
Wall time: 47.9 ms


0.665893330822177

Cool, another 10x. The last obvious step is to parallelise `get_matches`. This will require a bit of extra code, as 1) we want to evenly distribute computation over a triangular matrix and 2) avoid a race condition. 

In [14]:
@numba.njit()
def dumb_load_balancer(n, cores):
    o = ((n**2 - n) // 2)  # total number of obs. in the triangle
    r = np.cumsum(np.arange(n)[::-1])  # total number of obs. up to row
    res = np.zeros(cores+1, dtype=np.uint32)  # row indices
    crit = o//cores  # criterion for using given row index as the boundary
    cnt = 1
    for i, e in enumerate(r):
        if e >= crit:
            res[cnt] = i
            crit += o//cores
            cnt += 1
    res[-1] = n
    return res

In [15]:
@numba.njit(parallel=True)
def get_matches_p(t, r):
    ix = dumb_load_balancer(t.shape[0], numba.config.NUMBA_NUM_THREADS)
    res = np.zeros(ix.size - 1)
    for core in numba.prange(ix.size-1):
        for i in range(ix[core], ix[core+1]):
            for j in range(i+1, t.shape[0]):
                if is_match_2(t[i], t[j], r):
                    res[core] += 1
    return np.sum(res)

@numba.njit()
def sample_entropy_p(x, w, r):
    B = get_matches_p(construct_templates_(x, w), r)
    A = get_matches_p(construct_templates_(x, w+1), r)
    return -np.log(A/B)

In [16]:
x = np.random.randn(10_000)
sample_entropy_p(x, 3, 1.)  # compile before timing
%time sample_entropy_p(x, 3, 1.)

  B = get_matches_p(construct_templates_(x, w), r)


CPU times: user 1.77 s, sys: 46 µs, total: 1.77 s
Wall time: 891 ms


0.6533255792439661

In [17]:
sample_entropy_2(x, 3, 1.)  # compile before timing
%time sample_entropy_2(x, 3, 1.)

CPU times: user 1.09 s, sys: 4.05 ms, total: 1.1 s
Wall time: 1.1 s


0.6533255792439661

In [18]:
%time sample_entropy_(x, 3, 1.)

CPU times: user 15.2 s, sys: 28.4 ms, total: 15.2 s
Wall time: 15.5 s


0.6533255792439661

In [19]:
numba.config.NUMBA_NUM_THREADS

4

This notebook was run on a 12 core machine; with this level of parallelism we get about 500x improvement over the reference impl.