In [None]:
import os
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath('__file__')), '..'))
import numpy as np
import keras
import matplotlib.pyplot as plt
from scipy import stats
from scipy.signal import spectrogram
from sklearn.metrics import confusion_matrix
from sklearn.linear_model import LinearRegression
from train_code.generator.raw_generator import transpose_raw1d,transpose_raw2d
from npy2trials import load_data
from sklearn.preprocessing import StandardScaler
from scipy.signal import butter, filtfilt, periodogram
ch_list = ['FC5','FC1','FC2','FC6','C3','C1','Cz','C2','C4','CP5','CP1','CP2','CP6']
ch_size = len(ch_list)
block_size = 750
step = 250
fs = 500

In [None]:
data_path = "C:/MLA_Saves_Bk/10002/s4m2_concatenate.npy"
d1_model_path = "/models/dec1/model-85.h5"
d2_model_path = "/models/dec2/model-23.h5"

In [None]:
full_data = np.load(data_path)
stim_data,predictclass_list,trueclass_list = load_data(full_data,fs)
minp = len(predictclass_list[0]) #TODO:15d1s2
for item in predictclass_list:
    if minp > len(item):
        minp = len(item)
predictclass_list = [item[:minp] for item in predictclass_list]
stim_data.shape

# データプレビュー

In [None]:
import logging
import numpy as np
from sklearn.preprocessing import StandardScaler
from scipy.signal import butter, filtfilt, periodogram

#TODO: これは冗長
def preprocess(data,fs):
    #print(f"{scaler.mean_} / {scaler.scale_}")
    # バンドパスフィルタを適用
    if True:
        # バンドパスフィルタの設定
        lowcut = 8  # バンドパスフィルタの下限周波数
        highcut = 30  # バンドパスフィルタの上限周波数
        nyquist = 0.5 * fs
        low = lowcut / nyquist
        high = highcut / nyquist
        b, a = butter(2, [low, high], btype='band')
        data = filtfilt(b, a, data)

    # データを標準化
    scaler = StandardScaler()
    data = scaler.fit_transform(data.T).T
    return np.array(data)

In [None]:
left_data = []
right_data = []

for i in range(stim_data.shape[0]):
    data = stim_data[i,:ch_size,:]
    y = trueclass_list[i]
    if y==1:
        left_data.append(preprocess(data,fs))
    else:
        right_data.append(preprocess(data,fs))
left_data = np.array(left_data)
right_data = np.array(right_data)

def plot_spec(key:str,data,ft=None):
    row = 5
    col = 3
    fig = plt.figure(figsize=(20, 12))
    plt.subplots_adjust(wspace=0.4, hspace=0.8)
    if ft is None:
        specs = [[] for _ in range(data.shape[0])]
        for i in range(data.shape[0]):
            for j in range(data.shape[1]):
                frequencies, times, spectrogram_data = spectrogram(data[i,j,:], fs)
                specs[i].append(spectrogram_data)
        specs = np.array(specs)
        specs = np.sum(specs,axis=0)
    else:
        frequencies, times = ft
        specs = data
    # スペクトログラムの可視化
    for i in range(ch_size):
        plt.subplot(row, col, i+1)
        p = 10 * np.log10(specs[i,:,:]) 
        #p = np.log(np.abs(p))
        plt.pcolormesh(times, frequencies, p,vmax=16)
        plt.colorbar()#label='Power/Frequency (dB/Hz)')
        plt.title(ch_list[i])
        #plt.clim(-50, 5) 
        plt.ylim(0, 50)
        plt.grid()
    fig.suptitle(key + ' Spectrogram')
    plt.show()
    return frequencies, times,specs
frequencies, times,sl = plot_spec("Left",left_data)
_,_,sr = plot_spec("Right",right_data)

row = 5
col = 3
fig = plt.figure(figsize=(10, 6))
plt.subplots_adjust(wspace=0.4, hspace=0.6)
specs = sr - sl
# スペクトログラムの可視化
for i in range(ch_size):
    plt.subplot(row, col, i+1)
    plt.pcolormesh(times, frequencies, specs[i,:,:], shading='gouraud')
    plt.colorbar()#label='Power/Frequency (dB/Hz)')
    #plt.clim(-50, 5) 
    plt.ylim(0, 25)
    plt.grid()
fig.suptitle("Diff Spectrogram")
plt.show()

# 評価

In [None]:
def calculate_acc(tclasses,pclass_list):
    predictclass_mode_list = []
    mode_ans = []
    total_len = np.sum([len(p) for p in pclass_list])
    test_count = 0
    for t,p in zip(tclasses,pclass_list):#total_acc算出 calculate
        assert len(p) > 0,p
        mode_predict = int(stats.mode(p,keepdims=True)[0])
        predictclass_mode_list.append(mode_predict)
        mode_ans.append(mode_predict == t)
        p_true = np.count_nonzero(np.array(p) == t)
        if p_true > len(p) // 2:
            test_count += 1
            #TODO: ここで詳細なacc算出
        #print(t,p_true)
    mode_ans2 = np.array(tclasses)[mode_ans]
    acc = len(mode_ans2)/len(mode_ans)
    return acc,predictclass_mode_list
def analyse1():
    acc,predictclass_mode_list = calculate_acc(trueclass_list, predictclass_list)
    std_list = []
    left_mean_list = []
    left_std_list = []
    right_mean_list = []
    right_std_list = []
    mn_predictclass_list = []
    # 混同行列を作成
    cm = confusion_matrix(trueclass_list, predictclass_mode_list)
    trsums = np.sum(cm,axis=1)
    recall = cm[0,0]/trsums[0]
    spec = cm[1,1]/trsums[1]
    if trsums[0] == trsums[1]:
        fixed_acc = None
        fixed_recall = None
        fixed_spec = None
    else:
        #acc修正
        minsum = np.min(trsums)
        lcount = rcount = 0
        mode_ans = []
        fixed_trueclasses = []
        fixed_predictclass_list = []
        for i,y in enumerate(trueclass_list):
            if y == 1:
                if lcount == minsum:
                    continue
                lcount+=1
            else:
                if rcount == minsum:
                    continue
                rcount+=1
            fixed_trueclasses.append(trueclass_list[i])
            fixed_predictclass_list.append(predictclass_list[i])
        print(len(mode_ans))
        fixed_acc,fixed_predictclass_mode_list = calculate_acc(fixed_trueclasses, fixed_predictclass_list)
        fixed_cm = confusion_matrix(np.array(fixed_trueclasses), np.array(fixed_predictclass_mode_list))
        fixed_recall = fixed_cm[0,0]/minsum
        fixed_spec = fixed_cm[1,1]/minsum
            
        
    print(f"精度:{acc},再現率(Recall):{recall},特異度(Specificity):{spec}")
    if fixed_acc is not None: print(f"精度{fixed_acc},再現率(Recall):{fixed_recall},特異度(Specificity):{fixed_spec} :補正:アンダーサンプリング)")
    # 混同行列を表示
    print("混同行列:")
    print(cm)
    if fixed_acc is not None:
        print("修正混同行列:")
        print(fixed_cm)

    print("-------通常評価-------")
    print("　|真|判別シークエンス|最頻|〇/×|std|mean")
    for i,_pack in enumerate(zip(trueclass_list,predictclass_list,predictclass_mode_list)):
        tl, pl,pml = _pack
        std = np.std(pl)
        mean = np.mean(pl)
        #if tl == 2:
        print(i+1,tl,pl,pml,"〇" if np.count_nonzero(np.array(pl) == tl) > len(pl)//2 else "×",std,mean)
        mn_predictclass_list.append([1 if tl == p else 0 for p in pl])
        std_list.append(std)
        if tl == 1:#1==left
            left_std_list.append(std)
            left_mean_list.append(mean)
        else:
            right_std_list.append(std)
            right_mean_list.append(mean)
    print(f"真が左の平均{np.mean(left_mean_list)} 標準偏差{np.mean(left_std_list)}")
    print(f"真が右の平均{np.mean(right_mean_list)} 標準偏差{np.mean(right_std_list)}")
    return acc,fixed_acc,recall,fixed_recall,spec,fixed_spec,mn_predictclass_list
data_acc,data_fixed_acc,data_recall,data_fixed_recall,data_spec,data_fixed_spec,mn_predictclass_list = analyse1()

In [None]:
def plot_lg(x,y,color):
    # 線形回帰モデル、予測値
    model = LinearRegression()
    model_lin = model.fit(x, y)
    y_lin_fit = model_lin.predict(x)
    plt.plot(x, y_lin_fit, color = color, linewidth=0.5)
def plot_epochs(x,y,title):
    # 回帰分析　線形
    itlist = np.array([(i,t) for i,t in enumerate(trueclass_list) if t == 1]).T
    lx = itlist[0,:].reshape(-1, 1)
    ly = y[lx].reshape(-1, 1)
    lp = plt.scatter(lx,ly,marker="o",label="left hand")
    itlist = np.array([(i,t) for i,t in enumerate(trueclass_list) if t == 2]).T
    rx = itlist[0,:].reshape(-1, 1)
    ry = y[rx].reshape(-1, 1)
    rp = plt.scatter(rx,ry,marker="^",label="right hand")
    plt.legend(loc='upper right',bbox_to_anchor=(1.3, 1))
    plt.draw()
    l_color = lp.get_facecolor()
    r_color = rp.get_facecolor()
    plot_lg(x,y,'#000000')
    plot_lg(lx,ly,l_color)
    plot_lg(rx,ry,r_color)
    plt.title(title)
    plt.show()

def analyse2():
    #判別ポイントごとの平均
    _mn_list = [mpl[:len(mn_predictclass_list[0])] for mpl in mn_predictclass_list]
    plt.errorbar(range(len(mn_predictclass_list[0])),np.mean(_mn_list,axis=0),yerr=np.std(_mn_list,axis=0),
                 capsize=5,ecolor='orange')
    plt.title("match 1or0")
    plt.show()
    print(np.std(_mn_list,axis=0))
    title = "Length of time matched (mean)"
    x = np.array(range(len(_mn_list))).reshape(-1, 1)
    y = np.mean(_mn_list,axis=-1)
    plot_epochs(x,y,title)

    title = "Length of time matched (std)"
    x = np.array(range(len(_mn_list))).reshape(-1, 1)
    y = np.std(_mn_list,axis=-1)
    plot_epochs(x,y,title)
analyse2()

# 以下モデル評価
predictclass_listは初期化される

## デコーダー1

In [None]:
def eval_sequence(data:np.ndarray,block_size:int,step:int,transpose_func,model):
    dataset = np.array([preprocess(data[:,ei-block_size:ei],fs) for ei in range(block_size,data.shape[1],step)])
    dataset = transpose_func(0,dataset)
    return [2 if p > 0.5 else 1 for p in model.predict(dataset,verbose = 0)]
def specificity(y_true, y_pred):
    # y_true: 正解ラベル
    # y_pred: 予測ラベル（確率ではなくクラスの予測値）
    # 予測ラベルをクラスに変換
    y_pred = K.round(y_pred)
    # Confusion matrixの計算
    true_negatives = K.sum(K.round(K.clip((1 - y_true) * (1 - y_pred), 0, 1)))
    false_positives = K.sum(K.round(K.clip((1 - y_true) * y_pred, 0, 1)))
    # 特異度の計算
    specificity = true_negatives / (true_negatives + false_positives + K.epsilon())
d1_model = keras.models.load_model(d1_model_path,custom_objects={"specificity":specificity})
all_data = stim_data[:,:ch_size,:]
predictclass_list = []
for i in range(all_data.shape[0]):
    predictclass_list.append(eval_sequence(all_data[i,:,:],block_size,step,transpose_raw2d,d1_model))
d1_acc,d1_fixed_acc,d1_recall,d1_fixed_recall,d1_spec,d1_fixed_spec,_ = analyse1()
analyse2()

## デコーダー2
表示だけ

In [None]:
d2_model = keras.models.load_model(d2_model_path,custom_objects={"specificity":specificity})
all_data = stim_data[:,:ch_size,:]
predictclass_list = []
for i in range(all_data.shape[0]):
    predictclass_list.append(eval_sequence(all_data[i,:,:],block_size,step,transpose_raw1d,d2_model))

analyse1()
analyse2()

# ログ書き込み

In [None]:
import csv
log_path = "C:/MLA_Saves_Bk/evals/output_acc.csv"
with open(log_path, 'a') as f:
    writer = csv.writer(f, lineterminator='\n') # 行末は改行
    nlst = data_path.replace("C:/MLA_Saves_Bk/","").replace("\\","/").split("/")
    writer.writerow([nlst[0],nlst[1],nlst[2],
                     data_acc,data_fixed_acc,
                     d1_acc,d1_fixed_acc])
log_path = "C:/MLA_Saves_Bk/evals/output_ex.csv"
with open(log_path, 'a') as f:
    writer = csv.writer(f, lineterminator='\n') # 行末は改行
    nlst = data_path.replace("C:/MLA_Saves_Bk/","").replace("\\","/").split("/")
    writer.writerow([nlst[0],nlst[1],nlst[2],
                     data_recall,data_fixed_recall,
                     d1_recall,d1_fixed_recall,
                     "/",
                     data_spec,data_fixed_spec,
                     d1_spec,d1_fixed_spec])