## Train Hand Detector

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.io import read_image
from torch.utils.data import Dataset, ConcatDataset, random_split
from torch.utils.data.dataloader import DataLoader
from torchvision.models import resnet50
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2
from math import ceil
import sys
import os
import time

Package versions:

In [None]:
# report the interpreter and framework versions used for this run
print(f'Python: {sys.version}')
print(f'PyTorch: {torch.__version__}')

**Initial configurations**

In [None]:
# define model id (used below to name the saved loss plot, weights file and results file)
model_id = 'detector_model_07'

# define images path (image file names from the annotations are joined onto this folder)
images_folder = './data/full_images'

# define annotations path (CSV file read with pandas by the dataset class)
annotations_path = './data/annotations.csv'

Data preparation:

In [None]:
# create dataset class
class CustomImageDataset(Dataset):
    """Image / bounding-box dataset backed by a CSV annotation file.

    Column 0 of the CSV holds the image file name (joined onto ``img_dir``);
    every remaining column is treated as a numeric bounding-box value.

    Args:
        annotations_file: Path of the CSV file, read with ``pd.read_csv``.
        img_dir: Directory the image file names are joined onto.
        transform: Optional callable applied to the image after loading.
    """

    def __init__(self, annotations_file, img_dir, transform = None):
        self.bbox = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the annotation table."""
        return len(self.bbox)

    def _bbox_tensor(self, idx):
        """Return the box values of row ``idx`` as a 1-D float32 tensor."""
        # A row taken from a frame that also holds the file-name column is not
        # guaranteed to have a numeric dtype, so convert explicitly instead of
        # relying on torch.tensor() to interpret a pandas Series.
        values = self.bbox.iloc[idx, 1:].to_numpy(dtype = np.float32)
        return torch.tensor(values)

    def __getitem__(self, idx):
        """Return ``(image, bbox)`` for row ``idx``, both as float tensors."""
        img_path = os.path.join(self.img_dir, self.bbox.iloc[idx, 0])
        image = read_image(img_path)
        if self.transform:
            image = self.transform(image)
        return image.float(), self._bbox_tensor(idx)

In [None]:
# read data
dataset = CustomImageDataset(annotations_path, images_folder, transform = transforms.Compose(
    [transforms.Resize((480, 640)),
     transforms.ToPILImage(),
     transforms.ToTensor(),
     transforms.Normalize(mean = [0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225])]
))

In [None]:
# print dataset size
dataset_size = len(dataset)
print('Size of dataset:', dataset_size)

In [None]:
# splitting data into train, val and test
val_size = ceil(dataset_size * 0.2)
test_size = ceil(dataset_size * 0.2)
train_size = len(dataset) - val_size - test_size
train_data, val_data, test_data = random_split(dataset, [train_size, val_size, test_size], torch.Generator().manual_seed(74))

print('* Sizes after splitting *')
print('Train:', train_size)
print('Validation:', val_size)
print('Test:', test_size)

In [None]:
# load the train and validation datasets into batches
batch_size = 10
train_dl = DataLoader(train_data, batch_size = batch_size, shuffle = True, num_workers = 0, pin_memory = True)
val_dl = DataLoader(val_data, batch_size = batch_size, num_workers = 0, pin_memory = True)

Create classes and functions for training:

In [None]:
# create base class for model training
class ObjectDetectionBase(nn.Module):
    
    def training_step(self, batch):
        images, bbox = batch 
        out = self(images.float())  # predictions
        loss = F.mse_loss(out, bbox) # loss
        return loss
    
    def validation_step(self, batch):
        images, bbox = batch
        out = self(images) # predictions
        loss = F.mse_loss(out, bbox) # loss
        return {'val_loss': loss.detach()}
        
    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean() # mean of loss
        return {'val_loss': epoch_loss.item()}
    
    def epoch_end(self, epoch, result):
        train_loss = round(result['train_loss'], 8)
        val_loss = round(result['val_loss'], 8)
        print(f'Epoch [{epoch}], train_loss: {train_loss}, val_loss: {val_loss}')

In [None]:
# architecture of CNN model
class HandDetection(ObjectDetectionBase):
    
    def __init__(self, baseModel):

        super().__init__()
        
        self.baseModel = baseModel
        
        self.network = nn.Sequential(
            
            #nn.Conv2d(3, 32, kernel_size = 3, stride = 1, padding = 1),
            #nn.ReLU(),
            #nn.Conv2d(32, 64, kernel_size = 3, stride = 1, padding = 1),
            #nn.ReLU(),
            #nn.MaxPool2d(2,2),
            #
            #nn.Conv2d(64, 128, kernel_size = 3, stride = 1, padding = 1),
            #nn.ReLU(),
            #nn.Conv2d(128, 128, kernel_size = 3, stride = 1, padding = 1),
            #nn.ReLU(),
            #nn.MaxPool2d(2,2),
            #
            #nn.Conv2d(128, 256, kernel_size = 3, stride = 1, padding = 1),
            #nn.ReLU(),
            #nn.Conv2d(256, 256, kernel_size = 3, stride = 1, padding = 1),
            #nn.ReLU(),
            #nn.MaxPool2d(2,2),
            #
            #nn.Flatten(),
            #nn.Linear(76800, 1024),
            nn.Linear(baseModel.fc.in_features, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 4),
            nn.Sigmoid()
        )
        
        self.baseModel.fc = nn.Identity()
    
    def forward(self, xb):
        features = self.baseModel(xb)
        output = self.network(features)
        return output

In [None]:
# define device
def get_default_device():
    # set device to GPU or CPU
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')
    
def to_device(data, device):
    """Move ``data`` to ``device``.

    Lists and tuples are walked recursively and always come back as lists;
    anything else is moved with ``.to(device, non_blocking=True)``.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking = True)
    return [to_device(item, device) for item in data]

class DeviceDataLoader:
    """Wrap a data loader so each batch is moved to ``device`` when iterated."""

    def __init__(self, dl, device):
        self.dl = dl
        self.device = device

    def __iter__(self):
        # batches are moved one at a time, as the consumer asks for them
        yield from (to_device(batch, self.device) for batch in self.dl)

    def __len__(self):
        """Return the number of batches of the wrapped loader."""
        return len(self.dl)

In [None]:
# model fitting
@torch.no_grad()
def evaluate(model, val_loader):
    """Score ``model`` on every batch of ``val_loader`` without gradient tracking.

    The model is switched to eval mode and left in that mode. Returns
    whatever ``model.validation_epoch_end`` produces from the list of
    per-batch ``model.validation_step`` results.
    """
    model.eval()
    step_outputs = []
    for batch in val_loader:
        step_outputs.append(model.validation_step(batch))
    return model.validation_epoch_end(step_outputs)

def fit(epochs, lr, model, train_loader, val_loader, opt_func = torch.optim.SGD):
    """Train ``model`` and return one result dict per epoch.

    Args:
        epochs: Number of passes over ``train_loader``.
        lr: Learning rate passed to the optimizer.
        model: Module providing ``training_step`` and ``epoch_end`` (plus the
            validation hooks used by ``evaluate``).
        train_loader: Iterable of training batches.
        val_loader: Iterable of validation batches, scored after every epoch.
        opt_func: Optimizer class, instantiated as
            ``opt_func(model.parameters(), lr)``.

    Returns:
        A list with, for every epoch, the dict returned by ``evaluate``
        extended with a ``'train_loss'`` entry (mean training loss).
    """
    history = []
    optimizer = opt_func(model.parameters(), lr)

    for epoch in range(epochs):
        # evaluate() leaves the model in eval mode, so switch back every epoch
        model.train()
        train_losses = []

        for batch in train_loader:
            loss = model.training_step(batch)
            # store only the value: keeping the attached loss would hold a
            # reference to every batch's autograd graph until the epoch ends
            train_losses.append(loss.detach())
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        model.epoch_end(epoch, result)
        history.append(result)

    return history

In [None]:
# define plot
def plot_losses(history):
    """ Plot the losses in each epoch"""
    train_losses = [x.get('train_loss') for x in history]
    val_losses = [x['val_loss'] for x in history]
    plt.plot(train_losses, '-bx')
    plt.plot(val_losses, '-rx')
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['Training', 'Validation'])
    plt.title('Train and Validation Losses')
    plt.savefig(f'./outputs/plots/losses_{model_id}.png')

**Training the model**

In [None]:
# pretrained resnet50
resnet = resnet50(pretrained=True)

# freeze parameters
for param in resnet.parameters():
    param.requires_grad = False

In [None]:
# define device and load data
device = get_default_device()
print('Device:', device)

train_dl = DeviceDataLoader(train_dl, device)
val_dl = DeviceDataLoader(val_dl, device)

# load the model to the device
model = to_device(HandDetection(resnet), device)

In [None]:
# define hyperparameters
epochs = 400
lr = 0.05
opt_func = torch.optim.SGD

In [None]:
# fit
start_time = time.time()
model_result = fit(epochs = epochs, lr = lr, model = model, train_loader = train_dl, val_loader = val_dl, opt_func = opt_func)
end_time = time.time()
print(f'Total time: {(end_time - start_time)/60:.3} min')

In [None]:
# plot loss
# plot the per-epoch losses returned by fit (the figure is also saved under ./outputs/plots/)
plot_losses(model_result)

In [None]:
# save model
model_path = f'./outputs/models/{model_id}.pth'
torch.save(model.state_dict(), model_path)

In [None]:
# save results
result_list = ['Model ID:', model_id, '\n',
               'Path to model:', model_path, '\n',
               'Model architecture:', str(model), '\n',
               'Hyperparameters:', '- Epochs:', str(epochs), '- Learning rate:', str(lr), '- Optimization function:', str(opt_func), '\n',
               'Evaluation:', str(model_result[-1])]

textfile = open(f'./outputs/results/results_{model_id}.txt', 'w')
for element in result_list:
    textfile.write(element + "\n")
textfile.close()

**Get the final model**

In [None]:
# change model ID
model_id = 'final_model'

In [None]:
# merge train and validation datasets
full_train = ConcatDataset([train_data, val_data])

In [None]:
# load the datasets into batches
full_train_dl = DataLoader(full_train, batch_size, shuffle = True, num_workers = 0, pin_memory = True)
test_dl = DataLoader(test_data, batch_size, num_workers = 0, pin_memory = True)

In [None]:
# final architecture of CNN model
class HandDetectionFinal(ObjectDetectionBase):
    
    def __init__(self):

        super().__init__()
        self.network = nn.Sequential(
            
            nn.Conv2d(3, 32, kernel_size = 3, stride = 1, padding = 1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size = 3, stride = 1, padding = 1),
            nn.ReLU(),
            nn.MaxPool2d(2,2),
        
            nn.Conv2d(64, 128, kernel_size = 3, stride = 1, padding = 1),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size = 3, stride = 1, padding = 1),
            nn.ReLU(),
            nn.MaxPool2d(2,2),
            
            nn.Conv2d(128, 256, kernel_size = 3, stride = 1, padding = 1),
            nn.ReLU(),
            nn.Conv2d(256, 256, kernel_size = 3, stride = 1, padding = 1),
            nn.ReLU(),
            nn.MaxPool2d(2,2),
            
            nn.Flatten(),
            nn.Linear(76800, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 4),
            nn.ReLU()
        )
    
    def forward(self, xb):
        return self.network(xb)

In [None]:
# load data
full_train_dl = DeviceDataLoader(train_dl, device)
test_dl = DeviceDataLoader(val_dl, device)

# load the model to the device
final_model = to_device(HandDetectionFinal(), device)

In [None]:
# define hyperparameters
epochs = 30
lr = 0.01
opt_func = torch.optim.SGD

In [None]:
# fit
start_time = time.time()
final_model_result = fit(epochs = epochs, lr = lr, model = final_model, train_loader = full_train_dl, val_loader = test_dl, opt_func = opt_func)
end_time = time.time()
print(f'Total time: {(end_time - start_time)/60:.3} min')

In [None]:
# plot loss
# plot the per-epoch losses of the final fit (saved under the current model_id)
plot_losses(final_model_result)

In [None]:
# evaluate the model in test data
result = evaluate(final_model, test_dl)
print(result)

In [None]:
# save model
model_path = f'./outputs/models/{model_id}.pth'
torch.save(final_model.state_dict(), model_path)

**Prediction for an image**

In [None]:
def convert_pixel_coord(prediction, img_shape):
    """Scale the first predicted box to integer pixel values.

    Values 0 and 2 of ``prediction[0]`` are multiplied by ``img_shape[1]``,
    values 1 and 3 by ``img_shape[2]``; each product is rounded and the
    result is returned as an integer NumPy array of length 4.

    NOTE(review): for a (channels, height, width) shape this scales values
    0/2 by the height, yet the prediction cell passes them to cv2.rectangle
    as x coordinates — confirm against the annotation column order.
    """
    first_box = prediction[0]
    scales = (img_shape[1], img_shape[2], img_shape[1], img_shape[2])

    pixels = np.zeros(4)
    for k, scale in enumerate(scales):
        pixels[k] = torch.round(first_box[k] * scale)
    return pixels.astype(int)

def predict_bbox(img, model):
    """Predict the bounding box of a single image in pixel values.

    Args:
        img: Image tensor without a batch dimension; a batch dimension is
            added before the model call, and ``img.shape`` is used to scale
            the prediction.
        model: Network called on the batched image.

    Returns:
        The integer array produced by ``convert_pixel_coord``.
    """
    batch = to_device(img.unsqueeze(0), device)

    # Inference only: use eval mode and skip gradient tracking, then put the
    # model back into whatever mode the caller had it in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            prediction = model(batch)
    finally:
        model.train(was_training)

    return convert_pixel_coord(prediction, img.shape)

In [None]:
# get image path
file_path = './data/full_images/full_image_A_120.png'

# read image
img = cv2.imread(file_path, cv2.IMREAD_COLOR)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

# display prediction
img_tensor = transforms.ToTensor()(img)
prediction = predict_bbox(img_tensor, model)
print(f'Predicted values: {prediction}')

# plot result
plt.figure(figsize = (6, 6))
img = cv2.rectangle(img,
                    (prediction[0], prediction[1]),
                    (prediction[2], prediction[3]),
                    color = (255, 0, 0),
                    thickness = 2)
plt.imshow(img)