In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import CrossEntropyLoss
from torch.optim import SGD
from torch.utils.data import TensorDataset, DataLoader
from torchvision.transforms import ToTensor
from torch.autograd import Variable

### Data Preprocessing

In [None]:
# Store data as list

data_features_list = []
data_labels_list = []

for i in range(32):
    filename = 'C:/Users/User/OneDrive - sjtu.edu.cn/SJTU/Y3-2/PRP/PRP Transformer/data/datapre{}-0.npz'.format(i+1)
    data = np.load(filename, allow_pickle=True)
    data = data['datapre']

    # Features
    data_feature = data[:, [0, 1, 7, 8]]
    data_feature = torch.tensor(data_feature, dtype=torch.float32)
#     data_feature = data_feature[:350]
    data_features_list.append(data_feature)

    # Labels
    #0清醒，1浅睡N1阶段，2浅睡N2阶段，3深睡阶段，4REM期，我一般做的时候都把1和2合并成一类，都变成2，这样我们就一共有，0,2,3,4共四类
    data_label = data[:, -1]
    data_label = data_label.astype(int)
    data_label[data_label == 1] = 2
    data_label[data_label == 5] = 4
    data_label[data_label != 0] -= 1
    data_label = torch.tensor(data_label, dtype=torch.int64)
    data_label = F.one_hot(data_label)
    data_label = data_label.type(torch.float32)
#     data_label = data_label[:350]
    data_labels_list.append(data_label)
    

# train test split. Size of train data is 80% and size of test data is 20%. 
data_feature_train, data_feature_test, data_label_train, data_label_test = train_test_split(data_features_list,
                                                                             data_labels_list,
                                                                             test_size = 0.2,
                                                                             random_state = 42)
data_feature_train = torch.stack(data_feature_train)
data_feature_test = torch.stack(data_feature_test)
data_label_train = torch.stack(data_label_train)
data_label_test = torch.stack(data_label_test)

data_feature_train.shape, data_feature_test.shape, data_label_train.shape, data_label_test.shape

### Data Loader

In [None]:
# batch_size, epoch and iteration
batch_size = 1
n_iters = 8000
num_epochs = n_iters / (len(data_feature_train) / batch_size)
num_epochs = int(num_epochs)

train_dataset = TensorDataset(data_feature_train, data_label_train)
test_dataset = TensorDataset(data_feature_test, data_label_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

for batch_features, batch_targets in train_loader:
    print(batch_features.shape, batch_targets.shape)

# batch_features: (batch_size, feature_size); batch_targets: (batch_size)

### Many-to-many RNN

In [None]:
class RNNModel(nn.Module):
    def __init__(self, input_dim=4, hidden_dim=256, layer_dim=2, output_dim=4, batch_first=True):
        super(RNNModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        self.rnn = nn.RNN(input_size=input_dim, hidden_size=hidden_dim, num_layers=layer_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=output_dim)
        
    def forward(self, x):
        # Initialize hidden state with zeros
        h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim))
            
        # One time step
        out, hn = self.rnn(x, h0)
        fc_output = self.fc(out)
        fc_output = fc_output.softmax(dim=2)
        
        return fc_output

In [None]:
# Create RNN
input_dim = 4    # input dimension
hidden_dim = 256  # hidden layer dimension
layer_dim = 2     # number of hidden layers
output_dim = 4   # output dimension
# seq_dim = 350

model = RNNModel(input_dim, hidden_dim, layer_dim, output_dim)

# Cross Entropy Loss 
error = nn.CrossEntropyLoss()

# SGD Optimizer
learning_rate = 0.05
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [None]:
loss_list = []
iteration_list = []
accuracy_list = []
count = 0

for epoch in range(num_epochs):
    for i, (x, labels) in enumerate(train_loader):
        seq_dim=x.shape[1]; print(i)
        train  = Variable(x.view(-1, seq_dim, input_dim)) # (batch_size, seq_len, features)
        labels = Variable(labels)
            
        # Clear gradients
        optimizer.zero_grad()
        
        # Forward propagation
        outputs = model(train)
    
        # Calculate softmax and ross entropy loss
        loss = error(outputs, labels)
        
        # Calculating gradients
        loss.backward()
        
        # Update parameters
        optimizer.step()
        
        count += 1
        
        if count % 250 == 0:
            # Calculate Accuracy         
            correct = 0
            total = 0
            # Iterate through test dataset
            for x, labels in test_loader:
                
                # Forward propagation
                test  = Variable(x.view(-1, seq_dim, input_dim))
                outputs = model(test)
                
                # Get predictions from the maximum value
                predicted = torch.argmax(outputs.data, dim=2)
                
                # Total number of labels
                total += labels.size(1)
                correct += (predicted == torch.argmax(labels, dim=2)).sum()
            
            accuracy = 100 * correct / float(total)
            
            # store loss and iteration
            loss_list.append(loss.data)
            iteration_list.append(count)
            accuracy_list.append(accuracy)
            if count % 500 == 0:
                # Print Loss
                print('Iteration: {}  Loss: {}  Accuracy: {} %'.format(count, loss.data.item(), accuracy))

In [None]:
# visualization loss 
plt.plot(iteration_list,loss_list)
plt.xlabel("Number of iteration")
plt.ylabel("Loss")
plt.title("RNN: Loss vs Number of iteration")
plt.show()

# visualization accuracy 
plt.plot(iteration_list,accuracy_list,color = "red")
plt.xlabel("Number of iteration")
plt.ylabel("Accuracy")
plt.title("RNN: Accuracy vs Number of iteration")
plt.savefig('graph.png')
plt.show()

### Time-Series Transformer

In [None]:
class TimeSeriesTransformer(nn.Module):
    def __init__(self, 
    input_size: int,                      # number of input variables. 1 if univariate
    dec_seq_len: int,                     # length of the input sequence fed to the decoder
    batch_first: bool=False,
    out_seq_len: int=58,                  
    dim_val: int=512,                     # d_model
    n_encoder_layers: int=4,              # number of stacked encoder layers
    n_decoder_layers: int=4,              # number of stacked decoder layers
    n_heads: int=8,                       # number of attention heads
    dropout_encoder: float=0.2,           # dropout rate of encoder
    dropout_decoder: float=0.2,           # dropout rate of decoder
    dropout_pos_enc: float=0.1,           # dropout rate of positional encoder
    dim_feedforward_encoder: int=2048,    # number of neurons in the linear layer of encoder
    dim_feedforward_decoder: int=2048,    # number of neurons in the linear layer of encoder
    num_predicted_features: int=1         # number of features to be predicted
    ): 
        
        super().__init__()
        self.dec_seq_len = dec_seq_len
        
        # Three linear layers needed for the model
        self.encoder_input_layer = nn.Linear(in_features=input_size, out_features=dim_val)
        self.linear_mapping = nn.Linear(in_features=dim_val, out_features=num_predicted_features)
        
        # Positional encoder
        self.positional_encoding_layer = pe.PositionalEncoder(d_model=dim_val, dropout=dropout_pos_enc)
        
        # Stack the encoder layers
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=dim_val, nhead=n_heads, dim_feedforward=dim_feedforward_encoder,
            dropout=dropout_encoder, batch_first=batch_first
            )
        self.encoder = nn.TransformerEncoder(
            encoder_layer=encoder_layer, num_layers=n_encoder_layers, norm=None
            )
        
    def forward(self, src):
        src = self.encoder_input_layer(src)
        src = self.positional_encoding_layer(src)
        src = self.encoder(src=src)
        
        return src

### Train



In [None]:
# input_size = len(torch.unique(torch.argmax(data_label, dim=1)))
# dec_seq_len = len(data_feature)

# model = TimeSeriesTransformer(input_size=input_size, dec_seq_len=dec_seq_len)

In [None]:
loss_func_none = nn.CrossEntropyLoss(reduction="none")
loss_func_mean = nn.CrossEntropyLoss(reduction="mean")
loss_func_sum = nn.CrossEntropyLoss(reduction="sum")
pre = torch.tensor([[0.8, 0.5, 0.2, 0.5],
                    [0.2, 0.9, 0.3, 0.2],
                    [0.4, 0.3, 0.7, 0.1],
                    [0.1, 0.2, 0.4, 0.8]], dtype=torch.float)
tgt = torch.tensor([[1, 0, 0, 0],
                    [0, 1, 0, 0],
                    [0, 0, 1, 0],
                    [0, 0, 0, 1]], dtype=torch.float)
print(loss_func_none(pre, tgt))
print(loss_func_mean(pre, tgt))
print(loss_func_sum(pre, tgt))