In [1]:
%%bash
!(stat -t /usr/local/lib/*/dist-packages/google/colab > /dev/null 2>&1) && exit

cd /content && rm -rf /content/home
git clone https://github.com/kmeng01/rome home > install.log 2>&1
pip install -r /content/home/scripts/colab_reqs/rome.txt >> install.log 2>&1
pip install --upgrade google-cloud-storage >> install.log 2>&1
pip install datasets >> install.log 2>&1
pip install rouge-score

Collecting rouge-score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: rouge-score
  Building wheel for rouge-score (setup.py): started
  Building wheel for rouge-score (setup.py): finished with status 'done'
  Created wheel for rouge-score: filename=rouge_score-0.1.2-py3-none-any.whl size=24935 sha256=bd43ed57f1c21e6abc556f9dfbbd1cb43485a1196318a4cb6317c35aa0b05d68
  Stored in directory: /root/.cache/pip/wheels/5f/dd/89/461065a73be61a532ff8599a28e9beef17985c9e9c31e541b4
Successfully built rouge-score
Installing collected packages: rouge-score
Successfully installed rouge-score-0.1.2
Collecting pytorch_lightning
  Downloading pytorch_lightning-2.4.0-py3-none-any.whl.metadata (21 kB)
Collecting torchmetrics>=0.7.0 (from pytorch_lightning)
  Downloading torchmetrics-1.6.0-py3-none-any.whl.metadata (20 kB)
Collecting lightning-utilities>=0.10.0 (from

In [2]:
IS_COLAB = True
try:
    import google.colab, torch, os

    IS_COLAB = True
    if not torch.cuda.is_available():
        raise Exception("Change runtime type to include a GPU.")
    os.chdir("/content/home")
    torch.set_grad_enabled(False)
except ModuleNotFoundError as _:
    pass

%load_ext autoreload
%autoreload 2

In [3]:
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig, AutoModelForPreTraining
import os, re, json
!ls /content/home
from util.globals import DATA_DIR
from dsets import KnownsDataset
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import random
from rouge_score import rouge_scorer
from pytorch_lightning import LightningModule
import spacy
import random
# Load spaCy's transformer-based model
nlp = spacy.load("en_core_web_sm")



baselines     dsets	   globals.yml	LICENSE    README.md  scripts
CITATION.cff  experiments  hparams	notebooks  rome       util


In [4]:

class GPT2:

  def __init__(
      self,
      model: str = "gpt2",
      device: str = "cuda",
      check_point: AutoModelForCausalLM = None,
  ):
    super().__init__()
    self.model = model
    self.device = device
    self.tokenizer = AutoTokenizer.from_pretrained(model)  # GPT2Tokenizer)
    if check_point is None:
      self.model = AutoModelForPreTraining.from_pretrained(model).to(device)
    else:
      self.model = check_point

  def generate_output_ids(
      self,
      prompt: str,
  ) -> torch.Tensor:
    """Generate answers based on different configurations.

    Args:
        prompt (str): Input text.

    Returns:
        torch.Tensor: Output token IDs.
    """
    input_ids = torch.tensor(self.tokenizer.encode(prompt)).unsqueeze(0)
    input_ids = input_ids.to(self.device)

    # self.model.eval()
    attention_mask = torch.ones_like(input_ids).to(
        self.device
    )  # Create attention mask with 1s for real tokens
    output_ids = self.model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        no_repeat_ngram_size=2,
        num_return_sequences=1,
        pad_token_id=self.tokenizer.eos_token_id,  # Stop generation at end-of-sequence token,
        max_length=50,
    )
    return output_ids

  def predict_probs_from_prompt(self, prompt):
    input_ids = self.tokenizer.encode(prompt, return_tensors="pt")
    # inp = make_inputs(tokenizer, [prompt])
    out = self.model(input_ids.to(self.device))
    out = out["logits"]
    probs = torch.softmax(out[:, -1], dim=1)
    # print(probs.shape)

    # get top 10 probabilities and predictions
    topk_probs, topk_indices = torch.topk(probs, k=10, dim=1)
    # print(topk_probs, topk_probs.shape)
    # print(topk_indices, topk_indices.shape)

    # create a list with tuples of token to probability
    result = [
        (self.tokenizer.decode(int(c)), float(p))
        for p, c in zip(topk_probs[0], topk_indices[0])
    ]
    return result

  def decoding_output_ids(self, output_ids: torch.Tensor) -> str:
    """Decode output token IDs to text.

    Args:
        output_ids (torch.Tensor): Output token IDs.

    Returns:
        list: Decoded text.
    """
    texts = []
    for output_id in output_ids:
      text = self.tokenizer.decode(output_id, skip_special_tokens=False)
      texts.append(text)
    return texts[0]

  def calc_rogue_score(self, reference, generated_text):
    scores = []
    score_func = rouge_scorer.RougeScorer(
        ["rouge1", "rouge2", "rougeL"], use_stemmer=True
    ).score
    scores.append({
        "reference": reference,
        "text": generated_text,
        "score": score_func(reference, generated_text),
    })
    return scores

  def rouge_score(self, known_facts, debug=False):
    scores = []
    if debug:
      use_facts = known_facts[:5]
    else:
      use_facts = known_facts
    for i, k in enumerate(use_facts):
      prompt = k["prompt"]
      expected_prediction = k["prediction"]
      results = self.decoding_output_ids(
          output_ids=self.generate_output_ids(prompt)
      )
      generated_answer = results[len(prompt) :]
      if debug:
        print(
            "index = " + i + " Prompt: ",
            prompt,
            "expected prediction: ",
            expected_prediction,
        )
      scores += self.calc_rogue_score(expected_prediction, generated_answer)
    return scores

  def first_token_accuracy(self, known_facts, debug=False):
    scores = []
    if debug:
      use_facts = known_facts[:5]
    else:
      use_facts = known_facts
    for i, k in enumerate(use_facts):
      prompt = k["prompt"]
      expected_prediction = k["prediction"].split()[0]
      results = self.predict_probs_from_prompt(prompt)
      max_prob = results[0][1]
      matched = False
      for r in results:
        if self.clean_text(r[0]) == self.clean_text(
            expected_prediction.strip()
        ):
          matched = True
          debug and r[1] < max_prob and print(
              f"Lower prob matching prediction: {r[0]} with relative"
              f" probability {r[1] / max_prob}"
          )
          weighted_score = r[1] / max_prob
          scores.append({
              "reference": expected_prediction,
              "weighted_score": weighted_score,
              "generated": results,
          })
          break
      if not matched:
        scores.append({
            "reference": expected_prediction,
            "weighted_score": 0,
            "generated": results,
        })
    return scores

  def clean_text(self, text):
    # Remove non-alphanumeric characters
    return re.sub(r"[^\w\s]", "", text).strip()

  def bleu_score(
      self,
      expected_prediction,
      generated_answer,
      ngram_weights=(0.25, 0.25, 0.25, 0.25),
  ):
    smoothing = SmoothingFunction()
    reference_tokens_list = expected_prediction.split()
    generated_tokens = (
        generated_answer.split()
    )  # Model-generated text as list of tokens

    bleu = sentence_bleu(
        reference_tokens_list,
        generated_tokens,
        weights=ngram_weights,
        smoothing_function=smoothing.method1,
    )

    return bleu

  def eval_generated_text(self, dataset, dataset_type, debug=False):
    extracted_texts = []
    if debug:
      use_facts = random.choices(dataset, k=10)
    else:
      use_facts = dataset

    for i, k in enumerate(use_facts):
      question, answers = extract_question_answers_from_example(
          k, dataset_type, debug=debug
      )
      if not answers or len(answers) == 0 or len(answers[0]) == 0:
        continue
      prompt = question
      expected = answers[0]
      results = self.decoding_output_ids(
          output_ids=self.generate_output_ids(prompt)
      )
      generated_answer = results[len(prompt) :]
      extacted_facts_ref = self.extract_facts(prompt, expected)
      extacted_facts_generated_answer = self.extract_facts(
          prompt, generated_answer
      )
      extracted_texts.append({
          "prompt": prompt,
          "reference": expected,
          "extracted_facts_reference": extacted_facts_ref,
          "generated_answer": generated_answer,
          "extracted_facts_generated": extacted_facts_generated_answer,
      })
    return extracted_texts

  def score_extracted_facts(self, extracted_texts):
    scores = []

    for i, k in enumerate(extracted_texts):
      prompt = k["prompt"]
      expected = k["reference"]
      extacted_facts_ref = k["extracted_facts_reference"]
      generated_answer = k["generated_answer"]
      extacted_facts_generated_answer = k["extracted_facts_generated"]

      extracted_facts_generated_answer_token_set = set(
          extacted_facts_generated_answer.split()
      )
      extracted_facts_ref_token_set = set(extacted_facts_ref.split())

      one_gram_precision = (
          len(
              extracted_facts_generated_answer_token_set
              & extracted_facts_ref_token_set
          )
          / len(extracted_facts_generated_answer_token_set)
          if len(extracted_facts_generated_answer_token_set) > 0
          else 0
      )
      one_gram_recall = (
          len(
              extracted_facts_generated_answer_token_set
              & extracted_facts_ref_token_set
          )
          / len(extracted_facts_ref_token_set)
          if len(extracted_facts_ref_token_set) > 0
          else 0
      )
      one_gram_f1 = 0
      if one_gram_precision + one_gram_recall > 0:
        one_gram_f1 = (
            2
            * one_gram_precision
            * one_gram_recall
            / (one_gram_precision + one_gram_recall)
        )

      rogue_score = self.calc_rogue_score(expected, generated_answer)
      bleu = self.bleu_score(expected, generated_answer)

      scores.append({
          "prompt": prompt,
          "reference": expected,
          "extracted_facts_reference": extacted_facts_ref,
          "generated_answer": generated_answer,
          "extracted_facts_generated": extacted_facts_generated_answer,
          "one_gram_precision": one_gram_precision,
          "one_gram_recall": one_gram_recall,
          "one_gram_f1": one_gram_f1,
          "rogue_score": rogue_score[0]["score"],
          "bleu": bleu,
      })
    return scores

  def extract_facts(self, prompt, answers):
    full_text = prompt + " " + answers
    doc = nlp(full_text)
    facts = []

    for ent in doc.ents:
      if ent.end_char >= len(prompt) and (
          ent.label_ in ENT_SET
      ):
        cleaned_words = self.clean_text(ent.text).split()
        for cleaned_word in cleaned_words:
          if cleaned_word not in facts:
            facts.append(cleaned_word)
    for token in doc:
      if token.ent_type_ in ["PERSON", "ORG"] or token.dep_ in [
          "nsubj",
          "dobj",
          "subj",
          "pobj",
      ]:
        if token.text in answers and token.text not in facts:
          for word in token.text.split():
            cleaned_words = self.clean_text(word).split()
            for cleaned_word in cleaned_words:
              if cleaned_word not in facts:
                facts.append(cleaned_word)
    return " ".join(facts)

In [5]:
## Measure the factual accuracy on a dataset
# Using ROME's facts dataset
import os, re, json
!ls /content/home
from util.globals import DATA_DIR
from dsets import KnownsDataset
from datasets import load_dataset
knowns = KnownsDataset(DATA_DIR)
squad = load_dataset("squad_v2", split='train[:100]')


baselines     dsets	   globals.yml	LICENSE    README.md  scripts
CITATION.cff  experiments  hparams	notebooks  rome       util
data/known_1000.json does not exist. Downloading from https://rome.baulab.info/data/dsets/known_1000.json


100%|██████████| 335k/335k [00:00<00:00, 1.85MB/s]


Loaded dataset with 1209 elements


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/8.92k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/16.4M [00:00<?, ?B/s]

validation-00000-of-00001.parquet:   0%|          | 0.00/1.35M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/130319 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/11873 [00:00<?, ? examples/s]

In [None]:
squad_first_100_train = list(filter(lambda x:  len(x['answers']["text"])>0, squad))

In [17]:
MODEL = "gpt2"
DEVICE = "cuda"
ENT_SET = set([
    "PERSON",  #      People, including fictional.
    "NORP",  #        Nationalities or religious or political groups.
    "FAC",  #         Buildings, airports, highways, bridges, etc.
    "ORG",  #         Companies, agencies, institutions, etc.
    "GPE",  #         Countries, cities, states.
    "LOC",  #         Non-GPE locations, mountain ranges, bodies of water.
    "PRODUCT",  #     Objects, vehicles, foods, etc. (Not services.)
    "EVENT",  #       Named hurricanes, battles, wars, sports events, etc.
    "WORK_OF_ART",  # Titles of books, songs, etc.
    "LAW",  #         Named documents made into laws.
    "LANGUAGE",  #    Any named language.
    "DATE",  #        Absolute or relative dates or periods.
    "TIME",  #        Times smaller than a day.
    "PERCENT",  #     Percentage, including ”%“.
    "MONEY",  #       Monetary values, including unit.
    "QUANTITY",  #    Measurements, as of weight or distance.
    "ORDINAL",  #     “first”, “second”, etc.
    "CARDINAL",  #    Numerals that do not fall under another type.
])

# check_point = AutoModelForCausalLM.from_pretrained("gpt2").to('cuda')
# check_point.load_state_dict(torch.load('/content/home/squad_epoch_14', weights_only=False))

# check_point.load_state_dict(torch.load('/content/home/squad_epoch_10_last_6_layers_overfit', weights_only=False))
model= GPT2(MODEL,DEVICE,check_point = None)

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

In [None]:
def extract_question_answers_from_example(example, dataset_type, debug=False):
  if dataset_type == "knowns":
    question = example["prompt"]
    answers = [example["prediction"]]
  elif dataset_type == "squad":
    question = example["question"]
    answers = example["answers"]["text"]
  elif dataset_type == "wikiqa":
    question = example["question"]
    answers = [example["answer"]]
  else:
    raise ValueError(f"Unknown dataset type: {dataset_type}")

  return question, answers

In [None]:
def categorize_accuracy(scores):
  non_relevants = []
  accurates = []
  inaccurates = []
  for score in scores:
    prompt = score["prompt"]
    expected = score["reference"]
    extacted_facts_ref = score["extracted_facts_reference"]
    generated_answer = score["generated_answer"]
    extacted_facts_generated_answer = score["extracted_facts_generated"]
    one_gram_precision = score["one_gram_precision"]
    one_gram_recall = score["one_gram_recall"]
    one_gram_f1 = score["one_gram_f1"]
    rogue_score = score["rogue_score"]
    bleu = score["bleu"]
    summary_score = {
        "prompt": prompt,
        "reference": expected,
        "extracted_facts_reference": extacted_facts_ref,
        "generated_answer": generated_answer,
        "extracted_facts_generated": extacted_facts_generated_answer,
        "one_gram_precision": one_gram_precision,
        "one_gram_recall": one_gram_recall,
        "rogueL_recall": rogue_score["rougeL"].recall,
    }
    if one_gram_recall <= 0.05 and one_gram_precision <= 0.05:
      non_relevants.append(summary_score)
    elif rogue_score["rougeL"].recall > 0.3:
      accurates.append(summary_score)
    else:
      inaccurates.append(summary_score)
  return non_relevants, accurates, inaccurates

scores = model.score_extracted_facts (model.eval_generated_text( knowns, "knowns", debug=True))
non_relevants, accurates, inaccurates = categorize_accuracy(scores)

In [None]:
print("%d:%d:%d" %(len(non_relevants), len(accurates), len(inaccurates)))

3:5:2


In [None]:
import pandas as pd
df = pd.DataFrame(accurates)
df

Unnamed: 0,prompt,reference,extracted_facts_reference,generated_answer,extracted_facts_generated,one_gram_precision,one_gram_recall,rogueL_recall
0,Show Me a Hero premieres on,HBO on April 10.<|endoftext|>,HBO April 10endoftext,"HBO on Sunday, March 9 at 9 p.m. ET/PT.<|endo...",HBO Sunday March 9 pm PTendoftext,0.166667,0.333333,0.6
1,Khalid ibn al-Walid is affiliated with the rel...,"of Islam. He is a Muslim, and he",Islam Muslim He,Islam. He is a member of the Islamic Council ...,Islam the Islamic Council of America The Socie...,0.142857,0.666667,0.5
2,"In United Kingdom, the language spoken is",English.\n\nThe language spoken in the United,English United language,"English, but the English language is spoken i...",English the United States US Spanish language ...,0.375,1.0,1.0
3,Holycross-Ballycahill GAA is located in the co...,"Ireland, in the county of Ballycah",Ireland county Ballycah,"Ireland.\n\nThe GAB is a small, small town in...",Ireland GAB the United Kingdom border It earth,0.125,0.333333,0.5
4,"Myongji University, in","Seoul, South Korea, and the University of Cal...",Seoul South Korea the University of California,"the United States, has been a leading researc...",the United States development technologies res...,0.090909,0.142857,0.375


In [None]:
beyonce_queries = [
    "When did Beyonce start becoming popular?",
    "What was the first album Beyoncé released as a solo artist?",
    "When did Destiny's Child end their group act?",
    "Beyonce's childhood home believed in what religion?",
    "When did Beyonce start becoming popular?",]

for query in beyonce_queries:
  print(model.decoding_output_ids(
            output_ids=model.generate_output_ids(query)))

Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


When did Beyonce start becoming popular? 1990s 1990's 1990′s and 1990 1990cés. 1990 and's and's, 1990,'ss' ands 2005's 2005′'s. 2005''s's' 2005 and 2005s


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


What was the first album Beyoncé released as a solo artist? 1990's No. 1 Love Love No No Love 1990s No Child No 1990 1990cécésssss 1990 No Fr. No's 1990rouss Norousrou 1990ss


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


When did Destiny's Child end their group act?s in 2014's.s.sss No. No,s 2014. Fr. 2014 Fr Fr 2014s Frsssss Frss.. F. of 2014 F Fr's Fr F


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Beyonce's childhood home believed in what religion? Church's and United Methodist Church Church of Texas Church. Methodist Methodist Episcopal Church Methodist Presbyterian Church United Church Presbyterian Methodist United Presbyterian United United Baptist Church Texas United Women's United Sisters United Sister United's Church Women
When did Beyonce start becoming popular? 1990s 1990's 1990′s and 1990 1990cés. 1990 and's and's, 1990,'ss' ands 2005's 2005′'s. 2005''s's' 2005 and 2005s


In [None]:
prompt = "Vinson Massif is located in the continent of "
answer = "Antarctica. It is the largest of the three."
full_text = prompt+answer
doc = nlp(full_text)
for ent in doc.ents:
  if (ent.end_char>=len(prompt)):
    print(ent.text)

Antarctica
three


In [None]:
prompt = " Claridge is employed by the	the BBC as a freelance writer. He is the"
doc = nlp(prompt)

for ent in doc.ents:
  if ent.label_ in ENT_SET:
    print(ent.label_, ent.text)

for token in doc:
    if token.ent_type_ in ["PERSON", "ORG"] or token.dep_ in [
        "nsubj",
        "dobj",
        "subj",
        "pobj",
    ]:
      print(ent.label_, ent.text)


ORG Claridge
ORG BBC
ORG BBC
ORG BBC
ORG BBC
ORG BBC


In [22]:
target = "basketball"
generated_texts = [
    "play basketball.",
    "volleyball. Michael Jordan is a basketball player.",
    "basketball. Michael Jordan is a basketball player.",
    "volleyball.",
]

def clean_text(text):
    # Remove non-alphanumeric characters
    return re.sub(r"[^\w\s]", "", text).strip()

def f1_score(target, generated_text):
  target_set = set(clean_text(target).split())
  generated_set = set(clean_text(generated_text).split())
  recall = len(target_set & generated_set) / len(target_set)
  precision = len(target_set & generated_set) / len(generated_set)
  if recall+precision==0: return 0
  f1 = 2 * precision * recall / (precision + recall)
  return f1

def bleu_score_temp(expected_prediction, generated_answer):
  smoothing = SmoothingFunction()
  reference_tokens_list = clean_text(expected_prediction).split()
  generated_tokens = (
      clean_text(generated_answer).split()
  )  # Model-generated text as list of tokens
  ngram_weights=(0.5, 0.5, 0, 0)
  bleu = sentence_bleu(
      reference_tokens_list,
      generated_tokens,
      weights=ngram_weights,
      smoothing_function=smoothing.method1,
  )
  return bleu

def extract_facts(prompt, answers):
    full_text = prompt + " " + answers
    doc = nlp(full_text)
    facts = []

    for ent in doc.ents:
      if ent.end_char >= len(prompt) and (ent.label_ in ENT_SET):
        cleaned_words = clean_text(ent.text).split()
        for cleaned_word in cleaned_words:
          if cleaned_word not in facts:
            facts.append(cleaned_word)
    for token in doc:
      if token.ent_type_ in ["PERSON", "ORG"] or token.dep_ in [
          "nsubj",
          "dobj",
          "subj",
          "pobj",
      ]:
        if token.text in answers and token.text not in facts:
          for word in token.text.split():
            cleaned_words = clean_text(word).split()
            for cleaned_word in cleaned_words:
              if cleaned_word not in facts:
                facts.append(cleaned_word)
    return " ".join(facts)

def calc_rogue_score(reference, generated_text):
  scores = []
  score_func = rouge_scorer.RougeScorer(
      ["rougeL"], use_stemmer=True
  ).score
  scores.append({
      "reference": reference,
      "text": generated_text,
      "score": score_func(reference, generated_text),
  })
  return scores

for text in generated_texts:
  print("F1", f1_score(target, text))
  print("Bleu", bleu_score_temp(target, text))
  extracts = extract_facts("Michael Jordan plays", text)
  print("rougeL", calc_rogue_score(target,extracts))



F1 0.6666666666666666
Bleu 0
rougeL [{'reference': 'basketball', 'text': 'basketball', 'score': {'rougeL': Score(precision=1.0, recall=1.0, fmeasure=1.0)}}]
F1 0.25
Bleu 0.03178697118830404
rougeL [{'reference': 'basketball', 'text': 'Michael Jordan volleyball', 'score': {'rougeL': Score(precision=0.0, recall=0.0, fmeasure=0.0)}}]
F1 0.2857142857142857
Bleu 0.03178697118830404
rougeL [{'reference': 'basketball', 'text': 'Michael Jordan basketball', 'score': {'rougeL': Score(precision=0.3333333333333333, recall=1.0, fmeasure=0.5)}}]
F1 0
Bleu 0
rougeL [{'reference': 'basketball', 'text': 'volleyball', 'score': {'rougeL': Score(precision=0.0, recall=0.0, fmeasure=0.0)}}]
