In [24]:
from __future__ import print_function, division
import gc
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torchvision
from torch.autograd import Variable
from torchvision import models, transforms
import copy
from torch.optim import lr_scheduler
import time
from torch.utils.data import Dataset, DataLoader
import os
import pandas as pd
import numpy as np
from sklearn import model_selection
from torch.utils.data.sampler import SubsetRandomSampler
import cv2
from skimage import transform
from torchsummary import summary
from timeit import default_timer as timer

In [22]:
torch.cuda.empty_cache()
plt.ion()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [39]:
a = np.array([1,2,3,4,5])
print(a/np.max(a))

[0.2 0.4 0.6 0.8 1. ]


In [45]:
class MudraDataset(Dataset):
    def __init__(self, root_dir, state='All', transform=None):
        #"""
        #Args:
            #csv_files (string): Path to the csv file with annotations
            #root_dir (string): Directory with all the images
            #transform (callable, optional): Optional transform to be applied on a sample
        #"""
        #self.mudraJoints = pd.read_csv(csv_file)
        self.state = state
        self.root_dir = root_dir
        self.transform = transform
        self.data = []
        self.X_data = []
        self.Y_data = []
        
        if os.path.isdir(root_dir):
            for dirs in os.listdir(root_dir):
                #print(dirs)
                full_path = os.path.join(root_dir, dirs)
                csv_path = os.path.join(full_path, 'Annotation.csv')
                csv_file = pd.read_csv(csv_path)
                for i, row in csv_file.iterrows():
                    img_path = os.path.join(full_path, row[0])
                    #print(img_path)
                    image = img_path
                    self.X_data.append(image)
                    jointAngle = row[1:]
                    jointAngle = np.array([jointAngle])
                    maxval = np.max(jointAngle)
                    if maxval>0:
                        jointAngle = jointAngle/np.max(jointAngle)
                    jointAngle = jointAngle.astype('float')
                    self.Y_data.append(jointAngle)
                    #sample = {'image': image, 'jointAngle': jointAngle}
                    #self.data.append(sample)

        X_train, X_test, Y_train, Y_test = model_selection.train_test_split(self.X_data,
                                                self.Y_data, test_size=0.25, random_state=42)
        if state == 'Train':    
            self.X_data, self.Y_data = X_train, Y_train
        elif state == 'Test':
            self.X_data, self.Y_data = X_test, Y_test
    
    def __len__(self):
        return len(self.X_data)
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        
        sample = {'image': cv2.imread(self.X_data[idx]).astype('float'), 'jointCood': self.Y_data[idx]}
        
        if self.transform:
            sample = self.transform(sample)
        return sample

In [41]:
#Data Preprocessing Rescale, RandomCrop, and ToTensor
class Rescale(object):
    #Args:
    #output_size (tuple or int): Desired output size. If tuple, output is matched to output size. It int, smaller of
    #image edges is matched to output_size keeping aspect ratio the same
    
    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size
    
    def __call__(self, sample):
        image, joints = sample['image'], sample['jointCood']
        #print(image)
        h, w = image.shape[:2]
        if isinstance(self.output_size, int):
            if h > w:
                new_h, new_w = self.output_size * h / w, self.output_size
            else:
                new_h, new_w = self.output_size, self.output_size * w / h
        else:
            new_h, new_w = self.output_size
        
        new_h, new_w = int(new_h), int(new_w)
        
        img = transform.resize(image, (new_h, new_w))
        
        return {'image': img, 'jointCood': joints}
    
class RandomCrop(object):
    #Args:
    #output_size (tuple or int): Desired output size. If int, square crop is made

    def __init__(self, output_size):
        assert isinstance(output_size, (tuple, int))

        if isinstance(output_size, int):
            self.output_size = (output_size, output_size)
        else:
            assert len(output_size) == 2
            self.output_size = output_size

    def __call__(self, sample):
        image, joints = sample['image'], sample['jointCood']
        #print(image)
        h, w = image.shape[:2]
        new_h, new_w = self.output_size

        top = np.random.randint(0, h - new_h)
        left = np.random.randint(0, w - new_w)

        image = image[top: top + new_h, left: left + new_w]

        return {'image': image, 'jointCood': joints}

class ToTensor(object):
    #Convert numpy array image to tensor image

    def __call__(self, sample):
        image, joints = sample['image'], sample['jointCood']

        #swap color axis because
        #numpy image: H * W * C
        #torch image: C * H * W
        image = image.transpose((2, 0, 1))
        return {'image': torch.from_numpy(image).to(torch.float), 'jointCood': torch.from_numpy(joints).to(torch.float)}

In [46]:
train_dataset = MudraDataset(root_dir='Mudra Dataset/Single hand', state='Train',
                                  transform=transforms.Compose([Rescale((224,224)), ToTensor()]))

test_dataset = MudraDataset(root_dir='Mudra Dataset/Single hand', state='Test',
                                  transform=transforms.Compose([Rescale((224,224)), ToTensor()]))

val_dataset = MudraDataset(root_dir='Mudra Dataset/Single hand', state='Train',
                                  transform=transforms.Compose([Rescale((224,224)), ToTensor()]))

In [47]:
def data_loader(batch_size, random_seed=42, valid_size=0.1, shuffle=True, test=False):
    num_train = len(train_dataset)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))
    
    if shuffle:
        np.random.seed(random_seed)
        np.random.shuffle(indices)
    
    train_idx, valid_idx = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)
    
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
    valid_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, sampler=valid_sampler)
    
    if test:
        return torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=shuffle)
    else:
        return (train_loader, valid_loader)

In [48]:
train_loader, valid_loader = data_loader(batch_size=16)
test_loader = data_loader(batch_size=16, test=True)

In [11]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride = 1, downsample = None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Sequential(
                        nn.Conv2d(in_channels, out_channels, kernel_size = 3, stride = stride, padding = 1),
                        nn.BatchNorm2d(out_channels),
                        nn.ReLU())
        self.conv2 = nn.Sequential(
                        nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = 1, padding = 1),
                        nn.BatchNorm2d(out_channels))
        self.downsample = downsample
        self.relu = nn.ReLU()
        self.out_channels = out_channels
        
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.conv2(out)
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

In [14]:
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes = 10):
        super(ResNet, self).__init__()
        self.inplanes = 64
        self.conv1 = nn.Sequential(
                        nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3),
                        nn.BatchNorm2d(64),
                        nn.ReLU())
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        self.layer0 = self._make_layer(block, 64, layers[0], stride = 1)
        self.layer1 = self._make_layer(block, 128, layers[1], stride = 2)
        self.layer2 = self._make_layer(block, 256, layers[2], stride = 2)
        self.layer3 = self._make_layer(block, 512, layers[3], stride = 2)
        self.avgpool = nn.AvgPool2d(7, stride=1)
        self.fc = nn.Linear(512, num_classes)
        
    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes:
            
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=stride),
                nn.BatchNorm2d(planes),
            )
        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)
    
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x


In [52]:
num_classes = 14
num_epochs = 50
batch_size = 16
learning_rate = 0.00001

model = ResNet(ResidualBlock, [3,4,6,3], num_classes=num_classes).to(device)
print(model)

#Loss and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr = learning_rate, weight_decay=0.001, momentum=0.9)

#Train the model
total_step = len(train_loader)

ResNet(
  (conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer0): Sequential(
    (0): ResidualBlock(
      (conv1): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU()
      )
      (conv2): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (relu): ReLU()
    )
    (1): ResidualBlock(
      (conv1): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affin

In [51]:
best_loss = float('inf')
early_stop = 5
best_epoch = -1

for epoch in range(num_epochs):
    tot_loss = 0.0
    for i, sample in enumerate(train_loader):
        #Move tensor to the configured device
        images = sample['image'].to(device)
        labels = sample['jointCood'].to(device)
        
        #Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        tot_loss += torch.mean(loss).item()
        
        #Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        #Garbage collection
        del images, labels, outputs, loss
        torch.cuda.empty_cache()
        gc.collect()
    
    print("Epoch [{}/{}], Train loss: {:.4f}".format(epoch+1, num_epochs, tot_loss/total_step))
    
    #Validation
    with torch.no_grad():
        correct=0
        total=0
        tot_loss = 0.0
        for i, sample in enumerate(valid_loader):
            images = sample['image'].to(device)
            labels = sample['jointCood'].to(device)
            outputs = model(images)
            
            #Calculate validation loss
            loss = criterion(outputs, labels)
            tot_loss += torch.mean(loss).item()
            
            #Garbage collection
            del images, labels, outputs, loss
            torch.cuda.empty_cache()
            gc.collect()
        
        avg_loss = tot_loss/len(valid_loader)
        if avg_loss < best_loss:
            best_loss = avg_loss
            best_epoch = epoch
            torch.save(model.state_dict(), "best_regression_model.pth")
        elif epoch - best_epoch > early_stop:
            print("Early stopped training at epoch %d" % epoch)
            break
        
        print("Validation loss: {:.4f}".format(avg_loss))

  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch [1/50], Train loss: 0.2059


  return F.mse_loss(input, target, reduction=self.reduction)


Validation loss: 0.1978
Epoch [2/50], Train loss: 0.1942
Validation loss: 0.1946
Epoch [3/50], Train loss: 0.1922
Validation loss: 0.1985
Epoch [4/50], Train loss: 0.1911
Validation loss: 0.1962
Epoch [5/50], Train loss: 0.1907
Validation loss: 0.1951
Epoch [6/50], Train loss: 0.1902
Validation loss: 0.1942
Epoch [7/50], Train loss: 0.1896
Validation loss: 0.1927
Epoch [8/50], Train loss: 0.1894
Validation loss: 0.1939
Epoch [9/50], Train loss: 0.1895
Validation loss: 0.1951
Epoch [10/50], Train loss: 0.1890
Validation loss: 0.1983
Epoch [11/50], Train loss: 0.1893
Validation loss: 0.1940
Epoch [12/50], Train loss: 0.1894
Validation loss: 0.1930
Epoch [13/50], Train loss: 0.1890
Early stopped training at epoch 12
