In [1]:
import os
import cv2
import glob
import json
import math
import torch

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from tqdm.auto import tqdm
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

In [2]:
def generate_dataframe(path_data, path_label, num_joints=17):
    annots = sorted(glob.glob(os.path.join(path_label, "*.json")))
    
    num_joint = 17
    target = []

    for fname in tqdm(annots):
        with open(fname, "r") as f:
            data = json.load(f)

        width, height = data['label_info']['image']['width'], data['label_info']['image']['height']
        keypoints = data['label_info']['annotations'][0]['keypoints']
        x, y = [], []
        
        
        for i in range(num_joint):
            x_ = keypoints[i * 3]
            y_ = keypoints[i * 3 + 1]
            
            if x_ >= width:
                x_ -= 1
            if y_ >= height:
                y_ -= 1
            
            x.append(x_)
            y.append(y_)

        keypoints_ = list(zip(x, y))
        target.append(keypoints_)
    
    df = pd.DataFrame({"image": sorted(glob.glob(os.path.join(path_data, "*.jpg"))), "target": target, "target_file": annots})
    
    return df

path_data = './images'
path_labels = './annotations'

df = generate_dataframe(path_data, path_labels)
df

  0%|          | 0/9000 [00:00<?, ?it/s]

Unnamed: 0,image,target,target_file
0,./images/livestock_cow_keypoints_000001.jpg,"[(168, 9), (147, 11), (115, 10), (120, 60), (1...",./annotations/livestock_cow_keypoints_000001.json
1,./images/livestock_cow_keypoints_000002.jpg,"[(186, 13), (171, 19), (132, 11), (144, 108), ...",./annotations/livestock_cow_keypoints_000002.json
2,./images/livestock_cow_keypoints_000003.jpg,"[(293, 28), (270, 43), (201, 40), (178, 185), ...",./annotations/livestock_cow_keypoints_000003.json
3,./images/livestock_cow_keypoints_000004.jpg,"[(26, 70), (28, 62), (32, 47), (11, 58), (5, 7...",./annotations/livestock_cow_keypoints_000004.json
4,./images/livestock_cow_keypoints_000005.jpg,"[(110, 21), (91, 36), (73, 19), (30, 173), (44...",./annotations/livestock_cow_keypoints_000005.json
...,...,...,...
8995,./images/livestock_cow_keypoints_008996.jpg,"[(19, 33), (36, 29), (71, 13), (73, 65), (74, ...",./annotations/livestock_cow_keypoints_008996.json
8996,./images/livestock_cow_keypoints_008997.jpg,"[(139, 6), (128, 6), (106, 9), (105, 59), (128...",./annotations/livestock_cow_keypoints_008997.json
8997,./images/livestock_cow_keypoints_008998.jpg,"[(46, 10), (42, 8), (32, 7), (35, 33), (37, 41...",./annotations/livestock_cow_keypoints_008998.json
8998,./images/livestock_cow_keypoints_008999.jpg,"[(216, 9), (176, 21), (137, 30), (109, 123), (...",./annotations/livestock_cow_keypoints_008999.json


In [3]:
train_df, valid_df = train_test_split(df, test_size=0.2)
train_df = train_df.reset_index(drop=True)
valid_df = valid_df.reset_index(drop=True)

In [4]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

width = 512
height = 512

train_transforms = A.Compose([
    A.augmentations.Resize(width=width, height=height, p=1.0),
#     A.augmentations.HorizontalFlip(p=0.5),
#     A.augmentations.ShiftScaleRotate(p=0.5, shift_limit=0.05, scale_limit=0.1, rotate_limit=45),
#     A.HueSaturationValue(p=0.5),
#     A.RandomBrightness(p=0.5),
#     A.GaussNoise(p=0.3),
#     A.GaussianBlur(p=0.3),
#     A.ImageCompression(p=0.3),
    A.augmentations.Normalize(p=1.0),
    ToTensorV2(),
], keypoint_params=A.KeypointParams(format='xy'))

valid_transforms = A.Compose([
    A.augmentations.Resize(width=width, height=height, p=1.0),
    A.augmentations.Normalize(p=1.0),
    ToTensorV2(),
], keypoint_params=A.KeypointParams(format='xy'))


In [5]:
class CustomDataset(Dataset):
    def __init__(self, df, transform, width, height, stride, sigma, num_points=17):
        super().__init__()
        self.df = df
        
        self.transform = transform
        self.num_points = num_points
        
        self.stride = stride
        self.sigma = sigma
    
    
    def __getitem__(self, idx):
        image = cv2.cvtColor(cv2.imread(df.loc[idx, "image"], cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB)
        keypoints = df.loc[idx, "target"]
        
        transformed = self.transform(image=image, keypoints=keypoints)
        transformed_image = transformed['image']
        transformed_keypoints = transformed['keypoints']
        
        keypoint_maps = self.__generate_heatmap(transformed_keypoints)
        
        return transformed_image, keypoint_maps
        
    
    def __len__(self):
        return self.df.shape[0]
    
    
    def __generate_heatmap(self, points):
        keypoint_maps = np.zeros((self.num_points + 1, width // self.stride, height // self.stride), np.float32)
        
        for index, point in enumerate(points):
            self.__add_gaussian(keypoint_maps[index], point[0], point[1], self.stride, self.sigma)
        
        keypoint_maps[-1] = 1 - keypoint_maps.max(axis=0)
        
        return keypoint_maps
    
    
    def __add_gaussian(self, keypoint_map, x, y, stride, sigma):
        n_sigma = 4
        top_left = [int(x - n_sigma * sigma), int(y - n_sigma * sigma)]
        top_left[0] = max(top_left[0], 0)
        top_left[1] = max(top_left[1], 0)

        bottom_right = [int(x + n_sigma * sigma), int(y + n_sigma * sigma)]
        height, width = keypoint_map.shape
        bottom_right[0] = min(bottom_right[0], width * stride)
        bottom_right[1] = min(bottom_right[1], height * stride)

        shift = stride / 2 - 0.5
        for map_y in range(top_left[1] // stride, bottom_right[1] // stride):
            for map_x in range(top_left[0] // stride, bottom_right[0] // stride):
                d2 = (map_x * stride + shift - x) * (map_x * stride + shift - x) + \
                    (map_y * stride + shift - y) * (map_y * stride + shift - y)
                exponent = d2 / 2 / sigma / sigma
                if exponent > 4.6052:  # threshold, ln(100), ~0.01
                    continue
                keypoint_map[map_y, map_x] += math.exp(-exponent)
                if keypoint_map[map_y, map_x] > 1:
                    keypoint_map[map_y, map_x] = 1

In [8]:
train_dataset = CustomDataset(train_df, train_transforms, width=width, height=height, stride=4, sigma=1, num_points=17)
train_iterator = DataLoader(train_dataset, batch_size=128, shuffle=True)

In [11]:
for image, target in train_iterator:
    print(f"X shape: {image.shape} & y shape: {target.shape}")

X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X shape: torch.Size([128, 3, 512, 512]) & y shape: torch.Size([128, 18, 128, 128])
X sh