In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import tsfel.feature_extraction as tsf

import numpy as np

import torchaudio

In [2]:
X = np.random.randn(100)
Y = torch.Tensor(X)

In [25]:
def compute_time(signal, fs):
    return torch.arange(0, len(signal)) / fs
np.isclose(compute_time(Y, 5), tsf.compute_time(X, 5)).all()

np.True_

In [24]:
compute_time(Y, 1, dim=1)

tensor([ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11., 12., 13.,
        14., 15., 16., 17., 18., 19., 20., 21., 22., 23., 24., 25.])

In [27]:
def calc_centroid(signal, fs):
    time = compute_time(signal, fs)
    energy = signal ** 2
    t_energy = torch.dot(time, energy)
    energy_sum = torch.sum(energy)
    if energy_sum == 0 or t_energy == 0:
        centroid = 0
    else:
        centroid = t_energy / energy_sum
    return centroid
print(calc_centroid(Y, 5))
np.isclose(calc_centroid(Y, 5), tsf.calc_centroid(X, 5)).all()

tensor(9.6619)


np.True_

In [28]:
def negative_turning(signal):
    diff_sig = torch.diff(signal)
    array_signal = torch.arange(len(diff_sig[:-1]))
    negative_turning_pts = torch.where((diff_sig[array_signal] < 0) & (diff_sig[array_signal + 1] > 0))[0]
    return len(negative_turning_pts)
print(negative_turning(Y))
np.isclose(negative_turning(Y), tsf.negative_turning(X)).all()

33


np.True_

In [29]:
def positive_turning(signal):
    diff_sig = torch.diff(signal)
    array_signal = torch.arange(len(diff_sig[:-1]))
    positive_turning_pts = torch.where((diff_sig[array_signal + 1] < 0) & (diff_sig[array_signal] > 0))[0]
    return len(positive_turning_pts)
print(positive_turning(Y))
np.isclose(positive_turning(Y), tsf.positive_turning(X)).all()

32


np.True_

In [38]:
def mean_abs_diff(signal):
    return torch.mean(torch.abs(torch.diff(signal)))
print(mean_abs_diff(Y))
np.isclose(mean_abs_diff(Y), tsf.mean_abs_diff(X)).all()

tensor(1.4356)


np.True_

In [39]:
def mean_diff(signal):
    return torch.mean(torch.diff(signal))
print(mean_diff(Y))
np.isclose(mean_diff(Y), tsf.mean_diff(X)).all()

tensor(0.0027)


np.True_

In [37]:
def median_abs_diff(signal):
    return torch.median(torch.abs(torch.diff(signal)))
print(median_abs_diff(Y))
np.isclose(median_abs_diff(Y), tsf.median_abs_diff(X)).all()

tensor(1.1030)


np.True_

In [36]:
def median_diff(signal):
    return torch.median(torch.diff(signal))
print(median_diff(Y))
np.isclose(median_diff(Y), tsf.median_diff(X)).all()

tensor(-0.0069)


np.True_

In [41]:
def distance(signal):
    diff_sig = torch.diff(signal)
    return torch.sum(torch.sqrt(1 + diff_sig**2))
print(distance(Y))
np.isclose(distance(Y), tsf.distance(X)).all()

tensor(183.9422)


np.True_

In [34]:
def sum_abs_diff(signal):
    return torch.sum(torch.abs(torch.diff(signal)))
print(sum_abs_diff(Y))
np.isclose(sum_abs_diff(Y), tsf.sum_abs_diff(X)).all()

tensor(142.1246)


np.True_

In [33]:
def zero_cross(signal):
    return len(torch.where(torch.diff(torch.sign(signal)))[0])
print(zero_cross(Y))
np.isclose(zero_cross(Y), tsf.zero_cross(X)).all()

54


np.True_

In [None]:
def slope(signal):
    t = np.linspace(0, len(signal) - 1, len(signal))
    return np.polyfit(t, signal, 1)[0]
print(slope(Y))
np.isclose(slope(Y), tsf.slope(X)).all()

In [32]:
def auc(signal, fs):
    t = compute_time(signal, fs)
    return torch.sum(0.5 * torch.diff(t) * torch.abs(signal[:-1] + signal[1:]))
print(auc(Y, 5))
np.isclose(auc(Y, 5), tsf.auc(X, 5)).all()

tensor(12.5686)


np.True_

In [42]:
def neighbourhood_peaks(signal, n=10):
    signal = np.array(signal)
    subsequence = signal[n:-n]
    # initial iteration
    peaks = (subsequence > torch.roll(signal, 1)[n:-n]) & (subsequence > torch.roll(signal, -1)[n:-n])
    for i in torch.arange(2, n + 1):
        peaks &= subsequence > torch.roll(signal, i)[n:-n]
        peaks &= subsequence > torch.roll(signal, -i)[n:-n]
    return torch.sum(peaks)
print(auc(Y, 5))
np.isclose(auc(Y, 5), tsf.auc(X, 5)).all()

tensor(12.5686)


np.True_

In [47]:
# hard to parallel
def calc_lempel_ziv_complexity(sequence):
    sub_strings = set()
    ind = 0
    inc = 1
    while True:
        if ind + inc > len(sequence):
            break
        sub_str = sequence[ind : ind + inc]
        if sub_str in sub_strings:
            inc += 1
        else:
            sub_strings.add(sub_str)
            ind += inc
            inc = 1
    return len(sub_strings) / len(sequence)
def lempel_ziv(signal, threshold=None):
    if threshold is None:
        threshold = np.mean(signal)
    binary_signal = (signal > threshold).int()
    string_binary_signal = "".join(map(str, binary_signal))
    lz_index = calc_lempel_ziv_complexity(string_binary_signal)
    return lz_index
print(lempel_ziv(Y, 5), tsf.lempel_ziv(X, 5))
np.isclose(lempel_ziv(Y, 5), tsf.lempel_ziv(X, 5)).all()

0.1332142857142857 0.13


np.False_

In [49]:
def abs_energy(signal):
    return torch.sum(torch.abs(signal) ** 2)
print(abs_energy(Y), tsf.abs_energy(X))
np.isclose(abs_energy(Y), tsf.abs_energy(X)).all()

tensor(140.8801) 140.88013648664017


np.True_

In [50]:
def average_power(signal, fs):
    time = compute_time(signal, fs)
    return torch.sum(signal ** 2) / (time[-1] - time[0])
print(average_power(Y, 5), tsf.average_power(X, 5))
np.isclose(average_power(Y, 5), tsf.average_power(X, 5)).all()

tensor(7.1152) 7.11515840841617


np.True_

In [86]:
# cross entropy
def create_xx(features):
    if max(features) < 0:
        max_f = -torch.max(features,dim=0)[0]
        min_f = torch.min(features,dim=0)[0]
    else:
        min_f = torch.min(features,dim=0)[0]
        max_f = torch.max(features,dim=0)[0]
    if torch.min(features,dim=0)[0] == torch.max(features,dim=0)[0]:
        xx = torch.linspace(min_f, min_f + 10, len(features))
    else:
        xx = torch.linspace(min_f, max_f, len(features))
    return xx

def gaussian(features):
    xx = create_xx(features)
    std_value = torch.std(features)
    mean_value = torch.mean(features)
    if std_value == 0:
        return 0.0
    pdf_gauss = torch.distributions.normal.Normal(mean_value, std_value).log_prob(xx).exp()
    return pdf_gauss / torch.sum(pdf_gauss)

def entropy(signal, prob="gauss"):
    if prob == "standard":
        value, counts = torch.unique(signal, return_counts=True)
        p = counts / counts.sum()
    elif prob == "gauss":
        p = gaussian(signal)
    if torch.sum(p) == 0:
        return 0.0
    # Handling zero probability values
    p = p[torch.where(p != 0)]
    # If probability all in one value, there is no entropy
    if np.log2(len(signal)) == 1:
        return 0.0
    elif torch.sum(p * np.log2(p)) / np.log2(len(signal)) == 0:
        return 0.0
    else:
        return -torch.sum(p * np.log2(p)) / np.log2(len(signal))
print(entropy(Y), tsf.entropy(X))
np.isclose(entropy(Y), tsf.entropy(X)).all()

tensor(0.9429) 1.0


  elif torch.sum(p * np.log2(p)) / np.log2(len(signal)) == 0:
  return -torch.sum(p * np.log2(p)) / np.log2(len(signal))


np.False_

In [51]:
def hist_mode(signal, nbins=10):
    hist_values, bin_edges = np.histogram(signal, bins=nbins)
    max_bin_idx = np.argmax(hist_values)
    mode_value = (bin_edges[max_bin_idx] + bin_edges[max_bin_idx + 1]) / 2.0
    return mode_value
print(hist_mode(Y, 5), tsf.hist_mode(X, 5))
np.isclose(hist_mode(Y, 5), tsf.hist_mode(X, 5)).all()

-0.25253606 -0.25253615296528875


np.True_

In [52]:
def interq_range(signal):
    return np.percentile(signal, 75) - np.percentile(signal, 25)
print(interq_range(Y), tsf.interq_range(X))
np.isclose(interq_range(Y), tsf.interq_range(X)).all()

1.4366736 1.436673625982386


np.True_

In [87]:
def pk_pk_distance(signal):
    return torch.abs(torch.max(signal, dim=0)[0] - torch.min(signal, dim=0)[0])
print(pk_pk_distance(Y), tsf.pk_pk_distance(X))
np.isclose(pk_pk_distance(Y), tsf.pk_pk_distance(X)).all()

tensor(6.0044) 6.004447725264798


np.True_

In [88]:
def calc_ecdf(signal):
    return torch.sort(signal, dim=0)[0], torch.arange(1, signal.shape[0] + 1) / signal.shape[0]
def ecdf(signal, d=10):
    _, y = calc_ecdf(signal)
    if len(signal) <= d:
        return tuple(y)
    else:
        return tuple(y[:d])
print(ecdf(Y), tsf.ecdf(X))
np.isclose(ecdf(Y), tsf.ecdf(X)).all()

(tensor(0.0100), tensor(0.0200), tensor(0.0300), tensor(0.0400), tensor(0.0500), tensor(0.0600), tensor(0.0700), tensor(0.0800), tensor(0.0900), tensor(0.1000)) (np.float64(0.01), np.float64(0.02), np.float64(0.03), np.float64(0.04), np.float64(0.05), np.float64(0.06), np.float64(0.07), np.float64(0.08), np.float64(0.09), np.float64(0.1))


np.True_

In [90]:
def ecdf_percentile(signal, percentile=None):
    x, y = calc_ecdf(signal)
    if torch.sum(torch.diff(signal)) == 0:
        return signal[0]
    else:
        return torch.max(x[y <= percentile], dim=0)[0]
print(ecdf_percentile(Y, 0.2), tsf.ecdf_percentile(X, 0.2))
np.isclose(ecdf_percentile(Y, 0.2), tsf.ecdf_percentile(X, 0.2)).all()

tensor(-0.9408) -0.9408168183789232


np.True_

In [92]:
def ecdf_slope(signal, p_init=0.5, p_end=0.75):
    # check if signal is constant
    if torch.sum(torch.diff(signal)) == 0:
        return torch.nan
    else:
        x_init = ecdf_percentile(signal, percentile=p_init)
        x_end = ecdf_percentile(signal, percentile=p_end)
        return (p_end - p_init) / (x_end - x_init)
print(ecdf_slope(Y), tsf.ecdf_slope(X))
np.isclose(ecdf_slope(Y), tsf.ecdf_slope(X)).all()

tensor(0.3309) 0.3308885585353737


np.True_

In [94]:
def ecdf_percentile_count(signal, percentile=None):
    x, y = calc_ecdf(signal)
    # check if signal is constant
    if torch.sum(torch.diff(signal)) == 0:
        return signal[0]
    else:
        return x[y <= percentile].shape[0]
print(ecdf_percentile_count(Y, 0.2), tsf.ecdf_percentile_count(X, 0.2))
np.isclose(ecdf_percentile_count(Y, 0.2), tsf.ecdf_percentile_count(X, 0.2)).all()

20 20


np.True_

In [97]:
def calc_fft(signal, fs):
    fmag = torch.abs(torch.fft.rfft(signal))
    f = torch.fft.rfftfreq(len(signal), d=1 / fs)
    return f, fmag

In [99]:
# nor related to signal itself?
def spectral_distance(signal, fs):
    f, fmag = calc_fft(signal, fs)
    cum_fmag = torch.cumsum(fmag)
    # Computing the linear regression
    points_y = torch.linspace(0, cum_fmag[-1], len(cum_fmag))
    return torch.sum(points_y - cum_fmag)
print(ecdf_percentile_count(Y, 5), tsf.ecdf_percentile_count(X, 5))
np.isclose(ecdf_percentile_count(Y, 5), tsf.ecdf_percentile_count(X, 5)).all()

100 100


np.True_

In [None]:
def fundamental_frequency(signal, fs):
    signal = signal - np.mean(signal)
    f, fmag = calc_fft(signal, fs)
    # Finding big peaks, not considering noise peaks with low amplitude
    bp = scipy.signal.find_peaks(fmag, height=max(fmag) * 0.3)[0]
    # # Condition for offset removal, since the offset generates a peak at frequency zero
    bp = bp[bp != 0]
    if not list(bp):
        f0 = 0
    else:
        # f0 is the minimum big peak frequency
        f0 = f[min(bp)]
    return f0
print(fundamental_frequency(Y, 0.2), tsf.fundamental_frequency(X, 0.2))
np.isclose(fundamental_frequency(Y, 0.2), tsf.fundamental_frequency(X, 0.2)).all()

In [None]:
def max_power_spectrum(signal, fs):
    if np.std(signal) == 0:
        return float(max(scipy.signal.welch(signal, fs, nperseg=len(signal))[1]))
    else:
        return float(max(scipy.signal.welch(signal / np.std(signal), fs, nperseg=len(signal))[1]))

In [103]:
def spectral_centroid(signal, fs):
    f, fmag = calc_fft(signal, fs)
    if not torch.sum(fmag):
        return 0
    else:
        return torch.dot(f, fmag / torch.sum(fmag))
print(spectral_centroid(Y, 5), tsf.spectral_centroid(X, 5))
np.isclose(spectral_centroid(Y, 5), tsf.spectral_centroid(X, 5)).all()

tensor(1.3237) 1.323686781664513


np.True_

In [131]:
def spectral_spread(signal, fs):
    f, fmag = calc_fft(signal, fs)
    spect_centroid = spectral_centroid(signal, fs)
    if not torch.sum(fmag):
        return 0
    else:
        return torch.dot(((f - spect_centroid) ** 2), (fmag / torch.sum(fmag))) ** 0.5
print(spectral_spread(Y, 5), tsf.spectral_spread(X, 5))
np.isclose(spectral_spread(Y, 5), tsf.spectral_spread(X, 5)).all()

tensor(0.7100) 0.7099732883827264


np.True_

In [132]:
def max_frequency(signal, fs):
    f, fmag = calc_fft(signal, fs)
    cum_fmag = torch.cumsum(fmag, dim=0)
    try:
        ind_mag = torch.where(cum_fmag > cum_fmag[-1] * 0.95)[0][0]
    except IndexError:
        ind_mag = torch.argmax(cum_fmag)

    return f[ind_mag]
print(max_frequency(Y, 5), tsf.max_frequency(X, 5))
np.isclose(max_frequency(Y, 5), tsf.max_frequency(X, 5)).all()

tensor(2.4000) 2.4000000000000004


np.True_

In [133]:
def median_frequency(signal, fs):
    f, fmag = calc_fft(signal, fs)
    cum_fmag = torch.cumsum(fmag, dim=0)
    try:
        ind_mag = torch.where(cum_fmag > cum_fmag[-1] * 0.50)[0][0]
    except IndexError:
        ind_mag = torch.argmax(cum_fmag)
    f_median = f[ind_mag]
    return f_median
print(median_frequency(Y, 5), tsf.median_frequency(X, 5))
np.isclose(median_frequency(Y, 5), tsf.median_frequency(X, 5)).all()

tensor(1.4000) 1.4000000000000001


np.True_

In [130]:
def spectral_decrease(signal, fs):
    f, fmag = calc_fft(signal, fs)
    fmag_band = fmag[1:]
    len_fmag_band = torch.arange(2, len(fmag) + 1)
    # Sum of numerator
    soma_num = torch.sum((fmag_band - fmag[0]) / (len_fmag_band - 1), axis=0)
    if not torch.sum(fmag_band):
        return 0
    else:
        # Sum of denominator
        soma_den = 1 / torch.sum(fmag_band)

        # Spectral decrease computing
        return soma_den * soma_num
print(spectral_decrease(Y, 5), tsf.spectral_decrease(X, 5))
np.isclose(spectral_decrease(Y, 5), tsf.spectral_decrease(X, 5)).all()

tensor(0.0488) 0.04883853063577662


np.True_

In [129]:
def spectral_kurtosis(signal, fs):
    f, fmag = calc_fft(signal, fs)
    if not spectral_spread(signal, fs):
        return 0
    else:
        spect_kurt = ((f - spectral_centroid(signal, fs)) ** 4) * (fmag / torch.sum(fmag))
        return torch.sum(spect_kurt) / (spectral_spread(signal, fs) ** 4)
print(spectral_kurtosis(Y, 5), tsf.spectral_kurtosis(X, 5))
np.isclose(spectral_kurtosis(Y, 5), tsf.spectral_kurtosis(X, 5)).all()

tensor(1.9127) 1.9126616456922227


np.True_

In [128]:
def spectral_skewness(signal, fs):
    f, fmag = calc_fft(signal, fs)
    spect_centr = spectral_centroid(signal, fs)

    if not spectral_spread(signal, fs):
        return 0
    else:
        skew = ((f - spect_centr) ** 3) * (fmag / torch.sum(fmag))
        return torch.sum(skew) / (spectral_spread(signal, fs) ** 3)
print(spectral_skewness(Y, 5), tsf.spectral_skewness(X, 5))
np.isclose(spectral_skewness(Y, 5), tsf.spectral_skewness(X, 5)).all()

tensor(-0.1278) -0.12781690143011465


np.True_

In [127]:
def spectral_slope(signal, fs):
    f, fmag = calc_fft(signal, fs)
    sum_fmag = fmag.sum()
    dot_ff = (f * f).sum()
    sum_f = f.sum()
    len_f = len(f)
    if not ([f]) or (sum_fmag == 0):
        return 0
    else:
        if not (len_f * dot_ff - sum_f**2):
            return 0
        else:
            num_ = (1 / sum_fmag) * (len_f * torch.sum(f * fmag) - sum_f * sum_fmag)
            denom_ = len_f * dot_ff - sum_f**2
            return num_ / denom_
print(spectral_slope(Y, 5), tsf.spectral_slope(X, 5))
np.isclose(spectral_slope(Y, 5), tsf.spectral_slope(X, 5)).all()

tensor(0.0027) 0.002667394811385087


np.True_

In [126]:
def spectral_variation(signal, fs):
    f, fmag = calc_fft(signal, fs)
    sum1 = torch.sum(fmag[:-1] * fmag[1:])
    sum2 = torch.sum(fmag[1:] ** 2)
    sum3 = torch.sum(fmag[:-1] ** 2)
    if not sum2 or not sum3:
        variation = 1
    else:
        variation = 1 - (sum1 / ((sum2**0.5) * (sum3**0.5)))
    return variation
print(spectral_variation(Y, 5), tsf.spectral_variation(X, 5))
np.isclose(spectral_variation(Y, 5), tsf.spectral_variation(X, 5)).all()

tensor(0.2019) 0.20191787509731118


np.True_

In [125]:
def spectral_positive_turning(signal, fs):
    f, fmag = calc_fft(signal, fs)
    diff_sig = torch.diff(fmag)
    array_signal = torch.arange(len(diff_sig[:-1]))
    positive_turning_pts = torch.where((diff_sig[array_signal + 1] < 0) & (diff_sig[array_signal] > 0))[0]
    return len(positive_turning_pts)
print(spectral_positive_turning(Y, 5), tsf.spectral_positive_turning(X, 5))
np.isclose(spectral_positive_turning(Y, 5), tsf.spectral_positive_turning(X, 5)).all()

16 16


np.True_

In [124]:
def spectral_roll_off(signal, fs):
    f, fmag = calc_fft(signal, fs)
    cum_ff = torch.cumsum(fmag, dim=0)
    value = 0.95 * (torch.sum(fmag))
    return f[torch.where(cum_ff >= value)[0][0]]
print(spectral_roll_off(Y, 5), tsf.spectral_roll_off(X, 5))
np.isclose(spectral_roll_off(Y, 5), tsf.spectral_roll_off(X, 5)).all()

tensor(2.4000) 2.4000000000000004


np.True_

In [123]:
def spectral_roll_on(signal, fs):
    f, fmag = calc_fft(signal, fs)
    cum_ff = torch.cumsum(fmag, dim=0)
    value = 0.05 * (torch.sum(fmag))
    return f[torch.where(cum_ff >= value)[0][0]]
print(spectral_roll_on(Y, 5), tsf.spectral_roll_on(X, 5))
np.isclose(spectral_roll_on(Y, 5), tsf.spectral_roll_on(X, 5)).all()

tensor(0.1500) 0.15000000000000002


np.True_

In [122]:
def human_range_energy(signal, fs):
    f, fmag = calc_fft(signal, fs)
    allenergy = torch.sum(fmag**2)
    if allenergy == 0:
        # For handling the occurrence of Nan values
        return 0.0
    hr_energy = torch.sum(fmag[torch.argmin(torch.abs(0.6 - f)) : torch.argmin(torch.abs(2.5 - f))] ** 2)
    ratio = hr_energy / allenergy
    return ratio
print(human_range_energy(Y, 5), tsf.human_range_energy(X, 5))
np.isclose(human_range_energy(Y, 5), tsf.human_range_energy(X, 5)).all()

tensor(0.7990) 0.7990190450645974


np.True_

In [None]:
def filterbank(signal, fs, pre_emphasis=0.97, nfft=512, nfilt=40):
    """Computes the MEL-spaced filterbank.

    It provides the information about the power in each frequency band.

    Implementation details and description on:
    https://www.kaggle.com/ilyamich/mfcc-implementation-and-tutorial
    https://haythamfayek.com/2016/04/21/speech-processing-for-machine-learning.html#fnref:1

    Parameters
    ----------
    signal : nd-array
        Input from which filterbank is computed
    fs : float
        Sampling frequency
    pre_emphasis : float
        Pre-emphasis coefficient for pre-emphasis filter application
    nfft : int
        Number of points of fft
    nfilt : int
        Number of filters

    Returns
    -------
    nd-array
        MEL-spaced filterbank
    """

    # Signal is already a window from the original signal, so no frame is needed.
    # According to the references it is needed the application of a window function such as
    # hann window. However if the signal windows don't have overlap, we will lose information,
    # as the application of a hann window will overshadow the windows signal edges.

    # pre-emphasis filter to amplify the high frequencies

    emphasized_signal = np.append(
        np.array(signal)[0],
        np.array(signal[1:]) - pre_emphasis * np.array(signal[:-1]),
    )

    # Fourier transform and Power spectrum
    mag_frames = np.absolute(
        np.fft.rfft(emphasized_signal, nfft),
    )  # Magnitude of the FFT

    pow_frames = (1.0 / nfft) * (mag_frames**2)  # Power Spectrum

    low_freq_mel = 0
    high_freq_mel = 2595 * np.log10(1 + (fs / 2) / 700)  # Convert Hz to Mel
    mel_points = np.linspace(
        low_freq_mel,
        high_freq_mel,
        nfilt + 2,
    )  # Equally spaced in Mel scale
    hz_points = 700 * (10 ** (mel_points / 2595) - 1)  # Convert Mel to Hz
    filter_bin = np.floor((nfft + 1) * hz_points / fs)

    fbank = np.zeros((nfilt, int(np.floor(nfft / 2 + 1))))
    for m in np.arange(1, nfilt + 1):

        f_m_minus = int(filter_bin[m - 1])  # left
        f_m = int(filter_bin[m])  # center
        f_m_plus = int(filter_bin[m + 1])  # right

        for k in np.arange(f_m_minus, f_m):
            fbank[m - 1, k] = (k - filter_bin[m - 1]) / (filter_bin[m] - filter_bin[m - 1])
        for k in np.arange(f_m, f_m_plus):
            fbank[m - 1, k] = (filter_bin[m + 1] - k) / (filter_bin[m + 1] - filter_bin[m])

    # Area Normalization
    # If we don't normalize the noise will increase with frequency because of the filter width.
    enorm = 2.0 / (hz_points[2 : nfilt + 2] - hz_points[:nfilt])
    fbank *= enorm[:, np.newaxis]

    filter_banks = np.dot(pow_frames, fbank.T)
    filter_banks = np.where(
        filter_banks == 0,
        np.finfo(float).eps,
        filter_banks,
    )  # Numerical Stability
    filter_banks = 20 * np.log10(filter_banks)  # dB

    return filter_banks

def mfcc(signal, fs, pre_emphasis=0.97, nfft=512, nfilt=40, num_ceps=12, cep_lifter=22):
    filter_banks = filterbank(signal, fs, pre_emphasis, nfft, nfilt)
    mel_coeff = scipy.fft.dct(filter_banks, type=2, axis=0, norm="ortho")[1 : (num_ceps + 1)]  # Keep 2-13
    mel_coeff -= np.mean(mel_coeff, axis=0) + 1e-8
    # liftering
    ncoeff = len(mel_coeff)
    n = np.arange(ncoeff)
    lift = 1 + (cep_lifter / 2) * np.sin(np.pi * n / cep_lifter)  # cep_lifter = 22 from python_speech_features library
    mel_coeff *= lift
    return tuple(mel_coeff)
print(mfcc(Y, 5), tsf.mfcc(X, 5))
np.isclose(mfcc(Y, 5), tsf.mfcc(X, 5)).all()

In [18]:
def _periodogram(X: torch.Tensor, fs, detrend, scaling):
    if X.dim() > 2:
        X = torch.squeeze(X)
    elif X.dim() == 1:
        X = X.unsqueeze(0)

    if detrend:
        X -= X.mean(-1, keepdim=True)

    N = X.size(-1)
    assert N % 2 == 0

    df = fs / N
    dt = df
    f = torch.arange(0, N / 2 + 1) * df  # [0:df:f/2]

    dual_side = torch.fft.fft(X)  # 双边谱
    half_idx = int(N / 2 + 1)
    single_side = dual_side[:, 0:half_idx]
    win = torch.abs(single_side)

    ps = win ** 2
    if scaling == 'density':  # 计算功率谱密度
        scale = N * fs
    elif scaling == 'spectrum':  # 计算功率谱
        scale = N ** 2
    elif scaling is None: # 不做缩放
        scale = 1
    else:
        raise ValueError('Unknown scaling: %r' % scaling)
    Pxy = ps / scale

    Pxy[:, 1:-1] *= 2  # 能量2倍;直流分量不用二倍, 中心频率点不用二倍

    return f, Pxy.squeeze()


def periodogram(X: torch.Tensor, fs=256, detrend=False, scaling='density', no_grad=True):
    """计算信号单边 PSD, 基本等价于 scipy.signal.periodogram
    
        Parameters:
        ----------
            - `X`:          torch.Tensor, EEG, [T]/[N, T]
            - `fs`:         int, 采样率, Hz
            - `detrend`:    bool, 是否去趋势 (去除直流分量)
            - `scaling`:    { 'density', 'spectrum' }, 可选
                - 'density':    计算功率谱密度 `(V ** 2 / Hz)`
                - 'spectrum':    计算功率谱 `(V ** 2)`
            - `no_grad`:    bool, 是否启用 no_grad() 模式
    
        Returns:
        ----------
            - `Pxy`:    Tensor, 单边功率谱
    """
    if no_grad:
        with torch.no_grad():
            return _periodogram(X, fs, detrend, scaling)
    else:
        return _periodogram(X, fs, detrend, scaling)
    
def _get_window(window, nwlen, device):
    if window == 'hann':
        window = torch.hann_window(
            nwlen, dtype=torch.float32, device=device, periodic=False
        )
    elif window == 'hamming':
        window = torch.hamming_window(
            nwlen, dtype=torch.float32, device=device, periodic=False
        )
    elif window == 'blackman':
        window = torch.blackman_window(
            nwlen, dtype=torch.float32, device=device, periodic=False
        )
    elif window == 'boxcar':
        window = torch.ones(nwlen, dtype=torch.float32, device=device)
    else:
        raise ValueError('Invalid Window {}' % window)
    return window


def _pwelch(X: torch.Tensor, fs, detrend, scaling, window, nwlen, nhop):
    if scaling == 'density':
        scale = (fs * (window * window).sum().item())
    elif scaling == 'spectrum':
        scale = window.sum().item() ** 2
    else:
        raise ValueError('Unknown scaling: %r' % scaling)
    # --------------- Fold and windowing --------------- #
    N, T = X.size(0), X.size(-1)
    X = X.view(N, 1, 1, T)
    X_fold = F.unfold(X, (1, nwlen), stride=nhop)  # [N, 1, 1, T] -> [N, nwlen, win_cnt]
    if detrend:
        X_fold -= X_fold.mean(1, keepdim=True) # 各个窗口各自detrend
    window = window.view(1, -1, 1)  # [1, nwlen, 1]
    X_windowed = X_fold * window  # [N, nwlen, win_cnt]
    win_cnt = X_windowed.size(-1)

    # --------------- Pwelch --------------- #
    X_windowed = X_windowed.transpose(1, 2).contiguous()  # [N, win_cnt, nwlen]
    X_windowed = X_windowed.view(N * win_cnt, nwlen)  # [N * win_cnt, nwlen]
    f, pxx = _periodogram(
        X_windowed, fs, detrend=False, scaling=None
    )  # [N * win_cnt, nwlen // 2 + 1]
    pxx /= scale
    pxx = pxx.view(N, win_cnt, -1)  # [N, win_cnt, nwlen // 2 + 1]
    pxx = torch.mean(pxx, dim=1)  # [N, nwlen // 2 + 1]
    return f, pxx


def pwelch(
    X: torch.Tensor,
    fs=256,
    detrend=False,
    scaling='density',
    window='hann',
    nwlen=128,
    nhop=None,
    no_grad=True,
):
    """Pwelch 方法，大致相当于 scipy.signal.welch

        Parameters:
        ----------
            - `X`:          torch.Tensor, EEG, [T]/[N, T]
            - `fs`:         int, 采样率, Hz
            - `detrend`:    bool, 是否去趋势 (去除直流分量)
            - `scaling`:    { 'density', 'spectrum' }, 可选
                - 'density':    计算功率谱密度 `(V ** 2 / Hz)`
                - 'spectrum':    计算功率谱 `(V ** 2)`
            - `window`:     str, 窗函数名称
            - `nwlen`:      int, 窗函数长度 (点的个数)
            - `nhop`:       int, 窗函数移动步长, 即 nwlen - noverlap (点的个数)
                            如果为 None，则默认为 `nwlen // 2`
            - `no_grad`:    bool, 是否启用 no_grad() 模式
    
        Returns:
        ----------
            - `Pxy`:    Tensor, 单边功率谱
    """
    nhop = nwlen // 2 if nhop is None else nhop
    window = _get_window(window, nwlen, X.device)
    if no_grad:
        with torch.no_grad():
            return _pwelch(X, fs, detrend, scaling, window, nwlen, nhop)
    else:
        return _pwelch(X, fs, detrend, scaling, window, nwlen, nhop)
pwelch(torch.stack([Y], dim=0), fs=1, window = "hann", scaling = 'density', detrend=True, nwlen=10, nhop=5)

(tensor([0.0000, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000]),
 tensor([[0.5287, 1.7273, 1.9764, 1.9345, 1.8635, 0.6566]]))

In [19]:
import scipy
scipy.signal.welch(X, fs=1., window = "hann", scaling = 'density', nperseg=10, noverlap=5)

(array([0. , 0.1, 0.2, 0.3, 0.4, 0.5]),
 array([0.38725113, 1.77615456, 1.9841756 , 1.96212444, 1.90009807,
        0.66686779]))

In [142]:
def spectral_entropy(signal, fs):
    # Removing DC component
    sig = signal - torch.mean(signal)
    f, fmag = calc_fft(sig, fs)
    power = fmag**2
    if power.sum() == 0:
        return 0.0
    prob = torch.divide(power, power.sum())
    prob = prob[prob != 0]
    # If probability all in one value, there is no entropy
    if prob.size() == 1:
        return 0.0
    return -torch.multiply(prob, torch.log2(prob)).sum() / np.log2(prob.size()[0])
print(spectral_entropy(Y, 5), tsf.spectral_entropy(X, 5))
np.isclose(spectral_entropy(Y, 5), tsf.spectral_entropy(X, 5)).all()

tensor(0.9011) 0.9010709860682248


np.True_

In [156]:
import ptwt
def continuous_wavelet_transform(signal, fs, wavelet="mexh", widths=np.arange(1, 10)):
    coefficients, frequencies = ptwt.cwt(signal, widths, wavelet, sampling_period=1 / fs)
    return coefficients, frequencies

def wavelet_entropy(signal, fs, wavelet="mexh", max_width=10):
    if torch.sum(signal) == 0:
        return 0.0
    max_width = int(max_width)
    widths = torch.arange(1, max_width)
    coeffs, _ = continuous_wavelet_transform(signal=signal, fs=fs, wavelet=wavelet, widths=widths)
    energy_scale = torch.sum(torch.abs(coeffs), axis=1)
    t_energy = torch.sum(energy_scale)
    prob = energy_scale / t_energy
    w_entropy = -torch.sum(prob * torch.log(prob))
    return w_entropy
print(wavelet_entropy(Y, 5), tsf.wavelet_entropy(X, 5))
np.isclose(wavelet_entropy(Y, 5), tsf.wavelet_entropy(X, 5)).all()

tensor(2.1943, dtype=torch.float64) 2.194275773109381


np.True_

In [160]:
def wavelet_abs_mean(signal, fs, wavelet="mexh", max_width=10):
    max_width = int(max_width)
    widths = torch.arange(1, max_width)
    coeffs, frequencies = continuous_wavelet_transform(signal=signal, fs=fs, wavelet=wavelet, widths=widths)
    f_keys = np.round(frequencies, 2).astype(str)
    return {"names": [f + "Hz" for f in f_keys], "values": np.abs(torch.mean(coeffs, axis=1))}
print(wavelet_abs_mean(Y, 5))
print(tsf.wavelet_abs_mean(X, 5))

{'names': ['1.25Hz', '0.62Hz', '0.42Hz', '0.31Hz', '0.25Hz', '0.21Hz', '0.18Hz', '0.16Hz', '0.14Hz'], 'values': tensor([0.0014, 0.0070, 0.0149, 0.0281, 0.0416, 0.0536, 0.0606, 0.0614, 0.0555],
       dtype=torch.float64)}
{'names': ['1.25Hz', '0.62Hz', '0.42Hz', '0.31Hz', '0.25Hz', '0.21Hz', '0.18Hz', '0.16Hz', '0.14Hz'], 'values': array([0.00143808, 0.0070053 , 0.01492294, 0.02811977, 0.04157084,
       0.05357599, 0.06063112, 0.06140213, 0.05549297])}


  return {"names": [f + "Hz" for f in f_keys], "values": np.abs(torch.mean(coeffs, axis=1))}


In [161]:
def wavelet_std(signal, fs, wavelet="mexh", max_width=10):
    max_width = int(max_width)
    widths = torch.arange(1, max_width)
    coeffs, frequencies = continuous_wavelet_transform(signal=signal, fs=fs, wavelet=wavelet, widths=widths)
    f_keys = np.round(frequencies, 2).astype(str)
    return {"names": [f + "Hz" for f in f_keys], "values": torch.std(coeffs, axis=1)}
print(wavelet_std(Y, 5))
print(tsf.wavelet_std(X, 5))

{'names': ['1.25Hz', '0.62Hz', '0.42Hz', '0.31Hz', '0.25Hz', '0.21Hz', '0.18Hz', '0.16Hz', '0.14Hz'], 'values': tensor([1.0915, 0.9627, 1.0173, 1.0528, 1.0864, 1.1374, 1.1539, 1.1118, 1.0266],
       dtype=torch.float64)}
{'names': ['1.25Hz', '0.62Hz', '0.42Hz', '0.31Hz', '0.25Hz', '0.21Hz', '0.18Hz', '0.16Hz', '0.14Hz'], 'values': array([1.08604507, 0.95790926, 1.012239  , 1.04754014, 1.08095311,
       1.13165147, 1.14812133, 1.10618824, 1.02148187])}


In [163]:
def wavelet_var(signal, fs, wavelet="mexh", max_width=10):
    max_width = int(max_width)
    widths = torch.arange(1, max_width)
    coeffs, frequencies = continuous_wavelet_transform(signal=signal, fs=fs, wavelet=wavelet, widths=widths)
    f_keys = np.round(frequencies, 2).astype(str)
    return {"names": [f + "Hz" for f in f_keys], "values": torch.var(coeffs, axis=1)}
print(wavelet_var(Y, 5))
print(tsf.wavelet_var(X, 5))

{'names': ['1.25Hz', '0.62Hz', '0.42Hz', '0.31Hz', '0.25Hz', '0.21Hz', '0.18Hz', '0.16Hz', '0.14Hz'], 'values': tensor([1.1914, 0.9269, 1.0350, 1.1084, 1.1803, 1.2936, 1.3315, 1.2360, 1.0540],
       dtype=torch.float64)}
{'names': ['1.25Hz', '0.62Hz', '0.42Hz', '0.31Hz', '0.25Hz', '0.21Hz', '0.18Hz', '0.16Hz', '0.14Hz'], 'values': array([1.1794939 , 0.91759014, 1.02462779, 1.09734035, 1.16845962,
       1.28063505, 1.3181826 , 1.22365243, 1.0434252 ])}


In [164]:
def wavelet_energy(signal, fs, wavelet="mexh", max_width=10):
    max_width = int(max_width)
    widths = torch.arange(1, max_width)
    coeffs, frequencies = continuous_wavelet_transform(signal=signal, fs=fs, wavelet=wavelet, widths=widths)
    f_keys = np.round(frequencies, 2).astype(str)
    return {"names": [f + "Hz" for f in f_keys], "values": torch.sqrt(torch.sum(coeffs**2, axis=1) / coeffs.shape[1])}
print(wavelet_energy(Y, 5))
print(tsf.wavelet_energy(X, 5))

{'names': ['1.25Hz', '0.62Hz', '0.42Hz', '0.31Hz', '0.25Hz', '0.21Hz', '0.18Hz', '0.16Hz', '0.14Hz'], 'values': tensor([1.0860, 0.9579, 1.0123, 1.0479, 1.0818, 1.1329, 1.1497, 1.1079, 1.0230],
       dtype=torch.float64)}
{'names': ['1.25Hz', '0.62Hz', '0.42Hz', '0.31Hz', '0.25Hz', '0.21Hz', '0.18Hz', '0.16Hz', '0.14Hz'], 'values': array([1.08604602, 0.95793487, 1.01234899, 1.04791749, 1.08175217,
       1.13291899, 1.14972115, 1.10789108, 1.02298811])}


In [165]:
def petrosian_fractal_dimension(signal):
    n = len(signal)
    diff_signal = torch.diff(torch.sign(torch.diff(signal)))
    num_sign_changes = torch.sum(diff_signal != 0)
    pfd = np.log10(n) / (np.log10(n) + np.log10(n / (n + 0.4 * num_sign_changes)))
    return pfd
print(petrosian_fractal_dimension(Y), tsf.petrosian_fractal_dimension(X))
np.isclose(petrosian_fractal_dimension(Y), tsf.petrosian_fractal_dimension(X)).all()

tensor(1.0528) 1.0528369071450172


  pfd = np.log10(n) / (np.log10(n) + np.log10(n / (n + 0.4 * num_sign_changes)))


np.True_