<a href="https://colab.research.google.com/github/praveenkumarkatwe/colab_scratchpad/blob/main/Untitled5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

SyntaxError: incomplete input (ipython-input-1374580336.py, line 1)

In [None]:
"""
Compute FactCC and QAGS scores for summarization outputs.

This script extends the earlier ``compute_factcc_feqa_scores.py`` by replacing
the FEQA/QuestEval component with **QAGS** (Question Answering and
Generation for Summarization).  QAGS is a question‑answering based metric
introduced by Wang et al. (2020) that assesses factual consistency by
comparing answers to automatically generated questions when the source
article and the model summary are used as context【596674913693464†L117-L143】.  The
`factsumm` library provides a convenient implementation of QAGS via the
``FactSumm`` class.  For each pair of source and summary, the method
``extract_qas`` generates questions from the summary, answers them using
both the source and the summary, and returns the average F1 overlap of
answers (the QAGS score)【456578228132877†L305-L361】.

The script reads a JSON file containing records with at least the following
fields:

* ``text`` – the source document or passage to be summarized.
* ``reference`` – the reference (gold) summary (optional but retained for
  completeness).
* ``generated_before`` – the system summary before fine‑tuning.
* ``generated_after`` – the system summary after fine‑tuning.

It computes two factuality scores for each of the ``generated_before`` and
``generated_after`` summaries:

1. **FactCC** – a classifier‑based probability that the summary is
   factually consistent with the source【484315385132402†L6-L23】.  We load the
   ``manueldeprada/FactCC`` model via Hugging Face ``transformers`` and
   report the softmax probability of the ``factual`` class.
2. **QAGS** – a QA‑based faithfulness score computed via ``FactSumm``
   in ``factsumm``.  ``extract_qas`` returns a float between 0 and 1
   representing the average F1 overlap of answers to questions posed on
   the source and summary【456578228132877†L305-L361】.

The resulting augmented records include four new keys per record:

* ``factcc_before`` – FactCC score for the pre‑fine‑tuning summary.
* ``qags_before`` – QAGS score for the pre‑fine‑tuning summary.
* ``factcc_after`` – FactCC score for the post‑fine‑tuning summary.
* ``qags_after`` – QAGS score for the post‑fine‑tuning summary.

### Usage

Install dependencies:

```bash
pip install transformers torch factsumm
```

Then run the script over your JSON file:

```bash
python compute_factcc_qags_scores.py --input_path input.json --output_path output.json
```

You can optionally add ``--use_cuda`` to place the FactCC model on GPU and
use GPU for QAGS if available.  Note that ``factsumm`` will automatically
download several pre‑trained models (NER, relation extraction, question
generation, question answering) the first time it is used【703489966589102†L46-L116】.

"""

import argparse
import json
import os
from typing import Any, Dict, List

import torch
from transformers import BertTokenizer, BertForSequenceClassification

# Import FactSumm for QAGS computation.  We wrap this in a try/except
# to provide a helpful error message if the user has not installed
# ``factsumm``.  FactSumm encapsulates all the components needed to
# generate and answer questions and compute the QAGS score【456578228132877†L305-L361】.
try:
    from factsumm import FactSumm
except ImportError as exc:
    raise ImportError(
        "factsumm is required for QAGS scoring but is not installed. "
        "Please install it with `pip install factsumm` and ensure your "
        "environment has access to the internet to download the underlying models."
    ) from exc


def load_models(
    model_name: str = "manueldeprada/FactCC",
    use_cuda: bool = False,
) -> tuple:
    """Load the FactCC classifier and the FactSumm QAGS scorer.

    Args:
        model_name: Hugging Face identifier for the FactCC model.
        use_cuda: Whether to load models on GPU if available.

    Returns:
        A tuple ``(tokenizer, classifier, factsumm)``.
    """
    # Load FactCC tokenizer and classifier
    tokenizer = BertTokenizer.from_pretrained(model_name)
    classifier = BertForSequenceClassification.from_pretrained(model_name)
    classifier.eval()
    if use_cuda and torch.cuda.is_available():
        classifier.to("cuda")
    # Instantiate FactSumm.  The constructor sets up configuration for
    # named entity recognition, relation extraction, question generation
    # and question answering using default models.  The heavy models are
    # loaded lazily on first use.
    factsumm = FactSumm()
    return tokenizer, classifier, factsumm


def compute_factcc_score(
    classifier: BertForSequenceClassification,
    tokenizer: BertTokenizer,
    source: str,
    summary: str,
) -> float:
    """Compute the FactCC probability that ``summary`` is factual for ``source``.

    The classifier predicts two logits, corresponding to ``non‑factual`` and
    ``factual`` classes.  After applying softmax we return the probability of
    the second class (index 1).  See the FactCC paper for details【484315385132402†L6-L23】.

    Args:
        classifier: Pre‑trained FactCC classifier.
        tokenizer: Tokenizer associated with the FactCC model.
        source: Original source document.
        summary: Candidate summary.

    Returns:
        A float in [0, 1] representing the FactCC factuality score.
    """
    # Tokenize the source and summary.  The sequence is truncated at 512
    # tokens (the BERT maximum) and padded to ensure fixed length.
    inputs = tokenizer(
        source,
        summary,
        max_length=512,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
    )
    with torch.no_grad():
        logits = classifier(**inputs).logits
    probs = torch.softmax(logits, dim=-1)
    factual_prob = probs[0, 1].item()
    return float(factual_prob)


def compute_qags_score(
    factsumm: FactSumm,
    source: str,
    summary: str,
    use_cuda: bool = False,
) -> float:
    """Compute the QAGS score for a (source, summary) pair using FactSumm.

    QAGS compares answers to questions generated from the summary when
    conditioned on the source versus conditioned on the summary.  The
    underlying implementation generates questions with a T5 model and
    answers them with a BERT‑style QA model【456578228132877†L305-L361】.

    Args:
        factsumm: Instantiated ``FactSumm`` object.
        source: Original source document.
        summary: Candidate summary.
        use_cuda: Whether to run QAGS on GPU (if available).  FactSumm
            internally takes a ``device`` string: ``"cuda"`` or ``"cpu"``.

    Returns:
        A float QAGS score between 0 and 1.
    """
    device = "cuda" if use_cuda and torch.cuda.is_available() else "cpu"
    # ``extract_qas`` returns the QAGS score directly.  We pass
    # ``verbose=False`` to suppress intermediate prints.
    score = factsumm.extract_qas(
        source,
        summary,
        source_ents=None,
        summary_ents=None,
        verbose=False,
        device=device,
    )
    # ``extract_qas`` logs the QAGS score, but also returns it.  We cast to
    # float for JSON serialization.
    return float(score)


def process_json(
    records: List[Dict[str, Any]],
    tokenizer: BertTokenizer,
    classifier: BertForSequenceClassification,
    factsumm: FactSumm,
    use_cuda: bool = False,
) -> List[Dict[str, Any]]:
    """Augment each record with FactCC and QAGS scores.

    Args:
        records: List of input records loaded from JSON.
        tokenizer: FactCC tokenizer.
        classifier: FactCC classifier.
        factsumm: FactSumm object for QAGS computation.
        use_cuda: Whether to use GPU.

    Returns:
        The list of records with new score fields.
    """
    for idx, item in enumerate(records):
        source = item.get("text") or item.get("source")
        reference = item.get("reference") or item.get("reference_summary")
        before = (
            item.get("generated_before")
            or item.get("summary_before")
            or item.get("generated_summary")
        )
        after = (
            item.get("generated_after")
            or item.get("summary_after")
            or item.get("generated_finetuned")
        )
        if source is None or before is None or after is None:
            raise KeyError(
                f"Record {idx} is missing required keys: 'text', 'generated_before', 'generated_after'."
            )
        # Compute FactCC scores for before and after summaries
        item["factcc_before"] = compute_factcc_score(classifier, tokenizer, source, before)
        item["factcc_after"] = compute_factcc_score(classifier, tokenizer, source, after)
        # Compute QAGS scores using FactSumm.  We ignore the reference here
        # since QAGS only uses source and summary.
        try:
            qags_before = compute_qags_score(factsumm, source, before, use_cuda)
        except Exception:
            qags_before = 0.0
        try:
            qags_after = compute_qags_score(factsumm, source, after, use_cuda)
        except Exception:
            qags_after = 0.0
        item["qags_before"] = qags_before
        item["qags_after"] = qags_after
    return records


def main() -> None:
    parser = argparse.ArgumentParser(
        description=(
            "Compute FactCC and QAGS scores for summaries in a JSON file."
        )
    )
    parser.add_argument(
        "--input_path",
        type=str,
        required=True,
        help="Path to the input JSON file.",
    )
    parser.add_argument(
        "--output_path",
        type=str,
        default=None,
        help=(
            "Path to save the scored JSON.  Defaults to the input filename with "
            "'_scored' appended before the extension."
        ),
    )
    parser.add_argument(
        "--use_cuda",
        action="store_true",
        help="Whether to run FactCC and QAGS on CUDA if available.",
    )

    # Determine output path

    input_path="output.json"
    output_path="output_scored.json"
    use_cuda=False
    if output_path is None:
        base, ext = os.path.splitext(input_path)
        output_path = f"{base}_scored{ext}"
    # Load input JSON
    with open(input_path, "r", encoding="utf-8") as fin:
        data = json.load(fin)
    if not isinstance(data, list):
        raise ValueError("Input JSON must be a list of records.")
    # Load models
    tokenizer, classifier, factsumm = load_models(use_cuda=args.use_cuda)
    # Process and score
    scored_data = process_json(
        data,
        tokenizer,
        classifier,
        factsumm,
        use_cuda=use_cuda,
    )
    # Write output
    with open(output_path, "w", encoding="utf-8") as fout:
        json.dump(scored_data, fout, indent=2, ensure_ascii=False)
    print(f"Wrote scored records to {args.output_path}")


if __name__ == "__main__":

    main()

FileNotFoundError: [Errno 2] No such file or directory: 'output.json'

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
compute_factcc_qags_scores.py

Add FactCC-like and QAGS-like factual consistency scores to a before/after
summarization dataset.

Input JSON format (list of dicts). The script is flexible and recognizes
multiple key names. For your file, these are the important ones:

- Source text:
    - preferred: "input"
    - also accepted: "text", "source"

- Reference summary (optional; unused by default for QAGS but kept if needed):
    - "reference", "reference_summary"

- BEFORE summary:
    - preferred: "generatedsummary_before"
    - also accepted: "generated_before", "summary_before", "generated_summary"

- AFTER summary:
    - preferred: "generatedsummary_after"
    - also accepted: "generated_after", "summary_after", "generated_finetuned"

Outputs (per record, appended):
    - "factcc_before", "factcc_after"      -> probability that summary is consistent
    - "qags_before",   "qags_after"        -> average token-F1 between QA answers
                                              from source vs. summary, across
                                              auto-generated questions.

USAGE:
    pip install -q transformers torch accelerate
    # for QAGS-like (QG + QA):
    pip install -q sentencepiece

    python compute_factcc_qags_scores.py \
        --input_path /path/to/before_after.json \
        --output_path /path/to/before_after_scored.json

You can change model names:
    --factcc_model tals/albert-base-v2-factcc
    --qg_model valhalla/t5-small-qg-hl
    --qa_model deepset/roberta-base-squad2

NOTES:
- "FactCC-like": We use a binary/textual entailment-style classifier that accepts
  (source, summary) as a sequence pair and returns a consistency probability.
  If the chosen model exposes labels, we map to 'consistent' as best as possible.

- "QAGS-like": We generate questions from the summary, then answer each question
  twice: once from the SOURCE ('answer_src') and once from the SUMMARY
  ('answer_sum'). We compute token-level F1(answer_src, answer_sum) over the
  generated questions and average. Higher is better.

- Truncation: Long source texts are truncated to model max length; you can
  adjust with --max_source_tokens and --max_summary_tokens.

"""

import argparse
import json
import math
import os
import sys
from typing import Any, Dict, List, Optional, Tuple

import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForSeq2SeqLM,
    pipeline,
)

# ----------------------------
# Helpers
# ----------------------------

def pick_first(*vals):
    """Return the first non-empty value from provided candidates (strings)."""
    for v in vals:
        if v is None:
            continue
        if isinstance(v, str) and v.strip() == "":
            continue
        return v
    return None


def tokenize_pair(tokenizer, a: str, b: str, max_a: int, max_b: int):
    """Tokenize a pair (a=source, b=summary) with independent truncation budgets."""
    # Strategy: truncate each string individually, then let tokenizer pair them.
    # We do a rough truncation by tokens using tokenizer.encode with truncation=True.
    a_ids = tokenizer.encode(a, add_special_tokens=False, truncation=True, max_length=max_a)
    b_ids = tokenizer.encode(b, add_special_tokens=False, truncation=True, max_length=max_b)

    # Build pair with special tokens
    pair = tokenizer.prepare_for_model(
        a_ids,
        b_ids,
        truncation=True,
        max_length=max_a + max_b + 3,  # [CLS], [SEP], [SEP] (model-dependent)
        return_tensors="pt",
        add_special_tokens=True,
    )
    return pair


def softmax(x: torch.Tensor, dim: int = -1) -> torch.Tensor:
    return torch.nn.functional.softmax(x, dim=dim)


def sanitize(s: Optional[str]) -> str:
    return s if (isinstance(s, str) and len(s) > 0) else ""


def simple_token_f1(a: str, b: str) -> float:
    """Token-level F1 (space-split; case-insensitive; strips punctuation lightly)."""
    import re
    def norm(t: str) -> List[str]:
        t = t.lower()
        t = re.sub(r"[^a-z0-9\s]", " ", t)
        toks = [x for x in t.split() if x]
        return toks

    A, B = norm(a), norm(b)
    if not A and not B:
        return 1.0
    if not A or not B:
        return 0.0
    # overlap
    from collections import Counter
    cA, cB = Counter(A), Counter(B)
    overlap = sum((cA & cB).values())
    if overlap == 0:
        return 0.0
    precision = overlap / max(1, sum(cA.values()))
    recall    = overlap / max(1, sum(cB.values()))
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)


# ----------------------------
# FactCC-like scoring
# ----------------------------

class FactCCScorer:
    """
    Wraps a sequence classifier to output P(consistent | source, summary).

    We attempt to map label indices to {consistent, inconsistent} by inspecting
    model.config.id2label. If unknown, we assume the higher logit corresponds to 'consistent'.
    """

    def __init__(self, model_name: str, device: str = "auto"):
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        if device == "auto":
            self.device = 0 if torch.cuda.is_available() else -1
        elif device == "cpu":
            self.device = -1
        else:
            # assume it's an integer CUDA device id
            try:
                self.device = int(device)
            except Exception:
                self.device = -1

        if self.device >= 0:
            self.model.to(f"cuda:{self.device}")
        self.model.eval()

        # try to infer label mapping
        self.consistent_label_idx = None
        id2label = getattr(self.model.config, "id2label", None)
        if isinstance(id2label, dict) and len(id2label) == self.model.config.num_labels:
            for k, v in id2label.items():
                name = str(v).lower()
                if any(t in name for t in ["entail", "support", "consistent", "true", "label_1"]):
                    self.consistent_label_idx = int(k)
                    break
            # fallback: if labels look like {0: 'LABEL_0', 1: 'LABEL_1'}
            if self.consistent_label_idx is None and 1 in id2label:
                self.consistent_label_idx = 1  # assume positive=1
        if self.consistent_label_idx is None:
            # unknown mapping; will take max logit as 'consistent'
            self.consistent_label_idx = -1

    @torch.no_grad()
    def score(self, source: str, summary: str,
              max_source_tokens: int = 512, max_summary_tokens: int = 256) -> float:
        source = sanitize(source)
        summary = sanitize(summary)
        if not source or not summary:
            return 0.0

        pair = tokenize_pair(self.tokenizer, source, summary, max_source_tokens, max_summary_tokens)
        if self.device >= 0:
            pair = {k: v.to(f"cuda:{self.device}") for k, v in pair.items()}
        logits = self.model(**pair).logits  # [1, num_labels]
        probs = softmax(logits, dim=-1).squeeze(0)

        if self.consistent_label_idx == -1:
            # take max as 'consistent'
            return float(torch.max(probs).item())
        else:
            return float(probs[self.consistent_label_idx].item())


# ----------------------------
# QAGS-like scoring
# ----------------------------

class QAGSLite:
    """
    QAGS-like scorer: generate questions from SUMMARY, then answer the questions
    twice (from SOURCE and from SUMMARY). Compute token-level F1 between the
    two answers and average.

    This is a lightweight approximation of QAGS:
    - Question Generation: a seq2seq T5 model fine-tuned for QG with highlighting.
    - QA: extractive QA model (SQuAD2-style).
    """

    def __init__(self,
                 qg_model: str = "valhalla/t5-small-qg-hl",
                 qa_model: str = "deepset/roberta-base-squad2",
                 device: str = "auto"):
        # device handling for pipelines:
        if device == "auto":
            self.device = 0 if torch.cuda.is_available() else -1
        elif device == "cpu":
            self.device = -1
        else:
            try:
                self.device = int(device)
            except Exception:
                self.device = -1

        # QG pipeline: we will manually craft highlights and feed through text2text
        self.qg_tokenizer = AutoTokenizer.from_pretrained(qg_model, use_fast=True)
        self.qg_model = AutoModelForSeq2SeqLM.from_pretrained(qg_model)
        if self.device >= 0:
            self.qg_model.to(f"cuda:{self.device}")
        self.qg_model.eval()

        # QA pipeline
        self.qa_pipe = pipeline(
            task="question-answering",
            model=qa_model,
            tokenizer=qa_model,
            device=self.device
        )

    @torch.no_grad()
    def _gen_questions(self, summary: str, max_questions: int = 5,
                       max_input_len: int = 384, max_out_len: int = 32) -> List[str]:
        """
        Very simple QG using "highlight" trick:
          input: "generate questions: <hl> sentence <hl> rest"
        We'll split the summary into sentences and highlight each one to prompt QG.
        """
        import re
        sents = [s.strip() for s in re.split(r'(?<=[\.\?\!])\s+', summary) if s.strip()]
        questions = []
        for sent in sents:
            prompt = f"generate questions: <hl> {sent} <hl> {summary}"
            enc = self.qg_tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=max_input_len
            )
            if self.device >= 0:
                enc = {k: v.to(f"cuda:{self.device}") for k, v in enc.items()}
            out = self.qg_model.generate(
                **enc,
                num_beams=4,
                max_length=max_out_len,
                early_stopping=True
            )
            text = self.qg_tokenizer.decode(out[0], skip_special_tokens=True).strip()
            # Models often emit multiple questions separated by "?" or "<sep>"
            parts = re.split(r'\?|\<sep\>', text)
            for p in parts:
                q = p.strip()
                if q:
                    if not q.endswith("?"):
                        q += "?"
                    questions.append(q)
            if len(questions) >= max_questions:
                break
        # de-dup and trim
        dedup = []
        seen = set()
        for q in questions:
            qn = q.lower()
            if qn not in seen:
                seen.add(qn)
                dedup.append(q)
        return dedup[:max_questions]

    def _answer(self, question: str, context: str) -> str:
        context = sanitize(context)
        if not question or not context:
            return ""
        try:
            out = self.qa_pipe({"question": question, "context": context})
            if isinstance(out, list) and out:
                out = out[0]
            ans = out.get("answer", "") if isinstance(out, dict) else ""
            return ans.strip()
        except Exception:
            return ""

    def score(self, source: str, summary: str,
              max_questions: int = 5) -> float:
        summary = sanitize(summary)
        source  = sanitize(source)
        if not summary or not source:
            return 0.0

        questions = self._gen_questions(summary, max_questions=max_questions)
        if not questions:
            return 0.0

        f1s = []
        for q in questions:
            ans_src = self._answer(q, source)
            ans_sum = self._answer(q, summary)
            f1s.append(simple_token_f1(ans_src, ans_sum))
        if not f1s:
            return 0.0
        return float(sum(f1s) / len(f1s))


# ----------------------------
# Processing logic
# ----------------------------

def process_json(records: List[Dict[str, Any]],
                 factcc: FactCCScorer,
                 qags: QAGSLite,
                 max_source_tokens: int,
                 max_summary_tokens: int,
                 qags_max_questions: int) -> List[Dict[str, Any]]:
    """
    Augment each record with FactCC and QAGS-like scores.
    Handles multiple key variants and raises a clear error if a required field is missing.
    """
    for idx, item in enumerate(records):
        source = pick_first(item.get("text"),
                           item.get("source"),
                           item.get("input"))
        reference = pick_first(item.get("reference"),
                               item.get("reference_summary"))

        before = pick_first(item.get("generated_before"),
                           item.get("summary_before"),
                           item.get("generated_summary"),
                           item.get("generatedsummary_before"))

        after  = pick_first(item.get("generated_after"),
                           item.get("summary_after"),
                           item.get("generated_finetuned"),
                           item.get("generatedsummary_after"))

        if source is None or before is None or after is None:
            raise KeyError(
                f"Record {idx} is missing required keys:\n"
                f"  source in ['text','source','input'] -> {bool(source)}\n"
                f"  before in ['generated_before','summary_before','generated_summary','generatedsummary_before'] -> {bool(before)}\n"
                f"  after  in ['generated_after','summary_after','generated_finetuned','generatedsummary_after'] -> {bool(after)}"
            )

        # FactCC-like
        try:
            item["factcc_before"] = factcc.score(source, before,
                                                 max_source_tokens=max_source_tokens,
                                                 max_summary_tokens=max_summary_tokens)
        except Exception:
            item["factcc_before"] = 0.0

        try:
            item["factcc_after"] = factcc.score(source, after,
                                                max_source_tokens=max_source_tokens,
                                                max_summary_tokens=max_summary_tokens)
        except Exception:
            item["factcc_after"] = 0.0

        # QAGS-like
        try:
            item["qags_before"] = qags.score(source, before,
                                             max_questions=qags_max_questions)
        except Exception:
            item["qags_before"] = 0.0

        try:
            item["qags_after"] = qags.score(source, after,
                                            max_questions=qags_max_questions)
        except Exception:
            item["qags_after"] = 0.0

        # Keep reference around if you want to do reference-aware analysis later
        if reference is not None and "reference" not in item:
            item["reference"] = reference

    return records


# ----------------------------
# CLI
# ----------------------------

def parse_args():
    p = argparse.ArgumentParser(description="Compute FactCC-like and QAGS-like scores for before/after summaries.")
    p.add_argument("--input_path", required=False, help="Path to input JSON (list of dicts).")
    p.add_argument("--output_path", required=False, help="Where to write augmented JSON.")
    p.add_argument("--factcc_model", default="tals/albert-base-v2-factcc",
                   help="HF model for pairwise consistency classification.")
    p.add_argument("--qg_model", default="valhalla/t5-small-qg-hl",
                   help="HF seq2seq model for question generation.")
    p.add_argument("--qa_model", default="deepset/roberta-base-squad2",
                   help="HF extractive QA model.")
    p.add_argument("--device", default="auto",
                   help="Device for models: 'auto' | 'cpu' | CUDA index (e.g., '0').")
    p.add_argument("--max_source_tokens", type=int, default=448,
                   help="Max tokens reserved for source in the FactCC pair.")
    p.add_argument("--max_summary_tokens", type=int, default=192,
                   help="Max tokens reserved for summary in the FactCC pair.")
    p.add_argument("--qags_max_questions", type=int, default=5,
                   help="Maximum questions to generate per summary for QAGS-like scoring.")
    return p.parse_args()


def main():
    args = parse_args()
    args.input_path="output.json"
    args.output_path="output_scored.json"
    args.use_cuda=False
    # Load input
    with open(args.input_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, list):
        raise ValueError("Input JSON must be a list of objects.")

    # Init scorers
    factcc = FactCCScorer(model_name=args.factcc_model, device=args.device)
    qags = QAGSLite(qg_model=args.qg_model, qa_model=args.qa_model, device=args.device)

    # Process
    data = process_json(
        records=data,
        factcc=factcc,
        qags=qags,
        max_source_tokens=args.max_source_tokens,
        max_summary_tokens=args.max_summary_tokens,
        qags_max_questions=args.qags_max_questions,
    )

    # Save
    with open(args.output_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    print(f"✅ Wrote augmented JSON to: {args.output_path}")
    print("Keys added per record: factcc_before, factcc_after, qags_before, qags_after")


if __name__ == "__main__":
    main()


usage: colab_kernel_launcher.py [-h] [--input_path INPUT_PATH]
                                [--output_path OUTPUT_PATH]
                                [--factcc_model FACTCC_MODEL]
                                [--qg_model QG_MODEL] [--qa_model QA_MODEL]
                                [--device DEVICE]
                                [--max_source_tokens MAX_SOURCE_TOKENS]
                                [--max_summary_tokens MAX_SUMMARY_TOKENS]
                                [--qags_max_questions QAGS_MAX_QUESTIONS]
colab_kernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-422a61aa-88fc-440f-9518-b2976dfb3cbe.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [6]:
# @title 🧠 Define scorers and helpers (run once)

import json, re, math, os, sys
from typing import Any, Dict, List, Optional
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForSeq2SeqLM,
    pipeline,
)

def pick_first(*vals):
    for v in vals:
        if v is None:
            continue
        if isinstance(v, str) and v.strip() == "":
            continue
        return v
    return None

def sanitize(s: Optional[str]) -> str:
    return s if (isinstance(s, str) and len(s) > 0) else ""

def softmax(x: torch.Tensor, dim: int = -1) -> torch.Tensor:
    return torch.nn.functional.softmax(x, dim=dim)

def tokenize_pair(tokenizer, a: str, b: str, max_a: int, max_b: int):
    a_ids = tokenizer.encode(a, add_special_tokens=False, truncation=True, max_length=max_a)
    b_ids = tokenizer.encode(b, add_special_tokens=False, truncation=True, max_length=max_b)
    pair = tokenizer.prepare_for_model(
        a_ids, b_ids,
        truncation=True,
        max_length=max_a + max_b + 3,
        return_tensors="pt",
        add_special_tokens=True,
    )
    return pair

def simple_token_f1(a: str, b: str) -> float:
    def norm(t: str) -> List[str]:
        t = t.lower()
        t = re.sub(r"[^a-z0-9\s]", " ", t)
        toks = [x for x in t.split() if x]
        return toks
    A, B = norm(a), norm(b)
    if not A and not B:
        return 1.0
    if not A or not B:
        return 0.0
    from collections import Counter
    cA, cB = Counter(A), Counter(B)
    overlap = sum((cA & cB).values())
    if overlap == 0:
        return 0.0
    precision = overlap / max(1, sum(cA.values()))
    recall    = overlap / max(1, sum(cB.values()))
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)

class FactCCScorer:
    """Binary (source, summary) consistency probability via sequence classifier."""
    def __init__(self, model_name: str, device: str = "auto"):
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        if device == "auto":
            self.device = 0 if torch.cuda.is_available() else -1
        elif device == "cpu":
            self.device = -1
        else:
            try:
                self.device = int(device)
            except Exception:
                self.device = -1
        if self.device >= 0:
            self.model.to(f"cuda:{self.device}")
        self.model.eval()

        self.consistent_label_idx = None
        id2label = getattr(self.model.config, "id2label", None)
        if isinstance(id2label, dict) and len(id2label) == self.model.config.num_labels:
            for k, v in id2label.items():
                name = str(v).lower()
                if any(t in name for t in ["entail", "support", "consistent", "true", "label_1"]):
                    self.consistent_label_idx = int(k)
                    break
            if self.consistent_label_idx is None and 1 in id2label:
                self.consistent_label_idx = 1
        if self.consistent_label_idx is None:
            self.consistent_label_idx = -1

    @torch.no_grad()
    def score(self, source: str, summary: str,
              max_source_tokens: int = 448, max_summary_tokens: int = 192) -> float:
        source = sanitize(source)
        summary = sanitize(summary)
        if not source or not summary:
            return 0.0
        pair = tokenize_pair(self.tokenizer, source, summary, max_source_tokens, max_summary_tokens)
        if self.device >= 0:
            pair = {k: v.to(f"cuda:{self.device}") for k, v in pair.items()}
        logits = self.model(**pair).logits
        probs = softmax(logits, dim=-1).squeeze(0)
        if self.consistent_label_idx == -1:
            return float(torch.max(probs).item())
        return float(probs[self.consistent_label_idx].item())

class QAGSLite:
    """
    Generate questions from SUMMARY and answer them from SOURCE and SUMMARY;
    average token-F1 between the two answers (QAGS-style).
    """
    def __init__(self,
                 qg_model: str = "valhalla/t5-small-qg-hl",
                 qa_model: str = "deepset/roberta-base-squad2",
                 device: str = "auto"):
        if device == "auto":
            self.device = 0 if torch.cuda.is_available() else -1
        elif device == "cpu":
            self.device = -1
        else:
            try:
                self.device = int(device)
            except Exception:
                self.device = int(device)
        self.qg_tokenizer = AutoTokenizer.from_pretrained(qg_model, use_fast=True)
        self.qg_model = AutoModelForSeq2SeqLM.from_pretrained(qg_model)
        if self.device >= 0:
            self.qg_model.to(f"cuda:{self.device}")
        self.qg_model.eval()

        self.qa_pipe = pipeline(
            task="question-answering",
            model=qa_model,
            tokenizer=qa_model,
            device=self.device
        )

    @torch.no_grad()
    def _gen_questions(self, summary: str, max_questions: int = 5,
                       max_input_len: int = 384, max_out_len: int = 32) -> List[str]:
        sents = [s.strip() for s in re.split(r'(?<=[\.\?\!])\s+', summary) if s.strip()]
        questions = []
        for sent in sents:
            prompt = f"generate questions: <hl> {sent} <hl> {summary}"
            enc = self.qg_tokenizer(prompt, return_tensors="pt", truncation=True, max_length=max_input_len)
            if self.device >= 0:
                enc = {k: v.to(f"cuda:{self.device}") for k, v in enc.items()}
            out = self.qg_model.generate(**enc, num_beams=4, max_length=max_out_len, early_stopping=True)
            text = self.qg_tokenizer.decode(out[0], skip_special_tokens=True).strip()
            parts = re.split(r'\?|\<sep\>', text)
            for p in parts:
                q = p.strip()
                if q:
                    if not q.endswith("?"):
                        q += "?"
                    questions.append(q)
            if len(questions) >= max_questions:
                break
        dedup, seen = [], set()
        for q in questions:
            qn = q.lower()
            if qn not in seen:
                seen.add(qn)
                dedup.append(q)
        return dedup[:max_questions]

    def _answer(self, question: str, context: str) -> str:
        context = sanitize(context)
        if not question or not context:
            return ""
        try:
            out = self.qa_pipe({"question": question, "context": context})
            if isinstance(out, list) and out:
                out = out[0]
            return (out.get("answer", "") if isinstance(out, dict) else "").strip()
        except Exception:
            return ""

    def score(self, source: str, summary: str, max_questions: int = 5) -> float:
        summary = sanitize(summary); source = sanitize(source)
        if not summary or not source:
            return 0.0
        questions = self._gen_questions(summary, max_questions=max_questions)
        if not questions:
            return 0.0
        f1s = []
        for q in questions:
            ans_src = self._answer(q, source)
            ans_sum = self._answer(q, summary)
            f1s.append(simple_token_f1(ans_src, ans_sum))
        return float(sum(f1s)/len(f1s)) if f1s else 0.0

def process_json(records: List[Dict[str, Any]],
                 factcc: FactCCScorer,
                 qags: QAGSLite,
                 max_source_tokens: int,
                 max_summary_tokens: int,
                 qags_max_questions: int) -> List[Dict[str, Any]]:
    for idx, item in enumerate(records):
        source = pick_first(item.get("text"), item.get("source"), item.get("input"))
        reference = pick_first(item.get("reference"), item.get("reference_summary"))
        before = pick_first(item.get("generated_before"),
                            item.get("summary_before"),
                            item.get("generated_summary"),
                            item.get("generatedsummary_before"),
                            item.get("generatedsummary_before_EHI")) # Added the new key here
        after  = pick_first(item.get("generated_after"),
                            item.get("summary_after"),
                            item.get("generated_finetuned"),
                            item.get("generatedsummary_after"),
                            item.get("generatedsummary_after_EHI")) # Added the new key here
        if source is None or before is None or after is None:
            raise KeyError(
                f"Record {idx} missing keys. "
                f"source in ['text','source','input'] present? {bool(source)} | "
                f"before in ['generated_before','summary_before','generated_summary','generatedsummary_before','generatedsummary_before_EHI'] present? {bool(before)} | "
                f"after in ['generated_after','summary_after','generated_finetuned','generatedsummary_after','generatedsummary_after_EHI'] present? {bool(after)}"
            )
        try:
            item["factcc_before"] = factcc.score(source, before, max_source_tokens, max_summary_tokens)
        except Exception:
            item["factcc_before"] = 0.0
        try:
            item["factcc_after"]  = factcc.score(source, after, max_source_tokens, max_summary_tokens)
        except Exception:
            item["factcc_after"] = 0.0
        try:
            item["qags_before"]   = qags.score(source, before, max_questions=qags_max_questions)
        except Exception:
            item["qags_before"] = 0.0
        try:
            item["qags_after"]    = qags.score(source, after, max_questions=qags_max_questions)
        except Exception:
            item["qags_after"] = 0.0

        if reference is not None and "reference" not in item:
            item["reference"] = reference
    return records

In [7]:
# 🚫 Force Transformers/HF Hub to run without any token
import os
# If your notebook (or Colab UI) injected a token, drop it:
os.environ.pop("HF_TOKEN", None)
os.environ.pop("HUGGING_FACE_HUB_TOKEN", None)
os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"      # optional: quiet logs
os.environ["TRANSFORMERS_OFFLINE"] = "0"          # make sure online downloads are OK
print("HF tokens cleared; running without authentication.")


HF tokens cleared; running without authentication.


In [8]:
# @title ▶️ Run scoring (upload JSON or point to Drive path)
# @markdown Choose how to provide your input JSON:
use_upload = True  # @param {type:"boolean"}
# @markdown If you prefer to read from Google Drive, set `use_upload=False` and edit the `input_path` below.
input_path = "/content/drive/MyDrive/before_after_all_metrics_XSUM_mistral.json"  # @param {type:"string"}

# Model & runtime knobs
factcc_model = "tals/albert-base-v2-factcc"  # @param {type:"string"}
qg_model     = "valhalla/t5-small-qg-hl"     # @param {type:"string"}
qa_model     = "deepset/roberta-base-squad2" # @param {type:"string"}
device       = "auto"                        # @param ["auto","cpu","0","1","2","3"]
max_source_tokens   = 448                    # @param {type:"integer"}
max_summary_tokens  = 192                    # @param {type:"integer"}
qags_max_questions  = 5                      # @param {type:"integer"}


factcc_model = "manueldeprada/FactCC"        # ⬅️ replace the old tals/... id
qg_model     = "valhalla/t5-small-qg-hl"
qa_model     = "deepset/roberta-base-squad2"

device       = "auto"
hf_token     = None  # keep None; not required for these public repos
from google.colab import files
import json, os

# Upload or set path
if use_upload:
    print("📂 Upload your JSON file...")
    uploaded = files.upload()
    input_path = list(uploaded.keys())[0]
else:
    # Optional: mount Drive if needed
    if input_path.startswith("/content/drive"):
        from google.colab import drive
        if not os.path.ismount("/content/drive"):
            drive.mount("/content/drive")

print(f"Using input: {input_path}")

# Load input
with open(input_path, "r", encoding="utf-8") as f:
    data = json.load(f)
assert isinstance(data, list), "Input JSON must be a list of objects."

# Init scorers
print("🔧 Initializing models...")
factcc = FactCCScorer(model_name=factcc_model, device=device)
qags   = QAGSLite(qg_model=qg_model, qa_model=qa_model, device=device)

# Process
print("🚀 Scoring (this can take a while on CPU)...")
aug = process_json(
    records=data,
    factcc=factcc,
    qags=qags,
    max_source_tokens=max_source_tokens,
    max_summary_tokens=max_summary_tokens,
    qags_max_questions=qags_max_questions,
)

# Save output
base = os.path.splitext(os.path.basename(input_path))[0]
output_path = f"/content/{base}_scored.json"
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(aug, f, ensure_ascii=False, indent=2)

print(f"✅ Wrote: {output_path}")

# Offer download
try:
    files.download(output_path)
except Exception as e:
    print("You can manually download from:", output_path)


📂 Upload your JSON file...


Saving Dialogueset_distilbart.json to Dialogueset_distilbart (2).json
Using input: Dialogueset_distilbart (2).json
🔧 Initializing models...


Device set to use cuda:0


🚀 Scoring (this can take a while on CPU)...


KeyError: "Record 0 missing keys. source in ['text','source','input'] present? True | before in ['generated_before','summary_before','generated_summary','generatedsummary_before','generatedsummary_before_EHI'] present? False | after in ['generated_after','summary_after','generated_finetuned','generatedsummary_after','generatedsummary_after_EHI'] present? False"

In [None]:
!pip install factsumm




In [None]:
python compute_factcc_qags_scores.py --input_path input.json --output_path output.json

SyntaxError: invalid syntax (ipython-input-238069336.py, line 1)