In [35]:
# @title Setup packages
import pandas as pd
from collections import Counter
import random
#from google.colab import output
import json
import time
from bs4 import BeautifulSoup
import random
import requests
import seaborn as sns
import matplotlib.pyplot as plt
import spacy


In [2]:
# # magic needed to just let importing openAI work
# %%writefile /usr/local/lib/python3.10/dist-packages/openai/_utils/_streams.py
# from typing import Any
# from typing_extensions import AsyncIterator
# from typing import Iterator # import Iterator from the correct library

# def consume_sync_iterator(iterator: Iterator[Any]) -> None:
#     for _ in iterator:
#         ...

# async def consume_async_iterator(iterator: AsyncIterator[Any]) -> None:
#     async for _ in iterator:
#         ...



In [36]:
from openai import OpenAI

In [37]:
nlp = spacy.load("en_core_web_trf")

sns.set(context="poster", style='white')
#output.clear()

In [38]:
# @title Get a list of election-related misinformation from PolitiFact.
# @markdown ToDos: Filter on date, Extract source of claim

def flatten_list_of_lists(list_of_lists):
    return [item for sublist in list_of_lists for item in sublist]

def scrape_election_claims(pg_no):
    """
    Scrapes election related claims and their truth values from Politifact

    Params
    - pg_no (int): Page number

    Returns
    - A list of tuples with each tuple containing a claim and its truth value
    """
    url = f"https://www.politifact.com/factchecks/list/?page={pg_no}&category=elections"
    print("Scraping page:", pg_no)
    print("URL:", url)

    time.sleep(random.random())
    claims_data = []
    response = requests.get(url)
    if response.status_code == 200:
        try:
            soup = BeautifulSoup(response.content, 'html.parser')
            claims = soup.find_all('div', class_='m-statement__quote')
            truth_values = soup.find_all('div', class_='m-statement__meter')

            for claim, truth_value in zip(claims, truth_values):
                claim_text = claim.get_text(strip=True)
                truth_value_text = truth_value.find('img').get('alt')  # Assuming the truth value is in an image alt text
                claims_data.append((claim_text, truth_value_text))
        except Exception as e:
            print(f"Retrieved page but couldn't parse claims for page {pg_no} due to {e}")
    else:
        print(f"Failed to retrieve the page. Status code: {response.status_code}")
    return claims_data



claims = [scrape_election_claims(pg_no=i) for i in range(1, 10)]
claims = flatten_list_of_lists(claims)
claim_df = pd.DataFrame(claims, columns=['claim', 'truth'])
false_flags = ['false', 'pants-fire']
claim_df = claim_df.query("truth in @ false_flags")
print(f"Number of claims scraped where truth value is in {' OR '.join([x for x in false_flags])}: ", len(claim_df))

Scraping page: 1
URL: https://www.politifact.com/factchecks/list/?page=1&category=elections
Scraping page: 2
URL: https://www.politifact.com/factchecks/list/?page=2&category=elections
Scraping page: 3
URL: https://www.politifact.com/factchecks/list/?page=3&category=elections
Scraping page: 4
URL: https://www.politifact.com/factchecks/list/?page=4&category=elections
Scraping page: 5
URL: https://www.politifact.com/factchecks/list/?page=5&category=elections
Scraping page: 6
URL: https://www.politifact.com/factchecks/list/?page=6&category=elections
Scraping page: 7
URL: https://www.politifact.com/factchecks/list/?page=7&category=elections
Scraping page: 8
URL: https://www.politifact.com/factchecks/list/?page=8&category=elections
Scraping page: 9
URL: https://www.politifact.com/factchecks/list/?page=9&category=elections
Number of claims scraped where truth value is in false OR pants-fire:  194


In [39]:
# @title Get named entities.
# @markdown It takes awhile using Spacy Transformers. ToDos:  (1) Better filter out non-useful named entities such as numbers and (2) filter out duplicates like "Biden" and "Joe Biden"
def extract_named_entities(nlp, text):
  """Extract named entities from text using Spacy tagger"""
  doc = nlp(text)
  entities = [ent.text for ent in doc.ents]
  return entities
def refine_entities(entities):
    """
    Refine a list of entities by removing any entities containing numbers and merging duplicate names.
    
    Args:
    entities (list): A list of entity strings.
    
    Returns:
    list: A refined list of entities without duplicates and entities containing numbers.
    """
    refined_entities = []
    seen = set()  # To track seen entities and avoid re-adding them

    # Remove entities containing numeric characters
    non_numeric_entities = [entity for entity in entities if not any(char.isdigit() for char in entity)]
    
    for entity in non_numeric_entities:
        # Simplify comparison by using lowercase
        lower_entity = entity.lower()

        # Check for partial matches (e.g., "biden" with "joe biden")
        partial_match_found = False
        for seen_entity in list(seen):
            if lower_entity in seen_entity or seen_entity in lower_entity:
                # Merge names by keeping the longer one
                if len(lower_entity) > len(seen_entity):
                    seen.remove(seen_entity)
                    seen.add(lower_entity)
                    refined_entities.remove(seen_entity.capitalize())
                    refined_entities.append(entity)
                partial_match_found = True
                break

        if not partial_match_found and lower_entity not in seen:
            seen.add(lower_entity)
            refined_entities.append(entity)

    return refined_entities

def gen_entities(entity_probabilities, n=2):
  """
  Generate entities according to weights of occurence in dataset

  Params:
    entity_probabilities (dict): A dict like {entity: probability}

  Returns a list of `n` entities
  """
  return random.choices(list(entity_probabilities.keys()), weights=entity_probabilities.values(), k=n)

def gen_claims(claims, n=2):
  """Generate a random sample of claims"""
  return random.sample(list(claims), k=n)

# Get named entities
entities_per_claim = [extract_named_entities(nlp, claim) for claim in claim_df['claim'].unique()]
refined_entities_list = []
for entities in entities_per_claim:
    refined_entities = refine_entities(entities)
    refined_entities_list.append(refined_entities)

# # Make distribution
entity_frequency = Counter([entity for sublist in refined_entities_list for entity in sublist])
total_entities = sum(entity_frequency.values())
entity_probabilities = {entity: freq / total_entities for entity, freq in entity_frequency.items()}

# # Plot some entity distributions
# prob_df = pd.DataFrame(entity_probabilities, index=[1]).T.reset_index()
# prob_df.columns = ['entity', 'prob']
# prob_df = prob_df.sort_values(by=['prob'], ascending=False)

# plt.figure(figsize=(12,8))
# sns.histplot(prob_df['prob'])
# plt.title(f"Probability Distribution of {prob_df['entity'].nunique()} Named Entities")
# plt.ylabel("Number of Entities")
# plt.xlabel("Probability of Occurence")
# plt.show()

# plt.figure(figsize=(12,8))
# sns.barplot(data=prob_df.head(20), x='prob', y='entity', palette='Blues_r')
# plt.title("Top 20 Named Entities")



# Generate similar misinformation using a few-shot prompt.

- CONTEXT: We provide `N_EXAMPLES` of claims and `N_EXAMPLES` of named entities, where the probability of a named entity being picked is proportional to its probability in the dataset

- PROMPT: Ask ChatGPT to generate `N_STATEMENTS` similar to the example claims, where each claim is about at least one of the named entities.

- CONSTRAINTS: By default we say `Each statement should sound very realistic.` but can experiment by tweaking `MORE_INSTRUCTIONS`

Possible ToDos: Prompt experiments, change hyperparameters, add source, source-entity co-occurence, fine-tuning


In [40]:
# @title Prompt Generator { vertical-output: true }
N_STATEMENTS = 10# @param {type:"slider", min:1, max:10, step:1}
N_EXAMPLES =  10# @param {type:"slider", min:1, max:10, step:1}
MODEL = "gpt-3.5-turbo" # @param ["gpt-3.5-turbo", "gpt-4"]
MORE_INSTRUCTIONS = "" # @param {type:"string"}
OPENAI_KEY = "" # @param {type:"string"}

def query_openai(prompt, model):
  """Query openai"""
  client = OpenAI(api_key=OPENAI_KEY)
  response = client.chat.completions.create(
    model=model,
    messages=[
      {
        "role": "user",
        "content": prompt
      },
    ],
    temperature=1,
    max_tokens=256,
    top_p=1,
    frequency_penalty=0,
    presence_penalty=0
  )
  return json.loads(response.json())['choices'][0]['message']['content']


def get_prompt(entity_probabilities, claims, n_statements=1, n_examples=2, more_instructions=""):
  """
  Build a prompt for few shot misinfo generation

  Params
  - entity_probabilities (dict): {entity, prob} dictionary
  - claims (list): A list of claims
  - n_statements(int, default=1): Number of statements to generate
  - n_examples (int, default=2): Number of examples
  - more_instructions (str, default=''): A string of addtnl instructions

  Returns:
    A string of a ChatGPT prompt

  """
  claims = '-' + " \n-".join([x for x in set(gen_claims(claims, n=n_examples))])
  entities = '-' + " \n-".join([x for x in  set(gen_entities(entity_probabilities, n=n_examples))])

  prompt = f"""Generate {n_statements} statement(s) similar to these statements: \n{claims}\n\nEach statement should be about at least one of these entities:\n{entities}\n\nReturn a newline-seperated list of statements and nothing else.\n\nSTATEMENTS:\n-Statement1 \n-Statement2\n...\nCONSTRAINTS:\n-Each statement should sound very realistic.\n{'-' + more_instructions if more_instructions else ''}\n\n"""
  return entities, prompt

extracted_entities, prompt = get_prompt(entity_probabilities, claim_df['claim'].values, n_statements=N_STATEMENTS, n_examples=N_EXAMPLES, more_instructions=MORE_INSTRUCTIONS)
print("PROMPT:\n\n", prompt)

response = query_openai(prompt, MODEL)
print("RESPONSE:\n\n", response)


PROMPT:

 Generate 10 statement(s) similar to these statements: 
-“There is no chain of custody” for ballots placed in Box No. 3 at Maricopa, Arizona, polling sites. 
-“Maricopa County announced that on Election Day over 540,000 voters visited one of the 223” vote centers, but final official results data claimed that “only 248,070 people voted.” 
-“Illegal immigrants now have the right to vote in New York." 
-Vice President Mike Pence on Jan. 6, 2021, could have approved the 2020 electoral votes on the condition that Congress pass an overhaul of election law. 
-“In Arizona, we have flood the zone with fake ballots.” 
-“They are trying to make it illegal to question the results of a bad election.” 
-If Texas says, “‘We don’t want to be part of America anymore’ … that’s their decision to make.” 
-“Pennsylvania is under a court order to count their ballots on election day and not after!!” 
-Brian Kemp "dismissed concerns about voter fraud in the 2020 election" and "widespread illegal ball

In [49]:
# List of all GPT-generated statments
statements_list = response.strip().split('\n')
statements_list = [statement.lstrip('-') for statement in statements_list]
candidates=statements_list

In [50]:
# List of all entities used to generated all the statements
extracted_entities_list = extracted_entities.strip().split('\n')
extracted_entities_list = [extracted_entities_list.lstrip('-') for extracted_entities_list in extracted_entities_list]
extracted_entities_list = [extracted_entities_list.rstrip() for extracted_entities_list in extracted_entities_list]

In [51]:
# Link all the entities with the statements and form a list
entities_in_statements = []
for statement in statements_list:
    entities_in_current_statement = [entity for entity in extracted_entities_list if entity.lower() in statement.lower()]
    entities_in_statements.append(entities_in_current_statement)

[['Los Angeles County'], ['Clark County'], ['Katie Hobbs', 'Arizona'], ['third', 'DMV'], ['Los Angeles County'], ['Biden'], ['Arizona', 'Zero'], ['Clark County'], ['DMV'], ['Los Angeles County']]


In [44]:
# Search out all the rows that had any of the entities that the each of the generated statements used
filtered_rows=[]
for entities_per_statement in entities_in_statements:
    filtered_rows_per_statement = [index for index, row in enumerate(entities_per_claim) if any(element in entities_per_statement for element in row)]
    filtered_rows.append(filtered_rows_per_statement)


In [45]:
# Gernerating the list of existing statements
references=[]
for row_list in filtered_rows:
    references_per_statement=[]
    for row_num in row_list:
        references_per_statement.append(claims[row_num][0])
    references.append(references_per_statement)
references

[['“Wisconsin has historically … and I think largely continues to be, a blue state.”'],
 ['Photo shows a copycat ‘QAnon shaman’ at attack on Brazil’s capital.',
  '"Over 240,000 \'unverified\' ballots have already been sent out in Pennsylvania, a total mess. The Democrats are playing games again.”'],
 ['Clips of Nikki Haley speaking about Hillary Clinton show Haley supports Clinton and is “not who she says she is.”',
  '"Trump has selected his pick for running mate (and the) news stuns Republican Party.”',
  'Ron DeSantis “wants to cut Social Security and Medicare.”',
  'Video showing someone in military gear outside a Georgia Waffle House proves that Donald Trump is president.',
  'A lawyer for former President Donald Trump released new proof of 2020 election fraud.',
  'An Arizona judge was “forced to overturn” an election and ruled “274,000 ballots must be thrown out.”',
  'A Florida elections bill “guts everything” “instead of getting tough, and doing what the people want (same day

In [47]:
from bert_score import score
import numpy as np

# Initialize an array to store the best F1 score for each candidate
best_f1_scores = np.zeros(len(candidates))
import warnings
warnings.filterwarnings('ignore')
# Compare each candidate against every reference
for i, candidate in enumerate(candidates):
    reference_list=references[i]
    # Store F1 scores for the current candidate against all references
    f1_scores_for_candidate = []
    
    for reference in reference_list:
        P, R, F1 = score([candidate], [reference], lang='en', verbose=False)
        f1_scores_for_candidate.append(F1.item())
    
    # Find the best F1 score for the current candidate
    best_f1_scores[i] = max(f1_scores_for_candidate)

best_f1_scores


Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['ro

array([0.83681005, 0.85647386, 0.86977142, 0.87489086, 0.84713727,
       0.8683508 , 0.88615865, 0.86073941, 0.84644192, 0.83464378])

In [16]:
#junk
import torch
if torch.cuda.is_available():
    print("CUDA is available. GPU will be used for computation.")
    device = torch.device("cuda")
else:
    print("CUDA is not available. Computation will fall back on the CPU.")
    device = torch.device("cpu")


CUDA is not available. Computation will fall back on the CPU.


In [None]:
#junk for now
def Calc_Bert_Score(repeats=5):
    for i in range(repeats): 
        extracted_entities, prompt = get_prompt(entity_probabilities, claim_df['claim'].values, n_statements=N_STATEMENTS, n_examples=N_EXAMPLES, more_instructions=MORE_INSTRUCTIONS)
        response = query_openai(prompt, MODEL)
        statements_list = response.strip().split('\n')
        statements_list = [statement.lstrip('-') for statement in statements_list]
        extracted_entities_list = extracted_entities.strip().split('\n')
        extracted_entities_list = [extracted_entities_list.lstrip('-') for extracted_entities_list in extracted_entities_list]
        extracted_entities_list = [extracted_entities_list.rstrip() for extracted_entities_list in extracted_entities_list]
        filtered_rows = [index for index, row in enumerate(entities_per_claim) if any(element in extracted_entities_list for element in row)]
        references=[]
        for row_num in filtered_rows:
            references.append(claims[row_num][0])
        candidates=statements_list

    