In [None]:
import numpy as np
import pandas as pd
import pywt
import scipy
from scipy import stats
from tqdm.notebook import tqdm_notebook

# отключим предупреждения Anaconda
import warnings
warnings.simplefilter('ignore')

# будем отображать графики прямо в jupyter'e
%matplotlib inline
import seaborn as sns
import matplotlib.pyplot as plt
sns.set()
#графики в svg выглядят более четкими
%config InlineBackend.figure_format = 'svg' 

#увеличим дефолтный размер графиков
from pylab import rcParams
rcParams['figure.figsize'] = 8, 3

In [None]:
# Загрузим исходные данные
measurement = pd.read_csv('./data/MeasuredSignals.csv')
reference = pd.read_csv('./data/ReferenceSignal.csv')
# Очистим данные от пропущенных значений
reference = reference.drop(['Unnamed: 1', 'Unnamed: 2'], axis=1).dropna(axis=0)
# Преобразуем в массив
reference = reference.SourceSignal.to_numpy()
signal1 = measurement['MeasuredSignal1'].values
signal2 = measurement['MeasuredSignal2'].values

In [None]:
# Опишем функции, удаляющие шум в измеренном сигнале
def madev(d, axis=None):
    """ Mean absolute deviation of a signal """
    return np.mean(np.absolute(d - np.mean(d, axis)), axis)

def wavelet_denoising(x, wavelet='db4', level=1):
    coeff = pywt.wavedec(x, wavelet, mode="per")
    sigma = (1/0.6745) * madev(coeff[-level])
    uthresh = sigma * np.sqrt(2 * np.log(len(x)))
    coeff[1:] = (pywt.threshold(i, value=uthresh, mode='hard') for i in coeff[1:])
    return pywt.waverec(coeff, wavelet, mode='per')

# Теперь оконной функцией вычислим коэффициент корреляции Пирсона между отфильтрованными 
# с помощью wavelets сигналами и эталонным сигналом
def corr_func(signal, reference, method='pearson', families=pywt.wavelist(), 
              probability=False, alternative='two-sided'):
    
    length = len(signal)
    window = len(reference)
    df = pd.DataFrame(index=range(0,length-window))
    
    for wav in tqdm_notebook(families, desc='Progress bar'):
        try:
            filtered = wavelet_denoising(signal, wavelet=wav, level=1)
        except:
            pass
        
        rolling_r = []
        for i in range(0,(length-window)):
            if method == 'pearson':
                if probability:
                    correl = 1 - stats.pearsonr(filtered[i:i+window], reference)[1]
                else:
                    correl = stats.pearsonr(filtered[i:i+window], reference)[0]
            elif method == 'spearman':
                if probability:
                    try:
                        correl = stats.spearmanr(filtered[i:i+window], reference, alternative=alternative)[1]
                    except:
                        correl = 0
                else:
                    correl = stats.spearmanr(filtered[i:i+window], reference)[0]
            else:
                raise ValueError('Unexpected method')

            if np.isnan(correl):
                rolling_r.append(0)
            else: 
                rolling_r.append(correl)         

        df[f'{wav}'] = rolling_r 
    df['mean'] = df.mean(axis=1)
    return df

In [None]:
# Оценим wavelet denoising для первого сигнала
# Внимание! Вычисления занимают продолжительное время
for wav in pywt.wavelist():
    try:
        filtered1 = wavelet_denoising(signal1, wavelet=wav, level=1)
    except:
        pass
    
    plt.figure(figsize=(125, 5))
    plt.plot(signal1, label='Raw')
    plt.plot(filtered1, label='Filtered')
    plt.plot(reference, label='Reference', color='black')
    plt.xlim(0, 5000)
    plt.legend()
    plt.title(f"DWT Denoising the first signal with {wav} Wavelet", size=15)
    # plt.savefig(f"./plots/DWT Denoising the first signal with {wav} Wavelet.png", bbox_inches='tight')
    plt.show()

In [None]:
# Оценим wavelet denoising для второго сигнала
# Внимание! Вычисления занимают продолжительное время
for wav in pywt.wavelist():
    try:
        filtered2 = wavelet_denoising(signal2, wavelet=wav, level=1)
    except:
        pass
    
    plt.figure(figsize=(125, 5))
    plt.plot(signal2, label='Raw')
    plt.plot(filtered2, label='Filtered')
    plt.plot(reference, label='Reference', color='black')
    plt.xlim(0, 5000)
    plt.legend()
    plt.title(f"DWT Denoising the second signal with {wav} Wavelet", size=15)
    # plt.savefig(f"./plots/DWT Denoising the second signal with {wav} Wavelet.png", bbox_inches='tight')
    plt.show()

In [None]:
# Вычислим коэффициент корреляции на всех доступных wavelet families
pearson_df_1 = corr_func(signal1, reference)
pearson_df_2 = corr_func(signal2, reference)

left_corner_1 = pearson_df_1['mean'][pearson_df_1['mean'] > 0.5].index[0]
right_corner_1 = pearson_df_1['mean'][pearson_df_1['mean'] > 0.5].index[-1]
left_corner_2 = pearson_df_2['mean'][pearson_df_2['mean'] > 0.5].index[0]
right_corner_2 = pearson_df_2['mean'][pearson_df_2['mean'] > 0.5].index[-1]

plt.figure(figsize=(60, 5))
plt.plot(pearson_df_1['mean'], label='Mean of first signal Pearson coefficient')
plt.plot(pearson_df_2['mean'], label='Mean of second signal Pearson coefficient')
plt.legend()
plt.title("Mean values of Pearson correlation coefficient", size=15)
plt.axhline(0.5, color="k", linestyle="--")
plt.ylim(0, 0.9)
plt.xlim(0, 5000)
plt.axvspan(left_corner_1-10, right_corner_1+10, color='#EF9A9A', alpha=0.5)
plt.axvspan(left_corner_2-10, right_corner_2+10, color='#EF9A9A', alpha=0.5)
# plt.savefig(f"./plots/Mean values of Pearson correlation coefficient.png", bbox_inches='tight')
plt.show()

In [None]:
# Также оценим вероятности на всех доступных wavelet families
pearson_df_1_prob = corr_func(signal1, reference, probability=True)
pearson_df_2_prob = corr_func(signal2, reference, probability=True)

left_corner_1_prob = pearson_df_1_prob['mean'][pearson_df_1_prob['mean'] > 0.9].index[0]
right_corner_1_prob = pearson_df_1_prob['mean'][pearson_df_1_prob['mean'] > 0.9].index[-1]
left_corner_2_prob = pearson_df_2_prob['mean'][pearson_df_2_prob['mean'] > 0.9].index[0]
right_corner_2_prob = pearson_df_2_prob['mean'][pearson_df_2_prob['mean'] > 0.9].index[-1]

plt.figure(figsize=(60, 5))
plt.plot(pearson_df_1_prob['mean'], label='Mean of first signal correlation probability')
plt.plot(pearson_df_2_prob['mean'], label='Mean of second signal correlation probability')
plt.legend()
plt.title("Mean values of Pearson correlation probability", size=15)
plt.axhline(0.9, color="k", linestyle="--")
plt.ylim(0, 1)
plt.xlim(0, 5000)
plt.axvspan(left_corner_1_prob-10, right_corner_1_prob+10, color='#EF9A9A', alpha=0.5)
plt.axvspan(left_corner_2_prob-10, right_corner_2_prob+10, color='#EF9A9A', alpha=0.5)
# plt.savefig(f"./plots/Mean values of Pearson correlation probability.png", bbox_inches='tight')
plt.show()

In [None]:
# После просмотра очищенного сигнала можно выделить следующие wavelet families
families = ['bior4.4',
            'cgau8',
            'cmor',
            'coif4',
            'coif7',
            'coif9',
            'coif14',
            'db8',
            'fbsp',
]

In [None]:
# Оценим вероятности на выделенных wavelet families
pearson_df_1_prob_fm = corr_func(signal1, reference, families=families, probability=True)
pearson_df_2_prob_fm = corr_func(signal2, reference, families=families, probability=True)

for i, df in enumerate([pearson_df_1_prob_fm, pearson_df_2_prob_fm]):
    globals()[f'left_corner_{i}_prob_fm'] = df['mean'][df['mean'] > 0.95].index[0]
    globals()[f'right_corner_{i}_prob_fm'] = df['mean'][df['mean'] > 0.95].index[-1]

plt.figure(figsize=(60, 5))
plt.plot(pearson_df_1_prob_fm['mean'], label='Mean of first signal correlation probability (chosen wavelet families)')
plt.plot(pearson_df_2_prob_fm['mean'], label='Mean of second signal correlation probability (chosen wavelet families)')
plt.legend()
plt.title("Mean values of Pearson correlation probability (chosen wavelet families)", size=15)
plt.axhline(0.95, color="k", linestyle="--")
plt.ylim(0, 1)
plt.xlim(0, 5000)
plt.axvspan(left_corner_0_prob_fm-5, right_corner_0_prob_fm+5, color='#EF9A9A', alpha=0.7)
plt.axvspan(left_corner_1_prob_fm-5, right_corner_1_prob_fm+5, color='#EF9A9A', alpha=0.7)
# plt.savefig(f"./plots/Mean values of Pearson correlation probability (chosen wavelet families).png", bbox_inches='tight')
plt.show()

Таким образом, выделенные семейства материнских вейвлетов позволяют уменьшить зашумленность данных и выявить статистически значимые корреляции измеренных сигналов с эталонным откликом.

In [None]:
# Оценим очищенный сигнал на выделенном интервале корреляции для первого сигнала 
window = len(reference)

for wav in families:
    try:
        filtered1 = wavelet_denoising(signal1, wavelet=wav, level=1)
    except:
        pass
    
    fig, ax = plt.subplots(1, 2, gridspec_kw={'width_ratios': [1, 5.5]}, sharey=True)
    fig.tight_layout()
    ax[0].set_title('Reference signal')
    ax[1].set_title(f"DWT Denoising the first signal with {wav} Wavelet") 
    ax[1].plot(signal1, label='Raw')
    ax[1].plot(filtered1, label='Filtered Signal 1')
    ax[0].plot(reference, label='Reference', color='black')
    plt.axvspan(left_corner_0_prob_fm, right_corner_0_prob_fm+window, color='#EF9A9A', alpha=0.7)
    plt.xlim(left_corner_0_prob_fm-100, right_corner_0_prob_fm+100)
    plt.legend()
    # plt.savefig(f"./plots/DWT Denoising the first signal with {wav} Wavelet.png", bbox_inches='tight')
    plt.show()

In [None]:
# Оценим очищенный сигнал на выделенном интервале корреляции для второго сигнала 
for wav in families:
    try:
        filtered2 = wavelet_denoising(signal2, wavelet=wav, level=1)
    except:
        pass
    
    fig, ax = plt.subplots(1, 2, gridspec_kw={'width_ratios': [1, 5.5]}, sharey=True)
    fig.tight_layout()
    ax[0].set_title('Reference signal')
    ax[1].set_title(f"DWT Denoising the second signal with {wav} Wavelet") 
    ax[1].plot(signal2, label='Raw')
    ax[1].plot(filtered2, label='Filtered Signal 2')
    ax[0].plot(reference, label='Reference', color='black')
    plt.axvspan(left_corner_1_prob_fm, right_corner_1_prob_fm+window, color='#EF9A9A', alpha=0.7)
    plt.xlim(left_corner_1_prob_fm-100, right_corner_1_prob_fm+100)
    plt.legend()
    # plt.savefig(f"./plots/DWT Denoising the second signal with {wav} Wavelet.png", bbox_inches='tight')
    plt.show()

Также оценим коэф. корреляции Спирмена

In [None]:
spearman_df_1_conf = corr_func(signal1, reference, 
                               method='spearman', families=families, 
                               alternative='greater', probability=True)
spearman_df_2_conf = corr_func(signal2, reference, 
                               method='spearman', families=families, 
                               alternative='greater', probability=True)

for i, df in enumerate([pearson_df_1_prob_fm, pearson_df_2_prob_fm]):
    globals()[f'left_corner_{i}_spr'] = df['mean'][df['mean'] > 0.95].index[0]
    globals()[f'right_corner_{i}_spr'] = df['mean'][df['mean'] > 0.95].index[-1]

plt.figure(figsize=(60, 5))
plt.plot(spearman_df_1_conf['mean'], label="Mean of first signal Spearman probability for positive correlation (chosen families)")
plt.plot(spearman_df_2_conf['mean'], label="Mean of second signal Spearman probability for positive correlation (chosen families)")
plt.legend()
plt.title("Mean values of Spearman correlation probability", size=15)
plt.axhline(0.95, color="k", linestyle="--")
plt.ylim(0, 1)
plt.xlim(0, 5000)
plt.axvspan(left_corner_0_spr-5, right_corner_0_spr+5, color='#EF9A9A', alpha=0.7)
plt.axvspan(left_corner_1_spr-5, right_corner_1_spr+5, color='#EF9A9A', alpha=0.7)
# plt.savefig(f"./plots/Mean values of Spearman correlation probability (chosen wavelet families).png", bbox_inches='tight')
plt.show()