In [17]:
import os
import glob
#
import numpy as np
import pandas as pd

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import random

In [18]:
class CustomEEGDataset(Dataset):
    """Map-style dataset that loads EEG windows saved one per file.

    Each row of ``loc_df`` describes one stored window. The file name is read
    from the fourth column (position 3) of the row and joined onto
    ``root_dir``.

    Args:
        loc_df: DataFrame with one row per stored window; column position 3
            must hold the file name relative to ``root_dir``.
        root_dir: Directory that the file names are relative to.
        transform: Optional callable applied to the stacked result.
        multi: When True, every loaded window gets a new leading axis
            (``unsqueeze(0)``) before the windows are stacked.
    """

    def __init__(self, loc_df , root_dir , transform = None, multi = False):
        self.loc_df = loc_df
        self.transform = transform
        self.root_dir = root_dir
        self.multi = multi

    def __len__(self,):
        """Return the number of rows (stored windows) in ``loc_df``."""
        return len(self.loc_df)

    def __getitem__(self, idx):
        """Load the requested window(s) and stack them with ``torch.vstack``.

        Args:
            idx: A single integer position, an iterable of integer positions,
                or a tensor holding either of those.

        Returns:
            The vertically stacked windows, passed through ``transform`` when
            one was given.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # A DataLoader with automatic batching asks for one integer at a time;
        # wrap it so the loop below handles scalars and lists the same way.
        if isinstance(idx, (int, np.integer)):
            idx = [idx]

        batch = []
        for i in idx:
            eeg_file = os.path.join(self.root_dir, self.loc_df.iloc[i, 3])
            # NOTE: torch.load unpickles the file; only load trusted data.
            # as_tensor accepts both saved numpy arrays and saved tensors.
            eeg = torch.as_tensor(torch.load(eeg_file))

            if self.multi:
                eeg = eeg.unsqueeze(0)
            batch.append(eeg)
        bat = torch.vstack(batch)

        if self.transform is not None:
            bat = self.transform(bat)

        return bat



In [5]:
class DFSpliter():
    """Holds train/validation/test split fractions.

    Only the three fractions are stored; the splitting itself is not
    implemented yet (``forward`` does nothing).
    """

    def __init__(self, train_size= 0.8, val_size = 0.1, test_size = 0.1,) -> None:
        # Stored as given; no check is made that the fractions sum to 1.
        self.train_size, self.val_size, self.test_size = train_size, val_size, test_size

    def forward(self, csv_file):
        """Placeholder: accepts ``csv_file`` and returns None."""
        # TODO: implement the actual split.
        return None



In [None]:
# Augmentation set
'''
Implemented:
-Negation
-Time-Shift
-Gaussian-Noise
-Masking
-DC voltage
-Amplitude
To be implemented:
-Temporal Inversion
-Permutation
-Time-warping

'''


In [89]:
def Masking(channel: np.ndarray, window: int = 150):
    '''
    Set a randomly placed run of consecutive samples to zero.
    The input is left untouched; a masked copy is returned.
    Input:  -channel = Numpy array
            -window = Number of samples to set to zero. Values larger than
                      the channel length mask the whole channel.
    Output: Numpy array masked (copy of the input)
    '''
    # Work on a copy so the caller's array can still be used for other views.
    masked = np.array(channel, copy=True)
    channel_size = len(masked)

    # Clamp so randint always receives a valid range.
    window = max(0, min(window, channel_size))

    # randint's upper bound is exclusive; +1 lets the mask reach the last sample.
    first = np.random.randint(0, channel_size - window + 1)
    masked[first:first + window] = 0

    return masked

def DCVoltage(channel : np.array, max_magnitude: float = 0.5):
    '''
    Shift the whole channel by one random constant (DC offset).
    The offset is drawn uniformly from [-max_magnitude, max_magnitude).
    Input:  -channel = Numpy array
            -max_magnitude = Largest absolute offset that can be added
    Output: Numpy array with the offset added
    '''
    # One uniform draw in [0, 1), rescaled to [-1, 1).
    unit_draw = np.random.random(1) * 2 - 1
    offset = unit_draw * max_magnitude
    return channel + offset

def GaussianNoise(channel: np.array, std: float = 0.2):
    '''
    Add zero-mean Gaussian noise, one independent draw per sample.
    Input:  -channel = Numpy array
            -std = Standard deviation of the noise
    Output: Channel with the noise added
    '''
    # One noise value per entry along the first axis of the channel.
    perturbation = np.random.normal(0, std, len(channel))
    return channel + perturbation

def Time_Shift(channel: np.ndarray, min_shift: int = 0, max_shift: int = 50):
    '''
    Shift the channel by a random number of samples, filling the gap with
    a reflection of the signal. The direction (left or right) is random.
    Input:  -channel = Numpy array
            -min_shift = Min number of samples to shift (inclusive)
            -max_shift = Max number of samples to shift (inclusive)
    Output: Shifted channel, same length as the input
    '''
    # randint's upper bound is exclusive; +1 makes max_shift reachable and
    # allows min_shift == max_shift (a fixed shift) without raising.
    n_shift = np.random.randint(min_shift, max_shift + 1)
    channel_size = len(channel)
    padded_array = np.pad(channel, pad_width=n_shift, mode="reflect")

    # 0 -> window starts at the left padding (signal moves right);
    # 2 -> window starts 2*n_shift in (signal moves left).
    right_left = np.random.choice((0, 2))
    start = n_shift * right_left
    return padded_array[start:start + channel_size]
def Amplitude(channel :np.array, max_amplitude: float = 1.5):
    '''
    Scale the channel by one random gain.
    The gain is drawn uniformly from [1 - max_amplitude, 1 + max_amplitude).
    Input:  -channel = Numpy array
            -max_amplitude = Largest deviation of the gain from 1
    Output: Scaled channel
    '''
    # One uniform draw in [0, 1), rescaled to [-1, 1) and centred on 1.
    gain = 1 + (np.random.random(1) * 2 - 1) * max_amplitude
    return channel * gain

def Permutation(channel: np.ndarray, win_samples: int = 4):
    '''
    Cut the channel into consecutive segments of win_samples samples and
    return them in random order. The output has the same length as the input:
    samples left over when the length is not a multiple of win_samples stay,
    unshuffled, at the end.
    Input:  -channel = Numpy array
            -win_samples = Number of samples per segment
                           (n_segments = len(channel) // win_samples)
    Output: Permuted sequence
    '''
    if win_samples < 1:
        raise ValueError("win_samples must be a positive integer")

    channel = np.asarray(channel)
    n_seqs = len(channel) // win_samples

    # Fewer samples than one full segment: nothing to permute.
    if n_seqs == 0:
        return channel.copy()

    random_idx = np.random.choice(np.arange(0, n_seqs, 1), n_seqs, replace=False)
    pieces = [channel[win_samples * i: win_samples * (i + 1)] for i in random_idx]

    # Keep the incomplete trailing segment so no samples are dropped.
    remainder = channel[n_seqs * win_samples:]
    if len(remainder):
        pieces.append(remainder)

    return np.concatenate(pieces)
def Temporal_Invertion(channel: np.ndarray):
    '''
    Return the channel with its samples in reverse order.
    Input:  -channel = Numpy array
    Output: Reversed array (an independent copy, not a view of the input)
    '''
    # channel[::-1] alone is a negative-stride view that aliases the input;
    # copying gives an independent array with ordinary (positive) strides.
    flipped = channel[::-1].copy()
    return flipped

def Negation(channel: np.array):
    '''
    Flip the sign of every sample.
    Input: -channel = Numpy array
    Output: Array multiplied by -1
    '''
    return channel * -1


In [1]:
class Augmentations():
    """Unfinished helper for drawing random augmentations for two views.

    Args:
        n_aug: How many distinct entries of ``transform`` to draw per view.
        multi: Stored but not read by the code in this class.
        transform: Sequence to draw from; ``forward`` calls ``len()`` on it,
            so the default ``None`` makes ``forward`` raise.
    """

    def __init__(self, n_aug, multi = False, transform = None) -> None:
        self.n_aug, self.multi, self.transform = n_aug, multi, transform

    def forward(self,batch):
        """Draw two index sets into ``transform``; currently returns None.

        The per-channel application of the drawn augmentations is still to
        be written, so ``batch`` is iterated but nothing is produced.
        """
        # Presumably the two augmented views of the batch - TODO confirm.
        xbar_batch, xhat_batch = [], []

        # Two independent draws of n_aug distinct positions each.
        candidates = np.arange(0, len(self.transform), 1)
        rbar_idxs = np.random.choice(candidates, size=self.n_aug, replace=False)
        rhat_idxs = np.random.choice(candidates, size=self.n_aug, replace=False)

        # TODO: apply the drawn augmentations to every channel.
        for channel in batch:
            continue

        

In [55]:
# Augmentation functions collected in one list.
aug = [
    Time_Shift,
    Amplitude,
    Permutation,
    Negation,
    Masking,
]

In [1]:
from torch import nn

In [16]:
# Empty table with one column per field describing a stored EEG window.
loc_df = pd.DataFrame(columns=["Patient", "Session", "N_Win", "Dir"])

NameError: name 'pd' is not defined

In [8]:
# Directory holding the saved EEG files. Set the EEG_ROOT_DIR environment
# variable to point elsewhere instead of editing this machine-specific default.
root_path = os.environ.get("EEG_ROOT_DIR", "C:\\Users\\TheSy\\Desktop\\FinalEL7006")

In [12]:
# Adds one row labelled len(loc_df); re-running this cell adds another copy of the row.
loc_df.loc[len(loc_df)] = ["aaax","session_id",1,"LSTMData-0.001.pt"]


In [13]:
# Dataset over the index table; file names are resolved against root_path.
dataset = CustomEEGDataset(loc_df, root_dir=root_path, multi=False)

In [105]:
# loc_df is built with a single row, so the only valid position is 0
# (position 1 raises IndexError on a fresh top-to-bottom run).
test = dataset[[0]][0][0].numpy()
