In [15]:
import torch  
import json
import cv2
import torch.nn as nn
import numpy as np
from torch.optim import Adam
from torch.nn import MSELoss, Linear
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from tqdm.notebook import tqdm
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


In [16]:
# Dataset that loads each training image and its keypoint array in the form the ResNet model expects.

class KeyPtsDataset(Dataset):
    """Dataset yielding ``(image_tensor, keypoints)`` pairs for keypoint regression.

    Every entry of the JSON annotation file must provide an ``'id'`` (the PNG
    file stem inside ``img_dir``) and ``'kps'`` (the keypoint coordinates).
    Images are resized to ``IMG_SIZE x IMG_SIZE`` and normalized; keypoints are
    rescaled so they refer to the resized image rather than the original one.
    """

    # Side length (in pixels) of the square image produced by the transform.
    # Shared by the resize step and the keypoint rescaling so they cannot drift.
    IMG_SIZE = 224

    def __init__(self, img_dir, data_file):
        """Load the annotation list and build the image preprocessing pipeline.

        Args:
            img_dir: Directory containing the ``<id>.png`` images.
            data_file: Path to the JSON file holding the list of annotations.
        """
        self.img_dire = img_dir
        with open(data_file) as f:
            self.data = json.load(f)
        self.transforms = transforms.Compose(
            [
                transforms.ToPILImage(),
                transforms.Resize((self.IMG_SIZE, self.IMG_SIZE)),
                transforms.ToTensor(),
                # Standard ImageNet channel statistics used by torchvision's pretrained models.
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
            ]
        )

    def __len__(self):
        """Return the number of annotated samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the transformed image and its rescaled keypoints.

        Args:
            idx: Index into the annotation list.

        Returns:
            Tuple ``(img, kps)``: ``img`` is whatever ``self.transforms``
            produces, ``kps`` is a flat ``float32`` NumPy array whose even
            positions are treated as x and odd positions as y coordinates.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        item = self.data[idx]
        img_path = f"{self.img_dire}/{item['id']}.png"
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread reports a missing/unreadable file by returning None
            # instead of raising, so surface a clear error here.
            raise FileNotFoundError(f"Could not read image: {img_path}")
        h, w = img.shape[:2]
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = self.transforms(img)
        # astype() returns a fresh array, so the in-place scaling below never
        # mutates the cached annotation data.
        kps = np.array(item['kps']).flatten().astype(np.float32)
        # Scale (not overwrite) the coordinates into the resized image's frame.
        kps[::2] *= self.IMG_SIZE / w   # x coordinates
        kps[1::2] *= self.IMG_SIZE / h  # y coordinates
        return img, kps


In [None]:
# Build the training and validation datasets from the shared image folder,
# then wrap each one in a shuffling DataLoader that yields batches of 8.
train_dataset = KeyPtsDataset('data/images', 'data/data_train.json')
val_dataset = KeyPtsDataset('data/images', 'data/data_val.json')
train_loader = DataLoader(dataset=train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=8, shuffle=True)

In [None]:
# Fine-tune a pretrained ResNet-50 for keypoint regression.
# NOTE: `pretrained=True` is deprecated in torchvision >= 0.13 in favor of the
# `weights=` argument; it is kept here so older installs keep working.
model = models.resnet50(pretrained=True)
# Replace the classification head: 14 keypoints * 2 coordinates (x, y) each.
model.fc = Linear(model.fc.in_features, 14 * 2)
# Batches are moved to `device` inside the loop, so the model must live there
# too; otherwise the forward pass fails with a device mismatch on CUDA machines.
model = model.to(device)
loss_fn = MSELoss()
# Create the optimizer after the move so it tracks the on-device parameters.
optim = Adam(model.parameters(), lr=1e-4)

# Training
epochs = 20
model.train()  # make sure BatchNorm layers are in training mode
for epoch in range(epochs):
    with tqdm(train_loader, unit='batch', desc=f"Epoch {epoch+1}/{epochs}") as t_epoch:
        for img, kps in t_epoch:
            img, kps = img.to(device), kps.to(device)
            optim.zero_grad()
            outputs = model(img)
            loss = loss_fn(outputs, kps)
            loss.backward()
            optim.step()
            # Show the latest batch loss on the progress bar.
            t_epoch.set_postfix(loss=loss.item())

# Persist only the weights; reload with `load_state_dict` (pass `map_location`
# to `torch.load` when loading on a machine without the training device).
torch.save(model.state_dict(), "keypointsModel.pth")

In [None]:
# From-scratch ResNet-50-style backbone (no pretrained weights) for keypoint regression.
class _Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 reduce -> 3x3 -> 1x1 expand, plus a skip path.

    The block outputs ``mid_channels * expansion`` channels. When the input
    shape differs from the output shape (channel count or stride), the skip
    path uses a strided 1x1 projection so both branches can be summed.
    """

    # Ratio between the block's output width and its internal (3x3) width.
    expansion = 4

    def __init__(self, in_channels, mid_channels, stride=1):
        """
        Args:
            in_channels: Channels of the incoming feature map.
            mid_channels: Channels used by the inner 3x3 convolution.
            stride: Stride of the 3x3 convolution (spatial downsampling).
        """
        super().__init__()
        out_channels = mid_channels * self.expansion
        self.body = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, mid_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels),
        )
        if stride != 1 or in_channels != out_channels:
            # Project the input so it matches the main branch's output shape.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Identity()
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return ``relu(body(x) + shortcut(x))``."""
        return self.relu(self.body(x) + self.shortcut(x))


class CustomResNet50(nn.Module):
    """ResNet-50-style network that regresses ``num_keypoints`` (x, y) pairs.

    Layout: 7x7 stem -> max-pool -> four bottleneck stages with 3/4/6/3 blocks
    -> global average pooling -> linear head with ``num_keypoints * 2`` outputs.
    """

    def __init__(self, num_keypoints=14):
        """
        Args:
            num_keypoints: Number of keypoints; the head emits two values each.
        """
        super().__init__()

        # Initial layers
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Stages: each one outputs out_channels * 4, which is exactly the
        # in_channels expected by the next stage (64->256->512->1024->2048).
        self.stage1 = self._make_stage(64, 64, 3)
        self.stage2 = self._make_stage(256, 128, 4, stride=2)
        self.stage3 = self._make_stage(512, 256, 6, stride=2)
        self.stage4 = self._make_stage(1024, 512, 3, stride=2)

        # Global average pooling and final layer
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * _Bottleneck.expansion, num_keypoints * 2)

    def _make_stage(self, in_channels, out_channels, num_blocks, stride=1):
        """Build one stage made of ``num_blocks`` bottleneck blocks.

        Args:
            in_channels: Channels entering the stage.
            out_channels: Inner (3x3) width of every block; the stage emits
                ``out_channels * _Bottleneck.expansion`` channels.
            num_blocks: Number of bottleneck blocks in the stage.
            stride: Stride applied by the first block only.

        Returns:
            An ``nn.Sequential`` containing the blocks in order.
        """
        # Only the first block changes the stride and the channel depth.
        blocks = [_Bottleneck(in_channels, out_channels, stride=stride)]
        stage_width = out_channels * _Bottleneck.expansion
        blocks.extend(_Bottleneck(stage_width, out_channels) for _ in range(1, num_blocks))
        return nn.Sequential(*blocks)

    def forward(self, x):
        """Map an image batch ``(N, 3, H, W)`` to ``(N, num_keypoints * 2)`` predictions."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.stage1(x)
        x = self.stage2(x)
        x = self.stage3(x)
        x = self.stage4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x