In [None]:
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import sampler
import torch.nn.functional as F

import torchvision.datasets as dset
import torchvision.transforms as T

import glob
from skimage import io, transform
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
import time

# Silence every warning raised from here on (this also hides deprecation notices)
import warnings
warnings.filterwarnings("ignore")

plt.ion()   # switch matplotlib to interactive mode

# 5-image CNN

In [None]:
# Runtime configuration shared by the cells below.
USE_GPU = True           # set to False to force CPU execution
dtype = torch.float32    # floating-point type the tensors are cast to before the forward pass
print_every = 3          # report the training loss every `print_every` iterations

# Fall back to the CPU whenever CUDA is disabled or not available.
device = torch.device('cuda' if (USE_GPU and torch.cuda.is_available()) else 'cpu')

print('using device:', device)

### Dataset class

In [None]:
class GripForceDataset(Dataset):
    """Finger grip force dataset: video frames paired with force labels.

    For every subject ``s`` in ``1..n_subj`` and every experiment ``e`` in
    ``1..n_exp_per_subj`` the constructor inspects

        <root_dir>/<s:02d>/<e:02d>/frames_aligned/*.png
        <root_dir>/<s:02d>/<e:02d>/labels.csv

    Experiments with fewer than five PNG frames are skipped. For the others
    the first two and the last two frames (and label rows) are dropped, so
    every remaining item has two neighbouring frames on each side.

    NOTE: ``__getitem__`` currently reads only the central frame of an item;
    the neighbouring frames are not loaded.

    Assumptions (only the row count is verified): frames are named
    ``0000.png``, ``0001.png``, ... without gaps, and ``labels.csv`` holds one
    comma-separated row of two values per frame.

    Attributes:
        images (ndarray): int array of shape (N, 3); each row is
            (subject number, experiment number, central frame index).
        force (ndarray): float array of shape (N, 2) with the matching labels.
        N (int): number of items.
    """

    def __init__(self, root_dir, n_subj, n_exp_per_subj, transform=None):
        """
        Args:
            root_dir (string): Directory with all the images.
            n_subj (int): Number of test subjects.
            n_exp_per_subj (int): Number of experiments per test subject.
            transform (callable, optional): Applied to every sample dict
                (keys 'frames' and 'force') before it is returned.

        Raises:
            ValueError: If an experiment's labels.csv does not have exactly
                one row per frame.
        """
        # Builtin int/float replace the np.int/np.float aliases, which were
        # removed from NumPy in version 1.24.
        index_chunks = [np.zeros((0, 3), dtype=int)]
        force_chunks = [np.zeros((0, 2), dtype=float)]

        for subject_number in range(1, n_subj + 1):
            for experiment_number in range(1, n_exp_per_subj + 1):
                img_path = '{}/{:02d}/{:02d}/frames_aligned/'.format(root_dir, subject_number, experiment_number)
                force_path = '{}/{:02d}/{:02d}/labels.csv'.format(root_dir, subject_number, experiment_number)

                n = len(glob.glob('{}*.png'.format(img_path)))
                if n < 5:
                    # not enough frames for a central frame with 2 + 2 neighbours
                    continue

                experiment = np.empty((n - 4, 3), dtype=int)
                experiment[:, 0] = subject_number
                experiment[:, 1] = experiment_number
                # central frame of the item (item = frame + 2 prev + 2 next frames)
                experiment[:, 2] = np.arange(2, n - 2)

                force = np.loadtxt(force_path, delimiter=',')[2:-2, :]
                if force.shape[0] != n - 4:
                    # A mismatch would silently shift the labels of every
                    # following item, so fail loudly instead.
                    raise ValueError(
                        '{} has {} label rows but {} frames were found'.format(
                            force_path, force.shape[0] + 4, n))

                index_chunks.append(experiment)
                force_chunks.append(force)

        # Concatenate once at the end instead of re-copying inside the loop.
        self.images = np.concatenate(index_chunks, axis=0)
        self.force = np.concatenate(force_chunks, axis=0)
        self.N = self.images.shape[0]

        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of items."""
        return self.N

    def __getitem__(self, idx):
        """Return ``{'frames': image array, 'force': label row}`` for item ``idx``.

        The sample is passed through ``self.transform`` when one was given.
        """
        subject_number, experiment_number, frame_number = self.images[idx]
        # Only the central frame is read. A five-frame variant would read
        # frame_number - 2 .. frame_number + 2 and concatenate along axis 2.
        img_path = '{}/{:02d}/{:02d}/frames_aligned/{:04d}.png'.format(self.root_dir,
                                                                    subject_number,
                                                                    experiment_number,
                                                                    frame_number)
        frames = np.array(io.imread(img_path))

        sample = {'frames': frames, 'force': self.force[idx]}
        if self.transform:
            sample = self.transform(sample)

        return sample

In [None]:
# Smoke-test the Dataset class on one experiment and display one sample.
# NOTE(review): machine-specific absolute path; point it at your local copy of the data.
root_dir = '/media/viktor/Samsung_T5/Research/dataset'
n_subj = 1
n_exp_per_subj = 1

ds = GripForceDataset(root_dir, n_subj, n_exp_per_subj)

print('len={}'.format(len(ds)))

# Item 105 is an arbitrary pick; clamp it so shorter recordings still work.
sample = ds[min(105, len(ds) - 1)]

# Every frame contributes three colour channels along the last axis, so the
# channel count tells how many frames the sample holds (currently one).
n_frames = max(1, sample['frames'].shape[2] // 3)

fig, axes = plt.subplots(1, n_frames, figsize=(20, 20), squeeze=False)
for i, ax in enumerate(axes[0]):
    # Offset relative to the central frame: 0 for a single-frame sample,
    # -2..2 for a five-frame one.
    ax.set_title('frame#{}'.format(i - n_frames // 2))
    ax.axis('off')
    ax.imshow(sample['frames'][:, :, i * 3:(i + 1) * 3])

plt.show()

In [None]:
class Rescale(object):
    """Resize the frames of a sample to a requested size.

    Args:
        output_size (tuple or int): Target size. A tuple is used directly as
            (height, width). An int is assigned to the shorter image edge and
            the longer edge is scaled so the aspect ratio is kept.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size

    def __call__(self, sample):
        frames = sample['frames']
        force = sample['force']
        height, width = frames.shape[:2]

        if not isinstance(self.output_size, int):
            # explicit (height, width) request
            target_h, target_w = self.output_size
        elif height > width:
            # portrait: the width is the shorter edge
            target_h, target_w = self.output_size * height / width, self.output_size
        else:
            # landscape or square: the height is the shorter edge
            target_h, target_w = self.output_size, self.output_size * width / height

        # the scaled edge may be fractional; truncate to whole pixels
        resized = transform.resize(frames, (int(target_h), int(target_w)))

        return {'frames': resized, 'force': force}

class ToTensor(object):
    """Turn the ndarrays of a sample into torch Tensors."""

    def __call__(self, sample):
        image, target = sample['frames'], sample['force']

        # numpy stores images as H x W x C while torch expects C x H x W,
        # so the colour axis is moved to the front.
        channels_first = image.transpose(2, 0, 1)

        return {
            'frames': torch.from_numpy(channels_first),
            'force': torch.from_numpy(target),
        }

In [None]:
# Smoke-test the Dataset together with its transform pipeline:
# rescale so the shorter edge is 270 px, then convert to tensors.
transformed_dataset = GripForceDataset(
    root_dir,
    n_subj,
    n_exp_per_subj,
    transform=T.Compose([Rescale(270), ToTensor()]),
)

# Print the tensor sizes of the first four samples (fewer if the dataset is smaller).
for i in range(min(4, len(transformed_dataset))):
    sample = transformed_dataset[i]
    print(i, sample['frames'].size(), sample['force'].size())

### Split data into training, validation, and testing sets

In [None]:
# Split configuration
batch_size = 64
val_split = .05
test_split = .1
# Shuffles which indices go to train vs. val. The test indices are always the
# unshuffled tail of the dataset (the test loader still draws them in random order).
shuffle_dataset = True
random_seed = 42

# Partition sizes: the val and test shares are rounded down, train gets the rest.
dataset_size = len(transformed_dataset)
val_size = int(np.floor(val_split * dataset_size))
test_size = int(np.floor(test_split * dataset_size))
train_size = dataset_size - val_size - test_size

# The last `test_size` indices form the test set; everything before is train + val.
all_indices = list(range(dataset_size))
test_indices = all_indices[train_size + val_size:]
indices = all_indices[:train_size + val_size]
if shuffle_dataset:
    np.random.seed(random_seed)
    np.random.shuffle(indices)
train_indices, val_indices = indices[:train_size], indices[train_size:]

# # JUST FOR DEBUG PURPOSE: shrink everything to a handful of samples
# batch_size = 3
# train_indices = train_indices[:6]
# val_indices = val_indices[:6]
# test_indices = test_indices[:6]

# One random sampler per partition
train_sampler = sampler.SubsetRandomSampler(train_indices)
val_sampler = sampler.SubsetRandomSampler(val_indices)
test_sampler = sampler.SubsetRandomSampler(test_indices)

# One loader per partition; note that validation runs one sample per batch.
loader_workers = 64
train_loader = DataLoader(transformed_dataset,
                          batch_size=batch_size,
                          num_workers=loader_workers,
                          sampler=train_sampler)
val_loader = DataLoader(transformed_dataset,
                        batch_size=1,
                        num_workers=loader_workers,
                        sampler=val_sampler)
test_loader = DataLoader(transformed_dataset,
                         batch_size=batch_size,
                         num_workers=loader_workers,
                         sampler=test_sampler)

### Module Neural Network class

In [None]:
class ConvNet(nn.Module):
    """Three conv + ReLU + max-pool stages followed by three fully connected layers.

    All convolutions are padded so they keep the spatial size; only the
    max-pool steps (windows 10, 5 and 4) shrink the feature maps. All weights
    are initialised with Kaiming-normal; biases keep the PyTorch defaults.

    Args:
        in_channels (int): Number of channels of the input images.
        maps_1 (int): Output channels of the first convolution (7x7).
        maps_2 (int): Output channels of the second convolution (9x9).
        maps_3 (int): Output channels of the third convolution (3x3).
        neurons_1 (int): Width of the first fully connected layer.
        neurons_2 (int): Width of the second fully connected layer.
        num_out (int): Number of outputs.
        input_size (tuple): (height, width) of the input images. It is used to
            size the first fully connected layer. The default (270, 480)
            together with maps_3=96 yields the previously hard-coded 192.

    Raises:
        ValueError: If `input_size` is so small that pooling leaves no features.
    """

    # Max-pool window applied after conv1, conv2 and conv3 respectively.
    _POOL_SIZES = (10, 5, 4)

    def __init__(self, in_channels, maps_1, maps_2, maps_3, neurons_1, neurons_2, num_out,
                 input_size=(270, 480)):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, maps_1, (7, 7), padding=(3, 3))
        nn.init.kaiming_normal_(self.conv1.weight)

        self.conv2 = nn.Conv2d(maps_1, maps_2, (9, 9), padding=(4, 4))
        nn.init.kaiming_normal_(self.conv2.weight)

        self.conv3 = nn.Conv2d(maps_2, maps_3, (3, 3), padding=(1, 1))
        nn.init.kaiming_normal_(self.conv3.weight)

        # max_pool2d floors the output size by default, hence integer division.
        height, width = input_size
        for pool in self._POOL_SIZES:
            height, width = height // pool, width // pool
        flat_features = maps_3 * height * width
        if flat_features <= 0:
            raise ValueError(
                'input_size {} is too small for pooling windows {}'.format(
                    tuple(input_size), self._POOL_SIZES))

        self.fc1 = nn.Linear(flat_features, neurons_1)
        nn.init.kaiming_normal_(self.fc1.weight)

        self.fc2 = nn.Linear(neurons_1, neurons_2)
        nn.init.kaiming_normal_(self.fc2.weight)

        self.fc3 = nn.Linear(neurons_2, num_out)
        nn.init.kaiming_normal_(self.fc3.weight)

    def forward(self, x):
        """Map a batch of images (batch, in_channels, H, W) to (batch, num_out)."""
        pool_1, pool_2, pool_3 = self._POOL_SIZES
        x = F.max_pool2d(F.relu(self.conv1(x)), pool_1)
        x = F.max_pool2d(F.relu(self.conv2(x)), pool_2)
        x = F.max_pool2d(F.relu(self.conv3(x)), pool_3)
        # flatten everything except the batch dimension
        x = x.view(-1, self.num_flat_features(x))
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    def num_flat_features(self, x):
        """Return the number of elements per sample (product of all non-batch dims)."""
        num_features = 1
        for s in x.size()[1:]:
            num_features *= s
        return num_features


def test_ConvNet():
    """Push an all-zero minibatch through a fresh ConvNet and print the output size."""
    # minibatch of 2 images, each 3 channels x 270 x 480
    dummy_batch = torch.zeros((2, 3, 270, 480), dtype=dtype)
    net_under_test = ConvNet(in_channels=3,
                             maps_1=32, maps_2=64, maps_3=96,
                             neurons_1=100, neurons_2=100,
                             num_out=2)
    print(net_under_test(dummy_batch).size())


test_ConvNet()

In [None]:
# Build the network and show its layer summary.
net = ConvNet(
    in_channels=3,
    maps_1=32, maps_2=64, maps_3=96,
    neurons_1=100, neurons_2=100,
    num_out=2,
)
# net = net.to(device=device)
print(net)

### Check accuracy

In [None]:
def check_accuracy(loader, model, train=False):
    """Evaluate `model` on every sample of `loader` and print the R2 score.

    Inputs:
    - loader: Iterable yielding batches as dicts with the keys 'frames'
      (model input) and 'force' (target). Any batch size is supported.
    - model: A PyTorch Module. It is moved to `device` and left in eval mode.
    - train: (Optional) Only selects the printed headline: True announces the
      validation set, False (default) the test set.

    Returns: The R2 score computed by sklearn's r2_score.

    Raises: ValueError if the loader yields no samples.
    """
    if train:
        print('Checking accuracy on validation set')
    else:
        print('Checking accuracy on test set')

    model = model.to(device=device)
    model.eval()  # set model to evaluation mode

    # Collect per-batch results and stack them at the end. This keeps the rows
    # aligned whatever batch size the loader uses (the validation loader uses
    # batches of 1, not the global batch_size).
    true_batches = []
    pred_batches = []
    with torch.no_grad():
        for sample in loader:
            x, y = sample['frames'], sample['force']
            # move to device, e.g. GPU
            x = x.to(device=device, dtype=dtype)

            # predict
            batch_pred = model(x)

            # bring the results back to the CPU as float64 arrays
            pred_batches.append(batch_pred.cpu().numpy().astype(np.float64))
            true_batches.append(y.cpu().numpy().astype(np.float64))

    if not pred_batches:
        raise ValueError('loader yielded no samples')

    y_true = np.concatenate(true_batches, axis=0)
    y_pred = np.concatenate(pred_batches, axis=0)

    r2 = r2_score(y_true, y_pred)
    print('R2 score: {:.2f}'.format(r2))
    return r2

In [None]:
# Baseline: score the untrained network on the test split and time the pass.
t0 = time.time()
# train=False so the headline reads "test set", matching the loader that is passed in.
check_accuracy(test_loader, net, train=False)
t1 = time.time()
print('It took {:.2f} sec'.format(t1 - t0))

### Training loop

In [None]:
def train(model, optimizer, epochs=1):
    """
    Train a model on Force dataset using the PyTorch Module API.

    Batches come from the global `train_loader`; every `print_every`
    iterations the current loss is printed and the model is scored on the
    global `val_loader`.

    Inputs:
    - model: A PyTorch Module giving the model to train.
    - optimizer: An Optimizer object we will use to train the model
    - epochs: (Optional) A Python integer giving the number of epochs to train for

    Returns: Nothing, but prints model accuracies during training.
    """
    model = model.to(device=device)  # move the model parameters to CPU/GPU
    loss_fn = nn.MSELoss()  # stateless, so one instance serves every iteration
    for epoch in range(epochs):
        epoch_start = time.time()
        for t, sample in enumerate(train_loader):
            x, y = sample['frames'], sample['force']

            # check_accuracy() below switches the model to eval mode, so
            # training mode has to be restored on every iteration.
            model.train()
            x = x.to(device=device, dtype=dtype)  # move to device, e.g. GPU
            y = y.to(device=device, dtype=dtype)

            # forward
            y_pred = model(x)
            loss = loss_fn(y_pred, y)

            # Zero out all of the gradients for the variables which the optimizer
            # will update.
            optimizer.zero_grad()

            # This is the backwards pass: compute the gradient of the loss with
            # respect to each parameter of the model.
            loss.backward()

            # Actually update the parameters of the model using the gradients
            # computed by the backwards pass.
            optimizer.step()

            # print statistics
            if t % print_every == 0:
                print('Epoch {}, iteration {}, loss = {:.4f}'.format(epoch, t, loss.item()))
                # train=True makes the headline say "validation set", which is
                # the split that is actually being evaluated here.
                check_accuracy(val_loader, model, train=True)
                print()
        print("Epoch time: {:.1f} sec\n".format(time.time() - epoch_start))

### Train

In [None]:
# Optimisation set-up: plain SGD on a freshly initialised network.
learning_rate = 1e-8

model = ConvNet(
    in_channels=3,
    maps_1=32, maps_2=64, maps_3=96,
    neurons_1=100, neurons_2=100,
    num_out=2,
)
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

train(model, optimizer, epochs=100)