In [1]:
import pandas as pd
import scipy
from scipy import io
import numpy as np
import tensorflow as tf
from keras.models import load_model
import sys;
from esinet.evaluate import eval_mse
# sys.path.insert(0, '../')
from esinet import Simulation, Net, util, evaluate
from forward import create_forward_model, get_info
from matplotlib import pyplot as plt
from esinet.util import calculate_source
import pickle
import tensorflow.keras.backend as K
from tensorflow.keras import layers, models

In [2]:
def custom_prep_data(data):

    data = np.swapaxes(data, 1,2)

    # 获取数据维度
    num_samples, num_timepoints, num_channels = data.shape
    
    # 将数据类型转换为 np.float32
    data = data.astype(np.float32)
    
    # 对每个样本（脑电信号的一个时间点）进行处理
    for i in range(num_samples):
        for j in range(num_timepoints):
            # 获取当前样本的数据
            sample_data = data[i, j, :]
            
            # 假设你想要进行去除平均值和标准化的预处理
            # 去除平均值
            sample_data_mean = np.mean(sample_data)
            sample_data_std = np.std(sample_data)
            sample_data -= sample_data_mean
            
            # 标准化
            if sample_data_std != 0:
                sample_data /= sample_data_std
        
            # 更新数据
            data[i, j, :] = sample_data
    print("The shape of EEG is", data.shape)
    
    return data

# 对源信号进行归一化
def custom_prep_source(data):
    
    # 将数据类型转换为 np.float32
    data = data.astype(np.float32)
    
    for i, y_sample in enumerate(data):
        max_abs_vals=np.array(np.max(abs(data[i])))
        max_abs_vals[max_abs_vals == 0] = 1
        data[i] /= max_abs_vals   

    print("The shape of source is", data.shape)
    
    return data

def make_3d(matrix):
    # 如果矩阵的维度不是3，则增加一个维度
    if matrix.ndim != 3:
        # 在前面增加一个维度
        nrow, ncol = matrix.shape
        matrix_3d = matrix.reshape((1, nrow, ncol))
        return matrix_3d
    else:
        return matrix

def wight_combined_loss(y_true, y_pred):
    
    # # 设定权重
    # binary_array=tf.where(y_true != 0, 1, 0)
    # binary_array = tf.cast(binary_array, dtype=tf.float32)
    # y_true_filtered = tf.multiply(binary_array, y_true)
    # y_pred_filtered = tf.multiply(binary_array, y_pred)

    # 创建一个掩码，标记非零值
    mask = tf.not_equal(y_true, 0)
    # 使用掩码过滤出非零元素
    y_true_filtered = tf.boolean_mask(y_true, mask)
    y_pred_filtered = tf.boolean_mask(y_pred, mask)

    # MSE Loss
    huber = tf.keras.losses.Huber()(y_true, y_pred)
    huber2 = tf.keras.losses.MeanSquaredError()(y_true_filtered, y_pred_filtered)
    
    # Cosine Similarity Loss
    # 使用 tf.keras.losses.cosine_similarity，并确保结果为正值
    cosine_loss = 1+tf.keras.losses.CosineSimilarity()(y_true, y_pred)
    
    # 组合损失，确保使用 tf.cast 保持类型一致
    combined = 1* huber + 1*cosine_loss
    #combined = 1000 * huber + cosine_loss + huber2
    #combined = 1000 * huber + huber2

    return combined

def non_zero_mse_loss(y_true, y_pred):

    # 创建一个掩码，标记非零值
    mask = tf.not_equal(y_true, 0)
    
    # 使用掩码过滤出非零元素
    y_true_filtered = tf.boolean_mask(y_true, mask)
    y_pred_filtered = tf.boolean_mask(y_pred, mask)
    
    # 计算非零元素的均方误差
    # mse = tf.square(y_true_filtered - y_pred_filtered)
    mse = K.mean(K.square(y_true_filtered - y_pred_filtered))
    
    # 返回均方误差的平均值
    return tf.reduce_mean(mse)

def sparsity(y_true, y_pred):
    return K.mean(K.square(y_pred)) / K.max(K.square(y_pred))
def combined_loss2(y_true, y_pred):

    print(y_pred.shape)
    print(y_true.shape)
    # MSE Loss
    huber = tf.keras.losses.Huber()(y_true, y_pred)

    cosine_loss = sparsity(y_true, y_pred)
    
    # 组合损失，确保使用 tf.cast 保持类型一致
    combined = huber + 0.01 * cosine_loss
    return combined

In [3]:
# 提取模型中某一层的输出
def get_activations(model, inputs, layer_name=None):
    inp = model.input
    for layer in model.layers:
        if layer.name == layer_name:
            Y = layer.output
    model = tf.keras.Model(inp,Y)
    out = model.predict(inputs)
    return out

In [4]:
# 加载数据
loaded_data = np.load('D:/jupyter_note/SWX_source/Simulated_data/x_test1.npy')
y_true = np.load('D:/jupyter_note/SWX_source/Simulated_data/y_test1.npy')
# 从文件中加载 sim_test 对象
with open( 'D:/jupyter_note/SWX_source/Simulated_data/sim1.pkl', 'rb') as file:
    sim_test = pickle.load(file)

# 加载保存的模型
model1 = load_model('D:/jupyter_note/SWX_source/Simulated_data/model_attention', compile=False)
# 定义优化器和损失函数
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0003)
loss = wight_combined_loss
# 重新编译模型
model1.compile(loss=loss, optimizer=optimizer)
# 打印模型摘要
model1.summary()

data=loaded_data
print(data.shape)
    
# 对输入数据进行预处理
data_processed = make_3d(data)
print(data_processed.shape)
X_test = custom_prep_data(data_processed)
X_test2 = X_test[40:41, :, :]
print(X_test2.shape)

Model: "Simplified_Hybrid_Model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 Input (InputLayer)             [(None, None, 60)]   0           []                               
                                                                                                  
 dense (Dense)                  (None, None, 200)    12200       ['Input[0][0]']                  
                                                                                                  
 dense_1 (Dense)                (None, None, 200)    40200       ['dense[0][0]']                  
                                                                                                  
 dense_2 (Dense)                (None, None, 200)    40200       ['dense_1[0][0]']                
                                                                            

In [5]:
attention_vectors = []
attention_vectors = get_activations(model1, X_test2, layer_name='reshape')
print('attention = ', attention_vectors)
print(attention_vectors.shape)

attention =  [[[3.5797853e-15 2.6555130e-01 4.7206824e-13 ... 8.0850667e-05
   6.1562573e-06 6.1295395e-08]]]
(1, 1, 1284)


In [6]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

import sys; sys.path.insert(0, '../')
from esinet import util
from esinet import Net
from esinet.forward import create_forward_model, get_info
from scipy.stats import pearsonr

# 去掉第一个维度 (1, 1, 1284) -> (1284,)
wighted_squeezed = np.squeeze(attention_vectors)

# 扩展为 (26, 1284)，重复元素
wighted_expanded = np.tile(wighted_squeezed, (26, 1))

# 检查结果
print(wighted_expanded.shape)  # 应该输出 (26, 1284)

plot_params = dict(surface='white', initial_time=0.05, views=('dorsal'), hemi='both', colormap='hot', background='white', add_data_kwargs=dict(fmin=0,fmid=0.5, fmax=1,scale_factor=0.1), verbose=0)
y_hat = np.swapaxes(wighted_expanded, 0, 1) # 得到的源信号维度(n_samples,n_channels, n_times)
stc = sim_test.source_data[4]
stc_hat = stc.copy()
# stc_hat.data = comp.decode(y_hat.T)
stc_hat.data = y_hat
stc_hat.plot(**plot_params)#将神经网络模型的预测结果可视化，通过 stc_hat.plot

(26, 1284)


<mne.viz._brain._brain.Brain at 0x218b1ba9d30>