# Post-processing of Extracted Terms
In this notebook we apply a post-processing pipeline to the raw term predictions.

Goals of this step:
- clean and normalize predicted terms
- remove clearly bad or truncated terms
- remove nested / redundant terms inside the same sentence

The final output is a cleaner list of candidate terms, better aligned with the gold standard.


In [11]:
import json
import os
import string
from typing import Dict, List

# --- 1. BASIC UTILITIES ------------------------------------------------------


def load_json(path: str):
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def save_json(obj, path: str):
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False, indent=2)
    print(f"✓ Saved cleaned predictions to {path}")


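def clean_term(term: str) -> str:
    """Collapse whitespace, strip outer punctuation/quotes, and lowercase."""
    t = " ".join(term.split())
    # Strip punctuation and whitespace together, so that inputs such as
    # "( rifiuti )" do not keep stray spaces once the brackets are removed.
    t = t.strip(string.punctuation + string.whitespace + "«»“”")
    return t.lower()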

def normalize_term(term: str) -> str:
    return clean_term(term)

# Trailing stopwords that often signal "cut off" terms (e.g. 'gestione dei')
ITALIAN_BAD_ENDINGS = {
    "di", "dei", "degli", "del", "della", "dello", "delle",
    "e", "ed",
    "a", "ai", "agli", "al", "alla", "alle", "allo",
    "da", "dal", "dai", "dagli", "dalla", "dalle",
    "con", "per", "su", "tra", "fra"
}
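
To make the effect of the normalization step concrete, the snippet below runs a condensed stand-in for `clean_term` on a few inputs. The example strings are made up for illustration and do not come from the dataset.

```python
import string


def clean_term(term: str) -> str:
    # Condensed stand-in for the notebook's clean_term, restated so that
    # this snippet runs on its own.
    t = " ".join(term.split())
    t = t.strip(string.punctuation + string.whitespace + "«»“”")
    return t.lower()


# Hypothetical raw predictions, chosen to show each normalization effect.
examples = ["  Raccolta   Differenziata, ", "«RAEE»", "(rifiuti urbani)", "d.lgs.", "..."]
cleaned = [clean_term(e) for e in examples]
print(cleaned)
```

Only leading and trailing punctuation is removed, so inner punctuation (as in `d.lgs`) survives. A term made of punctuation alone becomes an empty string, which the pipeline later discards.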


### 2. Detecting truncated terms

Sometimes the extractor returns incomplete terms such as:

- *"gestione dei"*
- *"batterie e"*

These usually end with a function word (preposition, conjunction, etc.) and are unlikely to be valid domain terms.

The function `looks_truncated(term)` implements a simple heuristic:

- if the term has only one token → it is **not** considered truncated
- otherwise, if the **last** token is in `ITALIAN_BAD_ENDINGS` → the term is marked as **truncated**

This signal can be used later to filter out bad candidates.


In [12]:
def looks_truncated(term: str) -> bool:
    """
    Heuristic: terms that end with a function word
    (e.g. 'gestione dei', 'batterie e') are probably truncated.
    Expects an already normalized (lowercased) term.
    """
    tokens = term.split()
    if len(tokens) < 2:
        return False  # a single word can be a valid term (e.g. 'multimateriale')
    last = tokens[-1]
    return last in ITALIAN_BAD_ENDINGS
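
A quick check of the heuristic on a handful of candidates can look like the sketch below. It restates the function in compact form so that it runs on its own, and `BAD_ENDINGS` is only a small illustrative subset of `ITALIAN_BAD_ENDINGS`.

```python
# Illustrative subset of ITALIAN_BAD_ENDINGS (an assumption made for brevity).
BAD_ENDINGS = {"di", "dei", "del", "della", "e", "con", "per"}


def looks_truncated(term: str) -> bool:
    # Same rule as above: at least two tokens and a function word at the end.
    tokens = term.split()
    return len(tokens) >= 2 and tokens[-1] in BAD_ENDINGS


candidates = ["gestione dei", "batterie e", "raccolta differenziata", "di", "Gestione DEI"]
flags = {c: looks_truncated(c) for c in candidates}
for term, flag in flags.items():
    print(f"{term!r}: {flag}")
```

The last candidate is not flagged because the comparison is case-sensitive, which is why the check is meant to run on terms that have already been lowercased by the normalization step.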

### Removing nested / redundant terms

Extractors often produce nested terms for the same sentence.

If a **short term** is fully contained inside a **longer term** (its character span lies within the longer term's span), we usually want to keep only the longer, more informative one.

The function `remove_nested_terms(sentence_text, terms)` does:

1. For each term, it finds all character spans in the sentence using `find_spans`.
2. It collects all spans and sorts them by **decreasing length** (longer terms first).
3. Greedy selection: it keeps a span only if it is **not fully contained** inside any already selected span.
4. It then sorts the selected spans by `start` offset, to preserve sentence order.
5. Finally, it returns the list of unique terms corresponding to the selected spans.

Result: terms that occur only nested inside longer terms are removed, which reduces redundancy and makes evaluation fairer. A term that also appears on its own elsewhere in the sentence is kept.
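
The five steps above can be sketched compactly with plain tuples. This is a condensed illustration under assumed names (`find_all`, `drop_nested`), not the notebook's own implementation, and the sentence is invented.

```python
from typing import List, Tuple


def find_all(sentence: str, term: str) -> List[Tuple[int, int, str]]:
    """Return (start, end, term) for every case-insensitive occurrence."""
    s, t = sentence.lower(), term.lower()
    out, pos = [], s.find(t)
    while pos != -1:
        out.append((pos, pos + len(t), term))
        pos = s.find(t, pos + 1)
    return out


def drop_nested(sentence: str, terms: List[str]) -> List[str]:
    spans = [sp for term in terms for sp in find_all(sentence, term)]
    if not spans:
        return list(terms)  # nothing matched: leave the list untouched
    spans.sort(key=lambda sp: sp[1] - sp[0], reverse=True)  # longest first
    kept: List[Tuple[int, int, str]] = []
    for start, end, term in spans:
        # keep the span unless an already kept span fully covers it
        if not any(start >= ks and end <= ke for ks, ke, _ in kept):
            kept.append((start, end, term))
    kept.sort(key=lambda sp: sp[0])  # back to sentence order
    return list(dict.fromkeys(term for _, _, term in kept))  # unique, ordered


sentence = "La raccolta differenziata dei rifiuti urbani riduce i rifiuti."
terms = ["raccolta", "raccolta differenziata", "rifiuti urbani", "rifiuti"]
result = drop_nested(sentence, terms)
print(result)
```

Here `raccolta` disappears because its only occurrence lies inside `raccolta differenziata`, while `rifiuti` survives thanks to its second, standalone occurrence at the end of the sentence.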


In [13]:
class Span:
    def __init__(self, term: str, start: int, end: int):
        self.term = term
        self.start = start
        self.end = end

    def length(self) -> int:
        return self.end - self.start

    def __repr__(self):
        return f"Span(term={self.term!r}, start={self.start}, end={self.end})"


def find_spans(sentence: str, term: str) -> List[Span]:
    """
    Find all occurrences (character spans) of 'term' in 'sentence' (case-insensitive).
    """
    spans = []
    sent_l = sentence.lower()
    t = term.lower()
    start = 0
    while True:
        idx = sent_l.find(t, start)
        if idx == -1:
            break
        spans.append(Span(term, idx, idx + len(t)))
        start = idx + 1
    return spans


def remove_nested_terms(sentence_text: str, terms: List[str]) -> List[str]:
    """
    Remove nested terms: if a term is fully contained in another one,
    keep only the longer span.

    Strategy:
    - Find all (start, end) spans of the terms in the sentence.
    - Sort them by decreasing length.
    - Greedy: keep a span if it is not fully contained in one already kept.
    - Terms that never occur in the sentence cannot be checked and are kept.
    """
    spans: List[Span] = []
    unmatched: List[str] = []

    for term in terms:
        term_spans = find_spans(sentence_text, term)
        if not term_spans:
            unmatched.append(term)
        # if a term occurs more than once, consider all of its spans
        spans.extend(term_spans)

    if not spans:
        return terms  # no match at all -> no filtering

    # sort by decreasing length (longer terms first)
    spans.sort(key=lambda s: s.length(), reverse=True)

    selected: List[Span] = []
    for cand in spans:
        contained = False
        for s in selected:
            if cand.start >= s.start and cand.end <= s.end:
                contained = True
                break
        if not contained:
            selected.append(cand)

    # sort the selected spans by order of appearance in the sentence
    selected.sort(key=lambda s: s.start)

    # final terms in sentence order, followed by the unmatched ones, so that
    # a term is not silently dropped just because it was not found in the text
    final_terms = []
    seen = set()
    for term in [s.term for s in selected] + unmatched:
        if term not in seen:
            final_terms.append(term)
            seen.add(term)

    return final_terms
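
`find_spans` matches raw substrings, so a short term can also be found inside a longer word. If that turns out to matter for the data, one possible variant anchors the match on word boundaries with a regular expression. This is an assumption and not part of the notebook: `find_word_spans` is a hypothetical helper name and the sentence is invented.

```python
import re
from typing import List, Tuple


def find_word_spans(sentence: str, term: str) -> List[Tuple[int, int]]:
    """(start, end) of occurrences of term that are not glued to word characters."""
    # Lookarounds instead of \b, so terms that start or end with punctuation
    # are still handled; re.escape protects regex metacharacters in the term.
    pattern = r"(?<!\w)" + re.escape(term) + r"(?!\w)"
    return [m.span() for m in re.finditer(pattern, sentence, flags=re.IGNORECASE)]


sentence = "Il centro ecologico promuove il progetto Eco."
substring_hits = sentence.lower().count("eco")   # also counts the hit inside 'ecologico'
word_hits = find_word_spans(sentence, "eco")     # only the standalone 'Eco'
print(substring_hits, [sentence[a:b] for a, b in word_hits])
```

Unlike the loop in `find_spans`, `re.finditer` returns non-overlapping matches only, which should not matter for the containment test but is a difference worth keeping in mind.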


    


In [14]:
# --- 3. CLEANING PIPELINE FOR A SINGLE SENTENCE ------------------------------
from collections import Counter
from typing import Dict, List, Tuple

def clean_terms_for_sentence(sentence_text: str, raw_terms: List[str]) -> Tuple[List[str], Dict[str, int]]:
    """
    Clean the terms of a single sentence by applying:
      - normalization (lowercase, strip, whitespace)
      - duplicate removal
      - removal of "cut off" terms (e.g. 'gestione dei')
      - removal of nested terms
    Returns:
      - the list of cleaned terms
      - local stats (counters)
    """
    stats = Counter()

    # 1. normalize and drop empty terms
    normalized = []
    for t in raw_terms:
        nt = normalize_term(t)
        if nt:
            normalized.append(nt)
        else:
            stats["removed_empty_after_norm"] += 1

    stats["after_normalization"] = len(normalized)

    # 2. remove duplicates while preserving order
    deduped = []
    seen = set()
    for t in normalized:
        if t not in seen:
            deduped.append(t)
            seen.add(t)
        else:
            stats["removed_duplicates"] += 1

    # 3. remove "cut off" terms (ending in a stopword)
    not_truncated = []
    for t in deduped:
        if looks_truncated(t):
            stats["removed_truncated_heuristic"] += 1
        else:
            not_truncated.append(t)

    # 4. remove nested terms using the original sentence text
    final_terms = remove_nested_terms(sentence_text, not_truncated)
    stats["after_nested_filter"] = len(final_terms)

    return final_terms, stats




## Span Reconstruction

In [15]:
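def reconstruct_spans(tokens, labels):
    """
    Rebuild term strings from token-level BIO labels.
    A 'B-' label opens a new span, an 'I-' label extends the open span,
    and any other label closes it. An 'I-' label with no open span is ignored.
    """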
    spans = []
    current = []
    for tok, lab in zip(tokens, labels):
        if lab.startswith("B-"):
            if current:
                spans.append(" ".join(current))
            current = [tok]
        elif lab.startswith("I-") and current:
            current.append(tok)
        else:
            if current:
                spans.append(" ".join(current))
                current = []
    if current:
        spans.append(" ".join(current))
    return spans
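
A small example shows how the BIO labels are turned back into term strings. The function is restated so that the snippet runs on its own. The tokens and the `TERM` label suffix are assumptions for illustration, since the function only looks at the `B-` and `I-` prefixes.

```python
from typing import List


def reconstruct_spans(tokens: List[str], labels: List[str]) -> List[str]:
    # Restated from the cell above so that the example is self-contained.
    spans, current = [], []
    for tok, lab in zip(tokens, labels):
        if lab.startswith("B-"):
            if current:
                spans.append(" ".join(current))
            current = [tok]
        elif lab.startswith("I-") and current:
            current.append(tok)
        else:
            if current:
                spans.append(" ".join(current))
            current = []
    if current:
        spans.append(" ".join(current))
    return spans


tokens = ["La", "raccolta", "differenziata", "dei", "rifiuti", "urbani", "."]
labels = ["O", "B-TERM", "I-TERM", "O", "B-TERM", "I-TERM", "O"]
spans = reconstruct_spans(tokens, labels)

# An 'I-' label that does not follow an open span is silently ignored.
orphan = reconstruct_spans(["dei", "rifiuti"], ["O", "I-TERM"])
print(spans, orphan)
```

Because tokens are joined with a single space, punctuation tokens that end up inside a span are separated by spaces as well, which is one reason the later normalization step strips surrounding punctuation.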


### Sentence-level debugging

To better understand what the post-processing is doing, we define a helper:

`debug_cleaning_for_sentence(sentence_text, raw_terms)`

This function prints, for a single sentence:

1. **RAW TERMS**: original predicted terms.
2. **NORMALIZED TERMS**: after lowercasing, whitespace normalization, and stripping of surrounding punctuation (terms that become empty are dropped).
3. **DEDUPED TERMS**: duplicates removed while preserving order.
4. **TRUNCATED**: terms detected as truncated by `looks_truncated()` and removed.
5. **NOT TRUNCATED**: remaining terms that are passed to the nested-term filter.
6. **FINAL TERMS**: result after `remove_nested_terms()`.

This is useful to check if our heuristics are too aggressive (removing good terms) or too weak.


In [16]:
def debug_cleaning_for_sentence(sentence_text: str, raw_terms: List[str]):
    """
    Print step by step what happens to the terms of a single sentence.
    Useful to check whether the heuristics are removing good terms.
    """
    print("=" * 80)
    print("SENTENCE:")
    print(sentence_text)
    print("\nRAW TERMS:")
    print(raw_terms)

    # 1) normalization (terms that become empty are dropped)
    normalized = [nt for nt in map(normalize_term, raw_terms) if nt]
    print("\nNORMALIZED TERMS:")
    print(normalized)

    # 2) deduplication
    deduped = []
    seen = set()
    for t in normalized:
        if t not in seen:
            deduped.append(t)
            seen.add(t)
    print("\nDEDUPED TERMS:")
    print(deduped)

    # 3) truncated terms
    not_truncated = []
    truncated = []
    for t in deduped:
        if looks_truncated(t):
            truncated.append(t)
        else:
            not_truncated.append(t)

    print("\nTRUNCATED (rimossi dalla euristica):")
    print(truncated)
    print("\nNOT TRUNCATED (che passano allo step nested):")
    print(not_truncated)

    # 4) nested terms
    final_terms = remove_nested_terms(sentence_text, not_truncated)
    print("\nFINAL TERMS AFTER NESTED FILTER:")
    print(final_terms)

    print("=" * 80)

### Cleaning BERT predictions on the full dev set

In [17]:
from collections import Counter
def clean_bert_predictions(
    bert_pred_path: str,
    data_path: str,
    output_path: str,
    debug_sample_n: int = 0,
):
    data = load_json(data_path)
    # Build the map (doc, par, sent) -> sentence_text
    sentence_map: Dict[Tuple[str, int, int], str] = {}
    rows = data["data"] if isinstance(data, dict) and "data" in data else data
    for r in rows:
        key = (r["document_id"], r["paragraph_id"], r["sentence_id"])
        sentence_map[key] = r["sentence_text"]

    print(f"✓ Loaded {len(sentence_map)} sentences from {data_path}")

    print(f"Loading BERT predictions from {bert_pred_path}...")
    bert_pred = load_json(bert_pred_path)
    pred_rows = bert_pred["data"] if isinstance(bert_pred, dict) and "data" in bert_pred else bert_pred
    print(f"✓ Loaded {len(pred_rows)} prediction entries")

    global_stats: Counter = Counter()
    cleaned_data = {"data": []}

    for idx, entry in enumerate(pred_rows):
        key = (entry["document_id"], entry["paragraph_id"], entry["sentence_id"])
        sent_text = sentence_map.get(key, "")

        raw_terms: List[str] = entry.get("term_list", []) or []

        cleaned_terms, stats = clean_terms_for_sentence(sent_text, raw_terms)

        # optionally print the step-by-step trace for the first debug_sample_n sentences
        if debug_sample_n > 0 and idx < debug_sample_n:
            debug_cleaning_for_sentence(sent_text, raw_terms)

        global_stats.update(stats)
        global_stats["total_sentences"] += 1
        global_stats["total_terms_before"] += len(raw_terms)
        global_stats["total_terms_after"] += len(cleaned_terms)

        cleaned_data["data"].append({
            "document_id": entry["document_id"],
            "paragraph_id": entry["paragraph_id"],
            "sentence_id": entry["sentence_id"],
            "term_list": cleaned_terms,
        })
    
    save_json(cleaned_data, output_path)

    print("\n=== Cleaning statistics ===")
    for k, v in sorted(global_stats.items()):
        print(f"{k}: {v}")
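
The function joins predictions to sentences through the `(document_id, paragraph_id, sentence_id)` key, and a prediction whose key is missing from the data file gets an empty sentence text, in which case no term can be located and the nested-term filter has nothing to act on. The sketch below reproduces that lookup on made-up records and counts such misses. All record values and the `missing_sentence_text` counter are hypothetical and not part of the notebook.

```python
from collections import Counter

# Hypothetical records following the field names used in the function above.
sentences = [
    {"document_id": "doc1", "paragraph_id": 0, "sentence_id": 0,
     "sentence_text": "Raccolta differenziata dei rifiuti."},
    {"document_id": "doc1", "paragraph_id": 0, "sentence_id": 1,
     "sentence_text": "Gestione delle batterie."},
]
predictions = [
    {"document_id": "doc1", "paragraph_id": 0, "sentence_id": 0,
     "term_list": ["raccolta differenziata", "rifiuti"]},
    {"document_id": "doc1", "paragraph_id": 0, "sentence_id": 2,  # no such sentence
     "term_list": ["batterie"]},
]

sentence_map = {
    (r["document_id"], r["paragraph_id"], r["sentence_id"]): r["sentence_text"]
    for r in sentences
}

stats = Counter()
for entry in predictions:
    key = (entry["document_id"], entry["paragraph_id"], entry["sentence_id"])
    text = sentence_map.get(key, "")
    stats["total_sentences"] += 1
    stats["total_terms_before"] += len(entry["term_list"])
    if not text:
        stats["missing_sentence_text"] += 1  # hypothetical extra counter

# Per-sentence counters are merged by addition, as global_stats.update(stats) does.
stats.update(Counter({"removed_duplicates": 1}))
print(dict(stats))
```

A non-zero miss count would suggest that the prediction file and the data file do not describe the same split, which is worth checking before interpreting the cleaning statistics.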

In [18]:
clean_bert_predictions(
    bert_pred_path="../final_train_dev_training/predictions/subtask_a_dev_bert_preds_train_dev.json",
    data_path="../../data/test.json",
    output_path="../final_train_dev_training/predictions/subtask_a_dev_bert_preds_train_dev_cleaned.json",
    debug_sample_n=5,
)
#src\final_train_dev_training\predictions\subtask_a_dev_bert_preds_train_dev.json

✓ Loaded 1142 sentences from ../../data/test.json
Loading BERT predictions from ../final_train_dev_training/predictions/subtask_a_dev_bert_preds_train_dev.json...
✓ Loaded 1142 prediction entries
SENTENCE:
COMUNE DI AMATO

RAW TERMS:
[]

NORMALIZED TERMS:
[]

DEDUPED TERMS:
[]

TRUNCATED (rimossi dalla euristica):
[]

NOT TRUNCATED (che passano allo step nested):
[]

FINAL TERMS AFTER NESTED FILTER:
[]
SENTENCE:
PROVINCIA DI CATANZARO

RAW TERMS:
[]

NORMALIZED TERMS:
[]

DEDUPED TERMS:
[]

TRUNCATED (rimossi dalla euristica):
[]

NOT TRUNCATED (che passano allo step nested):
[]

FINAL TERMS AFTER NESTED FILTER:
[]
SENTENCE:
(UFFICIO DEL SINDACO)

RAW TERMS:
[]

NORMALIZED TERMS:
[]

DEDUPED TERMS:
[]

TRUNCATED (rimossi dalla euristica):
[]

NOT TRUNCATED (che passano allo step nested):
[]

FINAL TERMS AFTER NESTED FILTER:
[]
SENTENCE:
Via Marconi, 14 – 88040 Amato (CZ)

RAW TERMS:
[]

NORMALIZED TERMS:
[]

DEDUPED TERMS:
[]

TRUNCATED (rimossi dalla euristica):
[]

NOT TRUNCATED (che