2024/07/18

# OpenBCI Python版GUI
[OpenBCI](https://openbci.com/) 用のPython製GUIです．
脳波生データをリアルタイムにNumPy配列として取得でき，シームレスに解析できるほか，改造して刺激呈示機能を付ければ厳密に時間管理もできます(工夫すれば1000FPSは出ます)．

💻使い方
- `pip install -U setuptools pyserial numpy scipy brainflow pygame`
- `Board()` の `port` 引数だけ変え，上から実行するだけ．
- 脳波時系列データは `board.data_tot` にNumPy配列として保存・更新され，GUI実行中も実行後も好きなタイミングでアクセスし，解析できます．

In [1]:
import brainflow    # importに数分かかるかも
import time, os
import pygame as pg
import numpy as np
from scipy import signal

colors = np.array([(0,0,255), (0,255,255), (0,255,0), (255,255,0), (255,0,0), (255,0,255), (255,255,255), (127,127,127)])

pygame 2.5.2 (SDL 2.28.3, Python 3.12.3)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [9]:
def normalize(X, sfreq=250, fmin=1, fmax=45):
    X = signal.lfilter(*signal.iirnotch(50, 4, sfreq), X, axis=1)
    X = signal.lfilter(*signal.butter(5, np.array([45, 70])/(sfreq/2), btype="bandstop"), X, axis=1)
    X = signal.lfilter(*signal.butter(5, np.array([fmin, fmax])/(sfreq/2), btype="bandpass"), X, axis=1)
    return X

class Board(brainflow.BoardShim):
    def __init__(self, port="COM3", sfreq=250):
        params = brainflow.BrainFlowInputParams()
        params.serial_port = port
        super().__init__(brainflow.BoardIds.CYTON_BOARD, params)

        self.prepare_session()
        for ch in range(1, 9):
            self.config_board(f"x{ch}030110X")
            print(f"configuring ({ch}/8)", end="\r")
        print("\nready")
        self.start_stream()
        
        self.data_tot = np.zeros((8, 1000))
        self.sfreq = sfreq

    def get_data(self, trange=3):
        N = trange * self.sfreq
        data_new = self.get_board_data()[1:9]
        self.data_tot = np.concatenate([self.data_tot, data_new], 1)
        X = normalize(self.data_tot[:, -2*N:], fmin=5)[:, -N:]
        return X

    def switch_stream(self):
        if self.is_streaming:
            self.stop_stream()
        else:
            self.start_stream()

    def start_stream(self, num_samples: int = 1800 * 250, streamer_params: str = None) -> None:
        self.is_streaming = True
        return super().start_stream(num_samples, streamer_params)

    def stop_stream(self) -> None:
        self.is_streaming = False
        return super().stop_stream()

class DummyBoard:
    def __init__(self, port="COM3"):
        self.data_tot = np.zeros((8, 1000))
        self.sfreq = 250
        self.is_streaming = True
        self.t0 = time.perf_counter()

    def get_data(self, trange=3, fmin=3):
        N_new = int(250 * (time.perf_counter() - self.t0)) - (self.data_tot.shape[1] - 1000)
        data_new = np.random.randn(8, max(0, N_new)) * 50

        if self.is_streaming:
            self.data_tot = np.concatenate([self.data_tot, data_new], 1)
        N = trange * self.sfreq
        X = normalize(self.data_tot[:, -2*N:], fmin=fmin)[:, -N:]
        # X = self.data_tot[:, -N:]
        return X

    def switch_stream(self):
        if self.is_streaming:
            self.stop_stream()
        else:
            self.start_stream()

    def start_stream(self, num_samples: int = 1800 * 250, streamer_params: str = None) -> None:
        self.is_streaming = True

    def stop_stream(self) -> None:
        self.is_streaming = False

    def release_session(self) -> None:
        return

class BCIRenderer:
    def __init__(self, board=None, trange=3, fmax=60):
        self.win: pg.Surface = None
        self.font: pg.font.Font = None
        self.board: DummyBoard|brainflow.BoardShim = board
        self.electrode_pos = [(-1.0,3.6), (1.0,3.6), (-1.7,0.0), (1.7,0.0), (-2.9,-2.3), (2.9,-2.3), (-1.0,-3.5), (1.0,-3.5)]
        self.trange = trange
        self.fmax = fmax

    def draw_waves(self, data: np.ndarray, left=0, top=0, right=640, bottom=480, trange=3, maxuV=100):
        W = right - left
        H = bottom - top
        h = H // 16 # 振幅の半分のピクセル数
        x = np.linspace(left, right, data.shape[1])
        Y = h + data.clip(-maxuV, maxuV) * h / maxuV
        for i, y in enumerate(Y):
            topi = top + 2 * h * i
            pg.draw.lines(self.win, colors[i], False, np.c_[x, y+topi])
            pg.draw.line(self.win, "white", (left, topi), (right, topi))
        pg.draw.line(self.win, "white", (left, bottom), (right, bottom))

        for t, x in enumerate(np.linspace(right, left, trange+1).astype(int)):
            pg.draw.line(self.win, "gray", (x, top), (x, bottom))
            self.win.blit(self.font.render(f" -{t}", True, "white"), (x, bottom-20))

    def draw_spec(self, data: np.ndarray, left=0, top=0, right=640, bottom=480, fmax=60, lim=100):
        freq, Vs = signal.welch(data, self.board.sfreq, scaling="spectrum")
        As_log = np.log(Vs[:, :fmax+3] * data.shape[1] * 1e-3)
        Y = (top - bottom) * As_log.clip(0, lim) / lim + bottom
        x = np.linspace(left, right, Y.shape[1])
        for i, y in enumerate(Y):
            pg.draw.lines(self.win, colors[i], False, np.c_[x, y])
        for freq in range(0, fmax+1, 10):
            x = left + (right-left) * freq / fmax
            pg.draw.line(self.win, "gray", (x, top), (x, bottom))
            self.win.blit(self.font.render(f"{freq}", True, "white"), (x, bottom-20))
        pg.draw.line(self.win, "gray", (left, top), (right, top))
        pg.draw.line(self.win, "gray", (left, bottom), (right, bottom))

    def draw_network(self, data: np.ndarray, left=0, top=0, right=100, bottom=100):
        A = np.nan_to_num(np.corrcoef(data), 0)
        power = abs(A).mean(0)
        # A = ((1 + np.corrcoef(data)) * (255 / 2)).astype(int)
        wzoom, hzoom, cx, cy = (right-left)/10, (bottom-top)/10, (right+left)/2, (bottom+top)/2
        pos = [(cx + wzoom*self.electrode_pos[i][0], cy - hzoom*self.electrode_pos[i][1]) for i in range(8)]
        for i in range(8):
            for j in range(i+1, 8):
                c = (colors[i] + colors[j]) / 2
                pg.draw.line(self.win, c, pos[i], pos[j], int(wzoom/2*A[i, j]+1))
            pg.draw.circle(self.win, colors[i], pos[i], int(wzoom*power[i]/2+1))
            self.win.blit(self.font.render(f"{power[i]:.2f}", True, "white"), pos[i])

    def draw(self, win: pg.Surface, font: pg.font.Font, clock: pg.time.Clock, downkeys: list):
        self.win = win
        self.font = font
        winw, winh = pg.display.get_window_size()
        self.section_data = self.board.get_data(self.trange)
        self.draw_waves(self.section_data, 0, winh//2, winw, winh, self.trange)
        self.draw_spec(self.section_data, 0, 0, winw*4//6, winh//2, self.fmax, lim=5)
        self.draw_network(self.section_data, winw*4//6, 0, winw, winh//2)
        if pg.K_SPACE in downkeys:
            self.board.switch_stream()
        self.win.blit(self.font.render(f"{clock.get_fps():4.0f}", True, "white"), (0, 0))

class GUI:
    def __init__(self, bci: BCIRenderer):
        self.renderers = [bci]

    def mainloop(self, win_wh=(1280, 720)):
        pg.init()
        self.win = pg.display.set_mode(win_wh)
        self.clock = pg.time.Clock()
        self.font = pg.font.SysFont("Times New Roman" if os.name=="nt" else "Noto Sans Mono", 16)

        self.done = False
        for frame_count in range(999999):
            self.win.fill((0, 0, 0))
            self.downkeys = []
            for event in pg.event.get():
                if event.type == pg.QUIT:
                    self.done = True
                elif event.type == pg.KEYDOWN:
                    self.downkeys.append(event.key)
                    if event.key == pg.K_ESCAPE:
                        self.done = True
            for renderer in self.renderers:
                renderer.draw(self.win, self.font, self.clock, self.downkeys)
            self.clock.tick()
            pg.display.update()
            if self.done:
                break
        pg.quit()

In [11]:
# # USBドングル挿してるポートの名前を調べる
from serial.tools.list_ports import comports
for p in comports():
    if "USB" in p.name or "COM" in p.name:
        print(p.device)

In [3]:
board = Board("COM3")   # USBドングル挿してるポートを指定．Ubuntuなら port="/dev/ttyUSB0" など

configuring (8/8)
ready


In [10]:
bci = BCIRenderer(board, 6, 80)
gui = GUI(bci)
gui.mainloop()

In [11]:
board.release_session()