## Upgraded version of MPDist

In [1]:
import numpy as np
from numba import njit

In [2]:
@njit(cache=True, fastmath=True)
def mpdist(
    x: np.ndarray,
    y: np.ndarray,
    m: int = 0
) -> float:
    """
    Matrix Profile Distance
    
    Parameters
    ----------
    x : np.ndarray
        First time series, univariate, shape ``(n_timepoints,)``
    y : np.ndarray
        Second time series, univariate, shape ``(n_timepoints,)``
    m : int (default = 0)
        Length of the subsequence

    Returns
    -------
    float
        Matrix Profile distance between x and y

    Raises
    ------
    ValueError
        If x and y are not 1D arrays

    Examples
    --------
    >>> import numpy as np
    >>> from aeon.distances import euclidean_distance
    >>> x = np.array([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
    >>> y = np.array([[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]])
    >>> mpdist(x, y)
    31.622776601683793
    """
    if x.ndim == 1 and y.ndim == 1:
        return _mpdist(x,y,m)
    raise ValueError("x and y must be a 1D array of shape (n_timepoints,)")

In [3]:
@njit(cache=True, fastmath=True)
def _mpdist(
    x: np.ndarray,
    y: np.ndarray,
    m: int
)-> float:
    threshold = 0.05

    mp_ab, ip_ab = _stomp_ab(x,y,m)  # AB Matrix profile
    mp_ba, ip_ba = _stomp_ab(y,x,m)  # BA Matrix profile
    
    join_mp = np.concatenate([mp_ab, mp_ba])

    k = int(np.ceil(threshold * (len(x) + len(y))))

    sorted_mp = np.sort(join_mp)

    if len(sorted_mp) > k:
        dist = sorted_mp[k]
    else:
        dist = sorted_mp[len(sorted_mp) - 1]

    return dist

In [4]:
@njit(cache=True, fastmath=True)
def _stomp_ab(x: np.ndarray, y: np.ndarray, m: int):
    """
    STOMP implementation for AB similarity join.

    Parameters
    ----------
        x: numpy.array
            First time series.
        y: numpy.array
            Second time series.
        m: int
            Length of the subsequences.

    Output
    ------
        mp: numpy.array
            Array with the distance between every subsequence from x
            to the nearest subsequence with same length from y.
        ip: numpy.array
            Array with the index of the nearest neighbor of x in y.
    """
    len_x = len(x)
    len_y = len(y)
    if m == 0:
        if len_x > len_y:
            m = int(len_x / 4)
        else:
            m = int(len_y / 4)
    subs_x = len_x - m + 1
    subs_y = len_y - m + 1

    x_mean = []
    x_std = []
    y_mean = []
    y_std = []

    for i in range(subs_x):
        x_mean.append(np.mean(x[i : i + m]))
        x_std.append(np.std(x[i : i + m]))

    for i in range(subs_y):
        y_mean.append(np.mean(y[i : i + m]))
        y_std.append(np.std(y[i : i + m]))

    # Compute the distance profile for the first y subsequence
    first_dot_prod = _sliding_dot_products(y[0:m], x, m, len_x)

    # Initialization
    mp = np.full(subs_x, float("inf"))  # matrix profile
    ip = np.zeros(subs_x)  # index profile

    # Compute the distance profile for the first x subsequence
    dot_prod = _sliding_dot_products(x[0:m], y, m, len_y)
    dp = _calculate_distance_profile(
        dot_prod, x_mean[0], x_std[0], y_mean, y_std, m, subs_y
    )

    # Update the matrix profile
    mp[0] = np.amin(dp)
    ip[0] = np.argmin(dp)

    for i in range(1, subs_x):
        for j in range(subs_y - 1, 0, -1):
            dot_prod[j] = (
                dot_prod[j - 1] - y[j - 1] * x[i - 1] + y[j - 1 + m] * x[i - 1 + m]
            )

            # Compute the next dot products using previous ones
        dot_prod[0] = first_dot_prod[i]
        dp = _calculate_distance_profile(
            dot_prod, x_mean[i], x_std[i], y_mean, y_std, m, subs_y
        )
        mp[i] = np.amin(dp)
        ip[i] = np.argmin(dp)

    return mp, ip

In [5]:

def _sliding_dot_products(q, t, len_q, len_t):
    """
    Compute the sliding dot products between a query and a time series.

    Parameters
    ----------
        q: numpy.array
            Query.
        t: numpy.array
            Time series.
        len_q: int
            Length of the query.
        len_t: int
            Length of the time series.

    Output
    ------
        dot_prod: numpy.array
             Sliding dot products between q and t.
    """
    # Reversing query and padding both query and time series
    padded_t = np.pad(t, (0, len_t))
    reversed_q = np.flipud(q)
    padded_reversed_q = np.pad(reversed_q, (0, 2 * len_t - len_q))

    # Applying FFT to both query and time series
    fft_t = np.fft.fft(padded_t)
    fft_q = np.fft.fft(padded_reversed_q)

    # Applying inverse FFT to obtain the convolution of the time series by
    # the query
    element_wise_mult = np.multiply(fft_t, fft_q)
    inverse_fft = np.fft.ifft(element_wise_mult)

    # Returns only the valid dot products from inverse_fft
    dot_prod = inverse_fft[len_q - 1 : len_t].real

    return dot_prod

In [5]:
from numba import njit, complex128, float64
import numpy as np

@njit(cache=True, fastmath=True)
def custom_pad(arr, pad_width, constant_values=0):
    """
    Custom padding function to pad the array.

    Parameters:
        arr (numpy.array): Array to be padded.
        pad_width (tuple of tuples): Padding width for each dimension.
        constant_values: Constant values used for padding.

    Returns:
        numpy.array: Padded array.
    """
    padded_shape = tuple((i + sum(pad) for i, pad in zip(arr.shape, pad_width)))
    padded_arr = np.empty(padded_shape, dtype=arr.dtype)
    padded_arr.fill(constant_values)
    padded_arr[
        tuple(slice(pad[0], pad[0] + size) for pad, size in zip(pad_width, arr.shape))
    ] = arr
    return padded_arr

@njit(cache=True, fastmath=True)
def fft(arr):
    """
    Custom FFT implementation using numpy.

    Parameters:
        arr (numpy.array): Input array.

    Returns:
        numpy.array: FFT of the input array.
    """
    N = len(arr)
    if N <= 1:
        return arr
    else:
        even = fft(arr[::2])
        odd = fft(arr[1::2])
        T = [np.exp(-2j * np.pi * k / N) * odd[k] for k in range(N // 2)]
        return [even[k] + T[k] for k in range(N // 2)] + [even[k] - T[k] for k in range(N // 2)]

@njit(cache=True, fastmath=True)
def ifft(arr):
    """
    Custom Inverse FFT implementation using numpy.

    Parameters:
        arr (numpy.array): Input array.

    Returns:
        numpy.array: Inverse FFT of the input array.
    """
    conj_arr = np.conjugate(arr)
    fft_arr = fft(conj_arr)
    return np.conjugate(fft_arr) / len(arr)

@njit(cache=True, fastmath=True)
def _sliding_dot_products(q, t, len_q, len_t):
    """
    Compute the sliding dot products between a query and a time series.

    Parameters:
        q (numpy.array): Query.
        t (numpy.array): Time series.
        len_q (int): Length of the query.
        len_t (int): Length of the time series.

    Returns:
        numpy.array: Sliding dot products between q and t.
    """
    # Reversing query and padding both query and time series
    padded_t = custom_pad(t, (0, len_t))
    reversed_q = q[::-1]  # Reverse the query
    padded_reversed_q = custom_pad(reversed_q, (0, 2 * len_t - len_q))

    # Applying FFT to both query and time series
    fft_t = fft(padded_t)
    fft_q = fft(padded_reversed_q)

    # Applying inverse FFT to obtain the convolution of the time series by the query
    element_wise_mult = np.multiply(fft_t, fft_q)
    inverse_fft = ifft(element_wise_mult)

    # Returns only the valid dot products from inverse_fft
    dot_prod = inverse_fft[len_q - 1 : len_t].real

    return dot_prod


In [6]:
@njit(cache=True, fastmath=True)
def _calculate_distance_profile(
    dot_prod, q_mean, q_std, t_mean, t_std, q_len, n_t_subs
):
    """
    Calculate the distance profile for the given query.

    Parameters
    ----------
        dot_prod: numpy.array
            Sliding dot products between the time series and the query.
        q_mean: float
            Mean of the elements of the query.
        q_std: float
            Standard deviation of elements of the query.
        t_mean: numpy.array
            Array with the mean of the elements from each subsequence of
            length(query) from the time series.
        t_std: numpy.array
            Array with the standard deviation of the elements from each
            subsequence of length(query) from the time series.
        q_len: int
            Length of the query.
        n_t_subs: int
            Number of subsequences in the time series.

    Output
    ------
        d: numpy.array
            Distance profile of query q.
    """
    for i in range(0, n_t_subs):
        d = 2 * q_len * (1 - ((dot_prod[i] - q_len * q_mean * t_mean[i]) 
                          / (q_len * q_std * t_std[i])))
    
    d = np.absolute(d)
    d = np.sqrt(d)

    return d

## Analysing the code

In [7]:
import time

In [8]:
x = np.array([1, 2, 3, 5, 9, 16, 23, 19, 13, 7])
y = np.array([3, 7, 13, 19, 23, 31, 36, 40, 48, 55, 63])
m = 3

In [9]:
start = time.perf_counter()
mp_dist = mpdist(x,y,m)
print(mp_dist)
end = time.perf_counter()
print("Elapsed (with compilation) = {}s".format((end - start)))

TypingError: Failed in nopython mode pipeline (step: nopython frontend)
[1m[1m[1m[1mFailed in nopython mode pipeline (step: nopython frontend)
[1m[1m[1m[1m[1m[1mFailed in nopython mode pipeline (step: nopython frontend)
[1mUntyped global name '_sliding_dot_products':[0m [1m[1mCannot determine Numba type of <class 'function'>[0m
[1m
File "C:\Users\Divya Tiwari\AppData\Local\Temp\ipykernel_6544\1355812272.py", line 47:[0m
[1mdef _stomp_ab(x: np.ndarray, y: np.ndarray, m: int):
    <source elided>
    # Compute the distance profile for the first y subsequence
[1m    first_dot_prod = _sliding_dot_products(y[0:m], x, m, len_x)
[0m    [1m^[0m[0m
[0m
[0m[1mDuring: resolving callee type: type(CPUDispatcher(<function _stomp_ab at 0x0000020230A73C40>))[0m
[0m[1mDuring: typing of call at C:\Users\Divya Tiwari\AppData\Local\Temp\ipykernel_6544\2366255758.py (9)
[0m
[0m[1mDuring: resolving callee type: type(CPUDispatcher(<function _stomp_ab at 0x0000020230A73C40>))[0m
[0m[1mDuring: typing of call at C:\Users\Divya Tiwari\AppData\Local\Temp\ipykernel_6544\2366255758.py (10)
[0m
[0m[1mDuring: resolving callee type: type(CPUDispatcher(<function _stomp_ab at 0x0000020230A73C40>))[0m
[0m[1mDuring: typing of call at C:\Users\Divya Tiwari\AppData\Local\Temp\ipykernel_6544\2366255758.py (9)
[0m
[1m
File "C:\Users\Divya Tiwari\AppData\Local\Temp\ipykernel_6544\2366255758.py", line 9:[0m
[1mdef _mpdist(
    <source elided>

[1m    mp_ab, ip_ab = _stomp_ab(x,y,m)  # AB Matrix profile
[0m    [1m^[0m[0m

[0m[1mDuring: resolving callee type: type(CPUDispatcher(<function _mpdist at 0x0000020230A73880>))[0m
[0m[1mDuring: typing of call at C:\Users\Divya Tiwari\AppData\Local\Temp\ipykernel_6544\1639383497.py (39)
[0m
[0m[1mDuring: resolving callee type: type(CPUDispatcher(<function _mpdist at 0x0000020230A73880>))[0m
[0m[1mDuring: typing of call at C:\Users\Divya Tiwari\AppData\Local\Temp\ipykernel_6544\1639383497.py (39)
[0m
[1m
File "C:\Users\Divya Tiwari\AppData\Local\Temp\ipykernel_6544\1639383497.py", line 39:[0m
[1mdef mpdist(
    <source elided>
    if x.ndim == 1 and y.ndim == 1:
[1m        return _mpdist(x,y,m)
[0m        [1m^[0m[0m
