## LLM classifier

In [14]:
import os
from typing import Any, Dict, List, Tuple

from dotenv import load_dotenv
load_dotenv()

from openai import OpenAI



# -------------------------
# Config
# -------------------------
# Directory holding the annotation-guideline markdown files. Despite the name this is a
# directory path (it is passed to load_definitions), not a glob pattern.
ANNOTATION_GLOB = "prompts/annotation_guidelines"
# Output CSV filename. NOTE(review): not referenced in the visible part of this notebook.
OUT_CSV = "antisemitism_placeholders_dataset.csv"

# Number of post templates requested per definition. The value is interpolated into
# TASK_PROMPT when the f-string below is evaluated, so changing it afterwards does not
# update TASK_PROMPT.
NUM_POSTS_PER_DEFINITION = 1
# Generation instruction text. NOTE(review): not used in the visible part of this notebook.
TASK_PROMPT = f"""Generate {NUM_POSTS_PER_DEFINITION} social media post templates that is antisemitic. IMPORTANT, make sure this post complies with the definition below of antisemitism.
First, read the definition carefully. Then, generate the post. 
keep the posts short (1-2 sentences), and make them as realistic as possible. 
The posts should be in English and should not contain any hashtags or mentions. Each post should be different in terms of content and style."""


def call_llm(messages: List[Dict[str, str]], provider: str, model: str) -> Tuple[str, Any]:
    """Send a chat-completion request to the selected provider.

    Args:
        messages: Chat messages, forwarded unchanged to the API.
        provider: ``"grok"`` (x.ai endpoint, key read from ``GROK_API_KEY``) or
            ``"openai"`` (client built with its defaults).
        model: Model name forwarded to the API.

    Returns:
        A ``(text, response)`` tuple: the message content of the first choice
        and the full response object.

    Raises:
        ValueError: If ``provider`` is not one of the supported names.
        RuntimeError: If ``provider`` is ``"grok"`` and ``GROK_API_KEY`` is unset
            or empty.
    """
    if provider == "grok":
        api_key = os.getenv("GROK_API_KEY")
        if not api_key:
            # With api_key=None the OpenAI client falls back to OPENAI_API_KEY,
            # which would then be sent to the x.ai endpoint; fail early instead.
            raise RuntimeError("GROK_API_KEY is not set")
        client = OpenAI(
            api_key=api_key,
            base_url="https://api.x.ai/v1",
        )
    elif provider == "openai":
        client = OpenAI()
    else:
        raise ValueError(f"Unknown provider: {provider}")
    resp = client.chat.completions.create(
        model=model,
        temperature=0.2,
        messages=messages,
        max_tokens=1000,
    )
    txt = resp.choices[0].message.content
    return txt, resp




from pathlib import Path
def load_definitions(definitions_dir: str) -> Dict[str, str]:
    """Read every ``*.md`` file directly inside a directory.

    Args:
        definitions_dir: Path of the directory to scan (not recursive).

    Returns:
        Mapping from each file's stem (name without ``.md``) to its UTF-8
        text with leading/trailing whitespace stripped.

    Raises:
        FileNotFoundError: If ``definitions_dir`` does not exist.
    """
    root = Path(definitions_dir)
    if not root.exists():
        raise FileNotFoundError(f"Directory {definitions_dir} not found")

    return {
        md_path.stem: md_path.read_text(encoding="utf-8").strip()
        for md_path in root.glob("*.md")
    }


# Load the guideline definitions when the cell runs; raises FileNotFoundError if the directory is missing.
annotations = load_definitions(ANNOTATION_GLOB)

In [15]:
import pandas as pd
"""
‘TweetID’: Represents the tweet ID. 

‘Username’: Represents the username who published the tweet.  

‘Text’: Represents the full text of the tweet (not pre-processed).

‘CreateDate’: Represents the date the tweet was created.  

‘Biased’: Represents the labeled by our annotations if the tweet is antisemitic or non-antisemitic. 

‘Keyword’: Represents the keyword that was used in the query. The keyword can be in the text, including mentioned names, or the username.  

https://zenodo.org/records/7932888
 
"""
# Load the labelled tweet dataset; the file is decoded as Windows-1252 (cp1252).
twitter_df = pd.read_csv("GoldStanderDataSet.csv",encoding='cp1252')
# Preview the first rows as the cell output.
twitter_df.head()


Unnamed: 0,TweetID,Username,Text,CreateDate,Biased,Keyword
0,1228740093357092865,Celtic_Films,AIPAC should be registered as a foreign agent ...,2020-02-15 17:57:21+00:00,1,Israel
1,1239547900012589056,zariths__,"RT @qiss0rkid: go to israel pls , we don't nee...",2020-03-16 13:43:43+00:00,1,Israel
2,1216559517887954945,kelownascott,"The world, including Canada, has given Israel ...",2020-01-13 03:16:06+00:00,1,Israel
3,1217123508754534400,KeishaJake,These children avoid walking over the US &amp;...,2020-01-14 16:37:12+00:00,0,Israel
4,1232258532273090560,amit2nirvana,RT @IDF: The last 24 hours in Israel: https://...,2020-02-25 10:58:23+00:00,0,Israel


In [16]:
# Count how many rows were collected under each query keyword.
twitter_df["Keyword"].value_counts()

Keyword
Jews       4605
Israel     1524
ZioNazi     529
Kikes       283
Name: count, dtype: int64

In [24]:
# Draw one row labelled Biased == 1; the fixed random_state makes the draw repeatable.
samples = twitter_df[twitter_df["Biased"]==1].sample(1, random_state=42)
# Show the sampled row as the cell output.
samples


Unnamed: 0,TweetID,Username,Text,CreateDate,Biased,Keyword
6036,1465147721724178432,rulajebreal,Herzog lit Hanukkah candles in the heart of th...,2021-11-29 02:36:52+00:00,1,Jews


In [None]:
# =========================
# Minimal Grok classifier pipeline
# =========================
from __future__ import annotations
from typing import Dict, Tuple
import json
import os
import re
import pandas as pd
from pathlib import Path
from openai import OpenAI

# -------------------------
# Config
# -------------------------
# Directory of annotation-guideline markdown files (a directory path passed to
# load_definitions, not a glob pattern).
ANNOTATION_GLOB = "prompts/annotation_guidelines"
MODEL = "grok-3-mini"  # default model name for llm(); use only Grok
CSV_PATH = "GoldStanderDataSet.csv"  # path of the labelled tweet CSV; update if needed

# -------------------------
# Load defs
# -------------------------
def load_definitions(definitions_dir: str) -> Dict[str, str]:
    """Load annotation definitions from the ``*.md`` files in a directory.

    Files are read in sorted path order so the resulting dict (and anything
    derived from its key order, such as the serialized prompt or a
    "first key" fallback) is the same on every run and every filesystem.

    Args:
        definitions_dir: Path of the directory to scan (not recursive).

    Returns:
        Mapping from each file's stem to its stripped UTF-8 text, ordered by
        file path.

    Raises:
        FileNotFoundError: If ``definitions_dir`` does not exist.
        RuntimeError: If the directory contains no ``*.md`` files.
    """
    p = Path(definitions_dir)
    if not p.exists():
        raise FileNotFoundError(f"Directory {definitions_dir} not found")
    # Path.glob yields entries in an unspecified order; sort for determinism.
    d: Dict[str, str] = {
        md.stem: md.read_text(encoding="utf-8").strip()
        for md in sorted(p.glob("*.md"))
    }
    if not d:
        raise RuntimeError("No definitions found")
    return d

# Load the definitions once at cell execution; load_definitions raises if the directory is missing or has no *.md files.
annotations = load_definitions(ANNOTATION_GLOB)

# -------------------------
# Prompt builder
# -------------------------
# System message sent by llm() ahead of every user prompt. It asks the model to pick
# exactly one key from the supplied annotations and to reply with a JSON object holding
# "answer" and "description".
CLASSIFIER_SYSTEM = """You are a careful text classifier.

You will be given:
- a JSON object named "annotations" mapping names to definitions
- a "text" string to classify

Task:
- Pick exactly one key from annotations as the best match for text.
- If several could fit, choose the stricter or more specific one.
- Output JSON only, no extra words.

Output schema:
{
  "answer": "<one key from annotations>",
  "description": "<1 to 3 sentences explaining the choice>"
}

Rules:
- answer must be exactly one of annotations keys
- description must cite concrete cues from the text
"""

def generate_prompt(text: str, annotations: Dict[str, str]) -> str:
    """Build the user-message content for one classification request.

    Args:
        text: The text to classify; inserted verbatim.
        annotations: Definition name -> definition text, serialized as
            indented JSON with non-ASCII characters kept as-is.

    Returns:
        A single string: the annotations JSON, the text, and a closing
        instruction to answer with JSON only.
    """
    definitions_json = json.dumps(annotations, ensure_ascii=False, indent=2)
    sections = [
        "annotations:\n",
        definitions_json,
        "\n\ntext:\n",
        text,
        "\n\nRespond with JSON only matching the schema.",
    ]
    return "".join(sections)

# -------------------------
# Grok client + call
# -------------------------
def grok_client() -> OpenAI:
    """Build an OpenAI-compatible client pointed at the x.ai endpoint.

    Returns:
        A client configured with the ``GROK_API_KEY`` environment variable
        and the ``https://api.x.ai/v1`` base URL.

    Raises:
        RuntimeError: If ``GROK_API_KEY`` is unset or empty.
    """
    api_key = os.getenv("GROK_API_KEY")
    if not api_key:
        # With api_key=None the OpenAI client falls back to OPENAI_API_KEY,
        # which would then be sent to the x.ai endpoint; fail early instead.
        raise RuntimeError("GROK_API_KEY is not set")
    return OpenAI(api_key=api_key, base_url="https://api.x.ai/v1")

def llm(prompt: str, model: str = MODEL) -> str:
    """Send a single-turn chat with the fixed system prompt and return the reply text.

    Args:
        prompt: Content of the user message.
        model: Model name forwarded to the API; defaults to ``MODEL``.

    Returns:
        The message content of the first choice, or ``""`` when the response
        carries no content.
    """
    client = grok_client()
    resp = client.chat.completions.create(
        model=model,
        temperature=0.0,
        max_tokens=600,
        messages=[
            {"role": "system", "content": CLASSIFIER_SYSTEM},
            {"role": "user", "content": prompt},
        ],
    )
    # Return the text, not the response object: the declared return type is str
    # and structure_output() calls .strip() on the value it receives.
    return resp.choices[0].message.content or ""

# -------------------------
# Robust JSON extraction
# -------------------------
def _first_json_object(s: str) -> str:
    """Extract the first top-level JSON object by tracking braces. No regex recursion."""
    start = s.find("{")
    if start == -1:
        raise ValueError("No '{' found in response")
    depth, i, in_str, esc = 0, start, False, False
    while i < len(s):
        ch = s[i]
        if in_str:
            if esc:
                esc = False
            elif ch == "\\":
                esc = True
            elif ch == '"':
                in_str = False
        else:
            if ch == '"':
                in_str = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return s[start : i + 1]
        i += 1
    raise ValueError("Unbalanced JSON braces in response")

def structure_output(raw_resp: str) -> str:
    """Reduce a model reply to its JSON text.

    The stripped reply is returned unchanged when it already parses as JSON;
    otherwise the first brace-balanced object found in it is returned.
    """
    candidate = raw_resp.strip()
    try:
        json.loads(candidate)
    except Exception:
        # Not clean JSON: pull the first {...} object out of the surrounding prose.
        return _first_json_object(candidate)
    return candidate

def extract_pred_and_desc(json_text: str, allowed_keys: Dict[str, str]) -> Tuple[str, str]:
    """Parse the model's JSON reply and validate its answer.

    Args:
        json_text: JSON text expected to hold an object with ``"answer"`` and
            ``"description"`` fields.
        allowed_keys: The annotations mapping; its keys are the valid answers.

    Returns:
        ``(answer, description)``. When the answer is missing, not a string,
        or not one of ``allowed_keys`` (after trimming surrounding
        whitespace), the first key of ``allowed_keys`` is returned instead so
        the pipeline keeps running; if the description is empty in that case
        it is replaced by a note saying the fallback was used.

    Raises:
        ValueError: If ``allowed_keys`` is empty, ``json_text`` is not valid
            JSON, or the parsed value is not a JSON object.
    """
    if not allowed_keys:
        # Without this, the fallback below would leak a bare StopIteration.
        raise ValueError("allowed_keys must not be empty")

    obj = json.loads(json_text)
    if not isinstance(obj, dict):
        raise ValueError("Expected a JSON object with 'answer' and 'description'")

    answer = obj.get("answer", "")
    desc = obj.get("description", "")
    if isinstance(answer, str):
        # Tolerate stray whitespace around an otherwise valid key.
        answer = answer.strip()
    if not isinstance(desc, str):
        desc = str(desc) if desc else ""

    # The isinstance check also guards against unhashable answers (lists, dicts),
    # which would make the membership test raise TypeError.
    if not isinstance(answer, str) or answer not in allowed_keys:
        # Choose a stable fallback so the pipeline does not crash
        answer = next(iter(allowed_keys))
        if not desc:
            desc = "Model answer not in annotation keys; fell back to the first key."
    return answer, desc

# -------------------------
# One-sample demo (simple to debug)
# -------------------------
# Load data (only once). NOTE(review): an earlier cell already reads the same CSV file
# into twitter_df; this rebinds the name.
twitter_df = pd.read_csv(CSV_PATH, encoding="cp1252", low_memory=False)

# Pick one positive example (Biased == 1); the fixed random_state makes the draw repeatable.
one_sample_df = twitter_df[twitter_df["Biased"] == 1].sample(1, random_state=42)
row = one_sample_df.iloc[0]
text = str(row["Text"])
# Only referenced by the commented-out debug prints further down.
keyword = row.get("Keyword", None)

# Build prompt -> call Grok -> parse -> print
prompt = generate_prompt(text, annotations)
print(prompt)
resp = llm(prompt, MODEL)
print(resp)
# NOTE(review): structure_output() calls .strip() on its argument, so llm() must
# return a string for this line to work.
resp_text = structure_output(resp)
# prediction, desc = extract_pred_and_desc(resp_text, annotations)

# print("TEXT:", text)
# print("KEYWORD:", keyword)
# print("PREDICTION:", prediction)
# print("DESC:", desc)

# # -------------------------
# # If you want a tiny loop to make a DF from N samples:
# # -------------------------
# def classify_samples_df(samples_df: pd.DataFrame, annotations: Dict[str, str]) -> pd.DataFrame:
#     rows = []
#     for _, r in samples_df.iterrows():
#         t = str(r["Text"])
#         k = r.get("Keyword", None)
#         p = generate_prompt(t, annotations)
#         raw = llm(p, MODEL)
#         try:
#             jt = structure_output(raw)
#             pred, d = extract_pred_and_desc(jt, annotations)
#         except Exception as e:
#             pred, d = "", f"Parse error: {e}"
#         rows.append({"Text": t, "Keyword": k, "Definition_pred": pred, "description": d})
#     return pd.DataFrame(rows, columns=["Text", "Keyword", "Definition_pred", "description"])

# Example:
# samples = twitter_df[twitter_df["Biased"] == 1].sample(5, random_state=42)
# out_df = classify_samples_df(samples, annotations)
# print(out_df)


annotations:
{
  "Jerusalem Declaration": "# Annotation Guidelines for Identifying Antisemitism Based on the Jerusalem Declaration\n\n## 1. Task Description\n\n### Overview\nThese guidelines are designed to help annotators identify antisemitic content based on the Jerusalem Declaration on Antisemitism (JDA). The JDA provides a clear, concise, and historically-informed definition that aims to strengthen the fight against antisemitism while protecting space for legitimate political discourse, particularly regarding Israel and Palestine.\n\n### Purpose\nThe purpose of these annotations is to accurately identify content that constitutes discrimination, prejudice, hostility, or violence against Jews as Jews, while distinguishing this from legitimate criticism of states, policies, or political ideologies. These guidelines will help create consistent, nuanced assessments that recognize both the serious nature of antisemitism and the importance of protecting free political speech.\n\n### What 