In [1]:
import numpy as np
import pywt
import numpy as np
from tqdm import tqdm  



In [4]:
datafilename1 = "C:\\Users\\Administrator\\Desktop\\结束\\MFEG\\dataprocessing\\cinc2017Seg.npz"
data1 = np.load(datafilename1, allow_pickle=True)
X_train, y_train, X_val, y_val, X_test, y_test = data1['ecgstrain'], data1['labelstrain'], data1['ecgsval'], data1['labelsval'], data1['ecgstest'], data1['labelstest']

In [6]:
def calculate_rmse(x, y):
    x = np.array(x)
    y = np.array(y)
    rmse = np.sqrt(np.mean((x - y) ** 2))
    return rmse
def calculate_snr(signal, denoised_signal):
    signal_power = np.sum(denoised_signal ** 2)
    noise_power = np.sum((signal - denoised_signal) ** 2)
    snr = 10 * np.log10(signal_power / noise_power)
    return snr
def calculate_snr_all(signal, denoised_signals):
    snrs = []
    for denoised_signal in denoised_signals:
        signal_power = np.sum(denoised_signal ** 2)
        noise_power = np.sum((signal - denoised_signal) ** 2)
        snr = 10 * np.log10(signal_power / noise_power)
        snrs.append(snr)
    return snrs
def add_noise_with_snr(signal, snr_db):
    signal_power = np.var(signal)
    noise_power = signal_power * (10 ** (-snr_db / 10))
    noise = np.random.normal(0, np.sqrt(noise_power), signal.shape)
    noisy_signal = signal + noise
    return noisy_signal

In [7]:
import numpy as np
from scipy.fft import fft, ifft

def autocorrelation(signal):
    N = len(signal)
    signal -= np.mean(signal)  
    PSD = np.abs(fft(signal))**2
    autocorr = ifft(PSD)
    autocorr = np.real(autocorr)  
    autocorr /= autocorr[0]  
    return autocorr[:N]

def nzopp(signal):
    acf = autocorrelation(signal)
    return np.sum(np.abs(np.diff(np.sign(acf))) > 0) / len(acf)
def find_optimal_threshold(data, wavelet='db7', level=9, initial_thresholds=[0.1, 0.5, 0.9]):
    optimal_threshold = None
    max_nzopp = -np.inf
    threshold_diff = 1
    thresholds = np.array(initial_thresholds)
    while threshold_diff > 1e-10:
        r_values = []
        for threshold in tqdm(thresholds):
            coeffs = pywt.wavedec(data=data, wavelet=wavelet, level=level)
            for i in range(1, len(coeffs)):
                coeffs[i] = pywt.threshold(coeffs[i], threshold, mode='soft')
            rdata = pywt.waverec(coeffs=coeffs, wavelet=wavelet)
            r_values.append(nzopp(rdata))
        max_index = np.argmax(r_values)
        optimal_threshold = thresholds[max_index]
        if max_index == 0:
            new_thresholds = np.linspace(thresholds[0], thresholds[1], 5)
        elif max_index == len(thresholds) - 1:
            new_thresholds = np.linspace(thresholds[-2], thresholds[-1], 5)
        else:
            new_thresholds = np.linspace(thresholds[max_index-1], thresholds[max_index+1], 5)
        
        threshold_diff = np.max(np.abs(np.diff(new_thresholds)))
        thresholds = new_thresholds
    return optimal_threshold
def denoise(data, wavelet='db7', level=9):
    optimal_threshold = find_optimal_threshold(data, wavelet=wavelet, level=level)
    coeffs = pywt.wavedec(data=data, wavelet=wavelet, level=level)
    for i in range(1, len(coeffs)):
        coeffs[i] = pywt.threshold(coeffs[i], optimal_threshold, mode='soft')
    rdata = pywt.waverec(coeffs=coeffs, wavelet=wavelet)
    return rdata


In [8]:
def denoise_universal_threshold(data):
    coeffs = pywt.wavedec(data=data, wavelet='db7', level=9)
    cA9, cD9, cD8, cD7, cD6, cD5, cD4, cD3, cD2, cD1 = coeffs
    threshold = (np.median(np.abs(cD1)) / 0.6745) * (np.sqrt(2 * np.log(len(cD1))))
    cD1.fill(0)
    cD2.fill(0)
    for i in range(1, len(coeffs) - 2):
        coeffs[i] = pywt.threshold(coeffs[i], threshold)
    rdata = pywt.waverec(coeffs=coeffs, wavelet='db7')
    return rdata

In [9]:
def denoise_bayesian_threshold(data, wavelet='db7', level=9):
    coeffs = pywt.wavedec(data=data, wavelet=wavelet, level=level)
    cA9, cD9, cD8, cD7, cD6, cD5, cD4, cD3, cD2, cD1 = coeffs
    sigma = np.median(np.abs(cD1)) / 0.6745
    n = len(data)
    threshold = sigma * np.sqrt(2 * np.log(n))
    for i in range(1, len(coeffs)):
        coeffs[i] = pywt.threshold(coeffs[i], threshold, mode='soft')
    rdata = pywt.waverec(coeffs=coeffs, wavelet=wavelet)
    return rdata

In [10]:
def heursure_threshold(data_length, sigma):
    universal_threshold = sigma * np.sqrt(2 * np.log(data_length))
    sure_threshold = 0.6745 * sigma * np.sqrt(data_length)
    threshold = min(universal_threshold, sure_threshold)
    return threshold
def denoise_heursure_threshold(data, wavelet='db7', level=9):
    coeffs = pywt.wavedec(data=data, wavelet=wavelet, level=level)
    sigma = np.median(np.abs(coeffs[-1])) / 0.6745  
    n = len(data)
    threshold = heursure_threshold(n, sigma)
    for i in range(1, len(coeffs)):
        coeffs[i] = pywt.threshold(coeffs[i], threshold, mode='soft')
    rdata = pywt.waverec(coeffs=coeffs, wavelet=wavelet)
    return rdata

In [11]:
def minimax_threshold(data_length):
    if data_length >= 32:
        threshold = 0.3936 + 0.1829 * np.log(data_length)
    else:
        threshold = 0.0 
    return threshold
def denoise_minimax_threshold(data, wavelet='db7', level=9):
    coeffs = pywt.wavedec(data=data, wavelet=wavelet, level=level)
    sigma = np.median(np.abs(coeffs[-1])) / 0.6745  
    n = len(data)
    threshold = minimax_threshold(n) * sigma
    for i in range(1, len(coeffs)):
        coeffs[i] = pywt.threshold(coeffs[i], threshold, mode='soft')
    rdata = pywt.waverec(coeffs=coeffs, wavelet=wavelet)
    return rdata

denise

denoise_universal_threshold

denoise_bayesian_threshold

denoise_heursure_threshold

denoise_minimax_threshold

In [12]:
results = [denoise(signal) for signal in X_train]

100%|██████████| 3/3 [00:00<00:00, 597.37it/s]
100%|██████████| 5/5 [00:00<00:00, 824.87it/s]
100%|██████████| 5/5 [00:00<00:00, 679.28it/s]
100%|██████████| 5/5 [00:00<00:00, 806.69it/s]
100%|██████████| 5/5 [00:00<00:00, 689.20it/s]
100%|██████████| 5/5 [00:00<00:00, 765.78it/s]
100%|██████████| 5/5 [00:00<00:00, 637.32it/s]
100%|██████████| 5/5 [00:00<00:00, 829.31it/s]
100%|██████████| 5/5 [00:00<00:00, 753.34it/s]
100%|██████████| 5/5 [00:00<00:00, 679.42it/s]
100%|██████████| 5/5 [00:00<00:00, 808.77it/s]
100%|██████████| 5/5 [00:00<00:00, 835.29it/s]
100%|██████████| 5/5 [00:00<00:00, 711.16it/s]
100%|██████████| 5/5 [00:00<00:00, 714.19it/s]
100%|██████████| 5/5 [00:00<00:00, 1000.69it/s]
100%|██████████| 5/5 [00:00<00:00, 811.53it/s]
100%|██████████| 3/3 [00:00<00:00, 745.48it/s]
100%|██████████| 5/5 [00:00<00:00, 833.82it/s]
100%|██████████| 5/5 [00:00<00:00, 876.52it/s]
100%|██████████| 5/5 [00:00<00:00, 734.04it/s]
100%|██████████| 5/5 [00:00<00:00, 777.79it/s]
100%|███████

In [13]:
x2=denoise_universal_threshold(X_train)
x3=denoise_bayesian_threshold(X_train)
x4=denoise_heursure_threshold(X_train)
x5=denoise_minimax_threshold(X_train)

In [14]:
results=np.array(results)

In [21]:
denoised_signals = [results, x2, x3, x4, x5]
snr_values = calculate_snr_all(X_train, denoised_signals)
for snr in snr_values:
    print(snr)

64.99955646105045
25.381635806646806
44.84499719604678
45.8116129719646
51.06471511612747
