All of the imports

In [None]:
import torch as to
import torch.nn as nn 
import torch.nn.functional as F 
import torch.optim as optim 
import matplotlib.pyplot as plt 
import numpy as np 
import scipy.spatial as spatial 
from numpy.matlib import repmat 
import matplotlib.pyplot as plt 
from pyface.api import GUI 
import math 
import numpy.random as ra 
import sys
import os
import json
import torchvision
import cv2 as cv


from torchvision.io import read_image
from scipy.ndimage import rotate
from torchvision.transforms.functional import rotate as trot
from torchvision.transforms import GaussianBlur

device = to.device('cuda') if to.cuda.is_available() else to.device('cpu')

Helper functions

In [None]:
def get_annotations(path):
    """
    Read landmark annotations from a JSON file.

    The file must contain a top-level ``'data'`` list; every entry provides
    ``'landmarks'`` (a flat list of values), ``'bbox'`` and ``'file'``.

    Returns:
        ``(anns, bbox, files)`` as numpy arrays. In ``anns`` each entry's
        flat landmark list is regrouped into consecutive ``[a, b]`` pairs.
    """
    with open(path, 'r') as f:
        annotations = json.load(f)

    anns, bbox, files = [], [], []
    for entry in annotations['data']:
        flat = entry['landmarks']
        files.append(entry['file'])
        # Pair up consecutive values; a trailing unpaired value is dropped.
        anns.append([[a, b] for a, b in zip(flat[0::2], flat[1::2])])
        bbox.append(entry['bbox'])

    return np.array(anns), np.array(bbox), np.array(files)

def vector_to_heatmaps(keypoints):
    """
    Build one blurred heatmap per keypoint for a single image.

    Input: array of size N_KEYPOINTS x 2 holding (x, y) per keypoint
    Output: array of size N_KEYPOINTS x MODEL_IMG_SIZE x MODEL_IMG_SIZE

    A keypoint only marks its heatmap when both truncated coordinates lie
    strictly between 0 and MODEL_IMG_SIZE; otherwise its heatmap stays zero.
    """
    heatmaps = np.zeros([N_KEYPOINTS, MODEL_IMG_SIZE, MODEL_IMG_SIZE])
    for k, (x, y) in enumerate(keypoints):
        col, row = int(x), int(y)
        inside = (0 < col < MODEL_IMG_SIZE) and (0 < row < MODEL_IMG_SIZE)
        if inside:
            # Heatmaps are indexed [keypoint, row (y), column (x)].
            heatmaps[k, row, col] = 1

    return blur_heatmaps(heatmaps)
    
    
def blur_heatmaps(heatmaps):
    """
    Return a blurred copy of the heatmaps.

    Every channel whose maximum equals 1 is smoothed with a 51x51 Gaussian
    kernel (sigma 3) and rescaled so its peak is 1 again. All other
    channels are copied through unchanged. The input is not modified.
    """
    blurred = heatmaps.copy()
    for k, channel in enumerate(blurred):
        if channel.max() != 1:
            continue
        blurred[k] = cv.GaussianBlur(heatmaps[k], (51, 51), 3)
        # Renormalise: blurring spreads the unit peak below 1.
        blurred[k] = blurred[k] / blurred[k].max()

    return blurred

Defining the Architecture and Loss functions

In [None]:
# NOTE(review): `reshape` is not read by any code visible in this file —
# confirm whether it is still needed.
reshape=False

# Number of bins used for the binned gradients (per the original note).
grad_bins = 8

# Configure the network: filter size as (rows, cols).
ksz = (5,5)
# Half-size of the filter along each axis: floor(size / 2).
kha = (np.floor(ksz[0]/2).astype(int),np.floor(ksz[1]/2).astype(int))
    

class ConvBlock(nn.Module):
    """
    Two consecutive BatchNorm -> 3x3 Conv (no bias, padding 1) -> ReLU stages.

    The first stage maps ``in_depth`` to ``out_depth`` channels; the second
    keeps ``out_depth`` channels. Spatial size is preserved by the padding.
    """

    def __init__(self, in_depth, out_depth):
        super().__init__()

        stages = []
        for stage_in in (in_depth, out_depth):
            stages += [
                nn.BatchNorm2d(stage_in),
                nn.Conv2d(stage_in, out_depth, kernel_size=3, padding=1, bias=False),
                nn.ReLU(inplace=True),
            ]
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x``."""
        return self.double_conv(x)


#so here it is:
def OGCNN(im,bins,k1):
    """
    Filter a single-channel image with an orientation-selected copy of one kernel.

    Eight copies of ``k1`` are built (the original plus rotations by
    45*i degrees, i = 1..7). Every pixel is filtered with the copy whose
    index equals that pixel's value in ``bins``.

    Args:
        im: gray / single-channel image, indexed as (rows, cols).
        bins: per-pixel labels; values 0..7 pick the rotated filter. Pixels
            carrying any other label keep an output of 0.
        k1: filter whose ``shape[1:]`` is the kernel size (the leading
            dimension is a technicality, per the original note).

    Returns:
        The image-sized response with ``kha`` rows/columns pruned from each
        border, i.e. roughly ``imsz - (ksz - 1)`` per axis for odd kernels.
    """

    ksz = k1.shape[1:] #extra 1 dim just as technicality 
    # Half-size of the kernel along each axis: floor(size / 2).
    kha = (np.floor(ksz[0]/2).astype(int), np.floor(ksz[1]/2).astype(int))
    
    # Bank of the 8 rotated kernels; slot 0 is the unrotated filter.
    ks = to.zeros(8,ksz[0],ksz[1])
    ks[0] = k1
    
    #8 rotations:
    for i in range(1,8):
        ks[i] = trot(k1, 45*i, torchvision.transforms.functional.InterpolationMode.BILINEAR)
        
    # Basically zero-padding: embed the image in a larger all-zero canvas.
    imsz = im.shape
    im2 = to.zeros(imsz[0]+2*kha[0],imsz[1]+2*kha[1])
    im2[kha[0]:kha[0]+imsz[0],kha[1]:kha[1]+imsz[1]] = im
    
    # Flatten convolutional neighbours: row p of `flat` holds the kernel-sized
    # neighbourhood of pixel p, one column per kernel offset (i, j).
    flat = to.zeros(imsz[0]*imsz[1],ksz[0]*ksz[1])
    for i in range(ksz[0]):
        for j in range(ksz[1]):
            flat[:,ksz[1]*i + j] = im2[i:i+imsz[0],j:j+imsz[1]].flatten()
                
    labels = bins.flatten()
    flatout = to.zeros(imsz[0]*imsz[1])
        
    # Apply the convolutional filters: each pixel gets the dot product of its
    # neighbourhood with the kernel selected by its label.
    for i in range(8):
        l = labels==i
        flatout[l] = flat[l,:]@(ks[i].flatten())
        
    # NOTE(review): when kha is 0 (1x1 kernel) the slice `0:-0` is empty.
    out = flatout.unflatten(0,imsz)[kha[0]:-kha[0],kha[1]:-kha[1]] #prune off ends 
    #out = flatout.unflatten(0,imsz)
    
    return out


def OGCNN2(im,bins,k1):
    '''
    Multi-filter variant of the orientation-selected convolution.

    For each of the ``n`` filters in ``k1``, eight copies are built (the
    original plus rotations by 45*i degrees, i = 1..7). Every pixel is
    filtered with the copy whose index equals that pixel's value in
    ``bins``. Working tensors are created on the module-level ``device``.

    Args:
        im: gray / single-channel image, indexed as (rows, cols).
        bins: per-pixel labels; values 0..7 pick the rotation. Pixels with
            any other label keep an output of 0.
        k1: filters of shape n x rows x cols.

    Returns:
        Tensor of shape (n, rows, cols) matching the image size; unlike
        ``OGCNN`` the borders are not pruned.
    '''
    
    #inputs: im: gray or single channel
    #bins: 0:8 segmented images
    '''k1: n*row*col filters (n filt total)'''
    
    #out: same spatial size as im (pruning is currently disabled below)
    
    
    ksz = k1.shape[1:] #first dim stores # filt
    nks = k1.shape[0]
    
    # Half-size of the kernel along each axis: floor(size / 2).
    kha = (np.floor(ksz[0]/2).astype(int), np.floor(ksz[1]/2).astype(int))
    
    # Bank of rotated kernels, indexed [rotation, filter, row, col].
    ks = to.zeros(8,nks,ksz[0],ksz[1]).to(device)
    
    ks[0] = k1
    
    #8 rotations:
    for i in range(1,8):
        ks[i] = trot(k1, 45*i, torchvision.transforms.functional.InterpolationMode.BILINEAR)
        
    # Zero-pad the image by the kernel half-size on every side.
    imsz = im.shape
    im2 = to.zeros(imsz[0]+2*kha[0],imsz[1]+2*kha[1]).to(device)
    im2[kha[0]:kha[0]+imsz[0],kha[1]:kha[1]+imsz[1]] = im
    
    flat = to.zeros(imsz[0]*imsz[1],ksz[0]*ksz[1]).to(device) #flatten convolutional neighbors
    for i in range(ksz[0]):
        for j in range(ksz[1]):
            flat[:,ksz[1]*i + j] = im2[i:i+imsz[0],j:j+imsz[1]].flatten()
                
    labels = bins.flatten()
    # One output column per filter.
    flatout = to.zeros(imsz[0]*imsz[1],nks).to(device) #now life gets complicated
        
    for i in range(8):
        l = labels==i
        # (pixels with label i) x (kernel size) @ (kernel size) x (n filters)
        flatout[l,:] = flat[l,:]@(ks[i].flatten(1,2).T) #apply convolutional filters
        

    # Back to image layout, then move the filter axis to the front.
    out = flatout.unflatten(0,imsz).permute(2,0,1) #[kha[0]:-kha[0],kha[1]:-kha[1]] 
    '''prune off ends?????????'''
        
    return out


def focal_loss(pred,tru):
    """
    Focal loss for multi-channel predictions (17 output channels).

    ``pred`` is soft-maxed along dimension 0, offset by a small epsilon and
    weighted by ``(1 - p) ** gamma`` with gamma = 3 and alpha = 1. The
    target-weighted terms are summed along dimension 1 and then averaged.
    """
    gamma, alpha, eps = 3, 1, 1e-6

    probs = F.softmax(pred, dim=0) + eps
    # Modulating factor: down-weights elements that are already confident.
    modulation = to.pow(1. - probs, gamma)
    per_element = -alpha * modulation * to.log(probs)

    return to.mean(to.sum(tru * per_element, dim=1))


def focal_loss1c(pred,tru):
    """
    Focal loss for a single-channel prediction.

    The 2-D ``pred`` and ``tru`` maps are each expanded into two channels
    (the map and its complement ``1 - map``). The expanded prediction is
    soft-maxed along dimension 0 and the focal terms (gamma = 3, alpha = 1)
    are target-weighted, summed along dimension 1 and averaged.
    """
    gamma, alpha, eps = 3, 1, 1e-6

    # Stack each map with its complement so the loss sees two "classes".
    pred_pair = to.cat((pred[None,:,:], 1 - pred[None,:,:]), 0)
    true_pair = to.cat((tru[None,:,:], 1 - tru[None,:,:]), 0)

    probs = F.softmax(pred_pair, 0) + eps
    modulation = to.pow(1. - probs, gamma)
    per_element = -alpha * modulation * to.log(probs)

    return to.mean(to.sum(true_pair * per_element, dim=1))




class IoULoss(nn.Module):
    """
    Soft Intersection-over-Union loss computed on heatmaps.

    IoU = overlap / union, where overlap is the sum of ``y_true * y_pred``
    and union is ``sum(y_true^2) + sum(y_pred^2) - overlap``, both reduced
    over the last two dimensions. The loss is ``1 - mean(IoU)``.
    """

    def __init__(self):
        super().__init__()
        # Keeps the ratio finite when both maps are empty.
        self.EPSILON = 1e-6

    def _op_sum(self, x):
        """Sum ``x`` over its last two dimensions."""
        summed_last = x.sum(-1)
        return summed_last.sum(-1)

    def forward(self, y_pred, y_true):
        overlap = self._op_sum(y_true * y_pred)
        squares = self._op_sum(y_true ** 2) + self._op_sum(y_pred ** 2)
        union = squares - overlap
        ratio = (overlap + self.EPSILON) / (union + self.EPSILON)
        return 1 - to.mean(ratio)
        


# Base channel width of the U-Net; deeper levels use multiples of it.
MODEL_NEURONS = 10

class ShallowUNet(nn.Module):
    """
    A lighter U-Net preceded by an orientation-selected convolution layer:
    - fewer downsampling blocks
    - fewer neurons in the layers
    - Batch Normalization is added

    The input image is first filtered by ``OGCNN2`` with ``in_channel``
    learnable filters (``k1``) plus a per-filter offset (``b1``), then
    passed through three down blocks, a bottleneck and three up blocks
    with skip connections. The output goes through a sigmoid.

    The original UNet paper:
    https://arxiv.org/abs/1505.04597
    """

    def __init__(self, in_channel, out_channel):
        super().__init__()
        width = MODEL_NEURONS

        # Learnable orientation-selected filters and their offsets.
        self.k1 = nn.Parameter(to.randn(in_channel, k1sz[0], k1sz[1]))
        self.b1 = nn.Parameter(to.randn(in_channel, 1, 1))

        # Encoder.
        self.down_conv1 = ConvBlock(in_channel, width)
        self.down_conv2 = ConvBlock(width, width * 2)
        self.down_conv3 = ConvBlock(width * 2, width * 4)

        self.bottle_neck = ConvBlock(width * 4, width * 8)

        # Decoder: each block takes the upsampled features concatenated
        # with the matching encoder output.
        self.up_conv1 = ConvBlock(width * 8 + width * 4, width * 4)
        self.up_conv2 = ConvBlock(width * 4 + width * 2, width * 2)
        self.up_conv3 = ConvBlock(width + width * 2, width)

        self.conv_out = nn.Sequential(
            nn.Conv2d(width, out_channel, kernel_size=3, padding=1, bias=False),
            nn.Sigmoid(),
        )

        # Parameter-free helpers shared by all levels.
        self.maxpool = nn.MaxPool2d(2)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)

    def forward(self, im,bins):
        """Run the network on one image ``im`` with its label map ``bins``."""
        features = F.relu(OGCNN2(im, bins, self.k1) + self.b1)

        # Add a batch dimension before the 2-D convolution blocks.
        skip1 = self.down_conv1(features[None,:,:,:])
        skip2 = self.down_conv2(self.maxpool(skip1))
        skip3 = self.down_conv3(self.maxpool(skip2))

        deepest = self.bottle_neck(self.maxpool(skip3))

        up1 = self.up_conv1(to.cat([self.upsample(deepest), skip3], dim=1))
        up2 = self.up_conv2(to.cat([self.upsample(up1), skip2], dim=1))
        up3 = self.up_conv3(to.cat([self.upsample(up2), skip1], dim=1))

        return self.conv_out(up3)



# NOTE(review): open question from the author — should these filters be
# atrous (dilated)?
# Different filter sizes as (rows, cols). Only k1sz is referenced by the
# model classes visible in this file.
k1sz = (15,15)
k2sz = (10,10)
k3sz = (5,5)
k4sz = (25,25)

#Define the neural network 
class Net(nn.Module):
    """
    Plain convolutional network on top of an orientation-selected layer.

    The image is filtered by ``OGCNN2`` with 40 learnable filters (``k1``)
    plus offsets (``b1``), passed through five Conv/ReLU stages (with
    pooling and dropout), squashed by a sigmoid and resized to 200x200.
    """

    def __init__(self):
        super().__init__()

        # Multi-channel filter. Add more self.ki for more OG layers.
        self.k1 = nn.Parameter(to.randn(40, k1sz[0], k1sz[1]))  # convolutional filter
        self.b1 = nn.Parameter(to.randn(40, 1, 1))  # convolutional offset vector

        self.l1 = nn.Sequential(
            nn.Conv2d(40, 80, kernel_size=5, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=.2),
        )
        self.l2 = nn.Sequential(
            nn.Conv2d(80, 160, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=.15),
        )
        self.l3 = nn.Sequential(
            nn.Conv2d(160, 320, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=1),
            nn.Dropout(p=.05),
        )
        self.l4 = nn.Sequential(
            nn.Conv2d(320, 100, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Dropout(p=.1),
        )
        self.l5 = nn.Sequential(
            nn.Conv2d(100, 1, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=1),
            nn.Dropout(p=.05),
        )

        # Resize the final map back to the fixed output resolution.
        self.up = nn.Upsample(size=(200, 200))

    def forward(self,im,bins):
        """Run the network on one image ``im`` with its label map ``bins``."""
        x = F.relu(OGCNN2(im, bins, self.k1) + self.b1)

        # Add a batch dimension, then run the stages in order.
        x = x[None,:,:,:]
        for stage in (self.l1, self.l2, self.l3, self.l4, self.l5):
            x = stage(x)

        # The sigmoid forces the output into the 0..1 range (experimental).
        return self.up(to.sigmoid(x))
 
 
# Build the U-Net variant with 20 orientation-selected filters and a single
# output channel, then move it to the selected device.
# NOTE: `model` is rebound to `Net()` in the training cell further down.
model = ShallowUNet(20,1)
model.to(device)

Prepare the data and annotations

In [None]:
reshape=False

N_KEYPOINTS = 17
MODEL_IMG_SIZE = 200


trpath = 'HigherHRNet-Human-Pose-Estimation/data/coco/images/train2017_cropped/'
trpath_grad = 'openmonkey_annotations/train_grad_cropped/'
valpath = 'HigherHRNet-Human-Pose-Estimation/data/coco/images/val2017_cropped/'
valpath_grad = 'openmonkey_annotations/val_grad_cropped/'

tr_ann,tr_bbox,tr_files = get_annotations('openmonkey_annotations/person_keypoints_train2017.json')
val_ann,val_bbox,val_files = get_annotations('openmonkey_annotations/person_keypoints_val2017.json')


def _to_model_coords(ann, bbox):
    """
    Rescale landmarks into the MODEL_IMG_SIZE x MODEL_IMG_SIZE crop frame.

    ``ann`` is (n_images, n_keypoints, 2) and ``bbox`` is (n_images, 4);
    the first two bbox columns are subtracted from the landmarks and the
    last two divide them (presumably x, y, width, height — confirm against
    the annotation files). Values at or beyond the crop size are clamped
    to the last valid pixel; negative values are left as they are.
    """
    bbox = np.asarray(bbox, dtype=float)
    scaled = MODEL_IMG_SIZE * (ann - bbox[:, None, 0:2]) / bbox[:, None, 2:]
    scaled[scaled >= MODEL_IMG_SIZE] = MODEL_IMG_SIZE - 1
    return scaled


# Train on a random subset of the data... just for now:
per2use = .4
extract = np.random.rand(tr_files.shape[0])>(1-per2use)
fnames_tr = tr_files[extract]

n_tr = fnames_tr.shape[0]

tr_ann = tr_ann[extract,:,:]
tr_bbox = tr_bbox[extract,:]

#fnames_tr = np.array(os.listdir(trpath))

# The validation subset is drawn independently with the same fraction.
extract = np.random.rand(val_files.shape[0])>(1-per2use)
fnames_val = val_files[extract]
n_val = fnames_val.shape[0]

val_ann = val_ann[extract,:,:]
val_bbox = val_bbox[extract,:]

z = to.zeros(MODEL_IMG_SIZE,MODEL_IMG_SIZE)

val_ann_ims = []

# Landmarks in crop coordinates. The training copy is required by
# Trainer._epoch_train and was previously never computed. Results are kept
# as floats; vector_to_heatmaps truncates them with int() anyway.
tr_ann2 = _to_model_coords(tr_ann, tr_bbox)
val_ann2 = _to_model_coords(val_ann, val_bbox)

tr_tot = np.shape(tr_files)[0]

In [None]:
# Load every selected validation image and its gradient-bin map as
# single-channel arrays (imread flag 0).
ims_val = []
bins_val = []

for fname in fnames_val[:n_val]:
    stem = fname[0:-4]  # drop the 4-character extension
    ims_val.append(cv.imread(valpath + stem + '.jpg', 0))
    bins_val.append(cv.imread(valpath_grad + stem + '.png', 0))

Setting up the trainer class

In [None]:
from torch.autograd import Variable

class Trainer:
    """
    Minimal training loop with periodic checkpoints and early stopping.

    Samples are drawn at random (with replacement) from module-level data:
    ``n_tr`` / ``tr_ann2`` / ``ims_tr`` / ``bins_tr`` for training and
    ``n_val`` / ``val_ann2`` / ``ims_val`` / ``bins_val`` for validation.
    Those names must exist before ``train()`` is called.

    Args:
        model: network called as ``model(image, bins)``.
        criterion: callable ``criterion(outputs, labels)`` returning a loss.
        optimizer: optimizer over the model's parameters.
        config: dict providing ``'epochs'``, ``'batch_size'``,
            ``'batches_per_epoch'``, ``'batches_per_epoch_val'`` and
            ``'device'``.
        scheduler: optional scheduler whose ``step`` takes a metric; it is
            stepped with the latest training loss after every epoch.
    """

    def __init__(self, model, criterion, optimizer, config, scheduler=None):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.loss = {"train": [], "val": []}
        self.epochs = config['epochs']
        self.batch_size = config['batch_size']
        self.batches_per_epoch = config['batches_per_epoch']
        self.batches_per_epoch_val = config['batches_per_epoch_val']
        self.device = config['device']
        self.scheduler = scheduler
        self.checkpoint_frequency = 100
        self.early_stopping_epochs = 10
        self.early_stopping_avg = 10
        self.early_stopping_precision = 8

        # `transforms` was never imported as a bare name in this file, so go
        # through the already-imported torchvision package.
        self.to_tensor = torchvision.transforms.Compose(
            [
                torchvision.transforms.ToPILImage(),
                torchvision.transforms.ToTensor(),
            ]
        )

    def train(self):
        """
        Train for up to ``self.epochs`` epochs and return the model.

        A checkpoint is written every ``checkpoint_frequency`` epochs and the
        final weights are saved as ``model_final``. Training stops early once
        the rolling mean of the validation loss has failed to improve for
        more than ``early_stopping_epochs`` consecutive epochs.
        """
        for epoch in range(self.epochs):
            self._epoch_train()
            self._epoch_eval()
            print(
                "Epoch: {}/{}, Train Loss={}, Val Loss={}".format(
                    epoch + 1,
                    self.epochs,
                    np.round(self.loss["train"][-1], 10),
                    np.round(self.loss["val"][-1], 10)
                )
            )

            if self.scheduler is not None:
                self.scheduler.step(self.loss['train'][-1])

            # Saving the model
            if (epoch+1) % self.checkpoint_frequency == 0:
                to.save(self.model.state_dict(), "model_{}".format(str(epoch+1).zfill(3)))

            # Early stopping: during the warm-up epochs just track the mean
            # validation loss; afterwards compare the rolling mean to the best.
            if epoch < self.early_stopping_avg:
                min_val_loss = np.round(np.mean(self.loss['val']), self.early_stopping_precision)
                no_decrease_epochs = 0
            else:
                val_loss = np.round(
                    np.mean(self.loss['val'][-self.early_stopping_avg:]),
                    self.early_stopping_precision,
                )
                if val_loss >= min_val_loss:
                    no_decrease_epochs += 1
                else:
                    min_val_loss = val_loss
                    no_decrease_epochs = 0

            if no_decrease_epochs > self.early_stopping_epochs:
                print("Early Stopping")
                break

        to.save(self.model.state_dict(), "model_final")
        return self.model

    def _batch_loss(self, annotations, images, bins, n_samples):
        """Return the mean criterion value over one randomly drawn batch."""
        idxes = np.random.randint(low=0, high=n_samples, size=(self.batch_size,))
        loss = 0
        for j in idxes:
            # The label must come from the same sample j as the image; the
            # previous code indexed annotations with the batch counter.
            heatmaps = vector_to_heatmaps(annotations[j,:,:])
            labels = to.tensor(heatmaps).to(self.device)

            outputs = self.model(images[j], bins[j])
            loss += self.criterion(outputs, labels)

        return loss / self.batch_size

    def _epoch_train(self):
        """Run one training epoch and record its mean batch loss."""
        self.model.train()
        running_loss = []

        for _ in range(self.batches_per_epoch):
            self.optimizer.zero_grad()
            loss = self._batch_loss(tr_ann2, ims_tr, bins_tr, n_tr)
            loss.backward()
            self.optimizer.step()
            running_loss.append(loss.item())

        self.loss['train'].append(np.mean(running_loss))

    def _epoch_eval(self):
        """Run one validation epoch (no gradients) and record its mean batch loss."""
        self.model.eval()
        running_loss = []

        with to.no_grad():
            for _ in range(self.batches_per_epoch_val):
                loss = self._batch_loss(val_ann2, ims_val, bins_val, n_val)
                running_loss.append(loss.item())

        self.loss['val'].append(np.mean(running_loss))

Train the model:

In [None]:
# Hyper-parameters consumed by Trainer.
config = {
    "epochs": 1000,
    "batch_size": 48,
    "batches_per_epoch": 50,
    "batches_per_epoch_val": 20,
    "learning_rate": 0.1,
    "device": device
}

model = Net()
# `cuda0` was never defined; use the device selected at the top of the file.
model.to(config["device"])
# NOTE(review): `FocalLoss` is not defined in the visible code (only
# `focal_loss`, `focal_loss1c` and `IoULoss` are) — confirm which criterion
# is intended before running this cell.
criterion = FocalLoss()
optimizer = optim.SGD(model.parameters(), lr=config["learning_rate"], momentum=0.9)
# optimizer = optim.Adadelta(model.parameters(), config["learning_rate"])
# optimizer = optim.RMSprop(model.parameters(), lr=config["learning_rate"], momentum=0.9)
# Halve the learning rate when the monitored loss plateaus for 20 epochs.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer=optimizer, factor=0.5, patience=20, verbose=True, threshold=0.00001
)

trainer = Trainer(model, criterion, optimizer, config, scheduler)
model = trainer.train()

---
Code for metrics evaluation


<br>Calculating the ki values:
<br>This is inspired from: https://cocodataset.org/#keypoints-eval
<br>They use a scale factor, which we've taken as the width of the bounding box.

In [None]:
# Per-keypoint constants: root of the mean, over all training annotations,
# of (x^2 + y^2) / bbox_width^2 for each of the 17 keypoints.
counts = np.zeros(17)
N = len(train_keypoints['data'])
ki = np.zeros(17)

for annot in train_keypoints['data']:
    keypoints = annot['landmarks']
    width = annot['bbox'][2]

    # Landmarks are stored flat: entries 2k and 2k+1 belong to keypoint k.
    for k in range(17):
        first, second = keypoints[2 * k], keypoints[2 * k + 1]
        ki[k] += (first**2 + second**2) / width**2

ki = np.sqrt(ki / (N))

We'll use these ki values to calculate our metrics

In [None]:
def calc_metrics(pred_path, actual_path, keypoint_sigmas=None):
    """
    Compute MPJPE, PCK and PA for predicted keypoints against ground truth.

    Args:
        pred_path: JSON file holding a list of predictions, each with
            ``'image_id'`` and a flat ``'keypoints'`` list of 17 triplets
            (only the first two values of each triplet are used).
        actual_path: JSON file with an ``'annotations'`` list; each entry
            has ``'image_id'``, ``'keypoints'`` (same layout) and ``'bbox'``,
            whose third value is used as the normalising width.
        keypoint_sigmas: optional sequence of 17 per-keypoint constants.
            Defaults to the module-level ``ki`` array.

    Returns:
        ``(MPJPE, PCK, PA)``: MPJPE is an array with the mean
        width-normalised error per keypoint; PCK is the fraction of
        keypoints whose normalised error is below 0.02; PA is the fraction
        whose Gaussian similarity ``exp(-d^2 / (2 * width^2 * k^2))`` is at
        least 0.02.

    Raises:
        KeyError: if a prediction refers to an image id that is missing
            from the ground-truth file.
    """
    with open(pred_path) as f:
        val_keypoint_results = json.load(f)

    with open(actual_path) as g:
        val_keypoints_actual = json.load(g)

    actual_key_map = {
        annot['image_id']: (annot['keypoints'], annot['bbox'][2])
        for annot in val_keypoints_actual['annotations']
    }

    if keypoint_sigmas is None:
        keypoint_sigmas = ki
    keypoint_kis = np.asarray(keypoint_sigmas, dtype=float)[:17]

    J = len(val_keypoint_results)

    pck_epsilon = 0.02
    pa_epsilon = 0.02

    MPJPE = np.zeros(17)
    PCK = 0
    PA = 0

    # Each prediction is scored against its own image; the previous code
    # read entry 0 on every iteration.
    for result in val_keypoint_results:
        img_id = result['image_id']
        actual_keypoints, width = actual_key_map[img_id]
        pred_keypoints = result['keypoints']

        for k in range(17):
            j = 3 * k  # keypoints are stored as flat triplets
            sq_dist = (
                (actual_keypoints[j] - pred_keypoints[j]) ** 2
                + (actual_keypoints[j + 1] - pred_keypoints[j + 1]) ** 2
            )

            dist = np.sqrt(sq_dist) / width
            MPJPE[k] += dist
            PCK += 1 if dist < pck_epsilon else 0

            denm = 2 * (width**2) * (keypoint_kis[k]**2)
            normal = np.exp(-sq_dist / denm)
            PA += 1 if normal >= pa_epsilon else 0

    MPJPE = MPJPE / J
    PCK = PCK / (17*J)
    PA = PA / (17*J)

    return MPJPE, PCK, PA

---
Generate FCN-ResNet masks for the images:

In [None]:
# sample execution (requires torchvision)
from PIL import Image
# `transforms` is used below but was never imported anywhere in the file.
from torchvision import transforms
from torchvision.models.detection import maskrcnn_resnet50_fpn

# Pretrained FCN-ResNet50 segmentation model, in inference mode.
fcn_model = to.hub.load('pytorch/vision:v0.10.0', 'fcn_resnet50', pretrained=True)
fcn_model.eval()

input_image = Image.open(trpath + fnames_tr[100])
input_image = input_image.convert("RGB")
# ImageNet mean/std normalisation.
preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

input_tensor = preprocess(input_image)
input_batch = input_tensor.unsqueeze(0)  # add the batch dimension

# Use the device selected at the top of the file instead of hard-coding
# 'cuda', so the cell also runs on CPU-only machines.
input_batch = input_batch.to(device)
fcn_model.to(device)

with to.no_grad():
    output = fcn_model(input_batch)
# Per-pixel class index: argmax over the class dimension.
output_predictions = output['out'][0].argmax(0)

Apply the masks to the images and save the results

In [None]:
import re
from PIL import Image
# `transforms` is used below but was never imported anywhere in the file.
from torchvision import transforms
from torchvision.models.detection import maskrcnn_resnet50_fpn

fcn_model = to.hub.load('pytorch/vision:v0.10.0', 'fcn_resnet50', pretrained=True)
fcn_model.eval()
# Move the model once, to the device selected at the top of the file.
fcn_model.to(device)

destpath = 'openmonkey_annotations/masked_val_full/'
baseline_path = 'HigherHRNet-Human-Pose-Estimation/data/coco/images/val2017_cropped/'
grad_path = 'openmonkey_annotations/val_grad_cropped/'
dest_grad_path = 'openmonkey_annotations/masked_grad_val/'

# Make sure both output directories exist before saving into them.
os.makedirs(destpath, exist_ok=True)
os.makedirs(dest_grad_path, exist_ok=True)

# Loop-invariant objects: ImageNet normalisation and the dilation kernel
# that grows the foreground mask.
preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
kernel = np.ones((30, 30), 'uint8')

file_names = os.listdir(baseline_path)

for file_name in file_names:

    input_image = Image.open(baseline_path + file_name)
    input_image = input_image.convert("RGB")

    input_tensor = preprocess(input_image)
    input_batch = input_tensor.unsqueeze(0).to(device)

    with to.no_grad():
        output = fcn_model(input_batch)
    # Per-pixel class index, resized back to the image size.
    output_predictions = output['out'][0].argmax(0)
    r = Image.fromarray(output_predictions.byte().cpu().numpy()).resize(input_image.size)

    dilated_img = cv.dilate(np.array(r), kernel)
    background = dilated_img <= 0

    # Zero all colour channels wherever the dilated mask is background.
    masked_img = np.array(input_image)
    masked_img[background] = 0

    img_id = re.findall(r'[0-9]+', file_name)[0]
    new_file_name = 'val_%07d.png' % int(img_id)
    grad_bin = cv.imread(grad_path + new_file_name)
    if grad_bin is None:
        # cv.imread returns None instead of raising when a file is unreadable.
        raise FileNotFoundError(grad_path + new_file_name)

    grad_bin[background] = 0

    plt.imsave(destpath + file_name, masked_img)
    plt.imsave(dest_grad_path + new_file_name, grad_bin)

The gradient-bin images were originally saved incorrectly because PIL's `Image.open` misreads them; `cv.imread` reads them correctly. The code above has since been fixed, but you can use the cell below to repair bin images that were written by the old version.

In [None]:
# `re` is otherwise only imported by an earlier cell; import it here so this
# cell runs on its own.
import re

img_path = 'openmonkey_annotations/masked_train_full/'
grad_path = 'openmonkey_annotations/train_grad_cropped/'
dest_grad_path = 'masked_grad_train/'

# cv.imwrite fails silently (returns False) when the target directory is missing.
os.makedirs(dest_grad_path, exist_ok=True)

file_names = os.listdir(img_path)

for file_name in file_names:
    masked_img = cv.imread(img_path + file_name)

    img_id = re.findall(r'[0-9]+', file_name)[0]
    new_file_name = 'train_%07d.png' % int(img_id)
    grad_bin = cv.imread(grad_path + new_file_name)
    if masked_img is None or grad_bin is None:
        # cv.imread returns None instead of raising when a file is unreadable.
        raise FileNotFoundError(
            'could not read %s or %s' % (img_path + file_name, grad_path + new_file_name)
        )

    # NOTE(review): this mask is applied per colour channel, so a pixel with
    # only some zero channels is partially cleared — confirm that is intended.
    grad_bin[masked_img <= 0] = 0
    # Write into dest_grad_path; the previous code wrote to the current
    # working directory and left dest_grad_path unused.
    cv.imwrite(dest_grad_path + new_file_name, grad_bin)

---
Helper code to convert the Simple Baseline output file to OMC output format

In [None]:
# Convert Simple Baseline predictions (bbox-relative keypoint triplets keyed
# by image id) into the OMC format (flat landmark pairs per file, offset by
# the bbox origin).
act_path = "openmonkey_annotations/test_prediction.json"
res_path = "coco_annotations/results/SimpleBaseline/Trained_resnet_50_test_ogcnn/keypoints_val2017_results.json"

with open(res_path, 'r') as f:
    annotations = json.load(f)

# image id -> flat keypoint list (a later duplicate id overwrites an earlier one).
res_map = {pred["image_id"]: pred["keypoints"] for pred in annotations}

with open(act_path, 'r') as g:
    actual = json.load(g)

data = []

for ann in actual["data"]:
    # File names carry the image id between the first '_' and the extension.
    img_id = int(ann["file"].split("_")[1].split(".")[0])
    pred_kps = res_map[img_id]

    kps = []
    for start in range(0, len(pred_kps), 3):
        # Keep the first two values of each triplet, shifted by the bbox origin.
        kps.append(pred_kps[start] + ann["bbox"][0])
        kps.append(pred_kps[start + 1] + ann["bbox"][1])

    # The entry is updated in place and reused as the output record.
    ann["landmarks"] = kps
    data.append(ann)

result = {
    "data": data,
}

In [None]:
# Write the converted predictions to disk as JSON.
with open("coco_annotations/results/SimpleBaseline/Trained_resnet_50_test_ogcnn/test_prediction.json", 'w') as fp:
    json.dump(result, fp)