In [89]:
# One-time download of the SPair-71k archive (uncomment to run, then extract it):
#!wget https://cvlab.postech.ac.kr/research/SPair-71k/data/SPair-71k.tar.gz


In [90]:
from airo_dataset_tools.data_parsers.coco import CocoKeypointAnnotation, CocoImage, CocoKeypointCategory, CocoKeypointsDataset
import json

In [91]:
# Accumulators for the COCO dataset being assembled (three independent, empty lists).
coco_images, coco_annotations, coco_categories = [], [], []

In [92]:
import os
from pathlib import Path
import glob

# Root of the extracted SPair-71k dataset. Set the SPAIR_ROOT environment variable
# to point at your own copy; the fallback is the original hard-coded location, so
# existing setups keep working unchanged.
SPAIR_ROOT = Path(os.environ.get("SPAIR_ROOT", "/home/tlips/Code/few-shot-keypoints/data/SPair-71k"))
# Images and annotations live in parallel sub-trees below the root.
image_dir = SPAIR_ROOT / "JPEGImages"
annotation_dir = SPAIR_ROOT / "ImageAnnotation"

In [93]:
# Recursively collect every .jpg below image_dir; sorting gives a deterministic order.
jpg_pattern = str(image_dir / '**' / '*.jpg')
all_image_paths = glob.glob(jpg_pattern, recursive=True)
all_image_paths.sort()
print(f"Found {len(all_image_paths)} images in {image_dir}")


Found 1800 images in /home/tlips/Code/few-shot-keypoints/data/SPair-71k/JPEGImages


In [94]:
# for each category, find a single annotation file, get the number of keypoints and create the category.

def image_path_to_annotation_path(image_path):
    """Return the annotation JSON path that belongs to ``image_path``.

    The annotation is located under the module-level ``annotation_dir``, inside
    the folder named after the image's parent directory (used as the category),
    and carries the image's file stem with a ``.json`` extension.
    """
    source = Path(image_path)
    return annotation_dir / source.parent.name / f"{source.stem}.json"
# Build one COCO category per image folder.
# Both lists are reset here so that re-running this cell does not append duplicate
# categories (with fresh ids) to a list left over from a previous run.
coco_categories = []
processed_categories = []
for image_path in all_image_paths:
    category = Path(image_path).parent.name
    if category in processed_categories:
        continue
    processed_categories.append(category)

    # The first annotation file encountered for a category defines its keypoint count.
    annotation_path = image_path_to_annotation_path(image_path)
    with open(annotation_path, 'r') as f:
        annotations = json.load(f)

    n_keypoints = len(annotations['kps'])
    coco_categories.append(
        CocoKeypointCategory(
            supercategory="object",
            id=len(coco_categories) + 1,  # 1-based ids in order of first appearance
            name=category,
            #TODO: add semantically meaningful names to each semantic keypoint type
            keypoints=[f"kp{i}" for i in range(n_keypoints)],
            skeleton=[],  # Assuming no skeleton for now
        )
    )
print(f"Processed {len(processed_categories)} categories.")

Processed 18 categories.


In [95]:
# Lookup table: category name -> COCO category id.
category_to_id_map = dict((entry.name, entry.id) for entry in coco_categories)

In [96]:
# add all images
def generate_coco_images_and_annotations(all_image_paths):
    """Turn image paths into COCO image records and keypoint annotations.

    For each path the matching annotation JSON (located through
    ``image_path_to_annotation_path``) is loaded and converted into one
    ``CocoImage`` and one ``CocoKeypointAnnotation``. Ids are 1-based and follow
    the order of ``all_image_paths``.

    Args:
        all_image_paths: image paths; the parent folder name of each path is
            used as the category name and looked up in ``category_to_id_map``.

    Returns:
        A tuple ``(coco_images, coco_annotations)`` of two lists.
    """
    image_records = []
    annotation_records = []
    for record_id, image_path in enumerate(all_image_paths, start=1):
        image_file = Path(image_path)

        with open(image_path_to_annotation_path(image_path), 'r') as f:
            annotations = json.load(f)

        # File names are stored relative to the directory that contains image_dir.
        image_records.append(
            CocoImage(
                id=record_id,
                file_name=str(image_file.relative_to(image_dir.parent)),
                width=annotations['image_width'],
                height=annotations['image_height'],
            )
        )

        # The box is read as (x1, y1, x2, y2) corners and stored as [x, y, width, height].
        x1, y1, x2, y2 = annotations['bndbox']
        box_w = x2 - x1
        box_h = y2 - y1

        # Flatten keypoints into (u, v, visibility) triplets:
        # missing keypoints become (0, 0, 0), present ones get visibility 2.
        flat_keypoints = []
        labelled = 0
        for kp in annotations['kps'].values():
            if kp is not None:
                u, v = kp
                flat_keypoints += [u, v, 2]
                labelled += 1
            else:
                flat_keypoints += [0, 0, 0]

        #TODO: add segmentations, but they seem to have different formats.
        annotation_records.append(
            CocoKeypointAnnotation(
                id=record_id,
                image_id=record_id,
                category_id=category_to_id_map[image_file.parent.name],
                keypoints=flat_keypoints,
                num_keypoints=labelled,
                bbox=[x1, y1, box_w, box_h],
                area=box_w * box_h,
            )
        )
    return image_records, annotation_records


In [97]:
# Assemble all categories into one COCO dataset.
coco_images, coco_annotations = generate_coco_images_and_annotations(all_image_paths)
coco_dataset = CocoKeypointsDataset(images=coco_images, annotations=coco_annotations, categories=coco_categories)

# Serialize the combined dataset into the SPair root folder.
# NOTE(review): this file is written before the later cell that drops never-visible
# keypoint types runs, so it still contains every keypoint slot -- confirm this is intended.
output_path = SPAIR_ROOT / "SPAIR_coco_dataset.json"
with output_path.open('w') as f:
    json.dump(coco_dataset.model_dump(), f)



In [98]:
# Count, per category and keypoint type, how many annotations actually label that keypoint.
# Per the original note, every category carries 30 keypoint slots, so slots that are never
# labelled end up with a count of 0; those are dropped in the next cell.
category_type_amount = {}
for category in coco_dataset.categories:
    counts = {keypoint: 0 for keypoint in category.keypoints}
    for annotation in coco_dataset.annotations:
        if annotation.category_id != category.id:
            continue
        for idx, keypoint in enumerate(category.keypoints):
            # Keypoints are flat (u, v, visibility) triplets, so the visibility flag sits at
            # offset 2. Testing the u coordinate (offset 0) instead would miss a labelled
            # keypoint that lies at u == 0.
            if annotation.keypoints[3 * idx + 2] > 0:
                counts[keypoint] += 1
    category_type_amount[category.name] = counts


for category, keypoints in category_type_amount.items():
    print(f"{category}: {keypoints}")

aeroplane: {'kp0': 84, 'kp1': 71, 'kp2': 75, 'kp3': 79, 'kp4': 60, 'kp5': 55, 'kp6': 61, 'kp7': 64, 'kp8': 52, 'kp9': 56, 'kp10': 51, 'kp11': 56, 'kp12': 33, 'kp13': 38, 'kp14': 33, 'kp15': 31, 'kp16': 57, 'kp17': 58, 'kp18': 47, 'kp19': 40, 'kp20': 37, 'kp21': 27, 'kp22': 69, 'kp23': 79, 'kp24': 60, 'kp25': 0, 'kp26': 0, 'kp27': 0, 'kp28': 0, 'kp29': 0}
bicycle: {'kp0': 80, 'kp1': 90, 'kp2': 72, 'kp3': 78, 'kp4': 75, 'kp5': 67, 'kp6': 54, 'kp7': 50, 'kp8': 70, 'kp9': 62, 'kp10': 58, 'kp11': 0, 'kp12': 0, 'kp13': 79, 'kp14': 0, 'kp15': 0, 'kp16': 0, 'kp17': 0, 'kp18': 0, 'kp19': 0, 'kp20': 0, 'kp21': 0, 'kp22': 0, 'kp23': 0, 'kp24': 0, 'kp25': 0, 'kp26': 0, 'kp27': 0, 'kp28': 0, 'kp29': 0}
bird: {'kp0': 100, 'kp1': 7, 'kp2': 8, 'kp3': 91, 'kp4': 51, 'kp5': 46, 'kp6': 92, 'kp7': 51, 'kp8': 51, 'kp9': 68, 'kp10': 62, 'kp11': 57, 'kp12': 31, 'kp13': 30, 'kp14': 49, 'kp15': 37, 'kp16': 77, 'kp17': 0, 'kp18': 0, 'kp19': 0, 'kp20': 0, 'kp21': 0, 'kp22': 0, 'kp23': 0, 'kp24': 0, 'kp25': 0, 'k

In [99]:
# Remove every keypoint type whose visible count is 0 for its category.
category_id_to_category = dict((category.id, category) for category in coco_dataset.categories)

# Annotations go first: they are filtered against the categories' still-complete keypoint lists.
for annotation in coco_dataset.annotations:
    owner = category_id_to_category[annotation.category_id]
    visible_counts = category_type_amount[owner.name]
    annotation.keypoints = [
        value
        for idx, keypoint in enumerate(owner.keypoints)
        if visible_counts[keypoint] > 0
        for value in annotation.keypoints[3 * idx:3 * idx + 3]
    ]

# Then shrink each category's keypoint name list with the same criterion.
for category in coco_dataset.categories:
    visible_counts = category_type_amount[category.name]
    category.keypoints = [keypoint for keypoint in category.keypoints if visible_counts[keypoint] > 0]


# Report the remaining keypoint count per category.
for category in coco_dataset.categories:
    print(f"{category.name}: {len(category.keypoints)}")

# Sanity check: every annotation must hold exactly three values per remaining keypoint.
for annotation in coco_dataset.annotations:
    assert len(annotation.keypoints) == 3*len(category_id_to_category[annotation.category_id].keypoints), f"keypoints length {len(annotation.keypoints)} != 3*len(category.keypoints) {3*len(category_id_to_category[annotation.category_id].keypoints)}"

aeroplane: 25
bicycle: 12
bird: 17
boat: 14
bottle: 10
bus: 28
car: 30
cat: 15
chair: 14
cow: 21
dog: 16
horse: 20
motorbike: 13
person: 20
pottedplant: 9
sheep: 21
train: 18
tvmonitor: 16


In [100]:
# Split each category's images into train/test halves and write one COCO file per split.
import random

train_ratio = 0.5

# Read from coco_dataset, the object the pruning cell mutated, rather than relying on the
# standalone lists sharing the same instances.
for category in coco_dataset.categories:
    category_annotations = [ann for ann in coco_dataset.annotations if ann.category_id == category.id]

    # Select the category's images through its annotations. A substring test of the
    # category name against the file path would also match any unrelated path that
    # merely contains that name.
    category_image_ids = {ann.image_id for ann in category_annotations}
    category_images = [img for img in coco_dataset.images if img.id in category_image_ids]

    print(f"Category {category.name} has {len(category_images)} images and {len(category_annotations)} annotations.")

    # Re-seed for every category so each split is reproducible on its own.
    random.seed(2025)  # For reproducibility
    random.shuffle(category_images)
    n_train = int(len(category_images) * train_ratio)
    train_category_images = category_images[:n_train]
    test_category_images = category_images[n_train:]

    # Sets give constant-time membership instead of rebuilding an id list per annotation.
    train_image_ids = {img.id for img in train_category_images}
    test_image_ids = {img.id for img in test_category_images}
    train_category_annotations = [ann for ann in category_annotations if ann.image_id in train_image_ids]
    test_category_annotations = [ann for ann in category_annotations if ann.image_id in test_image_ids]

    # Create datasets for train and test splits
    train_category_dataset = CocoKeypointsDataset(
        images=train_category_images,
        annotations=train_category_annotations,
        categories=[category]
    )
    test_category_dataset = CocoKeypointsDataset(
        images=test_category_images,
        annotations=test_category_annotations,
        categories=[category]
    )

    # Save train and test datasets to separate JSON files
    train_output_path = SPAIR_ROOT / f"SPAIR_coco_{category.name}_train.json"
    with open(train_output_path, 'w') as f:
        json.dump(train_category_dataset.model_dump(), f)

    test_output_path = SPAIR_ROOT / f"SPAIR_coco_{category.name}_test.json"
    with open(test_output_path, 'w') as f:
        json.dump(test_category_dataset.model_dump(), f)

Category aeroplane has 100 images and 100 annotations.
Category bicycle has 100 images and 100 annotations.
Category bird has 100 images and 100 annotations.
Category boat has 100 images and 100 annotations.
Category bottle has 100 images and 100 annotations.
Category bus has 100 images and 100 annotations.
Category car has 100 images and 100 annotations.
Category cat has 100 images and 100 annotations.
Category chair has 100 images and 100 annotations.
Category cow has 100 images and 100 annotations.
Category dog has 100 images and 100 annotations.
Category horse has 100 images and 100 annotations.
Category motorbike has 100 images and 100 annotations.
Category person has 100 images and 100 annotations.
Category pottedplant has 100 images and 100 annotations.
Category sheep has 100 images and 100 annotations.
Category train has 100 images and 100 annotations.
Category tvmonitor has 100 images and 100 annotations.


In [101]:
import torch

x = torch.randn(1,512)
img = torch.randn(1,512,256,256)

x = x.unsqueeze(2).unsqueeze(3)

from torch.nn.functional import cosine_similarity

sim  =cosine_similarity(x, img,dim=1)

sim.shape



# find argmax of sim

argmax = sim.argmax()

x = torch.unravel_index(argmax, sim.shape)
_,u,v = x

print(u,v)







tensor(214) tensor(60)
