In [2]:
import scipy.io as sio
import cv2
import os
import json
from collections import defaultdict
import numpy as np

def convert_to_list(item):
    """
    Recursively convert NumPy arrays or scalars (or any nested structure) into native Python types.
    """
    if isinstance(item, np.generic):  # For numpy scalars (np.int32, np.float64, etc.)
        return item.item()
    elif isinstance(item, np.ndarray):
        return item.tolist()
    elif isinstance(item, (list, tuple)):
        return [convert_to_list(x) for x in item]
    elif isinstance(item, dict):
        return {k: convert_to_list(v) for k, v in item.items()}
    else:
        return item

def flatten_nested(data):
    """
    Recursively flattens any nested list/tuple/ndarray structure and returns a list of strings.
    """
    if isinstance(data, (list, tuple, np.ndarray)):
        flattened = []
        for item in data:
            flattened.extend(flatten_nested(item))
        return flattened
    else:
        return [str(data)]

def list_to_string(data):
    """
    Flattens the nested data structure and returns a comma-separated string.
    """
    flattened = flatten_nested(data)
    return ", ".join(flattened)

def process_split(struct_data, base_path):
    annotations_by_image = defaultdict(list)
    num_images = struct_data.shape[1]  # Assuming struct_data is (1, N)
    
    for i in range(num_images):
        entry = struct_data[0, i]
        folder = entry["folder"][0]
        filename = entry["filename"][0]
        image_path = os.path.join(base_path, folder, filename)
        
        persons = entry["person"]
        if persons.shape[0] == 1:
            persons = persons[0]  # Flatten the array if needed
        
        for j in range(persons.shape[0]):
            person_entry = persons[j]
            bbox = person_entry["body_bbox"][0]
            discrete = person_entry["annotations_categories"][0]
            continuous = person_entry["annotations_continuous"][0]
            # Omitting gender and age as requested
            
            annotations_by_image[image_path].append({
                "bbox": bbox,
                "discrete": discrete,
                "continuous": continuous
            })
    return annotations_by_image

def clamp_bbox(x1, y1, x2, y2, image_shape):
    """
    Clamp the bounding box coordinates to be within the image bounds.
    Returns a tuple (x1, y1, x2, y2) if valid after clamping, or None if it becomes degenerate.
    """
    h, w = image_shape[:2]
    x1_clamped = max(0, x1)
    y1_clamped = max(0, y1)
    x2_clamped = min(w, x2)
    y2_clamped = min(h, y2)
    if x1_clamped >= x2_clamped or y1_clamped >= y2_clamped:
        return None
    return (x1_clamped, y1_clamped, x2_clamped, y2_clamped)

def load_and_process_images(annotations_by_image):
    output_annotations = []  # List to store valid annotations
    image_id_mapping = {}    # Mapping image_path to a unique image_id
    current_id = 0

    for image_path, annotations in annotations_by_image.items():
        if image_path not in image_id_mapping:
            image_id_mapping[image_path] = current_id
            current_id += 1
        image_id = image_id_mapping[image_path]

        image = cv2.imread(image_path)
        if image is None:
            print(f"Error loading image: {image_path}")
            continue

        for annotation in annotations:
            bbox = annotation["bbox"]
            # Convert bbox coordinates to integers
            x1, y1, x2, y2 = map(int, bbox)
            # Clamp bounding box coordinates instead of skipping
            clamped = clamp_bbox(x1, y1, x2, y2, image.shape)
            if clamped is None:
                print(f"Clamped bbox is degenerate for image {image_path}: {bbox}")
                continue
            cx1, cy1, cx2, cy2 = clamped

            output_annotations.append({
                "image_id": image_id,
                "image_path": image_path,
                "bbox": [cx1, cy1, cx2, cy2],
                "discrete": convert_to_list(annotation["discrete"]),
                "continuous": convert_to_list(annotation["continuous"])
            })

    return output_annotations

import os   # make sure os is imported at top

# Set the paths to your .mat file and the base directory for images.
mat_file  = os.path.join("annotations", "Annotations.mat")
base_path = "emotic"

# Load the .mat file data.
mat_data = sio.loadmat(mat_file)

all_annotations = {}

# Process each split (train, val, test) and store the results.
for split in ['train', 'val', 'test']:
    struct_data           = mat_data[split]
    annotations_by_image  = process_split(struct_data, base_path)
    split_annotations     = load_and_process_images(annotations_by_image)
    all_annotations[split]= [convert_to_list(ann) for ann in split_annotations]

# Create a new dictionary with flattened string values for discrete and continuous labels.
json_data = {}
for split, anns in all_annotations.items():
    json_data[split] = []
    for ann in anns:
        ann["discrete"]   = list_to_string(ann["discrete"])
        ann["continuous"] = list_to_string(ann["continuous"])
        json_data[split].append(ann)

# Dump the final annotations to a JSON file in Code/
json_file = "output_annotations.json"
with open(json_file, mode='w', encoding='utf-8') as f:
    json.dump(json_data, f, indent=4)

print(f"Annotations saved to {json_file}")



Annotations saved to output_annotations.json


In [3]:
## import json
import random
from collections import defaultdict

# 1) Load the existing annotations (only 'train' split)
with open("output_annotations.json", "r", encoding="utf-8") as f:
    data = json.load(f)

train_anns = data["train"]  # list of dicts

# 2) Group annotations by image_id
anns_by_image = defaultdict(list)
for ann in train_anns:
    anns_by_image[ann["image_id"]].append(ann)

# 3) Randomly split image_ids into 70/15/15
image_ids = list(anns_by_image.keys())
random.shuffle(image_ids)  # no seed → non‑deterministic

n = len(image_ids)
n_train = int(n * 0.70)
n_val   = int(n * 0.15)
# ensure all images accounted for
n_test  = n - n_train - n_val

train_ids = image_ids[:n_train]
val_ids   = image_ids[n_train : n_train + n_val]
test_ids  = image_ids[n_train + n_val : ]

# 4) Flatten back into three annotation lists
new_splits = {"train": [], "val": [], "test": []}

for img_id in train_ids:
    new_splits["train"].extend(anns_by_image[img_id])
for img_id in val_ids:
    new_splits["val"].extend(anns_by_image[img_id])
for img_id in test_ids:
    new_splits["test"].extend(anns_by_image[img_id])

# 5) Write out combined JSON
with open("train_val_test_split.json", "w", encoding="utf-8") as f:
    json.dump(new_splits, f, indent=4)

print("Wrote new_splits → train_val_test_split.json")


Wrote new_splits → train_val_test_split.json


In [4]:
import json

# Define the 7 universal emotion categories in your desired order
universal_categories = [
    "Anger",
    "Disgust",
    "Fear",
    "Happiness",
    "Sadness",
    "Surprise",
    "Contempt"
]

# Mapping from each of the 26 EMOTIC labels to one of the 7 universal categories
emotic_to_universal = {
    "Anger":       "Anger",
    "Annoyance":   "Anger",
    "Disapproval": "Anger",
    
    "Aversion":    "Disgust",
    "Fear":        "Fear",

    "Affection":   "Happiness",
    "Confidence":  "Happiness",
    "Engagement":  "Happiness",
    "Esteem":      "Happiness",
    "Excitement":  "Happiness",
    "Happiness":   "Happiness",
    "Pleasure":    "Happiness",
    "Peace":       "Happiness",

    "Sadness":     "Sadness",
    "Suffering":   "Sadness",
    "Fatigue":     "Sadness",
    "Pain":        "Sadness",

    "Surprise":            "Surprise",
    "Anticipation":        "Surprise",
    "Doubt/Confusion":     "Surprise",

    "Disconnection":   "Contempt",
    "Disquietment":    "Contempt",
    "Embarrassment":    "Contempt",
    "Sensitivity":      "Contempt",
    "Sympathy":         "Contempt",
    "Yearning":         "Contempt"
}


def build_multihot(discrete_str):
    """
    Convert the comma-separated EMOTIC labels into a 7-dimensional multi-hot vector.
    """
    # Split, strip whitespace
    labels = [lbl.strip() for lbl in discrete_str.split(",") if lbl.strip()]
    # Start with zeros
    vec = [0] * len(universal_categories)
    # For each emotic label, map to universal and set that index to 1
    for lbl in labels:
        uni = emotic_to_universal.get(lbl)
        if uni is None:
            # Warn on unknown labels
            print(f"Warning: '{lbl}' not found in mapping; skipping.")
            continue
        idx = universal_categories.index(uni)
        vec[idx] = 1
    return vec


if __name__ == "__main__":
    # Input and output JSON files
    in_json  = "train_val_test_split.json"
    out_json = "train_val_test_split_with_multihot.json"

    # Load existing splits
    with open(in_json, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Process each split
    for split in ["train", "val", "test"]:
        for entry in data.get(split, []):
            discrete_str = entry.get("discrete", "")
            entry["multi_hot"] = build_multihot(discrete_str)

    # Save updated JSON
    with open(out_json, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)

    print(f"Saved updated annotations with multi-hot vectors to '{out_json}'")

Saved updated annotations with multi-hot vectors to 'train_val_test_split_with_multihot.json'
