Code for training Keypoint R-CNN on the COCO dataset and the MPII dataset (not used in the final project)

In [None]:
import os
import numpy as np
import torch
import torch.utils.data
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision
import os
from imutils import paths
import scipy.io
import pandas as pd
import csv
import skimage
from skimage import io, transform
from pycocotools.coco import COCO

In [None]:
# Pick the compute device first so the report below reflects what will
# actually be used; the previous version always printed 'cuda:0', even on a
# CPU-only runtime.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

print('CUDA devices visible:', torch.cuda.device_count())
print('Device:', device)

In [None]:
# Download the COCO 2017 train/val annotation archive and unpack it in the
# current working directory. Later cells read the extracted JSON files from
# /content/annotations, so this is expected to run with /content as the
# working directory.
!wget http://images.cocodataset.org/annotations/annotations_trainval2017.zip
!unzip annotations_trainval2017.zip

In [None]:
# Person-keypoint annotation files extracted by the download cell.
ANNOTATION_DIR = '/content/annotations'
TRAIN_ANNO_PATH = f'{ANNOTATION_DIR}/person_keypoints_train2017.json'
VALID_ANNO_PATH = f'{ANNOTATION_DIR}/person_keypoints_val2017.json'

# Parse both annotation files into COCO API handles.
train_dataset, val_dataset = COCO(TRAIN_ANNO_PATH), COCO(VALID_ANNO_PATH)

In [None]:
class CocoDataset(Dataset):
    """COCO person-keypoint dataset yielding ``(image, target)`` pairs.

    ``target`` is a dict with the keys ``boxes``, ``labels``, ``image_id``,
    ``area``, ``iscrowd`` and ``keypoints`` built from all annotations of one
    image.
    """

    # Each COCO person annotation stores its keypoints as one flat list of
    # (x, y, visibility) triplets, 17 joints per person.
    NUM_KEYPOINTS = 17

    def __init__(self, root, annotation, transforms=None):
        """
        Args:
            root: directory that contains the image files.
            annotation: path to the COCO annotation JSON file.
            transforms: optional callable applied to the PIL image only
                (the target dict is not transformed).
        """
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        self.ids = list(sorted(self.coco.imgs.keys()))

    def __getitem__(self, index):
        """Return the image and target dict for the ``index``-th image id."""
        img_id = self.ids[index]
        ann_ids = self.coco.getAnnIds(imgIds=img_id)
        coco_annotation = self.coco.loadAnns(ann_ids)
        file_name = self.coco.loadImgs(img_id)[0]['file_name']

        # Force three channels so every sample has the same channel count
        # regardless of how the file is stored on disk.
        image = Image.open(os.path.join(self.root, file_name)).convert('RGB')

        targets = self._build_target(img_id, coco_annotation)

        if self.transforms is not None:
            image = self.transforms(image)

        return image, targets

    @classmethod
    def _build_target(cls, img_id, coco_annotation):
        """Convert a list of COCO annotation dicts into a target dict of tensors.

        Args:
            img_id: integer COCO image id.
            coco_annotation: list of annotation dicts, each providing
                ``bbox`` ([xmin, ymin, width, height]), ``area`` and
                ``keypoints``; ``iscrowd`` is read when present.

        Returns:
            dict with ``boxes`` ([N, 4] as xmin, ymin, xmax, ymax),
            ``labels`` ([N], all ones), ``image_id`` ([1]), ``area`` ([N]),
            ``iscrowd`` ([N]) and ``keypoints`` ([N, NUM_KEYPOINTS, 3]).
        """
        num_objs = len(coco_annotation)

        # COCO stores boxes as [xmin, ymin, width, height]; convert them to
        # corner format [xmin, ymin, xmax, ymax].
        boxes = []
        for ann in coco_annotation:
            xmin, ymin, width, height = ann['bbox'][:4]
            boxes.append([xmin, ymin, xmin + width, ymin + height])
        # With no annotations this stays a 1-D empty tensor, which is the
        # shape the training cell checks for when skipping samples.
        boxes = torch.as_tensor(boxes, dtype=torch.float32)

        # Single foreground class: every annotated object gets label 1.
        labels = torch.ones((num_objs,), dtype=torch.int64)

        # Area value taken from the annotation's own 'area' field.
        areas = torch.as_tensor([ann['area'] for ann in coco_annotation],
                                dtype=torch.float32)

        # Use the crowd flag carried by each annotation instead of assuming
        # that no object is a crowd region.
        iscrowd = torch.as_tensor(
            [ann.get('iscrowd', 0) for ann in coco_annotation],
            dtype=torch.int64)

        # Unflatten the per-person keypoint list into (joint, [x, y, v]) rows:
        # one [NUM_KEYPOINTS, 3] slice per annotated person.
        keypoints = torch.as_tensor(
            [ann['keypoints'] for ann in coco_annotation],
            dtype=torch.float32).reshape(num_objs, cls.NUM_KEYPOINTS, 3)

        return {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([img_id]),
            "area": areas,
            "iscrowd": iscrowd,
            "keypoints": keypoints,
        }

    def __len__(self):
        """Number of images listed in the annotation file."""
        return len(self.ids)

In [None]:
def collate_fn(batch):
    """Regroup a list of samples into one tuple per sample field.

    A batch of ``(image, target)`` pairs becomes ``(images, targets)``; the
    fields are only grouped, never stacked into a single tensor.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Image-only preprocessing: convert each PIL image to a tensor.
transforms = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])

# COCO training images paired with their person-keypoint annotations.
coco_dataset = CocoDataset(
    root='/content/drive/MyDrive/EE381K/train2017',
    annotation='/content/annotations/person_keypoints_train2017.json',
    transforms=transforms,
)

# Batches are grouped by `collate_fn` rather than stacked by the default
# collation.
loader_options = dict(batch_size=4, shuffle=True, num_workers=2, collate_fn=collate_fn)
data_loader = torch.utils.data.DataLoader(coco_dataset, **loader_options)

In [None]:
num_epochs = 3
# Number of batches between intermediate checkpoints within an epoch.
CHECKPOINT_EVERY = 2000
model = torchvision.models.detection.keypointrcnn_resnet50_fpn(pretrained=False)

# move model to the right device
model.to(device)

# Optimise only the parameters that are trainable.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

len_dataloader = len(data_loader)

for epoch in range(num_epochs):
    model.train()
    for i, (imgs, annotations) in enumerate(data_loader, start=1):
        # Drop every sample that has no boxes. Previously only the first
        # sample of the batch was inspected, so an annotation-free image in
        # any other position still reached the model, and one empty first
        # sample discarded the whole batch.
        kept = [(img, target) for img, target in zip(imgs, annotations)
                if target['boxes'].numel() > 0]
        if not kept:
            continue

        imgs = [img.to(device) for img, _ in kept]
        annotations = [{k: v.to(device) for k, v in target.items()}
                       for _, target in kept]

        # In training mode the model returns a dict of losses; their sum is
        # the quantity being minimised.
        loss_dict = model(imgs, annotations)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if i % CHECKPOINT_EVERY == 0:
            # Include epoch and batch index so checkpoints from different
            # epochs do not overwrite each other (the old name was built from
            # a float division, e.g. 'pose_rcnn_1.0.pt').
            path = '/content/drive/MyDrive/ColabNotebooks/pose_rcnn_epoch{}_batch{}.pt'.format(epoch, i)
            torch.save(model.state_dict(), path)
            print('saved model')

    # saving trained model at the end of every epoch
    path = '/content/drive/MyDrive/ColabNotebooks/pose_rcnn_{}.pt'.format(epoch)
    torch.save(model.state_dict(), path)
    print('saved model')
    

In [None]:
  # Write the final weights of the COCO-trained model to Drive.
  PATH = '/content/drive/MyDrive/ColabNotebooks/pose_rcnn.pt'
  final_state = model.state_dict()
  torch.save(final_state, PATH)
  print('saved model')

saved model


The cells below train the model on the MPII dataset instead.

In [None]:
class PoseDataset(Dataset):
    """Pose dataset backed by a CSV of image file names and joint coordinates.

    Column 1 of each CSV row holds the image file name and columns 2..33 hold
    32 numbers that are read as 16 (x, y) landmark pairs.
    """

    # Names assigned by `map_keypose` to landmark rows 0..15, in this order.
    JOINT_NAMES = (
        'r_ankle', 'r_knee', 'r_hip', 'l_hip', 'l_knee', 'l_ankle',
        'pelvis', 'thorax', 'upper_neck', 'head_top',
        'r_wrist', 'r_elbow', 'r_shoulder',
        'l_wrist', 'l_elbow', 'l_shoulder',
    )

    def __init__(self, csv_file, root_dir, transform=None):
        """
        Args:
            csv_file: path to the CSV file with file names and landmarks.
            root_dir: directory that contains the image files.
            transform: optional callable applied to the
                ``{'image', 'targets'}`` sample dict.
        """
        self.landmarks_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Number of rows in the CSV file."""
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        """Return ``(image, targets)`` for row ``idx`` of the CSV.

        ``targets`` holds ``boxes`` (a single box of shape [4] enclosing the
        landmarks), ``labels`` (scalar 1), ``image_id`` (integer parsed from
        the file name) and ``keypoints`` (the transformed landmarks).
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.root_dir,
                                self.landmarks_frame.iloc[idx, 1])

        image = io.imread(img_name)
        image = skimage.img_as_float(image)
        landmarks = self.landmarks_frame.iloc[idx, 2:34]
        landmarks = np.array([landmarks])
        landmarks = landmarks.astype('float').reshape(-1, 2)

        sample = {'image': image, 'targets': landmarks}

        if self.transform:
            sample = self.transform(sample)

        # `.size()` is called on the image, so the transform is expected to
        # have produced a tensor laid out as (C, H, W).
        boxes = self.find_box(sample['image'].size(), sample['targets'])

        targets = {}
        targets['boxes'] = boxes
        targets["labels"] = torch.tensor(1)
        # The file name without its extension is used as the numeric image id.
        targets["image_id"] = torch.tensor(int(self.landmarks_frame.iloc[idx, 1].split('.')[0]))
        targets['keypoints'] = sample['targets']

        return sample['image'], targets

    def map_keypose(self, landmarks, boxes=None):
        """Label the first 16 landmark rows with their joint names.

        Args:
            landmarks: indexable collection with at least 16 entries.
            boxes: optional value stored under the ``'boxes'`` key. The
                previous version read an undefined ``boxes`` name and always
                raised NameError; the key is now only added when a value is
                supplied.

        Returns:
            dict mapping each joint name to the corresponding landmark entry.
        """
        targets = {}
        if boxes is not None:
            targets['boxes'] = boxes
        for joint_index, joint_name in enumerate(PoseDataset.JOINT_NAMES):
            targets[joint_name] = landmarks[joint_index]
        return targets

    def find_box(self, image_size, landmarks):
        """Return the landmark bounding box clipped to the image.

        Args:
            image_size: size of a (C, H, W) image; entries 1 and 2 are read
                as height and width.
            landmarks: 2-D collection whose columns 0 and 1 are x and y.

        Returns:
            float32 tensor of shape [4]: ``[xmin, ymin, xmax, ymax]``.

        Raises:
            ValueError: if ``landmarks`` is empty.
        """
        points = torch.as_tensor(landmarks, dtype=torch.float32)
        if points.numel() == 0:
            raise ValueError('find_box needs at least one landmark')

        h, w = image_size[1], image_size[2]
        # Take the true extremes of the coordinates (no fixed 1000 px
        # sentinel, which broke for coordinates beyond 1000) and clip them to
        # the image bounds.
        xs = points[:, 0].clamp(0, w)
        ys = points[:, 1].clamp(0, h)
        return torch.stack([xs.min(), ys.min(), xs.max(), ys.max()])

In [None]:
class RandomCrop(object):
    """Randomly crop the image of a sample and shift its landmarks to match.

    Args:
        output_size: crop size. A single number gives a square crop; a
            ``(height, width)`` pair gives a rectangular one.
    """

    def __init__(self, output_size):
        try:
            crop_h, crop_w = output_size
        except TypeError:
            # A single number cannot be unpacked: use it for both sides.
            crop_h = crop_w = output_size
        self.output_size = (crop_h, crop_w)

    def __call__(self, sample):
        """Return a new sample dict with the cropped image and landmarks.

        Raises:
            ValueError: if the crop is larger than the image.
        """
        image, landmarks = sample['image'], sample['targets']

        h, w = image.shape[:2]
        new_h, new_w = self.output_size
        if new_h > h or new_w > w:
            raise ValueError(
                'crop size {} is larger than the image size {}'.format(
                    (new_h, new_w), (h, w)))

        # The upper bound of randint is exclusive, so +1 keeps the last valid
        # offset reachable and lets a crop of exactly the image size work.
        top = np.random.randint(0, h - new_h + 1)
        left = np.random.randint(0, w - new_w + 1)

        image = image[top: top + new_h,
                      left: left + new_w]

        # Landmarks are (x, y), so x shifts by `left` and y by `top`.
        landmarks = landmarks - [left, top]

        return {'image': image, 'targets': landmarks}


class ToTensor(object):
    """Turn the numpy arrays of a sample dict into float tensors."""

    def __call__(self, sample):
        """Return ``{'image', 'targets'}`` with both entries as float tensors.

        The image is reordered from the numpy layout (H x W x C) to the torch
        layout (C x H x W); the landmarks keep their shape.
        """
        channels_first = np.transpose(sample['image'], (2, 0, 1))
        image_tensor = torch.from_numpy(channels_first).float()
        landmark_tensor = torch.from_numpy(sample['targets']).float()
        return {'image': image_tensor, 'targets': landmark_tensor}

In [None]:
# Locations of the pose images and of the CSV file with their landmarks.
IMAGE_DIR = '/content/drive/MyDrive/EE381K/PoseDS'
CSV_FILE = '/content/drive/MyDrive/Colab_Notebooks/mpii_dataset.csv'

# Sample-level preprocessing: the notebook's own ToTensor, which converts both
# the image and the landmarks.
transformations = torchvision.transforms.Compose([ToTensor()])
pose_dataset = PoseDataset(csv_file=CSV_FILE,
                           root_dir=IMAGE_DIR,
                           transform=transformations)

# Default batching (one sample per batch), shuffled every epoch.
data_loader = torch.utils.data.DataLoader(dataset=pose_dataset, shuffle=True)

In [None]:
# Load a pre-trained Keypoint R-CNN model.
model = torchvision.models.detection.keypointrcnn_resnet50_fpn(pretrained=True)

# Freeze the backbone only. Previously every parameter was frozen, which left
# the optimizer with nothing to update and gave the loss no gradient path.
for param in model.backbone.parameters():
    param.requires_grad = False

criterion = torch.nn.CrossEntropyLoss()
# Hand the optimizer just the parameters that are still trainable.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(trainable_params, lr=0.01)
model.to(device)

In [None]:
print('Starting training')
model.train()

# Make sure the checkpoint directory exists before the first save.
CHECKPOINT_DIR = '/content/chkp'
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

for epoch in range(3):  # loop over the dataset multiple times
    running_loss = 0.0

    for i, data in enumerate(data_loader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        # The model takes one target dict per image; the loader yields a
        # single dict of batched tensors, so wrap it in a one-element list.
        labels = [{t: labels[t].to(device) for t in labels}]
        inputs = inputs.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        # As in the COCO training cell, the model called with targets returns
        # a dict of losses, so the total loss is their sum; feeding that dict
        # to CrossEntropyLoss cannot work.
        loss_dict = model(inputs, labels)
        loss = sum(loss_dict.values())
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 1000 == 999:    # print every 1000 mini-batches
            print('[%d, %5d] loss: %.3f' %
                (epoch + 1, i + 1, running_loss / 1000))
            running_loss = 0.0
            torch.save(model.state_dict(),
                       '{}/check{}-{}.pt'.format(CHECKPOINT_DIR, epoch, (i + 1) // 1000))

print('Finished Training')

In [None]:
# Persist the weights of the trained model on the local runtime disk.
PATH = r'/content/pose_cnn.pt'
trained_state = model.state_dict()
torch.save(trained_state, PATH)

In [None]:
# testing the model
# NOTE(review): this cell follows a classification-style accuracy recipe
# (arg-max prediction compared with labels). It is unclear how that applies
# to this keypoint model — confirm the intended metric before relying on it.
model.eval()

# Image folder wrapped in a loader; neither name is used again in this cell.
temp_dataset = torchvision.datasets.ImageFolder('/content/demo')
temp_dataloader = DataLoader(temp_dataset)

correct = 0
# Starts at 1; the offset is removed again after the loop.
total = 1
with torch.no_grad():
    # NOTE(review): `data` is never unpacked, so `inputs` and `labels` below
    # are whatever an earlier cell left in the kernel — probably meant
    # `inputs, labels = data`. TODO confirm.
    for data in data_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model.forward(inputs)
        # Prediction = index of the maximum along dimension 1 of the output.
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        # Only the first batch is evaluated.
        break

# Undo the initial offset of 1 applied to `total`.
if total != 0:
    total = total - 1
print('correct: {}, total: {}'.format(correct, total))