First, basic EEG signal preprocessing is applied.

In [None]:
# Load files: 

import pandas as pd 
import numpy as np 
import mne

path = "path of the file to be processed"

# Load the recording. The column labels are used as channel names below, so
# the CSV is read as one column per channel and one row per sample.
raw = pd.read_csv(path)

# Sampling rate of the recording, in Hz.
sfreq = 256

# mne.io.RawArray expects the data as (n_channels, n_times), while
# DataFrame.values is (n_samples, n_channels); transpose so the number of
# rows matches the number of channel names. The names are passed as a plain
# list rather than a pandas Index.
info = mne.create_info(ch_names=list(raw.columns), sfreq=sfreq, ch_types="eeg")
rawMNE = mne.io.RawArray(raw.values.T, info)

# Apply a band-pass filter between 4 and 60 Hz
rawMNE.filter(4, 60, fir_design="firwin")

# Apply the notch filter at 50 Hz
rawMNE.notch_filter(50)

# Fit the ICA decomposition on the filtered signal.
# NOTE(review): n_components=20 presumably requires at least 20 channels in
# the recording - confirm against the data being loaded.
ica = mne.preprocessing.ICA(n_components=20)
ica.fit(rawMNE)

# Obtain the independent components
ica_components = ica.get_components()

# Apply the ICA to a copy so the filtered signal in rawMNE is left untouched.
# NOTE(review): no component is added to ica.exclude before this call, so
# presumably nothing is removed from the signal - confirm this is intended.
raw_ica = rawMNE.copy()
ica.apply(raw_ica)

After preprocessing, feature extraction is performed.

In [None]:
# Function to obtain the statistical values of each of the brain rhythms.

def get_stadistical_values(channel, data, fs=256):
    """Compute spectral statistics of one channel for each EEG band.

    The magnitude spectrum of ``data[channel]`` is computed with a real FFT
    and, for every band (Theta, Alpha, Beta, Gamma), the mean, variance,
    standard deviation, maximum, sum and median of the magnitudes whose
    frequency lies inside the band are calculated.

    Parameters
    ----------
    channel : str
        Column of ``data`` to analyse; also used as prefix of the output
        column names.
    data : pandas.DataFrame
        Frame containing the samples of the epoch, indexable by ``channel``.
    fs : float, optional
        Sampling rate in Hz used to build the frequency axis (default 256).

    Returns
    -------
    pandas.DataFrame
        A single-row frame (row label 0) with 24 columns named
        ``<channel><band><suffix>``, ordered by statistic first (all means,
        then all variances, ...) and by band second.

    Raises
    ------
    ValueError
        If a band contains no FFT bin (``np.max`` of an empty selection),
        which happens when the epoch is too short for the band resolution.
    """
    signal = data[channel]

    # Magnitude spectrum (positive frequencies only) and its frequency axis in Hz.
    fft_vals = np.absolute(np.fft.rfft(signal))
    fft_freq = np.fft.rfftfreq(len(signal), 1.0 / fs)

    # Band limits in Hz; both ends are inclusive, so a bin that falls exactly
    # on a shared limit (e.g. 8 Hz) is counted in both neighbouring bands.
    eeg_bands = {
        'Theta': (5, 8),
        'Alpha': (8, 12),
        'Beta': (12, 30),
        'Gamma': (30, 60),
    }

    # Select the spectrum magnitudes of each band once.
    band_vals = {}
    for band, (low, high) in eeg_bands.items():
        in_band = (fft_freq >= low) & (fft_freq <= high)
        band_vals[band] = fft_vals[in_band]

    # Column-name suffix and reducer for every statistic, in output order.
    stats = (
        ("_Mean", np.mean),
        ("_variance", np.var),
        ("_deviation", np.std),
        ("_max", np.max),
        ("_summatory", np.sum),
        ("_median", np.median),
    )

    features = {
        channel + band + suffix: reducer(vals)
        for suffix, reducer in stats
        for band, vals in band_vals.items()
    }

    # DataFrame.append was removed in pandas 2.0; build the one-row frame
    # directly from the complete feature dictionary instead.
    return pd.DataFrame.from_dict(features, orient='index').transpose()


In [None]:
# Function to obtain the entropy values of the signal

import antropy as ant 

def get_entropy_values(channel, data, fs=256):
    """Compute entropy, Hjorth and fractal-dimension features of one channel.

    Parameters
    ----------
    channel : str
        Column of ``data`` to analyse; appended to every output column name.
    data : pandas.DataFrame
        Frame containing the samples of the epoch, indexable by ``channel``.
    fs : float, optional
        Sampling rate in Hz passed to the spectral entropy (default 256).

    Returns
    -------
    pandas.DataFrame
        A single-row frame (row label 0) with one column per feature, in the
        order in which the features are listed below.
    """
    signal = data[channel]

    # hjorth_params returns both values at once; compute it a single time.
    mobility, complexity = ant.hjorth_params(signal)

    values = {
        "Perm_Entropy" + channel: ant.perm_entropy(signal, normalize=True),
        "Spectral_Entropy" + channel: ant.spectral_entropy(
            signal, sf=fs, method='welch', normalize=True
        ),
        "SVD_Entropy" + channel: ant.svd_entropy(signal, normalize=True),
        "APP_Entropy" + channel: ant.app_entropy(signal),
        "Sample_Entropy" + channel: ant.sample_entropy(signal),
        "Hjorth_Mobility_Entropy" + channel: mobility,
        "Hjorth_Complexity_Entropy" + channel: complexity,
        "ZeroCrossings" + channel: ant.num_zerocross(signal),
        # Fractal dimension
        "Petrosian" + channel: ant.petrosian_fd(signal),
        "Katz" + channel: ant.katz_fd(signal),
        "Higuchi" + channel: ant.higuchi_fd(signal),
        "Detrended" + channel: ant.detrended_fluctuation(signal),
    }

    # DataFrame.append was removed in pandas 2.0; build the one-row frame
    # directly from the feature dictionary instead.
    return pd.DataFrame.from_dict(values, orient='index').transpose()


In [None]:
# Function to concatenate statistical values with entropy values.

def aply_all_channels(workDF, channels=None, epoch_len=1024, step=1):
    """Extract the statistical and entropy features of every epoch.

    A window of ``epoch_len`` samples is moved over ``workDF`` in increments
    of ``step`` samples. For each window the features returned by
    ``get_stadistical_values`` and ``get_entropy_values`` are computed for
    every channel and joined side by side into a single row.

    Parameters
    ----------
    workDF : pandas.DataFrame
        Recording with one column per channel.
    channels : list of str, optional
        Channels to process. Defaults to
        ``["Fp1","Fp2","F1","F2","F5","F6","O1","O2"]``.
    epoch_len : int, optional
        Samples per epoch (default 1024, i.e. 4 seconds at 256 Hz).
    step : int, optional
        Samples between the start of consecutive epochs (default 1).
        NOTE(review): the default keeps the original 1-sample hop, which
        yields heavily overlapping epochs; pass ``step=epoch_len`` for
        non-overlapping 4-second epochs - confirm which one is intended.

    Returns
    -------
    pandas.DataFrame
        One row per epoch, ordered from the last epoch to the first (the
        original ordering is kept for backward compatibility). An empty
        frame is returned when ``workDF`` has fewer than ``epoch_len`` rows.
    """
    if channels is None:
        channels = ["Fp1","Fp2","F1","F2","F5","F6","O1","O2"]

    n_samples = workDF.shape[0]
    epoch_frames = []

    # Loop that splits the recording into epochs; only windows that fit
    # completely inside the recording are used.
    for start in range(0, n_samples - epoch_len + 1, step):
        # The helpers only read the data, so slicing is enough; copying the
        # whole recording on every iteration is unnecessary.
        epoch = workDF.iloc[start:start + epoch_len]

        per_channel = []
        for channel in channels:
            per_channel.append(get_stadistical_values(channel, epoch))
            per_channel.append(get_entropy_values(channel, epoch))

        epoch_frames.append(pd.concat(per_channel, axis=1))

    if not epoch_frames:
        return pd.DataFrame()

    # Concatenate once at the end (instead of growing a frame row by row),
    # newest epoch first to reproduce the original row order.
    return pd.concat(epoch_frames[::-1], axis=0)

In [None]:
# In case you want to perform this feature extraction from several datasets simultaneously, you can use the following code.

%%time 
from multiprocessing import Pool

# Run the feature extraction of the four datasets in parallel, one worker
# process per task. The context manager shuts the worker processes down once
# the (blocking) map call has returned, so no pool is left running.
# Each argument is pickled and sent to a worker process, so the workers never
# touch the frames held here and no defensive copy is needed.
# NOTE(review): on platforms that start workers with "spawn", functions
# defined inside a notebook may not be importable by the workers - confirm.
with Pool() as pool:
    dataset1, dataset2, dataset3, dataset4 = pool.map(
        aply_all_channels, [dataset1, dataset2, dataset3, dataset4]
    )

In [None]:
# Export the datasets to CSV for the following steps

# Write every feature table to its own CSV file, leaving out the row index.
for _frame, _target in zip(
    (dataset1, dataset2, dataset3, dataset4),
    ("dataset1.csv", "dataset2.csv", "dataset3.csv", "dataset4.csv"),
):
    _frame.to_csv(_target, index=False)