In [1]:
import torch
import numpy as np
import pandas as pd
from torch.nn import Linear, ReLU, Flatten, Sequential, Conv1d, Module, CrossEntropyLoss, Dropout, Softmax
from torch.optim import Adam
from torch.utils.data import Dataset

In [2]:
activity_encode = {
        "LAYING":0,
       "SITTING":1,
       "STANDING":2,
        "WALKING":3,
       "WALKING_DOWNSTAIRS":4,
       "WALKING_UPSTAIRS":5
       }

In [3]:
class UCI_Static_Dataset(Dataset):
    """UCI dataset."""

    def __init__(self, csv_file):

        data = pd.read_csv(csv_file)
        data['Activity'] = data['Activity'].map(activity_encode)
        data, _ = [x for _, x in data.groupby(data['Activity'] > 2)]
        self.data_y = data['Activity'].values
        data = pd.DataFrame(data.drop(['Activity','subject'],axis=1))
        self.data_x = np.array(data)
        # [batch, channels, features]
        self.data_x = self.data_x.reshape(len(self.data_x), 1, 561)
        self.data_x  = torch.from_numpy(self.data_x)
        self.data_y = self.data_y.astype(int)
        self.data_y = torch.from_numpy(self.data_y)

    def __len__(self):
        return len(self.data_x)

    def __getitem__(self, idx):
        return self.data_x[idx], self.data_y[idx]

In [4]:
train_data = UCI_Static_Dataset(csv_file='train.csv')
valid_data = UCI_Static_Dataset(csv_file='valid.csv')
test_data = UCI_Static_Dataset(csv_file='test.csv')

In [5]:
valid_data.__len__()

828

In [6]:
trainloader = torch.utils.data.DataLoader(train_data, batch_size=32, shuffle=True)
validloader = torch.utils.data.DataLoader(valid_data, batch_size=32, shuffle=True)
testloader = torch.utils.data.DataLoader(test_data, batch_size=32, shuffle=True)

In [7]:
class StaticCNN(Module):
    def __init__(self):
        super(StaticCNN, self).__init__()

        self.cnn_layers = Sequential(
            # a
            Conv1d(1, 30, kernel_size=tuple([3])), #stride=tuple([1])),
            ReLU(),
            Conv1d(30, 50, kernel_size=tuple([3])),
            ReLU(),
            Conv1d(50, 100, kernel_size=tuple([3])),
            ReLU(),
            # Flatten
            Flatten()
        )

        self.linear_layers = Sequential(
            Dropout(0.5),
            Linear(55500, 3),
            #ReLU(inplace=True),
            Softmax(dim=1)
        )

    # Defining the forward pass
    def forward(self, x):
        # print(x.shape)
        x = self.cnn_layers(x)
        # print(x.shape)
        # x = x.view(-1, 32*100*561)
        #x = Flatten(x)
        #print(x.shape)
        x = self.linear_layers(x)
        #print(x.shape)
        return x

In [8]:
#CrossEntropyLoss not MSE because
criterion = CrossEntropyLoss()
device = "cuda" if torch.cuda.is_available() else "cpu"

model = StaticCNN()
epochs = 10

step = epochs//5
optimizer = Adam(model.parameters(), lr=0.0005)
model.to(device)

StaticCNN(
  (cnn_layers): Sequential(
    (0): Conv1d(1, 30, kernel_size=(3,), stride=(1,))
    (1): ReLU()
    (2): Conv1d(30, 50, kernel_size=(3,), stride=(1,))
    (3): ReLU()
    (4): Conv1d(50, 100, kernel_size=(3,), stride=(1,))
    (5): ReLU()
    (6): Flatten(start_dim=1, end_dim=-1)
  )
  (linear_layers): Sequential(
    (0): Dropout(p=0.5, inplace=False)
    (1): Linear(in_features=55500, out_features=3, bias=True)
    (2): Softmax(dim=1)
  )
)

In [9]:
data_loaders = {}
data_loaders['train'] = trainloader
data_loaders['val'] = validloader
data_lengths = {"train": len(train_data), "val": len(valid_data)}

for epoch in range(epochs):
    if (epoch+1) % step == 0:
        print('Epoch {}/{}'.format(epoch, epochs - 1))
        print('-' * 10)

    # Each epoch has a training and validation phase
    for phase in ['train', 'val']:
        if phase == 'train':
            model.train(True)  # Set model to training mode
        else:
            model.train(False)  # Set model to evaluate mode

        running_loss = 0.0

        # Iterate over data.
        for data in data_loaders[phase]:
            inputs, labels = data
            inputs, labels = inputs.to(device).float(), labels.to(device).long()
            # forward pass to get outputs
            output = model(inputs)

            # calculate the loss between predicted and target keypoints
            loss = criterion(output, labels)

            # zero the parameter (weight) gradients
            optimizer.zero_grad()

            # backward + optimize only if in training phase
            if phase == 'train':
                loss.backward()
                # update the weights
                optimizer.step()

            # print loss statistics
            running_loss += loss.item()

        epoch_loss = running_loss / data_lengths[phase]
        if (epoch+1) % step == 0:
            print('{} Loss: {:.4f}'.format(phase, epoch_loss))

Epoch 1/9
----------
train Loss: 0.0198
val Loss: 0.0209
Epoch 3/9
----------
train Loss: 0.0188
val Loss: 0.0193
Epoch 5/9
----------
train Loss: 0.0185
val Loss: 0.0194
Epoch 7/9
----------
train Loss: 0.0182
val Loss: 0.0191
Epoch 9/9
----------
train Loss: 0.0180
val Loss: 0.0193


In [10]:
correct = 0
total = 0
model.cpu()
with torch.no_grad():
    for inputs, labels in testloader:
        inputs, labels = inputs.float(), labels.long()
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the test set: %d %%' % (
    100 * correct / total))

Accuracy of the network on the test set: 94 %
