In [4]:
# from scipy.signal import hilbert
# import numpy as np
# import matplotlib.pyplot as plt


# def compare_elements(array1, array2):  # array1和array2大小相同

#     array = np.zeros(len(array1))
#     for i in range(len(array1)):
#         if array1[i] == array2[i]:
#             array[i] = 0
#         elif array1[i] > array2[i]:
#             array[i] = 1
#         else:
#             array[i] = -1

#     return array


# def phase_locked_matrix(all_channel_eeg):

#     """all_channel_eeg的shape例如是32 * 8064，其中32是脑电极，而8064是每个通道下采集的数据"""

#     channels = all_channel_eeg.shape[0]
#     sampling_number = all_channel_eeg.shape[1]  # 得到输入的通道数和每个通道的采样点数
#     eeg_instantaneous_phase = np.zeros_like(all_channel_eeg)  # 初始化每个通道下每个采样点的瞬时相位


#     for index, single_channel_eeg in enumerate(all_channel_eeg):
#         analytic_signal = hilbert(single_channel_eeg)
#         instantaneous_phase = np.unwrap(np.angle(analytic_signal))
#         eeg_instantaneous_phase[index] = instantaneous_phase

#     matrix = np.zeros(shape=[channels, channels])  # 初始化相位锁定矩阵，shape是32 * 32


#     for i in range(channels):
#         for j in range(channels):
#             if i == j:
#                 matrix[i][j] = 1
#             else:
#                 matrix[i][j] = np.abs((compare_elements(eeg_instantaneous_phase[i], eeg_instantaneous_phase[j])).sum()) / sampling_number

#     return matrix

In [5]:
import functionConnective
import torch
import torch.utils.data
import torch.nn as nn
import torchvision.datasets
import numpy as np
import pandas as pd
import os


# Module-level batch-size constant. It is not referenced by any of the cells
# visible here — presumably consumed by a DataLoader elsewhere; TODO confirm.
BATCH_SIZE = 16


# Build the sample library (dataset of CSV recordings found on disk)
class mydatasets(torch.utils.data.Dataset):
    """Dataset that lazily loads CSV recordings found under a directory tree.

    The label of a sample is derived from the last four characters of the
    directory that contains the file: ``'fast'`` -> 1, ``'medi'`` -> 2,
    anything else -> 3.

    Attributes:
        file_name: list of paths of the files whose name contains ``file_type``.
        targets: 1-D ``torch.long`` tensor; ``targets[i]`` is the label of
            ``file_name[i]``.
    """

    def __init__(self, file_dir, file_type):
        """Walk ``file_dir`` and index every file whose name contains ``file_type``.

        Args:
            file_dir: root directory to walk recursively.
            file_type: substring a file name must contain to be indexed
                (for example ``'.csv'``).
        """
        super().__init__()
        self.file_name = []
        labels = []
        for root, _dirs, files in os.walk(file_dir, topdown=False):
            # The label depends only on the directory, so compute it once.
            suffix = root[-4:]
            if suffix == 'fast':
                label = 1
            elif suffix == 'medi':
                label = 2
            else:
                label = 3
            for file in files:
                if file_type not in file:
                    continue
                # Record path and label together so that both sequences stay
                # index-aligned even when the tree contains other file types.
                self.file_name.append(os.path.join(root, file))
                labels.append(label)
        # NOTE(review): labels are 1..3; a 3-way classifier trained with
        # nn.CrossEntropyLoss expects 0..2 — confirm how the training loop
        # consumes these targets.
        self.targets = torch.from_numpy(np.array(labels, dtype=np.int64)).long()

    def __getitem__(self, item):
        """Load sample ``item`` and return ``(data, target)``.

        The CSV is read with pandas, the first 30 rows are kept and the first
        column is dropped. The result is passed to
        ``functionConnective.phase_locked_matrix`` and the returned array is
        converted to a float tensor with a new leading dimension.
        """
        data = pd.read_csv(self.file_name[item])
        data = data.values[:30, 1:]
        data = torch.from_numpy(functionConnective.phase_locked_matrix(data)).float().unsqueeze(dim=0)
        target = self.targets[item]

        return data, target

    def __len__(self):
        """Return the number of indexed files."""
        return len(self.file_name)

In [6]:
# Scratch check: count how many row-wise argmax indices of a random score
# matrix coincide with a random integer vector. No seed is set, so the printed
# values change on every run.
a = torch.from_numpy(np.random.randn(5,3))  # 5 x 3 tensor of standard-normal samples
b = torch.from_numpy(np.random.randint(0, 3, size=[1,5]))  # 1 x 5 tensor of integers drawn from [0, 3)
print('a', a, 'b', b)
print(torch.argmax(a, dim=1))  # index of the largest entry in each row of a
# The argmax result has shape (5,) and b has shape (1, 5); the comparison
# broadcasts to (1, 5) and the sum counts the matching positions.
print((torch.argmax(a, dim=1) == b).float().numpy().sum())

a tensor([[-0.2640, -0.6655,  0.7047],
        [-0.1274,  0.3606, -0.2659],
        [-2.0861,  0.9882, -1.9634],
        [-0.8690, -1.2837,  2.1231],
        [ 1.8118,  0.5346,  1.4706]], dtype=torch.float64) b tensor([[2, 1, 0, 0, 1]], dtype=torch.int32)
tensor([2, 1, 1, 2, 0])
2.0


In [7]:
from scipy.signal import hilbert
import numpy as np
import matplotlib.pyplot as plt


def compare_elements(array1, array2):  # array1 and array2 must have the same size
    """Compare two equally sized arrays element by element.

    Args:
        array1: array-like of values.
        array2: array-like with the same shape as ``array1``.

    Returns:
        A float64 array with the shape of the inputs holding ``0`` where the
        elements are equal, ``1`` where ``array1 > array2`` and ``-1``
        otherwise (which includes comparisons involving NaN).

    Raises:
        ValueError: if the two inputs do not have the same shape.
    """
    first = np.asarray(array1)
    second = np.asarray(array2)
    if first.shape != second.shape:
        raise ValueError(
            "array1 and array2 must have the same shape, got "
            f"{first.shape} and {second.shape}"
        )
    # Vectorised form of the if / elif / else chain; the nested np.where keeps
    # the same branch order so that the fall-through case still yields -1.
    return np.where(first == second, 0.0, np.where(first > second, 1.0, -1.0))


def phase_locked_matrix(all_channel_eeg):
    """Build one channel-by-channel phase comparison matrix per frequency band.

    For every band the instantaneous phase of each channel is obtained from
    the analytic signal (Hilbert transform) and unwrapped. Entry ``[b, i, j]``
    is ``|sum_t s_t| / points`` where ``s_t`` is 0 when the phases of channels
    ``i`` and ``j`` are equal at sample ``t``, 1 when the phase of ``i`` is
    larger and -1 otherwise — the same convention as ``compare_elements``.
    The diagonal is set to 1.

    Args:
        all_channel_eeg: array-like of shape ``(bands, channels, points)``,
            e.g. ``4 x 32 x 8064`` for four frequency bands, 32 electrodes and
            8064 samples per channel.

    Returns:
        ``numpy.ndarray`` of shape ``(bands, channels, channels)``,
        e.g. ``4 x 32 x 32``.

    Raises:
        ValueError: if the input is not 3-D or contains no samples.
    """
    # Work in floating point so that the phases are not truncated when the
    # caller passes an integer (or object) array.
    eeg = np.asarray(all_channel_eeg, dtype=float)
    if eeg.ndim != 3:
        raise ValueError(
            "all_channel_eeg must have shape (bands, channels, points), got "
            f"{eeg.shape}"
        )

    # Number of frequency bands, electrode channels and samples per channel.
    bands, channels, points = eeg.shape
    if points == 0:
        raise ValueError("all_channel_eeg must contain at least one sample per channel")

    # Instantaneous phase of every sample; both hilbert and unwrap operate
    # along the last (time) axis, so all bands and channels are done at once.
    eeg_instantaneous_phase = np.unwrap(np.angle(hilbert(eeg, axis=-1)), axis=-1)

    matrix = np.zeros(shape=[bands, channels, channels])  # e.g. 4 x 32 x 32

    for index in range(bands):
        band_phase = eeg_instantaneous_phase[index]  # (channels, points)
        for i in range(channels):
            reference = band_phase[i]
            # Compare channel i against every channel of the band in one shot.
            signs = np.where(
                reference == band_phase,
                0.0,
                np.where(reference > band_phase, 1.0, -1.0),
            )
            matrix[index, i] = np.abs(signs.sum(axis=1)) / points
        # A channel compared with itself sums to 0; by convention it is 1.
        np.fill_diagonal(matrix[index], 1)

    # Return only after every band has been filled.
    return matrix

In [8]:
import numpy as np
import scipy.signal as signal


def get_eeg_signal_four_frequency_band(eeg_signal_data, Fs=2000):
    """Split a multi-channel recording into theta, alpha, beta and gamma bands.

    Each band is obtained by convolving every channel with a windowed FIR
    filter designed by ``scipy.signal.firwin`` (``mode='same'``, so the output
    keeps the input length):

    * theta: band-pass 4-8 Hz
    * alpha: band-pass 8-12 Hz
    * beta:  band-pass 12-30 Hz
    * gamma: high-pass above 30 Hz

    Args:
        eeg_signal_data: array-like of shape ``(channels, points)``.
        Fs: sampling frequency in Hz used to design all four filters.

    Returns:
        ``numpy.ndarray`` of shape ``(4, channels, points)`` ordered
        theta, alpha, beta, gamma.

    Raises:
        ValueError: if the input is not 2-D.
    """
    # Filter in floating point; an integer input would otherwise truncate the
    # filtered samples when they are stored.
    eeg = np.asarray(eeg_signal_data, dtype=float)
    if eeg.ndim != 2:
        raise ValueError(
            f"eeg_signal_data must have shape (channels, points), got {eeg.shape}"
        )
    channels, points = eeg.shape

    theta = [4, 8]
    alpha = [8, 12]
    beta = [12, 30]
    gamma = 30

    # firwin designs even-symmetric FIR filters. With an even number of taps
    # the response at the Nyquist frequency fs/2 is forced to zero, which is
    # incompatible with a high-pass filter, so use points + 1 taps in that case.
    gamma_taps = points + 1 if points % 2 == 0 else points

    # All filters are designed for the same sampling rate as the data.
    band_filters = [
        signal.firwin(points, theta, pass_zero='bandpass', fs=Fs),
        signal.firwin(points, alpha, pass_zero='bandpass', fs=Fs),
        signal.firwin(points, beta, pass_zero='bandpass', fs=Fs),
        signal.firwin(gamma_taps, gamma, pass_zero='highpass', fs=Fs),
    ]

    # Stack the four bands: index 0 theta, 1 alpha, 2 beta, 3 gamma.
    filtered = np.empty((len(band_filters), channels, points))
    for band, fir in enumerate(band_filters):
        for channel in range(channels):
            # Apply the FIR filter by direct convolution with the signal.
            filtered[band, channel] = signal.convolve(eeg[channel], fir, mode='same')

    return filtered

In [9]:
import pandas as pd

# Smoke test of the band splitter on a single recording.
# NOTE(review): the path is an absolute location on one machine, so this cell
# will not run elsewhere; `data` is also rebound twice (DataFrame -> 2-D array
# -> 4-band array), so re-running later lines alone will not reproduce it.
data = pd.read_csv(r'D:\Files\SJTU\Study\MME_Lab\Teacher_Lu\click_number\eeg\process1.0\fast\9.csv')
data = data.values[:30, 1:]  # keep the first 30 rows and drop the first column
print(data.shape)
data = get_eeg_signal_four_frequency_band(data)  # Fs is left at the function's default
data.shape  # last expression of the cell, displayed as its output

(30, 141)


(4, 30, 141)

In [21]:
class cnn(nn.Module):
    """Small convolutional classifier: two conv stages and a 3-way linear head.

    The network takes a batch of 2-channel images. The head expects the second
    stage to produce ``64 x 2 x 2`` features per sample, which is what a
    30 x 30 input yields.
    """

    def __init__(self):
        super().__init__()

        # Spatial size after a convolution:
        #   n_out = (n_in - kernel + 2 * padding) // stride + 1
        stage_one = [
            nn.Conv2d(2, 32, kernel_size=3, stride=3, padding=3),
            nn.BatchNorm2d(num_features=32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Dropout2d(p=0.25),
        ]
        # e.g. 30 x 30 input -> batch_size x 32 x 6 x 6
        self.conv1 = nn.Sequential(*stage_one)

        stage_two = [
            # groups=32: each input channel feeds its own pair of output channels
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=3, padding=3, groups=32),
            nn.BatchNorm2d(num_features=64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        ]
        # 6 x 6 feature map -> batch_size x 64 x 2 x 2
        self.conv2 = nn.Sequential(*stage_two)

        head = [
            nn.Dropout(p=0.25),
            nn.Linear(in_features=64 * 2 * 2, out_features=64),
            nn.BatchNorm1d(num_features=64),
            nn.ReLU(),
            nn.Dropout(p=0.25),
            nn.Linear(in_features=64, out_features=3),
        ]
        self.linear01 = nn.Sequential(*head)

    def forward(self, x):
        """Return the raw 3-class scores for the batch ``x``."""
        features = self.conv2(self.conv1(x))
        batch = features.size(0)
        # Flatten each sample's feature maps into a single vector.
        flat = features.view(batch, -1)
        return self.linear01(flat)
    
# Instantiate an untrained network; this rebinds the global name `a` that an
# earlier cell used for a random tensor.
a = cnn()
# Serialise the whole module object (not just its state_dict) to a hard-coded
# absolute path.
# NOTE(review): the target has no file extension and sits directly under
# PycharmProjects — confirm it is meant to be a file rather than a project
# directory. The SyntaxError recorded below this cell does not match the code
# as written here, so that output appears to come from an earlier version.
torch.save(a, r'D:\Files\SJTU\Study\PycharmProjects\click_number')

SyntaxError: EOL while scanning string literal (<ipython-input-21-0640df86cd0d>, line 40)