In [108]:
import os
import json
import numpy as np
from typing import List, Dict, Any
from typing import List, Dict, Any
import random
from collections import defaultdict

In [154]:
# Maps raw PII type names found in the vote records to the coarser label names
# used in the scored output. Lookups are done with label_map.get(name, name),
# so any type that is not listed here is kept unchanged.
label_map = {
    "Phone Number": "Phone Number",
    "Mobile Number": "Phone Number",
    "Postal Home Address": "Location",
    "Postal Work Address": "Location",
    "Postal Address" :"Location",
    "Date of Birth": "Date",
    "Full Date": "Date",
    "Sex": "Gender",
    "Gender": "Gender",
    "Natural Person Name": "Person Name",
}

In [115]:
def load_dimensions_map(json_path):
    """
    Build a lookup from image file name to its original pixel dimensions.

    The JSON file is expected to contain a list of entries. For each entry the
    file name is the last "/"-separated component of entry["data"]["ocr"], and
    the dimensions are taken from the first item of the first annotation's
    "result" list (keys "original_width" and "original_height").

    Entries with no file name, no annotations, no results, or a missing
    width/height are skipped instead of raising.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        dict mapping file name -> {"original_width": ..., "original_height": ...}.
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    file_dimensions_map = {}

    for entry in data:
        # `or {}` also covers an explicit null under "data".
        file_name = (entry.get("data") or {}).get("ocr")
        if not file_name:
            continue
        file_name = file_name.split("/")[-1]  # Extract just the filename (e.g., votes_fhfb0066_page1.png)

        # An entry without annotations used to raise IndexError on `[0]`.
        annotations = entry.get("annotations") or []
        if not annotations:
            continue

        # Only the first result object of the first annotation is consulted.
        results = annotations[0].get("result") or []
        if not results:
            continue

        first_result = results[0]
        width = first_result.get("original_width")
        height = first_result.get("original_height")
        if width is None or height is None:
            continue

        file_dimensions_map[file_name] = {
            "original_width": width,
            "original_height": height
        }

    return file_dimensions_map

# Machine-specific absolute path of the JSON file that load_dimensions_map reads.
json_file_path = "/Volumes/MyDataDrive/thesis/code-2/data/manual-label-2.json"  # Replace with actual path
# Module-level lookup (image file name -> original width/height); score_all_jsons_global reads this global.
dimensions_map = load_dimensions_map(json_file_path)

In [None]:
# def estimate_accuracies_triplet(L: np.ndarray, eps: float = 1e-12) -> np.ndarray:
#     """
#     Triplet-based accuracy estimation (Fu et al., 2020).
#     L: binary matrix of shape (n_spans, n_annotators).
#     Returns: array of length m with estimated accuracies in [0,1].
#     """
#     # print(L)
#     n, m = L.shape
#     # print(m)
#     # 1) Compute pairwise agreement rates r_{j,k}
#     r = np.array([[ (L[:, j] * L[:, k]).mean() for k in range(m) ]
#                   for j in range(m)])
#     # 2) If fewer than 3 annotators, default to perfect accuracy
#     if m < 3:
#         return np.ones(m)

#     # 3) For each annotator j, estimate a_j from all triplets (j,k,l)
#     a = np.ones(m)
#     for j in range(m):
#         estimates = []
#         others = [x for x in range(m) if x != j]
#         # iterate over all unordered pairs (k,l)
#         for idx_k in range(len(others)):
#             for idx_l in range(idx_k + 1, len(others)):
#                 k = others[idx_k]
#                 l = others[idx_l]
#                 denom = r[k, l] + eps
#                 if denom > 0:
#                     est = np.sqrt((r[j, k] * r[j, l]) / denom)
#                     estimates.append(est)
#         # aggregate via median
#         if estimates:
#             a[j] = float(np.median(estimates))
#     return a


# def infer_probs(L: np.ndarray, a: np.ndarray, pi: float = 0.5) -> np.ndarray:
#     """
#     Infer posterior probability for each span given label matrix L and accuracies a.
#     """
#     n = L.shape[0]
#     posts = np.zeros(n, dtype=float)
#     for i in range(n):
#         S_p, S_m = pi, 1 - pi
#         for j in range(len(a)):
#             if L[i, j] == 1:
#                 S_p *= (1 + a[j]) / 2
#                 S_m *= (1 - a[j]) / 2
#         posts[i] = S_p / (S_p + S_m)
#     return posts


# def score_one_file(json_path: str):
#     """
#     Load a votes_*.json, compute triplet-based accuracies and posteriors,
#     attach 'probability' to each record, and write out *_scored.json.
#     """
#     # load original records
#     with open(json_path, 'r') as f:
#         items: List[Dict[str, Any]] = json.load(f)

#     # determine annotator count
#     all_anns = {ann for rec in items for ann in rec['annotators']}
#     m = max(all_anns) + 1

#     # build label matrix L: spans × annotators
#     n = len(items)
#     L = np.zeros((n, m), dtype=int)
#     for i, rec in enumerate(items):
#         for ann in rec['annotators']:
#             L[i, ann] = 1

#     # estimate accuracies with triplet method
#     a = estimate_accuracies_triplet(L)

#     # infer posterior probabilities for each span
#     posts = infer_probs(L, a)

#     # attach and write out
#     for i, rec in enumerate(items):
#         rec['probability'] = float(posts[i])

#     str_id  = str(random.randint(1,1000))
#     out_path = f"/Volumes/MyDataDrive/thesis/code-2/src/weak-labels-algo/test/{str_id}.json"
#     with open(out_path, 'w') as f:
#         json.dump(items, f, indent=4)
#     print(f"Wrote scored file: {out_path}")


# def score_all_jsons(votes_dir: str):
#     """
#     Process all votes_*.json in votes_dir.
#     """
#     for fn in os.listdir(votes_dir):
#         if fn.startswith('votes_') and fn.endswith('.json'):
#             score_one_file(os.path.join(votes_dir, fn))


In [155]:
def estimate_accuracies_triplet(L: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """
    Triplet-based accuracy estimation (Fu et al., 2020).

    For every annotator j and every unordered pair (k, l) of other annotators,
    the estimate sqrt(r[j, k] * r[j, l] / r[k, l]) is formed, where r[x, y] is
    the mean over spans of L[:, x] * L[:, y]. The median of these estimates,
    clipped to [0, 1], is the accuracy of annotator j.

    L: binary matrix of shape (n_spans, n_annotators).
    eps: pairs (k, l) whose rate r[k, l] is <= eps are skipped, because the
        ratio is undefined there (previously such pairs produced huge values).
    Returns: array of length m with estimated accuracies in [0,1]. Annotators
        with no usable triplet, and all annotators when m < 3 or there are no
        spans, default to 1.0.
    """
    n, m = L.shape
    # Fewer than three annotators (or no spans at all): nothing to estimate.
    if m < 3 or n == 0:
        return np.ones(m)

    # Pairwise rates in one matrix product; identical to the per-pair means for 0/1 input.
    votes = np.asarray(L, dtype=float)
    r = (votes.T @ votes) / n

    # Triplet-based estimates
    a = np.ones(m)
    for j in range(m):
        estimates: List[float] = []
        others = [o for o in range(m) if o != j]
        for pos, k in enumerate(others):
            for l in others[pos + 1:]:
                denom = r[k, l]
                if denom <= eps:
                    # k and l (almost) never vote together: ratio undefined, skip it.
                    continue
                estimates.append(np.sqrt((r[j, k] * r[j, l]) / denom))
        if estimates:
            # Clip so that downstream (1 - a) / 2 factors can never become negative.
            a[j] = float(np.clip(np.median(estimates), 0.0, 1.0))
    return a


def infer_probs(L: np.ndarray, a: np.ndarray, pi: float = 0.5) -> np.ndarray:
    """
    Posterior probability of each span from its votes and the annotator accuracies.

    Starting from the prior pi (positive) and 1 - pi (negative), every annotator
    that voted for a span multiplies the positive score by (1 + a[j]) / 2 and
    the negative score by (1 - a[j]) / 2. Annotators that did not vote leave
    both scores untouched. The result is positive / (positive + negative).

    L: vote matrix, one row per span and one column per annotator.
    a: per-annotator accuracies, indexed by column of L.
    pi: prior probability of the positive class.
    Returns: float array with one posterior per row of L.
    """
    span_count = L.shape[0]
    posteriors = np.zeros(span_count, dtype=float)
    for row in range(span_count):
        positive = pi
        negative = 1 - pi
        for col, voted in enumerate(L[row]):
            if not voted:
                continue
            accuracy = a[col]
            positive *= (1 + accuracy) / 2
            negative *= (1 - accuracy) / 2
        posteriors[row] = positive / (positive + negative)
    return posteriors


def _relative_bbox_to_pixels(bbox, image_width, image_height):
    """Scale bbox[0] and bbox[2] by image_width, bbox[1] and bbox[3] by image_height, rounding each value."""
    # NOTE(review): presumably bbox is [x, y, width, height] in relative units — confirm with the vote producer.
    return [
        round(bbox[0] * image_width),
        round(bbox[1] * image_height),
        round(bbox[2] * image_width),
        round(bbox[3] * image_height),
    ]


def _normalize_pii_type(pii_type):
    """Translate a PII type (or a list of them) through label_map; unlisted names pass through unchanged."""
    if isinstance(pii_type, list):
        return [label_map.get(name, name) for name in pii_type]
    return label_map.get(pii_type, pii_type)


def score_all_jsons_global(
    votes_dir: str,
    out_dir: str = "/Volumes/MyDataDrive/thesis/code-2/src/weak-labels-algo/test-final",
):
    """
    Process all votes_*.json: group spans by PII type across all files,
    compute triplet-based accuracies & posteriors per type,
    attach probabilities, and write scored files.

    Besides adding 'probability', every record's 'bbox' is rescaled with the
    image dimensions from the module-level dimensions_map and its 'pii_type'
    is translated through the module-level label_map. Grouping in step 3 uses
    the raw (untranslated) 'pii_type'.

    Args:
        votes_dir: Directory scanned for files named votes_*.json.
        out_dir: Directory the scored files are written to, under their
            original file names. Created if it does not exist.

    Raises:
        KeyError: if dimensions_map has no entry for a vote file's image
            ("<vote file stem>.png"), or a record has no 'bbox'.
    """
    # 1) Load files and flatten the records, remembering which file each came from
    records_by_file: Dict[str, List[Dict[str, Any]]] = {}
    all_recs: List[Dict[str, Any]] = []
    rec_paths: List[str] = []  # rec_paths[i] is the file all_recs[i] was loaded from
    for fname in sorted(os.listdir(votes_dir)):
        if not (fname.startswith('votes_') and fname.endswith('.json')):
            continue
        path = os.path.join(votes_dir, fname)
        with open(path, 'r') as f:
            items = json.load(f)
        records_by_file[path] = items
        for rec in items:
            all_recs.append(rec)
            rec_paths.append(path)

    if not all_recs:
        print("No vote files to process.")
        return

    # 2) Determine number of annotators (ids are used as column indices)
    all_anns = {ann for rec in all_recs for ann in rec.get('annotators', [])}
    m = max(all_anns) + 1 if all_anns else 0

    # 3) Group indices by PII type (hashable key)
    type_to_indices: Dict[Any, List[int]] = defaultdict(list)
    for idx, rec in enumerate(all_recs):
        key = rec.get('pii_type')
        # normalize list keys to tuple
        if isinstance(key, list):
            key = tuple(key)
        type_to_indices[key].append(idx)

    # 4) Compute probabilities per type
    probs = np.zeros(len(all_recs), dtype=float)
    for idxs in type_to_indices.values():
        # Build label matrix for this type: one row per span, one column per annotator
        L = np.zeros((len(idxs), m), dtype=int)
        for row_i, rec_idx in enumerate(idxs):
            for ann in all_recs[rec_idx].get('annotators', []):
                L[row_i, ann] = 1
        # Estimate accuracies & infer posteriors
        a = estimate_accuracies_triplet(L)
        probs[idxs] = infer_probs(L, a)

    # 5) Attach probabilities, absolute bboxes and mapped labels to each record.
    #    all_recs holds the same dict objects as records_by_file, so mutating
    #    rec here updates the per-file lists that are written out in step 6.
    for rec, path, p in zip(all_recs, rec_paths, probs.tolist()):
        rec['probability'] = float(p)

        image_name = os.path.splitext(os.path.basename(path))[0] + ".png"
        if image_name not in dimensions_map:
            raise KeyError(f"No image dimensions known for {image_name} (from {path})")
        dims = dimensions_map[image_name]

        # Here we change to absolute bbox
        rec['bbox'] = _relative_bbox_to_pixels(
            rec['bbox'], dims['original_width'], dims['original_height']
        )

        if 'pii_type' in rec:
            rec['pii_type'] = _normalize_pii_type(rec['pii_type'])
            print(rec['pii_type'])

    # 6) Write back per file
    os.makedirs(out_dir, exist_ok=True)
    for path, items in records_by_file.items():
        out_path = os.path.join(out_dir, os.path.basename(path))
        with open(out_path, 'w') as f:
            json.dump(items, f, indent=4)
        print(f"Wrote scored file: {out_path}")


In [None]:
# # No PII type splitting
# def estimate_accuracies_triplet(L: np.ndarray, eps: float = 1e-12) -> np.ndarray:
#     """
#     Triplet-based accuracy estimation (Fu et al., 2020).
#     L: binary matrix of shape (n_spans, n_annotators).
#     Returns: array of length m with estimated accuracies in [0,1].
#     """
#     n, m = L.shape
#     # Compute pairwise agreement rates
#     r = np.array([[ (L[:, j] * L[:, k]).mean() for k in range(m) ]
#                   for j in range(m)])
#     if m < 3:
#         return np.ones(m)
#     # Triplet-based estimates
#     a = np.ones(m)
#     for j in range(m):
#         estimates: List[float] = []
#         others = [o for o in range(m) if o != j]
#         for idx_k in range(len(others)):
#             for idx_l in range(idx_k + 1, len(others)):
#                 k, l = others[idx_k], others[idx_l]
#                 denom = r[k, l] + eps
#                 if denom > 0:
#                     estimates.append(np.sqrt((r[j, k] * r[j, l]) / denom))
#         if estimates:
#             a[j] = float(np.median(estimates))
#     return a


# def infer_probs(L: np.ndarray, a: np.ndarray, pi: float = 0.5) -> np.ndarray:
#     """
#     Compute posterior probability for each span given L and accuracies.
#     """
#     n = L.shape[0]
#     posts = np.zeros(n, dtype=float)
#     for i in range(n):
#         S_p, S_m = pi, 1 - pi
#         for j, vote in enumerate(L[i]):
#             if vote:
#                 S_p *= (1 + a[j]) / 2
#                 S_m *= (1 - a[j]) / 2
#         posts[i] = S_p / (S_p + S_m)
#     return posts


# def score_all_jsons_global(votes_dir: str):
#     """
#     Process all votes_*.json: build one global label matrix across all spans,
#     compute triplet-based accuracies, infer probabilities,
#     attach probabilities, and write scored files.
#     """
#     # 1) Load all files and flatten records
#     records_by_file: Dict[str, List[Dict[str, Any]]] = {}
#     all_recs: List[Dict[str, Any]] = []
#     order_map: List[tuple] = []  # (file_path, local_index)

#     for fname in sorted(os.listdir(votes_dir)):
#         if not (fname.startswith('votes_') and fname.endswith('.json')):
#             continue
#         path = os.path.join(votes_dir, fname)
#         with open(path, 'r') as f:
#             items = json.load(f)
#         records_by_file[path] = items
#         for idx, rec in enumerate(items):
#             all_recs.append(rec)
#             order_map.append((path, idx))

#     if not all_recs:
#         print("No vote files to process.")
#         return

#     # 2) Determine annotator count
#     all_anns = {ann for rec in all_recs for ann in rec.get('annotators', [])}
#     m = max(all_anns) + 1 if all_anns else 0

#     # 3) Build global label matrix L: spans x annotators
#     n = len(all_recs)
#     L = np.zeros((n, m), dtype=int)
#     for i, rec in enumerate(all_recs):
#         for ann in rec.get('annotators', []):
#             L[i, ann] = 1

#     # 4) Estimate accuracies and infer probabilities
#     a = estimate_accuracies_triplet(L)
#     posts = infer_probs(L, a)

#     # 5) Attach probabilities back to records
#     for (path, local_idx), p in zip(order_map, posts.tolist()):
#         records_by_file[path][local_idx]['probability'] = float(p)

#     # 6) Write each scored file
#     for path, items in records_by_file.items():
#         str_id  = str(random.randint(1,1000))
#         file_name = path.split("/")[-1]
#         out_path = f"/Volumes/MyDataDrive/thesis/code-2/src/weak-labels-algo/test-final/{file_name}"
#         print(f"For : {path} is : {str_id}")
#         with open(out_path, 'w') as f:
#             json.dump(items, f, indent=4)
#         print(f"Wrote scored file: {out_path}")

In [156]:

# Run score_all_jsons_global over the votes directory below.
if __name__ == '__main__':
    # set this to your directory of JSON votes files
    score_all_jsons_global('/Volumes/MyDataDrive/thesis/code-2/src/weak-labels/Qwen3-8B-per_page_votes_merged/')

Person Name
Person Name
Person Name
Person Name
Person Name
Person Name
Person Name
Person Name
Person Name
Email Address
Email Address
Email Address
Email Address
Email Address
Phone Number
['Phone Number', 'Phone Number']
Phone Number
['Phone Number', 'Phone Number']
Location
Location
Organization Name
Organization Name
Organization Name
Organization Name
Date
Date
Date
['Invoice Number', 'Contract Number']
Person Name
Email Address
Email Address
Organization Name
Person Name
Email Address
Person Name
Person Name
Email Address
Phone Number
['Phone Number', 'Phone Number']
Location
Organization Name
Date
Date
Contract Number
Person Name
Person Name
Person Name
Person Name
Person Name
Person Name
Person Name
Contract Number
Contract Number
Person Name
Person Name
Person Name
Person Name
Person Name
Person Name
Person Name
Email Address
Email Address
Phone Number
['Phone Number', 'Phone Number']
Location
Organization Name
Date
Date
Date
['Invoice Number', 'Contract Number']
Person Name


In [16]:
# NOTE(review): in the visible part of this notebook score_one_file is only defined inside a
# commented-out cell, so this call would raise NameError on a fresh kernel — confirm whether
# this cell is still needed.
json_path = "/Volumes/MyDataDrive/thesis/code-2/src/weak-labels/Qwen3-8B-per_page_votes_merged/votes_fgfv0233_page1.json"
score_one_file(json_path)

Wrote scored file: /Volumes/MyDataDrive/thesis/rvl-test-preprocessing/src/rvl_test_preprocessing/weak-labels-algo/test.json


In [21]:
# Scratch cell: draw a random integer between 1 and 1000 (inclusive) and print it.
# NOTE(review): despite its name, str_id holds an int here, not a string.
str_id  = random.randint(1,1000)
print(str_id)

66


In [24]:
# NOTE(review): in the visible part of this notebook score_all_jsons is only defined inside a
# commented-out cell, so this call would raise NameError on a fresh kernel — confirm whether
# this cell is still needed.
if __name__ == '__main__':
    # adjust to your JSON directory
    score_all_jsons('/Volumes/MyDataDrive/thesis/code-2/src/weak-labels/Qwen3-8B-per_page_votes_merged')

Wrote scored file: /Volumes/MyDataDrive/thesis/rvl-test-preprocessing/src/rvl_test_preprocessing/weak-labels-algo/test/111.json
Wrote scored file: /Volumes/MyDataDrive/thesis/rvl-test-preprocessing/src/rvl_test_preprocessing/weak-labels-algo/test/953.json
Wrote scored file: /Volumes/MyDataDrive/thesis/rvl-test-preprocessing/src/rvl_test_preprocessing/weak-labels-algo/test/237.json
Wrote scored file: /Volumes/MyDataDrive/thesis/rvl-test-preprocessing/src/rvl_test_preprocessing/weak-labels-algo/test/90.json
Wrote scored file: /Volumes/MyDataDrive/thesis/rvl-test-preprocessing/src/rvl_test_preprocessing/weak-labels-algo/test/593.json
Wrote scored file: /Volumes/MyDataDrive/thesis/rvl-test-preprocessing/src/rvl_test_preprocessing/weak-labels-algo/test/749.json
Wrote scored file: /Volumes/MyDataDrive/thesis/rvl-test-preprocessing/src/rvl_test_preprocessing/weak-labels-algo/test/812.json
Wrote scored file: /Volumes/MyDataDrive/thesis/rvl-test-preprocessing/src/rvl_test_preprocessing/weak-lab

In [87]:
def score_all_jsons_global(votes_dir: str):
    """
    Print the set of distinct PII types found in the votes_*.json files of votes_dir.

    For a record whose 'pii_type' is a list, only its first element is
    collected (an empty list contributes nothing); otherwise the value itself
    is collected. Nothing is scored or written, and nothing is returned.

    NOTE(review): this reuses the name of the scoring function defined in an
    earlier cell, so running this cell rebinds the name to this inspection
    helper — consider renaming one of the two.
    """
    pii_types = set()
    for fname in sorted(os.listdir(votes_dir)):
        if not (fname.startswith('votes_') and fname.endswith('.json')):
            continue
        path = os.path.join(votes_dir, fname)
        with open(path, 'r') as f:
            items = json.load(f)
        for rec in items:
            pii_type = rec['pii_type']
            if isinstance(pii_type, list):
                # Lists are unhashable, so a list of any length is reduced to its
                # first element; single-element lists used to raise TypeError.
                if pii_type:
                    pii_types.add(pii_type[0])
            else:
                pii_types.add(pii_type)

    print(pii_types)


# Calls the definition directly above on this directory; it prints the PII types it collects.
score_all_jsons_global("/Volumes/MyDataDrive/thesis/code-2/src/weak-labels-algo/test-final")

{'ID Card Number', 'Sex', 'Email Address', 'Invoice Number', 'Phone Number', 'Contract Number', 'Full Date', 'Postal Address', 'Organization Name', 'Natural Person Name'}


In [117]:
import json

def load_dimensions_map(json_path):
    """
    Build a lookup from image file name to its original pixel dimensions.

    The JSON file is expected to contain a list of entries. For each entry the
    file name is the last "/"-separated component of entry["data"]["ocr"], and
    the dimensions are taken from the first item of the first annotation's
    "result" list (keys "original_width" and "original_height").

    Entries with no file name, no annotations, no results, or a missing
    width/height are skipped instead of raising.

    NOTE(review): a function with this name is also defined in an earlier
    cell; when cells run top to bottom this later definition replaces it.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        dict mapping file name -> {"original_width": ..., "original_height": ...}.
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    file_dimensions_map = {}

    for entry in data:
        # `or {}` also covers an explicit null under "data".
        file_name = (entry.get("data") or {}).get("ocr")
        if not file_name:
            continue
        file_name = file_name.split("/")[-1]  # Extract just the filename (e.g., votes_fhfb0066_page1.png)

        # An entry without annotations used to raise IndexError on `[0]`.
        annotations = entry.get("annotations") or []
        if not annotations:
            continue

        # Only the first result object of the first annotation is consulted.
        results = annotations[0].get("result") or []
        if not results:
            continue

        first_result = results[0]
        width = first_result.get("original_width")
        height = first_result.get("original_height")
        if width is None or height is None:
            continue

        file_dimensions_map[file_name] = {
            "original_width": width,
            "original_height": height
        }

    return file_dimensions_map

# Example usage: rebuild the module-level dimensions_map from the JSON file below.
json_file_path = "/Volumes/MyDataDrive/thesis/code-2/data/manual-label-2.json"  # Replace with actual path
dimensions_map = load_dimensions_map(json_file_path)

# Query by image file name (raises KeyError if the image is not in the map)
print(dimensions_map["votes_fgbf0249_page1.png"])
# Prints the stored dimensions as a dict, e.g. {'original_width': 2550, 'original_height': 3352}

{'original_width': 2550, 'original_height': 3352}
