# Import and helper functions

In [None]:
import json
import pandas as pd
import re
from collections import Counter
import numpy as np

In [None]:
# Get the distribution of frames for each period

def get_frame_distribution(df, target_lemma):
    """Collect, per corpus, the names of the frames that involve ``target_lemma``.

    A frame contributes its ``name`` once when either

    * the sentence text starting at the frame's ``trigger_location`` equals the
      target (case-insensitive), or
    * the target occurs as a whole word (case-insensitive) in the ``text`` of
      any of the frame's ``frame_elements``.

    To obtain trigger-only or frame-element-only counts, comment out the
    corresponding section in the inner loop.

    Args:
        df: DataFrame with a ``corpus`` column and, per row, a ``sentence``
            string and a ``frames`` list of dict-like frame records.
        target_lemma: The string to look for (compared lowercased).

    Returns:
        dict mapping every non-null corpus label (in sorted order) to the list
        of matching frame names, with repeats. Corpora without any match map
        to an empty list.
    """
    target = target_lemma.lower()
    span = len(target)

    # Explicit look-arounds instead of \b: they also behave as a word boundary
    # when the target itself starts or ends with a non-word character.
    re_pattern = re.compile(rf"(?<!\w){re.escape(target)}(?!\w)")

    frames_with_lemma = {}

    for corpus in sorted(df["corpus"].dropna().unique()):
        matched = []
        frames_with_lemma[corpus] = matched

        for _, row in df[df["corpus"] == corpus].iterrows():
            # Missing cells come back as NaN (truthy), so test the type rather
            # than relying on `value or default`.
            org_sent = row.get("sentence")
            if not isinstance(org_sent, str):
                org_sent = ""
            frames = row.get("frames")
            if not isinstance(frames, (list, tuple)):
                frames = []

            for frame in frames:
                # --- Section 1: the target is the trigger of the frame ---
                trigger_start = frame.get("trigger_location", None)
                # The bounds check keeps negative offsets from wrapping around
                # and matching text counted from the end of the sentence.
                if isinstance(trigger_start, int) and 0 <= trigger_start <= len(org_sent) - span:
                    trigger_end = trigger_start + span
                    if org_sent[trigger_start:trigger_end].lower() == target:
                        matched.append(frame.get("name"))
                        continue

                # --- Section 2: the target appears in a frame element ---
                for element in (frame.get("frame_elements") or []):
                    txt = (element.get("text") or "").lower()
                    if re_pattern.search(txt):
                        matched.append(frame.get("name"))
                        break  # count the frame once, however many elements match

    return frames_with_lemma

In [None]:
import re
import pandas as pd

def get_frames_target(df, target_lemma, output_csv=None):
    """Export one row per (sentence, frame) in which ``target_lemma`` takes part.

    A frame is selected when the sentence text at the frame's
    ``trigger_location`` equals the target (case-insensitive), or when the
    target occurs as a whole word in the ``text`` of one of its
    ``frame_elements``. To obtain trigger-only or frame-element-only output,
    comment out the corresponding section in the inner loop.

    Args:
        df: DataFrame whose rows provide ``corpus``, ``sentence`` and
            ``frames`` (a list of dict-like frame records).
        target_lemma: The string to look for; ``None`` is treated as ``""``.
        output_csv: Destination path of the CSV file (required).

    Returns:
        DataFrame with columns ``subfolder`` (the row's corpus), ``id`` (the
        sentence text), ``frames`` (a one-element list holding the frame name)
        and ``target`` (``[target_lemma]``). Frames without a name are dropped.
        The same table is written to ``output_csv`` without the index.

    Raises:
        ValueError: If ``output_csv`` is None.
    """
    if output_csv is None:
        raise ValueError("output_csv must be provided")

    target = (target_lemma or "").lower()
    span = len(target)

    # Explicit look-arounds are a safer word boundary than \b in some cases
    # (e.g. when the target starts or ends with a non-word character).
    re_pattern = re.compile(rf"(?<!\w){re.escape(target)}(?!\w)")

    records = []

    for _, row in df.iterrows():
        # Missing cells come back as NaN (truthy), so test the type rather
        # than relying on `value or default`.
        org_sent = row.get("sentence")
        if not isinstance(org_sent, str):
            org_sent = ""
        frames = row.get("frames")
        if not isinstance(frames, (list, tuple)):
            frames = []

        for frame in frames:
            name = frame.get("name")

            # --- Section 1: the target is the trigger of the frame ---
            trigger_start = frame.get("trigger_location", None)
            is_match = False
            if isinstance(trigger_start, int) and 0 <= trigger_start <= len(org_sent) - span:
                # Slice the ORIGINAL sentence and lowercase the slice:
                # lowercasing the whole sentence first can change its length
                # (e.g. "İ"), which would shift every later offset.
                is_match = org_sent[trigger_start:trigger_start + span].lower() == target

            # --- Section 2: the target is amongst the frame elements ---
            if not is_match:
                is_match = any(
                    re_pattern.search((element.get("text") or "").lower())
                    for element in (frame.get("frame_elements") or [])
                )

            # One output row per matching frame; unnamed frames are skipped.
            if is_match and pd.notna(name):
                records.append({
                    "subfolder": row.get("corpus"),
                    "id": org_sent,
                    "frames": [name],
                    "target": [target_lemma],
                })

    output_df = pd.DataFrame(records, columns=["subfolder", "id", "frames", "target"])
    output_df.to_csv(output_csv, index=False)
    return output_df


In [None]:
from collections import Counter
import numpy as np

def jsd_from_lists(list_p, list_q, log_base=2, min_freq=1):
    """Jensen–Shannon divergence between two empirical label distributions.

    Args:
        list_p: Labels observed in the first sample.
        list_q: Labels observed in the second sample.
        log_base: Base of the logarithm the result is expressed in (2 gives
            bits, so the result lies in [0, 1]). ``None`` keeps the natural
            logarithm (nats).
        min_freq: Keep only labels whose *combined* count across both samples
            is >= min_freq (count_p[item] + count_q[item] >= min_freq).
            ``None`` is treated as 1.

    Returns:
        ``(jsd, support, p, q)`` where ``support`` is the sorted list of kept
        labels and ``p`` / ``q`` are the normalised probability vectors over
        it. ``jsd`` is NaN when no label survives filtering or when one sample
        has no surviving label; in those cases ``p`` and ``q`` are empty.

    Raises:
        ValueError: If ``min_freq`` < 1, or ``log_base`` is not a positive
            number different from 1.
    """
    if min_freq is None:
        min_freq = 1
    if min_freq < 1:
        raise ValueError("min_freq must be >= 1 (or None).")
    if log_base is not None and (log_base <= 0 or log_base == 1):
        raise ValueError("log_base must be a positive number other than 1 (or None).")

    c_p, c_q = Counter(list_p), Counter(list_q)

    # Joint support, filtered by combined frequency across both samples.
    support = sorted(
        k for k in set(c_p) | set(c_q)
        if (c_p.get(k, 0) + c_q.get(k, 0)) >= min_freq
    )

    # Nothing survives filtering: the divergence is undefined.
    if not support:
        return float("nan"), [], np.array([]), np.array([])

    p = np.array([c_p.get(k, 0) for k in support], dtype=float)
    q = np.array([c_q.get(k, 0) for k in support], dtype=float)

    sp, sq = p.sum(), q.sum()

    # One side is empty after filtering: it cannot be normalised.
    if sp == 0 or sq == 0:
        return float("nan"), support, np.array([]), np.array([])

    p = p / sp
    q = q / sq
    m = 0.5 * (p + q)

    def kl(a, b):
        # Terms with a == 0 contribute 0; b > 0 wherever a > 0 because b is
        # the mixture m.
        mask = a > 0
        return np.sum(a[mask] * np.log(a[mask] / b[mask]))

    jsd = 0.5 * kl(p, m) + 0.5 * kl(q, m)

    # Convert from nats to the requested base (previously only base 2 was
    # honoured; any other base silently returned nats).
    if log_base is not None:
        jsd = jsd / np.log(log_base)

    return jsd, support, p, q


# Main

In [None]:
# Lemmas processed by the cells below; every entry carries a "_nn" or "_vb" suffix.
selected_lemmas = [
    "attack_nn", "bag_nn", "ball_nn", "bit_nn", "chairman_nn",
    "circle_vb", "contemplation_nn", "donkey_nn", "edge_nn", "face_nn",
    "fiction_nn", "gas_nn", "graft_nn", "head_nn", "land_nn",
    "lane_nn", "lass_nn", "multitude_nn", "ounce_nn", "part_nn",
    "pin_vb", "plane_nn", "player_nn", "prop_nn", "quilt_nn",
    "rag_nn", "record_nn", "relationship_nn", "risk_nn", "savage_nn",
    "stab_nn", "stroke_vb", "thump_nn", "tip_vb", "tree_nn",
    "twist_nn", "word_nn",
]

In [None]:
# Per-lemma Jensen–Shannon divergence between the two corpora, keyed by lemma.
jsd_results = {}

for lemma in selected_lemmas:
    # Load the parsed results for this lemma (JSON Lines, one record per line)
    total_results_path = f'... /{lemma}_lemma_FrameNet_parsed.jsonl' # <-- Replace with the actual path
    total_results_df = pd.read_json(total_results_path, lines=True)
    
    # Get the per-corpus distribution of frames, and export the per-sentence frames to CSV
    frames_distribution = get_frame_distribution(total_results_df, lemma)
    get_frames_target(total_results_df, lemma, output_csv=f'... /{lemma}_lemma_frames.csv') # <-- Replace with the actual path
    
    # Calculate JSD for each lemma.
    # NOTE: the corpus labels "corpus_1" / "corpus_2" must both occur in the
    # data's "corpus" column, otherwise this lookup raises KeyError.
    jsd, support, p, q = jsd_from_lists(
        frames_distribution["corpus_1"],
        frames_distribution["corpus_2"],
        log_base=2,
        )
    jsd_results[lemma] = jsd

In [None]:
# Show the per-lemma JSD scores (notebook cell output).
jsd_results

In [None]:
from pathlib import Path
import numpy as np
from scipy.stats import spearmanr

# --- inputs ---
# Gold files: one entry per line, key first and value second.
gold_path_task2 = Path("... /semeval2020_ulscd_eng/truth/graded.txt") # <-- Replace with the actual path
gold_path_task1 = Path("... /semeval2020_ulscd_eng/truth/binary.txt") # <-- Replace with the actual path


# Parse gold files
# Each non-empty line is split on whitespace; the first two fields are taken
# as (key, value). A line with fewer than two fields raises ValueError.
gold_task2 = {}
with gold_path_task2.open("r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        k, v = line.split()[:2]
        gold_task2[k] = float(v)

gold_task1 = {}
with gold_path_task1.open("r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        k, v = line.split()[:2]
        gold_task1[k] = int(float(v))  # robust if 0.0/1.0


# Task2 Spearman (graded)
# Only keys present in both the gold file and jsd_results are compared.
keys2 = sorted(set(gold_task2) & set(jsd_results))
x = [gold_task2[k] for k in keys2]
y = [jsd_results[k] for k in keys2]
# NOTE: `p` (second value returned by spearmanr) rebinds the module-level `p`
# assigned earlier by the JSD loop.
rho, p = spearmanr(x, y)
print(f"[Task2] n={len(keys2)}  spearman_rho={rho:.6f}")

# Task1 Accuracy with fixed threshold: JSD >= 0.5 => 1 else 0
thr = 0.5
keys1 = sorted(gold_task1.keys())

gold_bin = np.array([gold_task1[k] for k in keys1], dtype=int)
# Keys without a JSD score default to -inf and are therefore predicted as 0;
# a NaN score also fails the >= comparison and is predicted as 0.
preds = np.array([1 if jsd_results.get(k, float("-inf")) >= thr else 0 for k in keys1], dtype=int)

acc = (preds == gold_bin).mean()
# coverage = number of gold keys that actually have a JSD score
coverage = sum(k in jsd_results for k in keys1)
print(f"[Task1 JSD>=0.5] n={len(keys1)}  coverage={coverage}/{len(keys1)}  acc={acc:.6f}")


In [None]:
from __future__ import annotations

from math import ceil
from typing import Dict, Tuple, List, Optional
import pandas as pd
import re

def load_scores_txt(path: str) -> dict[str, float]:
    """Read ``key value`` score lines from a text file into a dict.

    Each non-empty line is expected to hold a key followed by a number,
    separated by whitespace. Dict-style decoration is tolerated: surrounding
    braces, trailing commas, quotes around either field and a colon after the
    key are stripped, so both ``attack_nn<TAB>0.14`` and ``'attack_nn': 0.14,``
    are accepted.

    Args:
        path: File to read (anything ``open`` accepts), decoded as UTF-8.

    Returns:
        Mapping from key to float score. Lines with fewer than two fields, or
        whose second field is not a valid float, are skipped. When a key
        repeats, the last occurrence wins.
    """
    decoration = "'\",:"
    scores: dict[str, float] = {}

    with open(path, "r", encoding="utf-8") as f:
        for raw_line in f:
            # Drop whitespace, then dict-literal punctuation at either end.
            parts = raw_line.strip().strip("{} ,").split()
            if len(parts) < 2:
                continue

            # Also strip ':' so a key written as 'name': does not keep the colon.
            key = parts[0].strip(decoration)
            val_str = parts[1].strip(decoration)

            try:
                scores[key] = float(val_str)
            except ValueError:
                # Not a numeric score line (e.g. a header); ignore it.
                continue

    return scores


def rank_quadrants_from_two_scores(
    pred_scores: Dict[str, float],     # e.g., jsd_scores
    gold_scores: Dict[str, float],     # e.g., gold
    top_frac: float = 0.30,
    bottom_frac: float = 0.30,
    rank_method: str = "min",
    top_k_each: int = 10,
    join: str = "inner",               # "inner" (default) or "outer"
) -> Tuple[pd.DataFrame, Dict[str, List[str]]]:
    """
    Build TP/FP/FN/TN based on top/bottom regions of ranks in pred vs gold lists.

    TP: pred TOP    & gold TOP
    FP: pred TOP    & gold BOTTOM
    FN: pred BOTTOM & gold TOP
    TN: pred BOTTOM & gold BOTTOM
    Others: NA (MID)

    Both score dicts are ranked in descending order (rank 1 = highest score).
    With n compared items, TOP is rank <= ceil(top_frac * n) and BOTTOM is
    rank >= n - ceil(bottom_frac * n) + 1. Because of the rounding up, the two
    regions can overlap for small n; an item in both is treated as TOP only, so
    the region labels and the quadrant always agree.

    Args:
        pred_scores: Predicted score per key.
        gold_scores: Gold score per key.
        top_frac: Fraction of items forming the TOP region, in (0, 1].
        bottom_frac: Fraction of items forming the BOTTOM region, in (0, 1].
        rank_method: Tie-breaking method passed to ``Series.rank``.
        top_k_each: Maximum number of example keys returned per quadrant.
        join: How the two key sets are combined; keys missing a score on
            either side are dropped in both modes.

    Returns:
        ``(df, groups)``: ``df`` has one row per compared key (sorted by
        pred_rank, then gold_rank) with scores, ranks, region flags/labels,
        quadrant and rank-gap diagnostics; ``groups`` maps "TP"/"FP"/"FN"/"TN"
        to up to ``top_k_each`` example keys. When nothing can be compared,
        ``df`` is empty (with the same columns) and every group is empty.

    Raises:
        ValueError: On invalid ``top_frac`` / ``bottom_frac`` / ``join``.
    """

    if not (0 < top_frac <= 1) or not (0 < bottom_frac <= 1):
        raise ValueError("top_frac and bottom_frac must be in (0, 1].")
    if top_frac + bottom_frac > 1:
        raise ValueError("top_frac + bottom_frac must be <= 1 to keep non-overlapping regions.")
    if join not in {"inner", "outer"}:
        raise ValueError("join must be 'inner' or 'outer'.")

    pred_s = pd.Series(pred_scores, dtype="float64").rename("pred")
    gold_s = pd.Series(gold_scores, dtype="float64").rename("gold")

    df = pd.concat([pred_s, gold_s], axis=1, join=join)

    # If outer join, drop missing pairs (cannot rank compare)
    df = df.dropna(subset=["pred", "gold"]).copy()

    if df.empty:
        # Keep the column layout so callers can still select columns.
        empty_columns = [
            "pred", "gold", "pred_rank", "gold_rank",
            "pred_top", "pred_bottom", "gold_top", "gold_bottom",
            "pred_region", "gold_region", "quadrant",
            "rank_gap_signed", "rank_gap_abs", "rank_sum",
        ]
        return pd.DataFrame(columns=empty_columns), {"TP": [], "FP": [], "FN": [], "TN": []}

    # Ranking (higher score => higher rank, i.e. smaller rank number)
    df["pred_rank"] = df["pred"].rank(ascending=False, method=rank_method)
    df["gold_rank"] = df["gold"].rank(ascending=False, method=rank_method)

    n = len(df)
    top_n = max(1, ceil(top_frac * n))
    bot_n = max(1, ceil(bottom_frac * n))
    bot_start_rank = n - bot_n + 1

    # TOP takes precedence over BOTTOM so the two flags are mutually exclusive
    # even when rounding makes the rank windows overlap.
    df["pred_top"] = df["pred_rank"] <= top_n
    df["pred_bottom"] = (df["pred_rank"] >= bot_start_rank) & ~df["pred_top"]
    df["gold_top"] = df["gold_rank"] <= top_n
    df["gold_bottom"] = (df["gold_rank"] >= bot_start_rank) & ~df["gold_top"]

    def _region(is_top: bool, is_bottom: bool) -> str:
        if is_top:
            return "TOP"
        if is_bottom:
            return "BOTTOM"
        return "MID"

    df["pred_region"] = [_region(t, b) for t, b in zip(df["pred_top"], df["pred_bottom"])]
    df["gold_region"] = [_region(t, b) for t, b in zip(df["gold_top"], df["gold_bottom"])]

    # The four masks below are disjoint, so assignment order does not matter.
    df["quadrant"] = "NA"
    df.loc[df["pred_top"] & df["gold_top"], "quadrant"] = "TP"
    df.loc[df["pred_top"] & df["gold_bottom"], "quadrant"] = "FP"
    df.loc[df["pred_bottom"] & df["gold_top"], "quadrant"] = "FN"
    df.loc[df["pred_bottom"] & df["gold_bottom"], "quadrant"] = "TN"

    # Diagnostics
    df["rank_gap_signed"] = df["pred_rank"] - df["gold_rank"]
    df["rank_gap_abs"] = df["rank_gap_signed"].abs()
    df["rank_sum"] = df["pred_rank"] + df["gold_rank"]

    # Build example lists, most clear-cut cases first in each quadrant:
    # (sort columns, ascending flags) per quadrant.
    example_order = {
        "TP": (["rank_sum", "pred_rank", "gold_rank"], [True, True, True]),
        "FP": (["pred_rank", "gold_rank"], [True, False]),
        "FN": (["gold_rank", "pred_rank"], [True, False]),
        "TN": (["rank_sum", "pred_rank", "gold_rank"], [False, False, False]),
    }
    groups = {}
    for label, (sort_cols, ascending) in example_order.items():
        members = df[df["quadrant"] == label].sort_values(sort_cols, ascending=ascending)
        groups[label] = members.index.tolist()[:top_k_each]

    # Optional: sort for readability
    df = df.sort_values(["pred_rank", "gold_rank"])

    return df, groups

# Compare the JSD ranking with the graded gold ranking and print a report.
gold_scores = load_scores_txt(gold_path_task2)
df_q, groups = rank_quadrants_from_two_scores(
    jsd_results,
    gold_scores,
    top_frac=0.3,
    bottom_frac=0.3,
)

# Per-lemma table, example keys per quadrant, then quadrant sizes.
print(df_q[["pred","gold","pred_rank","gold_rank","pred_region","gold_region","quadrant"]])
print(groups)
print(df_q["quadrant"].value_counts())

# Keys that are scored on only one side and so were left out of the comparison.
missing_in_gold = jsd_results.keys() - gold_scores.keys()
missing_in_pred = gold_scores.keys() - jsd_results.keys()
print("missing_in_gold:", sorted(missing_in_gold))
print("missing_in_pred:", sorted(missing_in_pred))

