# 明暗タスクデータセット 要約


## 前処理
### Pythonによるファイル統合
心理学実験ソフトウェアのログをedfファイルに書き込む

In [None]:
import os
from labedf import csv2,edf2,set2
import numpy as np
from scipy import signal
from pyedflib import EdfReader
from utils import edf as myedf,edflab as myedflab,signals_standardization,lowpass_filter
DATASET_DIR_PATH = "./dataset/lord2/train"

# Settings object built from the pair-list CSV; it supplies the EDF/CSV file
# names and the root/build directories used by the merge calls below.
file_settings = myedflab.MergeAllCsv2EdfFileSettings(
    f"{DATASET_DIR_PATH}/ペア.csv",
    list_encoding="ansi",
)
edfcsv_filenames = file_settings.get_edfcsv_filenames()

# Take the sampling rate from the first listed EDF file.
first_edf_path = f"{DATASET_DIR_PATH}/edf/{edfcsv_filenames[0,0]}"
with EdfReader(first_edf_path) as edf_reader:
    fs = int(myedf.get_fs(edf_reader))

# %% merge csv,edf
# Build 1: merge using the "Marker" events; the returned file names are
# reused later to locate the preprocessed .set files.
file_settings.build_dir_path = f"{file_settings.root_path}/build/1"
filenames = myedflab.merge_all_csv2edf(
    file_settings,
    label_header_name="LorD",
    marker_names=["Marker"],
    marker_offset=None,
)
# Build 2: same merge using the "Wait" events; the return value is not needed.
file_settings.build_dir_path = f"{file_settings.root_path}/build/2"
_ = myedflab.merge_all_csv2edf(
    file_settings,
    label_header_name="LorD",
    marker_names=["Wait"],
    marker_offset=None,
)

### MATLABで前処理
+ [コード - コミット固定](https://github.com/s-n-1-0/sist_eeg_tests/blob/64c722b010dc63870cb3341b225c3c5873d027e8/notes/dataset/lord2/preprocessing.m)  
(変更点 : エポックリジェクション前の途中経過を保存する処理を加えた、待機時脳波も前処理対象に加えた)

## データセットサイズ

In [None]:
import mne
labels = ["dark","light"]
before_wait_dataset = []
before_lord_dataset = []
after_wait_dataset = []
after_lord_dataset = []


def _set_path(subdir, name):
    """Return the path of the EEGLAB .set file `name` under `subdir` of the dataset."""
    return DATASET_DIR_PATH + "/" + subdir + "/" + name + ".set"


def _read_label_epochs(set_path, marker_name):
    """Read an EEGLAB epochs file and return one data array per entry of `labels`."""
    loaded = mne.io.read_epochs_eeglab(set_path)
    return [loaded.get_data(item=f"{marker_name}__{name}") for name in labels]


# Stimulus EEG: keep dark and light separate, as [dark, light] per file.
for filename in filenames:
    before_lord_dataset.append(
        _read_label_epochs(_set_path("snapshot/1", filename), "Marker")
    )
    after_lord_dataset.append(
        _read_label_epochs(_set_path("pre2/1", filename), "Marker")
    )

# Waiting EEG: dark/light is not distinguished here, so both are concatenated.
for filename in filenames:
    before_wait_dataset.append(
        np.concatenate(_read_label_epochs(_set_path("snapshot/2", filename), "Wait"))
    )
    after_wait_dataset.append(
        np.concatenate(_read_label_epochs(_set_path("pre2/2", filename), "Wait"))
    )



In [None]:
import matplotlib.pyplot as plt

def _total_epochs(arrays):
    """Sum the first-axis length of every array in `arrays`."""
    return sum(arr.shape[0] for arr in arrays)


# Totals before preprocessing (index 0 = dark, index 1 = light).
before_wait_count = _total_epochs(before_wait_dataset)
before_dark_count = _total_epochs(pair[0] for pair in before_lord_dataset)
before_light_count = _total_epochs(pair[1] for pair in before_lord_dataset)

# Totals after preprocessing.
after_wait_count = _total_epochs(after_wait_dataset)
after_dark_count = _total_epochs(pair[0] for pair in after_lord_dataset)
after_light_count = _total_epochs(pair[1] for pair in after_lord_dataset)

print(f"前処理前 待機画像数 : {before_wait_count}")
print(f"前処理前 刺激画像数(暗) : {before_dark_count}")
print(f"前処理前 刺激画像数(明) : {before_light_count}")
print("------")
print(f"前処理後 待機画像数 : {after_wait_count}")
print(f"前処理後 刺激画像数(暗) : {after_dark_count}")
print(f"前処理後 刺激画像数(明) : {after_light_count}")

bar_names = ["wait"] + labels
plt.bar(bar_names, [before_wait_count, before_dark_count, before_light_count])
plt.title("Before")
plt.show()
plt.title("After")
plt.bar(bar_names, [after_wait_count, after_dark_count, after_light_count])


## 加算平均・合計・標準偏差・標準誤差・パワースペクトル(平均)

In [None]:
# Time axis of one epoch in seconds: 1500 points spaced 1/500 s apart, starting at -1 s.
# NOTE(review): assumes 500 Hz data and epochs spanning -1 s to 2 s — confirm
# against the preprocessing script and the `fs` read from the EDF file.
t_range = [i/500 - 1 for i in range(1500)]
N = 500            # sampling frequency in Hz (number of samples per second)
t = 3          # epoch length in seconds
# Frequency axis in Hz for an FFT over N*t samples. Bin k corresponds to
# k / t Hz, so the last bin lies just below N; the endpoint must be excluded,
# otherwise the spacing becomes N / (N*t - 1) and every bin is slightly shifted.
fft_range = np.linspace(0, N, N*t, endpoint=False)

def plot_all_ch(title,data:np.ndarray,is_fft:bool = False):
    """Draw rows 0-9 of ``data`` as ten line plots on a 3x4 grid in a new figure.

    Args:
        title: Figure-level title.
        data: 2-D array indexed as ``data[channel, sample]``; the first ten
            rows are plotted.
        is_fft: When true the x axis is the module-level ``fft_range``,
            otherwise the module-level ``t_range``.

    Returns:
        The created matplotlib figure (left open; the caller decides whether
        to show, save or close it).
    """
    figure = plt.figure(figsize=(20,8),facecolor="white")
    figure.suptitle(title)
    plt.subplots_adjust(wspace=0.4, hspace=0.6)
    for ch_number in range(1, 11):
        axes = figure.add_subplot(3, 4, ch_number)
        if is_fft:
            x_values = fft_range
        else:
            x_values = t_range
        axes.plot(x_values, data[ch_number - 1, :])
        axes.set_title(f"{ch_number}ch")

    return figure
def save_all_ch(path:str,title:str,data:np.ndarray,is_fft:bool = False):
    """Render ``data`` with ``plot_all_ch`` and write the figure to ``path``.

    The figure is closed after saving so that it does not stay open when
    many files are written in a loop.
    """
    rendered = plot_all_ch(title, data, is_fft=is_fft)
    rendered.savefig(path)
    plt.close(rendered)
class PlotMan():
    """Summary statistics of one group of epochs, per file and averaged over files.

    For every file in ``dataset`` the constructor computes, across epochs
    (axis 0), the mean (ERP), the sum, the sample standard deviation, the
    standard error of the mean and the mean power spectrum. Each per-file
    result keeps the remaining two axes of the input. The ``all_*``
    attributes are the plain average of the per-file results over files.

    Attributes:
        erp, sum_eeg, std_eeg, se_eeg, psd_eeg: Per-file results stacked along
            a new leading file axis.
        all_erp, all_sum_eeg, all_std_eeg, all_se_eeg, all_psd_eeg: The
            corresponding averages over the file axis.
        is_before: Selects the "Before"/"After" title prefix and the
            ``b_``/``a_`` output directory prefix.
        dataset_type: Name used in titles (capitalised) and in the output
            directory name.
    """

    # (per-file attribute, output sub-directory, title label, uses FFT x axis)
    _KINDS = (
        ("erp", "erp", "ERP", False),
        ("sum_eeg", "sum", "Sum", False),
        ("std_eeg", "std", "Std", False),
        ("se_eeg", "se", "Se", False),
        ("psd_eeg", "psd", "Psd", True),
    )

    def __init__(self,is_before:bool,dataset:list[np.ndarray],dataset_type:str) -> None:
        """Compute all statistics for ``dataset``.

        Args:
            is_before: True for data taken before preprocessing.
            dataset: One 3-D array per file, indexed as
                ``[epoch, channel, sample]``. All files must share the same
                channel and sample counts so the results can be stacked.
            dataset_type: Group name, e.g. ``"wait"``, ``"dark"``, ``"light"``.
        """
        def across_files(per_file: np.ndarray) -> np.ndarray:
            # Average the stacked per-file results over the file axis.
            return np.sum(per_file, axis=0) / per_file.shape[0]

        erp = np.array([np.sum(wd, axis=0) / wd.shape[0] for wd in dataset])
        sum_eeg = np.array([np.sum(wd, axis=0) for wd in dataset])
        # np.std already normalises by the number of epochs (ddof=1 gives the
        # sample standard deviation); dividing by the epoch count again, as
        # was done before, does not yield a standard deviation.
        std_eeg = np.array([np.std(wd, axis=0, ddof=1) for wd in dataset])
        # Standard error of the mean = std / sqrt(number of epochs). The
        # previous code divided by sqrt of the channel count instead.
        se_eeg = np.array([
            file_std / np.sqrt(wd.shape[0])
            for file_std, wd in zip(std_eeg, dataset)
        ])
        psd_eeg = np.array([
            np.sum(self._power(wd), axis=0) / wd.shape[0] for wd in dataset
        ])

        self.erp = erp
        self.all_erp = across_files(erp)
        self.sum_eeg = sum_eeg
        self.all_sum_eeg = across_files(sum_eeg)
        self.std_eeg = std_eeg
        self.all_std_eeg = across_files(std_eeg)
        self.se_eeg = se_eeg
        self.all_se_eeg = across_files(se_eeg)
        self.is_before = is_before
        self.dataset_type = dataset_type
        self.psd_eeg = psd_eeg
        self.all_psd_eeg = across_files(psd_eeg)

    @staticmethod
    def _power(x: np.ndarray) -> np.ndarray:
        """Return the squared FFT magnitude of ``x`` along its last axis."""
        return np.abs(np.fft.fft(x, axis=-1)) ** 2

    def _title_head(self) -> str:
        """Return the "[Before]"/"[After]" title prefix."""
        return f"[{'Before' if self.is_before else 'After'}]"

    def plot_all(self):
        """Plot every across-file average with ``plot_all_ch`` (one figure each)."""
        group = self.dataset_type.capitalize()
        for attr, _subdir, label, is_fft in self._KINDS:
            plot_all_ch(
                f"{self._title_head()} All Mean {group} {label}",
                getattr(self, f"all_{attr}"),
                is_fft=is_fft,
            )

    def save_ch(self):
        """Save one figure per file and statistic under ``./results/``.

        Files are written to
        ``./results/{b|a}_{dataset_type}/{erp|sum|std|se|psd}/{file index}.jpg``.
        """
        save_dir = f"./results/{'b' if self.is_before else 'a'}_{self.dataset_type}"
        for _attr, subdir, _label, _is_fft in self._KINDS:
            os.makedirs(f"{save_dir}/{subdir}", exist_ok=True)

        group = self.dataset_type.capitalize()
        for attr, subdir, label, is_fft in self._KINDS:
            # Each statistic goes to its own sub-directory; Se figures were
            # previously written into std/ and overwrote the Std figures.
            for i, per_file in enumerate(getattr(self, attr)):
                save_all_ch(
                    f"{save_dir}/{subdir}/{i}.jpg",
                    f"{self._title_head()} {group} {label}(File{i + 1})",
                    per_file,
                    is_fft=is_fft,
                )
# Build the statistics for every group. Each entry of the stimulus datasets
# is a [dark, light] pair, so index 0 selects dark and index 1 selects light.
before_wait = PlotMan(True, before_wait_dataset, "wait")
before_dark, before_light = (
    PlotMan(True, [pair[label_index] for pair in before_lord_dataset], label_name)
    for label_index, label_name in enumerate(("dark", "light"))
)

after_wait = PlotMan(False, after_wait_dataset, "wait")
after_dark, after_light = (
    PlotMan(False, [pair[label_index] for pair in after_lord_dataset], label_name)
    for label_index, label_name in enumerate(("dark", "light"))
)

plots = [
    before_wait,
    before_dark,
    before_light,
    after_wait,
    after_dark,
    after_light,
]


### 全ファイルの合算について

In [None]:
# Show the across-file average figures for every group.
for plot_man in plots:
    plot_man.plot_all()

### 各ファイルの結果について
画像ファイルの数が多すぎるのでOneDriveで共有します。

In [None]:
# Write the per-file figures of every group to disk.
for plot_man in plots:
    plot_man.save_ch()