# Animal Pose Estimation

In [1]:
import torch
import os
import torch
import pandas as pd
import numpy as np
from skimage import io, transform
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms.functional as F
from torchvision import transforms, utils
import torch
import torch.nn as nn
import torch.nn.functional as F
from  src.visualize_keypoints import *
import shutil
import warnings

import gdown
warnings.filterwarnings("ignore")

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
train_on_gpu = torch.cuda.is_available()

### Download Dataset and Unzip 
~ 4min

In [188]:
!mkdir Dataset
!gdown "https://drive.google.com/drive/folders/1xxm6ZjfsDSmv6C9JvbgiGrmHktrUjV5x" -O Dataset --folder
!unzip Dataset/images.zip -d Dataset

Processing file 1Ge9jZppE9pGxqVyNAh6bWh4_ukffWBlF images.zip

Retrieving folder list
Retrieving folder list completed
Building directory structure
Downloading...
From (uriginal): https://drive.google.com/uc?id=1Ge9jZppE9pGxqVyNAh6bWh4_ukffWBlF
From (redirected): https://drive.google.com/uc?id=1Ge9jZppE9pGxqVyNAh6bWh4_ukffWBlF&confirm=t&uuid=e7e95f25-136e-4bde-a5bf-9e75bdc09b86
To: c:\Users\osour\OneDrive - epfl.ch\EPFL\MA2\CIVIL-459 Deep Learning For Autonomous Vehicles\CIVIL-459-Animal-Pose-Estimation\Dataset\images.zip

  0%|          | 0.00/368M [00:00<?, ?B/s]
  0%|          | 524k/368M [00:00<01:12, 5.10MB/s]
  1%|          | 2.10M/368M [00:00<00:43, 8.35MB/s]
  1%|          | 3.15M/368M [00:00<00:49, 7.44MB/s]
  1%|          | 4.19M/368M [00:00<00:51, 7.01MB/s]
  1%|▏         | 5.24M/368M [00:00<00:52, 6.86MB/s]
  2%|▏         | 6.29M/368M [00:00<00:53, 6.72MB/s]
  2%|▏         | 7.34M/368M [00:01<00:54, 6.65MB/s]
  2%|▏         | 8.39M/368M [00:01<00:54, 6.60MB/s]
  3%|▎         | 9.44M/368M [00:01<00:54, 6.56MB/s]
  3%|▎         | 10.5M/3


Processing file 1bAidtvR3MttetncXz0mfr47vYy7Uf5hm keypoints.json
Building directory structure completed
Archive:  Dataset/images.zip
   creating: Dataset/images/
  inflating: Dataset/images/2007_000063.jpg  
  inflating: Dataset/images/2007_000175.jpg  
  inflating: Dataset/images/2007_000332.jpg  
  inflating: Dataset/images/2007_000392.jpg  
  inflating: Dataset/images/2007_000464.jpg  
  inflating: Dataset/images/2007_000491.jpg  
  inflating: Dataset/images/2007_000504.jpg  
  inflating: Dataset/images/2007_000528.jpg  
  inflating: Dataset/images/2007_000549.jpg  
  inflating: Dataset/images/2007_000676.jpg  
  inflating: Dataset/images/2007_000720.jpg  
  inflating: Dataset/images/2007_000783.jpg  
  inflating: Dataset/images/2007_000799.jpg  
  inflating: Dataset/images/2007_000836.jpg  
  inflating: Dataset/images/2007_000876.jpg  
  inflating: Dataset/images/2007_000904.jpg  
  inflating: Dataset/images/2007_000925.jpg  
  inflating: Dataset/images/2007_001073.jpg  
  inflati

  inflating: Dataset/images/2008_004354.jpg  
  inflating: Dataset/images/2008_004374.jpg  
  inflating: Dataset/images/2008_004394.jpg  
  inflating: Dataset/images/2008_004396.jpg  
  inflating: Dataset/images/2008_004402.jpg  
  inflating: Dataset/images/2008_004410.jpg  
  inflating: Dataset/images/2008_004422.jpg  
  inflating: Dataset/images/2008_004427.jpg  
  inflating: Dataset/images/2008_004430.jpg  
  inflating: Dataset/images/2008_004450.jpg  
  inflating: Dataset/images/2008_004453.jpg  
  inflating: Dataset/images/2008_004462.jpg  
  inflating: Dataset/images/2008_004470.jpg  
  inflating: Dataset/images/2008_004476.jpg  
  inflating: Dataset/images/2008_004480.jpg  
  inflating: Dataset/images/2008_004490.jpg  
  inflating: Dataset/images/2008_004498.jpg  
  inflating: Dataset/images/2008_004505.jpg  
  inflating: Dataset/images/2009_001733.jpg  
  inflating: Dataset/images/2009_001783.jpg  
  inflating: Dataset/images/2009_001873.jpg  
  inflating: Dataset/images/2009_0

# Data rework


Our dataset contains the following features:
- **image_id :** (*input, int*) image identification, different animal samples on the same image share the same *image_id*.
- **image :**(*input, array[1024*1024*3]*) image data of the sample.
- **keypoints :** (*output, array[20*3]*) list of individual keypoints, which are lists of three values : [$x_{pos}$, $y_{pos}$, conf].
- **bbox :** (*output, array[4]*) coordinates [$x$,$y$] for diagonal corners defining the bounding box of the animal.
- **label :** (*output, int*) class id of the sample.

## Data annotation formatting

In [189]:
json_file = 'Dataset/keypoints.json'
root_annotations = 'Dataset/labels/'
root_images = 'Dataset/images/'
annotations = json.load(open(json_file, 'r'))

max_image_id = max([int(key) for key in annotations['images'].keys()])

def format_bbox (bbox, image_size):
    bbox[0] = bbox[0]/image_size[1]
    bbox[1] = bbox[1]/image_size[0]
    bbox[2] = bbox[2]/image_size[1]
    bbox[3] = bbox[3]/image_size[0]
    # convert xmax, ymax to w, h
    bbox[2] = bbox[2] - bbox[0]
    bbox[3] = bbox[3] - bbox[1]
    # convert xmin, ymin to x_center, y_center
    bbox[0] = bbox[0] + bbox[2]/2
    bbox[1] = bbox[1] + bbox[3]/2
    bbox = [round(num, 6) for num in bbox]
    return bbox

def extract_info(annotations, image_id, root = root_images):
    filtered_list = [d for d in annotations['annotations'] if d['image_id'] == image_id+1]
    # get image name 
    image_name = annotations['images'][str(image_id+1)]
    category_list = []
    bbox_list = []
    for element in filtered_list:
        # get category id, needs to start from 0
        category_list.append(element['category_id']-1)
        # get bounding box
        bbox  = element['bbox']
        # get image size
        image_path = os.path.join(root, image_name)
        image = cv2.imread(image_path)
        image_size = image.shape
        # normalize bounding box
        bbox_list.append(format_bbox(bbox, image_size))
    return image_name, category_list, bbox_list

def write_info(image_name, category_list, bbox_list, root = root_annotations):
    # write to file with same name as image, removing the extension and replacing it with .txt
    image_name = image_name.split('.')[0] + '.txt'
    with open(os.path.join(root, image_name), 'w') as f:
        # write category id and bounding box
        for i in range(len(category_list)):
            f.write(str(category_list[i]) + ' ' + ' '.join([str(num) for num in bbox_list[i]]) + '\n')

!mkdir -p Dataset\labels

for i in range(max_image_id):
    image_name, category_list, bbox_list = extract_info(annotations, i)
    write_info(image_name, category_list, bbox_list)


A subdirectory or file -p already exists.
Error occurred while processing: -p.


## Data partitioning
Into train, validation and testing

In [190]:
# Read images and annotations
images = [os.path.join(root_images, x) for x in os.listdir(root_images) if x[-3:] == "jpg" or x[-3:] == "png" or x[-4:] == "jpeg" or x[-3:] == "JPG" or x[-3:] == "PNG" or x[-4:] == "JPEG"]
annotations = [os.path.join(root_annotations, x) for x in os.listdir(root_annotations) if x[-3:] == "txt"]

images.sort()
annotations.sort()
print(len(images), len(annotations))
# Split the dataset into train-valid-test splits 
train_images, val_images, train_annotations, val_annotations = train_test_split(images, annotations, test_size = 0.2, random_state = 1)
val_images, test_images, val_annotations, test_annotations = train_test_split(val_images, val_annotations, test_size = 0.5, random_state = 1)

# Create directories (on linux)
#!mkdir Dataset/images/train Dataset/images/val Dataset/images/test Dataset/annotations/train Dataset/annotations/val Dataset/annotations/test

#Utility function to move images 
def move_files_to_folder(list_of_files, destination_folder):
    for f in list_of_files:
        try:
            shutil.move(f, destination_folder)
        except:
            print(f)
            assert False

# create the folders if they don't exist
!mkdir -p Dataset\images\train Dataset\images\val Dataset\images\test Dataset\labels\train Dataset\labels\val Dataset\labels\test

# Move the splits into their folders if they are not already there
move_files_to_folder(train_images, 'Dataset/images/train')
move_files_to_folder(val_images, 'Dataset/images/val/')
move_files_to_folder(test_images, 'Dataset/images/test/')
move_files_to_folder(train_annotations, 'Dataset/labels/train/')
move_files_to_folder(val_annotations, 'Dataset/labels/val/')
move_files_to_folder(test_annotations, 'Dataset/labels/test/')


4608 4608


A subdirectory or file -p already exists.
Error occurred while processing: -p.


In [2]:
from clearml import Task
from ultralytics import YOLO

task = Task.init(project_name="my project", task_name="my task")

yolo_model = YOLO('yolov8n.pt')

results = yolo_model.train(
    data = 'data.yaml',
    imgsz = 640,
    epochs = 50,
    batch = 1,
    name='yolov8n_custom',
)

ClearML Task: created new task id=a28143733d7e407fb15b5f18ce752505
2023-04-17 23:31:08,702 - clearml.Task - INFO - Storing jupyter notebook directly as code
ClearML results page: https://app.clear.ml/projects/e7d62e36ebc84ec2b932249f94beb113/experiments/a28143733d7e407fb15b5f18ce752505/output/log
2023-04-17 23:31:11,155 - clearml.model - INFO - Selected model id: 6701db1d03504114b9dfdec4d07e20d2
ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring


Ultralytics YOLOv8.0.81  Python-3.10.0 torch-2.0.0+cpu CPU
[34m[1myolo\engine\trainer: [0mtask=detect, mode=train, model=yolov8n.pt, data=data.yaml, epochs=50, patience=50, batch=1, imgsz=640, save=True, save_period=-1, cache=False, device=None, workers=8, project=None, name=yolov8n_custom, exist_ok=False, pretrained=False, optimizer=SGD, verbose=True, seed=0, deterministic=True, single_cls=False, image_weights=False, rect=False, cos_lr=False, close_mosaic=0, resume=False, amp=True, overlap_mask=True, mask_ratio=4, dropout=0.0, val=True, split=val, save_json=False, save_hybrid=False, conf=None, iou=0.7, max_det=300, half=False, dnn=False, plots=True, source=None, show=False, save_txt=False, save_conf=False, save_crop=False, show_labels=True, show_conf=True, vid_stride=1, line_thickness=3, visualize=False, augment=False, agnostic_nms=False, classes=None, retina_masks=False, boxes=True, format=torchscript, keras=False, optimize=False, int8=False, dynamic=False, simplify=False, opset=N

ClearML Monitor: Could not detect iteration reporting, falling back to iterations as seconds-from-start


       1/50         0G      1.034      2.887      1.495          3        640: 100%|██████████| 3686/3686 [25:44<00:00,  2.39it/s]
                 Class     Images  Instances      Box(P          R      mAP50  mAP50-95): 100%|██████████| 1843/1843 [07:54<00:00,  3.88it/s]
                   all       3686       4920      0.359      0.518      0.415      0.282

      Epoch    GPU_mem   box_loss   cls_loss   dfl_loss  Instances       Size
       2/50         0G      1.126      2.472      1.558          2        640: 100%|██████████| 3686/3686 [26:10<00:00,  2.35it/s]
                 Class     Images  Instances      Box(P          R      mAP50  mAP50-95): 100%|██████████| 1843/1843 [07:51<00:00,  3.91it/s]
                   all       3686       4920      0.309       0.47      0.335      0.216

      Epoch    GPU_mem   box_loss   cls_loss   dfl_loss  Instances       Size
       3/50         0G      1.212      2.439      1.626          3        640: 100%|██████████| 3686/3686 [25:24<00:00

In [None]:
!yolo task=detect mode=predict mode=runs/detect/yolov8n_custom39/weights/best.pt source=

In [25]:

# cow sheep horse cat dog
labels = {'dog':1, 'cat':2, 'sheep':3, 'horse':4, 'cow':5} 

class AnimalPoseDataset(Dataset):
    def __init__ (self, json_file, root_dir, transform=None):
        self.keypoints_frame = json.load(open(json_file))
        self.root_dir = root_dir
        self.transform = transform
    
    def __len__(self):
        return len(self.keypoints_frame["annotations"])
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        anno_dict = self.keypoints_frame
        img_id = anno_dict["annotations"][idx]["image_id"]
        image_map = anno_dict["images"]
        annotations = anno_dict["annotations"]

        imagename = image_map[str(annotations[idx]["image_id"])]
        bbox = torch.tensor(annotations[idx]["bbox"])
        keypoints = annotations[idx]["keypoints"]
        label = annotations[idx]["category_id"] 
        image_path = os.path.join(self.root_dir, imagename)
        image = cv2.imread(image_path)
        sample = {'image_id': img_id, 'image': image, 'keypoints': keypoints, 'bbox':bbox, 'label':label}
        
        if self.transform:
            sample = self.transform(sample)
            
        return sample
    
    def draw(self, sample):
        image = sample['image']
        bbox = sample['bbox']
        xmin, ymin, xmax, ymax = bbox 
        image = draw_bbox(image, xmin, ymin, xmax, ymax, random_color())
        image = draw_keypoint(image, sample['keypoints'])
        return image


def rotate_image(image, angle):
  image_center = tuple(np.array(image.shape[1::-1]) / 2)
  rot_mat = cv2.getRotationMatrix2D(image_center, angle, 1.0)
  result = cv2.warpAffine(image, rot_mat, image.shape[1::-1], flags=cv2.INTER_LINEAR)

  return result

class Rescale (object):
    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size
    
    def __call__ (self, sample):
        img_id, image, keypoints, bbox = sample['image_id'],sample['image'], sample['keypoints'], sample['bbox']
        h, w = image.shape[:2]

        if isinstance(self.output_size, int):
            if h > w:
                new_h, new_w = self.output_size * h / w, self.output_size
            else:
                new_h, new_w = self.output_size, self.output_size * w / h
        else:
            new_h, new_w = self.output_size
        
        new_h, new_w = int(new_h), int(new_w)
        # scale the image
        img = transform.resize(image, (new_h, new_w))
        # scale the keypoints
        scaled_keypoints = []
        for kp in keypoints:
            new_x = int(kp[0] * new_w / w)
            new_y = int(kp[1] * new_h / h)
            scaled_keypoints.append([new_x, new_y, kp[2]])
        # convert to tensor
        scaled_keypoints = torch.tensor(scaled_keypoints)
        # scale the bbox
        xmin, ymin, xmax, ymax = bbox
        xmin = int(xmin * new_w / w)
        xmax = int(xmax * new_w / w)
        ymin = int(ymin * new_h / h)
        ymax = int(ymax * new_h / h)
        # convert to tensor
        bbox = torch.tensor([xmin, ymin, xmax, ymax])
        
        return {'image_id':img_id, 'image': img, 'keypoints': scaled_keypoints, 'bbox':bbox, 'label':sample['label']}
        
class SDA(object):
    
    def __init__(self, nb_bodyparts, tolerance=20):
        # number of body parts to add to the image
        self.nb_bodyparts = nb_bodyparts
        self.bodypart_pool = []
        self.tolerance=tolerance

    def __call__(self, sample):
        img_id, image, keypoints, bbox, label = sample['image_id'], sample['image'], sample['keypoints'], sample['bbox'], sample['label']
        image, keypoints, bodyparts = self.crop_bodypart(image, keypoints)
        self.bodypart_pool.extend(bodyparts)
        
        # add the body parts to the image
        for i in range(self.nb_bodyparts):
            image = self.add_bodyparts(image)
        
        return {'image_id':img_id, 'image': image, 'keypoints': keypoints, 'bbox':bbox, 'label':label}

    def crop_bodypart(self, image, keypoints):
        mask = np.zeros(image.shape[:2], dtype=np.uint8)
        draw_keypoint(mask, keypoints)
        # find the contours in the mask
        contours, hierarchy = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        # crop the different body parts and store them 
        bodyparts = []
        for i in range(len(contours)):
            x,y,w,h = cv2.boundingRect(contours[i])
            bodyparts.append(image[y-self.tolerance:y+h+self.tolerance, x-self.tolerance:x+w+self.tolerance])
        # return the image with the body parts and the keypoints
        return image, keypoints, bodyparts
    
    def add_bodyparts(self, image):        
        # randomly select a body part
        # check if the body part pool is empty
        if len(self.bodypart_pool) == 0:
            return image
        bodypart = random.choice(self.bodypart_pool)
        # randomly select an angle
        #angle = random.randint(0, 360)        
        # rotate the body part
        #bodypart = rotate_image(bodypart, angle)
        h,w,_ = bodypart.shape

        # randomly select a position for the body part
        x = random.randint(0, image.shape[1] - w)
        y = random.randint(0, image.shape[0] - h)
        
        image[y:y+h, x:x+w] = cv2.addWeighted(image[y:y+h, x:x+w], 0, bodypart, 1, 0)
        return image


    def show_bodyparts(self):     
        for i in range(len(self.bodypart_pool)):
            plt.imshow(self.bodypart_pool[i])
            plt.show()
#TODO: adapt SDA so it has a limited body part pool, if else it will consume too much memory
dataset = AnimalPoseDataset(json_file='Dataset/keypoints.json', 
                            root_dir='Dataset/images/',
                            transform=transforms.Compose([Rescale((640,640))]))
                                                        

dataloader = DataLoader(dataset, batch_size=1, shuffle=True, num_workers=0)
sample = {'image_id':None, 'image': None, 'keypoints': None, 'bbox':None, 'label':None}
for i_batch, sample_batched in enumerate(dataloader):
    #print(i_batch, sample_batched['image'].size(), sample_batched['label'])
    if i_batch == 3:
        sample['image_id'] = sample_batched['image_id']
        sample['image'] = sample_batched['image']
        sample['keypoints'] = sample_batched['keypoints']
        sample['bbox'] = sample_batched['bbox']
        sample['label'] = sample_batched['label']
        break


# YOLOv8 net

- **k** : kernel size
- **s** : stride, determines the step size the convolution filter moves across the input image.
- **p** : padding, used to control the spatial dimensions of the ouptut feature map by adding extra pixels around the input image or feature map before applying the convolution

Special components:
- **Split**: divides the input feature map into two or more separate feature maps along a specified axis(usually channel axis). This can be useful for processing parts of the feature map separately of feeding them into different parallel sub-nets
- **Bottleneck**: design pattern often used to reduce the dimensionality of feature maps, followed by an expansion to the original dimensionality. This is usually achieved using a series of conv layers with varying kernels sizes and channel dimensions. Helps in reducing the models computational complexity while preserving relevant features
- **Concat**: the concatenation op combines multiple feature maps along a specified axis. This is useful for merging information from different sources or resolutions withing the network, which can help improve the model's ability ot learn complex features and relationships


In [None]:
### Detail block modules
# Conv
class Conv(nn.Module):
    def __init__(self, k, s, p, c_in, c_out):
        super().__init__()
        # 2d conv layer
        self.conv = nn.Conv2d(c_in, c_out, k, s, p)
        # batch normalization
        self.bn = nn.BatchNorm2d(c_out)
        # SiLU activation
        self.act = nn.SiLU()

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        x = self.act(x)
        return x


class Bottleneck(nn.Module):
    def __init__(self, shortcut, h, w , c_in):
        super().__init__()
        self.conv = Conv(k=3, s=1, p=1, c_in=c_in, c_out=c_in//2)
        self.conv1 = Conv(k=3, s=1, p=1, c_in=c_in//2, c_out=c_in)
        self.shortcut = shortcut

    def forward(self, x):
        if self.shortcut:
            xp = self.conv1(self.conv(x))
            x = x + xp
        else:
            x = self.conv(x)
            x = self.conv1(x)        
        return x

class SPPF(nn.Module):
    def __init__(self, c_in):
        super().__init__()
        self.conv = Conv(k=1, s=1, p=0, c_in=c_in,c_out=c_in)    
        self.maxpool = nn.MaxPool2d(kernel_size=5, stride=32, padding=2) 

    def forward(self, x):
        x1 = self.conv(x)
        x2 = self.maxpool(x1)
        x3 = self.maxpool(x2)
        x4 = self.maxpool(x3)
        xtot = x1 + x2 + x3 + x4
        #x = torch.cat((x4, xtot), dim=1)
        x = self.conv(xtot)
        return x

class C2f(nn.Module):
    def __init__(self, shortcut, c_in, c_out, h, w, n):
        super().__init__()
        self.conv1 = Conv(k=1, s=1, p=0, c_in=c_in, c_out=c_out)
        self.conv2 = Conv(k=1, s=1, p=0, c_in=int(0.5*(n+2)*c_out), c_out=c_out)
        self.bottleneck = Bottleneck(shortcut=shortcut, h=h, w=w, c_in=c_out//2)
        self.n = n
        self.cout = c_out
        self.cin = c_in

    def forward(self, x):
        x = self.conv1(x)
        
        # split the input into half and the other half channels
        split_x = torch.split(x, x.size(1)//2, dim=1)        
        # current bottleneck
        bn = split_x[0]
        # list of bottlenecks to be stacked
        bn_x = []
        bn_x.append(bn)        
        # iterate through the number of bottlenecks to be stacked
        for i in range(self.n):
            # apply the bottleneck to the first half channels and store them in bn_x
            bn = self.bottleneck(bn)
            bn_x.append(bn)
               
        # concatenate the first half channels with the second half channels
        bn_x.append(split_x[1])
        # concatenate the bottlenecks
        x = torch.cat(bn_x, dim=1)
        x = self.conv2(x)                
        return x

class Detect(nn.Module):
    def __init__(self, num_classes, reg_max, c_in):
        super().__init__()
        self.conv = Conv(k=3, s=1, p=1, c_in=c_in, c_out=c_in)
        self.conv2d_bbox = nn.Conv2d(kernel_size=1, stride=1, padding=0, in_channels=c_in, out_channels=4*reg_max)
        self.bn1 = nn.BatchNorm2d(4*reg_max)
        self.conv2d_cls = nn.Conv2d(kernel_size=1, stride=1, padding=0, in_channels=c_in, out_channels=num_classes)
        self.bn2 = nn.BatchNorm2d(num_classes)
        self.linear = nn.Linear(in_features=6400, out_features=4)
        self.activation = nn.Sigmoid()

    def forward(self, x):
        x = self.conv(x)
        x = self.conv(x)
        x_cls = self.conv2d_cls(x)
        x_cls = self.bn2(x_cls)
        x_bbox = self.conv2d_bbox(x)
        x_bbox = self.bn1(x_bbox)
        #x_bbox = self.calculate_bbox(x_bbox)
        return x_cls, x_bbox

    def calculate_bbox(self, x):
        x_pred = torch.sigmoid(x[:,0,:, :])
        y_pred = torch.sigmoid(x[:,1,:, :])
        w_pred = torch.exp(x[:,2,:, :])
        h_pred = torch.exp(x[:,3,:, :])
        bbox = torch.cat([x_pred, y_pred, w_pred, h_pred], dim=1)
        return bbox

# Main Network architecture
class BBoxNet(nn.Module):
    def __init__(self, w, r, d):
        super(BBoxNet, self).__init__()
    # backbone network modules
        self.conv_0_p1 = Conv(k=3, s=2, p=1, c_in=3, c_out=int(64*w))
        self.conv_1_p2 = Conv(k=3, s=2, p=1, c_in=int(64*w), c_out=int(128*w))
        self.c2f_2 = C2f(shortcut=True, h=160, w=160, n=int(3*d), c_in=int(128*w), c_out=int(128*w))
        self.conv_3_p3 = Conv(k=3, s=2, p=1, c_in=int(128*w), c_out=int(256*w))
        self.c2f_4 = C2f(shortcut=True, h=80, w=80, n=int(6*d), c_in=int(256*w), c_out=int(256*w))
        self.conv_5_p4 = Conv(k=3, s=2, p=1, c_in=int(256*w), c_out=int(512*w))
        self.c2f_6 = C2f(shortcut=True, h=40, w=40, n=int(6*d), c_in=int(512*w), c_out=int(512*w))
        self.conv_7_p5 = Conv(k=3, s=2, p=1, c_in=int(512*w), c_out=int(512*w*r))
        self.c2f_8 = C2f(shortcut=True, h=20, w=20, n=int(3*d), c_in=int(512*w*r), c_out=int(512*r*w))
        self.sppf_9 = SPPF(c_in=int(512*w*r))

    # head network modules
        self.upsample_10 = nn.Upsample(size=(40,40), mode='bilinear', align_corners=False)
        self.concat_11 = torch.cat
        self.c2f_12 = C2f(shortcut=False, c_in=int(512*w*(1+r)), c_out=int(512*w), h=40, w=40, n=int(3*d))
        self.upsample_resolution_13a = nn.Upsample(size=(80,80), mode='bilinear', align_corners=False)
        self.upsample_channels_13b = nn.Conv2d(in_channels=int(512*w), out_channels=int(256*w), kernel_size=1)
        self.concat_14 = torch.cat
        self.c2f_15 = C2f(shortcut=False, c_in=int(512*w), c_out=int(256*w), h=80, w=80, n=int(3*d))
        self.conv_16_p3 = Conv(k=3, s=2, p=1, c_in=int(256*w), c_out=int(256*w))
        self.concat_17 = torch.cat
        # ISSUE HERE, THE ARCHITECTURE OUTPUT CHANNEL SIZE IS PROBABLY WRONG, AS THE CONCATENATION DOES NOT INCREASE THE CHANNEL SIZE
        #self.c2f_18 = C2f(shortcut=False, c_in=int(512*w), c_out=int(512*w), h=40, w=40, n=int(3*d))
        self.c2f_18 = C2f(shortcut=False, c_in=192, c_out=192, h=40, w=40, n=int(3*d))
        self.conv_19 = Conv(k=3, s=2, p=1, c_in=192, c_out=192)
        self.concat_20 = torch.cat
        #self.c2f_21 = C2f(shortcut=False, c_in=int(512*w*(1+r)), c_out=int(512*w), h=20, w=20, n=int(3*d))
        self.c2f_21 = C2f(shortcut=False, c_in=448, c_out=int(512*w), h=20, w=20, n=int(3*d))
    
    # output layers
        self.detect1 = Detect(num_classes=6, reg_max=1, c_in=int(256*w))
        self.detect2 = Detect(num_classes=6, reg_max=1, c_in=192)
        self.detect3 = Detect(num_classes=6, reg_max=1, c_in=int(512*w))

    def forward(self,x):
    # backbone pass
        x = self.conv_0_p1(x)
        x = self.conv_1_p2(x)
        x = self.c2f_2(x)
        x = self.conv_3_p3(x)
        x = self.c2f_4(x)
        # save for concat later
        x_4 = x
        
        x = self.conv_5_p4(x)
        x = self.c2f_6(x)
        
        # save for concat later
        x_6 = x
        x = self.conv_7_p5(x)
        x = self.c2f_8(x)
        x = self.sppf_9(x)
        x_9 = x

    # head pass
        # first brancH
        x = self.upsample_10(x)
        x = self.concat_11((x, x_6), dim=1)
        x = self.c2f_12(x)  
        x_12 = x
        x = self.upsample_resolution_13a(x)
        x = self.upsample_channels_13b(x)
        x = self.concat_14((x, x_4), dim=1)
        x = self.c2f_15(x) 
        x_detect1 = x
        
    # second branch
        x = self.conv_16_p3(x)
        # CHECK CHANNEL ISSUE HEREISSUE HERE
        x = self.concat_17((x_12, x), dim=1)
        x = self.c2f_18(x)
        x_detect2 = x
        x = self.conv_19(x)
        # ISSUE PROPAGATES HERE ALSO
        x = self.concat_20((x, x_9), dim=1)
        x = self.c2f_21(x)
        x_detect3 = x
    
    # output layers
        x_cls1, x_bbox1 = self.detect1(x_detect1)
        x_cls2, x_bbox2 = self.detect2(x_detect2)
        x_cls3, x_bbox3 = self.detect3(x_detect3)        

        return [x_cls1, x_bbox1, x_cls2, x_bbox2, x_cls3, x_bbox3]

def calculate_iou(pred_bboxes, gt_bboxes):
     # Calculate intersection
    inter_xmin = torch.max(pred_bboxes[..., 0], gt_bboxes[..., 0])
    inter_ymin = torch.max(pred_bboxes[..., 1], gt_bboxes[..., 1])
    inter_xmax = torch.min(pred_bboxes[..., 2], gt_bboxes[..., 2])
    inter_ymax = torch.min(pred_bboxes[..., 3], gt_bboxes[..., 3])

    inter_width = torch.clamp(inter_xmax - inter_xmin, min=0)
    inter_height = torch.clamp(inter_ymax - inter_ymin, min=0)
    inter_area = inter_width * inter_height

    # Calculate union
    pred_area = (pred_bboxes[..., 2] - pred_bboxes[..., 0]) * (pred_bboxes[..., 3] - pred_bboxes[..., 1])
    gt_area = (gt_bboxes[..., 2] - gt_bboxes[..., 0]) * (gt_bboxes[..., 3] - gt_bboxes[..., 1])
    union_area = pred_area + gt_area - inter_area

    # Calculate IoU
    iou = inter_area / union_area
    return iou
    
def bbox_regression_loss(pred_bboxes, gt_bboxes):
    # Reshape the predicted bounding boxes to (batch, 4, -1)
    pred_bboxes = pred_bboxes.view(pred_bboxes.size(0), 4, -1)
    # Convert the predicted bounding boxes to absolute coordinates
    pred_bboxes_xy = torch.sigmoid(pred_bboxes[:, :2, :])
    pred_bboxes_wh = torch.exp(pred_bboxes[:, 2:, :])
    # Combine the x, y, width, and height to create the final predicted bounding boxes
    pred_bboxes_abs = torch.cat((pred_bboxes_xy, pred_bboxes_wh), dim=1)
    iou_loss = torch.zeros((pred_bboxes_abs.shape[0], pred_bboxes_abs.shape[2]))
    id_list = torch.zeros((pred_bboxes_abs.shape[0], pred_bboxes_abs.shape[1]))


    for sample in range(pred_bboxes_abs.shape[0]):
        for bbox in range(pred_bboxes_abs.shape[2]):
            
            iou = calculate_iou(pred_bboxes_abs[sample, :, bbox], gt_bboxes[sample, :])
            loss = 1 - iou
            iou_loss[sample, bbox] = loss

        # get the index of the predicted bounding box with the highest IoU
        max_iou, max_iou_idx = torch.max(iou_loss[sample], dim=0)
        # if the IoU is greater than 0.5, then the predicted bounding box is a true positive
        if max_iou > 0.5:
            id_list[sample, :] = pred_bboxes_abs[sample, :, max_iou_idx]

    # Calculate the loss
    loss = torch.sum(torch.abs(id_list - gt_bboxes))
    return loss, id_list

    
# define the training function
def train_net(net, train_loader, n_epochs, optimizer, criterion):
    # loop over the number of epochs
    best_loss = 100000
    for epoch in range(n_epochs):
        # initialize variables to monitor training and validation loss
        train_loss = 0.0

        # set the model to training mode
        net.train()
        for i_batch, data in enumerate(dataloader):
            # move tensors to GPU if CUDA is available
            if train_on_gpu:
                data['image'] = data['image'].cuda()
                data['bbox'] = data['bbox'].cuda()
                data['label'] = data['label'].cuda()
                data['image_id'] = data['image_id'].cuda()
                
            # clear the gradients of all optimized variables
            optimizer.zero_grad()
            # forward pass: compute predicted outputs by passing inputs to the model
            output = net(data['image'].permute(0,3,1,2).float())
            
            # calculate the batch loss
            bbox = torch.stack([t for t in data['bbox']], dim=0)
            loss1, _ = criterion(output[1], bbox)
            loss2,_ = criterion(output[3], bbox)
            loss3,_ = criterion(output[5], bbox)
            # backward pass: compute gradient of the loss with respect to model parameters
            loss = loss1 + loss2 + loss3
            loss.backward()
            # perform a single optimization step (parameter update)
            optimizer.step()
            # update training loss

            train_loss += loss.item()*data['image'].size(0)
            # save the model if validation loss has decreased
            if loss.item() < best_loss:
                print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(best_loss, loss.item()))
                torch.save(net.state_dict(), 'model.pt')
                best_loss = loss.item()
    
            # save the three feature maps with different name        
            #if i_batch%10 == 0:
            #    plt.imsave("Output/feature_map1_"+str(i_batch)+".jpg", output[0][0,0,:,:].detach().cpu().numpy())
            #    plt.imsave("Output/feature_map2_"+str(i_batch)+".jpg", output[1][0,0,:,:].detach().cpu().numpy())
            #    plt.imsave("Output/feature_map3_"+str(i_batch)+".jpg", output[2][0,0,:,:].detach().cpu().numpy())

            # print train loss in percentage, and remaining batches in epoch
            print('Epoch: {} \tBatch: {} \tLoss: {:.6f} \tRemaining: {}'.format(epoch+1, i_batch+1, loss.item(), len(train_loader)-i_batch-1))
            

        # print training statistics
        # calculate average loss over an epoch
        train_loss = train_loss/len(train_loader.dataset)
        print('Epoch: {} \tTraining Loss: {:.6f}'.format(epoch+1, train_loss))

        # step the scheduler
        #scheduler.step()
net = BBoxNet(w=0.25, r=2, d=0.34)
net = net.to(device)
criterion = bbox_regression_loss
optimizer = torch.optim.Adam(net.parameters(), lr=0.0001)
#scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
#scheduler = torch.optim.lr_scheduler.CyclicLR(optimizer, base_lr=0.00005, max_lr=0.002,step_size_up=10,mode='exp_range',cycle_momentum=False)
n_epochs = 10
train_net(net, dataloader, n_epochs, optimizer, criterion)


# Pose Estimation

## OpenPose Architecture

In [192]:
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False):
        super(ConvBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=bias)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size, stride, padding, bias=bias)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels, kernel_size, stride, padding, bias=bias)
        self.bn3 = nn.BatchNorm2d(out_channels)
    
    def forward(self, x):
        x1 = F.relu(self.bn1(self.conv1(x)))
        x2 = F.relu(self.bn2(self.conv2(x)))
        x3 = F.relu(self.bn3(self.conv3(x)))
        x = torch.cat((x1, x2, x3), dim=1)
        return x

class IndiaNet(nn.Module):
    def __init__(self):
        super(IndiaNet, self).__init__()
    # vgg 19 first 10 layers
        self.conv1_1 = nn.Conv2d(3, 64, 3, 1, 1)
        self.bn1_2 = nn.BatchNorm2d(64)
        self.conv2_3 = nn.Conv2d(64, 64, 3, 1, 1)
        self.bn2_4 = nn.BatchNorm2d(64)
        self.pool1_5 = nn.MaxPool2d(2, 2)

        self.conv3_6 = nn.Conv2d(64, 128, 3, 1, 1)
        self.bn3_7 = nn.BatchNorm2d(128)
        self.conv4_8 = nn.Conv2d(128, 128, 3, 1, 1)
        self.bn4_9 = nn.BatchNorm2d(128)
        self.pool2_10 = nn.MaxPool2d(2, 2)

        self.conv5_11 = nn.Conv2d(128, 256, 3, 1, 1)
        self.bn5_12 = nn.BatchNorm2d(256)
        self.conv6_13 = nn.Conv2d(256, 256, 3, 1, 1)
        self.bn6_14 = nn.BatchNorm2d(256)
        self.conv7_15 = nn.Conv2d(256, 256, 3, 1, 1)
        self.bn7_16 = nn.BatchNorm2d(256)
        self.conv8_17 = nn.Conv2d(256, 256, 3, 1, 1)
        self.bn8_18 = nn.BatchNorm2d(256)
        self.pool3_19 = nn.MaxPool2d(2, 2)

        self.conv9_20 = nn.Conv2d(256, 512, 3, 1, 1)
        self.bn9_21 = nn.BatchNorm2d(512)
        self.conv10_22 = nn.Conv2d(512, 512, 3, 1, 1)
        self.bn10_23 = nn.BatchNorm2d(512)
    
    # phi stage
        self.convb1_24 = ConvBlock(512, 512)
        self.convb2_25 = ConvBlock(512, 512)
        self.convb3_26 = ConvBlock(512, 512)
        self.convb4_27 = ConvBlock(512, 512)
        self.convb5_28 = ConvBlock(512, 512)
        self.conv11_29 = nn.Conv2d(512, 512, 1, 1, 1)
        self.bn11_30 = nn.BatchNorm2d(512)
        self.conv12_31 = nn.Conv2d(512, 512, 1, 1, 1)
        self.bn12_32 = nn.BatchNorm2d(512)

    # rho stage
        self.convb6_33 = ConvBlock(512, 512)
        self.convb7_34 = ConvBlock(512, 512)
        self.convb8_35 = ConvBlock(512, 512)
        self.convb9_36 = ConvBlock(512, 512)
        self.convb10_37 = ConvBlock(512, 512)
        self.conv13_38 = nn.Conv2d(512, 512, 1, 1, 1)
        self.bn13_39 = nn.BatchNorm2d(512)
        self.conv14_40 = nn.Conv2d(512, 512, 1, 1, 1)
        self.bn14_41 = nn.BatchNorm2d(512) 

    # vgg stage
    def stage_vgg(self, x):
        x = F.relu(self.bn1_2(self.conv1_1(x)))
        x = F.relu(self.bn2_4(self.conv2_3(x)))
        x = self.pool1_5(x)
        x = F.relu(self.bn3_7(self.conv3_6(x)))
        x = F.relu(self.bn4_9(self.conv4_8(x)))
        x = self.pool2_10(x)
        x = F.relu(self.bn5_12(self.conv5_11(x)))
        x = F.relu(self.bn6_14(self.conv6_13(x)))
        x = F.relu(self.bn7_16(self.conv7_15(x)))
        x = F.relu(self.bn8_18(self.conv8_17(x)))
        x = self.pool3_19(x)
        x = F.relu(self.bn9_21(self.conv9_20(x)))
        x = F.relu(self.bn10_23(self.conv10_22(x)))
        return x
    
    # phi stage
    def stage_phi(self, x):
        x = self.convb1_24(x)
        x = self.convb2_25(x)
        x = self.convb3_26(x)
        x = self.convb4_27(x)
        x = self.convb5_28(x)
        x = F.relu(self.bn11_30(self.conv11_29(x)))
        x = F.relu(self.bn12_32(self.conv12_31(x)))
        return x
    
    # rho stage
    def stage_rho(self, x):
        x = self.convb6_33(x)
        x = self.convb7_34(x)
        x = self.convb8_35(x)
        x = self.convb9_36(x)
        x = self.convb10_37(x)
        x = F.relu(self.bn13_39(self.conv13_38(x)))
        x = F.relu(self.bn14_41(self.conv14_40(x)))
        return x
        
    
    def forward(self, x):
        x = self.stage_vgg(x)
        x = self.stage_phi(x)
        x = self.stage_rho(x)
        return x    
    
def keypoint_loss(output, target):
    pass

model = IndiaNet()
model = model.to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


## Loss Functions

## OKS Loss

In [193]:
# keypoint specific weight
k_weight = torch.tensor([1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1])

def OKS_loss(pred_kp, target_kp, target_size):
  loss = 0
  div = torch.sum(target_kp[:,2])
  for i in range(pred_kp.shape[0]):
    d = torch.dist(pred_kp[i, :2], target_kp[i,:2])
    loss = loss + torch.exp(torch.square(d) / (2 * torch.square(target_size)) * torch.square(k_weight[i])) * pred_kp[i, 2]
  loss = 1 - loss/div
  return loss

2023-04-17 19:50:12,663 - clearml.Task - INFO - Completed model upload to https://files.clear.ml/my%20project/my%20task.e3bccb751aaa4391969a43112a2e45ec/models/best.pt


## Keypoint Confidence Loss

In [None]:
data = dataloader.__get_item__(0)

def KPconf_loss(pred_kp, target_kp):
    loss = 0
    for i in range(pred_kp.shape[0]):
        loss = loss + nn.BCELoss(pred_kp[i, 2], target_kp[i, 2])
    


# Training

In [None]:

def train(model, train_loader, num_epochs):
    # Train the model
    total_step = len(train_loader)
    for epoch in range(num_epochs):
        for i, data in enumerate(train_loader):
            # Run the forward pass
            images = data['image']
            keypoints = data['keypoints']
            outputs = model(images)
            loss = criterion(outputs, keypoints)
            
            # Backprop and perform Adam optimisation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            # Track the accuracy
            total = labels.size(0)
            _, predicted = torch.max(outputs.data, 1)
            correct = (predicted == labels).sum().item()
            accuracy = 100 * correct / total
            
            if (i+1) % 100 == 0:
                print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {}%' 
                    .format(epoch+1, num_epochs, i+1, total_step, loss.item(), accuracy))
                
#train(model, dataloader, num_epochs=5)
