In [7]:
import argparse
import json
import logging
from typing import List, Optional
import sys
from FActScore.factscore.factscorer import FactScorer
from utils.wandb_utils import wandb_init_run, wandb_push_json, wandb_push_table
from utils.fscore_utils import csv_to_jsonl_for_factscore, regenerate_text, flatten_hallucinations
from datetime import datetime
import nltk
import numpy as np
import os
nltk.download('punkt_tab')
from dotenv import load_dotenv
load_dotenv()
from argparse import Namespace
import os
import numpy as np
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

fs_logs_available = {}

class GenFact:
    def __init__(self, args: dict = None):
        print (args)

        #self.args = parse_options(sys.argv[1:] if args is None else args)
        
        args['gamma'] = 10
        args['data_dir']  ="./FActScore/.cache/factscore/"
        args['model_dir']  = "./FActScore/.cache/factscore/"
        args['cache_dir'] ="./FActScore/.cache/factscore/"
        args['knowledge_source'] = None
          

        args['cost_estimate'] = "consider_cache"
        args['abstain_detection_type'] = None,
        args['use_atomic_facts']= False

        args['verbose'] = False

        #parser['print_rate_limit_error'] ="store_true"
        args['n_samples'] = None
        
        
        args = Namespace(**args)

        self.args = args
        print (self.args)
        logging.basicConfig(format='%(asctime)s - %(name)s - %(message)s',
                            datefmt='%m/%d/%Y %H:%M:%S',
                            level=logging.ERROR )

        self.log_dir = self.create_log_folder()
        self.fs = FactScorer(model_name=self.args.model_name,
                        data_dir=self.args.data_dir,
                        model_dir=self.args.model_dir,
                        cache_dir=self.args.cache_dir,
                        openai_key=self.args.openai_key,
                        cost_estimate=self.args.cost_estimate,
                        abstain_detection_type=self.args.abstain_detection_type,
                        grounding_provided=self.args.grounding_provided)

    def run_factscrorer(self, grounding_provided:bool) -> dict:

        tot = 0
        topics, generations, atomic_facts, groundings = [], [], [], []
        with open(self.args.input_path) as f:
            for line in f:
                dp = json.loads(line)
                tot += 1

                if self.args.use_atomic_facts:
                    assert "annotations" in dp, "You can specify `--use_atomic_facts` only when atomic facts are available in the input data already."
                    if dp["annotations"] is None:
                        continue
                    topics.append(dp["topic"])
                    generations.append(dp["output"])
                    atomic_facts.append(
                        [atom["text"] for sent in dp["annotations"] for atom in sent["model-atomic-facts"]])
                else:
                    topics.append(dp["topic"])
                    generations.append(dp["output"])
                if self.args.grounding_provided:
                    groundings.append(dp["input"])

                if self.args.n_samples is not None and tot == args.n_samples:
                    break
        out = self.fs.get_score(topics=topics,
                           generations=generations,
                           groundings=groundings,
                           gamma=self.args.gamma,
                           atomic_facts=atomic_facts if self.args.use_atomic_facts else None,
                           knowledge_source=self.args.knowledge_source,
                           verbose=self.args.verbose,
                           grounding_provided=grounding_provided)
        print ("Using intrinsic Fact Checking")
        logging.critical("FActScore = %.1f%%" % (100 * out["score"]))
        if "init_score" in out:
            logging.critical("FActScore w/o length penalty = %.1f%%" % (100 * out["init_score"]))
        logging.critical("Respond ratio = %.1f%%" % (100 * out["respond_ratio"]))
        logging.critical("# Atomic facts per valid response = %.1f" % (out["num_facts_per_response"]))

        # Save out as a json file
        with open(args.input_path.replace(".jsonl", f"_factscore_output.json"), 'w') as f:
            f.write(json.dumps(out) + "\n")

        self.factscore_logs = {"score": out["score"],"topics": topics, "decisions": out["decisions"], "wrong_facts": out["wrong_facts"], "groundings": groundings,
                               "generations": generations, "grounding_provided": grounding_provided}

        return self.factscore_logs

    def write_logs(self, out:json, fname:str):
        fname = os.path.join(self.log_dir,fname)
        with open(fname, 'w') as fp:
            json.dump(out, fp)

    def fs_get_extrinsic_af(self, topics, wrong_facts, groundings,generations, grounding_provided):
        # Check if the wrongly classified facts are "wrong" or just not present in the article.
        extrinsic_af = self.fs.get_extrinsic_af(topics=topics, wrong_facts=wrong_facts, groundings=groundings,
                                                  generations=generations, grounding_provided=grounding_provided)


        return extrinsic_af
    def fs_extrinsic_score(self,fs_extrinsic_af:dict):
        extrinsic_out = self.fs.get_extrinsic_score(topics = self.factscore_logs["topics"], extrinsic_facts=fs_extrinsic_af["extrinsic_facts"],
                                                    generations = self.factscore_logs["generations"],  verbose=False,
                                                    grounding_provided=False)
        return extrinsic_out

    def create_log_folder(self, ):
        date_time =  '{date:%Y-%m-%d_%H-%M-%S}'.format( date=datetime.now() )
        run_name = os.path.basename(self.args.input_path).replace('.jsonl','')

        folder = os.path.join("results","genfact", run_name, date_time)
        os.makedirs(folder, exist_ok=True)
        print(f"Run outputs would be locally stored at {folder}")
        return folder

    def get_updated_score(self, factscore_out, fs_extrinsic_af) ->float:
        decision_before = factscore_out["decisions"]
        decision_after = fs_extrinsic_af["decisions"]
        count = 0


        for idx, afs in enumerate(decision_after):
            if len(afs) > 0:
                for af in afs:
                    if decision_before[idx][af["idx"]]['is_supported'] != af["is_supported"]:
                        print(f"Updating the decision for the Atomic Fact: {af} for sample {idx}")
                        decision_before[idx][af["idx"]]['is_supported'] = af["is_supported"]
                        count += 1
        scores = [np.mean([d["is_supported"]  for d in decisions]) for decisions in decision_before]
        hallucinations = [[d for d in decisions if not d["is_supported"]] for decisions in decision_before]

        updated_score = np.mean(scores)
        logging.critical("FActScore After extrinsic check = %.1f%%" % (100 * updated_score))
        logging.critical(f"Updated decision on {str(count)} Facts after running Extrinsic check")
        #if "init_score" in extrinsic_out:
        #    logging.critical("FActScore w/o length penalty = %.1f%%" % (100 * out["init_score"]))
        #logging.critical("Respond ratio = %.1f%%" % (100 * out["respond_ratio"]))
        #logging.critical("# Atomic facts per valid response = %.1f" % (out["num_facts_per_response"]))
        return updated_score, hallucinations


class DebertaNli:
    def __init__(self, score_out, decisions, groundings, fs):

        self.score_out = score_out
        self.decisions = decisions
        self.groundings = groundings
        self.fs = fs

        self.model_name = "tasksource/deberta-base-long-nli"
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name)
        self.model.to(device)

    def get_nli_class(self, premise, hypothesis):
        model_input = self.tokenizer(premise, hypothesis, truncation=True, return_tensors="pt")
        model_output = self.model(model_input["input_ids"].to(device)) 
        prediction_probs = torch.softmax(model_output["logits"][0], -1).tolist()
        prediction_probs = np.array(prediction_probs)

        max_index = np.argmax(prediction_probs)
        if  max_index == 0:
            nli_class = "entailment"
        elif max_index == 1:
            nli_class = "neutral"
        elif max_index == 2:
            nli_class = "contradiction"

        return nli_class


    def check_intrinsic(self,) -> dict:
        nli_decisions = list()
        deberta_scores = list()
        
        for data_instance, article in zip(self.decisions, self.groundings):
            nli_results = list()

            for atom_instance in data_instance:
                atom_fact = atom_instance["atom"]
                premise = atom_fact
                hypothesis = article

                nli_class = self.get_nli_class(premise, hypothesis)
                nli_results.append(nli_class)

            nli_decisions.append(nli_results)
        
        new_decisions = list()
        wrong_facts = list()

        for decision, nli_prediction in zip(self.decisions, nli_decisions):
            new_list = list()
            for dec, pred in zip(decision, nli_prediction):
                dec["nli_intrinsic"] = pred
                dec["nli_supported_intrinsic"] = True if pred == "entailment" else False
                new_list.append(dec)


            debscore = np.mean([d["nli_supported_intrinsic"] for d in decision])
            deberta_scores.append(debscore)
            new_decisions.append(new_list)

            wfs = [{"atom":d["atom"], "idx":idx}  for idx, d in enumerate(new_list) if not d["nli_supported_intrinsic"]]
            wrong_facts.append(wfs)

            for d in new_list:
                if not d["nli_supported_intrinsic"]:
                    passages = self.fs.search_passage_till_success(topic = '', atom = d["atom"], generation=d["atom"],
                                                               knowledge_source="enwiki-20230401")
                    context = ""
                    for psg_idx, psg in enumerate(reversed(passages)):
                        context += "Title: {}\nText: {}\n\n".format(psg["title"], psg["text"].replace("<s>", "").replace("</s>", ""))
                    context = context.strip()

                    d["wiki_context"] = context


        self.decisions = new_decisions            
        self.score_out["decisions"] = new_decisions

        self.score_out["deberta_score_intrinsic"] = np.mean(deberta_scores)
        self.score_out["deberta_wrong_facts"] = wrong_facts

        logging.critical("Deberta Score (intrinsic) = %.1f%%" % (100 * np.mean(deberta_scores)))

        return self.score_out, np.mean(deberta_scores)
    

    def check_extrinsic(self, wrong_facts) -> dict:
        nli_decisions = list()
        deberta_scores = list()

        for data_instance, wf in zip(self.decisions, wrong_facts):
            nli_results = list()

            fact_idx = 0
            wf_indices = [w["idx"] for w in wf]

            for atom_instance in data_instance:
                if fact_idx not in wf_indices:
                    nli_results.append(("none", fact_idx))
                    fact_idx += 1
                    continue

                atom_fact = atom_instance["atom"]
                atom_wiki_context = atom_instance["wiki_context"]

                premise = atom_fact
                hypothesis = atom_wiki_context

                model_input = self.tokenizer(premise, hypothesis, truncation=True, return_tensors="pt")
                model_output = self.model(model_input["input_ids"].to(device)) 
                prediction_probs = torch.softmax(model_output["logits"][0], -1).tolist()
                prediction_probs = np.array(prediction_probs)

                max_index = np.argmax(prediction_probs)
                if  max_index == 0:
                    nli_class = "entailment"
                elif max_index == 1:
                    nli_class = "neutral"
                elif max_index == 2:
                    nli_class = "contradiction"
                nli_results.append((nli_class, fact_idx))

                fact_idx += 1

            nli_decisions.append(nli_results)

        new_decisions = list()
        for decision, nli_prediction in zip(self.decisions, nli_decisions):
            new_list = list()

            atom_dec_idx = 0
            wrong_fact_indices = [n[1] for n in nli_prediction if n[0] != "none"]
            for dec, pred in zip(decision, nli_prediction):

                if atom_dec_idx in wrong_fact_indices:
                    dec["nli_extrinsic"] = pred[0]
                    dec["nli_supported_extrinsic"] = True if pred[0] == "entailment" else False
                    dec["nli_final_supported"] = dec["nli_supported_extrinsic"]
                    new_list.append(dec)
                else:
                    dec["nli_final_supported"] = dec["nli_supported_intrinsic"]
                    new_list.append(dec)

                atom_dec_idx += 1

            #debscore = np.mean([d["nli_supported_extrinsic"] for d in decision])
            #deberta_scores.append(debscore)
            debscore = np.mean([d["nli_final_supported"] for d in new_list])
            deberta_scores.append(debscore)

            new_decisions.append(new_list)

        self.decisions = new_decisions            
        self.score_out["decisions"] = new_decisions
        self.score_out["deberta_score_final"] = np.mean(deberta_scores)
        logging.critical("Deberta score (final, after extrinsic) = %.1f%%" % (100 * np.mean(deberta_scores)))        

        return self.score_out, np.mean(deberta_scores)

def get_pooled_score(deberta_extrinsic_out):
    decisions = deberta_extrinsic_out["decisions"]
    new_decisions = list()
    pooled_scores = list()

    for instance_decisions in decisions:
        new_list = list()
    
        for dec in instance_decisions:
            deberta_final = dec["nli_final_supported"]
            factscore_final = dec["is_supported"]

            #Pooled decision is True if both FactScore and NLI predictions are True.
            pooled_final = deberta_final and factscore_final
            dec["pooled_supported"] = pooled_final
            new_list.append(dec)

        poolscore = np.mean([d["pooled_supported"] for d in new_list])
        pooled_scores.append(poolscore)

        new_decisions.append(new_list)

    logging.critical("Pooled score (final) = %.1f%%" % (100 * np.mean(pooled_scores)))        

    return new_decisions, np.mean(pooled_scores)


def parse_options(parser: dict)-> dict:

    parser['gamma'] = 10
    parser['data_dir']  ="./FActScore/.cache/factscore/"
    parser['model_dir']  = "./FActScore/.cache/factscore/"
    parser['cache_dir'] ="./FActScore/.cache/factscore/"
    parser['knowledge_source'] = None
          

    parser['cost_estimate'] = "consider_cache"
    parser['abstain_detection_type'] = None,
    parser['use_atomic_facts']= "store_true"
                        
    parser['verbose'] = False
                            
    #parser['print_rate_limit_error'] ="store_true"
    parser['n_samples'] = None
    
    return parser




[nltk_data] Downloading package punkt_tab to
[nltk_data]     /Users/anumafzal/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [8]:
import argparse

def main(input_path = "/Users/anumafzal/PycharmProjects/FactSumm/results/1od31evk_MODEL_meta-llama-Meta-Llama-3.1-70B-Instruct-Turbo_DS_pubmed_TASK_summarization.jsonl",
        model_key = "GPT4-mini",
        openai_key = 'api.key',
        grounding_provided = 'True'):
    
    args = dict()
    args['input_path'] = input_path
    args['model_name'] = model_key
    args['openai_key'] = openai_key
    args['grounding_provided'] = grounding_provided


    csv_results_dir = "results/"
    #jsonl_paths = csv_to_jsonl_for_factscore(csv_results_dir)


    genFact = GenFact(args)

    wandb_init_run(run_path=args['input_path'], config = genFact.args)


    print ("Running Vanilla FactScore")
    factscore_out_vanilla = genFact.run_factscrorer(grounding_provided = False )
    genFact.write_logs(factscore_out_vanilla, fname="factscore_vanilla.json")

    print ("Running Factscore with grounded document")
    factscore_out = genFact.run_factscrorer(grounding_provided=args.grounding_provided)
    genFact.write_logs(factscore_out, fname="factscore_grounded.json")


    fs_extrinsic_af = genFact.fs_get_extrinsic_af(topics = factscore_out["topics"], wrong_facts = factscore_out["wrong_facts"],
                    groundings=  factscore_out["groundings"], generations = factscore_out["generations"],
                                                grounding_provided= factscore_out["grounding_provided"])
    fs_extrinsic_out = genFact.fs_extrinsic_score(fs_extrinsic_af)
    genFact.write_logs(factscore_out, fname="factscore_grounded_extrinsic.json")


    fs_updated_score, fs_updated_wrong_facts = genFact.get_updated_score(factscore_out,fs_extrinsic_out)
    wandb_table = {"fs_wiki": factscore_out_vanilla["score"], "fs_grounded": factscore_out["score"],
                       "fs_grounded_wiki": fs_updated_score}
    wandb_push_json(wandb_table)

        # test regeneration
    fs_regenerations = regenerate_text(factscore_out["generations"], flatten_hallucinations(fs_updated_wrong_facts))

    wandb_table = {"generations": factscore_out["generations"], "hallucinations": fs_updated_wrong_facts,
                       "regenerations": fs_regenerations}
    wandb_push_table(wandb_table)

    #Creates new class for deberta predictions. Loads a model from HuggingFace.
    deberta_nli = DebertaNli(score_out = factscore_out,
                                 decisions =  factscore_out["decisions"],
                                  groundings = factscore_out["groundings"],
                                  fs = genFact.fs)

    #Output is the same as factscore_out, but with a new attribute in dictionaries with NLI predictions. 
    #Gives intrinsic NLI score.
    deberta_out, deberta_intrinsic_score = deberta_nli.check_intrinsic()
    genFact.write_logs(deberta_out, fname="deberta_grounded.json")


    #Checks the wrong facts with extrinsic checking over Wikipedia. Gives final NLI score.
    deberta_nli.score_out = fs_extrinsic_out    
    deberta_extrinsic_out, deberta_final_score = deberta_nli.check_extrinsic(factscore_out["wrong_facts"])
    genFact.write_logs(deberta_extrinsic_out, fname="deberta_grounded_extrinsic.json")

    #Calculates the final pooled prediction (inside of pooled_decisions) and final pooled score.
    pooled_decisions, pooled_score = get_pooled_score(deberta_extrinsic_out)

    deberta_score_dict = {
                       "deberta_grounded": factscore_out["deberta_score_intrinsic"],
                    "deberta_grounded_wiki": deberta_final_score,"pooled_score":pooled_score}
    wandb_push_json(deberta_score_dict)

    db_regeneration = factscore_out["generations"]
    db_regenerations = regenerate_text(factscore_out["generations"], flatten_hallucinations(fs_updated_wrong_facts))

    wandb_table = {"generations": factscore_out["generations"], "hallucinations": fs_updated_wrong_facts,
                       'regenerations':db_regenerations}
    wandb_push_table(wandb_table)

    print("done")


In [9]:
main()

{'input_path': '/Users/anumafzal/PycharmProjects/FactSumm/results/1od31evk_MODEL_meta-llama-Meta-Llama-3.1-70B-Instruct-Turbo_DS_pubmed_TASK_summarization.jsonl', 'model_name': 'GPT4-mini', 'openai_key': 'api.key', 'grounding_provided': 'True'}
Namespace(input_path='/Users/anumafzal/PycharmProjects/FactSumm/results/1od31evk_MODEL_meta-llama-Meta-Llama-3.1-70B-Instruct-Turbo_DS_pubmed_TASK_summarization.jsonl', model_name='GPT4-mini', openai_key='api.key', grounding_provided='True', gamma=10, data_dir='./FActScore/.cache/factscore/', model_dir='./FActScore/.cache/factscore/', cache_dir='./FActScore/.cache/factscore/', knowledge_source=None, cost_estimate='consider_cache', abstain_detection_type=(None,), use_atomic_facts=False, verbose=False, n_samples=None)
Run outputs would be locally stored at results/genfact/1od31evk_MODEL_meta-llama-Meta-Llama-3.1-70B-Instruct-Turbo_DS_pubmed_TASK_summarization/2024-09-10_14-46-32




Running Vanilla FactScore


OSError: [E050] Can't find model 'en_core_web_sm'. It doesn't seem to be a Python package or a valid path to a data directory.