## Setup

In [1]:
# # For transformer models
# !pip install -q accelerate
# # !pip install -q bitsandbytes
# !pip install -i https://pypi.org/simple/ bitsandbytes
# # !pip install -q flash-attn --no-build-isolation

# # For sentence similarity
# !pip install -q sentence_transformers

# # For web queries
# !pip install -q googlesearch-python

# # For Retrieval Augmentated Generation (RAG) since HF doesn't have great support for it
# !pip install -q langchain chromadb

# # For using the Unofficial HuggingChat Python API: https://github.com/Soulter/hugging-chat-api
# !pip install -q hugchat

In [2]:
import os
import sys
import json
import dotenv

# Add the parent directory to sys.path so we can import other py files
sys.path.append('../')

# Set environment variables
dotenv.load_dotenv()

# Hide warnings cuz they're annoying
import warnings
warnings.filterwarnings('ignore')

In [3]:
# Load examples from JSON file
with open('../data/examples.json', 'r') as f:
    examples = json.load(f)

In [4]:
# HuggingChat API Setup
from hugchat import hugchat
from hugchat.login import Login

# Log in to huggingface and grant authorization to huggingchat
cookie_path_dir = "./cookies/" # NOTE: trailing slash (/) is required to avoid errors
sign = Login(os.environ.get('HF_EMAIL'), os.environ.get('HF_PASSWORD'))
cookies = sign.login(cookie_dir_path=cookie_path_dir, save_cookies=True)

# Create your ChatBot
chatbot = hugchat.ChatBot(cookies=cookies.get_dict())  # or cookie_path="usercookies/<email>.json"
# Get the available models (not hardcore)
models = chatbot.get_available_llm_models()
model_idx = 3
chatbot.switch_llm(model_idx)
print(f'Using model: {models[chatbot.get_active_llm_index()]}')

Using model: mistralai/Mixtral-8x7B-Instruct-v0.1


## Subtasks

In [5]:
from prompts import claim_atomization_template
from utils.nlp_utils import select_best_examples
from utils.code_utils import multiline_string_to_list

def generate_atomic_claims(statements, num_examples=3):
    """
    Generates atomic claims for the input statements.

    Args:
        claim (str): The input statements.
        num_examples (int, optional): The number of few-shot examples to include in the prompt. Defaults to 3.

    Returns:
        str: The generated atomic claims.
    """
    if num_examples > 0: # Populate the prompt with few-shot examples (w/ proper formatting)
        examples_text = ""
        best_examples = select_best_examples(statements, examples["claim_atomization_examples"], "statement", num_examples)

        # Add each example to the prompt
        for example in best_examples:
            examples_text += f"Statements: {example['statement']}\n"
            examples_text += f"Atomic Claims: {example['atomic_claims']}\n"

        # Finally, fill in the prompt template with the examples and the input statements
        prompt = claim_atomization_template.format(examples=examples_text.strip(), statements=statements).strip()
    else: # Otherwise leave the examples section of the prompt template blank and only include the input statements
        prompt = claim_atomization_template.format(examples="", statements=statements.strip()).strip()

    # # Print the entire prompt for debugging purposes
    # print(prompt)

    # Generate atomic claims using the HuggingChat API
    output_text = str(chatbot.chat(prompt))

    # Extract only the list of claims from the model's output
    # Assuming output format directly returns Python list
    try:
        output_text = output_text.split('Atomic Claims:')[-1].strip()
        atomic_claims = multiline_string_to_list(output_text)
        # POST-PROCESSING ERROR HANDLING: If list contains lists, return a flattened list
        if isinstance(atomic_claims[0], list):
            atomic_claims = [item for sublist in atomic_claims for item in sublist]
        return atomic_claims
    except:
        print(f"Error parsing model output: {output_text}\nRetrying...")
        return generate_atomic_claims(statements, num_examples)

# # Example usage
# statements = '''
# "In New York, there are no barriers to law enforcement to work with the federal government on immigration laws, and there are 100 crimes where migrants can be handed over."
# '''
# atomic_claims = generate_atomic_claims(statements)
# print(f'Statement: {statements}')
# print(f'Atomic Claims: {atomic_claims}')

'NoneType' object has no attribute 'cadam32bit_grad_fp32'


In [6]:
## Question Generation
from prompts import question_generation_template

def generate_questions(claim, num_examples=3):
    """
    Generates questions to verify the factuality of the input claim.

    Args:
        claim (str): The input claim.
        num_examples (int, optional): The number of few-shot examples to include in the prompt. Defaults to 3.

    Returns:
        str: The generated questions.
    """
    if num_examples > 0: # Populate the prompt with few-shot examples (w/ proper formatting)
        examples_text = ""
        best_examples = select_best_examples(claim, examples["question_generation_examples"], "claim", num_examples)

        # Add each example to the prompt
        for example in best_examples:
            examples_text += f"Claim: {example['claim']}\n"
            examples_text += f"Questions: {example['questions']}\n"

        # Finally, fill in the prompt template with the examples and the input claim
        prompt = question_generation_template.format(examples=examples_text.strip(), claim=claim).strip()
    else: # Otherwise leave the examples section of the prompt template blank and only include the input claim
        prompt = question_generation_template.format(examples="", claim=claim).strip()

    # Print the entire prompt for debugging purposes
    # print(prompt)

    # Generate questions using the HuggingChat API
    output_text = str(chatbot.chat(prompt))

    # Extract only the list of questions from the model's output
    try:
        # Assuming output format directly returns Python list
        questions = multiline_string_to_list(output_text.split('Questions:')[-1].strip())
        return questions
    except:
        print(f"Error parsing model output: {output_text}\nRetrying...")
        return generate_questions(claim, num_examples)
    
# # Example usage for question generation
# claim_question = dict()
# for i, claim in enumerate(atomic_claims):
#   questions = generate_questions(claim)
#   claim_question[claim] = questions
#   print(f"Claim: {claim}")
#   print(f"Questions: {claim_question[claim]}")

In [7]:
## Web Querying & Scraping
import json
import requests
import re
from bs4 import BeautifulSoup

SOURCE_BLACKLIST = ['politifact.org', 'factcheck.org', 'snopes.com']

def extract_website_name(url):
    """Extracts the website name from a given URL using regex"""
    match = re.search(r'(?P<url>https?://[^\s]+)', url)
    if match:
        url = match.group('url')
        return url.split('//')[1].split('/')[0].lower().replace('www.', '')
    return None

def scrape_text_from_website(url):
    """Scrapes text and metadata from a given website URL."""
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove script and style tags
            for script in soup(["script", "style"]):
                script.decompose()

            # Extract all text from the website
            text = soup.get_text()

            # Clean up whitespace
            text = re.sub(r'\s+', ' ', text).strip()

            return text
        else:
            # print(f"Failed to retrieve content from the URL: {url}")
            return None
    except Exception as e:
        # print(f"Error during website scraping: {e}")
        return None

def fetch_search_results(question, scrape_website=False):
    """
    Fetches search results for a given question using an API.

    Args:
        question (str): The question to search for.
        scrape_website (bool, optional): Whether to scrape the website content. Defaults to False.

    Returns:
        list: A list of organic search results.
    """
    api_key = os.environ.get("SERPER_API_KEY")

    headers = {
        "X-API-KEY": api_key,
        "Content-Type": "application/json",
    }

    payload = json.dumps({"q": question})
    try:
        response = requests.post("https://google.serper.dev/search", headers=headers, data=payload)
        result = json.loads(response.text)

        # Extract the organic search results and transform them into our desired format
        results = []
        for item in result['organic']:
            # ALSO while iterating through the results, remove any websites on our source blacklist
            source = extract_website_name(item.get('link', ''))
            if source in SOURCE_BLACKLIST: continue
            website_text = scrape_text_from_website(item.get('link', '')) if scrape_website else item.get('snippet', '')
            if website_text is None or website_text == '': # if we failed to scrape the website, use the snippet
                website_text = item.get('snippet', '')
            results.append({
                "title": item.get('title', ''),
                "source": source,
                "date_published": item.get('date', ''),
                "relevant_excerpt": item.get('snippet', ''),
                "text": website_text,
                "search_position": item.get('position', -1),
                "url": item.get('link', ''),
            })
        return results

    except Exception as e:
        print(f"Failed to fetch information: {e}")
        return []

# # Example usage:
# question = question = '''
# In New York, are there barriers to law enforcement to work with the federal government on immigration laws?
# '''
# search_results = fetch_search_results(question)
# search_results

In [8]:
## Retrieval Augmented Generation (RAG) Retriever
from langchain.docstore.document import Document
from langchain.vectorstores import Chroma
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
import torch

import copy

# Initialize embedding model for retrieval (sentence similarity)
BATCH_SIZE = 32
device = 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
retriever_model_id='sentence-transformers/all-MiniLM-L6-v2'
retriever_model = HuggingFaceEmbeddings(
    model_name=retriever_model_id,
    model_kwargs={'device': device},
    encode_kwargs={'device': device, 'batch_size': BATCH_SIZE},
)

def retrieve_relevant_documents_using_rag(search_results, content_key, question, chunk_size=512, chunk_overlap=128, top_k=10):
    """
    Takes in search results and a query question, processes and splits the documents,
    and retrieves relevant documents using a RAG approach.

    Args:
        search_results (list of dict): A list of dictionaries containing web-scraped data.
        question (str): The query question for retrieving relevant documents.
        content_key (str): The key in the dictionary containing the text content.
        chunk_size (int): The maximum size of the text chunks.
        chunk_overlap (int): The overlap between consecutive text chunks.
        top_k (int): The number of relevant documents to retrieve.

    Returns:
        list: A list of relevant document chunks.
    """
    # Create LangChain documents from search results
    documents = []
    for result in search_results:
        page_content = result.pop(content_key, None)  # Extract the text content, remaining keys are metadata
        if page_content is not None:
            documents.append(Document(page_content=page_content, metadata=result))

    # Split documents into smaller chunks (if needed, based on document size)
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    split_documents = text_splitter.split_documents(documents)

    # Initialize ChromaDB vector store to index the document chunks
    db = Chroma.from_documents(
        documents=split_documents,
        embedding=retriever_model,
    )

    # Retrieve the most relevant chunks for the given question
    top_k = min(top_k, len(split_documents))  # Ensure we don't request more documents than available
    relevant_docs = db.similarity_search(question, k=top_k)

    return relevant_docs

# question = '''
# In New York, are there barriers to law enforcement to work with the federal government on immigration laws?
# '''
# relevant_docs = retrieve_relevant_documents_using_rag(search_results, 'text', question)
# relevant_docs

In [19]:
## RAG-based Question Answering
from prompts import answer_synthesis_template

def synthesize_answer(relevant_docs, question, return_sources=True):
    """
    Synthesizes an answer to a given question using the relevant documents.

    Args:
        relevant_docs (list of dict): A list of relevant document chunks.
        question (str): The question to answer.

    Returns:
        str: The synthesized answer.
    """
    # Format the relevant documents for the prompt
    documents_text = ""
    for doc in relevant_docs:
        documents_text += f"Title: {doc.metadata.get('title', '')}\n"
        documents_text += f"URL: {doc.metadata.get('url', '')}\n"
        documents_text += f"Text: {doc.page_content.strip()}\n"
        documents_text += f"Date Published: {doc.metadata.get('date_published', '')}\n\n"

    # Fill in the prompt template with the relevant documents and the question
    prompt = answer_synthesis_template.format(documents=documents_text.strip(), question=question).strip()
    prompt = prompt.replace('\n\n\n', '\n')

    # Print the entire prompt for debugging purposes
    # print(prompt)

    # Generate the answer using the HuggingChat API
    output_text = str(chatbot.chat(prompt))

    # Extract the answer and sources separately from the model's output
    try:
        answer = output_text.split('Answer:')[-1].split('Sources:')[0].strip()
        sources = output_text.split('Sources:')[-1].strip()
        if return_sources: return answer, sources
        return answer
    except:
        print(f"Error parsing model output: {output_text}\nRetrying...")
        return synthesize_answer(relevant_docs, question, return_sources)
    
# # Example usage for RAG-based question answering
# # answer, sources = synthesize_answer(relevant_docs, question)

In [10]:
## Claim Classification
from prompts import claim_classification_template

def classify_claim(claim, questions, answers, return_reasoning=True):
    """
    Uses a chain-of-thought approach to classify the original claim as true or false based on the answers to generated questions.

    Args:
        claim (str): The original claim.
        questions (list): List of questions related to the claim.
        answers (list): List of answers corresponding to the questions.

    Returns:
        str: The conclusion whether the claim is true or false with reasoning.
    """
    # Format the questions and answers into a single string
    questions_and_answers = ""
    for question, answer in zip(questions, answers):
        questions_and_answers += f"Question: {question}\nAnswer: {answer}\n\n"

    # Fill in the prompt template with the claim and formatted questions and answers
    prompt = claim_classification_template.format(claim=claim, questions_and_answers=questions_and_answers)

    # Print the entire prompt for debugging purposes
    # print(prompt)

    # Generate the classification using the HuggingChat API
    output_text = str(chatbot.chat(prompt))

    # Extract the verdict and reasoning separately from the model's output
    try:
        verdict = output_text.split('Verdict:')[-1].split('Reasoning:')[0].strip()
        reasoning = output_text.split('Reasoning:')[-1].strip()
        if return_reasoning: return verdict, reasoning
        return verdict
    except:
        raise ValueError(f"Error parsing model output: {output_text}")

In [11]:
from collections import Counter
import numpy as np

def generate_fact_score(verdicts):
    label = None
    perc_unverified = 0
    v_cleaned = verdicts
    if 'Unverifiable' in verdicts:
        v_cleaned = verdicts[:]
        v_cleaned.remove('Unverifiable')
        perc_unverified = Counter(verdicts)['Unverifiable'] / len(verdicts)
    perc_true = Counter(verdicts)['True'] / len(verdicts)
    perc_false = Counter(verdicts)['False'] / len(verdicts)
    perc = [perc_true, perc_false, perc_unverified]
    winner = np.argwhere(perc == np.amax(perc))

    if len(winner) == 3: # three-way tie
        label = "Unverifiable"

    elif len(winner) == 2: # two-way tie
        if 0 in winner and 1 in winner: # half true
            label = 'Half True'
        elif 0 in winner and 2 in winner: # true & unverifable
            label = "Unverifiable"
        elif 1 in winner and 2 in winner: # false & unverifable
            label = "Unverifiable"

    elif len(winner) == 1:
        if 0 in winner:
            if perc_true == 1: # all true
                label = "True"
            elif Counter(v_cleaned)['True'] / len(v_cleaned) > 0.5: # mostly true
                label = "Mostly True"
            else:
                label = 'Unverifiable'
        elif 1 in winner:
            if perc_false == 1: # all false
                label = "Pants on Fire"
            elif Counter(v_cleaned)['False'] / len(v_cleaned) > 0.5: # mostly false
                label = "Mostly False"
            else:
                label = 'Unverifiable'
        elif 2 in winner:
            label = 'Unverifiable'
    return label

## Putting It All Together

In [12]:
def verify_statement_hugchat(statement, num_examples=0):
    """
    Runs the entire fact-checking pipeline for the input claim.

    Args:
        statement (str): The input statement(s).
        num_examples (int, optional): The number of few-shot examples to include in the prompts. Defaults to 0.

    Returns:
        tuple: A tuple containing the verdicts, output dictionary, fact score, and reasoning for the claim.
    """
    # Write out the whole pipeline and be verbose about what's happening (print out the steps)
    print("Statement:", statement)

    atomic_claims = generate_atomic_claims(statement, num_examples)
    print("Atomic Claims generated:", len(atomic_claims))

    output_dict = []  # List to store all the info for each atomic claim (claim, questions, answers, verdict, reasoning)
    verdicts = []
    reasonings = []

    for i, claim in enumerate(atomic_claims, start=1):
        print(f"Processing Atomic Claim {i}/{len(atomic_claims)}:")
        print("\tClaim:", claim)

        claim_output = {}
        claim_output['claim'] = claim

        questions = generate_questions(claim, num_examples)
        print("\tQuestions generated:", len(questions))

        claim_output['qa-pairs'] = {}
        claim_output['qa-pairs']['questions'] = questions
        answers = []
        sources = []
        for j, question in enumerate(questions, start=1):
            print(f"\t\tQuestion {j}/{len(questions)}:", question)

            search_results = fetch_search_results(question, scrape_website=True)
            relevant_docs = retrieve_relevant_documents_using_rag(search_results, 'relevant_excerpt', question)

            answer, source = synthesize_answer(relevant_docs, question)
            answers.append(answer)
            sources.append(source)

            print(f"\t\tAnswer {j}/{len(questions)}:", answer)
            # print(f"\t\tSources {j}:", source)

        claim_output['qa-pairs']['answers'] = answers
        claim_output['qa-pairs']['sources'] = sources

        verdict, reasoning = classify_claim(claim, questions, answers)
        verdicts.append(verdict)
        reasonings.append((claim, reasoning))
        claim_output['verdict'] = verdict
        claim_output['reasoning'] = reasoning

        print("\tVerdict:", verdict)
        print("\tReasoning:", reasoning)

        output_dict.append(claim_output)

    print("\nVerdicts:", verdicts)

    fact_score = generate_fact_score(verdicts)
    print("\nFact Score:", fact_score)

    reasoning = '\n'.join(["Claim: " + item[0] + "\nReasoning: " + item[1] + "\n" for item in reasonings])

    return verdicts, output_dict, fact_score, reasoning

In [13]:
import pandas as pd

base_dir = '../data/' # TODO: modify this to the correct path for you!
filename = 'pilot.csv'
full_path = os.path.join(base_dir, filename)
df = pd.read_csv(full_path)

df['statement'] = df['statement'].astype(str)

df['statement'][:50]

0     “The National Guard in the HISTORY of its life...
1     "On Jan. 6, 2021, U.S. Capitol 'protestors car...
2         "Not even one rocket (from Iran) hit Israel."
3     "326,000 migrants were flown to Florida with t...
4     "Crime is down in Venezuela by 67% because the...
5     "In New York, there are no barriers to law enf...
6     "Speaking of semiconductor industry jobs, "Kno...
7     "Starting in 2025 "no matter what your total b...
8     “Tens of thousands of auto jobs were lost nati...
9     "The current Congress is “the least productive...
10    "Video shows “New York Governor Kathy Hochul b...
11    "We’ve had 12 elections in 24 years in Wiscons...
12    After a 2022 law, “The vast majority of colleg...
13    "Insulin for Medicare beneficiaries "was costi...
14    “Support for Roe is higher today in America th...
15    "The 2022 CHIPS and Science Act “attracted $64...
16              “It is a fact that Obama created ISIS.”
17    "Millions of Arizonans will soon live unde

In [14]:
import os
import pickle

save_filename = '../pkl_for_final_dataset/fact_check_samples_cohere.pkl'
if os.path.exists(save_filename):
    fact_check_samples = pickle.load(open(save_filename, 'rb'))
else: 
    fact_check_samples = []

In [17]:
from tqdm.auto import tqdm
df_subset = df[26:50]

# print("Using HuggingChat model:", models[model_idx])
for index, row in tqdm(df_subset.iterrows(), total=len(df_subset)):
    statement = row['statement']
    result = verify_statement_hugchat(statement)
    fact_check_samples.append(result)

  0%|          | 0/24 [00:00<?, ?it/s]

Statement: "Only two presidents in American history left office with fewer jobs than when they entered office. Herbert Hoover and yes, Donald Herbert Hoover Trump."
Atomic Claims generated: 3
Processing Atomic Claim 1/3:
	Claim: Two US presidents left office with fewer jobs than when they entered.
	Questions generated: 2
		Question 1/2: Did Herbert Hoover leave office with fewer jobs than when he started? 
Failed to fetch information: 'organic'


ValueError: Expected IDs to be a non-empty list, got 0 IDs

In [18]:
len(fact_check_samples)

26

In [None]:
fact_check_samples[-1]

(['True'],
 [{'claim': 'Joe Biden graduated 76th out of 85 students at Syracuse University College of Law in 1968.',
   'qa-pairs': {'questions': ['Did Joe Biden graduate from Syracuse University College of Law in 1968?',
     "What was Joe Biden's class rank at Syracuse University College of Law?",
     'Was Joe Biden a student at Syracuse University in the 1960s?'],
    'answers': ['Yes, Joe Biden graduated from Syracuse University College of Law in 1968.',
     "Joe Biden's class rank at Syracuse University College of Law was 76th out of 85 students.",
     'Yes, Joe Biden was a student at Syracuse University College of Law in the 1960s, specifically in 1968.'],
    'sources': ['[\n    {\n        "text": "A rumor said Joe Biden finished 76th in a class of 85 at Syracuse University College of Law in 1968. \\"Did Biden graduate 76th out of a class ...",\n        "url": "https://www.yahoo.com/news/fact-check-rumor-says-biden-130000833.html",\n        "date_published": "Apr 27, 2024"\n 

In [None]:
# fact_check_samples.pop()

([''],
 [{'claim': 'Only two presidents in American history left office with fewer jobs created during their tenure. They were Herbert Hoover and Donald Trump.',
   'qa-pairs': {'questions': ['Which presidents left office with the fewest jobs created?',
     'Did Herbert Hoover leave office with fewer jobs than when he entered?',
     'Did Donald Trump leave office with fewer jobs than when he took office?'],
    'answers': ['The presidents who left office with the fewest jobs created were Herbert Hoover and Donald Trump.',
     'Yes, Herbert Hoover left office with fewer jobs than when he entered, during the Great Depression.',
     'Yes, Donald Trump left office with fewer jobs than when he took office.'],
    'sources': ['[\n    {\n        "text": "As of 2022, former President Bill Clinton was the president who created the most jobs in the United States, at 18.6 million jobs created ...",\n        "url": "https://www.statista.com/statistics/985577/number-jobs-created-sitting-preside

In [None]:
import pickle
with open(save_filename, 'wb') as f:
    pickle.dump(fact_check_samples, f)

In [None]:
hugchat_sample = pd.DataFrame({
    "verdicts": [x[0] for x in fact_check_samples],
    "fact_score": [x[2] for x in fact_check_samples],
    "output": [x[1] for x in fact_check_samples],
    "reasonings": [x[3] for x in fact_check_samples]
})
hugchat_sample

Unnamed: 0,verdicts,fact_score,output,reasonings
0,"[True, True]",True,[{'claim': 'The National Guard typically gets ...,Claim: The National Guard typically gets calle...
1,[False],Pants on Fire,"[{'claim': 'On Jan. 6, 2021, U.S. Capitol prot...","Claim: On Jan. 6, 2021, U.S. Capitol protestor..."
2,[False],Pants on Fire,[{'claim': 'Not even one rocket from Iran hit ...,Claim: Not even one rocket from Iran hit Israe...
3,"[False, False]",Pants on Fire,"[{'claim': '326,000 migrants were flown to Flo...","Claim: 326,000 migrants were flown to Florida ..."
4,"[False, Unverifiable]",Unverifiable,[{'claim': 'Crime in Venezuela is down by 67%....,Claim: Crime in Venezuela is down by 67%.\nRea...
5,"[False, Unverifiable]",Unverifiable,"[{'claim': 'In New York, there are no barriers...","Claim: In New York, there are no barriers for ..."
6,"[Unverifiable, True]",Unverifiable,[{'claim': 'The average salary in the semicond...,Claim: The average salary in the semiconductor...
7,"[True, True]",True,"[{'claim': 'Starting in 2025, Medicare Part D ...","Claim: Starting in 2025, Medicare Part D users..."
8,"[Unverifiable, True]",Unverifiable,[{'claim': 'Tens of thousands of auto jobs wer...,Claim: Tens of thousands of auto jobs were los...
9,[True],True,[{'claim': 'The current Congress is 'the least...,Claim: The current Congress is 'the least prod...


In [None]:
# Save the results to an Excel file
filename = 'final-gpt-4-turbo-samples.xlsx'
full_path = os.path.join(base_dir, filename)
hugchat_sample.to_excel(full_path)