https://github.com/google-research/google-research/tree/master/taperception

https://dl.acm.org/doi/fullHtml/10.1145/3491102.3517497#BibPLXBIB0037

This notebook is kept on GitHub for version control.
Download json_dir and image_dir from Google Drive to a local directory, or run the notebook in Google Colab and mount the drive.

In [None]:
pip install torchvision numpy pandas scikit-image

In [None]:
#Imports 
import pandas as pd
import json
import numpy as np
import re
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import torchvision
from torchvision import datasets, models, transforms, utils
from torch.utils.data import Dataset, DataLoader
from skimage import io, transform
import torchvision.models as models

In [None]:
# Run on the GPU when PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
# Seed PyTorch's global random number generator.
torch.manual_seed(42)

In [None]:
# Annotation table (ids only) published with the taperception dataset.
dataset = pd.read_csv('https://raw.githubusercontent.com/google-research-datasets/taperception/main/rico_tap_annotations_idsonly.csv')

# TODO: replace both placeholders with real directories before running.
# Include the trailing path separator in each value.
json_dir = "path/to/json_dir/"  # ADD PATH OF JSON DIRECTORY
img_dir = "path/to/image_dir/"  # ADD PATH TO IMAGE DIRECTORY

In [None]:
# Preview the first five annotation rows.
dataset.head(n=5)

In [None]:
class image:
    """
    Stores the root bounds and the parsed json file that belong to one image.

    Every instance is appended to the class-level list ``_all`` when it is
    created, so existing image objects can be retrieved with ``image.all()``.
    """

    _all = []

    def __init__(self, image_id, json_file):
        """
        image_id:  identifier of the image
        json_file: parsed json dictionary of the image; the root bounds are
                   read from json_file['activity']['root']['rel-bounds'],
                   where index 2 is used as width and index 3 as height
        """
        self.image_id = image_id
        self.json_file = json_file
        self.image_bounds = self.json_file['activity']['root']['rel-bounds']
        self.image_bounds_height = self.image_bounds[3]
        self.image_bounds_width = self.image_bounds[2]

        image._all.append(self)

    @classmethod
    def all(cls):
        """
        returns the list of every image object created so far
        """
        return cls._all

    def get_image_bounds_height(self):
        return self.image_bounds_height

    def get_image_bounds_width(self):
        return self.image_bounds_width

    def get_json_file(self):
        return self.json_file

    def list_objects(self):
        """
        returns list of all objects owned by image
        """
        # img_obj keeps its owning image in the `image` attribute; there is
        # no `owner` attribute, so filtering on it raised AttributeError.
        return [obj for obj in img_obj.all() if obj.image is self]


In [None]:
class img_obj:
    """
    Looks up the bounds of one object inside the json file of its image.

    The lookup runs while the instance is being constructed; the bounds stay
    an empty list when no entry with a matching 'pointer' is found.
    Every instance is appended to the class-level list returned by all().
    """

    _all = []

    def __init__(self, object_id, image):
        self.object_bounds = []
        self.object_id = object_id
        self.image = image

        img_obj._all.append(self)
        # walk the json of the owning image to fill in object_bounds
        self.find_object_bounds(self.image.get_json_file())

    @classmethod
    def all(cls):
        return cls._all

    def _descend(self, node):
        """
        continues the search inside nested dictionaries and lists;
        any other value is ignored
        """
        if isinstance(node, dict):
            self.find_object_bounds(node)
        elif isinstance(node, list):
            self.find_object_bounds_lst(node)

    def find_object_bounds(self, dict_file):
        """
        recursive search through a dictionary: whenever its 'pointer' entry
        equals the object id, the dictionary's 'rel-bounds' are stored.
        the search does not stop at a match, so the last match visited wins.
        """
        for key in dict_file:
            value = dict_file[key]
            if key == 'pointer' and value == self.object_id:
                self.set_object_bounds(dict_file['rel-bounds'])
            self._descend(value)

    def find_object_bounds_lst(self, lst):
        """
        recursive search through the items of a list
        """
        for entry in lst:
            self._descend(entry)

    def get_object_bounds(self):
        return self.object_bounds

    def set_object_bounds(self, new_bounds):
        self.object_bounds = new_bounds
    

In [None]:
class applyMask(object):

    """
    Appends a binary mask of the object to the image as an extra channel.

    The mask is 1 for pixels inside the object's bounds and 0 elsewhere.
    Bounds are rescaled from the coordinate system of the image's root
    bounds to the resolution of the loaded image.
    """

    def __call__(self, sample):
        """
        sample: dict with the keys 'image' (array indexed [row, column, ...]),
                'image_id', 'object_id' and 'label'

        returns dict with 'image' (input image stacked with the mask along
        the third axis) and 'label' (unchanged)

        raises IndexError when no bounds were found for the object
        """
        pixels = sample['image']

        object_bounds, height, width = self.get_bounds(sample['image_id'], sample['object_id'])
        if len(object_bounds) < 4:
            raise IndexError(
                f"no bounds found for object {sample['object_id']!r} "
                f"in image {sample['image_id']!r}"
            )

        # bounds are used as [x_min, y_min, x_max, y_max]
        x_ratio_min = object_bounds[0] / width
        x_ratio_max = object_bounds[2] / width
        y_ratio_min = object_bounds[1] / height
        y_ratio_max = object_bounds[3] / height

        # axis 0 of the image runs over rows (vertical, y) and axis 1 over
        # columns (horizontal, x), so y ratios select rows and x ratios
        # select columns
        n_rows, n_cols = pixels.shape[0], pixels.shape[1]
        row_pos = np.arange(n_rows) / n_rows
        col_pos = np.arange(n_cols) / n_cols
        rows_inside = (y_ratio_min <= row_pos) & (row_pos < y_ratio_max)
        cols_inside = (x_ratio_min <= col_pos) & (col_pos < x_ratio_max)

        # outer product marks exactly the pixels whose row and column are both inside
        binary_mask = np.outer(rows_inside, cols_inside).astype(np.float64)
        concat = np.dstack((pixels, binary_mask))  # mask becomes the last channel

        return {'image': concat, 'label': sample['label']}

    def get_bounds(self, image_id, object_id):
        """
        Returns bounds of the object and the height and width of the image bounds
        """

        # reuse the image object if one was already created for this id
        img = next((known for known in image.all() if known.image_id == image_id), None)
        if img is None:
            # only read the json file from disk for images not seen before
            json_path = os.path.join(json_dir, str(image_id) + '.json')
            with open(json_path, encoding='utf-8') as json_file:
                img = image(image_id, json.load(json_file))

        obj = img_obj(object_id, img)  # the bounds lookup runs in the constructor
        return obj.get_object_bounds(), img.get_image_bounds_height(), img.get_image_bounds_width()
        
class ToTensor(object):
    """Turn the ndarray image of a sample into a torch Tensor."""

    def __call__(self, sample):
        array = sample['image']
        target = sample['label']

        # numpy keeps images as H x W x C while torch expects C x H x W,
        # so the channel axis is moved to the front before converting
        channels_first = array.transpose(2, 0, 1)
        tensor = torch.from_numpy(channels_first)
        return {'image': tensor, 'label': target}
                
    

In [None]:
class Tappable(Dataset):
    """
    Creates dataset from the csv of labelled image and object ids

    Each item is a dict with the keys 'image', 'image_id', 'object_id' and
    'label', passed through `transform` when one is given.
    """

    def __init__(self, root_dir, dataset, transform=None):
        """
        root_dir:  directory that holds the '<image_id>.jpg' files
        dataset:   table indexed with .iloc; column 0 is read as image id,
                   column 1 as object id and column 2 as label
        transform: optional callable applied to every sample dict
        """
        self.root_dir = root_dir
        self.transform = transform
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        image_id = self.dataset.iloc[idx, 0]
        object_id = self.dataset.iloc[idx, 1]
        label = self.dataset.iloc[idx, 2]

        img_name = os.path.join(self.root_dir, str(image_id) + ".jpg")
        image = io.imread(img_name)

        sample = {'image': image, 'image_id': image_id, 'object_id': object_id, 'label': int(label)}

        # without a transform the raw sample is returned; previously this
        # path failed because the result variable was never assigned
        if self.transform:
            return self.transform(sample)
        return sample

In [None]:
# One dataset per split; every sample first gets the object mask appended
# and is then converted by ToTensor.
dataset_train = Tappable(
    root_dir=img_dir,
    dataset=dataset[dataset['split'] == 'train'],
    transform=transforms.Compose([applyMask(), ToTensor()]),
)

dataset_test = Tappable(
    root_dir=img_dir,
    dataset=dataset[dataset['split'] == 'test'],
    transform=transforms.Compose([applyMask(), ToTensor()]),
)

#batch size should be 1024
# pinned memory is only requested when running on cuda
dataloader_train = DataLoader(
    dataset_train,
    batch_size=20,
    shuffle=True,
    pin_memory=(DEVICE == "cuda"),
)
dataloader_test = DataLoader(
    dataset_test,
    batch_size=20,
    shuffle=True,
    pin_memory=(DEVICE == "cuda"),
)

In [None]:
#resnet18 cnn model 
class Block(nn.Module):
    """
    Residual block: 3x3 conv -> batch norm -> relu -> 1x1 conv -> batch norm,
    added to the (optionally downsampled) input and passed through relu.
    """

    def __init__(self, num_layers, in_channels, out_channels, identity_downsample=None, stride=1):
        super().__init__()
        self.num_layers = num_layers
        self.expansion = 1
        expanded_channels = out_channels * self.expansion

        # NOTE(review): conv1 and bn1 are created but forward() never calls them.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # `stride` is applied by the 3x3 convolution
        self.conv2 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, expanded_channels, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(expanded_channels)
        self.relu = nn.ReLU()
        # module applied to the input before the residual addition, or None
        self.identity_downsample = identity_downsample

    def forward(self, x):
        # main path
        out = self.relu(self.bn2(self.conv2(x)))
        out = self.bn3(self.conv3(out))

        # shortcut path: the input itself unless a downsample module was given
        shortcut = x
        if self.identity_downsample is not None:
            shortcut = self.identity_downsample(shortcut)

        out += shortcut
        return self.relu(out)

class ResNet(nn.Module):
    """
    Convolutional stem, four stages of residual blocks, global average
    pooling and a linear classifier.

    `num_layers` is only forwarded to the blocks; the number of blocks per
    stage is fixed to [2, 2, 2, 2] inside the constructor.
    `block` is called as block(num_layers, in_channels, out_channels,
    identity_downsample, stride) for the first block of a stage and as
    block(num_layers, in_channels, out_channels) for the remaining ones.
    """

    def __init__(self, num_layers, block, image_channels, num_classes):
        super().__init__()
        self.expansion = 1
        blocks_per_stage = [2, 2, 2, 2]  # resnet 18
        self.in_channels = 64

        # stem
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # residual stages layer1..layer4, registered in order
        stage_settings = zip(blocks_per_stage, (64, 128, 256, 512), (1, 2, 2, 2))
        for stage_no, (depth, channels, stride) in enumerate(stage_settings, start=1):
            stage = self.make_layers(num_layers, block, depth,
                                     intermediate_channels=channels, stride=stride)
            setattr(self, f"layer{stage_no}", stage)

        # head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * self.expansion, num_classes)

    def forward(self, x):
        pipeline = (
            self.conv1, self.bn1, self.relu, self.maxpool,
            self.layer1, self.layer2, self.layer3, self.layer4,
            self.avgpool,
        )
        for module in pipeline:
            x = module(x)

        # flatten everything except the batch dimension for the classifier
        x = x.reshape(x.shape[0], -1)
        return self.fc(x)

    def make_layers(self, num_layers, block, num_residual_blocks, intermediate_channels, stride):
        """
        Builds one stage of `num_residual_blocks` blocks and updates
        self.in_channels to the stage's output channel count.
        """
        out_channels = intermediate_channels * self.expansion

        # the first block always receives a 1x1 conv + batch norm for its
        # shortcut and is the only block that gets `stride`
        shortcut = nn.Sequential(
            nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride),
            nn.BatchNorm2d(out_channels),
        )
        stage = [block(num_layers, self.in_channels, intermediate_channels, shortcut, stride)]
        self.in_channels = out_channels

        # remaining blocks keep the channel count and use the block defaults
        stage.extend(
            block(num_layers, self.in_channels, intermediate_channels)
            for _ in range(num_residual_blocks - 1)
        )
        return nn.Sequential(*stage)

In [None]:
# 4 input channels: the image channels plus the mask channel appended by
# applyMask (presumably RGB + mask -- confirm against the images).
# NOTE(review): 1000 output classes looks like a leftover default; confirm
# how many distinct label values the dataset contains.
model = ResNet(18, Block, 4, 1000)
model = model.to(DEVICE)

#loss
criterion = nn.CrossEntropyLoss()

optimizer = optim.SGD(model.parameters(), lr=0.05, nesterov=True, momentum=0.9)


#lr - reduced by a factor of 10 at each milestone epoch
def lambda_lr(epoch, milestones=(100, 500, 1000, 1300)):
    """
    Returns the factor the initial learning rate is multiplied with at `epoch`.

    LambdaLR multiplies the *initial* lr by this value, so the factor has to
    be cumulative: 0.1 for every milestone that has already been reached.
    """
    return 0.1 ** sum(epoch >= milestone for milestone in milestones)


# NOTE: this name rebinds the `lr_scheduler` module imported from torch.optim;
# it is kept because the training loop calls lr_scheduler.step().
lr_scheduler = optim.lr_scheduler.LambdaLR(
    optimizer=optimizer,
    lr_lambda=lambda_lr,
)

In [None]:
n_epochs = 2 #should be 1500
test_loss = []   # mean test loss, one entry per epoch
train_loss = []  # mean train loss, one entry per epoch
total_step_train = len(dataloader_train)
total_step_test = len(dataloader_test)
valid_loss_min = np.inf  # np.Inf was removed in NumPy 2.0

for epoch in range(1, n_epochs + 1):
    # ---- training ----
    model.train()
    running_loss = 0.0
    print(f'Epoch {epoch}\n')
    for batch_idx, item in enumerate(dataloader_train):
        inputs = item['image'].to(DEVICE, dtype=torch.float32)
        labels = item['label'].to(DEVICE, dtype=torch.long)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        # progress is reported every 20 steps only
        if batch_idx % 20 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Train Loss: {:.4f}'
                  .format(epoch, n_epochs, batch_idx, total_step_train, loss.item()))
    train_loss.append(running_loss / total_step_train)
    lr_scheduler.step()

    # ---- evaluation ----
    model.eval()
    batch_loss = 0
    # no gradients are needed for evaluation
    with torch.no_grad():
        for batch_idx, item in enumerate(dataloader_test):
            data_t = item['image'].to(DEVICE, dtype=torch.float32)
            target_t = item['label'].to(DEVICE, dtype=torch.long)
            outputs_t = model(data_t)
            loss_t = criterion(outputs_t, target_t)
            batch_loss += loss_t.item()
            if batch_idx % 20 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Test Loss: {:.4f}'
                      .format(epoch, n_epochs, batch_idx, total_step_test, loss_t.item()))

    test_loss.append(batch_loss / total_step_test)

    # report the losses of this epoch, not the mean over all epochs so far
    print(f'\ntrain-loss: {train_loss[-1]:.4f}, test-loss: {test_loss[-1]:.4f}')

    # keep the weights with the lowest summed test loss seen so far
    network_learned = batch_loss < valid_loss_min

    if network_learned:
        valid_loss_min = batch_loss
        torch.save(model.state_dict(), 'resnet.pt')
        print('Improvement-Detected, save-model')