In [45]:
import os, json, cv2, random, numpy as np, matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from pytorch_lightning import loggers as pl_loggers


import torchvision
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.transforms import functional as F


import albumentations as A
from albumentations.pytorch import ToTensorV2

import pytorch_lightning as pl

import transforms, utils, engine, train
from utils import collate_fn

from torchvision.models.detection import keypointrcnn_resnet50_fpn
from torchvision.models import ResNet50_Weights
from torchvision.io import read_image, ImageReadMode
from torchvision import transforms as T

In [None]:
#get rid of filter, grab all data from json and port that to target (modify if data isnt needed)
#return full image, return full targets
#go back to medium and make sure it uses boxes, keypoints, labels
class MyDataset(Dataset):
    def __init__(self, directory, transform=None, filter=False):
        super(MyDataset, self).__init__()

        self.image_dir = os.path.join(directory, "image")

        if filter:
            self.annos_dir = os.path.join(directory, "filtered")
        else:
            self.annos_dir = os.path.join(directory, "annos")

        self.length = len(os.listdir(self.annos_dir))

        self.lookup = os.listdir(self.annos_dir)
        for i in range(len(self.lookup)):
            self.lookup[i] = self.lookup[i].split(".")[0]

        self.transform = transform

    def __getitem__(self, idx):
        idx_f = self.lookup[idx]

        image_path = os.path.join(self.image_dir, str(idx_f) + ".jpg")
        image = read_image(image_path, ImageReadMode.RGB) / 255.0
        image_size = image.shape[1:]  # Assuming image shape is [C, H, W]

        with open(os.path.join(self.annos_dir, str(idx_f) + ".json")) as f:
            annos = json.load(f)

        if self.transform is not None:
            image = self.transform(image)
            
        keypoints_raw = annos["landmarks"]

        # Reshape keypoints and discard visibility if present
        keypoints = np.array(keypoints_raw).reshape(-1, 3)
        keypoints = keypoints[:, :2].astype(np.float32)  # Keep only x and y coordinates

        # Normalize keypoints to [0, 1] range based on image dimensions
        keypoints[:, 0] = keypoints[:,0]/image_size[1]  # Normalize x coordinates by width
        keypoints[:, 1] = keypoints[:,1]/image_size[0]  # Normalize y coordinates by height

        # Convert to tensor
        keypoints = torch.tensor(keypoints, dtype=torch.float32)

        return image, keypoints

    def __len__(self):
        return self.length

In [46]:
class LitNetwork(pl.LightningModule):
    def __init__(self,num_k=25,batch_size=1):
        super(LitNetwork, self).__init__()

        self.model = keypointrcnn_resnet50_fpn(num_keypoints=num_k,weights_backbone=ResNet50_Weights.IMAGENET1K_V1)

        self.loss_func = nn.MSELoss()

        #self.val_ap = torchmetrics.AveragePrecision(task="multiclass",num_classes=num_k)
        self.b = batch_size

    def forward(self, image, targets):
        print(image.shape)
        print(targets)
        x = self.model(image, targets)
        return x

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-4)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.1)
        return {"optimizer": optimizer, "lr_scheduler": scheduler, "monitor": "val_loss"}

    def training_step(self, data, batch_idx):
        image, targets = data[0], data[1]

        out = self.forward(image,targets)
        out = out[0]["keypoints"]
        loss = self.loss_func(out[0,:,:2], targets)

        self.log("train_loss",loss,prog_bar=True,on_step=False,on_epoch=True,batch_size=self.b,sync_dist=True)
        return loss
    
    def validation_step(self, val_data, batch_idx):
        image, targets = val_data[0], val_data[1]

        loss_dict = self.forward(image,targets)
        #print(out[0])
        out = out[0]["keypoints"]
        #print(out.shape)
        #print(targets.shape)
        print(loss_dict)
        loss = sum(loss for loss in loss_dict.values())#self.loss_func(out[0,:,:2], targets)

        self.log("val_loss",loss,batch_size=self.b,prog_bar=True,on_step=False,on_epoch=True,sync_dist=True)

        #self.val_ap(out, targets)
        #self.log("val_ap",self.val_ap,prog_bar=True,on_step=False,on_epoch=True,sync_dist=True)

        return None

In [42]:
#never ending sanity check

def train_network(workers=8):

    train_dataset = MyDataset("C:\\Users\\crisz\\Documents\\ECU Classes\\CSCI Graduate\\Thesis\\DeepFashion2\\train",filter=True)
    validation_dataset = MyDataset("C:\\Users\\crisz\\Documents\\ECU Classes\\CSCI Graduate\\Thesis\\DeepFashion2\\validation",filter=True)

    b = 1
    train_loader = DataLoader(train_dataset,batch_size=b,num_workers=workers,persistent_workers=False,shuffle=True)
    val_loader = DataLoader(validation_dataset,batch_size=b,num_workers=workers,persistent_workers=False)

    model = LitNetwork(25,b)
    checkpoint = pl.callbacks.ModelCheckpoint(monitor='val_loss', save_top_k=1, mode='min')
    logger = pl_loggers.TensorBoardLogger(save_dir="my_logs")
    logger = pl_loggers.CSVLogger(save_dir="my_logs",name="my_csv_logs")

    device = "gpu" # Use 'mps' for Mac M1 or M2 Core, 'gpu' for Windows with Nvidia GPU, or 'cpu' for Windows without Nvidia GPU

    trainer = pl.Trainer(max_epochs=200, accelerator=device, callbacks=[checkpoint], logger=logger, num_sanity_val_steps=0)
    torch.set_float32_matmul_precision('high')
    trainer.fit(model,train_loader,val_loader)
        


train_network(workers=8)


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type         | Params
-------------------------------------------
0 | model     | KeypointRCNN | 59.2 M
1 | loss_func | MSELoss      | 0     
-------------------------------------------
59.0 M    Trainable params
222 K     Non-trainable params
59.2 M    Total params
236.811   Total estimated model params size (MB)


Training: |          | 0/? [00:00<?, ?it/s]

torch.Size([1, 3, 624, 468])
tensor([[[0.4594, 0.1010],
         [0.3462, 0.1074],
         [0.3868, 0.1394],
         [0.4573, 0.1522],
         [0.5363, 0.1378],
         [0.5919, 0.1122],
         [0.1282, 0.1314],
         [0.0000, 0.0000],
         [0.0000, 0.0000],
         [0.0000, 0.0000],
         [0.0748, 0.3734],
         [0.1346, 0.3606],
         [0.1774, 0.3862],
         [0.1859, 0.5337],
         [0.0000, 0.0000],
         [0.0000, 0.0000],
         [0.0000, 0.0000],
         [0.6709, 0.5833],
         [0.7179, 0.4038],
         [0.7308, 0.3782],
         [0.7692, 0.4311],
         [0.8141, 0.4888],
         [0.9637, 0.3061],
         [0.8675, 0.2196],
         [0.7799, 0.1442]]], device='cuda:0')


IndexError: too many indices for tensor of dimension 2

In [None]:
def train_transform(image):
    return A.Compose([
        A.Sequential([
            A.RandomCrop(width=256, height=256), # Randomly crop the image
            A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, brightness_by_max=True, always_apply=False, p=1), # Random change of brightness & contrast
        ], p=1)
    ],
    keypoint_params=A.KeypointParams(format='xy'), # More about keypoint formats used in albumentations library read at https://albumentations.ai/docs/getting_started/keypoints_augmentation/
    bbox_params=A.BboxParams(format='pascal_voc', label_fields=['bboxes_labels']) # Bboxes should have labels, read more at https://albumentations.ai/docs/getting_started/bounding_boxes_augmentation/
    )

In [1]:
dataset = MyDataset(directory='C:/Users/crisz/Documents/ECU Classes/CSCI Graduate/Thesis/DeepFashion2/train', filter=True)

# Get a random index
random_index = random.randint(0, len(dataset) - 1)

# Get the image and keypoints at the random index
image, keypoints = dataset[random_index]

# Convert the tensors back to numpy for visualization
image = image.permute(1, 2, 0).numpy()
keypoints = keypoints.numpy()

# Plot the image
plt.imshow(image)

# Plot the keypoints
plt.scatter(keypoints[:, 0], keypoints[:, 1], c='r')

# Show the plot
plt.show()

NameError: name 'MyDataset' is not defined

In [2]:
# Start of Training Section

def get_model(num_keypoints, weights_path=None):
    
    anchor_generator = AnchorGenerator(sizes=(32, 64, 128, 256, 512), aspect_ratios=(0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0))
    model = torchvision.models.detection.keypointrcnn_resnet50_fpn(pretrained=False,
                                                                   pretrained_backbone=True,
                                                                   num_keypoints=num_keypoints,
                                                                   num_classes = 2, # Background is the first class, object is the second class
                                                                   rpn_anchor_generator=anchor_generator)

    if weights_path:
        state_dict = torch.load(weights_path)
        model.load_state_dict(state_dict)        
        
    return model

In [3]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

KEYPOINTS_FOLDER_TRAIN = "C:\Users\crisz\Documents\ECU Classes\CSCI Graduate\Thesis\DeepFashion2\train"
KEYPOINTS_FOLDER_TEST = "C:\Users\crisz\Documents\ECU Classes\CSCI Graduate\Thesis\DeepFashion2\test-001"

dataset_train = ClassDataset(KEYPOINTS_FOLDER_TRAIN, transform=train_transform(), demo=False)
dataset_test = ClassDataset(KEYPOINTS_FOLDER_TEST, transform=None, demo=False)

data_loader_train = DataLoader(dataset_train, batch_size=3, shuffle=True, collate_fn=collate_fn)
data_loader_test = DataLoader(dataset_test, batch_size=1, shuffle=False, collate_fn=collate_fn)

model = get_model(num_keypoints = 2)
model.to(device)

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.3)
num_epochs = 5

for epoch in range(num_epochs):
    train_one_epoch(model, optimizer, data_loader_train, device, epoch, print_freq=1000)
    lr_scheduler.step()
    evaluate(model, data_loader_test, device)
    
# Save model weights after training
torch.save(model.state_dict(), r'/path/to/folder/where/to/save/model/weights/keypointsrcnn_weights.pth')


SyntaxError: (unicode error) 'unicodeescape' codec can't decode bytes in position 2-3: truncated \UXXXXXXXX escape (1018161005.py, line 3)

In [None]:
iterator = iter(data_loader_test)
images, targets = next(iterator)
images = list(image.to(device) for image in images)

with torch.no_grad():
    model.to(device)
    model.eval()
    output = model(images)

print("Predictions: \n", output)

In [None]:
image = (images[0].permute(1,2,0).detach().cpu().numpy() * 255).astype(np.uint8)
scores = output[0]['scores'].detach().cpu().numpy()

high_scores_idxs = np.where(scores > 0.7)[0].tolist() # Indexes of boxes with scores > 0.7
post_nms_idxs = torchvision.ops.nms(output[0]['boxes'][high_scores_idxs], output[0]['scores'][high_scores_idxs], 0.3).cpu().numpy() # Indexes of boxes left after applying NMS (iou_threshold=0.3)

# Below, in output[0]['keypoints'][high_scores_idxs][post_nms_idxs] and output[0]['boxes'][high_scores_idxs][post_nms_idxs]
# Firstly, we choose only those objects, which have score above predefined threshold. This is done with choosing elements with [high_scores_idxs] indexes
# Secondly, we choose only those objects, which are left after NMS is applied. This is done with choosing elements with [post_nms_idxs] indexes

keypoints = []
for kps in output[0]['keypoints'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy():
    keypoints.append([list(map(int, kp[:2])) for kp in kps])

bboxes = []
for bbox in output[0]['boxes'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy():
    bboxes.append(list(map(int, bbox.tolist())))
    
visualize(image, bboxes, keypoints)