In [None]:
import os
os.environ["WANDB_DISABLED"] = "true"
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, ConfusionMatrixDisplay
from sklearn.feature_extraction.text import CountVectorizer
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, DataCollatorWithPadding
from datasets import Dataset
import torch

# Checkpoint directory of the fine-tuned classifier loaded for evaluation.
MODEL_PATH = "./my_finetuned_models/roberta_author_attribution_all_domain_optimized/checkpoint-9375"

# Display name -> integer class id; ids follow the order the names are listed in.
LABEL_MAP = dict(zip(
    ("Llama-3.1", "Qwen-2.5", "Mistral-v0.3", "Granite-3.3", "GLM-4"),
    range(5),
))
# Inverse lookup: integer class id -> display name.
ID2LABEL = dict(zip(LABEL_MAP.values(), LABEL_MAP.keys()))

# Load the combined test split.
df_test = pd.read_csv("../all_domain_data/test_combined.csv")

# Standardize column names in a single pass. Keys that are not present in the
# frame are ignored by rename, so each mapping only applies when its source
# column exists: the original 'text' column is kept as 'source_text', the
# 'summary' column takes over the name 'text', and 'dataset' becomes 'domain'.
df_test = df_test.rename(columns={
    'text': 'source_text',
    'summary': 'text',
    'dataset': 'domain',
})

# Keep only rows that actually have a 'text' value, then force it to str.
has_text = df_test['text'].notna()
df_test = df_test[has_text].copy()
df_test['text'] = df_test['text'].astype(str)
# Map Labels
def encode_label(model_name):
    """Translate a raw model identifier into its integer class id.

    The input is stringified and lower-cased, then scanned for the family
    keywords below in order; the position of the first keyword found is the
    class id. Returns -1 when no keyword occurs in the name.
    """
    lowered = str(model_name).lower()
    # Order matters: the tuple index is the class id and the first hit wins.
    family_keywords = ("llama", "qwen", "mistral", "granite", "glm")
    for class_id, keyword in enumerate(family_keywords):
        if keyword in lowered:
            return class_id
    return -1

# Attach integer class ids and discard rows whose model name matched no known
# family (encode_label returns -1 for those).
df_test['label'] = df_test['model'].apply(encode_label)
recognized = df_test['label'] != -1
df_test = df_test[recognized].copy()

# Tokenizer comes from the "roberta-large" identifier; the classifier weights
# are read from the local checkpoint directory only (local_files_only=True).
tokenizer = AutoTokenizer.from_pretrained("roberta-large")
model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH, local_files_only=True)

def preprocess(examples):
    """Tokenize the 'text' field of a batch, truncating to at most 512 tokens."""
    return tokenizer(examples["text"], truncation=True, max_length=512)

hf_test = Dataset.from_pandas(df_test)
tokenized_test = hf_test.map(preprocess, batched=True)

# Strip every column except the three kept below before handing the dataset to
# the Trainer.
kept_columns = ['input_ids', 'attention_mask', 'label']
dropped_columns = [name for name in tokenized_test.column_names if name not in kept_columns]
tokenized_test = tokenized_test.remove_columns(dropped_columns)
tokenized_test.set_format("torch")

# Padding is left to the collator (no padding argument is passed to the
# tokenizer above).
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
trainer = Trainer(model=model, data_collator=data_collator)
preds = trainer.predict(tokenized_test)

# Predicted class = index of the largest value along axis 1 of the predictions.
y_true = preds.label_ids
y_pred = np.argmax(preds.predictions, axis=1)

In [None]:
# Confusion Matrix
# Row-normalized (normalize='true'): each row is divided by the number of
# samples of that true class.
acc = accuracy_score(y_true, y_pred)
class_ids = list(range(len(ID2LABEL)))
tick_labels = [ID2LABEL[i] for i in class_ids]
# Pin the label set explicitly. Without it, a class that appears in neither
# y_true nor y_pred is dropped from the matrix, which then no longer lines up
# with the five tick labels.
cm_norm = confusion_matrix(y_true, y_pred, labels=class_ids, normalize='true')
fig, ax = plt.subplots(figsize=(10, 8))

disp = ConfusionMatrixDisplay(
    confusion_matrix=cm_norm,
    display_labels=tick_labels
)
disp.plot(
    cmap='Blues',
    ax=ax,
    values_format='.2f',
    colorbar=True
)

# Restyle the per-cell annotations: black, bold, 12 pt.
for cell_text in disp.text_.ravel():
    cell_text.set_color("black")
    cell_text.set_fontsize(12)
    cell_text.set_weight("bold")

plt.title(f'(Acc: {acc:.2%})')
plt.tight_layout()
# savefig does not create missing directories, and no earlier cell creates
# this one, so make sure it exists first.
os.makedirs('./all_domain_analysis_optimized', exist_ok=True)
plt.savefig('./all_domain_analysis_optimized/Figure1_Final_Robust.png')

In [None]:
# Length analysis: attribution accuracy as a function of text length in tokens.

# Always refresh these columns from the current predictions. Guarding them
# with "only if missing" would leave stale values in place when the
# prediction cell is re-run.
df_test['pred_label'] = y_pred
df_test['is_correct'] = df_test['label'] == df_test['pred_label']

# Token count per text. No truncation is requested here, so the count is the
# length of whatever tokenizer.encode returns for the full text.
df_test['token_count'] = df_test['text'].apply(lambda x: len(tokenizer.encode(x)))

# Create Bins (Groups). The last edge is open-ended so that the '500+' bucket
# really holds every text longer than 500 tokens; a finite upper edge would
# turn longer texts into NaN and silently drop them from the per-bin accuracy.
bins = [0, 100, 200, 300, 400, 500, np.inf]
bin_labels = ['0-100', '100-200', '200-300', '300-400', '400-500', '500+']
df_test['len_bin'] = pd.cut(df_test['token_count'], bins=bins, labels=bin_labels)

# Calculate Accuracy per Bin (observed=True: bins with no samples are omitted).
bin_acc = df_test.groupby('len_bin', observed=True)['is_correct'].mean()

fig, ax = plt.subplots(figsize=(10, 6))
bin_acc.plot(kind='bar', color='skyblue', edgecolor='black', ax=ax)
ax.set_title('All Domain Attribution Accuracy vs. Summary Length')
ax.set_xlabel('Summary Length (Tokens)')
ax.set_ylabel('Accuracy')
ax.set_ylim(0, 1.0)
ax.grid(axis='y', linestyle='--', alpha=0.7)

# Add numbers on top of bars (offset 9 points above each bar's top edge).
for p in ax.patches:
    ax.annotate(f'{p.get_height():.2f}',
                (p.get_x() + p.get_width() / 2., p.get_height()),
                ha='center', va='center',
                xytext=(0, 9),
                textcoords='offset points')

fig.tight_layout()
# savefig does not create missing directories.
os.makedirs('./all_domain_analysis_optimized', exist_ok=True)
fig.savefig('./all_domain_analysis_optimized/Figure2_LengthAnalysis.png')

In [None]:

# Add Labels
df_test['true_label_id'] = y_true
df_test['pred_label_id'] = y_pred
df_test['prediction_name'] = df_test['pred_label_id'].map(ID2LABEL)

df_test['is_correct'] = df_test['true_label_id'] == df_test['pred_label_id']
# Softmax over the last axis of the raw predictions; the confidence score is
# the largest resulting probability in each row.
probs = torch.nn.functional.softmax(torch.tensor(preds.predictions), dim=-1).numpy()
df_test['confidence_score'] = np.max(probs, axis=1)

# to_csv does not create missing directories, so make sure the output folder
# exists before writing into it.
os.makedirs("./all_domain_analysis_optimized", exist_ok=True)

output_filename = "./all_domain_analysis_optimized/test_results_with_predictions.csv"
df_test.to_csv(output_filename, index=False)

# Isolate & Save Errors, most confident mistakes first.
df_errors = df_test[~df_test['is_correct']].copy()
df_errors = df_errors.sort_values('confidence_score', ascending=False)

error_filename = "./all_domain_analysis_optimized/wrong_predictions_only.csv"
df_errors.to_csv(error_filename, index=False)

# Quick Sample
if len(df_errors) > 0:
    # Row 0 after the descending sort is the error the model was most sure
    # about, not the most frequent kind of error, so the header says so.
    top_error = df_errors.iloc[0]
    print("\n--- Highest-Confidence Mistake ---")
    print(f"True: {top_error['model']} | Pred: {top_error['prediction_name']}")
    print(f"Confidence: {top_error['confidence_score']:.4f}")
    print(f"Text: {str(top_error['text'])[:150]}...")

In [None]:

# Reload the saved result tables, drop rows without text, and lower-case the
# text column.
df_full = pd.read_csv("./all_domain_analysis_optimized/test_results_with_predictions.csv")
df_full = df_full.dropna(subset=['text'])
df_full['text'] = df_full['text'].astype(str).str.lower()

df_errors = pd.read_csv("./all_domain_analysis_optimized/wrong_predictions_only.csv")
df_errors = df_errors.dropna(subset=['text'])
df_errors['text'] = df_errors['text'].astype(str).str.lower()

# Class display names; the list position is used as the class id below.
LABEL_ORDER = ["Llama-3.1", "Qwen-2.5", "Mistral-v0.3", "Granite-3.3", "GLM-4"]
TICK_LABELS = LABEL_ORDER

# Error Heat map: raw counts of (true, predicted) pairs among the error rows.
class_ids = range(len(TICK_LABELS))
cm_errors = confusion_matrix(df_errors['label'], df_errors['pred_label_id'], labels=class_ids)

fig, ax = plt.subplots(figsize=(10, 8))
disp = ConfusionMatrixDisplay(confusion_matrix=cm_errors, display_labels=TICK_LABELS)
disp.plot(cmap='Reds', ax=ax, values_format='d', colorbar=True)

# Styling: black 12 pt annotations, bold for every non-zero count.
for cell_text in disp.text_.ravel():
    cell_text.set_color("black")
    cell_text.set_fontsize(12)
    if cell_text.get_text() != '0':
        cell_text.set_weight("bold")

plt.title('All Domain Heatmap of Misclassifications (Counts)')
plt.ylabel('True Model (Actual)')
plt.xlabel('Predicted Model (The Mistake)')
plt.tight_layout()
plt.savefig('./all_domain_analysis_optimized/Error_Heatmap.png')


In [None]:
# Uncomment if data is not in memory (paths match where the results are saved)
# df_full = pd.read_csv("./all_domain_analysis_optimized/test_results_with_predictions.csv").dropna(subset=['text'])
# df_errors = pd.read_csv("./all_domain_analysis_optimized/wrong_predictions_only.csv").dropna(subset=['text'])
# df_full['text'] = df_full['text'].astype(str).str.lower()
# df_errors['text'] = df_errors['text'].astype(str).str.lower()

def _print_top_terms(texts, header=None, **vectorizer_kwargs):
    """Print the most frequent uni/bi-grams of `texts`, most frequent first.

    Args:
        texts: iterable of strings to count terms in.
        header: optional line printed before the ranking, only once the
            vectorizer has been fitted successfully.
        **vectorizer_kwargs: extra CountVectorizer options (e.g. max_df).

    Raises:
        ValueError: propagated from CountVectorizer.fit_transform when it
            cannot build a vocabulary from `texts`.
    """
    vectorizer = CountVectorizer(stop_words='english', max_features=10,
                                 ngram_range=(1, 2), **vectorizer_kwargs)
    X = vectorizer.fit_transform(texts)
    word_counts = np.asarray(X.sum(axis=0)).flatten()
    feature_names = vectorizer.get_feature_names_out()

    if header is not None:
        print(header)
    # argsort is ascending, so reverse it to rank by descending count.
    for rank, idx in enumerate(word_counts.argsort()[::-1], start=1):
        print(f"  {rank}. {feature_names[idx]} ({word_counts[idx]})")


# Deceptive Words
print("\n" + "="*50)
print("Top 10 'Deceptive Words' per model")
print("(When this model is misclassified, these words appear most)")
print("="*50)

for model_name in df_errors['model'].unique():
    subset = df_errors[df_errors['model'] == model_name]

    # A NaN entry from unique() matches no rows under ==, giving an empty subset.
    if len(subset) < 1: continue

    print(f"\nModel: {model_name}")
    print(f"Total Errors: {len(subset)}")

    if 'prediction_name' in subset.columns:
        # mode() is empty when every prediction_name is missing; skip the
        # line rather than fail on [0].
        impostors = subset['prediction_name'].mode()
        if not impostors.empty:
            print(f"Most often confused with: {impostors.iloc[0]}")

    try:
        _print_top_terms(subset['text'], header="Top 10 Trigger Words:")
    except ValueError:
        print("Not enough text data to analyze keywords")

    print("-" * 40)


# Word Frequency count
print("\n" + "="*50)
print("Top 10 Most Frequent Words Per Model (all samples)")
print("="*50)

for model_name in df_full['model'].unique():
    subset = df_full[df_full['model'] == model_name]

    print(f"\nModel: {model_name}")
    try:
        # max_df=0.9 excludes terms that occur in more than 90% of this
        # model's documents.
        _print_top_terms(subset['text'], max_df=0.9)
    except ValueError as err:
        # Report the skipped model instead of silently omitting it.
        print(f"  Not enough text data to analyze keywords ({err})")

### Analysis By Domain

In [None]:

output_folder = "domain_analysis_plots"
os.makedirs(output_folder, exist_ok=True)

# Folder the earlier cells write the prediction CSVs into; read them back from
# the same place rather than from the current working directory.
results_folder = "./all_domain_analysis_optimized"

try:
    df_full = pd.read_csv(f"{results_folder}/test_results_with_predictions.csv").dropna(subset=['text'])
    df_errors = pd.read_csv(f"{results_folder}/wrong_predictions_only.csv").dropna(subset=['text'])
except FileNotFoundError as err:
    # Raise instead of calling exit(): exit() would shut the notebook kernel
    # down and throw away all in-memory state.
    raise FileNotFoundError(
        f"Prediction CSVs not found in {results_folder!r}; "
        "run the cell that saves the test results first."
    ) from err

# Normalize text
df_full['text'] = df_full['text'].astype(str).str.lower()
df_errors['text'] = df_errors['text'].astype(str).str.lower()

LABEL_ORDER = ["Llama-3.1", "Qwen-2.5", "Mistral-v0.3", "Granite-3.3", "GLM-4"]
TICK_LABELS = LABEL_ORDER


def _save_count_matrix(cm, title, cmap, out_path):
    """Render an integer confusion matrix with TICK_LABELS and save it to out_path."""
    fig, ax = plt.subplots(figsize=(10, 8))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=TICK_LABELS)
    disp.plot(cmap=cmap, ax=ax, values_format='d', colorbar=True)

    # Black annotations, bold for every non-zero count.
    for cell_text in disp.text_.ravel():
        cell_text.set_color("black")
        if cell_text.get_text() != '0':
            cell_text.set_weight("bold")

    ax.set_title(title)
    fig.tight_layout()
    fig.savefig(out_path)
    # Close explicitly: this runs once or twice per domain, and unclosed
    # figures pile up in memory.
    plt.close(fig)


def _print_domain_terms(frame, model_names, **vectorizer_kwargs):
    """Print one line per model listing its top uni/bi-grams in `frame`, most frequent first."""
    for model_name in model_names:
        subset = frame[frame['model'] == model_name]
        if len(subset) == 0:
            continue
        try:
            vectorizer = CountVectorizer(stop_words='english', max_features=10,
                                         ngram_range=(1, 2), **vectorizer_kwargs)
            X = vectorizer.fit_transform(subset['text'])
        except ValueError as err:
            # Report the model that could not be analyzed instead of hiding it.
            print(f"    {model_name}: skipped ({err})")
            continue

        # Calculate counts
        word_counts = np.asarray(X.sum(axis=0)).flatten()
        feature_names = vectorizer.get_feature_names_out()
        word_freq = sorted(zip(feature_names, word_counts), key=lambda pair: pair[1], reverse=True)

        formatted_list = [f"{word} ({count})" for word, count in word_freq]
        print(f"    {model_name}: {', '.join(formatted_list)}")


# Model names are the same for every domain, so compute them once.
actual_model_names = df_full['model'].unique()

# Loop data analysis for each specific domain (rows with a missing domain are
# skipped: NaN never compares equal, so they could not be selected anyway).
unique_domains = df_full['domain'].dropna().unique()

for domain in unique_domains:
    # Domain values read from CSV are not guaranteed to be strings.
    domain_name = str(domain)

    print(f"\n{'='*40}")
    print(f"Processing Domain: {domain_name.upper()}")
    print(f"{'='*40}")

    # Filter Data
    domain_full = df_full[df_full['domain'] == domain]
    domain_errors = df_errors[df_errors['domain'] == domain]

    if len(domain_full) == 0: continue

    # Accuracy Matrix: raw counts over every sample of this domain.
    print(f"  Generating Accuracy Matrix for {domain_name}")
    cm_acc = confusion_matrix(
        domain_full['label'],
        domain_full['pred_label_id'],
        labels=range(len(TICK_LABELS))
    )
    filename_acc = f"{output_folder}/Matrix_Accuracy_{domain_name}.png"
    _save_count_matrix(cm_acc, f'Accuracy Matrix: {domain_name.capitalize()} Domain', 'Blues', filename_acc)

    # Error Heat map
    print(f"  Generating Error Heatmap for {domain_name}")

    if len(domain_errors) > 0:
        cm_err = confusion_matrix(
            domain_errors['label'],
            domain_errors['pred_label_id'],
            labels=range(len(TICK_LABELS))
        )
        # Force the diagonal to zero so only off-diagonal (mistake) cells carry counts.
        np.fill_diagonal(cm_err, 0)

        filename_err = f"{output_folder}/Matrix_Error_{domain_name}.png"
        _save_count_matrix(
            cm_err,
            f'Error Heatmap: {domain_name.capitalize()} Domain (Mistakes Only)',
            'Reds',
            filename_err,
        )
        print(f"  -> Saved {filename_err}")
    else:
        print(" No errors")

    # word frequency table
    print(f"\n Word Analysis for {domain_name} ")

    # Deceptive Words (From Errors)
    print(" Top Trigger Words:")
    _print_domain_terms(domain_errors, actual_model_names)

    # Full Data (max_df=0.9 excludes terms present in more than 90% of a
    # model's documents)
    print("Baseline Words (Natural vocabulary):")
    _print_domain_terms(domain_full, actual_model_names, max_df=0.9)