In [1]:
import os
import time
import numpy as np
from pathlib import Path
from matplotlib import pyplot as plt
from time import perf_counter
from IPython.display import clear_output
from pathlib import Path

from brainflow.board_shim import BoardShim, BrainFlowInputParams, BoardIds

from tqdm.notebook import tqdm
import datetime

from config.default import *

%matplotlib qt


%load_ext autoreload
%autoreload 2

from utils.svm import DCFilter
from utils.visualize import VisulaizeCell, showMe

from utils.record import getRange, stat, preProcess, GenerateOrder

C:\Users\Nemes\anaconda3\envs\tf\lib\site-packages\numpy\.libs\libopenblas.PYQHXLVVQ7VESDPUVUADXEVJOBGHJPAY.gfortran-win_amd64.dll
C:\Users\Nemes\anaconda3\envs\tf\lib\site-packages\numpy\.libs\libopenblas.WCDJNK7YVMPZQ2ME2ZZHJJRJ3JIKNDB7.gfortran-win_amd64.dll
C:\Users\Nemes\anaconda3\envs\tf\lib\site-packages\numpy\.libs\libopenblas.XWYDX2IKJW2NMTWSFYNGFUWKQU3LYTCZ.gfortran-win_amd64.dll
  stacklevel=1)


In [2]:
def visualize(data):
    plt.ion()
    fig = plt.gcf()
    fig.canvas.manager.window.raise_()
    mngr = plt.get_current_fig_manager()
    mngr.window.setGeometry(settings.window_position) #Check if array is acceptable
    
  

    plt.ylim(-1, 1)
    for i, c in enumerate(data):
        plt.plot(c)
    plt.draw()    
    plt.pause(0.000000001)
    fig.canvas.flush_events()

   
    plt.show()
    plt.cla()

   

PRACTICE

In [None]:
def Practice(n_samples_per_class = 1):
    classes = settings['classes']
    tasks = GenerateOrder(len(classes),n_samples_per_class)
  
    print("Setting up...")
    time.sleep(2)    
    for i, task in enumerate(tasks):
        clear_output()
        print("Stand By! ({}/{})".format(i+1,len(tasks)))
        time.sleep(0.5)
        print("Perform |{:^10s}|".format(classes[tasks[i]]))
        time.sleep(2.5)
        
Practice(n_samples_per_class=5)

In [3]:
session = 4
subject = 'S102'
#es_dir = 'C:/resources/EMG/'+datetime.datetime.now().strftime("%m_%d")+'/session_'+str(session)+'/'
res_dir = os.path.join('C:/', 'resources','EMG',subject,'session_'+str(session))+'/'
print(f'Creating directory {res_dir}...')
Path(res_dir).mkdir(parents=True, exist_ok=False )


Creating directory C:/resources\EMG\S102\session_4/...


In [4]:
####      INIT BOARD        #######
BoardShim.enable_dev_board_logger()
params = BrainFlowInputParams()
board = BoardShim(BoardIds.MINDROVE_WIFI_BOARD, params)

try:
    board.stop_stream()
    board.release_session()
except:
    ...
    
board.prepare_session()
sample_rate = board.get_sampling_rate(16)
print("Device ready (sampling rate: {}hz)".format(sample_rate))

Device ready (sampling rate: 500hz)


In [5]:
length_of_signal = 1 #seconds
num_points = int(sample_rate * length_of_signal)
record_length = int(sample_rate * length_of_signal*2)
print(f'Record length: {record_length} time points--> {record_length/sample_rate} seconds')



Record length: 1000 time points--> 2.0 seconds


In [6]:
def remove2channel(data):
    mins = []
    for i, channel in enumerate(data):
        mins.append(np.min(channel))
    data = np.delete(data, np.argmin(mins), 0)

    maxs = []
    for i, channel in enumerate(data):
        maxs.append(np.max(channel))
    data = np.delete(data, np.argmax(maxs), 0)
    
    return data

def clip(data, clip_value=2000):
    data = np.clip(data, -clip_value, clip_value)
    data /= clip_value
    return data

RECORD

In [7]:
def CollectData(n_samples_per_class = 1):
    classes = settings['classes']
    results = [[] for i in range(len(classes))]
    tasks = GenerateOrder(len(classes),n_samples_per_class)
  
    board.start_stream(450000) 
    print("Setting up...")
    time.sleep(1)
    data = DCFilter(board.get_current_board_data(500))[:6,-100:]
    _, _, std = stat(data)
    if std == 0:
        print("[ERROR] No data collected!")
        return
    print("Device ok!")
    time.sleep(7)          # wait for the board to settle
    for i, task in enumerate(tasks):
        clear_output()

        #####################################################
        time.sleep(1)
        print("Stand By! ({}/{})".format(i+1,len(tasks)))
        time.sleep(1)
        print("Perform |{:^10s}|".format(classes[tasks[i]]))
        time.sleep(2)
        #####################################################

        data = board.get_current_board_data(sample_rate*10) # 10 seconds
        data = DCFilter(data)
        data = data[:6,-record_length:] #keep the data of the eeg channels only, and remove data over the trial length
        results[task].append(data)
        
        _, _, std = stat(data)
        if std == 0:
            print("[ERROR] No data collected!")
            break
        
        data = remove2channel(data)
        data = clip(data)
        visualize(data)


      
        
        
    return results, classes
        
results, classes = CollectData(n_samples_per_class=25)

Stand By! (100/100)
Perform |   Chew   |


In [8]:
#Save results to file
i = 0
for result in tqdm(results):
    print(res_dir)
    #Path(res_dir).mkdir( parents=True, exist_ok=True )

    result= np.asarray(result)
    print(result.shape)
    np.save(res_dir+settings['classes'][i]+"_2s",result)
    i+=1

  0%|          | 0/4 [00:00<?, ?it/s]

C:/resources\EMG\S102\session_4/
(25, 6, 1000)
C:/resources\EMG\S102\session_4/
(25, 6, 1000)
C:/resources\EMG\S102\session_4/
(25, 6, 1000)
C:/resources\EMG\S102\session_4/
(25, 6, 1000)
