In [None]:
import h5py
import os
import gc
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter
%matplotlib inline

def convert_to_one_hot(Y, C):
    Y = np.eye(C)[Y.reshape(-1)].T
    return Y

####### 修改输入文件
filename = 'fused_200_96.h5'
file = h5py.File(os.path.join('data/',filename),'r')
imageData   = file['imageData'][:]
imageLabel  = file['imageLabel'][:]  
file.close()

# 随机打乱数据和标签
N = imageData.shape[0]
index = np.random.permutation(N)
data  = imageData[index,:,:]
label = imageLabel[index]

# 对数据升维,标签one-hot
data  = np.expand_dims(data, axis=1)
label = convert_to_one_hot(label,49).T
label = np.argmax(label, axis=1)  

# 划分数据集
N = data.shape[0]
num_train = round(N*0.8)
X_train = data[0:num_train,:,:,:]
Y_train = label[0:num_train]
X_test  = data[num_train:N,:,:,:]
Y_test  = label[num_train:N]

print ("X_train shape: " + str(X_train.shape))
print ("Y_train shape: " + str(Y_train.shape))
print ("X_test shape: " + str(X_test.shape))
print ("Y_test shape: " + str(Y_test.shape))

In [None]:
class dataset_prediction(Dataset):
    def __init__(self, data_features, data_target):
        self.len = len(data_features)
        self.features = torch.from_numpy(data_features)
        self.target = torch.from_numpy(data_target)
        
    def __getitem__(self, index):
        return self.features[index], self.target[index]

    def __len__(self):
        return self.len

train_set = dataset_prediction(data_features=X_train, data_target=Y_train)
test_set = dataset_prediction(data_features=X_test, data_target=Y_test)

train_loader = DataLoader(dataset=train_set,
                            batch_size=64,
                            shuffle=True,
                            drop_last=True)

In [None]:
# raw model
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()   # 继承__init__功能
        ## 第一层卷积
        self.conv1 = nn.Sequential(
            # 输入[1,200,60]
            nn.Conv2d(
                in_channels=1,    # 输入图片的高度
                out_channels=32,  # 输出图片的高度
                kernel_size=(20,3),    # 20x3的卷积核，相当于过滤器
                stride=1,         # 卷积核在图上滑动，每隔一个扫一次
                padding=2,        # 给图外边补上0
            ),
            # 经过卷积层 输出[32,200,60] 传入池化层
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(10,1))   # 经过池化 输出[32,20,60] 传入下一个卷积
        )
        ## 第二层卷积
        self.conv2 = nn.Sequential(
            nn.Conv2d(
                in_channels=32,    # 同上
                out_channels=64,
                kernel_size=(3,3),
                stride=1,
                padding=1
            ),
            # 经过卷积 输出[64, 40, 30] 传入池化层
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(3,2))  # 经过池化 输出[64,7,30] 传入下一个卷积
        )
        ## 第三层卷积
        self.conv3 = nn.Sequential(
            nn.Conv2d(
                in_channels=64,    # 同上
                out_channels=128,
                kernel_size=(3,3),
                stride=1,
                padding=2
            ),
            # 经过卷积 输出[128, 6, 30] 传入池化层
            ####### 这几个的 size 计算有点迷，得看加了 padding 之后对池化的影响
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)  # 经过池化 输出[128,4,17] 传入下一个卷积
        )

        ## 输出层 
        ######### in_features 只要对齐就行了，看编译器要多少就改成多少
        self.output = nn.Linear(in_features=128*2*25, out_features=49)

    def forward(self, x):
        x = torch.tensor(x)
        x = x.to(torch.float32)
        x = x.cuda()
        x = self.conv1(x)
        # print("After conv1:", x.shape)
        x = self.conv2(x)           #
        # print("After conv2:", x.shape)
        x = self.conv3(x) 
        # print("After conv3:", x.shape)
        x = x.view(x.size(0), -1)   # 
        # print("After view:", x.shape)
        output = self.output(x)     # 输出[50,10]
        return output


In [None]:
# # adjust model
# class CNN(nn.Module):
#     def __init__(self):
#         super(CNN, self).__init__()   # 继承__init__功能
#         ## 第一层卷积
#         self.conv1 = nn.Sequential(
#             # 输入[1,200,60]
#             nn.Conv2d(
#                 in_channels=1,    # 输入图片的高度
#                 out_channels=32,  # 输出图片的高度
#                 kernel_size=(20,3),    # 20x3的卷积核，相当于过滤器
#                 stride=1,         # 卷积核在图上滑动，每隔一个扫一次
#                 padding=2,        # 给图外边补上0
#             ),
#             # 经过卷积层 输出[32,200,60] 传入池化层
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=(10,1))   # 经过池化 输出[32,20,60] 传入下一个卷积
#         )
#         ## 第二层卷积
#         self.conv2 = nn.Sequential(
#             nn.Conv2d(
#                 in_channels=32,    # 同上
#                 out_channels=64,
#                 kernel_size=(3,3),
#                 stride=1,
#                 padding=1
#             ),
#             # 经过卷积 输出[64, 40, 30] 传入池化层
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=(3,2))  # 经过池化 输出[64,7,30] 传入下一个卷积
#         )
#         ## 第三层卷积
#         self.conv3 = nn.Sequential(
#             nn.Conv2d(
#                 in_channels=64,    # 同上
#                 out_channels=128,
#                 kernel_size=(3,3),
#                 stride=1,
#                 padding=2
#             ),
#             # 经过卷积 输出[128, 6, 30] 传入池化层
#             ####### 这几个的 size 计算有点迷，得看加了 padding 之后对池化的影响
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2)  # 经过池化 输出[128,4,17] 传入下一个卷积
#         )
#         # # 
#         # self.conv4 = nn.Sequential(
#         #     nn.Conv2d(256,512,kernel_size=(3,3),stride=1,padding=1),
#         #     nn.ReLU(),
#         #     nn.MaxPool2d(kernel_size=(2,1))
#         # )
#         # Dropout层，减少过拟合
#         self.fc1 = nn.Sequential(
#             nn.Linear(128*4*4,1024), # 根据输入尺寸计算
#             nn.ReLU(),
#             nn.Dropout(p=0.5), # 50%的Dropout概率
#             nn.Linear(1024,49)
#         )
#         ## 输出层 
#         ######### in_features 只要对齐就行了，看编译器要多少就改成多少
#         # self.output = nn.Linear(in_features=128*10*4, out_features=49)

#     def forward(self, x):
#         x = torch.tensor(x)
#         x = x.to(torch.float32)
#         x = x.cuda()
#         x = self.conv1(x)
#         # print("After conv1:", x.shape)
#         x = self.conv2(x)           #
#         # print("After conv2:", x.shape)
#         x = self.conv3(x) 
#         # print("After conv3:", x.shape)
#         # x = self.conv4(x)
#         # print("After conv4:", x.shape)
#         x = x.view(x.size(0), -1)   
#         # print("After view:", x.shape)
#         output = self.fc1(x)
#         # print("After fc1:", x.shape)
#         # output = self.output(x)     # 输出[50,10]
#         return output

In [None]:
# cnn 实例化
# CUDA_LAUNCH_BLOCKING=1

cnn = CNN()
print(cnn)
#初始化TensorBoard
# writer = SummaryWriter(log_dir='runs/experiment1')

train_losses = []
test_accuracies = []

# 如果GPU可用，将模型迁移至GPU
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
if torch.cuda.is_available():
    cnn = cnn.cuda()


epoches = 100
learning_rate = 0.001

# 定义优化器和损失函数
optimizer = torch.optim.Adam(cnn.parameters(), lr=learning_rate,weight_decay=0.005)
loss_function = nn.CrossEntropyLoss()
if torch.cuda.is_available():
    loss_function = loss_function.cuda()

# 开始训练
for epoch in range(epoches):
    print("进行第{}个epoch".format(epoch))
    for step, (batch_x, batch_y) in enumerate(train_loader):
        batch_x = batch_x.cuda()
        batch_y = batch_y.cuda()
        
        output = cnn(batch_x)  # batch_x=[64,1,200,60]

        loss = loss_function(output, batch_y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # 每一步记录训练损失到TensorBoard
        # writer.add_scalar('Loss/train',loss.item(),epoch* len(train_loader)+step)
        
        # 为了实时显示准确率
        if step % 64 == 0:
            with torch.no_grad():
                test_output = cnn(X_test).cpu()
                pred_y = torch.max(test_output, 1)[1].data.numpy()
                accuracy = float(np.sum(pred_y == Y_test)) / float(Y_test.shape[0])
                print('Epoch: ', epoch, '| train loss: %.4f' % loss.cpu().data.numpy(), '| test accuracy: %.2f' % accuracy)
            
            # 记录测试准确率到TensorBoard
            # writer.add_scalar('Accuracy/test',accuracy,epoch*len(train_loader)+step)
        
        # 保存训练损失和测试准确率
        train_losses.append(loss.cpu().data.numpy())
        test_accuracies.append(accuracy)
    torch.cuda.empty_cache()
    del batch_x,batch_y,output,loss
    gc.collect()
# writer.close()

test_output = cnn(X_test[:10]).cpu()
pred_y = torch.max(test_output, 1)[1].data.numpy().squeeze()
print(pred_y)
print(X_test[:10])


In [None]:
# 绘制结果
#保存训练损失和测试准确率图表
import os
current_dir = os.getcwd()
plt.figure(figsize=(12,4))
plt.subplot(1,2,1)
plt.plot(train_losses,label='Train Loss')
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.legend()
plt.title('Train Loss over Iterations')
plt.subplot(1,2,2)
plt.plot(test_accuracies,label='Test Accuracy')
plt.xlabel('Iteration')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Test Accuracy over Iterations')
plt.savefig(os.path.join(current_dir,'runs/experiment1/',filename+'.png'))
# plt.savefig(os.path.join(current_dir,'runs/experiment1/','emg_100_12'+'.png'))
plt.show()

In [None]:
print("max_test_accuracy:{:.2f}%".format(max(test_accuracies)*100))

## 保存模型

In [None]:
# 假设 model 是你的 CNN 模型
torch.save(cnn.state_dict(), 'model/weight/'+filename+'.pth')
