# Visualize MKR features

The idea is to check whether the implementation makes sense and what to expect from the different features, specifically how each feature reacts to a constant (zero) offset in the signal.

In [1]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="whitegrid")

In [2]:
import inspect

import features as mkr
import tsfel as tsfel

In [3]:
# Selects the feature library under test: True -> MKR (the local `features` module), False -> TSFEL.
useMKR = False

In [4]:
# Feature-name table; the column matching the selected library ('MKR' or 'TSFEL')
# supplies the names of the functions that get evaluated further down.
mapping = pd.read_csv('mapping.csv')

In [5]:
# Figure sizes handed to the plotting calls: full height for the two-row comparison,
# reduced height for single-row plots.
figsize=(20, 5)
figsizeHalf=(20, 3)

In [6]:
def matchLastDimByRepeat(values, wts):
    """Append a trailing axis to `values` and repeat it to match `wts`' last axis.

    The result has the shape of `values` plus one extra trailing axis whose
    length equals the size of the last axis of `wts`; every entry along that
    new axis is a copy of the corresponding element of `values`.
    """
    with_trailing_axis = np.expand_dims(values, axis=-1)
    repeats = np.ma.size(wts, axis=-1)
    return np.repeat(with_trailing_axis, repeats, axis=-1)

In [7]:
# Layout of the synthetic test signal: `windows` windows of `samples` values each.
windows = 20
channels = 1  # keep at 1: the plots are not meant to be multidimensional
samples = 64
totalSize = windows * channels * samples

# Alternative test signals, kept for experimentation:
# wts_flat = np.random.rand(totalSize) + np.sin(np.linspace(-np.pi, np.pi, totalSize)) / 2 + np.cos(np.tile(np.linspace(-np.pi, np.pi, samples // 3), windows * 4))[:totalSize] / 8. - np.linspace(0, 1.5, totalSize)
# wts_flat = np.random.rand(totalSize) + np.sin(np.linspace(-np.pi, np.pi, totalSize)) / 2 - np.linspace(0, 1.5, totalSize)

# Sine with an exponentially growing frequency term (2**2 .. 2**10) plus Gaussian noise.
sample_index = np.arange(totalSize)
frequency_term = 2 ** np.linspace(2, 10, totalSize)
noise = np.random.normal(0, 1, totalSize) * 0.15
wts_flat = np.sin(2 * np.pi * frequency_term * sample_index / 48000) + noise

# Same signal shifted by a constant offset of +2.
wts_flat_pos = wts_flat + 2

# Windowed views of both signals: (windows, channels, samples).
window_shape = (windows, channels, samples)
wts_rand = wts_flat.reshape(window_shape)
wts_rand_pos = wts_flat_pos.reshape(window_shape)
print(f'Nr Windows: {windows}, Nr Channels: {channels}, Nr Values: {samples}')

Nr Windows: 20, Nr Channels: 1, Nr Values: 64


In [8]:
# x position of every raw sample of the flat signal: 0 .. size - 1.
ref_x = np.arange(wts_flat.size)
# One x position per window: `windows` evenly spaced points shifted by half a window length.
feature_x = samples / 2. + np.linspace(0, wts_flat.size - samples, windows)

In [9]:
# sns.lineplot(x=ref_x, y=np.cos(np.tile(np.linspace(-np.pi, np.pi, samples), windows))[:len(ref_x)])

In [10]:
# Both reference signals side by side, one column each.
signal = pd.DataFrame({'Ref-Rand': wts_flat, 'Ref-Pos': wts_flat_pos})

ValueError: Shape of passed values is (1280, 2), indices imply (1280, 3)

In [None]:
# Overview plot of both reference signal columns ('Ref-Rand' and 'Ref-Pos').
signal.plot(figsize=figsizeHalf)

In [None]:
# Compare two ways of building the frequency axis of a real-input FFT.
fs = 50
signal_length = 50
# Hand-rolled axis: signal_length // 2 points spread between 0 and fs // 2.
a = np.linspace(0, fs // 2, signal_length // 2)
# NumPy's axis for a sample spacing of 1 / fs (derived from `fs` instead of a
# second hard-coded 50, so both axes stay in sync when `fs` changes).
b = np.fft.rfftfreq(signal_length, d=1 / fs)
a, b, a.shape, b.shape

In [None]:
# np.testing.assert_almost_equal(wts_rand[0][0], wts_rand_pos[0][0] - 2) 
# np.testing.assert_almost_equal(np.fft.fft(wts_rand[0][0])[1:len(wts_rand[0][0]) //2], np.fft.fft(wts_rand_pos[0][0])[1:len(wts_rand[0][0]) //2])
# np.testing.assert_almost_equal(np.fft.rfft(wts_rand[0][0])[1:], np.fft.rfft(wts_rand_pos[0][0])[1:])

### Notes on fft
Explanation of the difference in the FFT between wts_rand and wts_rand_pos:
- the FFT calculates a zero-frequency component, which is influenced by the offset

Performance: 
- use np.fft.fftfreq over np.linspace (3.72 µs vs 19.8 µs)
- use np.fft.rfft over np.fft.fft (10.3 µs vs 14.6 µs)
- the FFT might be faster if the window size is a power of two, but the difference doesn't seem to be measurable

In [None]:
# Default keyword arguments per library, keyed by the parameter name of the
# feature functions (both use a sampling frequency of 50).
default_tsfel = dict(fs=50)
default_mkr = dict(samplingfrequency=50)

In [None]:
# Resolve the mapping column, the library module and its default arguments
# for the library selected via `useMKR`.
if not useMKR:
    column, lib, defaults = 'TSFEL', tsfel, default_tsfel
else:
    column, lib, defaults = 'MKR', mkr, default_mkr

In [None]:
# Feature table (filled column by column) and the names of the features that
# failed / succeeded.
df = pd.DataFrame()
couldNotCalc, couldCalc = [], []

def _helper(key):
    """Evaluate the feature function `key` of `lib` on both reference signals.

    Every parameter of the function except the first is filled from
    `defaults` when a default with that name exists.

    Returns a tuple ``(rand, pos)``: the results for `wts_rand` and for
    `wts_rand_pos`.
    """
    fn = getattr(lib, key)
    extra_params = list(inspect.signature(fn).parameters)[1:]
    fnArgs = {}
    for param in extra_params:
        if param in defaults:
            fnArgs[param] = defaults[param]

    if useMKR:
        # MKR functions are called once with the whole windowed array.
        return fn(wts_rand, **fnArgs), fn(wts_rand_pos, **fnArgs)

    def per_window(wts):
        # TSFEL functions are called window by window on the first channel.
        return np.array([fn(window[0], **fnArgs) for window in wts])

    return per_window(wts_rand), per_window(wts_rand_pos)

# TODO: change this to fillna with the tsfel part and wrap that up
# Evaluate every mapped feature on both signals and store the results as
# `<feature>_rand` / `<feature>_pos` columns of `df`.
for key in sorted(mapping[column].dropna().to_list()):
    # Only the feature evaluation is guarded. A failing feature is reported and
    # recorded, then the loop carries on, so the summary below lists every
    # failure instead of the run aborting on the first one.
    try:
        rand, pos = _helper(key)
    except Exception as err:
        print(err, key)
        couldNotCalc.append(key)
        continue

    if len(rand.shape) > 1:
        # Multi-valued feature: one column pair per output component.
        for i in range(rand.shape[1]):
            df[f'{key}_{i}_rand'] = rand[:,i]
            df[f'{key}_{i}_pos'] = pos[:,i]
            couldCalc.append(f'{key}_{i}')
    else:
        df[f'{key}_rand'] = rand
        df[f'{key}_pos'] = pos
        couldCalc.append(key)

print('Could not calc these features:', couldNotCalc)

In [None]:
# Show the feature table: a `<feature>_rand` / `<feature>_pos` column pair per computed feature.
df

In [None]:
# Count the distinct values held by every feature column ...
long_format = df.melt()
diff = long_format.groupby(['variable'])[['value']].nunique()
# ... and list the columns that hold at most one distinct value.
at_most_one_value = diff['value'] <= 1
diff[at_most_one_value].unstack()

In [None]:
# Absolute correlation between each feature computed on the raw signal and on
# the offset signal, indexed by feature name.
correlation_rows = [
    [feature, np.corrcoef(df[f'{feature}_rand'], df[f'{feature}_pos'])[0,1]]
    for feature in couldCalc
]
corsDf = pd.DataFrame(correlation_rows, columns=['Feature', 'Correlation'])
corsDf = corsDf.set_index('Feature').abs()
# corsDf['Correlation'] = corsDf['Correlation'].abs()

# Horizontal bar chart of all features, ordered by correlation.
ranked = corsDf.sort_values(by=['Correlation', 'Feature'], ascending=False)
ranked.plot(kind='barh', figsize=(7,20), title="independent on offset")

# corsDf[corsDf['Correlation'].abs() < 0.98].set_index('Feature').plot(kind='bar', figsize=(10,7), title="dependent on offset")
# corsDf[corsDf['Correlation'].abs() >= 0.98].set_index('Feature') #.plot(kind='bar', figsize=(10,7), title="indifferent to offset")

In [None]:
# Look up the correlation row of one feature; 'zero_cross' has to be among the computed features.
corsDf.loc['zero_cross']

In [None]:
def plot_single(name, df):
    """Plot feature `name` on top of the raw reference signal.

    The 'Ref-Rand' signal is drawn in green on the left y-axis and the
    ``<name>_rand`` column of `df` in blue on a twin y-axis of the same plot.
    """
    fig, signal_ax = plt.subplots(1, 1, figsize=figsizeHalf)
    fig.suptitle(name)

    # Reference signal first, then the feature values on a second y-axis.
    sns.lineplot(x=ref_x, y=signal['Ref-Rand'], color="g", ax=signal_ax, alpha=0.5)
    feature_ax = signal_ax.twinx()
    feature_values = df[f'{name}_rand']
    sns.lineplot(x=feature_x, y=feature_values, color="b", ax=feature_ax)

    plt.show()

def compare_visual(name, df):
    """Plot feature `name` for the raw and the offset signal in two stacked panels.

    The top panel shows 'Ref-Rand' with the ``<name>_rand`` column of `df`, the
    bottom panel 'Ref-Pos' with ``<name>_pos``. In each panel the signal is
    green on the left y-axis and the feature blue on a twin y-axis.
    """
    fig, axes = plt.subplots(2, 1, figsize=figsize)
    fig.suptitle(name)

    panels = (('Ref-Rand', 'rand'), ('Ref-Pos', 'pos'))
    for signal_ax, (ref_column, suffix) in zip(axes, panels):
        sns.lineplot(x=ref_x, y=signal[ref_column], color="g", ax=signal_ax, alpha=0.5)
        feature_ax = signal_ax.twinx()
        sns.lineplot(x=feature_x, y=df[f'{name}_{suffix}'], color="b", ax=feature_ax)

    plt.show()

In [None]:
# Sanity check: number of feature x positions vs. number of rows of a feature column.
# `key` is whatever name the feature-computation loop left behind.
feature_x.size, df['%s_rand' % key].size

In [None]:
# Features whose absolute correlation between raw and offset signal is below 0.99.
# The scalar is read by label: positional `[0]` on a label-indexed Series is deprecated in pandas.
[key for key in couldCalc if corsDf.loc[key, 'Correlation'] < 0.99]

In [None]:
%%time

# Two-panel comparison for every feature whose absolute correlation is below 0.99.
for key in couldCalc:
    # key = 'calc_mean'
    # Read the scalar by label: positional `[0]` on a label-indexed Series is
    # deprecated in pandas.
    if corsDf.loc[key, 'Correlation'] < 0.99:
        compare_visual(key, df)

In [None]:
%%time

# Single-panel plot for every feature whose absolute correlation is at least 0.99.
for key in couldCalc:
    # key = 'calc_mean'
    # Read the scalar by label: positional `[0]` on a label-indexed Series is
    # deprecated in pandas.
    if corsDf.loc[key, 'Correlation'] >= 0.99:
        plot_single(key, df)