In [1]:
import numpy as np
import os
import wfdb
from tqdm import tqdm
from scipy import signal as sig
from util import *
import scipy.io as sio

## ECG

In [2]:
ECG_corpus_path = '/home/tony/Bio-ASP/EMG/mit-bih-normal-sinus-rhythm-database-1.0.0'
ECG_save_path ='/home/tony/Bio-ASP/Data/ECG_Ch1_fs1000_bp_training'

check_path(ECG_save_path)

ECG_list = []
for f in os.listdir(ECG_corpus_path):
    f = os.path.splitext(os.path.basename(f))[0]
    if f.isdigit():
        ECG_list.append(f)

ECG_list = sorted(list(set(ECG_list)))

print(f"ECG Corpus Path: {ECG_corpus_path}")
print(f"There are {len(ECG_list)} ECG signals")


ECG Corpus Path: /home/tony/Bio-ASP/EMG/mit-bih-normal-sinus-rhythm-database-1.0.0
There are 18 ECG signals


In [3]:
def read_ecg(ecg_path):
    ecg = wfdb.rdrecord(ecg_path).__dict__.get('p_signal')[:,0] # channel 1 ECG
    ecg_rate = 1000
    ecg = resample(ecg,128,ecg_rate)
    ecg = ecg.astype('float64')
    return ecg 

In [4]:
#Filter
b_h, a_h = sig.butter(3, 10, 'hp',fs=1000)
b_l, a_l = sig.butter(3, 200, 'lp',fs=1000)

# Segment period
start,end = 0, 70000

In [5]:
#read ecg file and save as npy
for i in tqdm(range(len(ECG_list))):
    ecg_file = read_ecg(os.path.join(ECG_corpus_path, ECG_list[i]))
    if ecg_file.ndim>1:
        for j in range(ecg_file.shape[1]):
            ecg_save = sig.filtfilt(b_l,a_l,sig.filtfilt(b_h,a_h,ecg_file[start:end,j]))
            np.save(os.path.join(ECG_save_path,os.path.join(ECG_corpus_path, ECG_list[i]).split(os.sep)[-1].split(".")[0]+str(j)),ecg_save)
    else:
        ecg_file = sig.filtfilt(b_l,a_l,sig.filtfilt(b_h,a_h,ecg_file[start:end]))
        np.save(os.path.join(ECG_save_path,os.path.join(ECG_corpus_path, ECG_list[i]).split(os.sep)[-1].split(".")[0]),ecg_file)
    

  0%|          | 0/18 [00:00<?, ?it/s]

100%|██████████| 18/18 [03:27<00:00, 11.54s/it]


## EMG

In [6]:
def read_emg(emg_path,channel,restimulus=False):
    # extract nth channel EMG, bandpass,  down-sampling, normalize
    b, a = signal.butter(4, [20,500], 'bp',fs=2000) #bandpass = signal.butter(4, [20,500], 'bandpass',output='sos',fs=2000)
    emg_data = sio.loadmat(emg_path)
    y_clean = emg_data.get('emg')[:,channel-1] #channel 
    y_clean = signal.filtfilt(b,a,y_clean)[::2]
    y_clean = y_clean/np.max(abs(y_clean))
    y_clean = y_clean.astype('float64')
    if restimulus:
        y_restimulus = emg_data.get('restimulus')[::2]
    else:
        y_restimulus = 0
    return y_clean, y_restimulus

def get_emg_filepaths(directory,number,exercise):
    # import n(umber) of EMG signals
    emg_paths =[]
    for i in range(1,number+1):
        filename = "DB2_s"+str(i)+"/S"+str(i)+"_E"+str(exercise)+"_A1.mat"
        emg_paths.append(os.path.join(directory, filename))
    return emg_paths

In [7]:
EMG_corpus_path = '/home/tony/Bio-ASP/EMG/EMG_DB2'
exercise = 1
channel = [2]
EMG_data_num = 40
cross_channel = False#True
segment = 60 # unit: second
points_per_seg = segment * 1000 # fs = 1000 Hz

In [8]:
EMG_save_path = f"/home/tony/Bio-ASP/Data/data_E{str(exercise)}_S{str(EMG_data_num)}_withSTI_seg60s_nsrd"
check_path(EMG_save_path)

for ch in channel:
    # if cross_channel == True:
    #     out_path ="./data_E"+str(exercise)+"_S"+str(EMG_data_num)+"_Ch"+str(channel[0])+"_"+str(channel[1])+"_withSTI_seg60s_nsrd"
    # else:
    #     out_path ="./data_E"+str(exercise)+"_S"+str(EMG_data_num)+"_Ch"+str(ch)+"_withSTI_seg60s_nsrd"
    
    check_path(EMG_save_path+'/train/clean')
    check_path(EMG_save_path+'/test/clean')
    check_path(EMG_save_path+'/val/clean')

    file_paths = get_emg_filepaths(EMG_corpus_path,EMG_data_num,exercise)

    for i in tqdm(range(len(file_paths))):
        test = False    
        if i<24:
            save_path = EMG_save_path+'/train/clean'
        elif i<30:
            save_path = EMG_save_path+'/val/clean'
        else:    
            save_path = EMG_save_path+'/test/clean'
            test = True
        emg_file,restimulus = read_emg(file_paths[i],ch,test)
        
        for j in range(emg_file.shape[0]//points_per_seg):
            np.save(os.path.join(save_path,file_paths[i].split(os.sep)[-1].split(".")[0]+"_ch"+str(ch)+"_"+str(j)),emg_file[j*points_per_seg:(j+1)*points_per_seg])
            if test:
                np.save(os.path.join(save_path,file_paths[i].split(os.sep)[-1].split(".")[0]+"_ch"+str(ch)+"_"+str(j)+"_sti"),restimulus[j*points_per_seg:(j+1)*points_per_seg])

  0%|          | 0/40 [00:00<?, ?it/s]

100%|██████████| 40/40 [00:44<00:00,  1.12s/it]


## Mixture

In [9]:
import math
import csv

In [None]:
clean_paths = [os.path.join(EMG_save_path, "train/clean"), os.path.join(EMG_save_path, "val/clean")]
noise_path = ECG_save_path

exercise = 1
channel = [2]
# channel = [1,2]
EMG_data_num = 40
# noise_paths =  ["../ECG_Ch1_fs1000_bp_training","../ECG_Ch1_fs1000_bp_training"]

SNR_list = [-5,-7,-9,-11,-13,-15] # Training SNR
num_of_copy = [10,5]
cross_channel = False
test = False
normalize = False

In [None]:
def get_filepaths_withSTI(directory,ftype='.npy'):
    file_paths = []
    for root, directories, files in os.walk(directory):
        for filename in files:
            if filename[-5] !='i' and filename.endswith(ftype):
                filepath = os.path.join(root, filename)
                file_paths.append(filepath)  # Add it to the list.
    return sorted(file_paths)

def add_noise(clean_path, noise_path, SNR, return_info=False, normalize=False):
    clean_rate = 1000
    y_clean = np.load(clean_path)
    noise_ori = np.load(noise_path)
    
    #if noise shorter than clean wav, extend
    if len(noise_ori) < len(y_clean):
        tmp = (len(y_clean) // len(noise_ori)) + 1
        y_noise = []
        for _ in range(tmp):
            y_noise.extend(noise_ori)
    else:
        y_noise = noise_ori

    # cut noise 
    start = random.randint(0,len(y_noise)-len(y_clean))
    end = start+len(y_clean)
    y_noise = y_noise[start:end]     
    y_noise = np.asarray(y_noise)

    y_clean_pw = np.dot(y_clean,y_clean) 
    y_noise_pw = np.dot(y_noise,y_noise) 

    scalar = np.sqrt(y_clean_pw/((10.0**(SNR/10.0))*y_noise_pw))
    noise = scalar * y_noise
    y_noisy = y_clean + noise
    if normalize: 
        norm_scalar = np.max(abs(y_noisy))
        y_noisy = y_noisy/norm_scalar

    if return_info is False:
        return y_noisy, clean_rate
    else:
        info = {}
        info['start'] = start
        info['end'] = end
        info['scalar'] = scalar
        return y_noisy, clean_rate, info


In [None]:
sti = True
noisy_folder = 'noisy'
loop_break = False
for ch in channel:
    if loop_break == True:
        break
    if cross_channel == True:
        out_path = "./data_E"+str(exercise)+"_S"+str(EMG_data_num)+"_Ch"+str(channel[0])+"_"+str(channel[1])+"_withSTI_seg60s_nsrd"
        loop_break = True
    else:
        out_path = "./data_E"+str(exercise)+"_S"+str(EMG_data_num)+"_Ch"+str(ch)+"_withSTI_seg60s_nsrd"
    if test == True:
        clean_paths =[out_path+'/test/clean']
    else:
        clean_paths = [out_path+'/train/clean',out_path+'/val/clean']

    for i in range(len(clean_paths)):
        clean_path = clean_paths[i]
        noise_path = noise_paths[i]
        Noisy_path = clean_path.replace('clean',noisy_folder)
        root_path = clean_path.replace('clean',noisy_folder)
        
        check_path(Noisy_path)

        clean_list = get_filepaths(clean_path) if sti is False else get_filepaths_withSTI(clean_path)
        noise_list = get_filepaths(noise_path)
        
        sorted(clean_list)

        for snr in SNR_list:
                with open(root_path+str(snr)+'.csv', 'w', newline='') as csvFile:
                    fieldNames = ['EMG','ECG','start','end','scalar']
                    writer = csv.DictWriter(csvFile, fieldNames)
                    writer.writeheader()
                    for clean_emg_path in tqdm(clean_list):
                            noise_wav_path_list = random.sample(noise_list, num_of_copy[i])
                            for noise_ecg_path in noise_wav_path_list:
                                y_noisy, clean_rate, info = add_noise(clean_emg_path, noise_ecg_path, snr, True, normalize)
                                noise_name = noise_ecg_path.split(os.sep)[-1].split(".")[0]
                                output_dir = Noisy_path+os.sep+str(snr)+os.sep+noise_name
                                creat_dir(output_dir)
                                emg_name = clean_emg_path.split(os.sep)[-1].split(".")[0]
                                np.save(os.path.join(output_dir,emg_name),y_noisy)
                                writer.writerow({'EMG':emg_name,'ECG':noise_name, 'start':info['start'], 'end':info['end'],'scalar':info['scalar']})