Train network for Gesture Recognition from Video

In [None]:
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.utils.data

import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

from torch.autograd import Variable

import numpy as np
import random
from PIL import Image
from ipywidgets import FloatProgress
from IPython.display import display
from __future__ import print_function

from model import ModelDefinition
from dataset import ReadImages, collection
import os
import os.path as path
import glob
import random
import math

import cv2
#os.environ['CUDA_VISIBLE_DEVICES'] = '0'

In [None]:
def readFrameAnnotation(annotationFile):
    """
        Read a gesture annotation file.

        Every non-blank line must hold at least three whitespace-separated
        integers in the order: gesture, start, end. The gesture number of
        the file is decremented by one before being stored.

        annotationFile : path of the annotation file
        return : the list of annotation ([start, end], gesture)
    """
    anno = []
    # the context manager closes the handle even when a line fails to parse
    with open(annotationFile) as annotations:
        for line in annotations:
            # split() without argument copes with tabs and repeated spaces
            fields = line.split()
            if not fields:
                # blank lines carry no annotation
                continue
            gesture = int(fields[0]) - 1
            anno.append(([int(fields[1]), int(fields[2])], gesture))
    return anno

In [None]:
def findGestureFrame(frameNumber, annotationFile):
    """
        Look up the gesture annotated for a frame.

        frameNumber : number of the frame to look up
        annotationFile : iterable of ([start, end], gesture) entries
        return : the gesture of the first entry whose [start, end] range
                 (both bounds included) contains frameNumber, or None when
                 no entry contains it
    """
    matching = (gesture
                for bounds, gesture in annotationFile
                if bounds[0] <= frameNumber <= bounds[1])
    return next(matching, None)

In [None]:
def _copyMatchingLayers(dst, src, layerType):
    """Copy weight/bias from src into dst for same-index, same-shape layers of layerType."""
    srcCount = len(src)
    for i, layer in enumerate(dst):
        if not isinstance(layer, layerType) or i >= srcCount:
            # nothing to copy into, or no layer at this index in the source
            continue
        base = src[i]
        if not isinstance(base, layerType):
            # a layer of another kind may have no weight attribute at all
            continue
        if layer.weight.size() != base.weight.size():
            continue
        # copy_ duplicates the values so both networks keep their own storage
        layer.weight.data.copy_(base.weight.data)
        if layer.bias is not None and base.bias is not None:
            layer.bias.data.copy_(base.bias.data)


def copyParameters(net, netBase):
    """
        Initialise net with the weights of netBase wherever layers line up.

        Every Conv2d of net.features and every Linear of net.classifier
        receives the weight and bias of the layer found at the same index
        in netBase, provided that layer exists, is of the same kind and has
        a weight of the same size. Other layers are left untouched.

        net : network to initialise (modified in place)
        netBase : network providing the values (not modified)
    """
    _copyMatchingLayers(net.features, netBase.features, nn.Conv2d)
    _copyMatchingLayers(net.classifier, netBase.classifier, nn.Linear)

In [None]:
def fillInput(nframe, video, with_cuda=False):
    """
        Read the next nframe frames of a video into one input tensor.

        Each frame goes through ToPILImage, Scale(225), RandomCrop(225) and
        ToTensor before being stored.

        nframe : number of frames to read
        video : object whose read() returns a (ret, frame) pair
                (presumably a cv2.VideoCapture; confirm against callers)
        with_cuda : when True the tensor is allocated on the GPU
        return : tensor of size (nframe, 3, 225, 225)
        raise : RuntimeError when a frame cannot be read
    """
    # NOTE(review): if video is an OpenCV capture the frames are presumably
    # BGR while ToPILImage does not reorder channels - confirm the expected
    # channel order.
    t = transforms.Compose(
                (transforms.ToPILImage(),
                transforms.Scale(225),
                transforms.RandomCrop(225),
                transforms.ToTensor())
                )
    if with_cuda:
        inputs = torch.Tensor(nframe,3,225,225).cuda()
    else:
        inputs = torch.Tensor(nframe,3,225,225)
    for j in range(nframe):
        ret, frame = video.read()
        if not ret or frame is None:
            # raise instead of exit(0): a missing frame is a failure and must
            # not end the process with a success status
            raise RuntimeError("Error : None Frame (frame %d of %d)" % (j, nframe))
        inputs[j] = t(frame)
    return inputs

In [None]:
#TODO : test if difference between learning only gesture per batch

def learnSequence(sequence, gesture, video, model, criterion, optimize, batchSize=32):
    """
        Train the model on the frames of one annotated sequence.

        The frames are read from the current position of video in batches
        of at most batchSize, every frame being labelled with gesture. One
        optimisation step is done per batch.

        sequence : [start, end] pair; end - start frames are read
        gesture : class index used as label for every frame
        video : capture object handed to fillInput
        model : network to train (expected on the GPU, batches are CUDA tensors)
        criterion : loss called as criterion(outputs, labels)
        optimize : optimizer updating the model
        batchSize : maximum number of frames per batch, must be >= 1
        return : sum of the batch losses
        raise : ValueError when batchSize is smaller than 1
    """
    if batchSize < 1:
        # a non-positive batch size would never consume a frame
        raise ValueError("batchSize must be at least 1")
    numberFrame = sequence[1] - sequence[0]
    running_loss = 0
    while numberFrame > 0:
        # the last batch of the sequence may be smaller than batchSize
        count = min(batchSize, numberFrame)
        numberFrame -= count

        inputs = Variable(fillInput(count, video, True))
        labels = Variable(torch.LongTensor([gesture]*count).cuda())

        optimize.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimize.step()
        # NOTE(review): loss.data[0] is the scalar access used throughout
        # this file; PyTorch >= 0.4 documents loss.item() instead.
        running_loss += loss.data[0]
    return running_loss

In [None]:
def testSequence(seq, gesture, video, model, batchSize=32):
    """
        Test whether the model recognises the gesture of one sequence.

        The frames are read from the current position of video in batches
        of at most batchSize. The sequence counts as correct as soon as one
        frame is classified as gesture.

        seq : [start, end] pair; end - start frames are read
        gesture : expected class index
        video : capture object handed to fillInput
        model : network to evaluate (expected on the GPU)
        batchSize : maximum number of frames per batch
        return : 1 if at least one frame is predicted as gesture, else 0
    """
    numberFrame = seq[1] - seq[0]
    correct = 0
    while numberFrame > 0:
        count = min(batchSize, numberFrame)
        numberFrame -= count

        # volatile: no graph is needed for evaluation (same as testNFrame)
        inputs = Variable(fillInput(count, video, True), volatile=True)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        # view(-1) accepts both index layouts, (N,) and (N, 1)
        if gesture in predicted.cpu().view(-1).tolist():
            correct = 1
        # no early exit: every frame of the sequence has to be consumed so
        # that the next sequence starts at the right position of the video
    return correct

In [None]:
def testNet(model, testDir):
    """
        Test the model on every annotated sequence of the videos in testDir.

        For each '<name>.mp4' found in testDir the annotation is read from
        the file testDir + '<name>'. A sequence counts as correct when
        testSequence reports it as such.

        model : network to evaluate, switched to eval mode
        testDir : directory prefix, concatenated as is (needs a trailing '/')
        return : number of correct sequences
    """
    # NOTE(review): the frames are consumed in file order without seeking to
    # seq[0]; this presumably assumes annotations that start at frame 0 and
    # follow each other without gap - confirm.
    model.eval()
    t = 0
    c = 0
    for video in glob.glob(testDir+'*.mp4'):
        print("Test video ", video)
        fName = path.splitext(path.basename(video))[0] #basename
        annotation = readFrameAnnotation(testDir+fName)
        videoCap = cv2.VideoCapture(video)
        try:
            for seq, gesture in annotation:
                t += 1
                c += testSequence(seq, gesture, videoCap, model)
        finally:
            # free the capture even when a sequence fails
            videoCap.release()
    print("Correctness : ", c, '/', t)
    return c

In [None]:
def testNFrame(model, testDir, frame_num=5):
    """
        Test the model with a window of frame_num frames.

        Every annotated sequence is cut in consecutive windows of frame_num
        frames. The outputs of the frames of a window are summed and the
        class with the highest sum is compared with the annotated gesture.

        model : network to evaluate, switched to eval mode
        testDir : directory prefix, concatenated as is (needs a trailing '/')
        frame_num : number of frames per window
        return : number of correct windows
    """
    print("TestNFrame")
    model.eval()
    c = 0
    t = 0
    for video in glob.glob(testDir+'*.mp4'):
        print("Test video ", video)
        fName = path.splitext(path.basename(video))[0] #basename
        annotation = readFrameAnnotation(testDir+fName)
        videoCap = cv2.VideoCapture(video)
        try:
            for seq, gesture in annotation:
                length = max(seq[1]-seq[0], 0)
                # // keeps the window count an integer on Python 2 and 3
                for i in range(length // frame_num):
                    inputs = fillInput(frame_num, videoCap, True)
                    inputs = Variable(inputs, volatile=True)
                    outputs = model(inputs)
                    t += 1
                    # view(-1) accepts the summed scores with or without a
                    # kept dimension
                    votes = outputs.data.sum(0).view(-1)
                    best = votes.max(0)[1].cpu().view(-1).tolist()[0]
                    c += int(int(best) == gesture)
                # read the frames that do not fill a window so the next
                # sequence starts at its own first frame
                for i in range(length % frame_num):
                    videoCap.read()
        finally:
            # free the capture even when a window fails
            videoCap.release()
    print("Correctness : ", c, '/', t)
    return c

In [None]:
def testImages(model, testDir, transf=transforms.ToTensor(), batch_size=32):
    """
        Test model on images organized : class/imName

        model : network to evaluate, switched to eval mode; the batches are
                moved to the GPU before being given to it
        testDir : root directory of the image folders
        transf : transform applied to every image
        batch_size : number of images per batch
        return : number of images whose predicted class equals the target
    """
    model.eval()
    loader = torch.utils.data.DataLoader(
        datasets.ImageFolder(testDir, transform=transf),
        batch_size=batch_size, num_workers=6, drop_last=False, pin_memory=False)
    total = len(loader.dataset)
    c = 0
    for data, target in loader:
        scores = model(Variable(data.cuda(), volatile=True))
        # index of the highest score per image, compared on the CPU
        predicted = scores.data.max(1)[1].cpu()
        c += predicted.eq(target).sum()
    print("Correctness on Images : ", c,"/", total, ':', float(c)/total*100, '%' )
    return c

In [None]:
class AlexNetS(nn.Module):
    """
        AlexNet-like network with a small classification head.

        features : five convolutions followed by AvgPool2d(13, 13)
        classifier : three-Linear head ending in num_classes outputs; it is
                     built but never called by forward
        newClassifier : Linear(256, 6), the head applied by forward

        forward reshapes the pooled features to (batch, 256), so the input
        size must make the average pooling output a 1x1 map.
    """

    def __init__(self, num_classes=1000):
        super(AlexNetS, self).__init__()
        convLayers = [
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            # average pooling replaces a final max pooling stage
            nn.AvgPool2d(13,13),
        ]
        self.features = nn.Sequential(*convLayers)

        denseLayers = [
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        ]
        self.classifier = nn.Sequential(*denseLayers)

        # head used by forward; its 6 outputs do not depend on num_classes
        self.newClassifier = nn.Sequential(nn.Linear(256,6))

    def forward(self, x):
        pooled = self.features(x)
        flat = pooled.view(pooled.size(0), 256)
        return self.newClassifier(flat)


In [None]:
# Smoke test: run one forward pass of a fresh AlexNetS on a tensor of
# 32 images of size 3x225x225. torch.Tensor(...) does not initialise its
# content, so the input values are arbitrary.
m = AlexNetS()
t = Variable(torch.Tensor(32,3,225,225))
m(t)

In [None]:
def trainOnVideos():
    """
        Train an AlexNetS on the annotated videos of /video/Gesture/.

        The network starts from the pretrained torchvision AlexNet weights
        (see copyParameters). Training runs 5 epochs over the shuffled
        videos with a new SGD optimizer per video: learning rate 0.01
        during the first epoch, 0.001 afterwards. The model is tested
        on /video/Gesture/test/ when the video counter modulo 5 equals 4
        and saved to best-model.ckpt when the score improves.
    """
    rootDir = '/video/Gesture/'
    testDir = rootDir+'test/'
    net = AlexNetS()
    copyParameters(net, models.alexnet(pretrained=True))
    lossFunction = nn.CrossEntropyLoss()
    lr = 0.01

    net.cuda()
    best = testNet(net, testDir)
    lossSum = 0
    videos = glob.glob(rootDir+'*.mp4')
    videoCount = 0
    for epoch in range(5):
        random.shuffle(videos)
        for video in videos:
            net.train()
            # built per video, so the current lr is picked up
            optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9, weight_decay=0.0005)
            print("Video ", video)
            # annotation file is named after the video, without extension
            fName = path.splitext(path.basename(video))[0]
            annotation = readFrameAnnotation(rootDir+'annotation/'+fName)

            videoCap = cv2.VideoCapture(video)
            for seqCount, (seq, gesture) in enumerate(annotation, 1):
                lossSum += learnSequence(seq, gesture, videoCap, net, lossFunction, optimizer)
                if seqCount%5 == 4:
                    print("[epoch %d] loss : %.3f" % (epoch, lossSum/5) )
                    lossSum = 0.0

            if videoCount%5 == 4:
                score = testNet(net, testDir)
                if score > best:
                    best = score
                    print("Saving best model")
                    torch.save(net, 'best-model.ckpt')
            videoCount += 1
            videoCap.release()
        # lower learning rate for the epochs after the first one
        lr = 0.001
        

In [None]:
def trainOnImages(model, rootDir='/video/GestureImages/trainBGIMAG3_All/', batch_size=32, trainTrans=transforms.ToTensor(), testTrans=transforms.ToTensor(), lr=0.001, epoch=50):
    """
        Train model on images organized : class/imName

        After every epoch the model is tested on the images of
        /video/GestureImages/trainBGOffice_All/ and saved to best-model.ckpt
        when the score improves. Every second epoch it is also tested on
        the videos of /video/Gesture/test/.

        model : network to train, moved to the GPU
        rootDir : root directory of the training image folders
        batch_size : number of images per training batch
        trainTrans : transform applied to the training images
        testTrans : transform applied to the images used for testing
        lr : learning rate of the SGD optimizer
        epoch : number of epochs
    """
    testImageDir = '/video/GestureImages/trainBGOffice_All/'
    d = datasets.ImageFolder(rootDir, transform=trainTrans)
    # batch_size was hard-coded to 32 here, which ignored the parameter
    l = torch.utils.data.DataLoader(d, batch_size=batch_size, shuffle=True, num_workers=6, drop_last=True, pin_memory=False)
    model.cuda()
    best = testImages(model, testImageDir, transf=testTrans)
    criterion = nn.CrossEntropyLoss().cuda()
    # lr was hard-coded to 0.001 here, which ignored the parameter
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=0.0005)
    for ep in range(epoch):
        # the tests below switch to eval mode, so train mode is set again
        model.train()
        for batch_idx, (data,target) in enumerate(l):
            data, target = Variable(data.cuda()), Variable(target.cuda())
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 50 == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    ep, batch_idx * len(data), len(l.dataset),
                    100. * batch_idx / len(l), loss.data[0]))
        r = testImages(model, testImageDir, transf=testTrans)
        if r > best :
            best = r
            print("Saving best model")
            torch.save(model, 'best-model.ckpt')
        if ep%2 == 0:
            testNet(model=model, testDir='/video/Gesture/test/')
            testNFrame(model, '/video/Gesture/test/')

In [None]:
# Export the saved best model to ONNX (file gesture.onnx).
import torch
import torch.onnx
from torch.autograd import Variable
# Input given to the exporter: one image of size 3x224x224 with
# uninitialised content.
# NOTE(review): fillInput crops the video frames to 225x225 while this
# input is 224x224 - confirm the size wanted for the exported model.
d = Variable(torch.Tensor(1,3,224,224))
# the checkpoint holds the whole model object; it is moved to the CPU
model=torch.load('best-model.ckpt').cpu()
torch.onnx.export(model, d, "gesture.onnx", verbose=True)

In [None]:
# Fine-tune the saved best model on images.
# Alternative start from the pretrained AlexNet weights:
#model = AlexNetS()
#copyParameters(model, models.alexnet(pretrained=True))
# NOTE(review): this transform is built but not passed to trainOnImages
# below, which therefore uses its default transforms - confirm whether
# that is intended.
t = transforms.Compose(
                (transforms.ToPILImage(),
                transforms.Scale(225),
                transforms.RandomCrop(225),
                transforms.ToTensor())
                )
model=torch.load('best-model.ckpt')
# 10 epochs on the default image directory of trainOnImages
trainOnImages(model, epoch=10)

# then the default number of epochs on the trainBGOffice_All images
trainOnImages(model, '/video/GestureImages/trainBGOffice_All/')




## LSTM ##





In [None]:
import torch.nn.functional as F

class ConvLSTMCell(nn.Module):
    """
        LSTM cell whose gates are computed with 2D convolutions.

        w_i : kernels applied to the input x, 4*out_channels outputs
              (input gate, forget gate, candidate, output gate)
        w_h : kernels applied to the hidden state h, same four blocks
        w_c : kernels applied to the cell state c, 3*out_channels outputs
              (input gate, forget gate, output gate)

        The three convolutions share padding and stride and their results
        are added, so they must produce maps of the same size.
        No bias term is used.
    """

    def __init__(self, in_channels, out_channels, kernel_size, padding=1, stride=1):
        super(ConvLSTMCell, self).__init__()

        self.k = kernel_size
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.padding = padding
        self.stride = stride

        self.w_i = nn.Parameter(torch.Tensor(4*out_channels, in_channels, kernel_size, kernel_size))
        # h and c have out_channels channels, so the kernels applied to them
        # take out_channels inputs (in_channels only fits when both are equal)
        self.w_h = nn.Parameter(torch.Tensor(4*out_channels, out_channels, kernel_size, kernel_size))
        self.w_c = nn.Parameter(torch.Tensor(3*out_channels, out_channels, kernel_size, kernel_size))
        # TODO include bias terms

        self.reset_parameters()

    def reset_parameters(self):
        """Draw every kernel uniformly in [-stdv, stdv], stdv = 1/sqrt(4*in_channels*k*k)."""
        n = 4 * self.in_channels * self.k * self.k
        stdv = 1. / math.sqrt(n)

        self.w_i.data.uniform_(-stdv, stdv)
        self.w_h.data.uniform_(-stdv, stdv)
        self.w_c.data.uniform_(-stdv, stdv)

    def forward(self, x, hx):
        """
            Compute one step of the cell.

            x : input with in_channels channels
            hx : pair (h, c) of hidden and cell state, out_channels channels
            return : (h_t, (h_t, c_t)), output and new state
        """
        h, c = hx
        oc = self.out_channels
        wx = F.conv2d(x, self.w_i, padding=self.padding, stride=self.stride)
        wh = F.conv2d(h, self.w_h, padding=self.padding, stride=self.stride)
        wc = F.conv2d(c, self.w_c, padding=self.padding, stride=self.stride)

        i = F.sigmoid(wx[:, :oc] + wh[:, :oc] + wc[:, :oc])
        f = F.sigmoid(wx[:, oc:2*oc] + wh[:, oc:2*oc] + wc[:, oc:2*oc])
        g = F.tanh(wx[:, 2*oc:3*oc] + wh[:, 2*oc:3*oc])

        c_t = f * c + i * g
        # NOTE(review): the output gate multiplies a convolution of the
        # previous cell state by c_t, while the input and forget gates add
        # the convolution - confirm this is the intended formulation.
        o_t = F.sigmoid(wx[:, 3*oc:] + wh[:, 3*oc:] + wc[:, 2*oc:]*c_t)
        h_t = o_t * F.tanh(c_t)

        return h_t, (h_t, c_t)

In [None]:
class convRNN_1_layer(nn.Module):
    """
        AlexNet convolutional features followed by one ConvLSTMCell.

        features : AlexNet-like stack of five convolutions and three max
                   poolings, 256 output channels
        convRNN : ConvLSTMCell(256, 256) applied to the feature maps
        classifier : 1x1 convolution to 6 channels followed by an average
                     pooling over a 6x6 window

        num_classes is accepted but not used: the classifier always has
        6 outputs.
    """

    def __init__(self, num_classes=1000):
        super(convRNN_1_layer, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.convRNN = ConvLSTMCell(256,256,kernel_size=3, padding=1, stride=1)
        self.classifier = nn.Sequential(
            nn.Conv2d(256,6,kernel_size=1, padding=0, stride=1),
            nn.AvgPool2d(kernel_size=6, stride=1, padding=0)
        )

    def forward(self, x, hx):
        """
            x : batch of images
            hx : pair (h, c) of states given to the ConvLSTMCell
            return : (scores, new hx); scores has one row per image
        """
        x = self.features(x)
        x, hx = self.convRNN(x, hx)
        x = self.classifier(x)
        # one row per image; squeeze().unsqueeze(0) gave this result for a
        # single image only
        x = x.view(x.size(0), -1)
        return x, hx

In [None]:
def copyAlexNetParameters(model, target):
    """
        Give the convolutions of target the parameters of model.

        For every layer of model.features that is exactly a Conv2d, the
        layer of target.features at the same index receives its weight and
        bias objects. The parameters are shared, not copied.

        model : network providing the parameters
        target : network receiving them (modified in place)
    """
    convType = torch.nn.modules.conv.Conv2d
    for index, layer in enumerate(model.features):
        if type(layer) is not convType:
            continue
        destination = target.features[index]
        destination.weight = layer.weight
        destination.bias = layer.bias

In [None]:
class FrameError(Exception):
    """Error raised about a frame; value is the frame number reported in the message."""

    def __init__(self, value):
        # only the value is stored, Exception.__init__ is not called
        self.value = value

    def __str__(self):
        return "Frame Error number :%s" % (self.value,)

In [None]:
def trainSequence(model, dataset="/video/Gesture/", criterion = nn.CrossEntropyLoss(), lr = 0.01):
    """
        Train the recurrent part of model on the annotated videos of dataset.

        Only the parameters of model.convRNN and model.classifier are given
        to the optimizer. Training runs 5 epochs over the shuffled videos;
        the learning rate is lr during the first epoch and 0.001 afterwards.
        The model is tested when the video counter modulo 5 equals 4 and
        saved to best-model.ckpt when the score improves.

        model : network with convRNN and classifier sub-modules, moved to the GPU
        dataset : directory prefix, concatenated as is (needs a trailing '/');
                  annotations are read from dataset + 'annotation/'
        criterion : loss given to trainGesture
        lr : learning rate of the first epoch
        raise : FrameError when the video ends before the start of a sequence
    """
    videos = glob.glob(dataset+'*.mp4')
    print("Nb video to handle : ", len(videos))
    model.cuda()

    # one group per sub-module: a single dict with two 'params' keys keeps
    # only the last one, which left the convRNN out of the optimisation
    optimizer = optim.SGD( [{'params': model.convRNN.parameters()},
                            {'params': model.classifier.parameters()} ],
                          lr=lr, momentum=0.9, weight_decay=0.0005)

    best = 0    # best score so far, no test is run before training
    j = 0       # number of videos handled, over all epochs
    for epoch in range(5):
        print("epoch:", epoch)
        random.shuffle(videos)
        for video in videos:
            rl = 0
            model.train()

            print("Training on Video ", video)
            fName = path.splitext(path.basename(video))[0] #basename
            annotation = readFrameAnnotation(dataset+'annotation/'+fName) #read annotation

            videoCap = cv2.VideoCapture(video)
            try:
                nframe = 0
                i = 0
                for seq, gesture in annotation:
                    print("Sequence ", seq, " Gesture : ", gesture)
                    # skip the frames before the sequence; < instead of !=
                    # so the loop ends when the position is already past seq[0]
                    while nframe < seq[0]:
                        ret, frame = videoCap.read()
                        if not ret:
                            raise FrameError(nframe)
                        nframe += 1

                    length = max(seq[1]-seq[0], 0)
                    rl += trainGesture(model, videoCap, length, gesture, criterion, optimizer)
                    # trainGesture read length frames of the video
                    nframe += length

                    i += 1
                    if i%5 == 4:
                        print("[epoch %d] loss : %.3f" % (epoch, rl/5) )
                        rl = 0.0
            finally:
                # free the capture even when a frame is missing
                videoCap.release()

            if j%5 == 4:
                # NOTE(review): testNet calls model(inputs) with one argument
                # while convRNN_1_layer.forward also expects the state hx;
                # a recurrent test function is needed here - confirm.
                r = testNet(model, dataset+'test/')
                if r > best:
                    best = r
                    print("Saving best model")
                    torch.save(model, 'best-model.ckpt')
            j += 1
        lr = 0.001
        # the optimizer is built once, so the new rate is written in its groups
        for group in optimizer.param_groups:
            group['lr'] = lr
    

In [None]:
def trainGesture(model, video, nframe, gesture, criterion, optimize, batchSize=32):
    """
        Train the recurrent model on the next nframe frames of a video.

        The frames are given one by one to the model together with the
        LSTM state, and one optimisation step is done per frame. The state
        is detached after every step, so the gradient of a frame does not
        flow through the previous frames.

        model : network called as model(input, (hx, cx)) and returning
                (prediction, (hx, cx)); expected on the GPU
        video : capture object whose read() returns (ret, frame)
        nframe : number of frames to read
        gesture : class index used as label for every frame
        criterion : loss called as criterion(prediction, label)
        optimize : optimizer updating the model
        batchSize : not used
        return : sum of the losses of the frames that could be read
    """
    frame = 0
    #define lstm hidden states, starting from zero (random_() filled them
    #with arbitrary integers)
    hx = Variable(torch.zeros(1,256,6,6).cuda())
    cx = Variable(torch.zeros(1,256,6,6).cuda())

    running_loss = 0

    t = transforms.Compose(
                (transforms.ToPILImage(),
                transforms.Scale(225),
                transforms.RandomCrop(225),
                transforms.ToTensor())
                )

    # the label is the same for every frame of the gesture
    label = Variable(torch.LongTensor([gesture]).cuda())

    while frame < nframe:
        #read frame
        ret, vidframe = video.read()
        frame += 1
        if not ret:
            # unreadable frames are skipped, as before
            continue
        inputs = torch.Tensor(1,3,225,225).cuda()
        inputs[0] = t(vidframe)

        # without this the gradients of all previous frames add up
        optimize.zero_grad()
        pred, (hx, cx) = model(Variable(inputs), (hx, cx))
        loss = criterion(pred, label)
        loss.backward()
        optimize.step()

        # cut the graph here: keeping it (retain_graph) made every backward
        # go through all the previous frames again
        hx = hx.detach()
        cx = cx.detach()
        running_loss += loss.data[0]
    return running_loss

In [None]:
# Build the recurrent network, give its convolutions the parameters of the
# pretrained torchvision AlexNet, then train it on the annotated videos.
model = convRNN_1_layer()
copyAlexNetParameters(models.alexnet(pretrained=True), model)
trainSequence(model, dataset="/video/Gesture/")