In [1]:
import os 

path = r"C:\Users\Alonso\Desktop\Magister\final_project\repository\LibEER\LibEER"

os.chdir(path)

# Test data load

## a) Synthetic data

In [2]:
import numpy as np

# Números aleatorios entre 0 y 1
d1 = np.random.rand(105, 32, 3500)
d2 = np.random.rand(93, 32, 3500)

# Unificamos los dos conjuntos en una lista
unified_data = [[d1, d2]]

#labels
l1 = np.random.randint(0, 2, (105, 8))
l2 = np.random.randint(0, 2, (93, 8))

label = [[l1, l2]]

subject = 0
print(len(unified_data), len(unified_data[0]), np.array(unified_data[0][subject]).shape)
print(len(label), len(label[0]), np.array(label[0][subject]).shape)

1 2 (105, 32, 3500)
1 2 (105, 8)


## b) Functions

In [33]:
from scipy import signal
from  scipy.signal.windows import hann
import numpy as np
import math
from scipy.fftpack import fft,ifft

def feature_extraction(data, sample_rate, extract_bands, time_window, overlap, feature_type):
    """
    input: information processed after bandpass filter
    output:
    input shape -> data:  (session, subject, trail, channel, band, filter_data)
    output shape -> data:  (session, subject, trail, sample, channel, band, band_feature)
    """
    isLds = False
    if feature_type.endswith("_lds"):
        isLds = True
        feature_type = feature_type[:-4]
    fe = {
        #'psd': psd_extraction,
        'de': de_extraction,
        'de_fourier': de_extraction_fourier,
        #'de_reduced': de_reduced_extraction
    }[feature_type]
    feature_data = []
    for ses_i, ses_data in enumerate(data):
        ses_fe = []
        for sub_i, sub_data in enumerate(ses_data):
            sub_fe = []
            for trail_i, trail_data in enumerate(sub_data):
                sub_fe_data = fe(trail_data, sample_rate, extract_bands, time_window, overlap)
                if isLds:
                    sub_fe_data = lds(sub_fe_data)
                sub_fe.append(sub_fe_data)
            ses_fe.append(sub_fe)
        feature_data.append(ses_fe)
    return feature_data

def de_extraction(data, sample_rate, extract_bands, time_window, overlap):
    """
    DE feature extraction
    :param data: original eeg data, input shape: (channel, filter_data)
    :param sample_rate: sample rate of eeg signal
    :param extract_bands: the frequency bands that needs to be extracted
    :param time_window: time window of one extract part
    :param overlap: overlap
    :return: de feature need to be computed
    """
    if extract_bands is None:
        extract_bands = [[0.5, 4], [4, 8], [8, 14], [14, 30], [30, 50]]
    nyq = 0.5 * sample_rate
    noverlap = int(overlap * sample_rate)
    window_size = int(time_window * sample_rate)
    if noverlap != 0:
        sample_num = (data.shape[1] - window_size) // (window_size - noverlap)
    else:
        sample_num = (data.shape[1]) // window_size
    de_data = np.zeros((sample_num, data.shape[0], len(extract_bands)))
    for b_idx, band in enumerate(extract_bands):
        b, a = signal.butter(3, [band[0]/nyq, band[1]/nyq], 'bandpass')
        band_data = signal.filtfilt(b, a, data)
        t = 0
        for i in range(sample_num):
            # Old
            de_data[i,:,b_idx] = 1 / 2 * np.log2(2 * np.pi * np.e * np.var(band_data[:,t:t+window_size], axis=1, ddof=1))
            t += window_size-noverlap
    return de_data

def lds(data):
    """
    Process data using a linear dynamic system approach.

    :param data: Input data array with shape (time, channel, feature)
    :return: Transformed data with shape (time, channel, feature)
    """
    [num_t, num_channel, num_feature] = data.shape
    # Flatten the channel and feature dimensions
    data = data.reshape((data.shape[0], -1))

    # Initial parameters
    prior_correlation = 0.01
    transition_matrix = 1
    noise_correlation = 0.0001
    observation_matrix = 1
    observation_correlation = 1

    # Calculate the mean for initialization
    mean = np.mean(data, axis=0)
    data = data.T  # Transpose for easier manipulation of time dimension

    num_features, num_samples = data.shape
    P = np.zeros(data.shape)
    U = np.zeros(data.shape)
    K = np.zeros(data.shape)
    V = np.zeros(data.shape)

    # Initial Kalman filter setup
    K[:, 0] = prior_correlation * observation_matrix / (
                observation_matrix * prior_correlation * observation_matrix + observation_correlation) * np.ones(
        (num_features,))
    U[:, 0] = mean + K[:, 0] * (data[:, 0] - observation_matrix * prior_correlation)
    V[:, 0] = (np.ones((num_features,)) - K[:, 0] * observation_matrix) * prior_correlation

    # Apply the Kalman filter over time
    for i in range(1, num_samples):
        P[:, i - 1] = transition_matrix * V[:, i - 1] * transition_matrix + noise_correlation
        K[:, i] = P[:, i - 1] * observation_matrix / (
                    observation_matrix * P[:, i - 1] * observation_matrix + observation_correlation)
        U[:, i] = transition_matrix * U[:, i - 1] + K[:, i] * (
                    data[:, i] - observation_matrix * transition_matrix * U[:, i - 1])
        V[:, i] = (1 - K[:, i] * observation_matrix) * P[:, i - 1]

    # Return the processed data, reshaping it to match the original input shape
    return U.T.reshape((num_t, num_channel, num_feature))

def segment_data_fourier(data, time_window, sample_rate, overlap):

    window_len = int(time_window * sample_rate)

    # Step 1: get the total number of samples
    total_samples = data.shape[1]

    # Step 2: calculate how many extra points don't fit into 200-sample blocks
    extra = total_samples % window_len

    # Step 3: trim the first `extra` samples
    data_trimmed = data[:, extra:]  # discard from the beginning

    # Step 4: reshape into blocks of shape (62, 200, num_blocks)
    num_blocks = data_trimmed.shape[1] // window_len
    segmented_data = data_trimmed.reshape(data.shape[0], num_blocks, window_len)

    # (Optional) Reorder to (num_blocks, 62, 200) for convenience
    segmented_data = np.transpose(segmented_data, (1, 0, 2))

    return segmented_data

def de_extraction_fourier(data_trial, sample_rate, extract_bands,  time_window, overlap):
    '''
    compute DE
    --------
    input:  data [n*m]          n electrodes, m time points
            stft_para.stftn     frequency domain sampling rate
            stft_para.fStart    start frequency of each frequency band
            stft_para.fEnd      end frequency of each frequency band
            stft_para.window    window length of each sample point(seconds)
            stft_para.fs        original frequency
    output: DE [n*l*k]        n electrodes, l windows, k frequency bands
    '''
    #initialize the parameters
    fStart, fEnd = map(list, zip(*extract_bands))
    # STFTN is the same 
    STFTN = sample_rate


    #Convert frecuency from Hz to positions of a vector
    fStartNum =(np.array(fStart) / sample_rate * STFTN).astype(int)
    fEndNum = (np.array(fEnd)/sample_rate*STFTN).astype(int)

    #Hanning window
    Hlength=time_window*sample_rate
    Hwindow=hann(Hlength)
    #Hwindow= np.array([0.5 - 0.5 * np.cos(2 * np.pi * n / (Hlength+1)) for n in range(1,Hlength+1)])
    de_trial = []
    #Segment data and compute DE for each segment
    data_trial = segment_data_fourier(np.array(data_trial), time_window, sample_rate, overlap)
    for data in data_trial:
        #Compute differential entropy
        n=data.shape[0]
        #Compute fft
        Hdata = data*Hwindow
        FFTdata=fft(Hdata,STFTN)
        magFFTdata=abs(FFTdata[:, 0:int(STFTN/2)])
        #Compute energy and de per frecuancy band
        de = np.zeros([n,len(fStart)])
        for i, (start, end) in enumerate(zip(fStartNum, fEndNum)):
            band = magFFTdata[:, start:end]  # (n_channels, ancho de banda)
            E = np.sum(band**2, axis=1) / (end - start + 1)  # (n_channels,)
            de[:, i] = np.log2(100 * E)
        de_trial.append(de)

    return np.array(de_trial)


## Old

In [23]:
from data_utils.preprocess import baseline_removal, bandpass_filter, segment_data

data = unified_data

# a 1-second non-overlapping preprocess window to extract de_lds features on specified extract bands
data = feature_extraction(data, 250, extract_bands=[[0.5,4],[4,8],[8,14],[14,30],[30,50]] , time_window=1, overlap=0, feature_type="de_lds")
# sliding window with a size of 1 and  a step size of 1 to segment the samples.
data, feature_dim = segment_data(data, sample_length=1, stride=1)
# data format: (session, subject, trail, sample(min*chann*bands))

105
14
32


In [24]:
np.array(data[0][0]).shape

(105, 14, 32, 5)

## New

In [None]:
from data_utils.preprocess import baseline_removal, bandpass_filter, segment_data

data = unified_data

# a 1-second non-overlapping preprocess window to extract de_lds features on specified extract bands
data = feature_extraction(data, 250, extract_bands=[[1, 4],[4,8],[8,14],[14,30],[30,50]] , time_window=1, overlap=0, feature_type="de_fourier_lds")
# sliding window with a size of 1 and  a step size of 1 to segment the samples.
data, feature_dim = segment_data(data, sample_length=1, stride=1)
# data format: (session, subject, trail, sample(min*chann*bands))

  from pandas.core import (


105
14
32


In [35]:
np.array(data[0][0]).shape

(105, 14, 32, 5)