In [3]:
!pip install OpenAI tenacity

Collecting OpenAI
  Downloading openai-1.45.0-py3-none-any.whl.metadata (22 kB)
Collecting httpx<1,>=0.23.0 (from OpenAI)
  Downloading httpx-0.27.2-py3-none-any.whl.metadata (7.1 kB)
Collecting jiter<1,>=0.4.0 (from OpenAI)
  Downloading jiter-0.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.6 kB)
Collecting httpcore==1.* (from httpx<1,>=0.23.0->OpenAI)
  Downloading httpcore-1.0.5-py3-none-any.whl.metadata (20 kB)
Collecting h11<0.15,>=0.13 (from httpcore==1.*->httpx<1,>=0.23.0->OpenAI)
  Downloading h11-0.14.0-py3-none-any.whl.metadata (8.2 kB)
Downloading openai-1.45.0-py3-none-any.whl (374 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.1/374.1 kB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading httpx-0.27.2-py3-none-any.whl (76 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m76.4/76.4 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading httpcore-1.0.5-py3-none-any.whl (77 kB)
[2K   [90m━

In [4]:
#another method of calling openai
from openai import OpenAI, BadRequestError
from openai.types.chat import ChatCompletion
from tenacity import retry, stop_after_attempt, wait_random_exponential
import time
from typing import Optional

class MinimumDelay:
    def __init__(self, delay: float | int):
        self.delay = delay
        self.start = None

    def __enter__(self):
        self.start = time.time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        end = time.time()
        seconds = end - self.start
        if self.delay > seconds:
            time.sleep(self.delay - seconds)

@retry(wait=wait_random_exponential(min=1, max=90), stop=stop_after_attempt(3))
def chat(client: OpenAI, delay: float | int, **kwargs) -> ChatCompletion | None:
    """Issue one chat-completion request, enforcing a minimum call duration.

    Args:
        client: OpenAI client used to send the request.
        delay: Minimum number of seconds the request should take (see MinimumDelay).
        **kwargs: Forwarded unchanged to ``client.chat.completions.create``.

    Returns:
        The completion, or ``None`` when the API rejects the request with a
        BadRequestError whose message mentions "safety".

    Raises:
        Any other exception is printed and re-raised, which lets the ``@retry``
        decorator attempt the call again (up to three attempts in total).
    """
    try:
        with MinimumDelay(delay):
            response = client.chat.completions.create(**kwargs)
        return response
    except BadRequestError as err:
        print(f"Bad Request: {err}")
        # Only safety rejections are treated as a "skip this example" signal.
        if "safety" not in err.message:
            raise err
        return None
    except Exception as err:
        print(f"Exception: {err}")
        raise err

In [5]:
def read_jsonl(path):
    """Lazily yield one parsed JSON value per non-blank line of a JSON Lines file.

    Args:
        path: Path of the ``.jsonl`` file to read.

    Yields:
        The object decoded from each non-empty line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of the platform's default encoding,
    # so non-ASCII text is read the same way on every machine.
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)

def write_jsonl(path, data):
    """Serialize every item of ``data`` as one JSON line in ``path`` (overwriting it).

    Args:
        path: Destination file path.
        data: Iterable of JSON-serializable objects.
    """
    with open(path, "w") as out:
        out.writelines(json.dumps(record) + "\n" for record in data)

# Function to encode the image
def encode_image(image_path):
    """Return the contents of the file at ``image_path`` as a base64 text string."""
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")

In [6]:
import base64
import requests
import json
import os
from tqdm import tqdm
# Define `api_key` before running this cell. Prefer loading it from the
# environment (e.g. api_key = os.environ["OPENAI_API_KEY"]) over pasting the
# secret into the notebook.
# The OpenAI constructor only accepts keyword arguments, so the key has to be
# passed as `api_key=...`; passing it positionally raises a TypeError.
client = OpenAI(api_key=api_key, timeout=90)

In [None]:
import pandas as pd

def csv_to_jsonl(csv_file, jsonl_file, sep='\t'):
    """Convert a delimited text table into a JSON Lines file, one object per row.

    Args:
        csv_file: Path of the delimited input file (header row expected by pandas).
        jsonl_file: Path of the JSON Lines file to create or overwrite.
        sep: Field separator of the input. Defaults to a tab, which is what the
            original hard-coded value was, so existing calls behave the same.
    """
    df = pd.read_csv(csv_file, sep=sep)
    with open(jsonl_file, 'w') as f:
        # The row index is not part of the output, only the column values.
        for _, row in df.iterrows():
            f.write(row.to_json() + '\n')

# Convert the tab-separated test table into the JSON Lines file that the
# prompting cell later reads through `data_path`.
csv_to_jsonl('test/Test.csv', 'test_miso.jsonl')

In [None]:
# Accumulators for the prompting loop. They live in their own cell so that the
# loop can be re-run without losing what has already been collected.
preds, skipped = [], []   # model outputs / ids that produced no usable output
seen_ids = set()          # ids already processed (successfully or not)

In [None]:
# Input locations: test images, the JSONL listing of test examples, and the
# directory holding the images used for the few-shot demonstrations.
image_path = 'test/'
data_path = 'test_miso.jsonl'
demo_image_path = 'TRAINING/'

# System prompt. NOTE: the name `sys` would shadow the standard-library module
# if that were ever imported here; it is kept because the message-building
# code below refers to it.
sys = '''You are an expert linguistic assistant.
Frames of communication select particular aspects of an issue and make them salient in communicating a message.
Frames of communication are ubiquitous in social media discourse and can impact how people understand issues and, more importantly, how they form their opinions.
You will be tasked with identifying and articulating misogyny framings on memes.
Misogyny is defined as dislike of, contempt for, or ingrained prejudice against women.'''

# Few-shot demonstrations: keep only the first `num_demos` records of the file.
num_demos = 3
demos_path = 'miso_demos.jsonl'
demos = list(read_jsonl(demos_path))[:num_demos]


def add_m(message,base64_image,msg=None,demo=True):
  """Append one image turn to a chat message list, in place, and return the list.

  A user message holding the image (as a base64 JPEG data URL) and the task
  instructions is always appended. When ``demo`` is true, an assistant message
  containing ``msg`` is appended after it.

  Args:
    message: List of chat messages; it is mutated.
    base64_image: Base64-encoded image data.
    msg: Assistant text for a demonstration turn; only used when ``demo`` is true.
    demo: Whether this turn is a demonstration (user + assistant) or a query (user only).

  Returns:
    The same ``message`` list object that was passed in.
  """
  p='''You will be tasked with identifying and articulating misogyny framings on the following memes. You should discuss your reasoning first, and then provide a final decision. Each image provided may or may not contain one or more framings, so your first step is
  (a) Reason about whether the image contains a framing (or more framings), or just states something factual or an experience. If the image contains a framing, the next step is \n(b) Articulate that framing succinctly.\nYou will perform these steps until the answer to (a) is false, either because there are no framings in the image, or because you have already expressed all the framings.'''

  image_part = {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
  text_part = {"type": "text", "text": p}
  # The user turn is identical for demonstrations and real queries.
  message.append({"role": "user", "content": [image_part, text_part]})
  if demo:
    message.append({"role": "assistant", "content": [{"type": "text", "text": msg}]})
  return message

# Shared prompt prefix: the system prompt followed by one user/assistant pair
# per demonstration (assistant text = rationale followed by frame).
message = [{"role": "system", "content": sys}]

for d in demos:
  d_id = d["file_name"]
  d_img = os.path.join(demo_image_path, d_id)
  base64_image = encode_image(d_img)
  # add_m appends to `message` in place and hands the same list back.
  message = add_m(message, base64_image, d["rationale"] + d["frame"])

# Snapshot of the prefix (a new list sharing the same message dicts).
message1 = list(message)

examples = list(read_jsonl(data_path))

# Query the model once per unseen test example.
for ex in tqdm(examples):
    ex_id = ex["file_name"]
    if ex_id in seen_ids:
        # Already handled in an earlier (possibly interrupted) run of this cell.
        continue
    ex_img = os.path.join(image_path, ex_id)
    base64_image = encode_image(ex_img)

    # Start every request from a fresh copy of the few-shot prefix. The
    # previous version only reset `message` at the end of the loop body, so the
    # `continue` taken after a safety rejection skipped the reset and the
    # rejected image stayed in the prompt of every following example.
    message = add_m(message1.copy(), base64_image, demo=False)

    completion = chat(
        client,
        delay=1,
        model='gpt-4o',
        messages=message,
        max_tokens=512,
        temperature=1.0,
        top_p=0.7,
        seed=0,
    )

    # Reached only if `chat` did not raise, so failed requests stay retryable.
    seen_ids.add(ex_id)

    if completion is None:
        print(f"Skipping example due to API safety error: {ex_id}")
        skipped.append(ex_id)
        continue

    content = completion.choices[0].message.content
    # `not content` also covers a missing (None) content, which would
    # otherwise be stored as a prediction and break the parsing step later.
    if not content:
        skipped.append(ex_id)
    else:
        preds.append({"id": ex_id, "content": content})






100%|██████████| 1000/1000 [1:59:30<00:00,  7.17s/it]


In [None]:
import re

def extract_problems2(reasoning):
    """For each text, pull out what lies between the first "of" and the first "arises".

    Every item is lower-cased first. Items that lack either marker contribute
    nothing to the result.

    NOTE: a later cell of this notebook defines another ``extract_problems2``
    that takes a single string; once that cell runs it replaces this one.

    Args:
        reasoning: Iterable of strings.

    Returns:
        List of the extracted, whitespace-stripped substrings.
    """
    problems = []
    for text in reasoning:
        lowered = text.lower()
        start = lowered.find("of")
        end = lowered.find("arises")
        if start == -1 or end == -1:
            continue
        # Skip "of" plus the single character that follows it.
        problems.append(lowered[start + len("of") + 1:end].strip())
    return problems

def _extract_tagged_lines(message, tag):
    """Collect the text of every line of ``message`` whose step tag equals ``tag``.

    A line is read as ``<id>:<text>``. If ``<id>`` has the form ``<n>.<t>`` the
    step tag is ``<t>``; otherwise the whole ``<id>`` is the tag. Both real
    newlines and literal backslash-n sequences are treated as line breaks.
    """
    if not message:
        return []
    found = []
    for line in message.replace("\\n", "\n").split("\n"):
        line = line.strip()
        if not line:
            continue
        # Everything before the first colon is the id; the rest is the text.
        m_id, _, rest = line.partition(":")
        id_parts = m_id.split(".")
        mt = id_parts[1] if len(id_parts) == 2 else m_id
        if mt == tag:
            found.append(rest.strip())
    return found


def extract_reason(message):
    """Return the text of all "a" (reasoning) lines of a model response.

    Shares its parsing with ``extract_frames`` so both split the response into
    the same lines; previously only ``extract_frames`` converted literal
    backslash-n sequences when real newlines were also present, which could
    misalign the two result lists.

    Args:
        message: Raw response text; an empty string or ``None`` yields ``[]``.

    Returns:
        List of reasoning strings in order of appearance.
    """
    return _extract_tagged_lines(message, "a")


def extract_frames(message):
    """Return the text of all "b" (frame articulation) lines of a model response.

    Args:
        message: Raw response text; an empty string or ``None`` yields ``[]``.

    Returns:
        List of frame strings in order of appearance.
    """
    return _extract_tagged_lines(message, "b")

In [None]:
# Persist the raw model responses collected in `preds`; the next cell reloads
# them from this file.
write_jsonl('test_preds.jsonl',preds)

In [None]:

# Imported here because the only other import of `defaultdict` in this notebook
# sits in a later cell, so this cell failed with NameError on a fresh kernel.
from collections import defaultdict

preds = list(read_jsonl('test_preds.jsonl'))
pred_path = 'test_predictions'
os.makedirs(pred_path, exist_ok=True)

# Pair each reasoning line with the frame line at the same position.
all_frames = []      # every (reasoning, frame) pair across all responses
annotations = []     # the same pairs grouped per example id
for j in preds:
    local_ann = []
    r = extract_reason(j['content'])
    f = extract_frames(j['content'])
    for reason, frame in zip(r, f):
        all_frames.append({'reasoning': reason, 'text': frame})
        local_ann.append({'reasoning': reason, 'text': frame})
    annotations.append({"articulations": local_ann, "id": j['id']})

# De-duplicate by frame text, keeping the first occurrence and counting repeats.
seen = set()
dup_count = defaultdict(int)
unique_count = 0
unique_dict = {}     # position in all_frames -> position in unique_articulations
unique_articulations = []
for i, f in enumerate(all_frames):
    dup_count[f['text']] += 1
    if f['text'] in seen:
        continue
    seen.add(f['text'])
    # Store a copy: adding 'count' below must not leak into the entries of
    # `all_frames`, otherwise the "full" file gets a 'count' key on some rows only.
    unique_articulations.append(dict(f))
    unique_dict[i] = unique_count
    unique_count += 1
for frame in unique_articulations:
    frame['count'] = dup_count[frame['text']]

write_jsonl(os.path.join(pred_path, "articulations-full.jsonl"), all_frames)
write_jsonl(os.path.join(pred_path, "articulations-unique.jsonl"), unique_articulations)
write_jsonl(os.path.join(pred_path, "articulation-annotations.jsonl"), annotations)

Evaluation

In [None]:
import json
def build_parent_dict(subproblem_parent_pairs):
    """Turn (subproblem, parent) pairs into a subproblem -> parent mapping.

    When a subproblem appears more than once, the last pair wins.
    """
    return {child: parent for child, parent in subproblem_parent_pairs}

def find_root_ancestor(subproblem, parent_dict):
    """Follow parent links from ``subproblem`` until no further parent exists.

    Args:
        subproblem: Starting key.
        parent_dict: Mapping of each subproblem to its parent.

    Returns:
        The last node reached: a node with no entry in ``parent_dict``, a node
        that is its own parent, or - if the links form a longer cycle - the
        last node visited before the walk would revisit one. The previous
        version looped forever on such cycles.
    """
    current = subproblem
    visited = {current}
    while current in parent_dict:
        parent = parent_dict[current]
        if parent in visited:
            # Self-parent or a longer cycle: stop instead of walking in circles.
            break
        visited.add(parent)
        current = parent
    return current


subproblems = list(read_jsonl('subproblems_1c.jsonl'))

# Collect (subproblem, parent) pairs from responses containing "subproblem(x, y)".
subproblems_list = []
for subproblem in subproblems:
    response = subproblem['response']
    if 'subproblem(' not in response:
        continue
    # Arguments of the first "subproblem(...)" occurrence, split on ", ".
    pt = response.split('subproblem(')[1].split(')')[0].split(', ')
    # Keep exactly-two-argument matches only: build_parent_dict unpacks each
    # entry into two names, so a longer tuple would raise there.
    if len(pt) == 2:
        subproblems_list.append(tuple(pt))

parent_dict = build_parent_dict(subproblems_list)

In [None]:
def extract_problems2(r):
    """Return the lower-cased text between the first "of" and the first "arises".

    NOTE: this replaces the list-based ``extract_problems2`` defined in an
    earlier cell of this notebook.

    Args:
        r: A single string.

    Returns:
        The whitespace-stripped substring, or ``None`` when either marker is absent.
    """
    lowered = r.lower()
    start = lowered.find("of")
    end = lowered.find("arises")
    if start == -1 or end == -1:
        return None
    # Skip "of" plus the single character that follows it.
    return lowered[start + len("of") + 1:end].strip()

In [None]:
# Gold labels: each line is "<key>\t<int>\t<int>...".
all_labels = {}
with open('test_labels.txt', 'r') as f:
    for line in f:
        if not line.strip():
            # A blank line used to create a junk "\n" key with an empty list,
            # which breaks code that indexes the first label.
            continue
        a = line.rstrip('\r\n').split('\t')
        all_labels[a[0]] = [int(x) for x in a[1:]]
# The `with` block closes the file; no explicit close() is needed.


In [None]:
#task b for their annotation
from collections import defaultdict
import os
# NOTE(review): `read_from_jsonl_ind` is not defined in the visible part of
# this notebook, and `updated_merged_dict` / `labels_map` are not used in the
# cells shown here - confirm they are still needed.
updated_merged_dict = read_from_jsonl_ind('updated_merged_dict_final.jsonl')
labels = ['objectification', 'shaming', 'stereotyping', 'violence']
labels_map = {}
labels_dict = {}       # example id -> list of label names (or 'none')
non_miso_list = []     # ids for which no label could be derived

for i in read_jsonl('test_predictions/articulation-annotations.jsonl'):
  label = []
  for j in i['articulations']:
    problem = extract_problems2(j['reasoning'])
    if not problem:
      continue
    # Walk up the subproblem hierarchy; roots outside `labels` count as 'none'.
    root = find_root_ancestor(problem, parent_dict)
    label.append(root if root in labels else 'none')
  if label:
    labels_dict[i['id']] = label
  else:
    # No articulations at all, or none with an extractable problem.
    non_miso_list.append(i['id'])




#71

In [None]:
# Keys of the gold labels whose first label value is 0.
org_non_miso = [g for g in all_labels if all_labels[g][0] == 0]

In [None]:
# Replace each list of label names by a 5-slot vector:
# [1, shaming, stereotyping, objectification, violence].
for key, item in labels_dict.items():
  # Entries that are already vectors (all ints) are left alone, so re-running
  # this cell no longer resets every category flag to 0.
  if item and all(isinstance(v, int) for v in item):
    continue
  labels_dict[key] = [
    1,
    int('shaming' in item),
    int('stereotyping' in item),
    int('objectification' in item),
    int('violence' in item),
  ]

# Examples without any derived label get an all-zero vector.
# (Loop variable renamed from `id`, which shadowed the builtin for later cells.)
for non_miso_id in non_miso_list:
  labels_dict[non_miso_id] = [0, 0, 0, 0, 0]


In [None]:
import pandas as pd
from sklearn.metrics import f1_score
# Task B: weighted F1 over the four category columns of the saved tables.
pred_df = pd.read_csv('pred_df.csv')
gold_df = pd.read_csv('gold_df.csv')
pred, gold = (
    frame[['shaming', 'stereotyping', 'objectification', 'violence']].values.tolist()
    for frame in (pred_df, gold_df)
)
print(f1_score(gold, pred, average='weighted'))

In [None]:
#task a
def check_zeros(lists):
    """Collapse each inner list to 0 if all of its values equal 0, else 1.

    An empty inner list maps to 0.
    """
    return [0 if all(value == 0 for value in row) else 1 for row in lists]
# Task A: binary score derived from the per-category vectors.
# Results go into new names instead of overwriting `gold` / `pred`: the old
# in-place reassignment made this cell fail when run twice and left flat ints
# for the later cell that indexes each `gold` row by category position.
gold_binary = check_zeros(gold)
pred_binary = check_zeros(pred)
print(f1_score(gold_binary, pred_binary, average='macro'))

0.8849396033847943


In [None]:
import pandas as pd
# Build the table in one go from a list of records instead of growing an empty
# DataFrame one cell at a time with .loc.
# NOTE(review): despite its name, `pred_df` is filled from `gold` here and is
# written to gold_df.csv; `files` is not defined in the visible cells - confirm
# it lists the file names in the same order as `gold`.
records = [
    {
        'file_name': f,
        'shaming': x[0],
        'stereotyping': x[1],
        'objectification': x[2],
        'violence': x[3],
    }
    for x, f in zip(gold, files)
]
pred_df = pd.DataFrame(
    records,
    columns=['file_name', 'shaming', 'stereotyping', 'objectification', 'violence'],
)

pred_df.to_csv('gold_df.csv')