<a href="https://www.kaggle.com/code/marlonmalheiros/simple-baseline-for-estimation?scriptVersionId=221129393" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# 1 - Config

In [6]:
# 1.1: Basic Notebook Configuration
# - Automatic reload of modules
# - Inline plotting

# %reload_ext autoreload
# %autoreload 2
# %matplotlib inline

import os
import json
import cv2
import random
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models.resnet import resnet50  # We'll adapt the ResNet backbone
from torch.utils.data import random_split, DataLoader

import torch.backends.cudnn as cudnn

import time

# Turn on cuDNN benchmark mode (lets cuDNN pick convolution algorithms by timing them).
cudnn.benchmark = True

# Optional: pull the pose model from a local module instead of defining it in this notebook.
# from my_models import SimpleBaselinePose

# ---------------------------------------------------------------------------
# 2.1: Key parameters
# ---------------------------------------------------------------------------

# Dataset locations (COCO 2017 train split as mounted on Kaggle)
COCO_IMAGES_ROOT = "/kaggle/input/coco-2017-dataset/coco2017/train2017"
COCO_ANNOTATIONS = (
    "/kaggle/input/coco-2017-dataset/coco2017/annotations/"
    "person_keypoints_train2017.json"
)

# Local val2017 files for quick tests
# COCO_IMAGES_ROOT = r"C:\Users\Marlon\Downloads\val2017"      # COCO val2017 images
# COCO_ANNOTATIONS = r"c:\Users\Marlon\Downloads\annotations\person_keypoints_val2017.json" # COCO val2017 annotations

# Network input geometry and augmentation ranges (training details from the paper)
IMG_HEIGHT, IMG_WIDTH = 256, 192
ASPECT_RATIO = (4, 3)    # height : width
SCALE_AUG = 0.3          # random scale in [1 - 0.3, 1 + 0.3]
ROTATION_AUG = 40.0      # random rotation in [-40, +40] degrees
FLIP_PROB = 0.5          # probability of a horizontal flip

# Optimisation schedule
# NOTE(review): the original note here read "Authors use 1, im tweaking to
# increase speed" -- presumably a truncated "1e-3"; confirm against the paper.
BASE_LR = 1e-3
LR_MILESTONES = [50, 75]    # epochs at which LR is multiplied by 0.1 (original schedule: [90, 120])
NUM_EPOCHS = 100            # original schedule: 140
BATCH_SIZE = 256            # authors use 128

# COCO person annotations carry 17 keypoints
NUM_KEYPOINTS = 17

# Device selection is done with explicit .cuda() calls further down.
# DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
NUM_WORKERS = 4  # DataLoader worker processes

# 2 - Dataset class

In [7]:
class COCOPoseDataset(Dataset):
    """Single-person pose samples cut out of COCO keypoint annotations.

    Every item corresponds to one person annotation. The person box is cropped
    from the image, randomly scaled / rotated / flipped, resized to
    ``input_size`` and returned together with one Gaussian target heatmap per
    keypoint at ``1 / HEATMAP_STRIDE`` of the input resolution.
    """

    # Down-sampling factor between the network input and the target heatmaps.
    HEATMAP_STRIDE = 4

    # (left, right) keypoint index pairs in the COCO 17-keypoint order
    # (eyes, ears, shoulders, elbows, wrists, hips, knees, ankles). After a
    # horizontal flip the anatomical left side appears on the right, so the
    # two entries of each pair have to trade places.
    FLIP_PAIRS = ((1, 2), (3, 4), (5, 6), (7, 8),
                  (9, 10), (11, 12), (13, 14), (15, 16))

    def __init__(self, annotations_json, images_root, 
                 transform=None, input_size=(256,192), 
                 scale_aug=0.3, rotation_aug=40, flip_prob=0.5,
                 min_box_side=4):
        """
        annotations_json: Path to a COCO-format JSON that includes 'images' and 'annotations'
        images_root: The directory with the actual .jpg files
        transform: Optional callable applied to the image tensor (3, H, W) before it is returned
        input_size: (height, width) of the network input
        scale_aug: Half-width of the random scale range (0.3 => factor in [0.7, 1.3])
        rotation_aug: Maximum absolute rotation in degrees
        flip_prob: Probability of a horizontal flip
        min_box_side: The minimum bounding box dimension (width or height) after aspect ratio correction.
                      If it's below this value, we skip that annotation.
        """
        self.images_root = images_root
        self.transform = transform
        self.input_size = input_size  # (height, width)
        self.scale_aug = scale_aug
        self.rotation_aug = rotation_aug
        self.flip_prob = flip_prob

        self.num_keypoints = 17  # COCO
        self.sigma = 2  # std-dev (in heatmap pixels) of the target Gaussians
        self.min_box_side = min_box_side

        # Parse and filter the annotations once, up front.
        self.annotations = self._load_and_filter_coco(annotations_json)

    def __len__(self):
        """Number of person annotations that survived filtering."""
        return len(self.annotations)

    def __getitem__(self, idx):
        """Return ``(image, heatmaps)`` for annotation ``idx``.

        image:    float tensor (3, input_h, input_w), RGB, raw 0..255 values
                  unless ``transform`` changes them.
        heatmaps: float tensor (num_keypoints, input_h // 4, input_w // 4).

        Raises FileNotFoundError if the image file cannot be read.
        """
        ann = self.annotations[idx]
        image_path = os.path.join(self.images_root, ann["file_name"])

        img = cv2.imread(image_path, cv2.IMREAD_COLOR)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Box was aspect-ratio adjusted, clipped and size-filtered in __init__.
        x, y, w, h = ann["bbox"]

        # -- Crop (pixel indices are the truncated box corners)
        x0, y0 = int(x), int(y)
        cropped = img[y0:int(y + h), x0:int(x + w)]

        # -- Keypoints relative to the *actual* crop origin, so the sub-pixel
        #    part dropped by int() does not shift the labels.
        keypoints_local = self.extract_local_keypoints(ann['keypoints'], [x0, y0, w, h])

        # -- Data augmentation (scale, rotate, flip)
        cropped_aug, keypoints_aug = self.apply_augmentation_with_keypoints(
            cropped, keypoints_local
        )

        # -- Resize to the network input size
        in_h, in_w = self.input_size
        final_img = cv2.resize(cropped_aug, (in_w, in_h))

        # -- Map keypoints from the augmented crop straight to heatmap pixels
        heat_h = in_h // self.HEATMAP_STRIDE
        heat_w = in_w // self.HEATMAP_STRIDE
        aug_h, aug_w = cropped_aug.shape[:2]
        sx = heat_w / aug_w
        sy = heat_h / aug_h
        keypoints_scaled = [(kx * sx, ky * sy, v) for (kx, ky, v) in keypoints_aug]

        heatmaps = self.generate_heatmaps(keypoints_scaled, (heat_h, heat_w))

        # Convert to torch tensors (image becomes channel-first)
        final_img = torch.from_numpy(final_img).permute(2,0,1).float()
        heatmaps = torch.from_numpy(heatmaps).float()

        if self.transform:
            final_img = self.transform(final_img)

        return final_img, heatmaps

    # ----------------------------------------------------------------
    # HELPER METHODS
    # ----------------------------------------------------------------
    def _load_and_filter_coco(self, annotations_json):
        """
        1. Parse the COCO JSON
        2. Build a map of image_id -> (file_name, width, height)
        3. For each annotation (person, iscrowd=0, complete keypoint list,
           at least 5 labelled keypoints), adjust the bounding box to 4:3,
           clip it to the image and keep it only if both sides are
           >= min_box_side.

        Returns a list of dicts with keys 'file_name', 'bbox', 'keypoints'.
        """
        with open(annotations_json, 'r', encoding='utf-8') as f:
            coco_data = json.load(f)

        # image_id -> file name and image size
        image_dict = {
            img_info["id"]: {
                "file_name": img_info["file_name"],
                "width":     img_info["width"],
                "height":    img_info["height"],
            }
            for img_info in coco_data["images"]
        }

        expected_len = 3 * self.num_keypoints  # (x, y, v) per keypoint

        filtered_anns = []
        for ann in coco_data["annotations"]:
            # Basic conditions: a non-crowd person with a full keypoint list
            if (ann["category_id"] != 1 or
                ann["iscrowd"] != 0 or
                "keypoints" not in ann or
                len(ann["keypoints"]) != expected_len):
                continue

            # Require at least 5 labelled keypoints (visibility flag > 0)
            num_visible = sum(v > 0 for v in ann["keypoints"][2::3])
            if num_visible < 5:
                continue

            img_info = image_dict[ann["image_id"]]

            # Adjust to 4:3 ratio, clip to boundaries
            x, y, w, h = self.adjust_aspect_ratio(ann["bbox"],
                                                  img_info["width"],
                                                  img_info["height"])

            # Skip boxes that are too small (this also drops degenerate ones)
            if w < self.min_box_side or h < self.min_box_side:
                continue

            filtered_anns.append({
                "file_name": img_info["file_name"],
                "bbox": [x, y, w, h],
                "keypoints": ann["keypoints"]
            })

        print(f"Filtered annotations: from {len(coco_data['annotations'])} down to {len(filtered_anns)}")
        return filtered_anns

    def adjust_aspect_ratio(self, bbox, img_w, img_h, target_ar=4/3):
        """Grow bbox about its centre to height/width == target_ar, then clip.

        bbox is [x, y, w, h]. The returned [x, y, w, h] is the intersection of
        the grown box with the image rectangle [0, img_w] x [0, img_h], so it
        may no longer have the target ratio when the box touches a border.
        A box with non-positive width or height comes back with w == h == 0
        so that callers can filter it out instead of hitting a division by
        zero.
        """
        x, y, w, h = bbox
        if w <= 0 or h <= 0:
            return [max(0, x), max(0, y), 0, 0]

        if h / w < target_ar:
            # Too wide for the target ratio: extend height
            new_h = w * target_ar
            y = (y + h / 2) - new_h / 2
            h = new_h
        else:
            # Too tall (or exact): extend width
            new_w = h / target_ar
            x = (x + w / 2) - new_w / 2
            w = new_w

        # Clip all four edges. The far edges are computed from the unclipped
        # origin so that overflow on the left/top shrinks the box rather than
        # sliding it to the right/bottom.
        left = max(0, x)
        top = max(0, y)
        right = min(img_w, x + w)
        bottom = min(img_h, y + h)

        return [left, top, max(0, right - left), max(0, bottom - top)]

    def extract_local_keypoints(self, keypoints_global, bbox):
        """
        Convert the flat COCO list [x0, y0, v0, x1, y1, v1, ...] into a list of
        (x, y, v) tuples whose coordinates are relative to the top-left corner
        of the bounding box. The visibility flag v is passed through.
        """
        x0, y0 = bbox[0], bbox[1]
        return [
            (keypoints_global[3*i] - x0,
             keypoints_global[3*i + 1] - y0,
             keypoints_global[3*i + 2])
            for i in range(self.num_keypoints)
        ]

    def _flip_keypoints(self, keypoints, width):
        """Mirror keypoints horizontally inside an image of the given width.

        x is reflected to (width - 1) - x and the entries of every left/right
        pair in FLIP_PAIRS are exchanged so that each channel keeps its
        anatomical meaning after the flip.
        """
        flipped = [((width - 1) - kx, ky, v) for (kx, ky, v) in keypoints]
        for left, right in self.FLIP_PAIRS:
            if right < len(flipped):
                flipped[left], flipped[right] = flipped[right], flipped[left]
        return flipped

    def apply_augmentation_with_keypoints(self, cropped_img, keypoints_local):
        """
        Apply random scale, rotation, and horizontal flip to 
        both the cropped image and the local keypoints.

        Returns (augmented_image, keypoints) with keypoints as (x, y, v)
        tuples in the augmented image's pixel coordinates.
        """
        H, W = cropped_img.shape[:2]

        # 1) Random scale
        scale_factor = 1.0 + random.uniform(-self.scale_aug, self.scale_aug)
        new_W = max(1, int(W * scale_factor))  # avoid zero dimension
        new_H = max(1, int(H * scale_factor))

        resized_img = cv2.resize(cropped_img, (new_W, new_H))
        kp_scaled = [(kx * scale_factor, ky * scale_factor, v)
                     for (kx, ky, v) in keypoints_local]

        # 2) Random rotation about the image centre; the canvas size is kept,
        #    so content rotated past the border is cut off.
        angle = random.uniform(-self.rotation_aug, self.rotation_aug)
        center = (new_W / 2, new_H / 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        rotated_img = cv2.warpAffine(resized_img, M, (new_W, new_H))

        # Apply the same 2x3 affine matrix to the keypoints
        kp_rotated = [
            (M[0,0]*kx + M[0,1]*ky + M[0,2],
             M[1,0]*kx + M[1,1]*ky + M[1,2],
             v)
            for (kx, ky, v) in kp_scaled
        ]

        # 3) Random horizontal flip
        if random.random() < self.flip_prob:
            return cv2.flip(rotated_img, 1), self._flip_keypoints(kp_rotated, new_W)

        return rotated_img, kp_rotated

    def generate_heatmaps(self, keypoints, heatmap_size):
        """
        Create 'num_keypoints' heatmaps of shape (heatmap_size[0], heatmap_size[1]),
        e.g., (64,48). A Gaussian is drawn for every keypoint whose visibility
        flag is > 0; the other channels stay all-zero.
        """
        H, W = heatmap_size
        heatmaps = np.zeros((self.num_keypoints, H, W), dtype=np.float32)

        for k_id in range(self.num_keypoints):
            kx, ky, v = keypoints[k_id]
            if v > 0:  # visible or labeled
                center = (int(round(kx)), int(round(ky)))
                self._draw_gaussian(heatmaps[k_id], center, sigma=self.sigma)
        return heatmaps

    def _draw_gaussian(self, heatmap, center, sigma=2):
        """Max-blend a unit-peak Gaussian centred at (x, y) into heatmap in place.

        Only the window of +-3*sigma around the centre is touched; the part of
        the window outside the heatmap is ignored, and nothing is drawn when
        the window misses the heatmap entirely.
        """
        x, y = center
        H, W = heatmap.shape
        size = int(3 * sigma)

        # Window clipped to the heatmap bounds
        x1c, x2c = max(0, x - size), min(W, x + size + 1)
        y1c, y2c = max(0, y - size), min(H, y + size + 1)
        if x2c <= x1c or y2c <= y1c:
            return

        # Evaluate the Gaussian only on the visible part of the window
        xx, yy = np.meshgrid(np.arange(x1c, x2c), np.arange(y1c, y2c))
        gaussian = np.exp(-((xx - x)**2 + (yy - y)**2)/(2*sigma**2))

        heatmap_section = heatmap[y1c:y2c, x1c:x2c]
        np.maximum(heatmap_section, gaussian, out=heatmap_section)


## Class check

In [8]:
# # 4.1: Create the dataset and dataloader

# train_dataset = COCOPoseDataset(
#     annotations_json=COCO_ANNOTATIONS, 
#     images_root=COCO_IMAGES_ROOT,
#     input_size=(IMG_HEIGHT, IMG_WIDTH), 
#     scale_aug=SCALE_AUG, 
#     rotation_aug=ROTATION_AUG, 
#     flip_prob=FLIP_PROB,
#     transform=None
# )

# train_loader = DataLoader(train_dataset, 
#                           batch_size=4, 
#                           shuffle=True, 
#                           )

# # 4.2: Visualize a few samples
# def visualize_sample(img_tensor, heatmaps_1_4):
#     """
#     img_tensor: shape (3, 256, 192)
#     heatmaps_1_4: shape (17, 64, 48)
#     """

#     # Convert the input image to NumPy for plotting
#     img_np = img_tensor.permute(1,2,0).numpy().astype(np.uint8)

#     # Summation of all keypoint heatmaps (shape: 64×48)
#     heatmap_sum_small = heatmaps_1_4.numpy().sum(axis=0)  # shape (64, 48)

#     # Upsample the heatmap_sum to 256×192
#     heatmap_sum_large = cv2.resize(
#         heatmap_sum_small, 
#         (img_np.shape[1], img_np.shape[0]),  # (width=192, height=256)
#         interpolation=cv2.INTER_LINEAR
#     )
    
#     fig, ax = plt.subplots(1, 2, figsize=(10,5))
#     ax[0].imshow(img_np)
#     ax[0].set_title("Augmented Image (256×192)")

#     ax[1].imshow(img_np, alpha=0.6)
#     ax[1].imshow(heatmap_sum_large, cmap='jet', alpha=0.5)
#     ax[1].set_title("Overlay: Summed Heatmap Upsampled")
#     plt.show()


# sample_iter = iter(train_loader)
# images, hmaps = next(sample_iter)  # shape: (B, 3, 256, 192), (B, 17, 64, 48)

# for i in range(images.size(0)):
#     visualize_sample(images[i], hmaps[i])


# 3 - Model

In [9]:
class SimpleBaselinePose(nn.Module):
    """ResNet-50 backbone followed by transposed-conv upsampling and a 1x1 heatmap head.

    For an input of shape (B, 3, H, W) the network returns (B, num_keypoints,
    H/4, W/4): the backbone reduces the resolution by 32 and the three
    stride-2 deconvolutions bring it back up by 8.
    """

    def __init__(self, backbone='resnet50', num_keypoints=17, pretrained=True):
        """
        backbone: Backbone name. Only 'resnet50' is implemented; anything else
                  raises ValueError instead of silently building a ResNet-50.
        num_keypoints: Number of output heatmap channels.
        pretrained: Forwarded to torchvision's ``resnet50``; pass False to skip
                    loading ImageNet weights (e.g. when a checkpoint is
                    loaded right afterwards).
        """
        super().__init__()
        if backbone != 'resnet50':
            raise ValueError(
                f"Unsupported backbone {backbone!r}; only 'resnet50' is implemented"
            )

        # 5.1: ResNet-50 from torchvision.
        # NOTE(review): newer torchvision releases prefer the ``weights=``
        # argument over ``pretrained=`` -- switch once the minimum supported
        # torchvision version is confirmed.
        resnet = resnet50(pretrained=pretrained)

        # Drop the global average pool and the fc classifier (last two children)
        # so the backbone ends at layer4's feature map.
        modules = list(resnet.children())[:-2]
        self.backbone = nn.Sequential(*modules)

        # 5.2: Three deconvolution layers, each with 256 filters, kernel=4, stride=2.
        # layer4 outputs 2048 channels at 1/32 resolution (8x6 for a 256x192
        # input); three x2 upsamplings give 1/4 resolution (64x48).
        self.deconv_layers = self._make_deconv_layer(3, 256, in_channels=2048)

        # 5.3: Final 1x1 conv that maps the 256 features to one heatmap per keypoint
        self.final_layer = nn.Conv2d(
            in_channels=256,
            out_channels=num_keypoints,
            kernel_size=1,
            stride=1,
            padding=0
        )

    def _make_deconv_layer(self, num_layers, num_filters, in_channels=2048):
        """Build ``num_layers`` blocks of ConvTranspose2d -> BatchNorm2d -> ReLU.

        Each block doubles the spatial resolution (kernel 4, stride 2,
        padding 1). The first block takes ``in_channels`` input channels, the
        remaining ones ``num_filters``.
        """
        layers = []
        channels = in_channels
        for _ in range(num_layers):
            layers.append(
                nn.ConvTranspose2d(
                    in_channels=channels,
                    out_channels=num_filters,
                    kernel_size=4,
                    stride=2,
                    padding=1,
                    output_padding=0,
                    bias=False  # the following BatchNorm supplies the shift
                )
            )
            layers.append(nn.BatchNorm2d(num_filters))
            layers.append(nn.ReLU(inplace=True))
            channels = num_filters
        return nn.Sequential(*layers)

    def forward(self, x):
        """Map images (B, 3, H, W) to keypoint heatmaps (B, num_keypoints, H/4, W/4)."""
        x = self.backbone(x)       # (B, 2048, H/32, W/32)
        x = self.deconv_layers(x)  # (B, 256, H/4, W/4)
        x = self.final_layer(x)    # (B, num_keypoints, H/4, W/4)
        return x


# 4 - Loss

In [11]:
# Mean-squared-error loss (default 'mean' reduction), moved to the GPU.
# Device-agnostic alternative: criterion = nn.MSELoss().to(DEVICE)
criterion = nn.MSELoss()
criterion = criterion.cuda()

# 5 - Model, Optimizer and LR

In [13]:
# Build the pose network and move it to the default CUDA device.
# Device-agnostic alternative:
# model = SimpleBaselinePose(backbone='resnet50', num_keypoints=NUM_KEYPOINTS).to(DEVICE)
model = SimpleBaselinePose(backbone='resnet50', num_keypoints=NUM_KEYPOINTS).cuda()

# With more than one visible GPU, wrap the model in DataParallel.
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs!")
    model = nn.DataParallel(model)

# Adam over all model parameters, starting at BASE_LR.
optimizer = optim.Adam(params=model.parameters(), lr=BASE_LR)

# Multiply the learning rate by 0.1 at every epoch listed in LR_MILESTONES.
lr_scheduler = optim.lr_scheduler.MultiStepLR(
    optimizer=optimizer,
    milestones=LR_MILESTONES,
    gamma=0.1,
)

Using 2 GPUs!


# 6 - Training loop

In [None]:
import copy

from torch.utils.data import Subset

# Full annotated dataset with training-time augmentation enabled.
train_dataset = COCOPoseDataset(
    annotations_json=COCO_ANNOTATIONS, 
    images_root=COCO_IMAGES_ROOT,
    input_size=(IMG_HEIGHT, IMG_WIDTH), 
    scale_aug=SCALE_AUG, 
    rotation_aug=ROTATION_AUG, 
    flip_prob=FLIP_PROB,
    transform=None
)

# Loss scaler for float16 autocast. torch.cuda.amp.GradScaler() is deprecated
# in recent PyTorch releases (it emits a FutureWarning); torch.amp.GradScaler
# is its replacement.
scaler = torch.amp.GradScaler("cuda")

# 8.1: Create DataLoader for training, validation
# 90% of the samples for training, 10% for validation
train_size = int(0.9 * len(train_dataset))
val_size = len(train_dataset) - train_size

# random_split will produce two Subset objects
train_subset, val_subset = random_split(train_dataset, [train_size, val_size])

# Validation must not see random augmentation, otherwise the validation loss
# (and the "best model" decision based on it) changes from epoch to epoch for
# reasons unrelated to the weights. A shallow copy shares the already-parsed
# annotation list but gets its own augmentation settings.
val_dataset = copy.copy(train_dataset)
val_dataset.scale_aug = 0.0
val_dataset.rotation_aug = 0.0
val_dataset.flip_prob = 0.0
val_subset = Subset(val_dataset, val_subset.indices)

print(f"Train subset size: {len(train_subset)}")
print(f"Val subset size:   {len(val_subset)}")

# Create DataLoaders (worker count comes from the NUM_WORKERS constant)
train_loader = DataLoader(train_subset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS, pin_memory=True, persistent_workers=True, prefetch_factor=8)
val_loader   = DataLoader(val_subset,   batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True, persistent_workers=True, prefetch_factor=8)

# For quick local test only
# train_loader = DataLoader(train_subset, batch_size=8, shuffle=True)
# val_loader   = DataLoader(val_subset,   batch_size=8, shuffle=False)

# 8.2: Training Loop
best_val_loss = float('inf')

# Short smoke run. The same variable drives both the loop and the progress
# message so the log no longer claims "x/140" while only two epochs execute.
# Set to NUM_EPOCHS for a full training run (the LR milestones are only
# reached then).
num_epochs = 2
for epoch in range(num_epochs):
    # ---------------------------
    # Training
    # ---------------------------
    model.train()
    start_time = time.time()
    running_loss = 0.0
    
    for images, heatmaps in train_loader:
        # images = images.to(DEVICE)
        # heatmaps = heatmaps.to(DEVICE)
        # Batches come from pinned memory, so the copy can be asynchronous.
        images = images.cuda(non_blocking=True)
        heatmaps = heatmaps.cuda(non_blocking=True)
        
        optimizer.zero_grad()

        # Forward pass and loss under float16 autocast
        with torch.autocast(device_type='cuda', dtype=torch.float16):
            outputs = model(images)
            loss = criterion(outputs, heatmaps)        
        
        # Scale the loss before backprop
        scaler.scale(loss).backward()
        # Update parameters
        scaler.step(optimizer)
        # Adjust the scaling factor
        scaler.update()
        
        # Accumulate the sum of per-sample losses (batch mean * batch size)
        running_loss += loss.item() * images.size(0)
    
    epoch_train_loss = running_loss / len(train_loader.dataset)
    
    # ---------------------------
    # Validation (full precision, no gradients, no augmentation)
    # ---------------------------
    model.eval()
    val_running_loss = 0.0
    
    with torch.no_grad():
        for images, heatmaps in val_loader:
            # images = images.to(DEVICE)
            # heatmaps = heatmaps.to(DEVICE)
            images = images.cuda(non_blocking=True)
            heatmaps = heatmaps.cuda(non_blocking=True)

            outputs = model(images)
            val_loss = criterion(outputs, heatmaps)
            
            val_running_loss += val_loss.item() * images.size(0)
    
    epoch_val_loss = val_running_loss / len(val_loader.dataset)
    
    # Update LR (once per epoch)
    lr_scheduler.step()

    # Elapsed time for this epoch (training + validation)
    elapsed_time = time.time() - start_time
    
    # Track best model.
    # NOTE: when `model` is wrapped in nn.DataParallel the saved keys carry a
    # "module." prefix; load the checkpoint into a wrapped model or strip the
    # prefix first.
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        torch.save(model.state_dict(), "best_model.pth")
    
    # Print stats
    print(f"Epoch {epoch+1}/{num_epochs} "
          f"Train Loss: {epoch_train_loss:.6f} "
          f"Val Loss: {epoch_val_loss:.6f} "
          f"Time: {elapsed_time:.2f} sec")

Filtered annotations: from 262465 down to 134214
Train subset size: 120792
Val subset size:   13422


  scaler = GradScaler()
