In [1]:
import os, json, cv2, numpy as np, matplotlib.pyplot as plt

import torch
from torch.utils.data import Dataset, DataLoader

import torchvision
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.transforms import functional as F

In [2]:
import transforms, utils, engine, train
from utils import collate_fn
from engine import train_one_epoch, evaluate

In [3]:
class ClassDataset(Dataset):
    """Keypoint-detection dataset read from ``<root>/images`` and ``<root>/annotations``.

    Images and annotation files are paired by their position in the sorted
    directory listings, so both folders must contain the same number of files
    and their names must sort in the same order.

    Args:
        root: Directory that contains the ``images`` and ``annotations`` folders.
        demo: Stored on the instance as ``self.demo``; not otherwise used by this class.

    Raises:
        ValueError: If the two folders do not contain the same number of files.
    """

    def __init__(self, root, demo=False):
        self.root = root
        self.demo = demo
        self.imgs_files = sorted(os.listdir(os.path.join(root, "images")))
        self.annotations_files = sorted(os.listdir(os.path.join(root, "annotations")))

        # Pairing is purely positional, so a count mismatch would silently
        # attach annotations to the wrong image (or fail late with IndexError).
        if len(self.imgs_files) != len(self.annotations_files):
            raise ValueError(
                f"Found {len(self.imgs_files)} images but "
                f"{len(self.annotations_files)} annotation files under {root!r}"
            )

    def __getitem__(self, idx):
        """Load one sample.

        Args:
            idx: Position of the sample in the sorted file listings.

        Returns:
            A ``(image, target)`` tuple. ``image`` is the RGB image converted
            with ``F.to_tensor``. ``target`` is a dict with the keys ``boxes``
            (float32, reshaped to ``(N, 4)``), ``labels`` (int64, all ones),
            ``image_id`` (``tensor([idx])``), ``iscrowd`` (int64, all zeros)
            and ``keypoints`` (float32, taken as-is from the annotation file;
            presumably ``(N, K, 3)`` with ``[x, y, visibility]`` -- confirm
            against the annotation format).

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        img_path = os.path.join(self.root, "images", self.imgs_files[idx])
        annotations_path = os.path.join(self.root, "annotations", self.annotations_files[idx])

        img_original = cv2.imread(img_path)
        # cv2.imread signals failure by returning None instead of raising.
        if img_original is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img_original = cv2.cvtColor(img_original, cv2.COLOR_BGR2RGB)

        with open(annotations_path) as f:
            data = json.load(f)

        # reshape(-1, 4) keeps an empty annotation list as a (0, 4) tensor.
        bboxes_original = torch.as_tensor(data['bboxes'], dtype=torch.float32).reshape(-1, 4)
        keypoints_original = torch.as_tensor(data['keypoints'], dtype=torch.float32)
        num_objects = len(bboxes_original)

        target_original = {
            "boxes": bboxes_original,
            # Single foreground class: every object gets label 1.
            "labels": torch.ones(num_objects, dtype=torch.int64),
            "image_id": torch.tensor([idx]),
            "iscrowd": torch.zeros(num_objects, dtype=torch.int64),
            "keypoints": keypoints_original,
        }
        img_original = F.to_tensor(img_original)

        return img_original, target_original

    def __len__(self):
        """Return the number of image files found under ``<root>/images``."""
        return len(self.imgs_files)

In [4]:
# Demo pass over the data in the current directory: one sample per batch,
# drawn in random order, so a single target can be inspected below.
KEYPOINTS_FOLDER_TRAIN = './'
dataset = ClassDataset(root=KEYPOINTS_FOLDER_TRAIN, demo=True)
data_loader = DataLoader(
    dataset=dataset,
    batch_size=1,
    shuffle=True,
    collate_fn=collate_fn,
)

# Explicit iterator so later cells can pull batches one at a time.
iterator = iter(data_loader)

In [5]:
# Sanity check: samples are paired by sorted position, so both counts should match.
len(dataset.annotations_files),len(dataset.imgs_files)

(200, 200)

In [6]:
# Pull one batch from the demo loader; later cells read images from batch[0] and targets from batch[1].
batch = next(iterator)

In [7]:
# Show the targets of the sampled batch (boxes, labels, image_id, iscrowd, keypoints).
print("Original targets:\n", batch[1], "\n\n")

Original targets:
 ({'boxes': tensor([[ 35.,  32., 281., 226.]]), 'labels': tensor([1]), 'image_id': tensor([96]), 'iscrowd': tensor([0]), 'keypoints': tensor([[[ 70.,  46.,   1.],
         [  0.,   0.,   0.],
         [ 94.,  61.,   1.],
         [  0.,   0.,   0.],
         [ 45.,  49.,   1.],
         [ 88., 105.,   1.],
         [232., 111.,   1.],
         [132.,  97.,   1.],
         [118., 145.,   1.],
         [  0.,   0.,   0.],
         [236., 169.,   1.],
         [  0.,   0.,   0.],
         [120., 200.,   1.],
         [  0.,   0.,   0.],
         [269., 191.,   1.],
         [270., 181.,   1.],
         [115., 216.,   1.],
         [105., 213.,   1.],
         [260., 220.,   1.],
         [257., 211.,   1.]]])},) 




In [8]:
def visualize(image, bboxes, keypoints):
    """Draw boxes and labelled keypoints on an image and show it in an OpenCV window.

    Keypoint labels are looked up in the module-level
    ``keypoints_classes_ids20names`` dict, which must be defined before this
    function is called.

    Args:
        image: RGB ``uint8`` array of shape (H, W, 3), as built by the calling
            cells from the dataset / model tensors. It is not modified.
        bboxes: Iterable of integer boxes in ``[x1, y1, x2, y2]`` corner format
            (the format used by the dataset targets and the model output).
        keypoints: Iterable with one entry per object, each a list of integer
            ``[x, y]`` points ordered by keypoint index.

    Returns:
        None. Blocks until a key is pressed in the window, then closes it.
    """
    # Draw on a single copy so the caller's array stays untouched.
    canvas = image.copy()

    for bbox in bboxes:
        start_point = (bbox[0], bbox[1])
        # Boxes are corner-format, so the end point is (x2, y2) directly;
        # adding it to the start point would draw an oversized rectangle.
        end_point = (bbox[2], bbox[3])
        canvas = cv2.rectangle(canvas, start_point, end_point, (0, 255, 0), 2)

    for kps in keypoints:
        for idx, kp in enumerate(kps):
            point = tuple(kp)
            # Fall back to the numeric index if no name is registered for it.
            label = " " + keypoints_classes_ids20names.get(idx, str(idx))
            canvas = cv2.circle(canvas, point, 3, (255, 0, 0), 1)
            canvas = cv2.putText(canvas, label, point, cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 0, 0), 1, cv2.LINE_AA)

    canvas = cv2.resize(canvas, None, fx=2, fy=2)
    # cv2.imshow interprets channels as BGR, while the image is RGB.
    canvas = cv2.cvtColor(canvas, cv2.COLOR_RGB2BGR)
    cv2.imshow("Output-Keypoints", canvas)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

In [9]:
# Keypoint index -> display name, used by visualize() to label each point.
# Prefixes presumably mean L/R = left/right and F/B = front/back -- confirm
# against the annotation spec.
keypoints_classes_ids20names = {
    0: 'L_eye', 1: 'R_eye', 2: 'L_ear', 3: 'R_ear',
    4: 'Nose', 5: 'Throat', 6: 'Tail', 7: 'withers',
    8: 'L_F_elbow', 9: 'R_F_elbow', 10: 'L_B_elbow', 11: 'R_B_elbow',
    12: 'L_F_knee', 13: 'R_F_knee', 14: 'L_B_knee', 15: 'R_B_knee',
    16: 'L_F_paw', 17: 'R_F_paw', 18: 'L_B_paw',
    19: 'R_B_paw',  # was misspelled 'R_B_pse'
}

In [10]:
# First image of the batch: move channels last, scale by 255 and cast to uint8 for OpenCV drawing.
image = (batch[0][0].permute(1,2,0).numpy() * 255).astype(np.uint8)
# Boxes of the first target as nested Python lists of ints.
bboxes = batch[1][0]['boxes'].detach().cpu().numpy().astype(np.int32).tolist()

In [11]:
# Per-object lists of [x, y] keypoints for the sampled batch.
keypoints = []

In [12]:
# Build the list from scratch (instead of appending to a list created in
# another cell) so re-running this cell does not duplicate entries.
keypoints = [
    [kp[:2] for kp in kps]  # keep x, y; drop the third per-keypoint value
    for kps in batch[1][0]['keypoints'].detach().cpu().numpy().astype(np.int32).tolist()
]

In [13]:
# Display the extracted [x, y] keypoints.
keypoints

[[[70, 46],
  [0, 0],
  [94, 61],
  [0, 0],
  [45, 49],
  [88, 105],
  [232, 111],
  [132, 97],
  [118, 145],
  [0, 0],
  [236, 169],
  [0, 0],
  [120, 200],
  [0, 0],
  [269, 191],
  [270, 181],
  [115, 216],
  [105, 213],
  [260, 220],
  [257, 211]]]

In [14]:
def get_model(num_keypoints, weights_path=None):
    """Build a Keypoint R-CNN (ResNet-50 FPN) with a custom RPN anchor generator.

    Args:
        num_keypoints: Number of keypoints predicted per detected object.
        weights_path: Optional path to a saved ``state_dict``. When given, the
            weights are loaded into the freshly built model.

    Returns:
        The constructed model. It is not moved to any device here; the caller
        decides where it lives.
    """
    # Extra aspect ratios on top of the usual (0.5, 1.0, 2.0).
    anchor_generator = AnchorGenerator(sizes=(32, 64, 128, 256, 512), aspect_ratios=(0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0))
    model = torchvision.models.detection.keypointrcnn_resnet50_fpn(pretrained=False,
                                                                   pretrained_backbone=True,
                                                                   num_keypoints=num_keypoints,
                                                                   num_classes = 2, # Background is the first class, object is the second class
                                                                   rpn_anchor_generator=anchor_generator)

    if weights_path:
        # Load onto the CPU first so weights saved from a GPU run can also be
        # restored on a CPU-only machine.
        state_dict = torch.load(weights_path, map_location='cpu')
        model.load_state_dict(state_dict)

    return model

In [15]:
# Prefer the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

KEYPOINTS_FOLDER_TRAIN = './train/'
KEYPOINTS_FOLDER_TEST = './test/'

dataset_train = ClassDataset(root=KEYPOINTS_FOLDER_TRAIN, demo=False)
dataset_test = ClassDataset(root=KEYPOINTS_FOLDER_TEST, demo=False)

# Training batches are shuffled; the test loader keeps a fixed order.
data_loader_train = DataLoader(dataset=dataset_train, batch_size=1, shuffle=True, collate_fn=collate_fn)
data_loader_test = DataLoader(dataset=dataset_test, batch_size=1, shuffle=False, collate_fn=collate_fn)

model = get_model(num_keypoints=20)
model.to(device)

# Optimise only the parameters that require gradients.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(params=params, lr=0.001, momentum=0.9, weight_decay=0.0005)
# NOTE(review): step_size equals num_epochs and the scheduler is stepped once
# per epoch, so the first decay only lands after the final epoch.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=5, gamma=0.3)
num_epochs = 5

In [None]:
# One pass over the training loader per epoch, then advance the LR schedule.
for epoch in range(num_epochs):
    train_one_epoch(model, optimizer, data_loader_train, device, epoch, print_freq=1000)
    lr_scheduler.step()

# Save model weights after training. The target folder is created first so a
# missing directory cannot throw away a finished training run.
weights_path = './model/weights/keypointsrcnn_weights.pth'
os.makedirs(os.path.dirname(weights_path), exist_ok=True)
torch.save(model.state_dict(), weights_path)

Epoch: [0]  [  0/165]  eta: 0:06:32  lr: 0.000007  loss: 9.4864 (9.4864)  loss_classifier: 0.7253 (0.7253)  loss_box_reg: 0.0099 (0.0099)  loss_keypoint: 8.0575 (8.0575)  loss_objectness: 0.6887 (0.6887)  loss_rpn_box_reg: 0.0050 (0.0050)  time: 2.3784  data: 0.0050  max mem: 906
Epoch: [0]  [164/165]  eta: 0:00:00  lr: 0.001000  loss: 7.1600 (8.1284)  loss_classifier: 0.0547 (0.1632)  loss_box_reg: 0.1148 (0.0600)  loss_keypoint: 6.9276 (7.5375)  loss_objectness: 0.0425 (0.3508)  loss_rpn_box_reg: 0.0065 (0.0170)  time: 0.2227  data: 0.0016  max mem: 1868
Epoch: [0] Total time: 0:00:36 (0.2226 s / it)
Epoch: [1]  [  0/165]  eta: 0:00:40  lr: 0.001000  loss: 6.5481 (6.5481)  loss_classifier: 0.1139 (0.1139)  loss_box_reg: 0.2163 (0.2163)  loss_keypoint: 6.1715 (6.1715)  loss_objectness: 0.0378 (0.0378)  loss_rpn_box_reg: 0.0086 (0.0086)  time: 0.2483  data: 0.0000  max mem: 1868
Epoch: [1]  [164/165]  eta: 0:00:00  lr: 0.001000  loss: 6.6410 (6.7601)  loss_classifier: 0.0448 (0.0529)  

In [None]:
# Display the test DataLoader object.
data_loader_test

In [None]:
# Sanity check on the test split: annotation and image counts should match.
len(data_loader_test.dataset.annotations_files),len(data_loader_test.dataset.imgs_files)

In [None]:
# Take the first test batch and move its images to the inference device.
iterator_test = iter(data_loader_test)
images, targets = next(iterator_test)
images = [img.to(device) for img in images]

# Switch to evaluation mode and run the forward pass without gradient tracking.
model.to(device)
model.eval()
with torch.no_grad():
    output = model(images)

print("Predictions: \n", output)

In [None]:
image = (images[0].permute(1,2,0).detach().cpu().numpy() * 255).astype(np.uint8)
scores = output[0]['scores'].detach().cpu().numpy()

# Step 1: keep only detections whose score exceeds 0.7.
high_scores_idxs = np.where(scores > 0.7)[0].tolist()
# Step 2: run NMS (iou_threshold=0.3) on the survivors. The returned indexes
# refer to the score-filtered subset, not to the full prediction list.
post_nms_idxs = torchvision.ops.nms(
    output[0]['boxes'][high_scores_idxs],
    output[0]['scores'][high_scores_idxs],
    0.3,
).cpu().numpy()

# Both selections are applied in order below: [high_scores_idxs] picks the
# confident detections, then [post_nms_idxs] picks those left after NMS.
keypoints = [
    [list(map(int, kp[:2])) for kp in kps]
    for kps in output[0]['keypoints'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy()
]

bboxes = [
    list(map(int, bbox.tolist()))
    for bbox in output[0]['boxes'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy()
]

visualize(image, bboxes, keypoints)