# Build a dataset with the flickr API

API reference: https://www.flickr.com/services/api/ <br>
Python API: https://github.com/sybrenstuvel/flickrapi <br>
API allows for 3600 calls per hour.

The quickest way to create a hand-picked dataset is probably to download the pictures manually and use the ids_from_files() function bellow to get the image ids from the file names. Alternatively, create a gallery directly on flickr and get your ids over the API from there (the problem here is that some creators do not allow the adding to galleries, so it won't cover all pictures).

In [1]:
from configs import cfg
import os
import numpy as np
from pathlib import Path
import glob
import json
from pprint import pprint as _pprint
def pprint(data): (_pprint(data, sort_dicts=False))
from PIL import Image
import urllib.request
import flickrapi
from tqdm.auto import tqdm

KEY = cfg.CRED.FLICKR_KEY
SECRET = cfg.CRED.FLICKR_SECRET
flickr = flickrapi.FlickrAPI(KEY, SECRET)

## Define annotation file header

In [37]:
# build constants for annotation file

# dataset info; json key is 'info'
INFO = {
    "description": "Dataset",
    "url": "https://github.com/random9v2/cv-annot",
    "version": "1.0",
    "year": 2022,
    "contributor": "John",
    "date_created": "2022/08/10"
}

# get flickr licenses, order, remove no CC license ('id':0); json key is 'licenses'
LICENSES = sorted(json.loads(flickr.photos.licenses.getInfo(format="json"))["licenses"]["license"], key=lambda d: d['id'])[1:]

# original COCO categories for human keypoints; json key is 'categories'
CATEGORIES_COCO = [
        {
            "supercategory": "person",
            "id": 1,
            "name": "person",
            "keypoints": [ # keypoint keys
                "nose",
                "left_eye",
                "right_eye",
                "left_ear",
                "right_ear",
                "left_shoulder",
                "right_shoulder",
                "left_elbow",
                "right_elbow",
                "left_wrist",
                "right_wrist",
                "left_hip",
                "right_hip",
                "left_knee",
                "right_knee",
                "left_ankle",
                "right_ankle"
            ],
            "skeleton": [ # how keypoints are connected for visualization; does not affect training
                [16,14],
                [14,12],
                [17,15],
                [15,13],
                [12,13],
                [6,12],
                [7,13],
                [6,7],
                [6,8],
                [7,9],
                [8,10],
                [9,11],
                [2,3],
                [1,2],
                [1,3],
                [2,4],
                [3,5],
                [4,6],
                [5,7]
            ]
        }
    ]

# original crowd_pose categories for human keypoints; json key is 'categories'
CATEGORIES_CROWD = [
        {
            "supercategory": "person",
            "id": 1,
            "name": "person",
            "keypoints": [
                "left_shoulder",
                "right_shoulder",
                "left_elbow",
                "right_elbow",
                "left_wrist",
                "right_wrist",
                "left_hip",
                "right_hip",
                "left_knee",
                "right_knee",
                "left_ankle",
                "right_ankle",
                "head",
                "neck"
            ],
            "skeleton": [
                [1,14],
                [1,3],
                [2,14],
                [2,4],
                [3,5],
                [4,6],
                [7,14],
                [7,9],
                [8,14],
                [8,10],
                [9,11],
                [10,12],
                [13,14]
            ]
        }
    ]


# with open(f"configs/crowdpose_keypoint_cat.json", 'w', encoding='utf-8') as f:
#         json.dump({"categories": CATEGORIES_CROWD}, f, ensure_ascii=False, indent=4)


In [3]:
# pprint(LICENSES)

## Build or expand annotation file with flickr image ids (add metadata)

In [11]:
# build/add to dataset
# with big datasets (> 1800), the function must be scheduled to not exeed the hourly API quota (easiest is a sleep() between each image)

ID_PREFIX = '007'  # file name and ID prefix for this dataset (only decorative/helps differentiate sets)
CATEGORIES = CATEGORIES_CROWD  # coco or crowd_pose

def build_dataset(flickr_ids:list, img_dir:str, anno_file:str=None, update:dict=None):
    """
    Builds dataset with annotation file out of flickr image IDs.
    New images will be added to a given annotation file. Old annotations won't be altered.
    
    args:
    flickr_ids: list of flickr image ids
    img_dir: path to download the new images into (dataset path)
    anno_file (optional): annotation file to add new images to
    update / NOT IMPLEMENTED: update part of the annotation file, e.g. {"info": {}, "licenses": {}}; due to safety, updating images/annotations should be disabled
    """    
    final_anno = {}
    max_id = max_count_id = 0
    flickr_ids_add = set(flickr_ids)  # add to anno
    flickr_ids_download = set() # download
    version = '1.0'
    changes = False
    
    if anno_file:
        with open(anno_file) as f:
            anno_json = json.load(f)
            
        # copy of original
        original_anno = anno_json.copy()
        
        # increase version number if images are added
        version = f'{(int(float(anno_json["info"]["version"])) + 1):.1f}'
    
        # only add new flickr images
        ids_in_json = [(image["flickr_id"] if image.get("flickr_id", False) else '') for image in anno_json["images"]]
        flickr_ids_add = set(flickr_ids) - set(ids_in_json)
        
        # for downloading what isn't there
        imgs_in_dir = set([x.name for x in Path(img_dir).glob("*.jpg")])
        for image in anno_json["images"]:
            if image["file_name"] not in imgs_in_dir:
                flickr_ids_download.add(image["flickr_id"])
        
        # get biggest dataset id
        max_id = max(anno_json["images"], key=lambda x:x["id"])["id"]
        max_count_id = int(str(max_id)[1:])
        
        final_anno = anno_json
    else:
        # build starter annotation file with header
        final_anno["info"] = INFO
        final_anno["licenses"] = LICENSES
        final_anno["categories"] = CATEGORIES
        final_anno["images"] = []
            
    # build image meta for all new ids
    image_meta = []
    allowed_licenses = [x["id"] for x in final_anno["licenses"]]
    count_id = 1 if max_count_id==0 else max_count_id+1_000_001  # keep dataset image ids between versions appart
    
    pbar = tqdm(flickr_ids_add)
    for flickr_id in pbar:
        file_name = f'{ID_PREFIX}{str(count_id).zfill(9)}.jpg'
        response = json.loads(flickr.photos.getInfo(photo_id=flickr_id, format="json"))
        if response.get("photo", True):
            if int(response["photo"]["license"]) in allowed_licenses:
                url = get_url(flickr_id, 'z')
                download_image(url, img_dir, file_name)
                remove_metadata_img(Path(img_dir, file_name), quality=98)  # quality could enlarge image size denpending on flickr image quality
                width, height = Image.open(Path(img_dir, file_name)).size
                image_meta.append(
                    {
                        "license": response["photo"]["license"],
                        "file_name": file_name,
                        "dataset_version": version,
                        "height": height,
                        "width": width,
                        "date_captured": response["photo"]["dates"]["taken"],
                        "flickr_url": url,
                        "content_url": response["photo"]["urls"]["url"][0]["_content"],
                        "flickr_id": response["photo"]["id"],
                        "id": int(f'{ID_PREFIX}{str(count_id).zfill(9)}')
                    }
                )
            else:
                print(f'License Error: image {flickr_id} not added; license {response["photo"]["license"]} not allowed.')
        else:
            print(f'Error for flickr_id{flickr_id}: json response does not contain a photo item \n json response: \n')
            pprint(response)   
        count_id+=1
        pbar.set_description(f'download+annotate: image: {flickr_id}')
        
    changes = True if len(image_meta) > 0 else False
    final_anno["images"] += image_meta

    # download remaining
    if len(flickr_ids_download) > 0:
        pbar_2 = tqdm(flickr_ids_download) 
        for flickr_id in pbar_2:
            file_name = next((item for item in final_anno["images"] if item["flickr_id"] == flickr_id), None)["file_name"]
            url = get_url(flickr_id, 'z')
            download_image(url, img_dir, file_name)
            remove_metadata_img(Path(img_dir, file_name))
            pbar_2.set_description(f'download: image: {flickr_id}')
    
    # finish and save annotation file
    final_anno["info"]["version"] = version
    save_file = f'annotationMeta_v{version}.json'
    if anno_file:
        if changes:
            save_path = Path(Path(anno_file).parent, save_file)
            if save_path.is_file():  
                print(f'Error: {save_file} already exists.')
            else:
                with open(save_path, 'w', encoding='utf-8') as f:
                    json.dump(final_anno, f, ensure_ascii=False, indent=4)
        else:
            print('Warning: annotation file says: nothing changed')
    else:
        save_path = Path(img_dir, save_file)
        if save_path.is_file():  
            print(f'Error: {save_file} already exists in image directory.')
        else:
            with open(save_path, 'w', encoding='utf-8') as f:
                json.dump(final_anno, f, ensure_ascii=False, indent=4)
    

## helper functions

def get_url(id:str, size_suffix:str=''):
    """Gets URL for given image id and size.

    args:
    id: flickr image id
    size: defines image size with suffix: https://www.flickr.com/services/api/misc.urls.html
    """
    get_sizes = json.loads(flickr.photos.getSizes(photo_id=id, format="json"))
    found = False
    last_size_url = None
    for size in get_sizes['sizes']['size']:
        last_size_url = size['source']
        _size_suffix = size['source'].split('_')[-1].split('.')[0]  # gets (size) suffix from image source link
        if _size_suffix == size_suffix:
            url = size['source']
            found = True
            break
        elif len(_size_suffix) > 1 and size_suffix == '':  # standard format without suffix in link
            url = size['source']
            found = True
            break
    if not found:   
        url = last_size_url
        suffix = last_size_url.split('_')[-1].split('.')[0]
        print(f'Link/size not found for {id}; biggest image URL available saved, suffix: {suffix if len(suffix)==1 else "none (default)"}')
    return url

def download_image(url:str, output_dir:str, file_name:str):
    """Donwloads image from URL.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    urllib.request.urlretrieve(url, Path(output_dir, file_name))
    
def remove_metadata_img(img_path:str, quality=75):
    """Removes all metadata of an image. Will also compress the image (PIL default is quality=75).
    """
    image = Image.open(img_path)
    data = list(image.getdata())
    img_no_meta = Image.new(image.mode, image.size)
    img_no_meta.putdata(data)
    img_no_meta.save(img_path, quality=quality)  # default quality=75
    
def save_coco(img_path:str, quality=75):
    """Remove meta and resize.
    """
    image = Image.open(img_path)
    image.thumbnail((640, 640), Image.Resampling.LANCZOS)  # rescale to 640 max. in either dimension
    data = list(image.getdata())  # list of raw pixel data
    img_no_meta = Image.new(image.mode, image.size)
    img_no_meta.putdata(data)
    img_no_meta.save(img_path, quality=quality)  # default quality=75

def ids_from_files(dir):
    """Retrieve flickr IDs from the file name of downloaded images.
    
    flickr file name format: numericalID_imageSecret_imageSize.jpg, e.g. 50235154168_b3201cd930_o.jpg
    """
    return set([x.name.split('_')[0] for x in Path(dir).glob("*.jpg")])


# ids = ids_from_files('/Users/john/Downloads/fireground_dataset_upgrade')
# test_ids = ['4882517696', '4882517696']
# build_dataset(ids, '/Users/john/Downloads/fireground_dataset', '/Users/john/Downloads/fireground_dataset/annotationMeta_v1.0.json')


## Merge coco-annotator export with metadata annotation file created above

In [167]:
# After annotating in coco-annotator, add the annotations to the annotation file created above

# keys in annotations to be removed
ANNO_DELETE = ["color", "metadata"]


def add_coco_annotations(anno_meta, anno_coco):
    """
    Adds in external tools created annotations to the annotation file created above. 
    The file names must remain the same during the whole process.
    Image ids are drawn from the anno_meta file.
    
    args:
    anno_meta: path to .json file – contains meta information for a dataset, e.g., created above with build dataset
    anno_coco: path to .json file – annotation export from coco-annotator
    
    TODO: cleanup bounding box, set to max image size
    """
    with open(anno_meta) as f:
        meta_json = json.load(f)
    with open(anno_coco) as f:
        coco_json = json.load(f)
        
    # build lookup tables with file names and image ids
    key_fn_id_meta = {}
    key_id_fn_coco = {}
    for image in meta_json["images"]:
        key_fn_id_meta[image["file_name"]] = image["id"]
    for image in coco_json["images"]:
        key_id_fn_coco[image["id"]] = image["file_name"]
    # keep category id from meta_json
    # category name has to be the same
    key_name_cat_meta = {}
    key_cat_name_coco = {}
    for category in meta_json["categories"]:
        key_name_cat_meta[category["name"]] = category["id"]
    for category in coco_json["categories"]:
        key_cat_name_coco[category["id"]] = category["name"]
        
    annotations = []
    for anno in coco_json["annotations"]:
        file_name = key_id_fn_coco[anno["image_id"]]
        id_meta = key_fn_id_meta[file_name]
        category_name = key_cat_name_coco[anno["category_id"]]
        category_id_meta = key_name_cat_meta[category_name]
        
        # annotation items to keep
        annotation = {}
        for key, value in anno.items():
            if key not in ANNO_DELETE:
                annotation[key] = value
        
        # overwrite id keys with anno_meta values
        annotation["id"] = anno["id"]
        annotation["image_id"] = id_meta
        annotation["category_id"] = category_id_meta
        
        # finally add new annotation
        annotations.append(annotation)
        
    # merge and save
    final_anno = meta_json.copy()
    final_anno["annotations"] = annotations
    version = meta_json["info"]["version"]
    save_file = f'annotations_v{version}.json'
    save_path = Path(Path(anno_meta).parent, save_file)
    if save_path.is_file():  
        print(f'Error: {save_file} already exists.')
    else:
        with open(save_path, 'w', encoding='utf-8') as f:
            json.dump(final_anno, f, ensure_ascii=False, indent=4)

            
add_coco_annotations('/Users/john/datasets/fireground_pose/json/archive/annotationMeta_v2.0.json', 
                     '/Users/john/Downloads/json.json')

## Check annotations in fiftyone

In [6]:
%%capture
# check annotations in fiftyone
import fiftyone as fo
import webbrowser

# dataset = fo.Dataset.from_dir(
#     dataset_type = fo.types.COCODetectionDataset,
#     label_types = ["detections", "segmentations", "keypoints"],
#     data_path = f'{cfg.DATASET.CROWD_POSE.ROOT}/images',
#     labels_path = f'{cfg.DATASET.CROWD_POSE.ROOT}/json/crowdpose_val.json',
#     max_samples=2000
# )

# dataset = fo.Dataset.from_dir(
#     dataset_type = fo.types.COCODetectionDataset,
#     label_types = ["detections", "segmentations", "keypoints"],
#     data_path = f'{cfg.DATASET.COCO.ROOT}/images/val2017',
#     labels_path = f'{cfg.DATASET.COCO.ROOT}/annotations/person_keypoints_val2017.json',
#     max_samples=2000
# )

# dataset = fo.Dataset.from_dir(
#     dataset_type = fo.types.COCODetectionDataset,
#     label_types = ["detections", "segmentations", "keypoints"],
#     data_path = f'{cfg.DATASET.MYIMG.ROOT}/images',
#     labels_path = f'{cfg.DATASET.MYIMG.ROOT}/json/archive/json4.json',
#     max_samples=20000
# )

dataset = fo.Dataset.from_dir(
    dataset_type = fo.types.COCODetectionDataset,
    label_types = ["detections", "segmentations", "keypoints"],
    data_path = f'{cfg.DATASET.MYIMG.ROOT}/images',
    labels_path = f'{cfg.DATASET.MYIMG.ROOT}/json/annotations_crowd.json',
    max_samples=20000
)

# dataset = fo.Dataset.from_dir(
#     dataset_type = fo.types.COCODetectionDataset,
#     label_types = ["detections", "segmentations", "keypoints"],
#     data_path = f'{cfg.DATASET.MYIMG.ROOT}/images',
#     labels_path = f'{cfg.DATASET.MYIMG.ROOT}/json/annotations/annotations_coco.json',
#     max_samples=20000
# )

port = 5151
session = fo.launch_app(port=port)
webbrowser.open(f'http://localhost:{port}/')

dataset.persistent = True
session.view = dataset.view()



# some useful functions

In [7]:
def remove_key(container:dict):
    """Drop dict key&value on all levels; e.g., segmentations in an annotation file.
    """
    if not isinstance(container, dict):
        return container if not isinstance(container, list) else list(map(d_rem, container))
    return {a:d_rem(b) for a, b in container.items() if a != 'segmentation'}  # adjust key


json_path = '/Users/john/git/_tools/coco-annotator/datasets/annotations_coco_person/coco_person_keypoints_val2017_segDel.json'
save_path = '/Users/john/git/_tools/coco-annotator/datasets/annotations_coco_person/coco_person_keypoints_val2017_segDel2.json'

# with open(json_path) as f:
#     b = remove_key(json.load(f))
# with open(save_path, 'w', encoding='utf-8') as f:
#     json.dump(b, f, ensure_ascii=False, indent=4)


In [56]:
# Switch value of two keys in dict

path = '/Users/john/Downloads/fireground_dataset/annotationMeta_v1.0.json'
with open(path) as f:
    old = json.load(f)
    new = old.copy()
    new.pop('images')
    new['images'] = []
    
    for image in old['images']:
        img = image.copy()
        img['height'] = image['width']
        img['width'] = image['height']
        new['images'].append(img)
      
    # with open(Path(Path(path).parent, 'annotationMeta_v1.1.json'), 'w', encoding='utf-8') as f:
    #             json.dump(new, f, ensure_ascii=False, indent=4)
    

In [37]:
# count keypoints / people

path1 = f'{cfg.DATASET.MYIMG.ROOT}/json/annotations_crowd.json'
path2 = f'{cfg.DATASET.MYIMG.ROOT}/json/annotations_coco.json'

count_crowd = 0
with open(path1) as f:
    file = json.load(f)
    for anno in file['annotations']:
        for kp_vis in anno['keypoints'][2::3]:  # loops for every keypoint through visibility index
            if kp_vis > 0:
                count_crowd += 1
    print(f'{count_crowd=}')
    print(len(file['annotations']))
    
count_coco = 0
with open(path2) as f:
    file = json.load(f)
    for anno in file['annotations']:
        for kp_vis in anno['keypoints'][2:5*3:3]:  # loops for every keypoint through visibility index; ONLY for new HEAD keypoints (first 5)
            if kp_vis > 0:
                count_coco += 1
    print(f'{count_coco=}')
    print(len(file['annotations']))
                
print(f'kp_unique={count_crowd+count_coco}')

count_crowd=12234
1089
count_coco=3238
1089
kp_unique=15472


## Add approximate bounding boxes & segmentation around keypoints for adjustment in coco-annotator

In [87]:
# add bounding box to keypoints for later adjustment in coco annotator
# will override old bounding box and segmentation for annotation, adjust code if not wanted

path = f'{cfg.DATASET.MYIMG.ROOT}/json/crowdpose_test.json'
with open(path) as f:
    old = json.load(f)
    new = old.copy()
    new.pop('annotations')
    new['annotations'] = []
    
    # lookup table for image dimensions
    lookup = {image['id']: {'height': image['height'], 'width': image['width']} for image in old['images']}
    
    for anno in old['annotations']:
        _anno = anno.copy()
        image_id = anno['image_id']
        x_max = lookup[image_id]['width']
        y_max = lookup[image_id]['height']
        kps = anno['keypoints']
        # delete not annotated keypoints (v=0)
        kps = np.array_split(kps, len(kps)/3)
        kps = list(np.array([kp for kp in kps if kp[2] > 0]).flatten())
        # define bounding box
        xs = kps[0::3]
        ys = kps[1::3]
        bbox = [min(xs), min(ys), max(xs)-min(xs), max(ys)-min(ys)]
        # give some margins
        margin = 10. # px, all directions
        bbox = [bbox[0]-margin, bbox[1]-margin, bbox[2]+2*margin, bbox[3]+2*margin]
        bbox = [int(i) for i in bbox]
        # set to image edge if out of bounce
        bbox[0] = bbox[0] if bbox[0] > 0 else 0
        bbox[1] = bbox[1] if bbox[1] > 0 else 0
        bbox[2] = bbox[2] if bbox[0]+bbox[2] < x_max else x_max-bbox[0]
        bbox[3] = bbox[3] if bbox[1]+bbox[3] < y_max else y_max-bbox[1]
        # coco annotator binds a 4 corner segmentation to the bounding box: to be able to adjust the bbox, the segmentation is also needed
        segmentation = [bbox[0]+bbox[2], bbox[1], bbox[0]+bbox[2], bbox[1]+bbox[3], bbox[0], bbox[1]+bbox[3], bbox[0], bbox[1]]
        segmentation = [float(val) for val in segmentation]
        
        _anno['area'] = bbox[2] * bbox[3]
        _anno['bbox'] = bbox
        _anno['segmentation'] = [segmentation]
        _anno['isbbox'] = True  # use segmentation as bounding box in coco_annotator
        
        new['annotations'].append(_anno)
        
    with open(Path(Path(path).parent, 'update.json'), 'w', encoding='utf-8') as f:
        json.dump(new, f, ensure_ascii=False, indent=4)



## Coco-export cleanup for bounding boxes (fix max. dimension) and unused keys

In [165]:
# clean&fix coco-annotator export
# remove unused keys like area & segmentation
# adjust bounding boxes to max. image dimensions

path = f'/Users/john/Downloads/fp_6-11.json'
with open(path) as f:
    old = json.load(f)
    new = old.copy()
    new.pop('annotations')
    new['annotations'] = []
    
    # lookup table for image dimensions
    lookup = {image['id']: {'height': image['height'], 'width': image['width']} for image in old['images']}
    
    for anno in old['annotations']:
        _anno = {}
        image_id = anno['image_id']
        x_max = lookup[image_id]['width']
        y_max = lookup[image_id]['height']
        # keep
        _anno['id'] = anno['id']
        _anno['image_id'] = anno['image_id']
        _anno['category_id'] = anno['category_id']
        _anno['iscrowd'] = anno['iscrowd']
        try:
            _anno['keypoints'] = anno['keypoints']
            _anno['num_keypoints'] = anno['num_keypoints']
        except KeyError:
            _anno['keypoints'] = [0] * 14 * 3  # 14 keypoints, all values 0
            _anno['num_keypoints'] = 0 
        # adjust bbox to max. image dimensions
        bbox = anno['bbox']
        bbox[0] = bbox[0] if bbox[0] > 0 else 0
        bbox[1] = bbox[1] if bbox[1] > 0 else 0
        bbox[2] = bbox[2] if bbox[0]+bbox[2] < x_max else x_max-bbox[0]
        bbox[3] = bbox[3] if bbox[1]+bbox[3] < y_max else y_max-bbox[1]
        _anno['bbox'] = bbox
        
        new['annotations'].append(_anno)
        
    # export
    with open(Path(Path(path).parent, 'json.json'), 'w', encoding='utf-8') as f:
        json.dump(new, f, ensure_ascii=False, indent=4)   
    


## Add crowdIndex & num_people to annotation file

In [43]:
# Update annotation file with crowdIndex & num_people
# https://github.com/Jeff-sjtu/CrowdPose
# Note: crowdIndex is sometimes wrong in the original dataset; num_keypoints always lacks 2 (presumably head, neck)

def point_in_rect(point, rect):
    """Check if point is in rectangle."""
    x1, y1, w, h = rect
    x2, y2 = x1+w, y1+h
    x, y = point
    if (x1 < x and x < x2):
        if (y1 < y and y < y2):
            return True
    return False

# path = f'{cfg.DATASET.CROWD_POSE.ROOT}/json/crowdpose_val.json'
path = f'{cfg.DATASET.MYIMG.ROOT}/json/archive/json3.json'
with open(path) as f:
    old = json.load(f)
    new = old.copy()
    
    anno_lookup = {}
    for image in old['images']:
        annos = []
        for anno in old['annotations']:
            if anno['image_id'] == image['id']:
                annos.append(anno)
        anno_lookup[image['id']] = annos
    
    crowdedness_all = {}
    for _image_id, _annos in anno_lookup.items():
        bboxes = {}
        keypoints = {}
        num_keypoints = {}
        num_people = {}
        crowdedness = {}
        ids = [anno['id'] for anno in _annos if not anno['iscrowd']]  # skip iscrowd instances (contains several people at once)
        for anno in _annos:
            bboxes[anno['id']] = anno['bbox']
            kps = anno['keypoints']
            kps = np.array_split(kps, len(kps)/3)
            kps = [list(kp) for kp in kps if kp[2] > 0]  # count all annotated keypoints, also hidden ones
            keypoints[anno['id']] = kps
            num_keypoints[anno['id']] = len(kps)
        for idx in ids:  # for each annotation instance
            bbox = bboxes[idx]
            kp_count = {'kps_self': 0, 'kps_other': 0}
            for image_id, kps in keypoints.items():  # check all keypoints of all annotations
                if idx == image_id:  # keypoints are in bbox of instance itself
                    for kp in kps:
                        if point_in_rect((kp[0], kp[1]), bbox):
                            kp_count['kps_self'] += 1
                else:
                    for kp in kps:
                        if point_in_rect((kp[0], kp[1]), bbox):
                            kp_count['kps_other'] += 1
            crowdedness[idx] = kp_count
        crowdedness_all[_image_id] = crowdedness
    # calculate crowdIndex
    crowdIndex = {}
    for image_id, kps_stats in crowdedness_all.items():
        ci_sum = 0
        num_valid = 0
        num_persons = 0
        for _, stats in kps_stats.items():
            try:
                ci_sum += stats['kps_other']/stats['kps_self']
                num_valid += 1
                num_persons += 1
            except ZeroDivisionError:
                # skip people with 0 keypoints for index calculation
                num_persons += 1
        crowdIndex[image_id] = ci_sum/num_valid
        num_people[image_id] = num_persons
    
    # sort + normalize
    # !! original crowd_pose dataset contains annotation errors end outliers shift the crowdIndex to not usable values
    _max = crowdIndex[max(crowdIndex, key=crowdIndex.get)]
    crowdIndex = {k: v/_max for k, v in sorted(crowdIndex.items(), key=lambda item: item[1], reverse=True)}
    
    # build json
    new.pop('images')
    new.pop('annotations')
    new['images'] = []
    for image in old['images']:  
        # define (new) order
        _image = {}
        _image['license'] = image['license']
        _image['file_name'] = image['file_name']
        _image['dataset_version'] = image['dataset_version']
        _image['width'] = image['width']
        _image['height'] = image['height']
        _image['num_people'] = num_people[image['id']]
        _image['crowdIndex'] = round(crowdIndex[image['id']], 2)
        _image['date_captured'] = image['date_captured']
        _image['flickr_url'] = image['flickr_url']
        _image['content_url'] = image['content_url']
        _image['flickr_id'] = image['flickr_id']
        _image['id'] = image['id']
        
        new['images'].append(_image)
    new['annotations'] = old['annotations']  # keep annotations at bottom of json
    
    # with open(Path(Path(path).parent, 'json4.json'), 'w', encoding='utf-8') as f:
    #     json.dump(new, f, ensure_ascii=False, indent=4)   


## Add segmentation & area based on bounding boxes

In [13]:
# needed to calculate OKS in COCO
# also needed in order for coco annotator to import bounding boxes
# will not have meaningfull results, because those do not represent real ocject segmentations

path = f'/Users/john/datasets/fireground_pose/json/annotations/annotations_coco.json'
with open(path) as f:
    _anno = json.load(f)
    
    for anno in _anno['annotations']:
        b = anno['bbox']
        # [x2, y2, x3, y3, x4, y4, x0, y0]
        anno['segmentation'] = [ [b[0]+b[2], b[1], b[0]+b[2], b[1]+b[3], b[0], b[1]+b[3], b[0], b[1]] ]
        anno['area'] = b[2]*b[3]
        anno['isbbox'] = True  # for coco-annotator import, to bind segmentation to bounding box
        
    with open(Path(Path(path).parent, 'test.json'), 'w', encoding='utf-8') as f:
        json.dump(_anno, f, ensure_ascii=False, indent=4)   

## Build coco annotations out of crowd_pose annotation file
Keep the 12 body keypoints - head keypoints will be empty and have to be added manually

In [161]:
path = f'{cfg.DATASET.MYIMG.ROOT}/json/crowdpose_test.json'
with open(path) as f:
    old = json.load(f)
    new = old.copy()
    
    # override with coco meta
    new['categories'] = CATEGORIES_COCO
    # remove what isn't valid anymore
    for image in new['images']:
        image.pop('crowdIndex')
    # transfer keypoints
    for annotation in new['annotations']:
        # coco-annotator will not import empty persons; there, people where only head & neck are annotated need a fix
        if sum(annotation['keypoints'][0:-2*3]) == 0:  # all keypoints zero besides head & neck
            annotation['keypoints'] = annotation['keypoints'][-6:-3] + [0] * 4 * 3 + annotation['keypoints'][0:-2*3]  # same as below, but use head values as nose
        else: 
        # insert 5 empty keypoints at the start (5*3 values, all zero)  + remove last 6 values (head & neck)
            annotation['keypoints'] = [0] * 5 * 3 + annotation['keypoints'][0:-2*3]
    
    # save
    # with open(Path(Path(path).parent, 'crowd_to_coco.json'), 'w', encoding='utf-8') as f:
    #     json.dump(new, f, ensure_ascii=False, indent=4)   


### Merge coco-annotator-coco export with crowd_pose file
Use the same body keypoints as in the original crowd_pose annotations and add coco style head keypoints

In [39]:
# anno IDs will have changed but are in the same order per image


coco_path = f'/Users/john/Downloads/fp_8_coco-13.json'
base_path = f'{cfg.DATASET.MYIMG.ROOT}/json/archive/annotations_v2.0.json'

with open(coco_path, 'r') as coco, open(base_path, 'r') as base:
    coco = json.load(coco)
    base = json.load(base)
    new = base.copy()
    
    # annotations with same order but different id
    # check annotations per image and adjust in coco-annotator if necessary
    coco_anno_lookup = {}
    for image in coco['images']:
        annos = []
        for anno in coco['annotations']:
            if anno['image_id'] == image['id']:
                annos.append(anno)
        coco_anno_lookup[image['id']] = annos
        
    base_anno_lookup = {}
    for image in base['images']:
        annos = []
        for anno in base['annotations']:
            if anno['image_id'] == image['id']:
                annos.append(anno)
        base_anno_lookup[image['id']] = annos
        
    for (k1,v1), (k2,v2) in zip(base_anno_lookup.items(), coco_anno_lookup.items()):
        if len(v1) == len(v2):
            pass
        else:
            print(f'Anno mismatch: {k1, k2}')
    
    print(len(coco['annotations']), len(base['annotations']))
    
    # merge
    for anno1, anno2 in zip(new['annotations'], coco['annotations']):
        # print(anno2['keypoints'])
        try:
            anno1['keypoints'] = anno2['keypoints'][0:5*3] + anno1['keypoints'][0:-2*3]
        except KeyError:  # no keypoints in coco annotation (should already be fixed in base file)
            anno1['keypoints'] = [0] * 3 * 5 + anno1['keypoints'][0:-2*3]
    
    # remove crowdIndex (needs new calculation due to new keypoint schema)
    for image in new['images']:
        image.pop('crowdIndex')
        
    # override header (coco keypoint schema)
    new['categories'] = CATEGORIES_COCO
    
    with open(Path(Path(base_path).parent, 'json3.json'), 'w', encoding='utf-8') as f:
        json.dump(new, f, ensure_ascii=False, indent=4)
    
    

1089 1089
