<a href="https://colab.research.google.com/github/jamesm2002/tone_based_counterspeech_models/blob/main/modelTestingandComparison.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Model Testing and Comparison

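This notebook loads both models (the fine-tuned T5 model and LLaMA 2) and generates counterspeech outputs with each. It also computes NLP evaluation metrics (BLEU, ROUGE-L and BERTScore) for the generated responses.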

In [None]:
!pip install transformers sentence-transformers better_profanity torch datasets accelerate
!pip install torch nltk rouge-score transformers

In [1]:
# all imports needed
import torch
import random
import re
import pandas as pd
from transformers import AutoModelForCausalLM, AutoTokenizer, T5Tokenizer, T5ForConditionalGeneration
from sentence_transformers import SentenceTransformer, util
from transformers import pipeline


In [None]:
# run on the GPU when torch can see one, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

# load fine-tuned counter-speech model
# (tokenizer and weights are read from the local directory `model_path`;
# the model is then moved onto `device`)
model_path = "model/t5-counterspeech-tone-model"
tokenizer = T5Tokenizer.from_pretrained(model_path)
model = T5ForConditionalGeneration.from_pretrained(model_path).to(device)

# load LLaMA 2 model & tokenizer; a Hugging Face access token is required,
# so fill in HF_TOKEN before running this cell
# NOTE(review): recent transformers releases document `token=` in place of
# `use_auth_token=` -- confirm against the installed version.
HF_TOKEN = ""
llama_model_name = "meta-llama/Llama-2-7b-chat-hf"
llama_tokenizer = AutoTokenizer.from_pretrained(llama_model_name, use_auth_token=HF_TOKEN)
# half-precision weights; device placement is delegated to device_map="auto"
llama_model = AutoModelForCausalLM.from_pretrained(llama_model_name, torch_dtype=torch.float16, device_map="auto", use_auth_token=HF_TOKEN)

# load NLP utilities
# sentence-embedding model used for the relevance (cosine similarity) score
similarity_model = SentenceTransformer("all-MiniLM-L6-v2", device=device)
# sentiment pipeline; device=0 selects the first GPU, -1 selects the CPU
sentiment_model = pipeline("sentiment-analysis", device=0 if torch.cuda.is_available() else -1)

print("Models Loaded Successfully!")


In [None]:
from collections import Counter
from better_profanity import profanity

# function to remove a leading list number (e.g. "12. ") from hate speech
def clean_hate_speech(text):
    """Strip a leading enumeration prefix such as ``"3. "`` from *text*.

    Only a run of digits at the very start of the string that is immediately
    followed by a literal dot is removed, together with any whitespace after
    the dot. Text without such a prefix is returned unchanged.

    Args:
        text: The raw hate-speech string.

    Returns:
        The string without its leading "<digits>." prefix.
    """
    # In a raw string a single backslash is already the regex escape; the
    # previous doubled backslashes made the pattern look for literal
    # backslash characters, so numbered inputs were never cleaned.
    return re.sub(r"^\d+\.\s*", "", text)

# function to check if response is clean
# excludes gay as this is not inherently offensive if used in the right context
def is_clean(response):
    """Return True when no word of *response* is flagged as profanity.

    The response is lower-cased and split on whitespace, and each distinct
    token is passed to ``profanity.contains_profanity``. The words "gay" and
    "gays" are exempt from the check.

    Args:
        response: Candidate counter-speech text.

    Returns:
        True if no non-exempt token is flagged, False otherwise. An empty
        response is considered clean.
    """
    import string

    words_to_exclude = {"gay", "gays"}
    for word in set(response.lower().split()):
        # Compare the exemption list against the token without surrounding
        # punctuation, so "gay." or "gays," are exempt just like the bare word.
        # The untouched token is still what gets sent to the profanity filter.
        if word.strip(string.punctuation) in words_to_exclude:
            continue
        if profanity.contains_profanity(word):
            return False
    return True

# function to calculate response relevance
def contains_relevant_terms(hate_speech, response):
    """Score how closely *response* relates to *hate_speech*.

    Both texts are embedded with ``similarity_model`` and compared with
    cosine similarity.

    Args:
        hate_speech: The original statement.
        response: The candidate counter-speech.

    Returns:
        The cosine similarity of the two embeddings as a Python number.
    """
    embeddings = [
        similarity_model.encode(text, convert_to_tensor=True)
        for text in (hate_speech, response)
    ]
    # pytorch_cos_sim returns a matrix; for a single pair the value is at [0][0]
    similarity_matrix = util.pytorch_cos_sim(embeddings[0], embeddings[1])
    return similarity_matrix[0][0].item()

# function to score sentiment
def score_sentiment(response):
    """Map the sentiment pipeline's label for *response* to a number.

    Args:
        response: Text to classify with ``sentiment_model``.

    Returns:
        1 for a "POSITIVE" label, 0 for a "NEGATIVE" label, and 0.5 for any
        other label.
    """
    label = sentiment_model(response)[0]['label']
    label_scores = {"POSITIVE": 1, "NEGATIVE": 0}
    # any label outside the table is treated as neutral
    return label_scores.get(label, 0.5)

# function to calculate n-gram repetition penalty
def ngram_repetition_penalty(response, n=2):
    """Penalise repeated word n-grams in *response*.

    Args:
        response: Text to inspect; it is split on whitespace.
        n: Size of the n-grams (defaults to bigrams).

    Returns:
        0.1 for every occurrence of an n-gram beyond its first one.
    """
    tokens = response.split()
    window_count = len(tokens) - n + 1
    counts = Counter(
        tuple(tokens[start:start + n]) for start in range(window_count)
    )
    # total occurrences minus distinct n-grams == number of repeat occurrences
    repeated = sum(counts.values()) - len(counts)
    return repeated * 0.1

# function to select best semantic response
def best_semantic_response(hate_speech, responses):
    """Pick the highest-scoring clean response.

    Responses rejected by ``is_clean`` are dropped. Each remaining one gets
    ``total_score = 0.1 * sentiment + 0.5 * relevance - repetition penalty``.

    Args:
        hate_speech: The statement the responses answer.
        responses: Candidate response strings.

    Returns:
        A ``(best, all_scores)`` tuple. ``best`` is the score dict with the
        largest ``total_score`` (the first one on ties) and ``all_scores`` is
        the list of score dicts for every clean response. When no response is
        clean, ``best`` is a placeholder dict with a ``total_score`` of 0 and
        ``all_scores`` is empty.
    """
    clean_candidates = []
    for candidate in responses:
        if is_clean(candidate):
            clean_candidates.append(candidate)

    if len(clean_candidates) == 0:
        return {"response": "No suitable response found.", "total_score": 0}, []

    scored = []
    for candidate in clean_candidates:
        penalty = ngram_repetition_penalty(candidate)
        sentiment_part = score_sentiment(candidate) * 0.1
        relevance_part = contains_relevant_terms(hate_speech, candidate) * 0.5
        scored.append(dict(
            response=candidate,
            relevance_score=relevance_part,
            sentiment_score=sentiment_part,
            repetition_penalty=penalty,
            total_score=sentiment_part + relevance_part - penalty,
        ))

    winner = max(scored, key=lambda entry: entry['total_score'])
    return winner, scored



In [None]:
# tone instructions
# maps each supported tone name to the instruction text that
# generate_counter_speech places in front of the statement in the T5 prompt
tone_instructions = {
    "Inquisitive": "Ask a thoughtful question about the statement:",
    "Confrontational": "Strongly refute the statement with counter-evidence:",
    "Empathetic": "Acknowledge the concern but provide a hopeful perspective:",
    "Conversational": "Respond in a casual and friendly way:"
}

# function to generate counter-speech using fine-tuned T5 model
def generate_counter_speech(hate_speech, tone):
    """Generate three T5 candidates and return the best-scoring one.

    Args:
        hate_speech: The statement to respond to.
        tone: A key of ``tone_instructions``; an unknown tone raises KeyError.

    Returns:
        The score dict chosen by ``best_semantic_response`` (a single dict,
        not a tuple). It always has ``'response'`` and ``'total_score'`` keys.
    """
    instruction = tone_instructions[tone]
    encoded_prompt = tokenizer(
        f"{instruction} {hate_speech}", return_tensors="pt"
    ).input_ids.to(device)

    # sampling settings shared by all three generation passes
    sampling_options = dict(
        do_sample=True,
        max_length=150,
        num_beams=3,
        temperature=1.4,
        top_p=0.85,
        top_k=30,
        repetition_penalty=1.2,
    )

    candidates = [
        tokenizer.decode(
            model.generate(encoded_prompt, **sampling_options)[0],
            skip_special_tokens=True,
        )
        for _ in range(3)
    ]

    # only the winning entry is returned; the per-candidate scores are dropped
    winner, _all_scores = best_semantic_response(hate_speech, candidates)
    return winner

# function to generate a counter-speech response with LLaMA 2
def one_shot_generate_llama(hate_speech, tone):
    """Generate one LLaMA 2 response to *hate_speech* in the requested tone.

    The tone is passed to the model by name only; the ``tone_instructions``
    texts are not part of this prompt.

    Args:
        hate_speech: The statement to respond to.
        tone: Tone name; it is lower-cased and inserted into the prompt.

    Returns:
        The text generated after the prompt, cut to at most 35
        whitespace-separated words, with "..." appended when it was cut.
    """
    prompt = f"""
    Statement: "{hate_speech}"

    Your response (in a {tone.lower()} tone, max 35 words):
    """

    # The model is loaded with device_map="auto", which already decides where
    # the weights live. Send the inputs to the model's device instead of
    # calling llama_model.to("cuda") on every invocation.
    input_ids = llama_tokenizer.encode(prompt, return_tensors="pt").to(llama_model.device)

    with torch.no_grad():
        output_ids = llama_model.generate(
            input_ids,
            temperature=0.8,  # keeps responses varied but relevant
            top_p=0.85,  # keeps responses coherent
            top_k=30,  # reduces likelihood of repeating the prompt
            repetition_penalty=1.3,  # prevents unnecessary repetition
            do_sample=True
        )

    response = llama_tokenizer.decode(output_ids[0], skip_special_tokens=True).strip()

    # the decoded text starts with the prompt; keep only what follows its last line
    if "max 35 words):" in response:
        response = response.split("max 35 words):")[-1].strip()

    # ensure 35-word limit
    response_words = response.split()
    return " ".join(response_words[:35]) + ("..." if len(response_words) > 35 else "")

In [None]:
# test output for both models
import random

# define output file path
output_file = "/counter_speech_output.txt"

# example hate speech input
hate_speech_example = "Muslims should not live here because it is impossible to assimilate them."

# list of tones to generate responses for
tones = ["Conversational", "Confrontational", "Inquisitive", "Empathetic"]

# write results to file
with open(output_file, "w", encoding="utf-8") as out_f:
    out_f.write("Hate Speech Example Responses\n\n")

    for tone in tones:
        # generate_counter_speech returns a single score dict (not a tuple),
        # so it must not be unpacked into two names
        best_response = generate_counter_speech(hate_speech_example, tone)
        response_llama = one_shot_generate_llama(hate_speech_example, tone)

        out_f.write(f"Tone: {tone}\n")
        out_f.write(f"Fine-tuned Model: {best_response['response']} ({best_response['total_score']:.2f})\n")
        out_f.write(f"LLaMA 2 Response: {response_llama}\n\n")

print(f"Counter-speech results saved to {output_file}")


In [None]:
!pip install bert-score

In [None]:
# used to compute ROUGE-L and BLEU scores

import random
import nltk
from tqdm import tqdm
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge_score import rouge_scorer
import pandas as pd

# tokenizer data for nltk.word_tokenize (used by compute_bleu); recent NLTK
# releases look up the 'punkt_tab' resource instead of 'punkt', so fetch both
nltk.download('punkt')
nltk.download('punkt_tab')

# load dataset
file_path = "data/test_counterspeech.csv"
df = pd.read_csv(file_path)

# define available tones
tones = ["Empathetic", "Inquisitive", "Confrontational", "Conversational"]

# function to compute BLEU score
def compute_bleu(reference, generated):
    """Return the sentence-level BLEU score of *generated* against *reference*.

    Both texts are lower-cased and tokenized with ``nltk.word_tokenize``;
    NLTK's ``method4`` smoothing is applied.

    Args:
        reference: The reference counter-narrative.
        generated: The model output to score.

    Returns:
        The value returned by ``sentence_bleu``.
    """
    return sentence_bleu(
        [nltk.word_tokenize(reference.lower())],  # a single reference
        nltk.word_tokenize(generated.lower()),
        smoothing_function=SmoothingFunction().method4,
    )

# function to compute ROUGE-L score
def compute_rouge_l(reference, generated):
    """Return the ROUGE-L F-measure of *generated* against *reference*.

    Args:
        reference: The reference counter-narrative.
        generated: The model output to score.

    Returns:
        The ``fmeasure`` field of the "rougeL" score, computed with stemming.
    """
    metric = "rougeL"
    rouge = rouge_scorer.RougeScorer([metric], use_stemmer=True)
    return rouge.score(reference, generated)[metric].fmeasure

# Dictionary to store scores for each tone
results = {tone: {"bleu_t5": [], "rouge_t5": [], "bleu_llama": [], "rouge_llama": []} for tone in tones}

print("\n--- Running Evaluation ---")

# randomly select up to 100 samples, once: the fixed random_state makes every
# draw identical, so all tones are scored on the same rows. Capping n at
# len(df) keeps DataFrame.sample from raising on a test set with < 100 rows.
sampled_df = df.sample(n=min(100, len(df)), random_state=42)

for tone in tones:
    print(f"\n🔹 Evaluating tone: {tone}")
    first_output_logged = False

    # i is the row's index label in df, not its position in the sample
    for i, row in tqdm(sampled_df.iterrows(), total=len(sampled_df), desc=f"Processing ({tone})", unit="sample"):
        hate_speech = row["HATE_SPEECH"]
        reference = row["COUNTER_NARRATIVE"]

        generated_t5 = generate_counter_speech(hate_speech, tone=tone)
        generated_llama = one_shot_generate_llama(hate_speech, tone=tone)

        # generate_counter_speech returns a score dict; keep only its text
        if isinstance(generated_t5, dict):
            generated_t5 = generated_t5.get("response", "")

        if not first_output_logged:
            print("\n📝 First Generated Output:")
            print(f"  T5 Output Type: {type(generated_t5)}, Value: {generated_t5}")
            print(f"  LLaMA Output Type: {type(generated_llama)}, Value: {generated_llama}")
            first_output_logged = True

        # spot-check rows whose index label is a multiple of 25
        if i % 25 == 0:
            print(f"\n📝 Entry {i}:")
            print(f"  T5 Output Type: {type(generated_t5)}, Value: {generated_t5}")
            print(f"  LLaMA Output Type: {type(generated_llama)}, Value: {generated_llama}")

        # compute scores
        results[tone]["bleu_t5"].append(compute_bleu(reference, generated_t5))
        results[tone]["rouge_t5"].append(compute_rouge_l(reference, generated_t5))

        results[tone]["bleu_llama"].append(compute_bleu(reference, generated_llama))
        results[tone]["rouge_llama"].append(compute_rouge_l(reference, generated_llama))

# compute and print average scores per tone
# the overall_* lists pool the per-sample scores of every tone so that the
# final averages below are taken over all samples
overall_bleu_t5, overall_rouge_t5, overall_bleu_llama, overall_rouge_llama = [], [], [], []

print("\n--- Model Evaluation Results ---")
for tone in tones:
    # mean BLEU / ROUGE-L of the T5 outputs for this tone
    avg_bleu_t5 = sum(results[tone]["bleu_t5"]) / len(results[tone]["bleu_t5"])
    avg_rouge_t5 = sum(results[tone]["rouge_t5"]) / len(results[tone]["rouge_t5"])

    # mean BLEU / ROUGE-L of the LLaMA outputs for this tone
    avg_bleu_llama = sum(results[tone]["bleu_llama"]) / len(results[tone]["bleu_llama"])
    avg_rouge_llama = sum(results[tone]["rouge_llama"]) / len(results[tone]["rouge_llama"])

    # pool this tone's per-sample scores into the overall lists
    overall_bleu_t5.extend(results[tone]["bleu_t5"])
    overall_rouge_t5.extend(results[tone]["rouge_t5"])
    overall_bleu_llama.extend(results[tone]["bleu_llama"])
    overall_rouge_llama.extend(results[tone]["rouge_llama"])

    print(f"\n Results for Tone: {tone}")
    print(f"  T5 Model - BLEU: {avg_bleu_t5:.4f}, ROUGE-L: {avg_rouge_t5:.4f}")
    print(f"  LLaMA Model - BLEU: {avg_bleu_llama:.4f}, ROUGE-L: {avg_rouge_llama:.4f}")

# averages over the pooled per-sample scores of all tones
final_bleu_t5 = sum(overall_bleu_t5) / len(overall_bleu_t5)
final_rouge_t5 = sum(overall_rouge_t5) / len(overall_rouge_t5)
final_bleu_llama = sum(overall_bleu_llama) / len(overall_bleu_llama)
final_rouge_llama = sum(overall_rouge_llama) / len(overall_rouge_llama)

print("\nOverall Average Scores Across All Tones:")
print(f"  T5 Model - BLEU: {final_bleu_t5:.4f}, ROUGE-L: {final_rouge_t5:.4f}")
print(f"  LLaMA Model - BLEU: {final_bleu_llama:.4f}, ROUGE-L: {final_rouge_llama:.4f}")



In [None]:
import random
import nltk
from tqdm import tqdm
from bert_score import score
import pandas as pd

# fetch the NLTK 'punkt' tokenizer data
nltk.download('punkt')

# load dataset
file_path = "data/test_counterspeech.csv"
df = pd.read_csv(file_path)

# tones to evaluate
tones = ["Empathetic", "Inquisitive", "Confrontational", "Conversational"]

# function to compute BERTScore
# the scorer (and the model behind it) is created on first use and then reused
_BERT_SCORER = None


def compute_bertscore(reference, generated):
    """Return the BERTScore F1 of *generated* against *reference*.

    Uses the "microsoft/deberta-base-mnli" model without baseline rescaling.
    The module-level ``bert_score.score`` helper builds the model on every
    call; a cached ``BERTScorer`` is used instead so the model is loaded once
    for the whole evaluation run.

    Args:
        reference: The reference counter-narrative.
        generated: The model output to score.

    Returns:
        The F1 component of the BERTScore as a Python float.
    """
    global _BERT_SCORER
    if _BERT_SCORER is None:
        from bert_score import BERTScorer

        _BERT_SCORER = BERTScorer(
            model_type="microsoft/deberta-base-mnli",
            rescale_with_baseline=False
        )

    # candidates first, references second; precision and recall are not used
    _, _, F1 = _BERT_SCORER.score([generated], [reference])
    return F1.item()


# dictionary to store BERTScores for each tone
results = {tone: {"bertscore_t5": [], "bertscore_llama": []} for tone in tones}

print("\n--- Running BERTScore Evaluation ---")

# randomly select up to 100 samples, once: the fixed random_state makes every
# draw identical, so all tones are scored on the same rows. Capping n at
# len(df) keeps DataFrame.sample from raising on a test set with < 100 rows.
sampled_df = df.sample(n=min(100, len(df)), random_state=42)

for tone in tones:
    print(f"\n🔹 Evaluating tone: {tone}")
    first_output_logged = False

    # i is the row's index label in df, not its position in the sample
    for i, row in tqdm(sampled_df.iterrows(), total=len(sampled_df), desc=f"Processing ({tone})", unit="sample"):
        hate_speech = row["HATE_SPEECH"]
        reference = row["COUNTER_NARRATIVE"]

        generated_t5 = generate_counter_speech(hate_speech, tone=tone)
        generated_llama = one_shot_generate_llama(hate_speech, tone=tone)

        # generate_counter_speech returns a score dict; keep only its text
        if isinstance(generated_t5, dict):
            generated_t5 = generated_t5.get("response", "")

        if not first_output_logged:
            print("\nFirst Generated Output:")
            print(f"  T5 Output Type: {type(generated_t5)}, Value: {generated_t5}")
            print(f"  LLaMA Output Type: {type(generated_llama)}, Value: {generated_llama}")
            first_output_logged = True

        # spot-check rows whose index label is a multiple of 25
        if i % 25 == 0:
            print(f"\n Entry {i}:")
            print(f"  T5 Output Type: {type(generated_t5)}, Value: {generated_t5}")
            print(f"  LLaMA Output Type: {type(generated_llama)}, Value: {generated_llama}")

        results[tone]["bertscore_t5"].append(compute_bertscore(reference, generated_t5))
        results[tone]["bertscore_llama"].append(compute_bertscore(reference, generated_llama))

# pooled per-sample BERTScores of every tone, used for the overall averages
overall_bertscore_t5, overall_bertscore_llama = [], []

print("\n--- BERTScore Evaluation Results ---")
for tone in tones:
    # mean BERTScore of each model's outputs for this tone
    avg_bertscore_t5 = sum(results[tone]["bertscore_t5"]) / len(results[tone]["bertscore_t5"])
    avg_bertscore_llama = sum(results[tone]["bertscore_llama"]) / len(results[tone]["bertscore_llama"])

    # pool this tone's per-sample scores into the overall lists
    overall_bertscore_t5.extend(results[tone]["bertscore_t5"])
    overall_bertscore_llama.extend(results[tone]["bertscore_llama"])

    print(f"\n Results for Tone: {tone}")
    print(f"  T5 Model - BERTScore: {avg_bertscore_t5:.4f}")
    print(f"  LLaMA Model - BERTScore: {avg_bertscore_llama:.4f}")

# compute overall average across all tones
final_bertscore_t5 = sum(overall_bertscore_t5) / len(overall_bertscore_t5)
final_bertscore_llama = sum(overall_bertscore_llama) / len(overall_bertscore_llama)

print("\n Overall Average BERTScore Across All Tones:")
print(f"  T5 Model - BERTScore: {final_bertscore_t5:.4f}")
print(f"  LLaMA Model - BERTScore: {final_bertscore_llama:.4f}")
