In [1]:
%load_ext cython

In [2]:
import numpy as np

In [3]:
def sparc(movement, fs, padlevel=4, fc=10.0, amp_th=0.05):
    """
    Calcualtes the smoothness of the given speed profile using the modified
    spectral arc length metric.
    Parameters
    ----------
    movement : np.array
               The array containing the movement speed profile.
    fs       : float
               The sampling frequency of the data.
    padlevel : integer, optional
               Indicates the amount of zero padding to be done to the movement
               data for estimating the spectral arc length. [default = 4]
    fc       : float, optional
               The max. cut off frequency for calculating the spectral arc
               length metric. [default = 10.]
    amp_th   : float, optional
               The amplitude threshold to used for determing the cut off
               frequency upto which the spectral arc length is to be estimated.
               [default = 0.05]
    Returns
    -------
    sal      : float
               The spectral arc length estimate of the given movement's
               smoothness.
    (f, Mf)  : tuple of two np.arrays
               This is the frequency(f) and the magntiude spectrum(Mf) of the
               given movement data. This spectral is from 0. to fs/2.
    (f_sel, Mf_sel) : tuple of two np.arrays
                      This is the portion of the spectrum that is selected for
                      calculating the spectral arc length.
    Notes
    -----
    This is the modfieid spectral arc length metric, which has been tested only
    for discrete movements.
    Examples
    --------
    >>> t = np.arange(-1, 1, 0.01)
    >>> move = np.exp(-5*pow(t, 2))
    >>> sal, _, _ = sparc(move, fs=100.)
    >>> '%.5f' % sal
    '-1.41403'
    """
    # Number of zeros to be padded.
    nfft = int(pow(2, np.ceil(np.log2(len(movement))) + padlevel))

    # Frequency
    f = np.arange(0, fs, fs / nfft)
    # Normalized magnitude spectrum
    Mf = abs(np.fft.fft(movement, nfft))
    Mf = Mf / max(Mf)

    # Indices to choose only the spectrum within the given cut off frequency
    # Fc.
    # NOTE: This is a low pass filtering operation to get rid of high frequency
    # noise from affecting the next step (amplitude threshold based cut off for
    # arc length calculation).
    fc_inx = ((f <= fc) * 1).nonzero()
    f_sel = f[fc_inx]
    Mf_sel = Mf[fc_inx]

    # Choose the amplitude threshold based cut off frequency.
    # Index of the last point on the magnitude spectrum that is greater than
    # or equal to the amplitude threshold.
    inx = ((Mf_sel >= amp_th) * 1).nonzero()[0]
    fc_inx = range(inx[0], inx[-1] + 1)
    f_sel = f_sel[fc_inx]
    Mf_sel = Mf_sel[fc_inx]

    # Calculate arc length
    new_sal = -sum(
        np.sqrt(
            pow(
                np.diff(f_sel) / (f_sel[-1] - f_sel[0]), 
                2
            ) + pow(
                np.diff(Mf_sel), 
                2
            )
        )
    )
    return new_sal

In [4]:
%%cython
# cython: infer_types = True
# cython: boundscheck = False
# cython: wraparound = False
cimport cython
from numpy import zeros, double as npy_double, arange, abs as npy_abs, array
from numpy.fft import rfft
from libc.math cimport log2, sqrt, ceil, abs, floor


cpdef sparc_1d(const double[:] x, double fsample, int padlevel, double fcut,
            double amp_thresh):
    cdef Py_ssize_t n = x.size, j, ixf, ixa0 = 0, ixa
    cdef double sal = 0., frange

    cdef int nfft = 2**(<int>(ceil(log2(n)) + padlevel))
    ixf = <Py_ssize_t>(floor(fcut / fsample * (nfft-1)))
    cdef double[:] freq = arange(0, fsample, fsample / nfft)

    # normalized magnitude spectrum
    cdef double[:] Mf = npy_abs(rfft(x, n=nfft))
    cdef double max_Mf = 0.
    for j in range(ixf + 1):
        if Mf[j] > max_Mf:
            max_Mf = Mf[j]
    amp_thresh *= max_Mf

    # indices to choose only the spectrum within the given cutoff frequency fcut
    # NOTE: this is a low pass filtering operation to get rid of high frequency
    # noise from affecting the next step (amplitude threshold based cutoff for
    # arc length calculation)
    ixa = ixf
    while Mf[ixa0] < amp_thresh and ixa0 < nfft:
        ixa0 += 1
    while Mf[ixa] < amp_thresh and ixa > 0:
        ixa -= 1

    frange = freq[ixa] - freq[ixa0]

    for j in range(ixa0+1, ixa+1):
        sal -= sqrt(((freq[j] - freq[j-1]) / frange)**2
                    + ((Mf[j] - Mf[j-1]) / max_Mf)**2)

    return sal


cpdef SPARC(const double[:, :, :] x, double fsample, int padlevel, double fcut,
               double amp_thresh):
    cdef Py_ssize_t m = x.shape[0], p = x.shape[2], i, k

    sparclen = zeros((m, p), dtype=npy_double, order='C')
    cdef double[:, ::1] sal = sparclen

    for i in range(m):
        for k in range(p):
            sal[i, k] = sparc_1d(x[i, :, k], fsample, padlevel, fcut, amp_thresh)

    return sparclen

In [5]:
from sparc import sparc as fsparc, sparc2 as fsparc2, sparc3 as fsparc3
# from sparc2 import sparc as fsparc2, sparc_1d as fsparc2_1d

In [13]:
np.random.seed(5)
x = np.random.rand(50000, 150, 3)
xf = np.asfortranarray(x.transpose([1, 2, 0]))

t = np.arange(0, 150/50, 1/50)

fs = 50.0
cut = 10.0
pad = 2
thresh = 0.05
nfft = int(pow(2, np.ceil(np.log2(150)) + pad))

In [14]:
np.allclose(
    fsparc(xf[:, :, :500], fs, pad, cut, thresh).T, 
    SPARC(x[:500, :, :], fs, pad, cut, thresh)
)

True

In [15]:
np.allclose(
    fsparc3(xf[:, :, :500], fs, pad, cut, thresh).T, 
    SPARC(x[:500, :, :], fs, pad, cut, thresh)
)

True

In [16]:
%timeit SPARC(x, fs, pad, cut, thresh)

3.86 s ± 5.25 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [17]:
%timeit fsparc(xf, fs, pad, cut, thresh)

1.16 s ± 214 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [18]:
%timeit fsparc2(xf, fs, pad, cut, thresh)

1.2 s ± 1.59 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [19]:
%timeit fsparc3(xf, fs, pad, cut, thresh)

1.14 s ± 364 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
sparc(x[0, :, 0], fs, pad, cut, thresh)

In [None]:
freq = np.linspace(0, 0.5, 32, endpoint=False) * 50.0

In [None]:
(freq[6] - freq[5]) / (freq[8] - freq[1])

In [None]:
1 / 7

In [19]:
%matplotlib widget

In [20]:
import matplotlib.pyplot as plt

In [24]:
y = np.random.rand(150)

freq, res = {}, {}
for n in 256, 512, 1024, 2048, 4096, 8192:
    freq[n] = np.fft.rfftfreq(n)
    res[n] = np.abs(np.fft.rfft(y, n))

In [26]:
f, ax = plt.subplots(figsize=(10, 6))

for n in freq:
    ax.plot(freq[n], res[n], label=f'nfft={n}')

ax.legend()

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

<matplotlib.legend.Legend at 0x7f45970f3f70>