In [18]:
import os
import numpy as np
import json

# --- Paths -------------------------------------------------------------------
notebook_path = os.path.abspath('')
# NOTE: machine-specific absolute path; point this at your local checkout of the repo.
repo_path = "/home/sjauhri/IAS_WS/EgoVis/2handedafforder_repo/"
path_to_frames = os.path.join(repo_path, "BENCHMARK/ego4d_bench/bench_frames_shuffled")
output_dir = os.path.join(notebook_path, "masks")
# exist_ok avoids the check-then-create race and makes the cell safe to re-run.
os.makedirs(output_dir, exist_ok=True)

# Path to the Toras annotation export (JSON)
json_file_path = os.path.join(repo_path, "BENCHMARK/annotations-ego4d-01-05-2025/toras_anns.json")

# Load the annotations; decode explicitly as UTF-8 instead of relying on the
# platform's locale-dependent default encoding.
with open(json_file_path, 'r', encoding='utf-8') as file:
    toras_datas = json.load(file)

# --- Image geometry ----------------------------------------------------------
image_dimensions = (785, 512) # (w, h) size of the image in toras
needed_image_dimensions = (512, 512) # (w, h) based on the size of just the inpainted image in toras
final_image_dimensions = (855, 855) # (w, h) size of the final image based on the training setup

# optionally also save the benchmark image with overlays
save_overlay_image = True

In [19]:
from typing import Sequence, Tuple

import cairo
import cv2
import shapely
from shapely.geometry import Polygon, MultiPolygon

def _iter_polygons(geom):
    """Yield every non-empty Polygon in ``geom``, descending into multi-part geometries."""
    if geom.is_empty:
        return
    if isinstance(geom, Polygon):
        yield geom
    elif hasattr(geom, 'geoms'):
        # MultiPolygon / GeometryCollection: make_valid may nest polygons inside
        # a collection, so recurse instead of looking at one level only.
        for part in geom.geoms:
            yield from _iter_polygons(part)


def draw_toras_mask(task_entity, image_width, image_height, output_file_path):
    """
    Render a Toras annotation entity as a mask image and save it as a PNG.

    Only the first 'Segmentation' block of the entity is rendered. Polygon
    exteriors are filled white and interior rings (holes) are filled black;
    pixels not covered by any contour keep the surface's initial value.

    Args:
        task_entity: Toras entity dict with an 'annotationBlocks' list. Each
            Segmentation block holds 'annotations' whose 'segments' entries
            are point lists describing one polygon each.
        image_width: Width of the rendered image in pixels.
        image_height: Height of the rendered image in pixels.
        output_file_path: Destination path of the PNG file.
    """
    surface = cairo.ImageSurface(cairo.FORMAT_RGB24, image_width, image_height)
    ctx = cairo.Context(surface)
    exteriors = []
    interiors = []
    # Collect contours from the first segmentation block that's encountered.
    for blk in task_entity['annotationBlocks']:
        if blk['blockName'] != 'Segmentation':
            continue

        for annotation in blk['annotations']:
            # Segments with fewer than 3 points cannot form a polygon.
            rings = [Polygon(points) for points in annotation['segments'] if len(points) >= 3]
            valid_geom = shapely.make_valid(MultiPolygon(polygons=rings))
            for poly in _iter_polygons(valid_geom):
                # Exteriors carry a positive area, holes a negative one, so the
                # sign later selects the fill colour.
                exteriors.append((poly.exterior.coords, Polygon(poly.exterior).area))
                interiors += [(ring.coords, -Polygon(ring).area) for ring in poly.interiors]
        break

    # Draw the largest contours first so smaller ones (e.g. holes) end up on top.
    contours = sorted(exteriors + interiors, key=lambda item: -abs(item[1]))
    for (contour, signed_area) in contours:
        color = (1, 1, 1) if signed_area > 0 else (0, 0, 0)
        cairo_fill_path(ctx, contour, color)

    surface.write_to_png(output_file_path)

def cairo_fill_path(ctx: "cairo.Context", poly: Sequence[Sequence[float]], rgb: Tuple[float, float, float]) -> None:
    """
    Fill a closed polygon on a cairo context with a solid colour.

    Args:
        ctx: Cairo drawing context to draw on.
        poly: Sequence of N points (e.g. an [N, 2] array or a shapely
            coordinate sequence). Only the first two components (x, y) of
            each point are used. Nothing is drawn when N < 3.
        rgb: Fill colour as (r, g, b) components passed to ``set_source_rgb``.
    """
    if len(poly) < 3:
        return

    ctx.move_to(poly[0][0], poly[0][1])
    for point in poly[1:]:
        # Index explicitly rather than unpacking so points carrying extra
        # components (e.g. a z value) do not break the 2-argument line_to call.
        ctx.line_to(point[0], point[1])
    ctx.close_path()
    ctx.set_source_rgb(*rgb)
    ctx.fill()


# Loop through the annotated datapoints and write one mask PNG per hand entity.
overlay_colors = {'right': (0, 0, 255), 'left': (0, 255, 0)}  # BGR: right = red, left = green
for datapoint in toras_datas:
    name = datapoint["documents"][0]['name']
    # The 8 characters before '.png' are the frame id; the video id sits between
    # the first 'frame_' and the single separator character preceding the frame id.
    ext_pos = name.rindex('.png')
    frame_id = name[ext_pos - 8 : ext_pos]
    video_id = name[name.index('frame_') + 6 : ext_pos - 9]
    ann = datapoint.get('annotation')
    if ann is None:
        continue

    # One output sub-folder per frame: <output_dir>/<video_id>/<frame_id>
    frame_out_path = os.path.join(output_dir, video_id, frame_id)
    os.makedirs(frame_out_path, exist_ok=True)

    # loop through entities in the annotation
    ents = ann['annotationGroups'][0]['annotationEntities']
    overlay_masks = []  # coloured masks, only filled when save_overlay_image is set
    for ent in ents:
        if 'right' in ent['name']:
            ent_name = 'right'
        elif 'left' in ent['name']:
            ent_name = 'left'
        else:
            # Without this branch ent_name would be undefined (NameError) or
            # stale from the previous entity, overwriting that entity's mask.
            print(f"Skipping entity {ent['name']!r} in {name}: name contains neither 'right' nor 'left'")
            continue

        output_file_path = os.path.join(frame_out_path, 'aff_' + ent_name + '.png')

        # draw the mask at the toras image size and save it
        draw_toras_mask(ent, image_dimensions[0], image_dimensions[1], output_file_path)

        # read the rendered mask back as a single-channel image
        mask = cv2.imread(output_file_path, cv2.IMREAD_GRAYSCALE)
        # if mask is empty i.e. all black, skip this entity and delete the file
        if np.all(mask == 0):
            os.remove(output_file_path)
            continue
        # crop out the part of the mask that corresponds to the inpainted image
        mask_cropped = mask[:needed_image_dimensions[1], :needed_image_dimensions[0]]
        # resize to the final image size; nearest-neighbour avoids blending mask values
        mask_final = cv2.resize(mask_cropped, final_image_dimensions, interpolation=cv2.INTER_NEAREST)
        cv2.imwrite(output_file_path, mask_final)

        if save_overlay_image:
            # colour the fully-white mask pixels on the uncropped mask
            mask_colored = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
            mask_colored[mask == 255] = overlay_colors[ent_name]
            overlay_masks.append(mask_colored)

    if save_overlay_image:
        # blend the coloured masks onto the benchmark frame and save the result
        overlay_image = cv2.imread(os.path.join(path_to_frames, name))
        if overlay_image is None:
            # cv2.imread returns None instead of raising when the file cannot be read
            print(f"Could not read benchmark frame {name!r} from {path_to_frames}; skipping overlay")
            continue
        for overlay_mask in overlay_masks:
            overlay_image = cv2.addWeighted(overlay_image, 1.0, overlay_mask, 0.5, 0)
        cv2.imwrite(os.path.join(frame_out_path, 'bench_frame_overlay.png'), overlay_image)