## NinaPro动作分类——卷积神经网络动作分类

针对传统方法对大样本数据动作分类精度不高的问题，我们引入深度学习模型来对手势动作进行识别。卷积神经网络（Convolutional Neural Networks, CNN）是一类包含卷积计算且具有深度结构的前馈神经网络（Feedforward Neural Networks），是深度学习（deep learning）的代表算法之一，在近年来艺成功运用于诸多领域，比如图像分类、语音识别、视频分类等。自CNN出现，就不断有人尝试使用CNN识别NinaPro数据集中的首部运动。相比经典分类方法，卷积神经网络的分类精度远高于经典方法。

In [None]:
CNN_LSIT = ["FCN", "NinaProNet", "Resnet18"]
CNN_METHOD = 2

#### 1.数据读取与预处理

In [None]:
from scipy import io
import numpy as np

def get_feature_dict(filename):
    """将ninapro_feature的MAT文件加载为字典

    Args:
        path: mat文件路径
        filename: mat文件名

    Returns:
        数据集字典
        [feat_set, featStim, featRep]
    """
    # 读取MAT文件
    print('load file: ' + filename + '...', end= '', flush=True)
    dict_feature=io.loadmat(filename)
    if (dict_feature != ()):
        #print(ninapro_data.keys())
        print('[ok]:%d'%(len(dict_feature['featStim'])), flush=True)
    # 返回字典
    return dict_feature


In [None]:
import numpy as np
from matplotlib import pyplot as plt

def merge_multisubject(b,e):
    """将多组数据从mat文件中提取出来，预处理后合并

    Args:
        b: 开始的受试者序号
        e: 结束的受试者序号

    Returns:
        [emg,acc,gyro,mag,labels]肌电c12，加速度c36，角速度c36，磁强c36数据和标签。
    """
    emg_feature = None
    labels = None
    # 去除0label
    for i in range(b,e+1):
        feature_dict = get_feature_dict("../feature/feature_S{0}.mat".format(i))
        index = []
        for i in range(len(feature_dict['featStim'])):
            if feature_dict['featStim'][i]!=0:
                index.append(i)
        # 重排元素
        emg_temp = feature_dict['feat_set']
        emg_temp = np.reshape(emg_temp,(-1,5,12))
        emg_temp = np.swapaxes(emg_temp,1,2)
        if(emg_feature is None):
            emg_feature = emg_temp[index,:,:]
            labels = feature_dict['featStim'][index,:]
        else:
            emg_feature = np.vstack((emg_feature,emg_temp[index,:,:])) 
            labels = np.vstack((labels,feature_dict['featStim'][index,:]))
        #print('delete 0 label,',emg_temp[index,:,:].shape)
    # 归一化
    s = np.zeros(5)
    for i in range(5):
        s[i] = np.sum(np.abs(emg_feature[:,:,i]))/emg_feature[:,:,i].size
        #print("avg=",s)
        if(s[i]!=0):
            emg_feature[:,:,i] /= s[i]
            emg_feature[:,:,i] -= 0.5*s[i]
    return emg_feature,labels
emg_feature,labels = merge_multisubject(1,4)
print(emg_feature.shape)

In [None]:
# 检查数据
import matplotlib.pyplot as plt
plt.plot(labels)
print(emg_feature[0])

In [None]:
#数据预处理
def precess_data(feat,label):
    """将数据reshape到可以送入神经网络的size

    Args:
        feat: 特征序列
        label: 标签

    Returns:
        [feat, label] 处理后的特征序列和标签
    """
    if (CNN_LSIT[CNN_METHOD] == "FCN"):
        feat = np.swapaxes(feat,1,2)
        feat = np.expand_dims(feat,1)
    if (CNN_LSIT[CNN_METHOD] == "NinaProNet"):
        feat = np.swapaxes(feat,1,2)
    if (CNN_LSIT[CNN_METHOD] == "Resnet18"):
        feat = np.swapaxes(feat,1,2)
        feat = np.expand_dims(feat,1)
        
    feat = feat.astype(np.float32)
    label = label.flatten() - 1
    return feat,label


#### 2. 模型结构

In [None]:
import torch
import torch.nn as nn
from torch.autograd import Variable
from torchsummary import summary
import torch.nn.functional as F
from torchvision import models

class NinaProNet(nn.Module):
    def __init__(self, class_num=None, base_features=16, window_length=256, input_channels=10):
        super(NinaProNet, self).__init__()
        self.class_num = class_num
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels=input_channels,
                      out_channels=base_features * 2,
                      kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(base_features * 2),
            nn.ReLU(),
            nn.Dropout(p=0.3),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(in_channels=base_features * 2,
                      out_channels=base_features * 4,
                      kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(base_features * 4),
            nn.ReLU(),
            nn.Dropout(p=0.3),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        self.conv3 = nn.Sequential(
            nn.Conv1d(in_channels=base_features * 4,
                      out_channels=base_features * 4,
                      kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(base_features * 4),
            nn.ReLU(),
            nn.Dropout(p=0.3),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        self.conv4 = nn.Sequential(
            nn.Conv1d(in_channels=base_features * 4,
                      out_channels=base_features * 4,
                      kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(base_features * 4),
            nn.ReLU(),
            nn.Dropout(p=0.3),
        )

        self.mlp1 = nn.Sequential(
            nn.Linear(base_features * 4 * int(window_length / 8), 256),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(256, 100),
            nn.ReLU(),
            nn.Dropout(p=0.5)
        )
        self.mlp2 = nn.Linear(100, self.class_num)

    def forward(self, x):
        x = x.type(torch.cuda.FloatTensor)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)

        x = self.mlp1(x.view(x.size(0), -1))
        x = self.mlp2(x)
        x = F.softmax(x, dim=1)
        return x

class FCN(nn.Module):
    def __init__(self, input_size, class_num):
        super().__init__()
        self.class_num = class_num
        self.input_size = input_size
        self.fcn1 = nn.Sequential(
            nn.Linear(in_features=input_size[0] * input_size[1], out_features=10000),
            nn.Dropout(p=0.3),
            nn.BatchNorm1d(10000),
            nn.ReLU())
        self.fcn2 = nn.Sequential(
            nn.Linear(in_features=10000, out_features=1000),
            nn.Dropout(p=0.3),
            nn.BatchNorm1d(1000),
            nn.ReLU())
        self.fcn3 = nn.Sequential(
            nn.Linear(in_features=1000, out_features=100),
            nn.Dropout(p=0.3),
            nn.BatchNorm1d(100),
            nn.ReLU())
        self.fcn4 = nn.Linear(in_features=100, out_features=self.class_num)

    def forward(self, x):
        x = x.type(torch.cuda.FloatTensor)
        x = x.view(x.size(0), -1)
        x = self.fcn1(x)
        x = self.fcn2(x)
        x = self.fcn3(x)
        x = self.fcn4(x)
        x = F.softmax(x, dim=1)
        return x
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("The model will be running on", device, "device")

model_test = None

if (CNN_LSIT[CNN_METHOD] == "FCN"):
    model_test = FCN(class_num=40,input_size=(5,12)).to(device)
    summary(model_test,(1,5,12))

if (CNN_LSIT[CNN_METHOD] == "NinaProNet"):
    model_test = NinaProNet(class_num=40,input_channels=5,window_length=12).to(device)
    summary(model_test,(5,12))

if (CNN_LSIT[CNN_METHOD] == "Resnet18"):
    model_test = models.resnet18(num_classes=40)
    model_test.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
    model_test.to(device)
    summary(model_test,(1,5,12))

#### 3. 模型训练

In [None]:
import time

def testAccuracy(device, model, test_loader):
    
    model.eval()
    accuracy = 0.0
    total = 0.0
    
    with torch.no_grad():
        for data in test_loader:
            emg_data, labels = data
            emg_data = Variable(emg_data.to(device))    # torch.Size([64, 1, 200, 12])
            labels = Variable(labels.to(device))        # torch.Size([64])
            # run the model on the test set to predict labels
            outputs = model(emg_data)
            # the label with the highest energy will be our prediction
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            accuracy += (predicted == labels).sum().item()
    
    # compute the accuracy over all test images
    accuracy = (100 * accuracy / total)
    return(accuracy)

train_accs = []
train_loss = []
test_accs = []
def train(device, num_epochs, train_loader, test_loader):
    best_accuracy = 0
    model = model_test
    loss_func = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

    
    

    for epoch in range(num_epochs):
        running_loss = 0.0
        for i,(inputs, labels) in enumerate(train_loader,0):#0是下标起始位置默认为0
            # data 的格式[[inputs, labels]]       
    #         inputs,labels = data
            inputs = Variable(inputs.to(device))    # torch.Size([64, 1, 200, 12])
            labels = Variable(labels.to(device))        # torch.Size([64]) 
            #初始为0，清除上个batch的梯度信息
            optimizer.zero_grad()         

            #前向+后向+优化     
            outputs = model(inputs)
            loss = loss_func(outputs,labels.long())
            loss.backward()
            optimizer.step()

            # loss 的输出，每个一百个batch输出，平均的loss
            running_loss += loss.item()
            if i%100 == 99:
                print('[%d,%5d] loss :%.3f' %
                    (epoch+1,i+1,running_loss/100),end='',flush=True)
                running_loss = 0.0
            train_loss.append(loss.item())

            # 训练曲线的绘制 一个batch中的准确率
            correct = 0
            total = 0
            _, predicted = torch.max(outputs.data, 1)
            total = labels.size(0)# labels 的长度
            correct = (predicted == labels).sum().item() # 预测正确的数目
            train_accs.append(100*correct/total)
            if i%100 == 99:
                print(' acc=%d'%(100*correct/total))
            
        accuracy = testAccuracy(device, model, test_loader)
        print('For epoch', epoch+1,'the test accuracy over the whole test set is %d %%' % (accuracy))
        # we want to save the model if the accuracy is the best
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            torch.save(model.state_dict(), "../model/best_epoch{1}_{0}.pth".format(epoch, (int(time.time())%1000000)))


In [None]:
# 矩阵数据预处理
# 训练集1-16
emg_feature,labels = merge_multisubject(1,4)
emg_feature,labels = precess_data(emg_feature,labels)
print('train reshaped data:',emg_feature.shape)
emg_feature_t = torch.tensor(emg_feature)
labels_t = torch.tensor(labels)
train_dataset = torch.utils.data.TensorDataset(emg_feature_t, labels_t)

# 验证集17-20
emg_feature,labels = merge_multisubject(5,5)
emg_feature,labels = precess_data(emg_feature,labels)
print('test reshaped data:',emg_feature.shape)
emg_feature_t = torch.tensor(emg_feature)
labels_t = torch.tensor(labels)
test_dataset = torch.utils.data.TensorDataset(emg_feature_t, labels_t)

print('get dataloader...', end='',flush=True)
# 划分数据集与训练集
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,      # torch TensorDataset format
    batch_size=128,      # mini batch size
    shuffle=True,               # 要不要打乱数据 (打乱比较好)
    num_workers=2,              # 多线程来读数据
    drop_last = True,
   
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,      # torch TensorDataset format
    batch_size=256,      # mini batch size
    shuffle=True,               # 要不要打乱数据 (打乱比较好)
    num_workers=2,              # 多线程来读数据
    drop_last = True,
)
print('[ok]')
print('begin to train.....')
# 模型训练
train(device, 10, train_loader, test_loader)

In [None]:
train_loss = np.array(train_loss)
train_accs = np.array(train_accs)
plt.plot(train_accs)
plt.plot(train_loss)
np.savetxt('train_loss.csv', train_loss, delimiter=",")
np.savetxt('train_accs.csv', train_accs, delimiter=",")


#### 4. 模型验证

In [None]:
#生成验证集
d,l = merge_multisubject(9,9)
d,l = precess_data(d,l)
print('reshaped data:',d.shape)

print('get dataloader...', end='',flush=True)
d_t = torch.tensor(d)
l_t = torch.tensor(l)
vaild_dataset = torch.utils.data.TensorDataset(d_t, l_t)
vaild_loader = torch.utils.data.DataLoader(
    dataset=vaild_dataset,      # torch TensorDataset format
    batch_size=256,      # mini batch size
    shuffle=False,               # 要不要打乱数据 (打乱比较好)
    num_workers=2,              # 多线程来读数据
    drop_last = True,
)
print('[ok]')

model_test = models.resnet18(num_classes=40)
model_test.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
model_test.load_state_dict(torch.load("../model/best_epoch849085_4.pth"))
model_test = model_test.to(device)
accuracy = testAccuracy(device, model_test, vaild_loader)
print('the test accuracy over the whole test set is %d %%' % (accuracy))