In [1]:
import torch 
import numpy as np
import os

首先加载数据

In [2]:
root = os.getcwd()
subject_id = 1
npy_dir =root + f'\data\preprocess\S{subject_id:>02d}\\'


这里的数据集 尺寸为 980 * 64 * 256

但是为了和图片尺寸一致 我们添加 为 980 * 1 * 64 * 256

In [3]:
x = np.load(npy_dir + 'x.npy')
y = np.load(npy_dir + 'y.npy')
x= torch.from_numpy(x).float() #转化成张量 浮点型
y=  torch.from_numpy(y)
x= torch.unsqueeze(x,1) #squeeze是压缩的意思，unsqueeze是添加维度，解压缩,后面的1是指维度


设置加载数据

In [4]:
train_rate = 0.75
batch_size = 16

In [5]:
total_num = len(x)
train_num = int(total_num * train_rate)
train_dataset = torch.utils.data.TensorDataset(x[:train_num],y[:train_num])
test_dataset = torch.utils.data.TensorDataset(x[train_num:],y[train_num:])
train_data_loader  = torch.utils.data.DataLoader(dataset = train_dataset,
                                                batch_size = batch_size,
                                                shuffle = True)
test_data_loader = torch.utils.data.DataLoader(dataset = test_dataset,
                                                batch_size = batch_size,
                                                shuffle = False)



训练模型

这里先把模型放在这里，训练过程写好，等下再解析模型

1.模型输入16 * 1 * 64 * 256 

In [6]:

class InputBlockTime(torch.nn.Module):
    def __init__(self, time_channels, time_size, spatial_channels, spatial_size, pool_param=(1, 2), dropout=0.5):
        super(InputBlockTime, self).__init__()

        self.block = torch.nn.Sequential(
            # time filter
            torch.nn.Conv2d(in_channels=1, out_channels=time_channels,
                            kernel_size=time_size, padding=(0, time_size[1] // 2), bias=False),


            # spatial filter
            torch.nn.Conv2d(in_channels=time_channels, groups=time_channels,
                            out_channels=spatial_channels, kernel_size=spatial_size),

            torch.nn.BatchNorm2d(spatial_channels),
            torch.nn.ELU(),
            torch.nn.MaxPool2d(kernel_size=pool_param,
                               stride=pool_param),
            torch.nn.Dropout(dropout)
        )

    def forward(self, x):
        x = self.block(x)
        return x


class FeatureBlock(torch.nn.Module):
    def __init__(self, in_channels, out_channels, conv_param=(1, 15), pool_param=(1, 2), dropout=0.5, padding=(-1, -1, -1, -1)):
        super(FeatureBlock, self).__init__()
        if (padding[0] == -1):
            padding = (conv_param[1] // 2, conv_param[1] // 2, 0, 0)

        self.padding = torch.nn.ZeroPad2d(padding=padding)

        self.conv = torch.nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=conv_param,
            bias=False,
        )

        self.norm = torch.nn.BatchNorm2d(
            num_features=out_channels
        )

        self.elu = torch.nn.ELU()

        self.pool = torch.nn.MaxPool2d(
            kernel_size=pool_param,
            stride=pool_param
        )

        self.drop_out = torch.nn.Dropout(dropout)

    def forward(self, x):
        x = self.padding(x)
        x = self.conv(x)
        x = self.norm(x)
        x = self.elu(x)
        x = self.pool(x)
        return self.drop_out(x)


class DeepConvNet(torch.nn.Module):
    def __init__(self, sample, class_num, time_channels, time_size,
                 spatial_channels, spatial_size,
                 feature_pool_size, feature_channels_list, dropout):
        super(DeepConvNet, self).__init__()

        self.input_block = InputBlockTime(time_channels=time_channels, time_size=time_size,
                                          spatial_channels=spatial_channels, spatial_size=spatial_size,
                                          pool_param=feature_pool_size, dropout=dropout)

        self.feature_block_list = torch.nn.Sequential()
        pre_channels = spatial_channels
        for channel in feature_channels_list:
            self.feature_block_list.add_module(
                f"feature {channel}",
                FeatureBlock(in_channels=pre_channels, out_channels=channel,
                             pool_param=feature_pool_size, dropout=dropout)
            )
            pre_channels = channel

        # 造一个数据来得到全连接层之前数据的 shape
        # 这样就不用手动计算数据的 shape 了，是一个实用技巧
        tmp_data = torch.Tensor(np.ones((1, 1, 64, sample), dtype=float))
        tmp_data = self.input_block(tmp_data)
        tmp_data = self.feature_block_list(tmp_data)
        tmp_data = tmp_data.view(tmp_data.size(0), -1)

        self.classifer = torch.nn.Sequential(
            torch.nn.Linear(tmp_data.shape[1],
                            class_num)
        )

    def forward(self, x):
        out2 = self.input_block(x)
        x = self.feature_block_list(out2)
        x = x.view(x.size(0), -1)
        x = self.classifer(x)
        return torch.nn.functional.softmax(x, dim=1)

In [7]:
model = DeepConvNet(
        sample=256, class_num=2,
        time_channels=25, time_size=(1, 9),
        spatial_channels=50, spatial_size=(64, 1),
        feature_pool_size=(1, 3), feature_channels_list=[100, 200], dropout=0.5)

In [8]:
epochs = 10

In [9]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') #得到服务器
model  = model.to(device) #将模型放在服务器上
loss_func = torch.nn.CrossEntropyLoss() #设置损失函数
optimizer = torch.optim.Adam(model.parameters() ) #设置梯度下降的优化器

for epoch in range(epochs): #不断循环利用这个dataloader来训练
    running_loss = 0.0
    batch_size = None
    size_loss = 0 #这几个参数为什么这么设还不知道

    for index,data in enumerate(train_data_loader): #这是对于一个batch_size（16个样本拼在一起）的训练
        x,y = data
        x = x.to(device)
        y = y.to(device)
        model_out = model(x)

        loss = loss_func(model_out,y.long()) #利用loss函数求解损失值 （将y转换为long形式）
        optimizer.zero_grad() #梯度清零,之所以要清理，是因为每次反传都会产生新的梯度
        loss.backward() #反传
        optimizer.step() #梯度下降
        running_loss += float(loss.item()) 

    print(f'epochs: {epoch+1} / {epochs}\tTrain_loss:{running_loss:.4f}',end = '\n') #这里的f应该是format的简写，即为格式化输出{}的内容
print('\nFinished training')


epochs: 1 / 10	Train_loss:30.6454
epochs: 2 / 10	Train_loss:27.1768
epochs: 3 / 10	Train_loss:23.5734
epochs: 4 / 10	Train_loss:21.3702
epochs: 5 / 10	Train_loss:21.9127
epochs: 6 / 10	Train_loss:20.8666
epochs: 7 / 10	Train_loss:19.8323
epochs: 8 / 10	Train_loss:19.0817
epochs: 9 / 10	Train_loss:18.6752
epochs: 10 / 10	Train_loss:17.5857

Finished training


测试模型

In [10]:
correct_num = 0
total_num = 0
for index,data in enumerate(test_data_loader):
    x,y = data
    x = x.to(device)
    y = y.to(device)
    model_out = model(x)
    _, pred = torch.max(model_out, 1)
    correct_num += np.sum(pred.cpu().numpy()==y.cpu().numpy()) 
    #这一步是转换到cpu，然后转换为numpy
    total_num +=len(y)
print('\nTest acc:'+str(correct_num/total_num)) 


Test acc:0.9020408163265307


In [11]:
import torch
from torchviz import make_dot
out = model(x)   # 将 x 输入网络
g = make_dot(out)  # 实例化 make_dot
g.view()  # 直接在当前路径下保存 pdf 并打开
# g.render(filename='netStructure/myNetModel', view=False, format='pdf')  # 保存 pdf 到指定路径不打开


'Digraph.gv.pdf'