In [3]:
import glob
import random
import h5py
import numpy as np

Find data files recursively from root folder.

In [4]:
data_path = "D:/Coding/Thesis/Data/STFT Output/**/*.h5"
data_files = glob.glob(data_path, recursive=True)

In [5]:
config = {}

config['EEG_window_length_in_samples'] = 30000

Next, we configure the output layer of the LSTM:


`delta_time_k` predicts the delta time to the next $k^{th}$ tap.

`tap_count_times` predicts the *number of taps* within the next $p$ seconds.

In [6]:
config['delta_time_k'] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
config['tap_count_times_p'] = [0.5, 1, 5, 10, 50, 100, 500]
config['EEG_sampling_rate'] = 1000

config['tap_count_times_in_samples'] = np.multiply(config['tap_count_times_p'], config['EEG_sampling_rate'])

The ParticipantData class contains and acts on participant data. It knows about the structure of the h5 files and can return random windows of EEG activity and taps.

In [None]:
class ParticipantData:
    def __init__(self, data_files):
        self.data_file_paths = data_files
        
        self.open_h5_files()
        
        self._generate_group_idx()
    
    
    def _generate_group_idx(self):
        self.sessions = [list(participant.keys()) for participant in self.data_files_open]

        #windows = [list(participant[session].keys()) for participant in data_files_open for session in list(participant.keys())]
        self.windows = []
        for participant in self.data_files_open:
            for session in list(participant.keys()):
                self.windows.append(list(participant[session].keys()))
    
    
    def open_h5_files(self):
        self.data_files_open = [h5py.File(f, 'r') for f in self.data_file_paths]
    
    
    def close_h5_files(self):
        for f in self.data_files_open:
            f.close()
        
        self.data_files_open = []
    
    
    def get_random_EEG_window(self, window_size):
        ppt = random.choice(self.data_files_open)        
        session = random.choice(list(ppt.keys()))
        activity_window = random.choice(list(ppt[session].keys()))
        
        window_start_idx = random.randrange(ppt[session][activity_window]['stft'].shape[0] - window_size)        
        window_idx = range(window_start_idx, window_start_idx + window_size)
        
        input_data = np.array(ppt[session][activity_window]['stft'][window_idx, :, :][0])        
        output_data = self.get_taps_in_window(ppt[session][activity_window]['taps'], window_idx)
                
        return(input_data, output_data)
    
    
    def get_taps_in_window(self, taps, window_idx):      
        tap_deltas = self.get_delta_taps(taps, window_idx)
        future_tap_n = self.get_n_future_taps(taps, window_idx)
        
        result = np.concatenate(tap_deltas, future_tap_n)

        return(result)
    
    def get_delta_taps(self, taps, window_idx):
        tap_deltas = np.zeros([len(config['delta_time_k']), len(window_idx)])
        #print(taps[0])
        
        for k_idx, k in enumerate(config['delta_time_k']):
            for sample_idx, sample_ in enumerate(window_idx):
                _next_kth_tap = next((tap for tap in taps[0] if tap > sample_), None)
                _delta = _next_kth_tap - sample_
                tap_deltas[k_idx, sample_idx] = _delta
        
        return(tap_deltas)
            
    
    def get_n_future_taps(self, taps, window_idx):
        tap_counts = np.zeros([len(config['delta_time_k']), window_idx.shape[0]])
        
        for p_idx, p in enumerate(config['tap_count_times_in_samples']):
            for sample_idx, sample_ in enumerate(window_idx):
                _future_taps = np.logical_and(tap > sample_, tap <= sample_ + p)
                _n_future_taps = np.sum(_future_taps)
                
                tap_counts[p_idx, sample_idx] = _n_future_taps
        
        return(tap_counts)

In [None]:
ppt_data = ParticipantData(data_files)

In [49]:
input_layer, output_layer = ppt_data.get_random_EEG_window(config['EEG_window_length_in_samples'])

print(input_layer.shape)
print(output_layer.shape)

AttributeError: 'range' object has no attribute 'shape'

Assign real and imaginary part of stft output separately to new numpy array.

In [45]:
f = h5py.File(data_files[0], 'r')

list(f['12_02_11_04_19']['window_1'].keys())

print(f['12_02_11_04_19']['window_1']['taps'][:, -1][0])
print(f['12_02_11_04_19']['window_1']['stft'].shape[0])

3112552
3142553


Close h5 files again.

In [50]:
ppt_data.close_h5_files()