In [1]:
import json
import os
import pandas as pd    
import pprint
import shutil
import random
import copy

In [2]:
def clean_text(text):
    """Remove the "Meme Center" watermark and URL-like tokens from a caption.

    The literal substring "Meme Center" is deleted first (case-sensitive).
    The remaining text is split on whitespace, and every token that looks like
    a website name is dropped: a token is dropped when its lower-cased form,
    with surrounding punctuation stripped, ends in ".net", ".co", ".com" or
    ".c". Surviving tokens keep their original casing and punctuation.

    Args:
        text: The raw caption string.

    Returns:
        The surviving tokens joined by single spaces (so runs of whitespace
        in the input are collapsed as a side effect).
    """
    domain_suffixes = (".net", ".co", ".com", ".c")
    edge_chars = "]/*@.,!#$^()%-;: "

    kept_tokens = []
    for token in text.replace("Meme Center", "").split():
        # Compare on a normalised copy; the original token is what gets kept.
        core = token.lower().strip(edge_chars)
        if core.endswith(domain_suffixes):
            continue
        kept_tokens.append(token)
    return " ".join(kept_tokens)
        
def preprocess_MAMI(
    root="./",
    save_data_jsons=False,
    include_gt_captions=False
):
    """Convert the MAMI test/validation TSVs into per-split JSON records.

    For each of the exported splits ("test" and "dev", the latter read from
    ``validation.tsv``) this function:

    * reads ``<root>/<split>.tsv`` (tab separated; expects at least the
      columns ``file_name``, ``label``, ``text`` and the four sub-category
      columns ``shaming``, ``stereotype``, ``objectification``, ``violence``);
    * replaces ``file_name`` by ``id`` (file name without extension) and
      collapses the sub-category flags into a ``sub_label`` list;
    * copies the image from ``<root>/MAMI_2022_images/{training,test}_images``
      to ``<root>/MAMI_2022_images/<split>/<label_folder>/<id>.png`` unless
      the target already exists;
    * cleans the caption with ``clean_text``;
    * if ``../../data/GoatBench/misogyny/test.jsonl`` is available, collects
      the records whose id appears there and whose cleaned caption tokens are
      a subset of the GoatBench caption tokens.

    Args:
        root: Directory holding the TSV files and ``MAMI_2022_images``.
        save_data_jsons: When true, write ``<root>/data/<split>.json`` and,
            if any GoatBench records matched, ``test.json`` under
            ``../../data/GoatBench/data/misogyny/``.
        include_gt_captions: When true, add a ``gt_description`` field taken
            from ``./caption_annotations/<split>.json`` (empty string when
            the sheet or the entry is missing).

    Returns:
        None. Results are written to disk and statistics are printed.

    Raises:
        AssertionError: if the source image of a record cannot be located.
        FileNotFoundError: if a TSV file (or ``test.jsonl`` inside an
            existing GoatBench folder) is missing.
    """
    # ------------------------------ GoatBench ------------------------------ #
    gb_count = 0
    gb_folder = "../../data/GoatBench/misogyny/"
    # Bound unconditionally so the final dump never depends on the branch below.
    gb_save_to = "../../data/GoatBench/data/misogyny/"
    GB_items = {}
    new_GB_items = {}
    if os.path.exists(gb_folder):
        gb_path = os.path.join(gb_folder, "test.jsonl")
        with open(gb_path, 'r', encoding='utf-8') as gb_file:
            for line in gb_file:
                if not line.strip():
                    # Tolerate blank / trailing lines in the jsonl file.
                    continue
                one_data = json.loads(line)
                if one_data["id"] not in GB_items:
                    GB_items[one_data["id"]] = {'label': int(one_data["label"]), 'text': one_data["text"]}
                else:
                    print("redundancy detected ...")
        os.makedirs(gb_save_to, exist_ok=True)
        print("GoatBench Loaded ...")

    save_to = os.path.join(root, 'data')
    os.makedirs(save_to, exist_ok=True)
    image_path = os.path.join(root, 'MAMI_2022_images')

    # Index the raw image folders once: source split -> set of image ids.
    # Sets make the per-record lookup below O(1) instead of a list scan.
    img_dist = {}
    for src_split in ['training', 'test']:
        src_dir = os.path.join(image_path, f"{src_split}_images")
        img_dist[src_split] = set()
        if os.path.exists(src_dir):
            img_dist[src_split] = {img.split(".")[0] for img in os.listdir(src_dir)}

    splits = ['test', 'dev']
    categories = ['misogynistic', 'non-misogynistic']
    sub_categories = ['shaming', 'stereotype', 'objectification', 'violence']
    data_statistics = {sp: {name: 0 for name in categories + sub_categories} for sp in splits}

    for split in splits:
        # The "dev" split is stored on disk as validation.tsv.
        tsv_stem = 'validation' if split == 'dev' else split

        annotations = {}
        if include_gt_captions:
            anno_file = os.path.join("./caption_annotations", f"{split}.json")
            if os.path.exists(anno_file):
                # json.load needs a file object, not a path.
                # Expected shape: {"12": {'img': ..., 'gt_caption': ...}}
                with open(anno_file, 'r', encoding='utf-8') as anno_f:
                    annotations = json.load(anno_f)

        new_data = []
        file_path = os.path.join(root, f"{tsv_stem}.tsv")
        split_df = pd.read_csv(file_path, sep='\t')
        # Round-trip through JSON to get plain Python records.
        ori_data = json.loads(split_df.to_json(orient="records"))

        for item in ori_data:
            # Replace the file name by an extension-less id.
            new_id = item['file_name'].split(".")[0]
            item['id'] = new_id
            item.pop('file_name')
            if item['label'] == 1:
                data_statistics[split]['misogynistic'] += 1
            else:
                data_statistics[split]['non-misogynistic'] += 1

            # Collapse the one-hot sub-category columns into a list.
            item["sub_label"] = []
            for t in sub_categories:
                if item.pop(t) == 1:
                    item["sub_label"].append(t)
                    data_statistics[split][t] += 1

            if include_gt_captions:
                item['gt_description'] = ""
                if annotations and (item["id"] in annotations):
                    item['gt_description'] = annotations[item["id"]]["gt_caption"]

            # -------------------------- copy and move image -------------------------- #
            # Images are grouped by the LAST active sub-label ("0" when none).
            label_folder = item["sub_label"][-1] if item["sub_label"] else "0"
            new_split_image_path = os.path.join(image_path, split, label_folder)
            os.makedirs(new_split_image_path, exist_ok=True)
            new_img_path = os.path.join(new_split_image_path, f"{new_id}.png")
            if not os.path.exists(new_img_path):
                ori_img_path = ""
                for src_split, img_ids in img_dist.items():
                    if new_id in img_ids:
                        ori_img_path = os.path.join(image_path, f"{src_split}_images", f"{new_id}.jpg")
                        assert os.path.exists(ori_img_path), f"missing source image: {ori_img_path}"
                        break
                assert ori_img_path != "", f"no source image found for id {new_id}"
                # NOTE: this is a byte-for-byte copy; the .jpg content is stored
                # under a .png name without any format conversion.
                shutil.copy(ori_img_path, new_img_path)

            item['img'] = os.path.join('./data/MAMI', 'MAMI_2022_images', split, label_folder, f"{new_id}.png")
            # -------------------------- copy and move image -------------------------- #
            # Pop and re-insert so that "text" stays the last key of the record.
            caption = item.pop("text")
            item["text"] = clean_text(caption)
            new_data.append(item)

            if new_id in GB_items:
                ori_item = copy.deepcopy(GB_items[new_id])
                ori_text = clean_text(ori_item["text"])
                # Accept the match only if every MAMI caption token also
                # occurs in the GoatBench caption.
                if set(item['text'].split()) <= set(ori_text.split()):
                    if new_id not in new_GB_items:
                        gb_count += 1
                        one_gb = copy.deepcopy(item)
                        # The GoatBench label replaces the MAMI one.
                        one_gb.pop("label")
                        ori_item.pop("text")
                        one_gb["task"] = "mami"
                        new_GB_items[new_id] = dict(**ori_item, **one_gb)
                        if split in ["train", "dev"]:
                            print(f"one {split} entry...")
                else:
                    print("same id but different caption ...")
                    clean_ori_text = item['text']
                    print(f"sp: {split}| id: {new_id}\nori caption = {clean_ori_text}\n goatbench caption = {ori_text}\n********************")

        if save_data_jsons:
            with open(os.path.join(save_to, f'{split}.json'), 'w', encoding='utf-8') as out_f:
                json.dump(new_data, out_f, indent=4)
        print(f"{split} finished! #data = {len(new_data)}")

    print(pprint.pformat(data_statistics))
    if save_data_jsons and new_GB_items:
        print(f"#GOATBENCH Instances = {gb_count} = {len(new_GB_items)}")
        with open(os.path.join(gb_save_to, 'test.json'), 'w', encoding='utf-8') as out_f:
            json.dump(list(new_GB_items.values()), out_f, indent=4)

# Run the preprocessing with the default root ("./"): write the per-split JSON
# files (plus the GoatBench subset when records match), and do not attach
# ground-truth captions.
preprocess_MAMI(save_data_jsons=True, include_gt_captions=False)

GoatBench Loaded ...
test finished! #data = 1000
dev finished! #data = 1000
{'dev': {'misogynistic': 500,
         'non-misogynistic': 500,
         'objectification': 228,
         'shaming': 123,
         'stereotype': 276,
         'violence': 100},
 'test': {'misogynistic': 500,
          'non-misogynistic': 500,
          'objectification': 348,
          'shaming': 146,
          'stereotype': 350,
          'violence': 153}}
#GOATBENCH Instances = 1000 = 1000


### Generate annotation sheets

In [3]:
def mami_gen_annotation_sheet(source_paths):
    """Create or extend the per-split caption annotation sheets.

    Every path in ``source_paths`` must contain a ``mami/<split>/`` segment;
    the path component directly after the first ``mami/`` is taken as the
    split name. Each source file is a JSON list of items carrying at least
    ``id`` and ``img``.

    For each split, ``./caption_annotations/<split>.json`` is loaded when it
    already exists, so captions filled in earlier are kept. Items whose id is
    not yet in the sheet are added with an empty ``gt_caption``. The sheets
    are then written back and the number of entries per split is printed.

    Args:
        source_paths: Iterable of JSON file paths as described above.

    Returns:
        None.

    Raises:
        IndexError: if a path does not contain ``mami/``.
    """
    sep = "mami/"
    save_to = "./caption_annotations"
    os.makedirs(save_to, exist_ok=True)

    def split_of(src):
        # ".../mami/<split>/..." -> "<split>"
        return src.split(sep)[1].split("/")[0]

    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # report order is deterministic (a set has no stable order for strings).
    splits = list(dict.fromkeys(split_of(src) for src in source_paths))
    splits_save_to = {sp: os.path.join(save_to, f'{sp}.json') for sp in splits}

    # Start from the sheets already on disk so existing annotations survive.
    data_to_annotate = {}
    for sp, path in splits_save_to.items():
        if os.path.exists(path):
            with open(path, 'r', encoding='utf-8') as sheet_f:
                data_to_annotate[sp] = json.load(sheet_f)
        else:
            data_to_annotate[sp] = {}

    for src in source_paths:
        sheet = data_to_annotate[split_of(src)]
        with open(src, 'r', encoding='utf-8') as src_f:
            items = json.load(src_f)
        for item in items:
            # JSON object keys are always strings once a sheet has been saved
            # and reloaded; normalise the id so that a non-string id does not
            # miss the membership test and clobber an existing annotation.
            item_id = str(item['id'])
            if item_id not in sheet:
                sheet[item_id] = {
                    'img': item['img'],
                    'gt_caption': ''
                }

    for sp, data in data_to_annotate.items():
        with open(splits_save_to[sp], 'w', encoding='utf-8') as out_f:
            json.dump(data, out_f, indent=4)
        print(f"#data to be annotated in {sp}: {len(data)}")

# Files whose items are added to the annotation sheet.
# NOTE(review): these are absolute, machine-specific paths; the cell will not
# run elsewhere without editing them. Each path must contain "mami/<split>/",
# because mami_gen_annotation_sheet derives the split name from that segment
# (all three entries below resolve to the "test" split).
sources = [
    "/data/fengjun/projects/LLM/meme/HMC/results/mami/test/seed-42/qwen2.5-14bf/D6_llava1.6-7bf_len-1024_GPU-2_20250316015122/best/79.91/round-1_StepDecision-vio-vio/incorrect.json",
    "/data/fengjun/projects/LLM/meme/HMC/results/mami/test/seed-42/qwen2.5-14bf/D6_llava1.6-7bf_len-1024_GPU-2_20250316015122/best/79.4/round-1_StepDecision-vio-vio/incorrect.json",
    "/data/fengjun/projects/LLM/meme/HMC/results/mami/test/seed-42/qwen2.5-14bf/D6_llava1.6-7bf_len-1024_GPU-2_20250316015122/history/1-round-1_Decision-v0-v0/incorrect.json"
]
# Create/extend ./caption_annotations/<split>.json from the files above.
mami_gen_annotation_sheet(sources)

#data to be annotated in test: 310
