In [1]:
import wave
import matplotlib.pyplot as plt
import numpy as np

import scipy as sp
from scipy import signal

import os

In [2]:
def read_actions(path):
    files = os.listdir(path)
    num_files = len(files)
    segment_size = 2000
    data = np.zeros((num_files, segment_size), dtype=np.int16)
    
    for i in range(num_files):
        file = files[i]
        if ".DS_Store" not in file:
            with open(path + "/" +file, "rb") as f:
                signal = [float(line.strip()) for line in f]
                data[i, :] = signal
        else:
            continue        
        
    return data


def wave_data_loader():
    flexion = read_actions('./flexion' + "/action/")
    openhand = read_actions('./openhand' + "/action/")
    punch = read_actions('./punch' + "/action/")
    rest1 = read_actions('./flexion' + "/rest/")
    rest2 = read_actions('./openhand' + "/rest/")
    rest3 = read_actions('./punch' + "/rest/")
    
    all_data = np.vstack((flexion,
                          openhand,
                          punch,
#                           rest1,
#                           rest2,
                          rest3))
    label = np.hstack(([1]*flexion.shape[0],
                      [2]*openhand.shape[0],
                      [3]*punch.shape[0],
#                       [0]*rest1.shape[0],
#                       [0]*rest2.shape[0],
                      [0]*rest3.shape[0]))
    label = np.transpose(label)
#     label_multiClass = np.zeros((label.size,4))
#     for i in range(label.size):
#         label_multiClass[i,label[i]] = 1
    return all_data, label


In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split

class WaveformDataset(Dataset):
    def __init__(self):
        self.signals, self.labels = wave_data_loader()

    def __len__(self):
        return len(self.signals)

    def __getitem__(self, idx):
        signal = self.signals[idx]
        label = self.labels[idx]
        return signal, label

    
    
class WaveformCNN(nn.Module):
    def __init__(self):
        super(WaveformCNN, self).__init__()
        self.conv1 = nn.Sequential(nn.Conv1d(in_channels=1, 
                                             out_channels=16, 
                                             kernel_size=7, 
                                             stride=1, 
                                             padding=3),
#                                    nn.BatchNorm1d(16),
                                   nn.ReLU(),
                                   nn.MaxPool1d(kernel_size=2, stride=2))
        self.conv2 = nn.Sequential(nn.Conv1d(in_channels=16, 
                                             out_channels=32, 
                                             kernel_size=5, 
                                             stride=1, 
                                             padding=2),
                                   nn.ReLU(),
                                   nn.MaxPool1d(kernel_size=2, stride=2))
        self.conv3 = nn.Sequential(nn.Conv1d(in_channels=32, 
                                             out_channels=64, 
                                             kernel_size=3, 
                                             stride=1, 
                                             padding=1),
                                   nn.ReLU(),
                                   nn.MaxPool1d(kernel_size=2, stride=2))

        self.fc1 = nn.Linear(int(64 * 2000/2/2/2), 128)
        self.fc2 = nn.Linear(128, 4)

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = torch.flatten(x, start_dim=1)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x
    
    
def train(model, device, train_loader, optimizer, epoch):
    """
    Trains the specified model for one epoch.
    """
    model.train()
    for batch_idx, data in enumerate(train_loader):
        signal, label = data[0].to(device), data[1].to(device)
        optimizer.zero_grad()
        output = model(signal.float())
#         print(output)
#         print(label)
        loss = nn.CrossEntropyLoss()(output, label)
        loss.backward()
        optimizer.step()
        
        if batch_idx % 20 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(signal), len(train_loader.dataset),
                       100. * batch_idx / len(train_loader), loss.item()))

            
def test(model, test_loader):
    # Test the model
    correct = 0
    total = 0
    with torch.no_grad():
        model.eval() 
        for signals, labels in test_loader:
            test_output = model(signals.float())
            _, pred_y = torch.max(test_output.data, 1)
#             pred_y = torch.max(test_output, 1)[1].data.squeeze()  # the same function as above
            total += labels.size(0)
            correct += (pred_y == labels).sum().item()
            
    print(f'Accuracy of the CNN on the {total} test signals: {100 * correct // total} %')
#     print(correct)
#     print(total)

In [4]:
dataset = WaveformDataset()
train_set, test_set = random_split(dataset, [int(0.8*len(dataset)), int(0.2*len(dataset))+1])
train_loader = DataLoader(train_set, batch_size=10, shuffle=True)
test_loader = DataLoader(test_set, batch_size=10, shuffle=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = WaveformCNN().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [9]:
# train
for epoch in range(1, 2):
    train(model, device, train_loader, optimizer, epoch)



In [None]:
# save parameter
PATH = r'./model_parameter/cifar_net.pth'
torch.save(model.state_dict(), PATH)

In [14]:
# test
test(model,test_loader)

Accuracy of the CNN on the 216 test signals: 39 %


In [17]:
# 统计各类的数据量 尽量balance
_, label = wave_data_loader()
(label==0).sum()

460