In [9]:
!pip install yacs ultralytics



In [10]:
import os
import time
import pandas as pd
import cv2
import numpy as np
import os
import logging
from yacs.config import CfgNode as CN
import math

import torch
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision
import torch.nn as nn

# Configs

In [11]:
# Global experiment configuration, built as a tree of yacs CfgNode objects.
# Values tagged "# Updated" were marked that way by the notebook author.
cfg = CN()

cfg.OUTPUT_DIR = 'output'
cfg.LOG_DIR = 'log'
cfg.DATA_DIR = ''
cfg.GPUS = (0,)  # Updated
cfg.WORKERS = 24
cfg.PRINT_FREQ = 100
cfg.AUTO_RESUME = True  # Updated
cfg.PIN_MEMORY = True
cfg.RANK = 0

# Cudnn related params
# (copied onto torch.backends.cudnn right after this config is built)
cfg.CUDNN = CN()
cfg.CUDNN.BENCHMARK = True  # Updated
cfg.CUDNN.DETERMINISTIC = False  # Updated
cfg.CUDNN.ENABLED = True  # Updated

# common params for NETWORK
cfg.MODEL = CN()
cfg.MODEL.NAME = 'pose_hrnet'
cfg.MODEL.INIT_WEIGHTS = True
# NOTE(review): the file name below contains U+00D7 (multiplication sign),
# not an ASCII 'x' - confirm it matches the checkpoint file on disk.
cfg.MODEL.PRETRAINED = '/kaggle/input/darkpose/w32_256×256.pth'  # Updated
cfg.MODEL.NUM_JOINTS = 16  # number of output heatmap channels of the network
cfg.MODEL.TAG_PER_JOINT = True
cfg.MODEL.TARGET_TYPE = 'gaussian'
cfg.MODEL.IMAGE_SIZE = [256, 256]  # network input size used when warping the crop
cfg.MODEL.HEATMAP_SIZE = [64, 64]
cfg.MODEL.SIGMA = 2

# Architecture details read by PoseHighResolutionNet.
# NOTE(review): NUM_FEATURES / NUM_STACKS / NUM_BLOCKS / NUM_CLASSES are not
# read by the model code visible in this notebook section - confirm whether
# they are still needed.
cfg.MODEL.EXTRA = CN()
cfg.MODEL.EXTRA.NUM_FEATURES = 256
cfg.MODEL.EXTRA.NUM_STACKS = 8
cfg.MODEL.EXTRA.NUM_BLOCKS = 1
cfg.MODEL.EXTRA.NUM_CLASSES = 16
# Top-level module names whose weights init_weights() copies from the
# pretrained checkpoint (a leading '*' entry means "copy everything").
cfg.MODEL.EXTRA.PRETRAINED_LAYERS = [
  'conv1',
  'bn1',
  'conv2',
  'bn2',
  'layer1',
  'transition1',
  'stage2',
  'transition2',
  'stage3',
  'transition3',
  'stage4'
]
cfg.MODEL.EXTRA.FINAL_CONV_KERNEL = 1  # kernel size of the final heatmap conv

# Each STAGE node describes one multi-branch stage: how many
# HighResolutionModules it stacks, how many parallel branches each has, and
# the per-branch block count / channel width / block type.
cfg.MODEL.EXTRA.STAGE2 = CN()
cfg.MODEL.EXTRA.STAGE2.NUM_MODULES = 1
cfg.MODEL.EXTRA.STAGE2.NUM_BRANCHES = 2
cfg.MODEL.EXTRA.STAGE2.NUM_BLOCKS = [4, 4]
cfg.MODEL.EXTRA.STAGE2.NUM_CHANNELS = [32, 64]
cfg.MODEL.EXTRA.STAGE2.BLOCK = 'BASIC'
cfg.MODEL.EXTRA.STAGE2.FUSE_METHOD = 'SUM'

cfg.MODEL.EXTRA.STAGE3 = CN()
cfg.MODEL.EXTRA.STAGE3.NUM_MODULES = 4
cfg.MODEL.EXTRA.STAGE3.NUM_BRANCHES = 3
cfg.MODEL.EXTRA.STAGE3.NUM_BLOCKS = [4, 4, 4]
cfg.MODEL.EXTRA.STAGE3.NUM_CHANNELS = [32, 64, 128]
cfg.MODEL.EXTRA.STAGE3.BLOCK = 'BASIC'
cfg.MODEL.EXTRA.STAGE3.FUSE_METHOD = 'SUM'

cfg.MODEL.EXTRA.STAGE4 = CN()
cfg.MODEL.EXTRA.STAGE4.NUM_MODULES = 3
cfg.MODEL.EXTRA.STAGE4.NUM_BRANCHES = 4
cfg.MODEL.EXTRA.STAGE4.NUM_BLOCKS = [4, 4, 4, 4]
cfg.MODEL.EXTRA.STAGE4.NUM_CHANNELS = [32, 64, 128, 256]
cfg.MODEL.EXTRA.STAGE4.BLOCK = 'BASIC'
cfg.MODEL.EXTRA.STAGE4.FUSE_METHOD = 'SUM'

# Loss options (no loss code is visible in this notebook section).
cfg.LOSS = CN()
cfg.LOSS.USE_TARGET_WEIGHT = True
cfg.LOSS.USE_DIFFERENT_JOINTS_WEIGHT = False
# DATASET related params
cfg.DATASET = CN()
cfg.DATASET.ROOT = '/kaggle/input/mpii-2014'  # Updated
cfg.DATASET.DATASET = 'mpii'
cfg.DATASET.TRAIN_SET = 'train'
cfg.DATASET.TEST_SET = 'valid'
cfg.DATASET.DATA_FORMAT = 'jpg'
cfg.DATASET.HYBRID_JOINTS_TYPE = ''
cfg.DATASET.SELECT_DATA = False

# training data augmentation
cfg.DATASET.COLOR_RGB = True  # Updated
cfg.DATASET.FLIP = True  # Updated
cfg.DATASET.NUM_JOINTS_HALF_BODY = 8  # Updated
cfg.DATASET.PROB_HALF_BODY = -1.0  # Updated
cfg.DATASET.ROT_FACTOR = 30  # Updated
cfg.DATASET.SCALE_FACTOR = 0.25  # Updated

# train
cfg.TRAIN = CN()

cfg.TRAIN.LR_FACTOR = 0.1
cfg.TRAIN.LR_STEP = [170, 200]
cfg.TRAIN.LR = 0.001

cfg.TRAIN.OPTIMIZER = 'adam'
cfg.TRAIN.MOMENTUM = 0.9
cfg.TRAIN.WD = 0.0001
cfg.TRAIN.NESTEROV = False
cfg.TRAIN.GAMMA1 = 0.99
cfg.TRAIN.GAMMA2 = 0.0

cfg.TRAIN.BEGIN_EPOCH = 0
cfg.TRAIN.END_EPOCH = 10  # Updated

cfg.TRAIN.RESUME = False
cfg.TRAIN.CHECKPOINT = ''

cfg.TRAIN.BATCH_SIZE_PER_GPU = 32
cfg.TRAIN.SHUFFLE = True

# testing
cfg.TEST = CN()
# Checkpoint that load_hrnet() loads into the pose network.
# NOTE(review): same U+00D7 character in the file name as MODEL.PRETRAINED.
cfg.TEST.MODEL_FILE = '/kaggle/input/darkpose/w32_256×256.pth'  # Updated
cfg.TEST.BATCH_SIZE_PER_GPU = 32
cfg.TEST.FLIP_TEST = True
cfg.TEST.POST_PROCESS = True
cfg.TEST.BLUR_KERNEL = 11  # Updated; Gaussian kernel size used by get_final_preds()
cfg.TEST.SHIFT_HEATMAP = True
cfg.TEST.USE_GT_BBOX = False

# debug
cfg.DEBUG = CN()
cfg.DEBUG.DEBUG = False  # Updated
cfg.DEBUG.SAVE_BATCH_IMAGES_GT = False  # Updated
cfg.DEBUG.SAVE_BATCH_IMAGES_PRED = False  # Updated
cfg.DEBUG.SAVE_HEATMAPS_GT = False  # Updated
cfg.DEBUG.SAVE_HEATMAPS_PRED = False

# model name
# Human-readable tag for this model/training setup.
model_name = "w32_256x256_adam_lr1e-3"

# Compute device: CUDA when available, otherwise CPU.
CTX = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Notebook-level debug flag; independent of cfg.DEBUG.DEBUG above.
DEBUG = True
# Apply the cuDNN switches from the config to the PyTorch backend.
cudnn.benchmark = cfg.CUDNN.BENCHMARK
torch.backends.cudnn.deterministic = cfg.CUDNN.DETERMINISTIC
torch.backends.cudnn.enabled = cfg.CUDNN.ENABLED

# Joint index -> human-readable joint name for the 16 keypoints used here.
MPII_KEYPOINT_INDEXES = {
  0: "right ankle",
  1: "right knee",
  2: "right hip", 
  3: "left hip", 
  4: "left knee", 
  5: "left ankle",
  6: "pelvis", 
  7: "thorax", 
  8: "upper neck", 
  9: "head top", 
  10: "right wrist",
  11: "right elbow", 
  12: "right shoulder", 
  13: "left shoulder", 
  14: "left elbow",
  15: "left wrist"
}
# Number of keypoints; the skeleton helpers assert arrays of shape (NUM_KPTS, 2).
NUM_KPTS = len(MPII_KEYPOINT_INDEXES)
# Body part name -> [joint_a, joint_b] index pair (indices as in
# MPII_KEYPOINT_INDEXES). Helpers build the part vector as joint_a - joint_b.
# NOTE(review): per MPII_KEYPOINT_INDEXES, joints 0/1/2 are the *right*
# ankle/knee/hip, yet they are labelled "left_*" here (and 3/4/5 are labelled
# "right_*"), while the arm parts follow the index names. Confirm whether the
# leg labels are intentionally mirrored before relying on the side names.
SKELETON = {
  "left_lower_leg": [0, 1], 
  "left_thigh": [2, 1], 
  "left_hip": [2, 6], 
  "right_lower_leg": [5, 4],
  "right_thigh": [3, 4],
  "right_hip": [3, 6],
  "torso": [6, 7], 
  "neck": [7, 8],             #actually it's thorax - upper neck 
  "head": [8, 9],
  "right_forearm": [10, 11],
  "right_upper_arm": [11, 12], 
  "right_shoulder": [12, 7],
  "left_forearm": [15, 14],
  "left_upper_arm": [14, 13], 
  "left_shoulder": [13, 7]
}
# Parts that are compared / drawn for the squat exercise.
SQUAT_PART = ["left_lower_leg", "left_thigh", "right_lower_leg",
              "right_thigh", "torso"]
# Pairs of parts whose enclosed angle (the knee angle on each side) is
# averaged by find_pose_id() to determine the squat stage.
SQUAT_KEYPART = [
  ["left_lower_leg", "left_thigh"],
  ["right_lower_leg", "right_thigh"]
]
# Angle intervals (degrees) for the squat stages; find_pose_id() returns the
# index of the first interval containing the averaged key-part angle.
# Interval bounds are order-insensitive because is_in_range() uses min/max.
# NOTE(review): [114, 0] and [0, 114] describe the same interval, so with
# first-match lookup index 5 can never be returned; neighbouring intervals
# also share their boundary values. Confirm this is intended.
STAGE_ANGLE = [[180, 170], [170, 155],
               [155, 137], [137, 114],
               [114, 0], [0, 114],
               [114, 148], [148, 168],
               [168, 180]]
# Drawing colours passed to the cv2 drawing calls in draw_pose().
# Presumably in OpenCV's B, G, R channel order - verify against the image source.
COLOR = {
  "red": [0, 0, 255],
  "blue": [255, 0, 0],
  "green": [0, 255, 0]
}

# Function definition

In [12]:
# Momentum passed to the BatchNorm2d layers that set it explicitly below.
BN_MOMENTUM = 0.1
# Module-level logger used by the model-construction and weight-loading code.
logger = logging.getLogger(__name__)


def conv3x3(in_planes, out_planes, stride=1):
    """Create a bias-free 3x3 ``nn.Conv2d`` with a padding of 1.

    Args:
        in_planes: number of input channels.
        out_planes: number of output channels.
        stride: convolution stride (default 1).

    Returns:
        The configured ``nn.Conv2d`` layer.
    """
    conv_options = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_options)


class BasicBlock(nn.Module):
    """Residual block made of two 3x3 conv + batch-norm layers.

    ``expansion`` is the ratio between the block's output channels and
    ``planes`` (1 for this block).
    """

    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        """
        Args:
            inplanes: number of input channels.
            planes: number of channels produced by both convolutions.
            stride: stride of the first convolution.
            downsample: optional module applied to the input so that the
                shortcut can be added to the main path; ``None`` keeps the
                input unchanged.
        """
        super().__init__()
        # Attribute names and creation order are kept so checkpoints still match.
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Shortcut branch: identity unless a downsample module was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)


class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 conv, 3x3 conv, 1x1 conv.

    The last convolution outputs ``planes * expansion`` channels.
    """

    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        """
        Args:
            inplanes: number of input channels.
            planes: channel width of the first two convolutions.
            stride: stride of the middle 3x3 convolution.
            downsample: optional module applied to the input so that the
                shortcut can be added to the main path; ``None`` keeps the
                input unchanged.
        """
        super().__init__()
        out_planes = planes * self.expansion
        # Attribute names and creation order are kept so checkpoints still match.
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes, momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        # Shortcut branch: identity unless a downsample module was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)


class HighResolutionModule(nn.Module):
    """Multi-branch module: runs one stack of blocks per branch, then fuses.

    ``forward`` takes a list with one tensor per branch and returns a list of
    fused tensors (one per branch, or only the first branch's output when
    ``multi_scale_output`` is False).
    """

    def __init__(self, num_branches, blocks, num_blocks, num_inchannels,
                 num_channels, fuse_method, multi_scale_output=True):
        """
        Args:
            num_branches: number of parallel branches.
            blocks: block class used in every branch (it must expose an
                ``expansion`` attribute and accept
                ``(inplanes, planes, stride, downsample)``).
            num_blocks: per-branch number of blocks.
            num_inchannels: per-branch input channel counts. The list is
                stored by reference and updated in place while the branches
                are built, so the caller's list changes as well.
            num_channels: per-branch block channel width (before expansion).
            fuse_method: stored on the instance; the fusion implemented in
                ``forward`` below is always a sum and does not read it.
            multi_scale_output: when False only the fuse layers for the
                first branch are created.

        Raises:
            ValueError: if the per-branch lists do not have ``num_branches``
                entries.
        """
        super(HighResolutionModule, self).__init__()
        self._check_branches(
            num_branches, blocks, num_blocks, num_inchannels, num_channels)

        self.num_inchannels = num_inchannels
        self.fuse_method = fuse_method
        self.num_branches = num_branches

        self.multi_scale_output = multi_scale_output

        # Branches must be built before the fuse layers: building a branch
        # rewrites self.num_inchannels, which the fuse layers then read.
        self.branches = self._make_branches(
            num_branches, blocks, num_blocks, num_channels)
        self.fuse_layers = self._make_fuse_layers()
        self.relu = nn.ReLU(True)

    def _check_branches(self, num_branches, blocks, num_blocks,
                        num_inchannels, num_channels):
        """Validate that every per-branch list has ``num_branches`` entries.

        Logs the problem and raises ``ValueError`` on the first mismatch.
        """
        if num_branches != len(num_blocks):
            error_msg = 'NUM_BRANCHES({}) <> NUM_BLOCKS({})'.format(
                num_branches, len(num_blocks))
            logger.error(error_msg)
            raise ValueError(error_msg)

        if num_branches != len(num_channels):
            error_msg = 'NUM_BRANCHES({}) <> NUM_CHANNELS({})'.format(
                num_branches, len(num_channels))
            logger.error(error_msg)
            raise ValueError(error_msg)

        if num_branches != len(num_inchannels):
            error_msg = 'NUM_BRANCHES({}) <> NUM_INCHANNELS({})'.format(
                num_branches, len(num_inchannels))
            logger.error(error_msg)
            raise ValueError(error_msg)

    def _make_one_branch(self, branch_index, block, num_blocks, num_channels,
                         stride=1):
        """Build the sequential stack of blocks for one branch.

        Side effect: sets ``self.num_inchannels[branch_index]`` to the
        branch's output channel count.
        """
        downsample = None
        # A projection shortcut is needed when the first block changes the
        # spatial size or the channel count.
        if stride != 1 or \
           self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(
                    self.num_inchannels[branch_index],
                    num_channels[branch_index] * block.expansion,
                    kernel_size=1, stride=stride, bias=False
                ),
                nn.BatchNorm2d(
                    num_channels[branch_index] * block.expansion,
                    momentum=BN_MOMENTUM
                ),
            )

        layers = []
        # Only the first block receives the stride and the downsample module.
        layers.append(
            block(
                self.num_inchannels[branch_index],
                num_channels[branch_index],
                stride,
                downsample
            )
        )
        self.num_inchannels[branch_index] = \
            num_channels[branch_index] * block.expansion
        for i in range(1, num_blocks[branch_index]):
            layers.append(
                block(
                    self.num_inchannels[branch_index],
                    num_channels[branch_index]
                )
            )

        return nn.Sequential(*layers)

    def _make_branches(self, num_branches, block, num_blocks, num_channels):
        """Build all branches and return them as an ``nn.ModuleList``."""
        branches = []

        for i in range(num_branches):
            branches.append(
                self._make_one_branch(i, block, num_blocks, num_channels)
            )

        return nn.ModuleList(branches)

    def _make_fuse_layers(self):
        """Build the layers that map branch ``j`` output to branch ``i``.

        Returns ``None`` for a single branch. Otherwise entry ``[i][j]`` is:
        * ``j > i``: 1x1 conv + batch norm + nearest upsampling by
          ``2**(j-i)``;
        * ``j == i``: ``None`` (the tensor is used as is);
        * ``j < i``: a chain of ``i-j`` stride-2 3x3 convs; only the last one
          changes the channel count and it has no ReLU.
        """
        if self.num_branches == 1:
            return None

        num_branches = self.num_branches
        num_inchannels = self.num_inchannels
        fuse_layers = []
        # Without multi-scale output only the first branch's fusion is built.
        for i in range(num_branches if self.multi_scale_output else 1):
            fuse_layer = []
            for j in range(num_branches):
                if j > i:
                    fuse_layer.append(
                        nn.Sequential(
                            nn.Conv2d(
                                num_inchannels[j],
                                num_inchannels[i],
                                1, 1, 0, bias=False
                            ),
                            nn.BatchNorm2d(num_inchannels[i]),
                            nn.Upsample(scale_factor=2**(j-i), mode='nearest')
                        )
                    )
                elif j == i:
                    fuse_layer.append(None)
                else:
                    conv3x3s = []
                    for k in range(i-j):
                        if k == i - j - 1:
                            # Last step of the chain: switch to branch i's
                            # channel count, no activation.
                            num_outchannels_conv3x3 = num_inchannels[i]
                            conv3x3s.append(
                                nn.Sequential(
                                    nn.Conv2d(
                                        num_inchannels[j],
                                        num_outchannels_conv3x3,
                                        3, 2, 1, bias=False
                                    ),
                                    nn.BatchNorm2d(num_outchannels_conv3x3)
                                )
                            )
                        else:
                            # Intermediate step: keep branch j's channel count.
                            num_outchannels_conv3x3 = num_inchannels[j]
                            conv3x3s.append(
                                nn.Sequential(
                                    nn.Conv2d(
                                        num_inchannels[j],
                                        num_outchannels_conv3x3,
                                        3, 2, 1, bias=False
                                    ),
                                    nn.BatchNorm2d(num_outchannels_conv3x3),
                                    nn.ReLU(True)
                                )
                            )
                    fuse_layer.append(nn.Sequential(*conv3x3s))
            fuse_layers.append(nn.ModuleList(fuse_layer))

        return nn.ModuleList(fuse_layers)

    def get_num_inchannels(self):
        """Return the per-branch channel counts after the branches were built."""
        return self.num_inchannels

    def forward(self, x):
        """Run every branch on its input and fuse the results.

        Args:
            x: list of tensors, one per branch. The list is modified in
                place: each entry is replaced by its branch output.

        Returns:
            A list of fused tensors, one per entry of ``self.fuse_layers``
            (a one-element list when there is a single branch).
        """
        if self.num_branches == 1:
            return [self.branches[0](x[0])]

        for i in range(self.num_branches):
            x[i] = self.branches[i](x[i])

        x_fuse = []

        for i in range(len(self.fuse_layers)):
            # Start from branch 0's contribution, then add the other branches.
            y = x[0] if i == 0 else self.fuse_layers[i][0](x[0])
            for j in range(1, self.num_branches):
                if i == j:
                    y = y + x[j]
                else:
                    y = y + self.fuse_layers[i][j](x[j])
            x_fuse.append(self.relu(y))

        return x_fuse


# Maps the BLOCK string of a stage config to the residual block class to use.
blocks_dict = {
    'BASIC': BasicBlock,
    'BOTTLENECK': Bottleneck
}


class PoseHighResolutionNet(nn.Module):
    """High-resolution pose network producing one heatmap per joint.

    Layout: a two-convolution stem (each stride 2), a Bottleneck ``layer1``,
    then three multi-branch stages (``stage2`` .. ``stage4``) connected by
    transition layers, and a final convolution on the first branch that emits
    ``cfg.MODEL.NUM_JOINTS`` channels.
    """

    def __init__(self, cfg, **kwargs):
        """
        Args:
            cfg: config node providing ``MODEL.NUM_JOINTS`` and
                ``MODEL.EXTRA`` (``STAGE2``/``STAGE3``/``STAGE4``,
                ``FINAL_CONV_KERNEL``, ``PRETRAINED_LAYERS``).
            **kwargs: accepted for interface compatibility; not used here.
        """
        super(PoseHighResolutionNet, self).__init__()
        # Running input-channel count used by _make_layer. Assigned after
        # nn.Module.__init__ so the module is fully initialised first.
        self.inplanes = 64
        extra = cfg.MODEL.EXTRA

        # stem net
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=2, padding=1,
                               bias=False)
        self.bn1 = nn.BatchNorm2d(64, momentum=BN_MOMENTUM)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1,
                               bias=False)
        self.bn2 = nn.BatchNorm2d(64, momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(Bottleneck, 64, 4)

        # stage 2 (layer1 outputs 64 * Bottleneck.expansion = 256 channels)
        self.stage2_cfg = cfg['MODEL']['EXTRA']['STAGE2']
        num_channels = self.stage2_cfg['NUM_CHANNELS']
        block = blocks_dict[self.stage2_cfg['BLOCK']]
        num_channels = [
            num_channels[i] * block.expansion for i in range(len(num_channels))
        ]
        self.transition1 = self._make_transition_layer([256], num_channels)
        self.stage2, pre_stage_channels = self._make_stage(
            self.stage2_cfg, num_channels)

        # stage 3
        self.stage3_cfg = cfg['MODEL']['EXTRA']['STAGE3']
        num_channels = self.stage3_cfg['NUM_CHANNELS']
        block = blocks_dict[self.stage3_cfg['BLOCK']]
        num_channels = [
            num_channels[i] * block.expansion for i in range(len(num_channels))
        ]
        self.transition2 = self._make_transition_layer(
            pre_stage_channels, num_channels)
        self.stage3, pre_stage_channels = self._make_stage(
            self.stage3_cfg, num_channels)

        # stage 4: its last module only outputs the first branch
        self.stage4_cfg = cfg['MODEL']['EXTRA']['STAGE4']
        num_channels = self.stage4_cfg['NUM_CHANNELS']
        block = blocks_dict[self.stage4_cfg['BLOCK']]
        num_channels = [
            num_channels[i] * block.expansion for i in range(len(num_channels))
        ]
        self.transition3 = self._make_transition_layer(
            pre_stage_channels, num_channels)
        self.stage4, pre_stage_channels = self._make_stage(
            self.stage4_cfg, num_channels, multi_scale_output=False)

        # Heatmap head: one output channel per joint.
        self.final_layer = nn.Conv2d(
            in_channels=pre_stage_channels[0],
            out_channels=cfg.MODEL.NUM_JOINTS,
            kernel_size=extra.FINAL_CONV_KERNEL,
            stride=1,
            padding=1 if extra.FINAL_CONV_KERNEL == 3 else 0
        )

        self.pretrained_layers = cfg['MODEL']['EXTRA']['PRETRAINED_LAYERS']

    def _make_transition_layer(
            self, num_channels_pre_layer, num_channels_cur_layer):
        """Build the adapters between the previous and the current stage.

        For a branch that already exists, the entry is a 3x3 conv + BN + ReLU
        when the channel count changes and ``None`` otherwise. For a new
        branch, the entry is a chain of stride-2 3x3 conv + BN + ReLU steps
        fed from the previous stage's last branch.

        Returns:
            ``nn.ModuleList`` with one entry per branch of the current stage.
        """
        num_branches_cur = len(num_channels_cur_layer)
        num_branches_pre = len(num_channels_pre_layer)

        transition_layers = []
        for i in range(num_branches_cur):
            if i < num_branches_pre:
                if num_channels_cur_layer[i] != num_channels_pre_layer[i]:
                    transition_layers.append(
                        nn.Sequential(
                            nn.Conv2d(
                                num_channels_pre_layer[i],
                                num_channels_cur_layer[i],
                                3, 1, 1, bias=False
                            ),
                            nn.BatchNorm2d(num_channels_cur_layer[i]),
                            nn.ReLU(inplace=True)
                        )
                    )
                else:
                    transition_layers.append(None)
            else:
                conv3x3s = []
                for j in range(i+1-num_branches_pre):
                    inchannels = num_channels_pre_layer[-1]
                    # Only the last step of the chain changes the channels.
                    outchannels = num_channels_cur_layer[i] \
                        if j == i-num_branches_pre else inchannels
                    conv3x3s.append(
                        nn.Sequential(
                            nn.Conv2d(
                                inchannels, outchannels, 3, 2, 1, bias=False
                            ),
                            nn.BatchNorm2d(outchannels),
                            nn.ReLU(inplace=True)
                        )
                    )
                transition_layers.append(nn.Sequential(*conv3x3s))

        return nn.ModuleList(transition_layers)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Stack ``blocks`` instances of ``block`` into an ``nn.Sequential``.

        Side effect: updates ``self.inplanes`` to the stack's output channels.
        """
        downsample = None
        # Projection shortcut when the first block changes size or channels.
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(
                    self.inplanes, planes * block.expansion,
                    kernel_size=1, stride=stride, bias=False
                ),
                nn.BatchNorm2d(planes * block.expansion, momentum=BN_MOMENTUM),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def _make_stage(self, layer_config, num_inchannels,
                    multi_scale_output=True):
        """Build one stage as a sequence of ``HighResolutionModule`` objects.

        Args:
            layer_config: stage config (``NUM_MODULES``, ``NUM_BRANCHES``,
                ``NUM_BLOCKS``, ``NUM_CHANNELS``, ``BLOCK``, ``FUSE_METHOD``).
            num_inchannels: per-branch input channel counts.
            multi_scale_output: when False, the last module of the stage
                only outputs the first branch.

        Returns:
            ``(nn.Sequential of modules, per-branch output channel counts)``.
        """
        num_modules = layer_config['NUM_MODULES']
        num_branches = layer_config['NUM_BRANCHES']
        num_blocks = layer_config['NUM_BLOCKS']
        num_channels = layer_config['NUM_CHANNELS']
        block = blocks_dict[layer_config['BLOCK']]
        fuse_method = layer_config['FUSE_METHOD']

        modules = []
        for i in range(num_modules):
            # multi_scale_output is only used last module
            if not multi_scale_output and i == num_modules - 1:
                reset_multi_scale_output = False
            else:
                reset_multi_scale_output = True

            modules.append(
                HighResolutionModule(
                    num_branches,
                    block,
                    num_blocks,
                    num_inchannels,
                    num_channels,
                    fuse_method,
                    reset_multi_scale_output
                )
            )
            num_inchannels = modules[-1].get_num_inchannels()

        return nn.Sequential(*modules), num_inchannels

    def forward(self, x):
        """Compute the joint heatmaps for an image batch.

        Args:
            x: input tensor with 3 channels (the stem's first convolution
                expects 3 input channels).

        Returns:
            Tensor with ``cfg.MODEL.NUM_JOINTS`` channels, computed from the
            first branch of stage 4.
        """
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.layer1(x)

        x_list = []
        for i in range(self.stage2_cfg['NUM_BRANCHES']):
            if self.transition1[i] is not None:
                x_list.append(self.transition1[i](x))
            else:
                x_list.append(x)
        y_list = self.stage2(x_list)

        x_list = []
        for i in range(self.stage3_cfg['NUM_BRANCHES']):
            if self.transition2[i] is not None:
                # Transitions are fed from the previous stage's last branch.
                x_list.append(self.transition2[i](y_list[-1]))
            else:
                x_list.append(y_list[i])
        y_list = self.stage3(x_list)

        x_list = []
        for i in range(self.stage4_cfg['NUM_BRANCHES']):
            if self.transition3[i] is not None:
                x_list.append(self.transition3[i](y_list[-1]))
            else:
                x_list.append(y_list[i])
        y_list = self.stage4(x_list)

        x = self.final_layer(y_list[0])

        return x

    def init_weights(self, pretrained=''):
        """Randomly initialise the weights, then optionally load a checkpoint.

        Conv / transposed-conv weights are drawn from N(0, 0.001^2) with zero
        bias; batch-norm layers get weight 1 and bias 0. If ``pretrained``
        is an existing file, entries whose top-level module name is listed in
        ``self.pretrained_layers`` (or every entry when the first listed name
        is ``'*'``) are loaded non-strictly.

        Args:
            pretrained: path to a checkpoint holding a state dict, or ``''``
                to skip loading.

        Raises:
            ValueError: if ``pretrained`` is non-empty but not a file.
        """
        logger.info('=> init weights from normal distribution')
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
                # nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                nn.init.normal_(m.weight, std=0.001)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        if os.path.isfile(pretrained):
            # Load onto the CPU so checkpoints saved on a GPU also work on a
            # CPU-only runtime; load_state_dict copies the values into the
            # parameters wherever they live.
            pretrained_state_dict = torch.load(pretrained, map_location='cpu')
            logger.info('=> loading pretrained model {}'.format(pretrained))

            # Compare by value ('==') rather than identity ('is'), and
            # tolerate an empty layer list.
            load_all = bool(self.pretrained_layers) \
                and self.pretrained_layers[0] == '*'

            need_init_state_dict = {}
            for name, m in pretrained_state_dict.items():
                if load_all or name.split('.')[0] in self.pretrained_layers:
                    need_init_state_dict[name] = m
            self.load_state_dict(need_init_state_dict, strict=False)
        elif pretrained:
            logger.error('=> please download pre-trained models first!')
            raise ValueError('{} is not exist!'.format(pretrained))


def get_pose_net(cfg, is_train, **kwargs):
    """Construct the pose network described by ``cfg``.

    Args:
        cfg: configuration node handed to ``PoseHighResolutionNet``.
        is_train: when truthy and ``cfg.MODEL.INIT_WEIGHTS`` is set, the
            weights are initialised via ``init_weights(cfg.MODEL.PRETRAINED)``.
        **kwargs: forwarded to the ``PoseHighResolutionNet`` constructor.

    Returns:
        The constructed ``PoseHighResolutionNet``.
    """
    net = PoseHighResolutionNet(cfg, **kwargs)

    wants_init = is_train and cfg.MODEL.INIT_WEIGHTS
    if wants_init:
        net.init_weights(cfg.MODEL.PRETRAINED)

    return net

  or self.pretrained_layers[0] is '*':


In [13]:
def flip_back(output_flipped, matched_parts):
    '''
    Undo a horizontal flip on a batch of heatmaps.

    output_flipped: numpy.ndarray(batch_size, num_joints, height, width)
    matched_parts: iterable of joint-index pairs whose channels are swapped

    The width axis is reversed through a view, so the channel swaps also
    write into the array that was passed in.
    '''
    assert output_flipped.ndim == 4,\
        'output_flipped should be [batch_size, num_joints, height, width]'

    mirrored = output_flipped[:, :, :, ::-1]

    for pair in matched_parts:
        first, second = pair[0], pair[1]
        # Fancy indexing on the right-hand side yields a copy, so both
        # channels are exchanged without an explicit temporary.
        mirrored[:, [first, second], :, :] = mirrored[:, [second, first], :, :]

    return mirrored


def fliplr_joints(joints, joints_vis, width, matched_parts):
    """
    Mirror joint coordinates horizontally and swap paired joints.

    Both ``joints`` and ``joints_vis`` are modified in place. Returns
    ``(joints * joints_vis, joints_vis)``.
    """
    # Mirror the x coordinate inside an image of the given width
    joints[:, 0] = width - joints[:, 0] - 1

    # Exchange the rows of every matched pair, first in the coordinates and
    # then in the visibility array
    for pair in matched_parts:
        first, second = pair[0], pair[1]
        for table in (joints, joints_vis):
            table[[first, second], :] = table[[second, first], :]

    return joints*joints_vis, joints_vis


def transform_preds(coords, center, scale, output_size):
    """Map coordinates through the inverse affine transform.

    The transform comes from ``get_affine_transform(center, scale, 0,
    output_size, inv=1)``. Only the first two columns of each row are
    transformed; any other column of the result stays zero.
    """
    inverse_trans = get_affine_transform(center, scale, 0, output_size, inv=1)
    mapped = np.zeros(coords.shape)
    for row, point in enumerate(coords):
        mapped[row, 0:2] = affine_transform(point[0:2], inverse_trans)
    return mapped


def get_affine_transform(
        center, scale, rot, output_size,
        shift=np.array([0, 0], dtype=np.float32), inv=0
):
    """Build the 2x3 affine matrix between an image region and an output grid.

    The source region is centred on ``center`` with a width of
    ``scale[0] * 200`` and rotated by ``rot`` degrees; it is mapped onto a
    grid of size ``output_size``.

    Args:
        center: region centre ``(x, y)``.
        scale: a scalar (used for both axes) or a two-element
            list / tuple / ``np.ndarray``.
        rot: rotation in degrees.
        output_size: ``(width, height)`` of the destination grid.
        shift: offset of the centre, expressed as a fraction of the scaled
            size. The default array is shared between calls and is only read.
        inv: when truthy, return the destination -> source transform.

    Returns:
        The 2x3 matrix produced by ``cv2.getAffineTransform``.
    """
    if isinstance(scale, (list, tuple)):
        # Plain sequences cannot be multiplied by a float below.
        scale = np.array(scale)
    elif not isinstance(scale, np.ndarray):
        scale = np.array([scale, scale])

    scale_tmp = scale * 200.0
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180

    # Second reference point: half a width from the centre (rotated on the
    # source side, unrotated on the destination side).
    src_dir = get_dir([0, (src_w-1) * -0.5], rot_rad)
    dst_dir = np.array([0, (dst_w-1) * -0.5], np.float32)
    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    dst[0, :] = [(dst_w-1) * 0.5, (dst_h-1) * 0.5]
    dst[1, :] = np.array([(dst_w-1) * 0.5, (dst_h-1) * 0.5]) + dst_dir

    # Third point, perpendicular to the first two, fixes the transform.
    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    return trans


def affine_transform(pt, t):
    """Apply the affine matrix ``t`` to the 2D point ``pt``.

    The point is extended to homogeneous form ``[x, y, 1]`` before the
    product; the first two entries of the result are returned.
    """
    homogeneous = np.array([pt[0], pt[1], 1.])
    return np.dot(t, homogeneous)[:2]


def get_3rd_point(a, b):
    """Return ``b`` plus the vector ``a - b`` rotated by 90 degrees.

    With ``a - b = (dx, dy)`` the added offset is ``(-dy, dx)``.
    """
    delta = a - b
    perpendicular = np.array([-delta[1], delta[0]], dtype=np.float32)
    return b + perpendicular


def get_dir(src_point, rot_rad):
    """Rotate the 2D point ``src_point`` by ``rot_rad`` radians.

    Returns the rotated point as a two-element list ``[x', y']``.
    """
    sn, cs = np.sin(rot_rad), np.cos(rot_rad)
    x, y = src_point[0], src_point[1]
    return [x * cs - y * sn, x * sn + y * cs]


def crop(img, center, scale, output_size, rot=0):
    """Warp the region described by ``center`` / ``scale`` / ``rot`` out of ``img``.

    The affine matrix comes from ``get_affine_transform``; the result is
    resampled with bilinear interpolation to ``output_size`` (width, height).
    """
    out_w, out_h = int(output_size[0]), int(output_size[1])
    warp_matrix = get_affine_transform(center, scale, rot, output_size)
    return cv2.warpAffine(
        img, warp_matrix, (out_w, out_h), flags=cv2.INTER_LINEAR
    )
def get_max_preds(batch_heatmaps):
    '''
    Locate the maximum of every heatmap.

    batch_heatmaps: numpy.ndarray([batch_size, num_joints, height, width])

    Returns (preds, maxvals): preds has shape (batch_size, num_joints, 2)
    and holds the (x, y) position of each maximum as float32; maxvals has
    shape (batch_size, num_joints, 1). Positions whose maximum is not
    greater than 0 are set to (0, 0).
    '''
    assert isinstance(batch_heatmaps, np.ndarray), \
        'batch_heatmaps should be numpy.ndarray'
    assert batch_heatmaps.ndim == 4, 'batch_images should be 4-ndim'

    batch_size, num_joints = batch_heatmaps.shape[0], batch_heatmaps.shape[1]
    width = batch_heatmaps.shape[3]
    out_shape = (batch_size, num_joints, 1)

    flat = batch_heatmaps.reshape((batch_size, num_joints, -1))
    maxvals = np.amax(flat, 2).reshape(out_shape)
    flat_idx = np.argmax(flat, 2).reshape(out_shape).astype(np.float32)

    # Decompose the flat index into column (x) and row (y).
    preds = np.concatenate(
        [flat_idx % width, np.floor(flat_idx / width)], axis=2)

    # Zero out joints whose peak value is not positive.
    preds *= np.greater(maxvals, 0.0).astype(np.float32)
    return preds, maxvals


def taylor(hm, coord):
    """Refine a peak position with a second-order Taylor step on ``hm``.

    First and second derivatives are estimated with finite differences
    around the integer position ``(int(coord[0]), int(coord[1]))``. If that
    position is at least two pixels away from every border and the Hessian
    determinant is non-zero, the offset ``-H^-1 * g`` is added to ``coord``
    in place.

    Args:
        hm: 2D heatmap indexed as ``hm[y][x]``.
        coord: array ``[x, y]``; updated in place.

    Returns:
        ``coord`` (the same object that was passed in).
    """
    rows, cols = hm.shape[0], hm.shape[1]
    px, py = int(coord[0]), int(coord[1])

    interior = 1 < px < cols-2 and 1 < py < rows-2
    if not interior:
        return coord

    dx  = 0.5 * (hm[py][px+1] - hm[py][px-1])
    dy  = 0.5 * (hm[py+1][px] - hm[py-1][px])
    dxx = 0.25 * (hm[py][px+2] - 2 * hm[py][px] + hm[py][px-2])
    dxy = 0.25 * (hm[py+1][px+1] - hm[py-1][px+1] - hm[py+1][px-1] \
        + hm[py-1][px-1])
    dyy = 0.25 * (hm[py+2*1][px] - 2 * hm[py][px] + hm[py-2*1][px])

    # A singular Hessian gives no usable step.
    if dxx * dyy - dxy ** 2 == 0:
        return coord

    gradient = np.array([[dx], [dy]])
    hessian = np.array([[dxx, dxy], [dxy, dyy]])
    step = np.dot(-np.linalg.inv(hessian), gradient)
    coord += step.ravel()
    return coord


def gaussian_blur(hm, kernel):
    """Blur every heatmap in place and restore its original peak value.

    Each ``hm[i, j]`` is zero-padded by ``(kernel - 1) // 2`` on every side,
    blurred with ``cv2.GaussianBlur`` (sigma derived from the kernel size),
    cropped back, and rescaled so that its maximum equals the maximum
    before blurring.

    Args:
        hm: array of shape (batch_size, num_joints, height, width);
            modified in place.
        kernel: Gaussian kernel size passed to ``cv2.GaussianBlur``.

    Returns:
        ``hm`` (the same array that was passed in).
    """
    border = (kernel - 1) // 2
    batch_size = hm.shape[0]
    num_joints = hm.shape[1]
    height = hm.shape[2]
    width = hm.shape[3]
    # Explicit end indices: a "border:-border" slice is empty when
    # border == 0 (kernel == 1), which made the assignment below fail.
    row_span = slice(border, border + height)
    col_span = slice(border, border + width)
    for i in range(batch_size):
        for j in range(num_joints):
            origin_max = np.max(hm[i, j])
            padded = np.zeros((height + 2 * border, width + 2 * border))
            padded[row_span, col_span] = hm[i, j].copy()
            padded = cv2.GaussianBlur(padded, (kernel, kernel), 0)
            hm[i, j] = padded[row_span, col_span].copy()
            blurred_max = np.max(hm[i, j])
            # Skip the rescale for a map whose blurred maximum is 0; the
            # division would otherwise fill the map with NaN / inf.
            if blurred_max != 0:
                hm[i, j] *= origin_max / blurred_max
    return hm


def get_final_preds(config, hm, center, scale):
    """Turn heatmaps into sub-pixel joint coordinates in image space.

    Steps: take the per-heatmap maxima, blur the heatmaps with
    ``config.TEST.BLUR_KERNEL``, clamp them to at least 1e-10 and take the
    log, refine every maximum with ``taylor``, then map the coordinates back
    through ``transform_preds`` using each sample's ``center`` and ``scale``.

    Args:
        config: config node providing ``TEST.BLUR_KERNEL``.
        hm: heatmaps of shape (batch_size, num_joints, height, width);
            blurred in place by ``gaussian_blur``.
        center: per-sample centres, indexed by batch position.
        scale: per-sample scales, indexed by batch position.

    Returns:
        ``(preds, maxvals)`` - the transformed coordinates and the maxima
        returned by ``get_max_preds``.
    """
    coords, maxvals = get_max_preds(hm)
    heatmap_height, heatmap_width = hm.shape[2], hm.shape[3]

    # post-processing: blur, then move to log space for the Taylor step
    hm = np.log(np.maximum(gaussian_blur(hm, config.TEST.BLUR_KERNEL), 1e-10))
    for n, p in np.ndindex(coords.shape[0], coords.shape[1]):
        coords[n, p] = taylor(hm[n][p], coords[n][p])

    preds = coords.copy()

    # Transform back to the original image
    for sample in range(coords.shape[0]):
        preds[sample] = transform_preds(
            coords[sample], center[sample], scale[sample],
            [heatmap_width, heatmap_height]
        )

    return preds, maxvals

In [14]:
# function definition
def cosine_similarity(a, b):
  """Cosine of the angle between vectors ``a`` and ``b``, limited to [-1, 1].

  Values outside the interval are replaced by the integer bound (-1 or 1).
  There is no guard for a zero-length vector: the denominator is then 0.
  """
  norm_product = np.linalg.norm(a) * np.linalg.norm(b)
  cos = np.dot(a, b) / norm_product
  if cos > 1:
    return 1
  if cos < -1:
    return -1
  return cos


def angle_degree(a, b):
  """Angle between vectors ``a`` and ``b`` in degrees, limited to [0, 180].

  Built on ``cosine_similarity``; results above 180 return ``180.`` and
  results below 0 return ``0``.
  """
  radians = np.arccos(cosine_similarity(a, b))
  angle = radians / np.pi * 180
  if angle > 180:
    return 180.
  elif angle < 0:
    return 0
  return angle

def return_vector(part_id, skeleton):
  """Vector between the two joints of a body part.

  part_id: pair of joint indices ``[a, b]`` (e.g. a ``SKELETON`` value).
  skeleton: joint coordinates indexable by joint index.

  Returns ``skeleton[a] - skeleton[b]``.
  """
  first_joint, second_joint = part_id[0], part_id[1]
  return skeleton[first_joint] - skeleton[second_joint]

def is_in_range(num, range1, range2):
  """True when ``num`` lies between ``range1`` and ``range2``, inclusive.

  The two bounds may be given in either order.
  """
  lower, upper = min(range1, range2), max(range1, range2)
  return lower <= num <= upper

def find_pose_id(ske_pred, keyparts, stages):
  """Find the stage whose angle interval contains the mean key-part angle.

  ske_pred: array of shape (NUM_KPTS, 2) with the predicted joints.
  keyparts: pairs of ``SKELETON`` part names; the angle between the two
    parts of each pair is averaged over all pairs.
  stages: list of ``[bound_a, bound_b]`` angle intervals in degrees.

  Returns the index of the first matching stage, -1 as soon as one of the
  part vectors is all zeros, or None when no stage matches.
  """
  assert ske_pred.shape == (NUM_KPTS, 2)
  angle_sum = 0
  for first_part, second_part in keyparts:
    vec_a = return_vector(SKELETON[first_part], ske_pred)
    vec_b = return_vector(SKELETON[second_part], ske_pred)

    # A zero vector means both joints of the part coincide.
    if not np.any(vec_a) or not np.any(vec_b):
      return -1

    angle_sum += angle_degree(vec_a, vec_b)

  mean_angle = angle_sum / len(keyparts)
  matches = (
    stage_id for stage_id, stage in enumerate(stages)
    if is_in_range(mean_angle, stage[0], stage[1])
  )
  return next(matches, None)

def compare_skeleton(ske_true, ske_pred, excer_part, threshold=10):
  """Compare the direction of each listed body part in two skeletons.

  ske_true, ske_pred: arrays of shape (NUM_KPTS, 2).
  excer_part: ``SKELETON`` part names to compare.
  threshold: largest accepted angle (degrees) between the two part vectors.

  Returns one bool per part, in the order of ``excer_part``; True means the
  angle between the reference and predicted part is within ``threshold``.
  """
  assert ske_true.shape == (NUM_KPTS, 2)
  assert ske_pred.shape == (NUM_KPTS, 2)

  def part_matches(part):
    joint_a, joint_b = SKELETON[part]
    reference = ske_true[joint_a] - ske_true[joint_b]
    predicted = ske_pred[joint_a] - ske_pred[joint_b]
    return bool(angle_degree(reference, predicted) <= threshold)

  return [part_matches(part) for part in excer_part]

def draw_pose(keypoints, part_flag, excer_part, img, joint_thickness=6):
  """Draw the listed body parts onto ``img`` (modified in place).

  keypoints: array of shape (NUM_KPTS, 2) with joint positions.
  part_flag: one flag per entry of ``excer_part``; a truthy flag draws the
    limb with COLOR["blue"], a falsy one with COLOR["red"].
  excer_part: ``SKELETON`` part names to draw.
  joint_thickness: radius of the filled joint circles.
  """
  assert keypoints.shape == (NUM_KPTS, 2)
  for idx, part_name in enumerate(excer_part):
    joint_a, joint_b = SKELETON[part_name]
    limb_color = COLOR["red"] if not part_flag[idx] else COLOR["blue"]
    point_a = tuple(int(value) for value in keypoints[joint_a])
    point_b = tuple(int(value) for value in keypoints[joint_b])
    # Joints first, then the limb on top of them.
    for point in (point_a, point_b):
      cv2.circle(img, point, joint_thickness, COLOR["green"], -1)
    cv2.line(img, point_a, point_b, limb_color, 2)

def get_pose_estimation_prediction(pose_model, image, center, scale):
  """Run the pose network on the region described by `center` and `scale`.

  The image is warped to cfg.MODEL.IMAGE_SIZE with the affine transform from
  `get_affine_transform` (rotation 0), normalised, and pushed through
  `pose_model` in eval mode without gradients.

  Args:
    pose_model: torch module that produces the network output for a batch.
    image: image array handed to cv2.warpAffine.
    center: center of the person box.
    scale: scalar box scale; it is duplicated into a two-element array.

  Returns:
    The `preds` value returned by `get_final_preds` for this single-item
    batch.
  """
  scale_xy = np.array([scale, scale])
  input_size = (int(cfg.MODEL.IMAGE_SIZE[0]), int(cfg.MODEL.IMAGE_SIZE[1]))

  # Warp the person region into the network's input resolution.
  warp_matrix = get_affine_transform(center, scale_xy, 0, cfg.MODEL.IMAGE_SIZE)
  person_crop = cv2.warpAffine(image, warp_matrix, input_size,
                               flags=cv2.INTER_LINEAR)

  to_normalized_tensor = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                          std=[0.229, 0.224, 0.225]),
  ])
  # Add the batch dimension expected by the model.
  batch = to_normalized_tensor(person_crop).unsqueeze(0)

  pose_model.eval()
  with torch.no_grad():
    heatmaps = pose_model(batch)
    preds, _ = get_final_preds(
      cfg,
      heatmaps.clone().cpu().numpy(),
      np.asarray([center]),
      np.asarray([scale_xy]))

  return preds

def calculate_center_scale(box):
  """Convert an `(x1, y1, x2, y2)` box into a center point and a scale.

  Returns:
    `(center, scale)`, where `center` is a float32 array `[cx, cy]` holding
    the box midpoint and `scale` is the longer box side divided by 200.
  """
  left, top, right, bottom = box
  midpoint = [(right + left) / 2, (bottom + top) / 2]
  longest_side = max(right - left, bottom - top)
  return np.array(midpoint, dtype=np.float32), longest_side / 200

def get_person_box(model, img):
  """Detect people in `img` and describe the most confident detection.

  Args:
    model: callable detector; `model(img).xyxy[0]` must be a tensor whose
      rows start with `x1, y1, x2, y2` followed by a score in column 4
      (the confidence in YOLOv5's xyxy output).
    img: image handed to `model` unchanged.

  Returns:
    A `(center, scale)` pair. For the detection with the highest column-4
    score this is the result of `calculate_center_scale` on its box. When
    nothing is detected the pair is `(np.array([-1]), -1)`, so callers can
    always unpack two values and test `center[0] == -1`.
  """
  detections = model(img).xyxy[0].cpu().numpy()

  if len(detections) == 0:
    # Return a pair, not a lone array: callers unpack `center, scale`, and a
    # one-element array cannot be unpacked into two names.
    return np.array([-1]), -1

  best_detection = detections[np.argmax(detections[:, 4])]
  return calculate_center_scale(best_detection[:4])
    
def load_yolo(version):
  """Load a pretrained YOLOv5 detector from torch.hub, limited to class 0.

  Args:
    version: model name passed to the 'ultralytics/yolov5' hub entry point
      (for example "yolov5s").

  Returns:
    The loaded model with `classes` set to `[0]`, moved to the GPU when CUDA
    is available and left on the CPU otherwise.
  """
  yolo = torch.hub.load('ultralytics/yolov5', version,
                        pretrained= True, _verbose= False)
  # An unconditional .cuda() raises on CPU-only machines; only move the model
  # when a CUDA device actually exists.
  if torch.cuda.is_available():
    yolo.cuda()
  # Restrict detections to class id 0 (presumably "person" for the pretrained
  # label set, given how the result is used — confirm against the weights).
  yolo.classes = [0]
  return yolo

def load_hrnet(cfg):
  """Build the pose network, load its weights and prepare it for inference.

  Args:
    cfg: configuration node; `cfg.TEST.MODEL_FILE` is the checkpoint path
      and `cfg.GPUS` the device ids given to DataParallel.

  Returns:
    The model wrapped in `torch.nn.DataParallel`, moved to `CTX` and set to
    eval mode. If `cfg.TEST.MODEL_FILE` is empty a notice is printed and the
    model is returned without loading a checkpoint.
  """
  pose_model = get_pose_net(cfg, is_train=False)

  print(cfg.TEST.MODEL_FILE)
  if cfg.TEST.MODEL_FILE:
    print('=> loading model from {}'.format(cfg.TEST.MODEL_FILE))
    # Deserialize onto the CPU so a checkpoint saved from a GPU also loads on
    # a machine without CUDA; the model is moved to CTX further down.
    state_dict = torch.load(cfg.TEST.MODEL_FILE, map_location='cpu')
    # strict=False: keys that do not match the model are ignored.
    pose_model.load_state_dict(state_dict, strict=False)
  else:
    print('expected model defined in config at TEST.MODEL_FILE')

  pose_model = torch.nn.DataParallel(pose_model, device_ids=cfg.GPUS)
  pose_model.to(CTX)
  pose_model.eval()

  return pose_model

def _leading_int(filename):
  """Return the integer formed by the run of decimal digits starting `filename`."""
  end = 0
  while end < len(filename) and filename[end].isdecimal():
    end += 1
  if end == 0:
    raise ValueError(
      "reference image name must start with its pose index: {!r}".format(filename))
  return int(filename[:end])

def load_true_poses(path):
  """Load the reference pose images and their joint coordinates.

  Args:
    path: directory containing `joints.csv` and an `images` sub-directory
      whose file names start with the numeric index of the pose.

  Returns:
    `(imgs, skes)`: the images read with cv2.imread, ordered by the integer
    at the start of each file name, and the CSV contents reshaped to
    `(len(imgs), NUM_KPTS, 2)`.

  Raises:
    ValueError: if an image file name does not start with a digit, or the
      CSV cannot be reshaped to the expected size.
  """
  joints_path = os.path.join(path, "joints.csv")
  img_dir = os.path.join(path, "images")

  # Sort on the whole leading number, not just its first character, so that
  # e.g. "10.png" comes after "9.png" and stays aligned with the CSV rows.
  img_names = sorted(os.listdir(img_dir), key=_leading_int)
  imgs = [cv2.imread(os.path.join(img_dir, name)) for name in img_names]
  skes = pd.read_csv(joints_path).to_numpy().reshape(len(imgs), NUM_KPTS, 2)
  return imgs, skes

# Demo

In [15]:
def video_test(video_path, output_path, pose_path, angle_threshold=5):
  """Annotate a squat video and write it next to the matching reference pose.

  For each frame the most confident person box is located, the pose network
  predicts the joints, the squat stage is looked up with `find_pose_id`, and
  the limbs in SQUAT_PART are drawn on the frame, coloured by whether they
  match the reference skeleton of that stage. The annotated frame and the
  resized reference image are stacked horizontally and written out.

  Frames in which no person is found, or whose pose cannot be assigned to a
  stage, are skipped and do not appear in the output.

  Args:
    video_path: path of the video to read.
    output_path: path of the mp4 file to write (24 fps, twice the input width).
    pose_path: directory understood by `load_true_poses`.
    angle_threshold: maximum limb angle deviation passed to
      `compare_skeleton`.
  """
  box_model = load_yolo("yolov5s")
  pose_model = load_hrnet(cfg)
  true_pose_images, true_pose_skes = load_true_poses(pose_path)

  video_cap = cv2.VideoCapture(video_path)
  if not video_cap.isOpened():
    print("Cannot open this video.")
    return

  frame_width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
  frame_height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
  # NOTE(review): the output rate is fixed at 24 fps regardless of the source
  # frame rate.
  fps = 24.0
  video_out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'),
                        fps, (frame_width * 2, frame_height))
  count = 0
  # try/finally so the capture and the writer are released (and the output
  # file finalized) even if a frame raises part-way through.
  try:
    while True:
      ret, image_bgr = video_cap.read()
      if not ret:
        break

      image = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
      detection = get_person_box(box_model, image)

      # "No person" is signalled by a center whose first entry is -1; that
      # sentinel may arrive without an accompanying scale, so check the
      # length before unpacking.
      if len(detection) != 2 or detection[0][0] == -1:
        continue
      center, scale = detection

      image_pose = image.copy() if cfg.DATASET.COLOR_RGB else image_bgr.copy()
      pose_pred = get_pose_estimation_prediction(pose_model, image_pose, center, scale)[0]
      pose_id = find_pose_id(pose_pred, SQUAT_KEYPART, STAGE_ANGLE)
      # Skip frames whose pose could not be assigned to a stage (-1 or None).
      if pose_id is None or pose_id == -1:
        continue

      true_pose_image = true_pose_images[pose_id]
      true_skeleton = true_pose_skes[pose_id]
      part_flag = compare_skeleton(true_skeleton, pose_pred, SQUAT_PART, threshold= angle_threshold)
      draw_pose(pose_pred, part_flag, SQUAT_PART, image_bgr)

      # The reference image must have the frame's size before stacking.
      true_pose_resized = cv2.resize(true_pose_image, (frame_width, frame_height))
      combined_frame = np.hstack((image_bgr, true_pose_resized))
      video_out.write(combined_frame)

      # Progress report once every 24 written frames.
      count += 1
      if count % 24 == 0:
        print(count)
  finally:
    video_cap.release()
    video_out.release()

  print("Video processing complete. Output saved to", output_path)

In [16]:
# Demo run: the clip to analyse, read from an absolute Kaggle input path.
input_video = "/kaggle/input/hrpose-input/tue.mp4"
# Relative path, so the result lands in the current working directory.
output_video = "output_video.mp4"
# Directory handed to video_test as `pose_path` (the reference squat poses).
squat_dir = "/kaggle/input/hrpose-input/squat"
# angle_threshold is left at video_test's default.
video_test(input_video, output_video, squat_dir)

Using cache found in /root/.cache/torch/hub/ultralytics_yolov5_master


/kaggle/input/darkpose/w32_256×256.pth
=> loading model from /kaggle/input/darkpose/w32_256×256.pth
24
48
72
96
120
144
168
192
216
240
264
288
312
336
360
384
408
432
456
480
504
528
552
576
600
624
648
672
696
720
Video processing complete. Output saved to output_video.mp4
