# Utility functions

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy.fft import fft, rfft, irfft

from yaml import safe_load

**Load files**

In [None]:
def load_instance_file(filepath, filename, columns_names):
    
    full_filepath = filepath.format(filename)
    df = pd.read_csv('../{}'.format(full_filepath), names=columns_names)
    
    return df

def load_instance_set_files(instance_catalog, labels):
    
    dfs = []
    
    filepath = instance_catalog['filepath']
    columns_names = instance_catalog['columns']
    
    initial_value = int(instance_catalog['name_value_range'][0])
    final_value = int(instance_catalog['name_value_range'][1])     
        
    for i in range(initial_value, final_value + 1):
        
        df_ = load_instance_file(filepath, i, columns_names)
        df_['label'] = labels.iloc[i-1]['label']
        df_['sample'] = i
        df_['sample_index'] = df_.index.tolist()
        df_ = df_.set_index(['sample', 'sample_index'])
        dfs.append(df_)
        
    df = pd.concat(dfs, axis=0, ignore_index=False)
    
    return df


**Calculate fft**

In [None]:
def calc_fft(df, col, samples, N=70000):
    
    df_ = df.copy()
    
    mat_X = np.zeros(shape=(int(N/2), len(samples)))
    
    for i, sample in enumerate(samples):
        
        x = df_.loc[sample][col].values
        mat_X[:, i] = np.abs(fft(x) * 2 / N)[:int(N/2)]
        
    return mat_X

In [None]:
def create_df_esp_freq(mat_X, samples, N_max, col, axis_concat=0, data_freq = None):
    
    indexes = ['X_{}'.format(f) for f in range(N_max)]
    multidx_cols = [*zip([col]*len(indexes), indexes)]
    indexes_ = pd.MultiIndex.from_tuples(multidx_cols)
    
    data_freq_ = pd.DataFrame(data=mat_X[:N_max], 
                            columns=samples, 
                            index=indexes_)
    
    data_freq_ = data_freq_.T
    
    if data_freq is None:
        data_freq = data_freq_ 
        
    else:
        data_freq = pd.concat([data_freq, data_freq_], axis=axis_concat, ignore_index=False)
    
    return data_freq

**Get peaks positions and values**

In [None]:
def get_peaks_positions_values(data_f, channel, labels_considered=['Before', 'After'], add_label=True):
    
    list_peaks_idx = []
    set_unique_peaks_idx = []
    list_peaks_values = []

    # Getting peaks indices
    for i, x in data_f.loc[data_f.label.isin(labels_considered)].drop(columns='label')[channel].iterrows():

        peaks_idx, _ = signal.find_peaks(x, threshold=np.quantile(x, 0.15))
        list_peaks_idx.append(peaks_idx)
        list_peaks_values.append(x[peaks_idx].values)

    # Getting unique values of indices
    set_peaks_idx = set()
    for peaks_idx in list_peaks_idx:
        for idx in peaks_idx:
            set_peaks_idx.add(idx)

    # Selecting only the columns related to the peaks
    data_freq_peaks = data_f.iloc[:, list(set_peaks_idx)]
    
    # Renaming the columns to adding the prefix "ch{}_"
    data_freq_peaks.columns = data_freq_peaks.columns.droplevel(level=0)
    data_freq_peaks = data_freq_peaks.rename(columns = {col: '{}_{}'.format(channel, col) for col in data_freq_peaks.columns})
    
    if add_label:
        data_freq_peaks = data_freq_peaks.join(data_f.label, on=data_freq_peaks.index)
    
    return data_freq_peaks

In [None]:
def get_peaks_positions_values(data_f, channel, labels_considered=['Before', 'After'], add_label=True):
    
    list_peaks_idx = []
    set_unique_peaks_idx = []
    list_peaks_values = []

    # Getting peaks indices
    for i, x in data_f.loc[data_f.label.isin(labels_considered)].drop(columns='label')[channel].iterrows():

        peaks_idx, _ = signal.find_peaks(x, threshold=np.quantile(x, 0.15))
        list_peaks_idx.append(peaks_idx)
        list_peaks_values.append(x[peaks_idx].values)

    # Getting unique values of indices
    set_peaks_idx = set()
    for peaks_idx in list_peaks_idx:
        for idx in peaks_idx:
            set_peaks_idx.add(idx)

    # Selecting only the columns related to the peaks
    data_freq_peaks = data_f.iloc[:, list(set_peaks_idx)]
    
    # Renaming the columns to adding the prefix "ch{}_"
    data_freq_peaks.columns = data_freq_peaks.columns.droplevel(level=0)
    data_freq_peaks = data_freq_peaks.rename(columns = {col: '{}_{}'.format(channel, col) for col in data_freq_peaks.columns})
    
    if add_label:
        data_freq_peaks = data_freq_peaks.join(data_f.label, on=data_freq_peaks.index)
    
    return data_freq_peaks

In [None]:
def select_cols_names_channel(df, channel='ch1'):
    return [col for col in df.columns if col.split('_', 1)[0] == channel]

def select_cols_channel(df, channel='ch1'):
    cols_names = select_cols_names_channel(df, channel=channel)
    return df[cols_names]

def get_peaks_pos(df, channel, n_max_positions=3):
    
    df_ = select_cols_channel(df, channel=channel)
    N_samples = len(df_.columns)
    
    array_peaks_pos = np.zeros(shape=(len(df_), n_max_positions))

    for i, row in df_.iterrows():
        row_ = row.copy()
        for pos in range(1, n_max_positions+1):
            
            idx_max = row_.argmax() 
            position = int(row_.index[idx_max].split('_')[-1])
            row_ = row_.drop(index = row_.index[idx_max])

            array_peaks_pos[i-1, pos-1] = position / N_samples

    data_peak_pos = pd.DataFrame(data=array_peaks_pos, 
                                 columns = ['{}_freqmax_{}'.format(channel, i) for i in range(1, n_max_positions+1)], 
                                 index = df_.index)

    return data_peak_pos

**Calculate RMS of fundamental frequency and harmonics components, and RMS of "noise"**

In [None]:
def parabolic(f, x):

    if int(x) != x:
        raise ValueError('x must be an integer sample index')
    else:
        x = int(x)
    xv = 1/2. * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x
    yv = f[x] - 1/4. * (f[x-1] - f[x+1]) * (xv - x)
    return (xv, yv)

In [None]:
def calculate_tdhn(df, col, samples, N=70000, beta_filter=38, n_harm=5, wind_fund_ratio=0.1, f_max=250):
    
    df_ = df.loc[samples][col].unstack().copy()
    
    mat_X = np.zeros(shape=(int(N/2 + 1), len(samples))) # Row: frequency, column: sample
    mat_X_noise = mat_X.copy()
    
    f_fund = []
    rms_total = []
    rms_noise = []
    
    # Defining and window
    window = signal.windows.kaiser(M=N, beta=beta_filter) 
    windowed = (df_.values * window).T  # Row: Time, column: Sample 

    for i, sample in enumerate(samples):
        
        x = windowed[:, i]
        X = rfft(x)
        mat_X[:, i] =  np.abs(X)
        
        fund_f_idx = np.argmax(np.abs(X[:f_max]))
        true_f_idx = parabolic(np.log(np.abs(X)), fund_f_idx)[0]
        f_fund.append(true_f_idx)
        
        rms_total.append(20*np.log10(np.sqrt(np.abs(X.T @ X))))
        
        # Setting fundamental and harmonic regions to zero
        for nn in range(1, n_harm + 1):

            lowermin = int(true_f_idx * nn * (1 - wind_fund_ratio))
            uppermin = int(true_f_idx * nn * (1 + wind_fund_ratio))

            X[lowermin : uppermin] = 0.0
        
        mat_X_noise[:, i] = np.abs(X)
        
        rms_noise.append(20*np.log10(np.sqrt(np.abs(X.T @ X))))
        
    return f_fund, rms_total, rms_noise

In [None]:
def get_tdhn_by_channel(df, channel='ch1', add_channel_prefix=True):
    df_ = df.copy()
    samples_before = df_.loc[data['label'] == 'Before'].index.get_level_values(level=0).unique().tolist()
    f_fund_before, rms_total_before, rms_noise_before = calculate_tdhn(df_, channel, samples_before)

    samples_after = df_.loc[data['label'] == 'After'].index.get_level_values(level=0).unique().tolist()
    f_fund_after, rms_total_after, rms_noise_after = calculate_tdhn(df_, channel, samples_after)

    # Creating a DF with the results
    df_tdhn = pd.DataFrame(index=samples_before, 
                           data={'ffund': f_fund_before, 
                                 'rms_total' : rms_total_before,
                                'rms_noise': rms_noise_before})

    df_tdhn_ = pd.DataFrame(index=samples_after, 
                           data={'ffund': f_fund_after, 
                                'rms_total' : rms_total_after,
                                'rms_noise': rms_noise_after})

    df_tdhn = pd.concat([df_tdhn, df_tdhn_], axis=0, ignore_index=False)
    df_tdhn['rms_ratio'] = df_tdhn['rms_noise'] / df_tdhn['rms_total']
    
    if add_channel_prefix:
        rename_cols = {col : f"{channel}_{col}" for col in df_tdhn.columns}
        df_tdhn = df_tdhn.rename(columns = rename_cols)

    return df_tdhn