# Automated Claim Correction: Evidence from the Russia-Ukraine Conflict

Social media platforms enable the rapid and widespread dissemination of new claims, while corrective responses often lag in both timing and visibility. This paper presents a system for real-time identification and response to contentious content. The system maintains a database of pre-compiled claims and corresponding corrections. It analyzes incoming social media posts, news articles, and other online content to extract embedded claims, retrieve the appropriate corrections, and automatically generate appropriate responses. We evaluate our approach using a corpus of 313 social media posts from Russian state propaganda outlets. In 76\% of cases, our system’s responses were judged more effective at challenging the content than the crowdsourced human alternative (Twitter Community Notes). These results demonstrate the potential for automated approaches to provide timely and scalable interventions in contentious online discourse.

In [None]:
# Libraries
import os
import json
from datetime import datetime
from tqdm import tqdm
from glob import glob
import random

# Helper functions
from libs.utils import save_dict_as_json, load_dict_from_json
from libs.strings import remove_links
from libs.batch_processor import parallel_process_batches, parallel_embed_batches

In [2]:
# Data
# Raw inputs: entity metadata, a tweet-id -> URL mapping, and the tweets themselves
filepath_entities_of_interest = './Data/entities-of-interest.json'
filepath_tweet_id_to_url = './Data/tweet-id-to-url.json'
filepath_tweets = './Data/tweets.json'

# Community notes
# The two .tsv files are the raw downloads. The .json file holds the filtered subset;
# every filter step in section 2 reads it and then overwrites it in place.
filepath_community_notes_raw = './Data/notes-00000.tsv'
filepath_community_notes_status = './Data/noteStatusHistory-00000.tsv'
filepath_community_notes_filtered = './Data/community-notes.json'

# Analyses
# Intermediate and final artifacts produced by the sections below
filepath_dyads = './Analyses/1-dyads.json'
filepath_claims = './Analyses/4-claims.json'
filepath_claims_final = './Analyses/4-claims-final.json'
filepath_claims_embeddings = './Analyses/5-claims-embeddings.json'
filepath_counterclaims = './Analyses/6-counterclaims.json'
filepath_database = './Analyses/7-database.json'
filepath_cached_requests = './Analyses/8-cached-requests.json'
filepath_responses = './Analyses/9-responses.json'
filepath_evaluations = './Analyses/10-evaluations.json'

## 1. Mise-en-place

In [3]:
# Load entities of interest (later cells read each entry's 'ID', 'Name' and 'Channels')
with open(filepath_entities_of_interest, 'r') as f:
    entities_of_interest = json.load(f)

# Lexicon: lowercase terms used to keep only notes about the Russia-Ukraine conflict.
# NOTE(review): tokenize() splits on spaces and hyphens, so multi-token entries such as
# 'red army' and 'neo-nazi' can only match if the filter compares token sequences rather
# than single tokens.
lexicon = [
    # General
    'military', 'missile', 'nuclear',

    # Ukraine
    'ukraine', 'ukrainian', 'ukrainians', 'kyiv',

    # Russia
    'russia', 'russian', 'russians', 'moscow', 'kremlin',

    # People
    'putin', 'zelensky', 'zelenskyy', 'zakharova', 'lavrov', 'poroshenko', 'yanukovych', 'yanukovich', 'klitschko',

    # Ideologies
    'nazi', 'nazism', 'nazis', 'neo-nazi',

    # Organizations
    'nato', 'osce', 'red army', 'cia',

    # Places (each spelling listed once; repeated entries were removed)
    'donbas', 'donbass', 'donetsk', 'luhansk', 'crimea', 'kharkiv',
    'kherson', 'odessa', 'zaporizhzhia', 'zaporizhzhya',
    'sevastopol', 'simferopol', 'mariupol',

    # Other countries
    'poland',
]

# Date Window: earliest note date to keep, as 'YYYY-MM-DD'
MIN_DATE = '2022-02-23'

## 2. Community Notes

Download 'Notes data' and 'Note status history data' from [here](https://x.com/i/communitynotes/download-data) into Data/ folder.

In this section we filter the community notes data to only include notes that are related to the Russia-Ukraine conflict.

### 2.1 Filter by Review Status

Remove notes that are not in the 'CURRENTLY_RATED_HELPFUL' status.

In [None]:
import pandas as pd

# Config
columns_raw = [
    'noteId',
    'createdAtMillis',
    'tweetId',
    'summary'
]
columns_status = [
    'noteId',
    'currentStatus'
]
status_to_remove = [
    'CURRENTLY_RATED_NOT_HELPFUL',
    'NEEDS_MORE_RATINGS'
]

# Load community notes.
# - usecols: read only the columns we keep instead of loading every column and dropping most.
# - dtype=str for the ID columns: they are identifiers, not quantities. Parsing them as
#   numbers and casting back to str corrupts them as soon as one value is missing, because
#   the column is then promoted to float.
df_raw = pd.read_csv(
    filepath_community_notes_raw,
    sep='\t',
    usecols=columns_raw,
    dtype={'noteId': str, 'tweetId': str},
)
df_status = pd.read_csv(
    filepath_community_notes_status,
    sep='\t',
    usecols=columns_status,
    dtype={'noteId': str},
)

# Log
print(f"Total notes: {len(df_raw):,}")

# usecols keeps the file's column order; reselect so the JSON fields have a fixed order
df_raw = df_raw[columns_raw]
df_status = df_status[columns_status]

# Join dataframes (inner join: notes without a status row are dropped)
df_raw = df_raw.merge(df_status, on='noteId', how='inner')

# Remove all rows with status to remove
df_raw = df_raw[~df_raw['currentStatus'].isin(status_to_remove)]

# Save as json
df_raw.to_json(filepath_community_notes_filtered, orient='records', indent=4, force_ascii=False)

# Log
print(f"Total notes after filtering: {len(df_raw):,}")

### 2.2 Filter by Lexicon

In [6]:
import re

def tokenize(text):
    """
    Split text into lowercase word tokens using only the re library.

    URLs are dropped first; every run of punctuation, underscores or other
    non-alphanumeric characters then acts as a token separator.
    """
    lowered = text.lower()

    # Strip URLs before punctuation handling so they do not break into stray tokens
    without_urls = re.sub(r'http\S+|www\.\S+', '', lowered)

    # Collapse every separator run into a single space, then split on whitespace
    return re.sub(r'[\W_]+', ' ', without_urls).split()

In [None]:
# Load community notes
with open(filepath_community_notes_filtered, 'r') as file:
    community_notes = json.load(file)

# Normalize the lexicon with the same tokenizer that is applied to the notes.
# Entries such as 'red army' or 'neo-nazi' tokenize to several tokens and can never equal
# a single token, so they are matched as contiguous token sequences instead.
lexicon_tokenized = [tuple(tokenize(term)) for term in lexicon]
lexicon_words = {term[0] for term in lexicon_tokenized if len(term) == 1}
lexicon_phrases = [term for term in lexicon_tokenized if len(term) > 1]


def _matches_lexicon(tokens):
    """Return True if the token list contains any lexicon word or multi-token phrase."""
    if not lexicon_words.isdisjoint(tokens):
        return True
    for phrase in lexicon_phrases:
        width = len(phrase)
        for start in range(len(tokens) - width + 1):
            if tuple(tokens[start:start + width]) == phrase:
                return True
    return False


# Filter using the lexicon
community_notes_filtered = []

# Iterate over
for note in tqdm(community_notes):

    # A missing summary is stored as null in the JSON; treat it as empty text
    text = note['summary'] or ''

    # Keep the note only if it mentions at least one lexicon entry
    if not _matches_lexicon(tokenize(text)):
        continue

    # Append
    community_notes_filtered.append(note)

# Save (overwrites the input file with the filtered subset)
with open(filepath_community_notes_filtered, 'w') as file:
    json.dump(community_notes_filtered, file, indent=4, ensure_ascii=False)

# Log
print(f"Total notes after lexicon filtering: {len(community_notes_filtered):,}")

### 2.3 Filter by language

In [15]:
# !pip3 install langdetect
from langdetect import detect

def is_english(text):
    """
    Return True if langdetect classifies `text` as English.

    Detection is best-effort: non-string input and any detection error yield False.
    """
    if not isinstance(text, str):
        return False
    try:
        return detect(text) == 'en'
    except Exception:
        # `except Exception` rather than a bare `except:` so that KeyboardInterrupt and
        # SystemExit still stop a long-running loop instead of being read as "not English"
        return False

In [None]:
# Read the notes left over from the previous filter step
with open(filepath_community_notes_filtered, 'r') as file:
    community_notes = json.load(file)

# Keep only the notes whose summary is detected as English
community_notes_filtered = [
    note
    for note in tqdm(community_notes)
    if is_english(note['summary'])
]

# Overwrite the file with the filtered subset
with open(filepath_community_notes_filtered, 'w') as file:
    json.dump(community_notes_filtered, file, indent=4, ensure_ascii=False)

# Log
print(f"Total notes after language filtering: {len(community_notes_filtered):,}")

### 2.4 Filter by date

In [None]:
from datetime import timezone

# Convert the cut-off date to epoch milliseconds.
# - Stored under a new name: overwriting MIN_DATE with a float made this cell fail with a
#   TypeError on any re-run, because strptime() then received a number.
# - Interpreted as UTC midnight: a naive datetime's .timestamp() uses the machine's local
#   timezone, which made the cut-off depend on where the notebook runs.
min_date_millis = (
    datetime.strptime(MIN_DATE, '%Y-%m-%d')
    .replace(tzinfo=timezone.utc)
    .timestamp() * 1000
)

# Load community notes
with open(filepath_community_notes_filtered, 'r') as file:
    community_notes = json.load(file)

# Initialize
community_notes_filtered = []

# Iterate over
for note in tqdm(community_notes):

    # Unpack (creation time in epoch milliseconds)
    date = note['createdAtMillis']

    # If date is too early skip
    if date < min_date_millis: continue

    # Append
    community_notes_filtered.append(note)

# Save (overwrites the input file with the filtered subset)
with open(filepath_community_notes_filtered, 'w') as file:
    json.dump(community_notes_filtered, file, indent=4, ensure_ascii=False)

# Log
print(f"Total notes after date filtering: {len(community_notes_filtered):,}")

Get the date span of the notes.

In [None]:
# Read the filtered notes
with open(filepath_community_notes_filtered, 'r') as file:
    community_notes = json.load(file)

# Format each creation time (epoch milliseconds) as yyyy-mm-dd and sort ascending.
# fromtimestamp() without a tz argument formats in the machine's local timezone.
dates_arr = sorted(
    datetime.fromtimestamp(note['createdAtMillis'] / 1000).strftime('%Y-%m-%d')
    for note in community_notes
)

# The first and last entries of the sorted list are the span's bounds
min_date, max_date = dates_arr[0], dates_arr[-1]

# Log
print(f"Min date: {min_date}")
print(f"Max date: {max_date}")

### 2.5 Filter by account

The notes do not have the account information by default. Tweet ID to URL was obtained from the Twitter API.

In [None]:
# Read the tweet id -> url mapping
with open(filepath_tweet_id_to_url, 'r') as file:
    tweet_id_to_url = json.load(file)

# Read the filtered notes
with open(filepath_community_notes_filtered, 'r') as file:
    community_notes = json.load(file)

# Index entities by their lowercase Twitter handle (other platforms are ignored)
entity_by_handle = {
    channel['Handle'].lower(): entity
    for entity in entities_of_interest
    for channel in entity['Channels']
    if channel['Platform'] == 'Twitter'
}

# Keep only notes attached to tweets from the accounts of interest
community_notes_filtered = []
for note in tqdm(community_notes):
    tweet_id = note['tweetId']

    # Notes whose tweet has no known url cannot be attributed to an account
    if tweet_id in tweet_id_to_url:
        url = tweet_id_to_url[tweet_id]

        # The handle is read from the third-to-last path segment of the url
        handle = url.split('/')[-3].lower()
        matched_entity = entity_by_handle.get(handle)

        if matched_entity is not None:
            # Attach metadata used by later sections
            note['entity_id'] = matched_entity['ID']
            note['url'] = url
            community_notes_filtered.append(note)

# Overwrite the file with the filtered subset
with open(filepath_community_notes_filtered, 'w') as file:
    json.dump(community_notes_filtered, file, indent=4, ensure_ascii=False)

# Log
print(f"Total notes after filtering: {len(community_notes_filtered):,}")

Get the final breakdown of notes by account.

In [None]:
# Read the filtered notes
with open(filepath_community_notes_filtered, 'r') as file:
    community_notes = json.load(file)

# Entity id -> display name
entity_name_by_id = {entity['ID']: entity['Name'] for entity in entities_of_interest}

# Start every entity at zero, then tally one per note
entity_count = dict.fromkeys((entity['ID'] for entity in entities_of_interest), 0)
for note in community_notes:
    entity_count[note['entity_id']] += 1

# Drop entities without notes and order the rest by descending count
# (the sort is stable, so ties keep the entity order)
entity_count = dict(
    sorted(
        ((entity_id, count) for entity_id, count in entity_count.items() if count > 0),
        key=lambda pair: pair[1],
        reverse=True,
    )
)

# Log
for entity_id, count in entity_count.items():
    print(f"{entity_name_by_id[entity_id]}: {count:,}")
print(f"Total notes: {sum(entity_count.values()):,}")

## 3. Tweet-Note Dyads

The notes do not include the original tweet by default. Tweets were obtained from the Twitter API. We combine them here.

In [None]:
# Read tweets and the filtered notes
with open(filepath_tweets, 'r') as file:
    tweets = json.load(file)
with open(filepath_community_notes_filtered, 'r') as file:
    community_notes = json.load(file)

# Index tweets by id for direct lookup
tweet_by_id = {tweet['tweetId']: tweet for tweet in tweets}

# Pair every note with the text of the tweet it annotates
dyads = []
for note in community_notes:
    tweet_id, url, note_text, entity_id = (
        note[field] for field in ('tweetId', 'url', 'summary', 'entity_id')
    )

    # Notes whose tweet was not retrieved are left out
    tweet = tweet_by_id.get(tweet_id)
    if tweet is None:
        continue

    dyads.append({
        'tweetId': tweet_id,
        'url': url,
        'entity_id': entity_id,
        'note': note_text,
        'tweet': tweet['text'],
    })

# Log
print(f"Total dyads: {len(dyads):,}")

# Save
with open(filepath_dyads, 'w') as file:
    json.dump(dyads, file, indent=4, ensure_ascii=False)

## 4. Claim Extraction

Using an LLM, extract the claims from each filtered document.

In [21]:
def compose_prompt_claims(text):
    """
    Build the claim-extraction prompt for one document.

    The text is passed through remove_links() and embedded in a fixed instruction
    template that asks for the document's main claims as a JSON array. The returned
    prompt has no leading or trailing whitespace.
    """

    # Clean
    text = remove_links(text)

    return f"""
DOCUMENT:
"{text}"

TASK:
Identify all claims in the document. A claim is a statement that:
- Asserts or implies something as true, factual, or plausible (e.g., "X happened," "Y is true," or "Z will occur").
- Takes a position (e.g., supports or opposes something).
- Shows a connection, causation, or relationship (e.g., "Could X have caused Y?").
- Proposes an explanation, hypothesis, or prediction.

GUIDELINES:
- Claims must be concise and self-contained.
- Exclude unnecessary details unless essential for clarity.
- Focus each claim on a single idea.
- Keep claims objective and based only on the document content.

OUTPUT:
Return the main claims as a JSON array. For example:
claims: [
    "NATO provoked the war",
    "Neo-Nazis are in the Ukrainian government",
    ...
]

If the document does not contain any claims, return an empty list: [].
""".strip()

In [None]:
# Read the tweet-note dyads
with open(filepath_dyads, 'r') as file:
    dyads = json.load(file)

# Resume from earlier results when they exist
claims_dict = load_dict_from_json(filepath_claims) if os.path.exists(filepath_claims) else {}

# Queue one extraction task per document that has no stored (non-None) result yet
tasks = []
for document in dyads:
    key = (document['url'],)
    if claims_dict.get(key) is not None:
        continue
    tasks.append((key, compose_prompt_claims(document['tweet'])))

# Log number of tasks
print(f'Found {len(tasks):,} tasks to process.')

In [None]:
# Configuration for the claim-extraction run.
# NOTE(review): the meaning of each option is defined by libs.batch_processor, which is not
# visible here; the results are presumably written to FILEPATH keyed by task key, since
# later cells read them back from that path — confirm against the helper.
RESULTS_MAP = None
NUM_WORKERS = 10
FILEPATH = filepath_claims
JSON_MODE = True
MODEL_LARGE = True

# Run the function
parallel_process_batches(
    tasks=tasks, 
    filepath=FILEPATH, 
    results_map=RESULTS_MAP, 
    json_mode=JSON_MODE,
    model_large=MODEL_LARGE, 
    num_workers=NUM_WORKERS
)

Collect and format all claims

In [23]:
# Load claims
claims_dict = {}
if os.path.exists(filepath_claims):
    claims_dict = load_dict_from_json(filepath_claims)

# Iterate over and format
for key, raw_claims in claims_dict.items():

    # Failed tasks are stored as None and are left untouched so they can be retried
    if raw_claims is None: continue

    # The LLM returns either { 'claims': [ ... ] } or a bare list; a dict without a
    # 'claims' entry is treated as "no claims" instead of using its keys as claims
    if isinstance(raw_claims, dict):
        raw_claims = raw_claims.get('claims') or []
    elif isinstance(raw_claims, str):
        raw_claims = [raw_claims]

    # Deduplicate while keeping first-seen order. A dict is used instead of a set so the
    # saved file does not change between runs.
    cleaned = {}
    for claim in raw_claims:

        # Skip anything that is not text
        if not isinstance(claim, str): continue

        # Trim whitespace first so a trailing "." followed by spaces is still removed
        claim = claim.strip()
        if claim.endswith('.'):
            claim = claim[:-1].strip()

        # Empty strings used to raise IndexError on claim[-1]; drop them
        if claim:
            cleaned[claim] = None

    # Update (assigning to an existing key is safe while iterating)
    claims_dict[key] = list(cleaned)

# Save
save_dict_as_json(claims_dict, filepath_claims)

## 5. Claim Embedding

Represent each claim as a vector using a text embedding model. This vector representation captures the semantic meaning of the claim, enabling similarity comparisons and clustering.

In [None]:
# Load claims
claims_dict = load_dict_from_json(filepath_claims)

# Collect all claims across documents.
# Documents whose extraction failed are stored as None; skip them, since
# set.update(None) raises TypeError.
claims = set()
for values in claims_dict.values():
    if values is None: continue
    claims.update(values)

# Initialize / Load
embeddings_dict = {}
if os.path.exists(filepath_claims_embeddings):
    embeddings_dict = load_dict_from_json(filepath_claims_embeddings)

# Create tasks (sorted so the task order is the same on every run)
tasks = []
for claim in sorted(claims):

    # Skip if already processed
    key = (claim,)
    if key in embeddings_dict and embeddings_dict[key] is not None: continue

    # Add to tasks
    tasks.append((key, claim))

# Log
print(f'Found {len(tasks):,} tasks to process.')

In [None]:
# Run the embedding tasks built above; the database section later reads the embeddings
# back from filepath_claims_embeddings.
parallel_embed_batches(tasks, filepath_claims_embeddings, num_workers=10)

## 6. Counter-Claim Generation (FOR PROOF OF CONCEPT)

STRICTLY FOR THE INITIAL PROOF OF CONCEPT. Usually this would be done by a human expert.

For each extracted claim, use an LLM to generate a counterclaim that employs a **fact-based strategy**.

In [25]:
def compose_prompt_generate_counterclaim(claim):
    """
    Build the fact-checking prompt for one claim.

    The prompt asks for a True/False label plus short evidence and ends with an example
    of the expected JSON object, pre-filled with the claim.
    """
    # json.dumps quotes and escapes the claim, so the example object stays valid JSON when
    # the claim contains double quotes, backslashes or newlines. For plain text the output
    # is identical to simple quoting.
    claim_json = json.dumps(claim, ensure_ascii=False)
    suffix = '\n{\n   "claim": ' + claim_json + ',\n   "label": "True/False",\n   "evidence": "Short, factual summary with hard evidence (cite only key sources)."\n}'
    return f"""
CLAIM:
"{claim}"

TASK:
Verify the factual accuracy of the claim. Provide a short explanation supported by concise and hard evidence.

INSTRUCTIONS:
1. Analyze the claim: Identify the key assertions and context.
2. Research evidence: Use only credible sources (e.g., academic studies, verified news, official records).
3. Evaluate reasoning: Identify any fallacies, biases, or misinformation.
4. Determine accuracy: Label the claim as "True" or "False." Justify your label with brief, precise, and verifiable evidence (no lengthy explanations).

OUTPUT FORMAT:
Return the analysis as a structured JSON object in this format:
""".strip() + suffix

In [None]:
# Read the final list of claims
with open(filepath_claims_final, 'r') as file:
    claims = json.load(file)

# Resume from earlier results when they exist
counterclaims_dict = (
    load_dict_from_json(filepath_counterclaims)
    if os.path.exists(filepath_counterclaims)
    else {}
)

# Queue one task per claim that has no stored (non-None) result yet
tasks = [
    ((claim,), compose_prompt_generate_counterclaim(claim))
    for claim in claims
    if counterclaims_dict.get((claim,)) is None
]

# Log number of tasks
print(f'Found {len(tasks):,} tasks to process.')

In [None]:
# Configuration for the counterclaim-generation run.
# NOTE(review): option semantics are defined by libs.batch_processor (not visible here);
# the results are read back from filepath_counterclaims by the cells that follow.
RESULTS_MAP = None
NUM_WORKERS = 10
FILEPATH = filepath_counterclaims
JSON_MODE = True
MODEL_LARGE = True

# Run the function
parallel_process_batches(
    tasks=tasks, 
    filepath=FILEPATH, 
    results_map=RESULTS_MAP, 
    json_mode=JSON_MODE,
    model_large=MODEL_LARGE, 
    num_workers=NUM_WORKERS
)

Collect and format all counterclaims

In [21]:
# Load
counterclaims_dict = load_dict_from_json(filepath_counterclaims)

# Drop the echoed 'claim' field from every result (the claim is already the key).
# Only dict results are touched: failed tasks are stored as None, and `'claim' in None`
# raises TypeError.
for result in counterclaims_dict.values():
    if isinstance(result, dict):
        result.pop('claim', None)

# Save
save_dict_as_json(counterclaims_dict, filepath_counterclaims)

## 7. Claim-Counterclaim Database

A database pairing each claim with its corresponding counterclaim. Additionally, each claim is stored with its vector representation and text to facilitate retrieval during discussions.

In [None]:
# Load final claims
with open(filepath_claims_final, 'r') as file:
    claims = json.load(file)

# Set view of the claims: the two filters below test membership once per stored key, and
# membership in the list made them quadratic
claims_lookup = set(claims)

# Load embeddings (keys are 1-tuples: (claim,))
embeddings_dict = load_dict_from_json(filepath_claims_embeddings)
embeddings_dict = { key: value for key, value in embeddings_dict.items() if key[0] in claims_lookup }

# Load claims results
counterclaims_results = load_dict_from_json(filepath_counterclaims)
counterclaims_results = { key: value for key, value in counterclaims_results.items() if key[0] in claims_lookup }

# Initialize
database = []
skipped_claims = []

# Iterate over
for claim in claims:

    # Get key
    key = (claim,)
    embedding = embeddings_dict.get(key)
    counterclaim = counterclaims_results.get(key)

    # A claim is only usable with both an embedding and counterclaim evidence. Failed or
    # missing upstream results are reported below instead of aborting with a bare
    # KeyError/TypeError.
    evidence = counterclaim.get('evidence') if isinstance(counterclaim, dict) else None
    if embedding is None or not evidence:
        skipped_claims.append(claim)
        continue

    # Append
    database.append({
        'claim': claim,
        'counterclaim': evidence,
        'embedding': embedding
    })

# Save
with open(filepath_database, 'w') as file:
    json.dump(database, file, indent=4, ensure_ascii=False)

# Log
print(f"Total claims in database: {len(database):,}")
if skipped_claims:
    print(f"Skipped {len(skipped_claims):,} claims without an embedding or counterclaim evidence.")

## 8. Document Counterclaim Retrieval

Pair new documents with relevant counterclaims for response generation.

In [3]:
from libs.database import pipeline_lookup_counterclaims

In [None]:
# Read the tweet-note dyads
with open(filepath_dyads, 'r') as file:
    dyads = json.load(file)

# Resume from the cache of earlier lookups when it exists
if os.path.exists(filepath_cached_requests):
    with open(filepath_cached_requests, 'r') as file:
        counterclaims_dict = json.load(file)
else:
    counterclaims_dict = {}

# Look up counterclaims for every document that is not cached yet
for document in dyads:
    id_str = document['url']

    if id_str not in counterclaims_dict:
        counterclaims_dict[id_str] = pipeline_lookup_counterclaims(
            document['tweet'],
            filepath_database=filepath_database,
        )

        # Write the cache after every lookup so an interrupted run loses nothing
        with open(filepath_cached_requests, 'w') as file:
            json.dump(counterclaims_dict, file, indent=4, ensure_ascii=False)

## 9. Response Generation

For each document, generate responses using different methods.

In [16]:
# Number of different responses to generate per method per document
# (used as the range of the per-document response index when building tasks)
NUM_RESPONSES_PER_METHOD = 1

# Different methods to generate responses
# ('community_note' has no generation prompt; the task-building cell marks it as manually added)
RESPONSE_GENERATION_CATEGORIES = ['control', 'generic', 'RAG', 'community_note', 'community_note_RAG']

In [17]:
# Shared style rules appended to every response-generation prompt
RULES = """
RULES:
- Brevity: Responses must be brief and focused.
- Neutrality: Avoid emotionally charged, inflammatory, or biased language. Maintain a factual and objective tone.
- Accuracy: Ensure correct spelling and consistent use of established names or terminology (e.g., countries, organizations, historical figures).
- Formatting: Do not use any formatting or special characters.
- Author's Stance: Consider the provided author's stance when crafting the response, but never mention it explicitly.
- Final Response Only: Provide only the final response. Do not include reasoning, intermediate steps, or explanations in the output.
""".strip()


def compose_prompt_control(text):
    """Build the control prompt: a deliberately superficial challenge to the document."""
    return f"""
DOCUMENT:
"{text}"

TASK:
Write a short response that superficially challenges the main point of the document. Focus only on surface-level issues and avoid providing strong evidence or detailed reasoning.

{RULES}
""".strip()


def compose_prompt_generic(text):
    """Build the generic prompt: challenge the document without any retrieved information."""
    return f"""
DOCUMENT:
"{text}"

TASK:
Write a clear, concise, and constructive response that challenges the main point of the document.

{RULES}
""".strip()


def compose_prompt_RAG(text, counterclaims):
    """
    Build the retrieval-augmented prompt: challenge the document using `counterclaims`.

    The counterclaims are listed in random order so that repeated calls can produce
    different prompts. The caller's sequence is not modified.
    """
    # Shuffle a copy: random.shuffle works in place, and shuffling the argument itself
    # reordered the caller's list as a side effect of building a prompt
    shuffled = list(counterclaims)
    random.shuffle(shuffled)
    counterclaims_str = '\n'.join([ f'- "{counterclaim}"' for counterclaim in shuffled ])
    return f"""
DOCUMENT:
"{text}"

RELEVANT INFORMATION:
{counterclaims_str}

TASK:
Using the relevant information provided, write a clear, concise, and constructive response that challenges the main point of the document.

{RULES}
""".strip()

In [None]:
# Load dyads
with open(filepath_dyads, 'r') as file:
    dyads = json.load(file)

# Initialize
tasks = []

# Load the counterclaims retrieved per document.
# These are in the retrieval cache (section 8), which is keyed by document url.
# filepath_counterclaims holds the per-claim fact checks (keyed by claim), so looking up
# a url there never matches and every document was skipped.
with open(filepath_cached_requests, 'r') as file:
    counterclaims_dict = json.load(file)

# Load responses
responses_dict = {}
if os.path.exists(filepath_responses):
    responses_dict = load_dict_from_json(filepath_responses)

# Iterate over
for document in dyads:

    # Unpack
    id_str = document['url']
    text = document['tweet']
    note = document['note']

    # If no counterclaims, skip
    if id_str not in counterclaims_dict:
        print(f"Skipping {id_str} due to missing counterclaims.")
        continue

    # Load counterclaims
    counterclaims = counterclaims_dict[id_str]

    # Iterate over
    for idx in range(NUM_RESPONSES_PER_METHOD):

        # A prompt is only composed when its response is missing or failed (None)

        # Response #1: Control
        key_1 = (id_str, 'control', idx)
        if responses_dict.get(key_1) is None:
            tasks.append((key_1, compose_prompt_control(text)))

        # Response #2: Generic LLM response
        key_2 = (id_str, 'generic', idx)
        if responses_dict.get(key_2) is None:
            tasks.append((key_2, compose_prompt_generic(text)))

        # Response #3: Retrieval-Augmented Generation
        key_3 = (id_str, 'RAG', idx)
        if responses_dict.get(key_3) is None:
            tasks.append((key_3, compose_prompt_RAG(text, counterclaims)))

        # Response #4: Community Note
        #   manually added...

        # Response #5: Community Note with RAG (the note is the only retrieved information)
        key_5 = (id_str, 'community_note_RAG', idx)
        if responses_dict.get(key_5) is None:
            tasks.append((key_5, compose_prompt_RAG(text, [note])))

# Log
print(f"Generated {len(tasks):,} tasks.")

In [None]:
# Configuration for the response-generation run (free-text output, so JSON mode is off).
# NOTE(review): option semantics are defined by libs.batch_processor (not visible here).
RESULTS_MAP = None
NUM_WORKERS = 10
FILEPATH = filepath_responses
JSON_MODE = False
MODEL_LARGE = True

# Run the function
# NOTE(review): with temperature=0.0, raising NUM_RESPONSES_PER_METHOD above 1 would
# presumably yield near-identical responses for the prompts that are not shuffled — confirm.
parallel_process_batches(
    tasks=tasks,
    filepath=FILEPATH, 
    results_map=RESULTS_MAP, 
    json_mode=JSON_MODE,
    model_large=MODEL_LARGE, 
    num_workers=NUM_WORKERS,
    temperature=0.0
)

## 10. Evaluation (FOR PROOF OF CONCEPT)

In [17]:
# Categories
# Response categories compared pairwise in the evaluation (repeated here so this section
# can run without the response-generation cells)
RESPONSE_GENERATION_CATEGORIES = ['control', 'generic', 'RAG', 'community_note', 'community_note_RAG']

In [18]:
def compose_prompt_ranking_effectiveness(text, response_a, response_b):
    """
    Build the pairwise-judging prompt for one document and two candidate responses.

    Links are removed from the document and from both responses, and the judge is asked
    to answer with the single letter A or B. The returned prompt has no leading or
    trailing whitespace.
    """

    # Clean
    text = remove_links(text)
    response_a = remove_links(response_a)
    response_b = remove_links(response_b)

    return f"""
You are evaluating two responses to the following document. Your task is to determine which response more effectively challenges the main point of the document.

Criteria for an Effective Response:
- Maintains a neutral and factual tone.
- Directly addresses the main point of the document.
- Uses relevant facts and evidence to support its argumentation.

DOCUMENT:
"{text}"

RESPONSE A:
"{response_a}"

RESPONSE B:
"{response_b}"

QUESTION: 
Which response (A or B) is more effective at challenging the main point of the document?

Respond ONLY with a single letter: A or B. Do not add any explanations or additional text.
""".strip()

In [None]:
from itertools import combinations

# Load dyads
with open(filepath_dyads, 'r') as file:
    dyads = json.load(file)

# Initialize
tasks = []

# Load responses
responses_dict = load_dict_from_json(filepath_responses)

# Load evaluation results
evaluation_dict = {}
if os.path.exists(filepath_evaluations):
    evaluation_dict = load_dict_from_json(filepath_evaluations)

# Upper bound on the per-category response index that is probed for each document
MAX_RESPONSES_PER_CATEGORY = 10

# Iterate over
for document in dyads:

    # Unpack
    id_str = document['url']
    text = document['tweet']

    # Collect the available responses of every category once per document.
    # Failed generations are stored as None and are skipped, since they cannot be judged.
    responses_by_category = {
        category: [
            responses_dict[(id_str, category, idx)]
            for idx in range(MAX_RESPONSES_PER_CATEGORY)
            if responses_dict.get((id_str, category, idx)) is not None
        ]
        for category in RESPONSE_GENERATION_CATEGORIES
    }

    # Every unordered pair of distinct categories exactly once. The earlier category in
    # RESPONSE_GENERATION_CATEGORIES is always "A". combinations() replaces the nested
    # index loops, whose inner response loop reused the outer loop variable `i`.
    for category_A, category_B in combinations(RESPONSE_GENERATION_CATEGORIES, 2):
        for response_A in responses_by_category[category_A]:
            for response_B in responses_by_category[category_B]:

                # Effectiveness Task (queued only when missing or failed)
                key_effectiveness = (id_str, 'effectiveness', response_A, response_B)
                if evaluation_dict.get(key_effectiveness) is None:
                    prompt_effectiveness = compose_prompt_ranking_effectiveness(text, response_A, response_B)
                    tasks.append((key_effectiveness, prompt_effectiveness))

# Log
print(f"Generated {len(tasks):,} tasks.")

In [None]:
# Configuration for the pairwise-evaluation run.
# RESULTS_MAP pairs each answer letter with a numeric code; the aggregation cell treats a
# stored result of 0 as "response A won" and 1 as "response B won".
# NOTE(review): how parallel_process_batches applies the map is defined in
# libs.batch_processor (not visible here) — confirm.
RESULTS_MAP = [
    ('A', 0),
    ('B', 1)
]
NUM_WORKERS = 10
FILEPATH = filepath_evaluations
JSON_MODE = False
MODEL_LARGE = True

# Run the function
parallel_process_batches(
    tasks=tasks,
    filepath=FILEPATH, 
    results_map=RESULTS_MAP, 
    json_mode=JSON_MODE,
    model_large=MODEL_LARGE, 
    num_workers=NUM_WORKERS
)

## 11. Collect all evaluations

In [None]:
import numpy as np

# Categories
RESPONSE_GENERATION_CATEGORIES = [ 'control', 'generic', 'RAG', 'community_note', 'community_note_RAG' ]

""" Step 1: Load Data """

# Load data
responses_dict = load_dict_from_json(filepath_responses)
evaluation_dict = load_dict_from_json(filepath_evaluations)

# Map llm responses to response category.
# Evaluation keys store the response text as it was when the task was queued (raw), so
# both the raw and the link-stripped text are registered. Registering only the
# link-stripped text silently dropped every evaluation whose response contained a link.
map_response_to_category = {}
for (document_id, response_category, idx), response in responses_dict.items():

    # Failed generations are stored as None
    if response is None: continue

    map_response_to_category[response] = response_category
    map_response_to_category[remove_links(response)] = response_category

# Load dyads
with open(filepath_dyads, 'r') as file:
    dyads = json.load(file)

# Map community notes to response category (raw and link-stripped, as above)
for document in dyads:
    map_response_to_category[document['note']] = 'community_note'
    map_response_to_category[remove_links(document['note'])] = 'community_note'


""" Step 2: Format Comparisons """

# Initialize
evaluations = []
unmapped_count = 0

# Convert comparisons format
for (document_id, evaluation_category, response_A, response_B), result in evaluation_dict.items():

    # Map responses to their categories. Unknown responses are counted and reported
    # below instead of being hidden by a bare `except:`.
    category_A = map_response_to_category.get(response_A)
    category_B = map_response_to_category.get(response_B)
    if category_A is None or category_B is None:
        unmapped_count += 1
        continue

    # Get winning category (0 = response A, 1 = response B; anything else is unusable)
    if result == 0:
        category_winning = category_A
    elif result == 1:
        category_winning = category_B
    else:
        continue

    # Append
    evaluations.append((document_id, category_A, category_B, category_winning))

# Log
if unmapped_count:
    print(f"Skipped {unmapped_count:,} evaluations whose responses could not be mapped to a category.")



""" Step 3: Aggregate by document """

# Group the comparisons by document, keeping first-seen document order
evaluations_by_document = {}
for document_id, category_A, category_B, category_winning in evaluations:
    evaluations_by_document.setdefault(document_id, []).append((category_A, category_B, category_winning))


"""
Step 4: Quality Assurance

We only keep the evaluations where the last category is 'control' or the last two categories are 'community_note' and 'control'.
"""
# The QA filter described above is currently disabled (kept commented out at the end of
# the loop body), so every document with at least one comparison is retained.

# Initialize
evaluations_by_document_valid = {}

# Rank the categories of every document by their share of won comparisons
for document_id, document_evaluations in evaluations_by_document.items():

    # Nothing to rank
    if not document_evaluations: continue

    # Per-category tallies: comparisons taken part in and comparisons won
    category_winning_counts = { category: { 'win': 0, 'total': 0 } for category in RESPONSE_GENERATION_CATEGORIES }
    for category_A, category_B, category_winning in document_evaluations:
        for participant in (category_A, category_B):
            category_winning_counts[participant]['total'] += 1
        category_winning_counts[category_winning]['win'] += 1

    # Win share per category; a category without comparisons gets 0
    for counts in category_winning_counts.values():
        counts['win_%'] = round(counts['win'] / counts['total'], 3) if counts['total'] > 0 else 0

    # Best category first; the sort is stable, so ties keep the category list order
    ranking = sorted(
        category_winning_counts,
        key=lambda category: category_winning_counts[category]['win_%'],
        reverse=True,
    )
    category_winning_counts = { category: category_winning_counts[category] for category in ranking }

    # # Discard if invalid under QA
    # if ranking[-1] != 'control' and ranking[-1] != 'generic': continue

    # Store
    evaluations_by_document_valid[document_id] = ranking

# Log
print(f"Filtered {len(evaluations_by_document):,} -> {len(evaluations_by_document_valid):,} documents.")



""" Step 5: Compute Overall Ranking, and Statistics """

# One counter set per category
category_statistics = { category: {'win': 0, 'total': 0, 'win_%': 0} for category in RESPONSE_GENERATION_CATEGORIES }

# Every ranked document adds one to each category's total; only the top-ranked category
# of a document gets a win
for rankings in evaluations_by_document_valid.values():
    for position, category in enumerate(rankings):
        stats = category_statistics[category]
        stats['total'] += 1
        if position == 0:
            stats['win'] += 1

# Share of documents in which each category ranked first
for stats in category_statistics.values():
    stats['win_%'] = round(stats['win'] / stats['total'], 3) if stats['total'] > 0 else 0

# Categories ordered by descending win share (stable, so ties keep the category list order)
overall_ranking = sorted(
    category_statistics,
    key=lambda category: category_statistics[category]['win_%'],
    reverse=True,
)

# Log results
print("\nOverall Ranking by Win Percentage:")
print("-----------------------------------")
for category in overall_ranking:
    stats = category_statistics[category]
    print(f"{category.ljust(24)} {stats['win_%']:.3f} (Wins: {stats['win']}, Total: {stats['total']})")


""" Step 6: Compute Friedman-like Test """

from scipy.stats import chi2

# Rank matrix for the Friedman test: one row per document, one column per category (in
# RESPONSE_GENERATION_CATEGORIES order); each cell is that category's 1-based rank within
# the document (1 = best).
# The earlier version stored, per rank position, the index of the category found there,
# so the column sums were sums of category indices rather than per-category rank sums.
evaluation_matrix = np.array([
    [rankings.index(category) + 1 for category in RESPONSE_GENERATION_CATEGORIES]
    for rankings in evaluations_by_document_valid.values()
])

# Perform Friedman-like test manually (needs at least two ranked documents)
if evaluation_matrix.ndim == 2 and evaluation_matrix.shape[0] > 1:
    n, k = evaluation_matrix.shape  # Number of documents, number of categories

    # Rank sum of each category across documents
    rank_sums = evaluation_matrix.sum(axis=0)

    # Friedman statistic: 12 / (n k (k + 1)) * sum_j (R_j - n (k + 1) / 2)^2.
    # No tie correction is applied; the per-document rankings are strict orderings.
    chi_square = (12 / (n * k * (k + 1))) * sum((rank_sum - n * (k + 1) / 2) ** 2 for rank_sum in rank_sums)

    # Under the null hypothesis the statistic is approximately chi-square distributed with
    # k - 1 degrees of freedom; the survival function gives the p-value (this replaces
    # the fixed "> 10" threshold that could only produce 0 or 1)
    p_value = float(chi2.sf(chi_square, df=k - 1))

    print(f"\nFriedman-like Test Results: Chi-square={chi_square:.3f}, p-value={p_value:.3f}")
    if p_value < 0.05:
        print("The differences among the categories are statistically significant.")
    else:
        print("No statistically significant differences among the categories.")


""" Step 6: Compute Confidence Intervals for Win Rates """

# Per-category (lower, upper) bounds, filled in below
confidence_intervals = {}
def bootstrap_mean(data, n_resamples=1000):
    """
    Percentile-bootstrap interval for the mean of `data`.

    Draws `n_resamples` resamples of len(data) items with replacement (NumPy's global
    random state), sorts their means, and returns the values at the 2.5% and 97.5%
    positions, each rounded to 3 decimals, as (lower, upper).
    """
    size = len(data)
    resampled_means = sorted(
        sum(data[position] for position in np.random.randint(0, size, size)) / size
        for _ in range(n_resamples)
    )
    lower = resampled_means[int(0.025 * len(resampled_means))]
    upper = resampled_means[int(0.975 * len(resampled_means))]
    return round(lower, 3), round(upper, 3)

# Bootstrap an interval for each category's win rate from its win/total counts
for category in RESPONSE_GENERATION_CATEGORIES:
    win_counts = category_statistics[category]['win']
    total_counts = category_statistics[category]['total']

    # Rebuild the 0/1 outcome list: one 1 per win, one 0 per non-win
    data = [1] * win_counts + [0] * (total_counts - win_counts) if total_counts > 0 else None

    # Categories without any ranked document get a degenerate (0, 0) interval
    confidence_intervals[category] = bootstrap_mean(data) if data is not None else (0, 0)

# Order by descending lower bound (stable, so ties keep the category list order)
confidence_intervals = dict(
    sorted(confidence_intervals.items(), key=lambda pair: pair[1][0], reverse=True)
)

print("\nConfidence Intervals for Win Rates:")
print("---------------------------------------")
for category, ci in confidence_intervals.items():
    print(f"{category.ljust(24)} {ci}")