In [68]:
import numpy as np

train_x = np.load('/home/intelligence/Robin/Dataset/save_raw_data/insole_train_win_x.npy')
train_y = np.load('/home/intelligence/Robin/Dataset/save_raw_data/insole_train_win_y.npy')
test_x = np.load('/home/intelligence/Robin/Dataset/save_raw_data/insole_test_win_x.npy')
test_y = np.load('/home/intelligence/Robin/Dataset/save_raw_data/insole_test_win_y.npy')
print(f'train_x shape:{train_x.shape}')
print(f'train_y shape:{train_y.shape}')
print(f'test_x shape:{test_x.shape}')
print(f'test_y shape:{test_y.shape}')

train_x shape:(27819, 300, 16)
train_y shape:(27819,)
test_x shape:(11923, 300, 16)
test_y shape:(11923,)


In [69]:
# 分别提取出左右脚数据
left_train_x = train_x[:,:,0:8]
right_train_x = train_x[:,:,8:16]
left_test_x = test_x[:,:,0:8]
right_test_x = test_x[:,:,8:16]

train_x_1 = left_train_x[:,:,0:5]
train_x_2 = left_train_x[:,:,5:8]
train_x_3 = right_train_x[:,:,0:5]
train_x_4 = right_train_x[:,:,5:8]

test_x_1 = left_test_x[:,:,0:5]
test_x_2 = left_test_x[:,:,5:8]
test_x_3 = right_test_x[:,:,0:5]
test_x_4 = right_test_x[:,:,5:8]

train_x_new = np.concatenate((train_x_1, train_x_2, train_x_3, train_x_4),axis=2)
test_x_new = np.concatenate((test_x_1, test_x_2, test_x_3, test_x_4),axis=2)
print(train_x_new.shape)

(27819, 300, 16)


In [70]:
import torch
import torch.nn as nn
class FCN(nn.Module):
    def __init__(self, n_channels=16, n_timesteps=300, n_classes=21, out_channels=128):
        super(FCN, self).__init__()

        self.conv_block1 = nn.Sequential(nn.Conv1d(n_channels, 32, kernel_size=8, stride=1, bias=False, padding=4),
                                         nn.BatchNorm1d(32),
                                         nn.ReLU(),
                                         nn.MaxPool1d(kernel_size=2, stride=2, padding=1),
                                         nn.Dropout(0.35))
        self.conv_block2 = nn.Sequential(nn.Conv1d(32, 64, kernel_size=8, stride=1, bias=False, padding=4),
                                         nn.BatchNorm1d(64),
                                         nn.ReLU(),
                                         nn.MaxPool1d(kernel_size=2, stride=2, padding=1))
        self.conv_block3 = nn.Sequential(nn.Conv1d(64, out_channels, kernel_size=8, stride=1, bias=False, padding=4),
                                         nn.BatchNorm1d(out_channels),
                                         nn.ReLU(),
                                         nn.MaxPool1d(kernel_size=2, stride=2, padding=1))

        self.out_len = n_timesteps
        for _ in range(3):
            self.out_len = (self.out_len + 1) // 2 + 1

        self.out_channels = out_channels  # 128
        self.out_dim = self.out_len * self.out_channels

        self.logits = nn.Linear(self.out_len * out_channels, n_classes)

    def forward(self, x_in):
        x_in = x_in.permute(0, 2, 1)
        x = self.conv_block1(x_in)
        x = self.conv_block2(x)
        x = self.conv_block3(x)

        x_flat = x.reshape(x.shape[0], -1)
        logits = self.logits(x_flat)
        return logits

In [71]:
import numpy as np
import pandas as pd
import os
import time
import random
from torch.utils import data
from copy import deepcopy
import sklearn.metrics


def set_seed(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed) # if you are using multi-GPU.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

def load_array(data_arrays, batch_size, is_train=True):
    """构造一个pytorch数据构造器"""
    dataset = data.TensorDataset(*data_arrays)
    return data.DataLoader(dataset, batch_size, shuffle=is_train)

def train(net, device, train_loader, test_loader, loss, num_epochs, optimizer, scheduler, metrics):
    print('training on', device)
    net.to(device)

    # 声明需要保存的量
    train_loss, test_loss, train_metrics, test_metrics, time_list = [], [], [], [], []
    max_test_metrics = 0.0
    # animator = d2l.Animator(xlabel='epoch', xlim=[1,num_epochs], ylim=[0,1],legend=['train loss', 'test loss', 'test accuracy'])
    for epoch in range(num_epochs):
        train_loss_batch, test_loss_batch, train_metrics_batch, test_metrics_batch = [], [], [], []
        start = time.time()
        # 训练
        if isinstance(net, nn.Module):
            net.train()  # 设置成训练模式，要计算梯度
        for X, y in train_loader:
            # 获取训练数据和标签
            X, y = X.to(device).float(), y.to(device).long()
            # 数据输入网络，前向传播
            y_hat = net(X)
            # 网络输出和标签输入损失函数，求得损失
            l = loss(y_hat, y)
            if isinstance(optimizer, torch.optim.Optimizer):
                optimizer.zero_grad()  # 清零优化器，梯度清零
                l.backward()  # 损失反向传播，计算梯度
                optimizer.step()  # 更新优化器，更新参数
            # 累加损失
            train_loss_batch.append(l.item())
            train_metrics_batch.append(metrics(y.cpu(), torch.argmax(y_hat, dim=1).cpu(), average='micro'))
        scheduler.step()
        # 累加训练损失
        train_loss.append(np.mean(train_loss_batch))
        train_metrics.append(np.mean(train_metrics_batch))
        time_list.append(time.time() - start)
        # 验证
        if isinstance(net, nn.Module):  # 如果是用torch.nn实现的模型，就将它转换成评估模式
            net.eval()  # 将模型设置为评估模式：不再计算梯度，只做forward pass
        for X, y in test_loader:
            X, y = X.to(device), y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y.long())
            test_loss_batch.append(l.item())
            test_metrics_batch.append(metrics(y.cpu(), torch.argmax(y_hat, dim=1).cpu(), average='micro'))
        # 累加验证损失
        test_loss.append(np.mean(test_loss_batch))
        test_metrics.append(np.mean(test_metrics_batch))
        # 打印信息
        print("Time:{:.3f}s...".format(time_list[-1]),
              "Epoch:{}/{}...".format(epoch + 1, num_epochs),
              "Train Loss:{:.4f}...".format(train_loss[-1]),
              "Train Metrics:{:.4f}...".format(train_metrics[-1]),
              "Val Loss:{:.4f}...".format(test_loss[-1]),
              "Val Metrics:{:.4f}...".format(test_metrics[-1]),
              "Lr:{:.5f}...".format(optimizer.state_dict()['param_groups'][0]['lr']))
        if max_test_metrics < test_metrics[-1]:
            max_test_metrics = test_metrics[-1]
            best_model = deepcopy(net.state_dict())  # 保存训练epoch中指标最好的参数模型

    print("max_test_metrics:{:.4f}...".format(max_test_metrics))
    return train_loss, test_loss, train_metrics, test_metrics, best_model

def evaluate(test_loader, best_model, DEVICE, criterion):
    model = FCN()
    model = model.to(DEVICE)
    model.load_state_dict(best_model)
    with torch.no_grad():
        model.eval()
        test_loss = []
        test_metrics_batch = []
        for idx, (sample, target) in enumerate(test_loader):
            sample, target = sample.to(DEVICE), target.to(DEVICE).long()
            out = model(sample)
            loss = criterion(out, target)
            test_loss.append(loss.item())
            test_metrics_batch.append(sklearn.metrics.f1_score(target.cpu(), torch.argmax(out, dim=1).cpu(),
                                                               average='micro'))
            outputs = torch.argmax(out, dim=1)

        test_loss_avg = np.mean(test_loss)
        test_metrics_avg = np.mean(test_metrics_batch)
        test_f1_score = float(test_metrics_avg) * 100.0

    print(f'Evaluate:\n')
    print(f'Final Test Loss : {test_loss_avg:.4f}\t | \tFinal Test F1_Score : {test_f1_score:2.4f}\n')

In [72]:
set_seed(2023)
train_x = torch.Tensor(train_x)
train_y = torch.Tensor(train_y)
test_x = torch.Tensor(test_x)
test_y = torch.Tensor(test_y)
batch_size = 128
train_loader = load_array((train_x, train_y), batch_size, False)
test_loader = load_array((test_x, test_y), batch_size, False)
loss = nn.CrossEntropyLoss()
net = FCN()
optimizer = torch.optim.Adam(net.parameters(), lr=0.01, weight_decay=1e-3)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.85)
num_epochs = 100

# GPU训练
train_on_gpu = torch.cuda.is_available()
if train_on_gpu:
    print('Training in GPU')
else:
    print('No GPU available, training on CPU; consider making n_epochs vertargets small.')
device = 'cuda' if train_on_gpu else 'cpu'

train_loss, test_loss, train_metrics, test_metrics, best_model = \
    train(net, device, train_loader, test_loader, loss, num_epochs, optimizer, scheduler, sklearn.metrics.f1_score)


Training in GPU
training on cuda
Time:0.931s... Epoch:1/100... Train Loss:3.2320... Train Metrics:0.2768... Val Loss:1.5305... Val Metrics:0.4513... Lr:0.00850...
Time:0.923s... Epoch:2/100... Train Loss:1.3780... Train Metrics:0.5114... Val Loss:1.1746... Val Metrics:0.5790... Lr:0.00723...
Time:0.907s... Epoch:3/100... Train Loss:1.1313... Train Metrics:0.5973... Val Loss:1.0665... Val Metrics:0.6049... Lr:0.00614...
Time:1.126s... Epoch:4/100... Train Loss:0.9968... Train Metrics:0.6503... Val Loss:1.0280... Val Metrics:0.6298... Lr:0.00522...
Time:0.938s... Epoch:5/100... Train Loss:0.8894... Train Metrics:0.6865... Val Loss:0.9296... Val Metrics:0.6702... Lr:0.00444...
Time:0.941s... Epoch:6/100... Train Loss:0.8175... Train Metrics:0.7108... Val Loss:0.8743... Val Metrics:0.6834... Lr:0.00377...
Time:1.024s... Epoch:7/100... Train Loss:0.7590... Train Metrics:0.7290... Val Loss:0.8131... Val Metrics:0.7203... Lr:0.00321...
Time:0.939s... Epoch:8/100... Train Loss:0.7116... Train 

In [73]:
def perturbation_rank(test_loader, best_model, DEVICE):
    model = FCN()
    model = model.to(DEVICE)
    model.load_state_dict(best_model)

    metrics = []
    errors = []
    names = ['P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8']
    with torch.no_grad():
        model.eval()
        for i in range(8):
            test_loss = []
            test_metrics_batch = []
            for idx, (sample, target) in enumerate(test_loader):
                sample = sample.cpu().numpy()
                np.random.shuffle(sample[:,:,i])
                np.random.shuffle(sample[:,:,i+8])
                sample, y = torch.from_numpy(sample).to(DEVICE).float(), target.to(DEVICE).long()
                out = model(sample)
                loss = nn.CrossEntropyLoss()(out, y)
                test_loss.append(loss.item())
                test_metrics_batch.append(sklearn.metrics.f1_score(y.cpu(), torch.argmax(out, dim=1).cpu(),
                                                                   average='micro'))

            test_loss_avg = np.mean(test_loss)
            errors.append(test_loss_avg)

            test_metrics_avg = np.mean(test_metrics_batch)
            metrics.append(test_metrics_avg)
            print(f'shuffle column {i+1}:')
            print(f'Final Test Loss : {test_loss_avg:.4f}\t | \tFinal Test F1_Score : {test_metrics_avg:.4f}\n')

    max_error = np.max(errors)
    importance = [e/max_error for e in errors]
    data = {'name':names,'error':errors,'importance':importance}
    result = pd.DataFrame(data,columns = ['name','error','importance'])

    # max_metrics = np.max(metrics)
    # importance = [e/max_metrics for e in metrics]
    # importance = [1 - importan for importan in importance]
    # data = {'name':names,'metrics':metrics,'importance':importance}
    # result = pd.DataFrame(data,columns = ['name','metrics','importance'])

    result.sort_values(by=['importance'],ascending=[0],inplace=True)
    result.reset_index(inplace=True,drop=True)

    return result

In [74]:
evaluate(test_loader, best_model, device, loss)
# result = perturbation_rank(test_loader, best_model, device)
# print(result)

Evaluate:

Final Test Loss : 0.4350	 | 	Final Test F1_Score : 85.5303

