In [2]:
from math import inf
from typing import Callable, List, Tuple, Any

# --------------------------------------------------------------------------
# Helper: run the DP assuming m ≤ n (code identical to your original logic)
# --------------------------------------------------------------------------
def _align_core(m: int,
                n: int,
                cost: Callable[[int, int], float],
                invalid: float = inf
) -> Tuple[float, List[Tuple[int, int]]]:
    """Core DP: requires m ≤ n."""
    dp   = [[invalid]*(n+1) for _ in range(m+1)]
    prev = [[-1]     *(n+1) for _ in range(m+1)]

    for j in range(1, n+1):          # base row i = 1
        dp[1][j]   = cost(1, j)
        prev[1][j] = 0

    for i in range(2, m+1):          # inductive rows
        for j in range(1, n+1):
            best = invalid
            argk = -1
            for k in range(1, j):    # k < j
                cand = dp[i-1][k] + cost(i, j)
                if cand < best:
                    best, argk = cand, k
            dp[i][j]   = best
            prev[i][j] = argk

    opt_cost, j_star = min((dp[m][j], j) for j in range(1, n+1))
    if opt_cost == invalid:
        return inf, []               # no feasible subsequence (shouldn’t happen)
    align = []
    j = j_star
    for i in range(m, 0, -1):
        align.append((i, j))
        j = prev[i][j]
    align.reverse()
    return opt_cost, align

# --------------------------------------------------------------------------
# Public wrapper that handles m > n by swapping arguments
# --------------------------------------------------------------------------
def align_rows(m: int,
               n: int,
               cost: Callable[[int, int], float],
               invalid: float = inf
) -> Tuple[float, List[Tuple[int, int]]]:
    """
    Order‑preserving minimum‑cost alignment of m predicted rows to n true rows.
    Works for any m, n ≥ 0.  When m > n the function returns the best alignment
    for a *subset* of the predicted rows (those paired in the output list).
    Unmatched predicted rows can be treated later as false positives.
    """
    if m == 0 or n == 0:
        return inf, []

    # Case 1: m ≤ n – run the core DP directly
    if m <= n:
        return _align_core(m, n, cost, invalid)

    # Case 2: m > n – swap roles so the shorter table is the "predicted" one
    def cost_swapped(i: int, j: int) -> float:
        # i now refers to original true row index, j to original predicted row
        return cost(j, i)

    opt_cost, swapped_align = _align_core(n, m, cost_swapped, invalid)
    if not swapped_align:
        return inf, []

    # swapped_align gives (true_idx, pred_idx); invert to (pred_idx, true_idx)
    align = [(pred, true) for true, pred in swapped_align]
    align.sort()  # ensure increasing order in predicted-table indices

    return opt_cost, align






In [3]:
# ----------------------------------------------------------------------
# Utility: Jaccard similarity between two rows (tuples of key–value pairs)
# ----------------------------------------------------------------------

from typing import List, Tuple, Any, Set

KVPair = Tuple[Any, Any]   # (key, value) ; key & value can be any hashables
Row    = List[KVPair]      # a table row is a list of KV pairs

def _kvset(row: Row) -> Set[Tuple[str, str]]:
    """
    Normalise a row of KV pairs into a set of lowercase strings so that
    ('Price', '$10 ') and ('price', '$10') are treated as the same.
    """
    return {
        (str(k).strip().lower(), str(v).strip().lower())
        for k, v in row
    }

def jaccard_row_sim(row_pred: Row, row_true: Row) -> float:
    """
    Jaccard similarity between two rows represented as lists of KV pairs.
    1.0 = perfect match, 0.0 = no overlap.
    """
    set_pred = _kvset(row_pred)
    set_true = _kvset(row_true)

    if not set_pred and not set_true:          # both rows empty
        return 1.0

    intersection = len(set_pred & set_true)
    union        = len(set_pred | set_true)

    # print(set_pred)
    # print(set_true)
    # print(intersection, union) 
    return intersection / union


# ----------------------------------------------------------------------
# Cost function w(i,j) for the DP:
# w = 1 – Jaccard similarity
# ----------------------------------------------------------------------

def make_cost_fn(table_pred: List[Row], table_true: List[Row]):
    """
    Returns a closure w(i,j) suitable for align_rows().
    Indices i, j are 1‑based to match the DP.
    """
    def w(i: int, j: int) -> float:
        sim = jaccard_row_sim(table_pred[i-1], table_true[j-1])
        # print(i,table_pred[i-1])
        # print(j,table_true[j-1])
        # print(sim)
        return 1.0 - sim            # cost ≥ 0 ; lower is better
    return w



# ----------------------------------------------------------------------
# Example usage with the align_rows() routine defined earlier
# ----------------------------------------------------------------------
if __name__ == "__main__":
    # Ground‑truth table (T1) : list of rows; each row is list of KV pairs
    T1 = [[('Spots', 'missing'), ('#', 3), ('Ch', 'WMGL'), ('Day', 'M'), ('Air Date', '09/04/23'), ('Air Time', '3:47 PM'), ('Description', 'M-F'), ('Start/End Time', '3p-7p'), ('Length', ':30'), ('Ad-ID', 'MOORE SPOT 2'), ('Rate', '$20.00'), ('Type', 'NM')], [('Spots', 'missing'), ('#', 2), ('Ch', 'WMGL'), ('Day', 'M'), ('Air Date', '09/04/23'), ('Air Time', '4:16 PM'), ('Description', 'M-F'), ('Start/End Time', '3p-7p'), ('Length', ':30'), ('Ad-ID', 'MOORE SPOT 1'), ('Rate', '$20.00'), ('Type', 'NM')]]
    T2= [[('Spots', 'missing'), ('#', 1), ('Ch', 'WMGL'), ('Day', 'M'), ('Air Time', 'missing'), ('Description', 'M-F'), ('Start/End Time', 'missing'), ('Length', 'missing'), ('Ad-ID', 'missing'), ('Rate', '$20.00')], [('Spots', 'missing'), ('Ch', 'WMGL'), ('Day', 'Tu'), ('Air Date', 'missing'), ('Air Time', '6:23 AM'), ('Length', ':30'), ('Ad-ID', 'MOORE SPOT 2'), ('Rate', '$20.00'), ('Type', 'NM')], [('Spots', 'missing'), ('#', 2), ('Ch', 'WMGL'), ('Day', 'missing'), ('Air Date', '09/05/23'), ('Air Time', '9:39 AM'), ('Description', 'M-F'), ('Start/End Time', '6a-10a'), ('Length', 'missing'), ('Ad-ID', 'MOORE SPOT 1'), ('Rate', '$20.00'), ('Type', 'NM')]]

    m, n = len(T2), len(T1)
    print(m,n)
    w = make_cost_fn(T2, T1)

    # for i in range(1,m+1):
    #     for j in range(1,n+1):
    #         print('row i:', i-1, T2[i-1])
    #         print('row j:', j-1, T1[j-1])
    #         print(w(i,j))

    opt_cost, alignment = align_rows(m, n, w)

    print(f"Optimal cost  : {opt_cost:.3f}")
    #print("Row alignment :", alignment)
    for pairs in alignment:
        i = pairs[0] - 1
        j = pairs[1] - 1
        print(i,j)
        print(T2[i])
        print(T1[j])


3 2
Optimal cost  : 1.188
1 0
[('Spots', 'missing'), ('Ch', 'WMGL'), ('Day', 'Tu'), ('Air Date', 'missing'), ('Air Time', '6:23 AM'), ('Length', ':30'), ('Ad-ID', 'MOORE SPOT 2'), ('Rate', '$20.00'), ('Type', 'NM')]
[('Spots', 'missing'), ('#', 3), ('Ch', 'WMGL'), ('Day', 'M'), ('Air Date', '09/04/23'), ('Air Time', '3:47 PM'), ('Description', 'M-F'), ('Start/End Time', '3p-7p'), ('Length', ':30'), ('Ad-ID', 'MOORE SPOT 2'), ('Rate', '$20.00'), ('Type', 'NM')]
2 1
[('Spots', 'missing'), ('#', 2), ('Ch', 'WMGL'), ('Day', 'missing'), ('Air Date', '09/05/23'), ('Air Time', '9:39 AM'), ('Description', 'M-F'), ('Start/End Time', '6a-10a'), ('Length', 'missing'), ('Ad-ID', 'MOORE SPOT 1'), ('Rate', '$20.00'), ('Type', 'NM')]
[('Spots', 'missing'), ('#', 2), ('Ch', 'WMGL'), ('Day', 'M'), ('Air Date', '09/04/23'), ('Air Time', '4:16 PM'), ('Description', 'M-F'), ('Start/End Time', '3p-7p'), ('Length', ':30'), ('Ad-ID', 'MOORE SPOT 1'), ('Rate', '$20.00'), ('Type', 'NM')]


In [4]:
from typing import List, Dict, Tuple, Any, Iterable
from dataclasses import dataclass
import math
import trix

KVPair   = Tuple[Any, Any]      # (key, value)
Row      = List[KVPair]         # tuple = list of KV pairs
Match    = Tuple[int, int]      # (pred_row_index, true_row_index), 0-based

def normalise(s: str) -> str:
    s = s.strip()
    if s.isdigit():               # purely numeric → drop leading zeros
        s = s.lstrip("0") or "0"  # keep at least one zero
    return s.lower()

# ------------------------- Normalization helper ------------------------- #
def _kv_set(row: Row) -> set:
    """Normalise a row of KV pairs to a set of (key, value) strings."""
    return {
        (str(k).strip().lower(), str(v).strip().lower())
        for k, v in row
    }


# ------------------------------------------------------------------ equality
def eval_eq(a: str, b: str) -> int:
    """
    Placeholder for the user‑supplied equality function.
    Returns 1 if two strings should be considered equal.
    """
    a = normalise(a)
    b = normalise(b)
    if a == 'true' and b == '\uf0fc':
        return 1
    if b == 'true' and a == '\uf0fc':
        return 1
    return trix.equal(a,b) 

# ------------------------------------------------------------------ PRF1 data
@dataclass
class PRF1:
    precision: float
    recall: float
    f1: float

# ------------------------------------------------------------------ scoring
def tuple_prf1(
    pred_row: Row,
    true_row: Row,
    value_eq: Callable[[str, str], int] = eval_eq
) -> PRF1:
    pred_pairs: List[KVPair] = [
        (str(k).strip().lower(), str(v).strip()) for k, v in pred_row
    ]
    true_pairs: List[KVPair] = [
        (str(k).strip().lower(), str(v).strip()) for k, v in true_row
    ]

    precision = 0
    recall = 0

    for k_pred, v_pred in pred_pairs:
        for k_true, v_true in true_pairs:
            if value_eq(k_pred, k_true) == 1 and value_eq(v_pred, v_true) == 1:
                precision += 1
                break
    
    for k_true, v_true in true_pairs:
        for k_pred, v_pred in pred_pairs:
            if value_eq(k_pred, k_true) == 1 and value_eq(v_pred, v_true) == 1:
                recall += 1
                break
    if len(pred_pairs) > 0:
        precision /= len(pred_pairs)
    if len(true_pairs) > 0:
        recall /= len(true_pairs)


    # """
    # Precision / Recall / F1 for a predicted vs. true row, using
    # a custom equality test `value_eq(a, b) -> 0/1` on *values*.

    # • Keys are matched after case‑/whitespace‑normalising.
    # • Each true KV pair can be matched at most once (greedy one‑to‑one).
    # """
    # # normalise keys once
    # pred_pairs: List[KVPair] = [
    #     (str(k).strip().lower(), str(v).strip()) for k, v in pred_row
    # ]
    # true_pairs: List[KVPair] = [
    #     (str(k).strip().lower(), str(v).strip()) for k, v in true_row
    # ]

    # used_true = [False] * len(true_pairs)
    # tp = 0

    # for k_pred, v_pred in pred_pairs:
    #     # find first unused true pair with same key and value_eq == 1
    #     match_idx = -1
    #     for j, (k_true, v_true) in enumerate(true_pairs):
    #         if not used_true[j] and k_pred == k_true and value_eq(v_pred, v_true) == 1:
    #             match_idx = j
    #             break
    #     if match_idx >= 0:
    #         used_true[match_idx] = True
    #         tp += 1

    # fp = len(pred_pairs) - tp
    # fn = len(true_pairs) - tp

    # precision = tp / (tp + fp) if (tp + fp) else 0.0
    # recall    = tp / (tp + fn) if (tp + fn) else 0.0
    f1        = (2 * precision * recall / (precision + recall)
                 if (precision + recall) else 0.0)
    return PRF1(precision, recall, f1)


# -------------------- Table-level Precision / Recall / F1 -------------------- #
@dataclass
class TablePRF1:
    precision: float
    recall: float
    f1: float
    per_tuple_pred: List[PRF1]
    per_tuple_true: List[PRF1]

def table_prf1(
    pred_table: List[Row],
    true_table: List[Row],
    matches: Iterable[Match]
) -> TablePRF1:
    """
    Compute table-level precision, recall, F1.

    Inputs:
    -------
    pred_table  : list of predicted tuples (rows, each a list of KV pairs)
    true_table  : list of ground-truth tuples
    matches     : iterable of (p_i, t_j) where row p_i in pred_table is matched
                  to row t_j in true_table. Indices are 0-based and one-to-one.

    Definitions (as per your instructions):
    ---------------------------------------
    - Tuple-level PRF1 is computed only for matched pairs.
    - Table-level Precision: average over all predicted tuples:
         * if a predicted tuple is matched, use its tuple precision
         * if unmatched, precision = 0
    - Table-level Recall: average over all true tuples:
         * if a true tuple is matched, use its tuple recall
         * if unmatched, recall = 0
    - Table-level F1: harmonic mean of the above precision & recall.

    Returns:
    --------
    TablePRF1 dataclass with overall P/R/F1 and lists of per-tuple PRF1 stats.
    """
    # Build quick lookup from matches
    match_dict_pred = {p-1: t-1 for p, t in matches}
    match_dict_true = {t-1: p-1 for p, t in matches}

    # print(match_dict_pred)
    # print(match_dict_true)

    # 1) per-predicted-row precision list
    #print('checking precision...')
    per_tuple_pred: List[PRF1] = []
    for p_idx, p_row in enumerate(pred_table):
        #print(p_idx)
        if p_idx in match_dict_pred:
            t_idx = match_dict_pred[p_idx]
            #print(p_row)
            #print(true_table[t_idx])
            prf1 = tuple_prf1(p_row, true_table[t_idx])
            #print(prf1.precision) 
        else:
            prf1 = PRF1(precision=0.0, recall=0.0, f1=0.0)
        per_tuple_pred.append(prf1)

    # 2) per-true-row recall list
    per_tuple_true: List[PRF1] = []
    for t_idx, t_row in enumerate(true_table):
        if t_idx in match_dict_true:
            p_idx = match_dict_true[t_idx]
            prf1 = tuple_prf1(pred_table[p_idx], t_row)
        else:
            prf1 = PRF1(precision=0.0, recall=0.0, f1=0.0)
        per_tuple_true.append(prf1)

    # 3) aggregate
    table_precision = sum(r.precision for r in per_tuple_pred) / len(pred_table) if pred_table else 0.0
    table_recall    = sum(r.recall    for r in per_tuple_true) / len(true_table) if true_table else 0.0
    table_f1        = (2 * table_precision * table_recall / (table_precision + table_recall)
                       if (table_precision + table_recall) else 0.0)

    return TablePRF1(table_precision, table_recall, table_f1, per_tuple_pred, per_tuple_true)

def table_sim(table_t,table_p):
    m, n = len(table_p), len(table_t)
    w = make_cost_fn(table_p, table_t)
    opt_cost, alignment = align_rows(m, n, w)
    #print(alignment)
    table_metrics = table_prf1(table_p, table_t, alignment)
    return table_metrics 

if __name__ == "__main__":

    T1 = [[('Spots', 'missing'), ('#', 3), ('Ch', 'WMGL'), ('Day', 'M'), ('Air Date', '09/04/23'), ('Air Time', '3:47 PM'), ('Description', 'M-F'), ('Start/End Time', '3p-7p'), ('Length', ':30'), ('Ad-ID', 'MOORE SPOT 2'), ('Rate', '$20.00'), ('Type', 'NM')], [('Spots', 'missing'), ('#', 2), ('Ch', 'WMGL'), ('Day', 'M'), ('Air Date', '09/04/23'), ('Air Time', '4:16 PM'), ('Description', 'M-F'), ('Start/End Time', '3p-7p'), ('Length', ':30'), ('Ad-ID', 'MOORE SPOT 1'), ('Rate', '$20.00'), ('Type', 'NM')]]
    T2= [[('Spots', 'missing'), ('#', 1), ('Ch', 'WMGL'), ('Day', 'M'), ('Air Time', 'missing'), ('Description', 'M-F'), ('Start/End Time', 'missing'), ('Length', 'missing'), ('Ad-ID', 'missing'), ('Rate', '$20.00')], [('Spots', 'missing'), ('Ch', 'WMGL'), ('Day', 'Tu'), ('Air Date', 'missing'), ('Air Time', '6:23 AM'), ('Length', ':30'), ('Ad-ID', 'MOORE SPOT 2'), ('Rate', '$20.00'), ('Type', 'NM')], [('Spots', 'missing'), ('#', 2), ('Ch', 'WMGL'), ('Day', 'missing'), ('Air Date', '09/05/23'), ('Air Time', '9:39 AM'), ('Description', 'M-F'), ('Start/End Time', '6a-10a'), ('Length', 'missing'), ('Ad-ID', 'MOORE SPOT 1'), ('Rate', '$20.00'), ('Type', 'NM')]]
    matches = [(1,1)]

    table_metrics = table_prf1(T2, T1, matches)

    print(f"Table Precision: {table_metrics.precision:.3f}")
    print(f"Table Recall   : {table_metrics.recall:.3f}")
    print(f"Table F1       : {table_metrics.f1:.3f}")

    print("\nPer-predicted-row PRF1 (precision focus):")
    for i, r in enumerate(table_metrics.per_tuple_pred):
        print(f"  Pred row {i}: P={r.precision:.2f}, R={r.recall:.2f}, F1={r.f1:.2f}")

    print("\nPer-true-row PRF1 (recall focus):")
    for j, r in enumerate(table_metrics.per_tuple_true):
        print(f"  True row {j}: P={r.precision:.2f}, R={r.recall:.2f}, F1={r.f1:.2f}")


Table Precision: 0.167
Table Recall   : 0.208
Table F1       : 0.185

Per-predicted-row PRF1 (precision focus):
  Pred row 0: P=0.50, R=0.42, F1=0.45
  Pred row 1: P=0.00, R=0.00, F1=0.00
  Pred row 2: P=0.00, R=0.00, F1=0.00

Per-true-row PRF1 (recall focus):
  True row 0: P=0.50, R=0.42, F1=0.45
  True row 1: P=0.00, R=0.00, F1=0.00


In [5]:
from dataclasses import dataclass
from typing import Iterable, Tuple, Any, Set

KVPair = Tuple[Any, Any]  # (key, value)


@dataclass
class PRF1:
    precision: float
    recall: float
    f1: float


def _kv_normalize(kv: Iterable[KVPair]) -> Set[Tuple[str, str]]:
    """Normalize to lowercase, stripped strings for robust equality."""
    return {
        (str(k).strip().lower(), str(v).strip().lower())
        for k, v in kv
    }


def kv_prf1(kv_pred: Iterable[KVPair], kv_true: Iterable[KVPair]) -> PRF1:
    """
    Compute precision, recall, and F1 between two sets of key–value pairs.

    Precision = |kv_pred ∩ kv_true| / |kv_pred|
    Recall    = |kv_pred ∩ kv_true| / |kv_true|
    F1        = 2 * P * R / (P + R)
    """
    P = _kv_normalize(kv_pred)
    G = _kv_normalize(kv_true)

    tp = len(P & G)
    fp = len(P - G)
    fn = len(G - P)

    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall    = tp / (tp + fn) if (tp + fn) else 0.0
    f1        = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0
    return (precision, recall, f1)




In [6]:
import os, json  

def scan_folder(path, filter_file_type = '.json'):
    file_names = []
    for root, dirs, files in os.walk(path):
        for file in files:
            file_name = os.path.join(root, file)
            if('DS_Store' in file_name):
                continue
            if(filter_file_type not in file_name):
                continue
            file_names.append(file_name)
    return file_names

def read_json(path):
    with open(path, 'r') as file:
        data = json.load(file)
    return data



In [7]:
# def get_kvs(data):
#     kvs = []
#     for kv in data:
#         tuples = list(kv.items())
#         kvs += tuples
#     return kvs 

def get_kvs(data):
    """
    Flatten a list of dicts into a list of (key, value) tuples,
    **skipping** entries whose key *and* value are both empty.

    An entry is considered “empty” if:
      • the key, once converted to string and stripped, is "",  **and**
      • the value is None  **or** its string‑stripped form is "".
    """
    kvs = []
    for kv in data:       
        if len(kv) == 0:
            continue                 # each kv is a dict
        for k, v in kv.items():
            key_empty   = str(k).strip() == ""
            val_empty   = (v is None) or str(v).strip() == ""
            if key_empty and val_empty:   # skip the all‑empty pair
                continue
            kvs.append((k, v))
    return kvs


def get_table(data):
    table = []
    for tuple in data:
        tuple_l = list(tuple.items()) 
        table.append(tuple_l)
    return table 

def get_data(path):
    data = read_json(path)
    result = []
    id_list = []
    for rec in data:#scan one record 
        #print(rec)
        rec_o = {}
        rid = rec['id']
        if rid in id_list:
            continue 
        id_list.append(rid) 
        content = rec['content']
        #print(rid, content)
        rec_o['id'] = rid 
        content_o = []
        for block in content:#scan one data block 
            block_o = {}
            #print(block['type'])
            if block['type'] == 'table':
                table = get_table(block['content'])
                block_o['type'] = block['type']
                block_o['content'] = table
            if block['type'] == 'kv':
                kvs = get_kvs(block['content'])
                block_o['type'] = block['type']
                block_o['content'] = kvs 
            if len(block_o) > 0:
                content_o.append(block_o)
        rec_o['content'] = content_o 
        result.append(rec_o)
    
    return result 


from pathlib import Path
parent_directory = str(Path().resolve().parent)
truth_folder_path = parent_directory + '/data'
files = scan_folder(truth_folder_path)
# for file in files:
#     print(file)
#     data = read_json(file) 
#     result = display_data(data)
#     break


In [8]:
from typing import List
from dataclasses import dataclass

# ------------------------------------------------------------------ helpers
def _to_set(lst: List[str]) -> set:
    return {s.strip().lower() for s in lst}

def jaccard(a: List[str], b: List[str]) -> float:
    sa, sb = _to_set(a), _to_set(b)
    if not sa and not sb:
        return 1.0
    return len(sa & sb) / len(sa | sb)

# ----------------------------------------------------------------- result
@dataclass
class BestMatch:
    idx0: int          # zero‑based position in Bs   (0 → first list)
    similarity: float  # Jaccard similarity
    candidate: List[str]

# ------------------------------------------------------------ main routine
def most_similar(A: List[str], Bs: List[List[str]]) -> BestMatch:
    best_idx0, best_sim = -1, -1.0
    for i, B in enumerate(Bs):
        sim = jaccard(A, B)
        if sim > best_sim:
            best_idx0, best_sim = i, sim
    return BestMatch(
        idx0=best_idx0,
        similarity=best_sim,
        candidate=Bs[best_idx0] if best_idx0 >= 0 else []
    )

if __name__ == "__main__":
    A  = ["Apple", "Banana", "Cherry"]
    B1 = ["banana", "cherry", "durian"]
    B2 = ["fig", "grape"]
    B3 = ["apple", "banana", "cherry", "date"]

    best = most_similar(A, [B1, B2, B3])
    print(best.candidate)


['apple', 'banana', 'cherry', 'date']


In [9]:
def get_schema(table):
    schema = []
    if len(table) == 0:
        return schema
    tuple = table[0]
    for cell in tuple:
        schema.append(cell[0])
    return schema 

def eval_tables(rec_t,rec_p):
    #get table schema for all predicted tables 
    schema_p = []
    rid = 0
    tid = 0
    mp = {}
    for block in rec_p['content']:
        if block['type'] == 'table':
            schema_p.append(get_schema(block['content'])) 
            mp[tid] = rid 
            tid += 1
        rid += 1 

    #print(mp)


    table_metrics = []
    for block in rec_t['content']:
        if block['type'] == 'table':
            cur_schema = get_schema(block['content'])
            target = most_similar(cur_schema, schema_p)
            #print('true schema:', cur_schema)
            #print('target schema:', target.candidate) 
            idx = target.idx0 
            
            o_idx = mp[idx] 
            # print(idx, o_idx)
            #print('truth1:', block['content'])
            #print('pred1:', rec_p['content'][o_idx]['content'])
            table_metric = table_sim(block['content'], rec_p['content'][o_idx]['content']) 
            #print(table_metric.precision, table_metric.recall, table_metric.f1)
            table_metrics.append(table_metric)
    
    #count average table precision
    precision_c = 0
    tab_precision = 0
    for metric in table_metrics:
        for i, r in enumerate(metric.per_tuple_pred):
            tab_precision += r.precision
            precision_c += 1

    #count average table recall
    recall_c = 0
    tab_recall = 0
    for metric in table_metrics:
        for i, r in enumerate(metric.per_tuple_true):
            tab_recall += r.recall
            recall_c += 1
    
    tab_precision = tab_precision / precision_c
    tab_recall = tab_recall / recall_c
    tab_F1 = 2*tab_precision*tab_recall/(tab_precision + tab_recall) 

    return tab_precision, tab_recall, tab_F1

def merge_tables(rec):
    table = []
    for block in rec['content']:
        if block['type'] == 'table':
            table += block['content'] 
    return table 

def eval_concat_table(rec_t,rec_p):
    merged_tab_t = merge_tables(rec_t)
    merged_tab_p = merge_tables(rec_p) 
    # print(merged_tab_t)
    # print(merged_tab_p) 
    table_metric = table_sim(merged_tab_t, merged_tab_p)
    return table_metric.precision, table_metric.recall, table_metric.f1, len(merged_tab_t), len(merged_tab_p)

def eval_structure_accuracy_record_pair(rec_t,rec_p):
    precision = 0
    recall = 0
    F1 = 0
    kvs_t = []
    kvs_p = []
    kv_recall_c = 0
    kv_precision_c = 0
    #eval kvs 
    
    for block in rec_t['content']:#scan each block 
        #print('true:', block)
        if block['type'] == 'kv':
            kvs_t += block['content']
            kv_recall_c += 1
    
    for block in rec_p['content']:
        #print('pred:', block)
        if block['type'] == 'kv':
            kvs_p += block['content']
            kv_precision_c += 1

    # print(kvs_t)
    # print(kvs_p)
    (kvs_precision, kvs_recall, kvs_F1) = kv_prf1(kvs_t,kvs_p) 

    #print('kvs:', kvs_precision, kvs_recall, kvs_F1, kv_precision_c, kv_recall_c)

    #eval tables 
    tab_precision, tab_recall, tab_F1, recall_c,precision_c  = eval_concat_table(rec_t, rec_p) 
    #print('tables:', tab_precision, tab_recall, tab_F1)
    
    precision = (tab_precision*precision_c + kvs_precision * kv_precision_c) / (precision_c + kv_precision_c)
    recall = (tab_recall*recall_c + kvs_recall * kv_recall_c) / (recall_c + kv_recall_c)
    if (precision + recall) > 0:
        F1 = 2*precision*recall/(precision + recall) 
    else:
        F1 = 0

    return precision, recall, F1, tab_precision, tab_recall, tab_F1, kvs_precision, kvs_recall, kvs_F1


def eval_structure_accuracy(truth_path, result_path):
    truth = get_data(truth_path)
    result = get_data(result_path)
    avg_precision = 0
    avg_recall = 0
    avg_F1 = 0
    #print(len(truth))
    size = min(len(truth), len(result))
    for i in range(size):
        rec_t = truth[i]
        rec_p = result[i]
        # print('truth:',rec_t)
        # print('pred:', rec_p)
        precision, recall, F1, tab_precision, tab_recall, tab_F1, kvs_precision, kvs_recall, kvs_F1 = eval_structure_accuracy_record_pair(rec_t, rec_p)
        avg_precision += precision
        avg_recall += recall
        avg_F1 += F1
        #print(i, precision, recall, F1)
        #break 
    
    avg_precision /= size
    avg_recall /= size
    avg_F1 /= size
    #print(avg_precision, avg_recall, avg_F1)
    return avg_precision, avg_recall, avg_F1

import pandas as pd

def read_csv(path):
    return pd.read_csv(path) 

def get_difficult_labels():
    parent_directory = str(Path().resolve().parent)
    path = parent_directory + '/difficulties.csv' 
    df = read_csv(path)
    labels = dict(zip(df['name'], df['complexity']))
    #print(labels)   
    return labels         

def get_complexity(labels, file_name):
    for doc, complexity in labels.items():
        if doc.lower() in file_name.lower():
            return complexity
    return 1


In [12]:
def end_2_end_structure_eval(approach):    
    current_folder = os.getcwd()
    parent_folder = os.path.dirname(current_folder)
    pdf_folder_path = parent_folder + '/data/raw'
    pdfs = scan_folder(pdf_folder_path,'.pdf')
    #approach = 'TRIX' 
    labels = get_difficult_labels()

    #print(labels)

    easy_p = 0
    easy_r = 0
    easy_f = 0
    medium_p = 0
    medium_r = 0
    medium_f = 0
    hard_p = 0
    hard_r = 0
    hard_f = 0

    easy_cnt = 0
    medium_cnt = 0
    hard_cnt = 0

    for pdf_path in pdfs:
        
        result_path = ''
        if approach == 'TRIX':
            result_path = pdf_path.replace('data/raw','out').replace('.pdf','_TRIX.json')
        if approach == 'Eva_D': 
            result_path = pdf_path.replace('data/raw','out').replace('.pdf','_Eva_D.json')
        if approach == 'AzureDI':
            result_path = pdf_path.replace('data/raw','out').replace('.pdf','_AzureDI.json')
        if approach == 'TEXTRACT':
            result_path = pdf_path.replace('data/raw','out').replace('.pdf','_TEXTRACT.json')
        if approach == 'vLLMS':
            result_path = pdf_path.replace('data/raw','out').replace('.pdf','_vLLMS.json')

        #print(result_path)
        if(not os.path.isfile(result_path)):
            continue 

        truth_path = pdf_path.replace('raw','truths').replace('.pdf','.json')
        #print(truth_path)
        if(not os.path.isfile(truth_path)):
            continue 

        complexity = get_complexity(labels, pdf_path)
        
        #print(complexity, truth_path, result_path)
        precision, recall, F1 = eval_structure_accuracy(truth_path, result_path)
        #print(precision, recall, F1)

        if complexity == 1:
            easy_p += precision
            easy_r += recall
            easy_f += F1
            easy_cnt += 1
        elif complexity == 2:
            medium_p += precision
            medium_r += recall
            medium_f += F1
            medium_cnt += 1
        elif complexity == 3:
            hard_p += precision
            hard_r += recall
            hard_f += F1
            hard_cnt += 1
        
    if easy_cnt > 0:
        easy_p /= easy_cnt
        easy_r /= easy_cnt
        easy_f /= easy_cnt

    if medium_cnt > 0:
        medium_p /= medium_cnt
        medium_r /= medium_cnt
        medium_f /= medium_cnt

    if hard_cnt > 0:
        hard_p /= hard_cnt
        hard_r /= hard_cnt
        hard_f /= hard_cnt

    print('Easy datasets: P|R|F1', easy_p, easy_r, easy_f)
    print('Medium datasets: P|R|F1', medium_p, medium_r, medium_f)
    print('Hard datasets: P|R|F1', hard_p, hard_r, hard_f)

In [13]:
approach = 'TRIX'

end_2_end_structure_eval(approach)

Easy datasets: P|R|F1 0.9360922755688295 0.914446983452787 0.9241207981220811
Medium datasets: P|R|F1 0.812131693682935 0.7584461761208132 0.769161505073607
Hard datasets: P|R|F1 0.7928959838335872 0.7639237166560547 0.7653608673041603
