In [1]:
import warnings
warnings.filterwarnings("ignore")

import os
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
# Some Functions 

def load_y_data(y_path):
    y = np.loadtxt(y_path, dtype=np.int32).reshape(-1,1)
    # change labels range from 1-6 t 0-5, this enables a sparse_categorical_crossentropy loss function
    return y - 1

def load_X_data(X_path):
    X_signal_paths = [X_path + file for file in os.listdir(X_path)]
    X_signals = [np.loadtxt(path, dtype=np.float32) for path in X_signal_paths]
    return np.transpose(np.array(X_signals), (1, 2, 0))


class data_set(Dataset):
    def __init__(self, data_x, data_y):
        self.data_x = data_x
        self.data_y = data_y
        

    def __len__(self):
        return len(self.data_x)
    
    def __getitem__(self, index):
        sample_x = self.data_x[index]
        sample_y = self.data_y[index]
        return sample_x, sample_y
    

def validation(data_loader, criterion):
    
    model.eval()
    #model.eval()是保证BN层能够用全部训练数据的均值和方差，
    #即测试过程中要保证BN层的均值和方差不变。对于Dropout，model.eval()是利用到了所有网络连接，即不进行随机舍弃神经元。
    total_loss = []
    preds = []
    trues = []
    with torch.no_grad():
        for i, (batch_x,batch_y) in enumerate(data_loader):

            batch_x = batch_x.double().to(device)
            batch_y = batch_y.long().to(device)

            outputs = model(batch_x)

            pred = outputs.detach()#.cpu()
            true = batch_y.detach()#.cpu()

            loss = criterion(pred, true) 
            total_loss.append(loss.cpu())

            preds.extend(list(np.argmax(outputs.detach().cpu().numpy(),axis=1)))
            trues.extend(list(batch_y.detach().cpu().numpy()))   

    total_loss = np.average(total_loss)
    acc = accuracy_score(preds,trues)

    f_w = f1_score(trues, preds, average='weighted')
    f_macro = f1_score(trues, preds, average='macro')
    f_micro = f1_score(trues, preds, average='micro')
    model.train()

    return total_loss,  acc, f_w,  f_macro, f_micro#, f_1



# Load the data

In [3]:
PATH =  "C:\\Users\\weizh\\桌面\\asd\\Model\\UCI HAR Dataset_2"
LABEL_NAMES = ["Walking", "Walking upstairs", "Walking downstairs", "Sitting", "Standing", "Laying"]

# load X data
X_train = load_X_data(os.path.join(PATH + r'\train\Inertial Signals/'))
X_test = load_X_data(os.path.join(PATH + r'\test\Inertial Signals/'))
# load y label
y_train = load_y_data(os.path.join(PATH + r'\train\y_train.txt'))
y_test = load_y_data(os.path.join(PATH + r'\test\y_test.txt'))

print("useful information:")
print(f"shapes (n_samples, n_steps, n_signals) of X_train: {X_train.shape} and X_test: {X_test.shape}")


useful information:
shapes (n_samples, n_steps, n_signals) of X_train: (7352, 128, 9) and X_test: (2947, 128, 9)


# dataloader

In [4]:
batch_size = 128
train_data = data_set(X_train,y_train[:,0])
test_data = data_set(X_test,y_test[:,0])

train_data_loader = DataLoader(train_data, 
                               batch_size   =  batch_size,
                               shuffle      =  True,
                               num_workers  =  0,
                               drop_last    =  False)

test_data_loader = DataLoader(test_data, 
                               batch_size   =  batch_size,
                               shuffle      =  False,
                               num_workers  =  0,
                               drop_last    =  False)

# Model

In [5]:
def conv1d(ni: int, no: int, ks: int = 1, stride: int = 1, padding: int = 0, bias: bool = False):
    """
    ni: in channel  no: out channel ks:kern size
    Create and initialize a `nn.Conv1d` layer with spectral normalization.
    """
    conv = nn.Conv1d(ni, no, ks, stride=stride, padding=padding, bias=bias)
    nn.init.kaiming_normal_(conv.weight)
    if bias:
        conv.bias.data.zero_()
    # return spectral_norm(conv)
    return conv

In [6]:
class SelfAttention(nn.Module):
    """
    # self-attention implementation from https://github.com/fastai/fastai/blob/5c51f9eabf76853a89a9bc5741804d2ed4407e49/fastai/layers.py
    Self attention layer for nd
    """
    def __init__(self, n_channels: int, div):
        super(SelfAttention, self).__init__()

        if n_channels > 1:
            self.query = conv1d(n_channels, n_channels//div,bias=True)
            self.key = conv1d(n_channels, n_channels//div,bias=True)
        else:
            self.query = conv1d(n_channels, n_channels,bias=True)
            self.key = conv1d(n_channels, n_channels,bias=True)
        self.value = conv1d(n_channels, n_channels)
        self.gamma = nn.Parameter(torch.tensor([0.]))

    def forward(self, x):
        # Notation from https://arxiv.org/pdf/1805.08318.pdf
        size = x.size()
        #print("size+",size)
        x = x.view(*size[:2], -1)
        #print("size-",x.size())
        f, g, h = self.query(x), self.key(x), self.value(x)

        beta = F.softmax(torch.bmm(f.permute(0, 2, 1).contiguous(), g), dim=1)
        o = self.gamma * torch.bmm(h, beta) + x
        return o.view(*size).contiguous()

In [13]:
a = SelfAttention(32,1)

In [14]:
a

SelfAttention(
  (query): Conv1d(32, 32, kernel_size=(1,), stride=(1,))
  (key): Conv1d(32, 32, kernel_size=(1,), stride=(1,))
  (value): Conv1d(32, 32, kernel_size=(1,), stride=(1,), bias=False)
)

In [15]:
inp = torch.randn(1,32,9)
a(inp).shape

torch.Size([1, 32, 9])

In [10]:
class HARmodel(nn.Module):
    def __init__(
        self,
        input_shape ,
        number_class , 
        filter_num = 32,
        filter_size = 5,
        nb_conv_layers = 4,
        dropout = 0.2,
        activation = "ReLU",
        sa_div= 1,
    ):
        super(HARmodel, self).__init__()
        
        # PART 1 , Channel wise Feature Extraction
        
        layers_conv = []
        for i in range(nb_conv_layers):
        
            if i == 0:
                in_channel = 1
            else:
                in_channel = filter_num
    
            layers_conv.append(nn.Sequential(
                nn.Conv2d(in_channel, filter_num, (filter_size, 1),(2,1)),#(2,1)
                nn.ReLU(inplace=True),
                nn.BatchNorm2d(filter_num),

            ))
        
        self.layers_conv = nn.ModuleList(layers_conv)

        # PART2 , Cross Channel Fusion through Attention
        self.dropout = nn.Dropout(dropout)

        self.sa = SelfAttention(filter_num, sa_div)
        
        shape = self.get_the_shape(input_shape)

        # PART 3 , Prediction 
        
        self.activation = nn.ReLU() 
        self.fc1 = nn.Linear(input_shape[2]*filter_num ,filter_num)
        self.flatten = nn.Flatten()
        self.fc2 = nn.Linear(shape[1]*filter_num ,filter_num)
        self.fc3 = nn.Linear(filter_num ,number_class)


        
    def get_the_shape(self, input_shape):
        x = torch.rand(input_shape)
        print('aaaaaaaaaaaa',x.shape)
        x = x.unsqueeze(1)
        print('bbbbbbbbbbbb',x.shape)
        for layer in self.layers_conv:
            x = layer(x)  
            print(type(x))
        atten_x = torch.cat(
            [self.sa(torch.unsqueeze(x[:, :, t, :], dim=3)) for t in range(x.shape[2])],
            dim=-1,
        )
        atten_x = atten_x.permute(0, 3, 1, 2)
        return atten_x.shape
    def refined(self,x):
        refined = torch.cat(
            [self.sa(torch.unsqueeze(x[:, :, t, :], dim=3)) for t in range(x.shape[2])],
            dim=-1,
        )
        return refined
    def forward(self, x):
        # B L C
        #print(x.shape)
        print('输入数据尺寸',x.shape)
        x = x.unsqueeze(1)
        
        print(type(x))
        for layer in self.layers_conv:
            x = layer(x)      
            print('每一个卷积层输出尺寸',x.shape)
        #batch, filter, length, channel = x.shape


        # apply self-attention on each temporal dimension (along sensor and feature dimensions)
        


        #print('app',refined.shape)
        x = self.refined(x)
        x = x.permute(0, 3, 1, 2)
        print('refined/attention之后的数据尺寸',x.shape)
        #print('refined.permute(0, 3, 1, 2)')
        x = x.reshape(x.shape[0], x.shape[1], -1)
        print('reshape之后数据尺寸',x.shape)
        x = self.dropout(x)
        print('dropout之后数据尺寸',x.shape)
        
        x = self.activation(self.fc1(x)) # B L C
        print('activation之后数据尺寸',x.shape)
        x = self.flatten(x)
        print('flatten之后的数据尺寸',x.shape)
        #print(type(x))
        x = self.activation(self.fc2(x)) # B L C
        print('再次activation之后的数据尺寸',x.shape)
        y = self.fc3(x)    
        print('最终输出的数据尺寸',y.shape,'\n'*2)
        return y

In [11]:
a_1 = nn.Conv1d(9, 36, 1)

In [12]:
inp = torch.randn(32,9,128)
inp = a_1(inp)
inp.shape

torch.Size([32, 36, 128])

In [20]:
class GLU(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        out, gate = x.chunk(2, dim=self.dim)
        return out * gate.sigmoid()

In [21]:
glu = GLU(dim=0)

In [22]:
glu(inp).shape

torch.Size([16, 36, 128])

# Train and Evaluation

In [14]:
import sys
import torch
import tensorwatch as tw
import torchvision.models

In [15]:
from torch import optim
import time
learning_rate = 0.0001
train_epochs = 300
device = torch.device('cuda:{}'.format(0))
criterion =  nn.CrossEntropyLoss(reduction="mean").to(device)


#input_shape = (1, length, channel)
model = HARmodel((1,128,9),6,filter_num = 32).double().to(device)

print("Parameter :", np.sum([para.numel() for para in model.parameters()]))
model_optim = optim.Adam(model.parameters(), lr=learning_rate)
train_steps = len(train_data_loader)

aaaaaaaaaaaa torch.Size([1, 128, 9])
bbbbbbbbbbbb torch.Size([1, 1, 128, 9])
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
Parameter : 33639


import torch.onnx
 
import netron
input_ = torch.randn(128,128,9).to(device=device, dtype=torch.double)
out_put = model(input_)
print(type(out_put))
print(out_put_.shape)
onnx_path = "netForwatch.onnx"
torch.onnx.export(model, input, onnx_path)

netron.start(onnx_path)

In [16]:
for epoch in range(train_epochs):
    train_loss = []
    model.train()
    epoch_time = time.time()

    for i, (batch_x,batch_y) in enumerate(train_data_loader):
        #start = time.time()


        model_optim.zero_grad()

        batch_x = batch_x.double().to(device)
        batch_y = batch_y.long().to(device)
        
        outputs = model(batch_x)
        #print(time.time()-start)
        loss = criterion(outputs, batch_y)
        
        train_loss.append(loss.item())

        loss.backward()
        model_optim.step()
        #print(time.time()-start)
        #print("-------------")

    print("Epoch: {} cost time: {}".format(epoch+1, time.time()-epoch_time))

    train_loss = np.average(train_loss)


    test_loss , test_acc, test_f_w,  test_f_macro,  test_f_micro = validation(test_data_loader, criterion)

    print("TEST: Epoch: {0}, Steps: {1} | Train Loss: {2:.7f}  Test Loss: {3:.7f} Test Accuracy: {4:.7f}  Test weighted F1: {5:.7f}  Test macro F1 {6:.7f} ".format(
        epoch + 1, train_steps, train_loss, test_loss, test_acc, test_f_w, test_f_macro))

输入数据尺寸 torch.Size([128, 128, 9])
<class 'torch.Tensor'>
每一个卷积层输出尺寸 torch.Size([128, 32, 62, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 29, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 13, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 5, 9])
refined/attention之后的数据尺寸 torch.Size([128, 5, 32, 9])
reshape之后数据尺寸 torch.Size([128, 5, 288])
dropout之后数据尺寸 torch.Size([128, 5, 288])
activation之后数据尺寸 torch.Size([128, 5, 32])
flatten之后的数据尺寸 torch.Size([128, 160])
再次activation之后的数据尺寸 torch.Size([128, 32])
最终输出的数据尺寸 torch.Size([128, 6]) 


输入数据尺寸 torch.Size([128, 128, 9])
<class 'torch.Tensor'>
每一个卷积层输出尺寸 torch.Size([128, 32, 62, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 29, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 13, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 5, 9])
refined/attention之后的数据尺寸 torch.Size([128, 5, 32, 9])
reshape之后数据尺寸 torch.Size([128, 5, 288])
dropout之后数据尺寸 torch.Size([128, 5, 288])
activation之后数据尺寸 torch.Size([128, 5, 32])
flatten之后的数据尺寸 torch.Size([128, 160])
再次activation之后的数据尺寸 torch.Size([128, 32])
最终输出的数据尺寸 torch.Size([128,

<class 'torch.Tensor'>
每一个卷积层输出尺寸 torch.Size([128, 32, 62, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 29, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 13, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 5, 9])
refined/attention之后的数据尺寸 torch.Size([128, 5, 32, 9])
reshape之后数据尺寸 torch.Size([128, 5, 288])
dropout之后数据尺寸 torch.Size([128, 5, 288])
activation之后数据尺寸 torch.Size([128, 5, 32])
flatten之后的数据尺寸 torch.Size([128, 160])
再次activation之后的数据尺寸 torch.Size([128, 32])
最终输出的数据尺寸 torch.Size([128, 6]) 


输入数据尺寸 torch.Size([128, 128, 9])
<class 'torch.Tensor'>
每一个卷积层输出尺寸 torch.Size([128, 32, 62, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 29, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 13, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 5, 9])
refined/attention之后的数据尺寸 torch.Size([128, 5, 32, 9])
reshape之后数据尺寸 torch.Size([128, 5, 288])
dropout之后数据尺寸 torch.Size([128, 5, 288])
activation之后数据尺寸 torch.Size([128, 5, 32])
flatten之后的数据尺寸 torch.Size([128, 160])
再次activation之后的数据尺寸 torch.Size([128, 32])
最终输出的数据尺寸 torch.Size([128, 6]) 


输入数据尺寸 torch.Size([128, 1

<class 'torch.Tensor'>
每一个卷积层输出尺寸 torch.Size([128, 32, 62, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 29, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 13, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 5, 9])
refined/attention之后的数据尺寸 torch.Size([128, 5, 32, 9])
reshape之后数据尺寸 torch.Size([128, 5, 288])
dropout之后数据尺寸 torch.Size([128, 5, 288])
activation之后数据尺寸 torch.Size([128, 5, 32])
flatten之后的数据尺寸 torch.Size([128, 160])
再次activation之后的数据尺寸 torch.Size([128, 32])
最终输出的数据尺寸 torch.Size([128, 6]) 


输入数据尺寸 torch.Size([128, 128, 9])
<class 'torch.Tensor'>
每一个卷积层输出尺寸 torch.Size([128, 32, 62, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 29, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 13, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 5, 9])
refined/attention之后的数据尺寸 torch.Size([128, 5, 32, 9])
reshape之后数据尺寸 torch.Size([128, 5, 288])
dropout之后数据尺寸 torch.Size([128, 5, 288])
activation之后数据尺寸 torch.Size([128, 5, 32])
flatten之后的数据尺寸 torch.Size([128, 160])
再次activation之后的数据尺寸 torch.Size([128, 32])
最终输出的数据尺寸 torch.Size([128, 6]) 


输入数据尺寸 torch.Size([128, 1

<class 'torch.Tensor'>
每一个卷积层输出尺寸 torch.Size([128, 32, 62, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 29, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 13, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 5, 9])
refined/attention之后的数据尺寸 torch.Size([128, 5, 32, 9])
reshape之后数据尺寸 torch.Size([128, 5, 288])
dropout之后数据尺寸 torch.Size([128, 5, 288])
activation之后数据尺寸 torch.Size([128, 5, 32])
flatten之后的数据尺寸 torch.Size([128, 160])
再次activation之后的数据尺寸 torch.Size([128, 32])
最终输出的数据尺寸 torch.Size([128, 6]) 


输入数据尺寸 torch.Size([128, 128, 9])
<class 'torch.Tensor'>
每一个卷积层输出尺寸 torch.Size([128, 32, 62, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 29, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 13, 9])
每一个卷积层输出尺寸 torch.Size([128, 32, 5, 9])
refined/attention之后的数据尺寸 torch.Size([128, 5, 32, 9])
reshape之后数据尺寸 torch.Size([128, 5, 288])
dropout之后数据尺寸 torch.Size([128, 5, 288])
activation之后数据尺寸 torch.Size([128, 5, 32])
flatten之后的数据尺寸 torch.Size([128, 160])
再次activation之后的数据尺寸 torch.Size([128, 32])
最终输出的数据尺寸 torch.Size([128, 6]) 


输入数据尺寸 torch.Size([128, 1

KeyboardInterrupt: 

The Evaluation is not standard here. I directly printed the performance of the model on the test set after each epoch training. As you can see, the test accuracy finally converges to about 94. But there are only 8599 parameters