In [None]:
import json

# Per-example predictions; each entry is a dict (later cells read keys such as
# "label", "task", "gpt4o" and "triplet_rob_label" from it).
# JSON is read as UTF-8 explicitly so the load does not depend on the
# platform's default locale encoding.
with open("results_test.json", "r", encoding="utf-8") as f:
    results = json.load(f)
results[0]

In [None]:
# Span-annotated test set; later cells index it in parallel with `results`.
# Read as UTF-8 explicitly rather than relying on the locale default encoding.
with open("dataset/rag_truth_span.json", "r", encoding="utf-8") as f:
    test_data_span = json.load(f)

### Preparation for analysis

In [None]:
# add token count
from transformers import AutoTokenizer
from tqdm import tqdm

tokenizer = AutoTokenizer.from_pretrained("FacebookAI/roberta-base")

# For every example, store on results[i]:
#   "hal_token": total tokens over all annotated hallucination spans (0 when labels != 1)
#   "all_token": tokens in the full generated text
# add_special_tokens=False: by default the tokenizer adds its special tokens to
# every call, which would add a constant to *each span* and inflate hal_token
# (and therefore the hallucinated-token ratio) in proportion to the number of
# spans. Only content tokens are counted on both sides of the ratio.
for i in range(len(results)):
    d = test_data_span[i]
    hal_tok = 0
    if d["labels"] == 1:
        for h in d["hallucination_id"]:
            span_ids = tokenizer(
                h["text"], add_special_tokens=False, truncation=True, max_length=512
            )["input_ids"]
            hal_tok += len(span_ids)
    results[i]["hal_token"] = hal_tok
    text_ids = tokenizer(
        d["text"], add_special_tokens=False, truncation=True, max_length=512
    )["input_ids"]
    results[i]["all_token"] = len(text_ids)

In [None]:
# Optional: restrict the analysis to one task ("Summary").
# `results` and `test_data_span` are filtered with the same indices so they stay
# aligned. Note that after this cell only Summary rows remain, so the per-task
# breakdowns further down will see no QA / Data2txt examples.
summary_idx = [i for i in range(len(results)) if results[i]["task"] == "Summary"]

results_new = [results[i] for i in summary_idx]
test_data_span_new = [test_data_span[i] for i in summary_idx]

results, test_data_span = results_new, test_data_span_new

len(results)

In [None]:
# id_abc holds the indices of examples where
#   a = prediction of the `before` method, b = prediction of the `after` method,
#   c = gold label (each 0 or 1).
id_000, id_001, id_010, id_011 = [], [], [], []
id_100, id_101, id_110, id_111 = [], [], [], []

before = "gpt4o"
after = "triplet_rob_label"

# (before, after, label) -> target index list; combinations outside 0/1 are ignored.
buckets = {
    (0, 0, 0): id_000,
    (0, 0, 1): id_001,
    (0, 1, 0): id_010,
    (0, 1, 1): id_011,
    (1, 0, 0): id_100,
    (1, 0, 1): id_101,
    (1, 1, 0): id_110,
    (1, 1, 1): id_111,
}
for i, result in enumerate(results):
    target = buckets.get((result[before], result[after], result["label"]))
    if target is not None:
        target.append(i)


print(len(id_000), len(id_001), len(id_010), len(id_011), len(id_100), len(id_101), len(id_110), len(id_111))

### Performance

In [None]:
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score

# Gold labels and triplet-method predictions, in dataset order.
true_labels = []
predicted_labels = []
for item in results:
    true_labels.append(item["label"])
    predicted_labels.append(item["triplet_rob_label"])

# Binary-classification metrics (positive class = 1) for the triplet method.
f1 = f1_score(true_labels, predicted_labels, average="binary")
precision = precision_score(true_labels, predicted_labels)
recall = recall_score(true_labels, predicted_labels)
accuracy = accuracy_score(true_labels, predicted_labels)

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-score: {f1}")

### Effect of Hallucinating Models

In [None]:
# Number of hallucinated cases in each model
# One tally per task type, all keyed by the generating model's name.
hal_model_qa = {
    "gpt-3.5-turbo-0613": 0,
    "gpt-4-0613": 0,
    "llama-2-7b-chat": 0,
    "llama-2-13b-chat": 0,
    "llama-2-70b-chat": 0,
    "mistral-7B-instruct": 0,
}
# Independent copies: same model keys, separate counters.
hal_model_d2t = dict(hal_model_qa)
hal_model_sum = dict(hal_model_qa)

for d in test_data_span:
    # Only examples with at least one annotated hallucination span are counted.
    if d["hallucination_id"] != []:
        if d["task_type"] == "QA":
            hal_model_qa[d["model"]] += 1
        elif d["task_type"] == "Data2txt":
            hal_model_d2t[d["model"]] += 1
        else:
            # Everything that is neither QA nor Data2txt lands in the summary tally.
            hal_model_sum[d["model"]] += 1

for tally in (hal_model_qa, hal_model_d2t, hal_model_sum):
    print(tally)

In [None]:
# Number of hallucinated cases detected by after_method
# One tally per task type, all keyed by the generating model's name.
hal_model_qa_imp = {
    "gpt-3.5-turbo-0613": 0,
    "gpt-4-0613": 0,
    "llama-2-7b-chat": 0,
    "llama-2-13b-chat": 0,
    "llama-2-70b-chat": 0,
    "mistral-7B-instruct": 0,
}
# Independent copies: same model keys, separate counters.
hal_model_d2t_imp = dict(hal_model_qa_imp)
hal_model_sum_imp = dict(hal_model_qa_imp)

# id_011 + id_111: gold label 1 and the after-method predicted 1.
for i in id_011 + id_111:
    d = test_data_span[i]
    if d["hallucination_id"] != []:
        if d["task_type"] == "QA":
            hal_model_qa_imp[d["model"]] += 1
        elif d["task_type"] == "Data2txt":
            hal_model_d2t_imp[d["model"]] += 1
        else:
            # Everything that is neither QA nor Data2txt lands in the summary tally.
            hal_model_sum_imp[d["model"]] += 1

for tally in (hal_model_qa_imp, hal_model_d2t_imp, hal_model_sum_imp):
    print(tally)

In [None]:
import numpy as np
from sklearn.metrics import f1_score

def compute_metrics_model(predictions, labels, models, task=None, task_type=None):
    """Compute one binary F1 score per generating model.

    Args:
        predictions: Predicted labels, one per example.
        labels: Gold labels, index-aligned with ``predictions``.
        models: Name of the model that generated each example, index-aligned
            with ``predictions``.
        task: Optional task name. When truthy, only examples whose
            ``task_type[i]`` equals ``task`` are scored.
        task_type: Per-example task names, index-aligned with ``predictions``.
            Required when ``task`` is given.

    Returns:
        A list of F1 scores (``f1_score(..., average="binary")``) in the fixed
        order of ``models_list`` below. A model with no matching examples gets
        ``nan``, since F1 is undefined for an empty subset.

    Raises:
        ValueError: If ``task`` is given without ``task_type``.
    """
    if task and task_type is None:
        raise ValueError("task_type must be provided when task is set")

    models_list = [
        "gpt-3.5-turbo-0613",
        "gpt-4-0613",
        "llama-2-7b-chat",
        "llama-2-13b-chat",
        "llama-2-70b-chat",
        "mistral-7B-instruct",
    ]
    f1_s = []
    for model in models_list:
        p_sub = []
        l_sub = []
        for i, prediction in enumerate(predictions):
            if models[i] != model:
                continue
            if task and task_type[i] != task:
                continue
            p_sub.append(prediction)
            l_sub.append(labels[i])

        if l_sub:
            f1_s.append(f1_score(l_sub, p_sub, average="binary"))
        else:
            # No example for this model/task combination (e.g. after the data
            # was filtered to a single task): report "undefined" explicitly
            # instead of handing an empty array to f1_score.
            f1_s.append(float("nan"))

    return f1_s

In [None]:
# Parallel per-example lists consumed by compute_metrics_model.
true_labels = [row["label"] for row in results]
before_labels = [row["gpt4o"] for row in results]
after_labels = [row["triplet_rob_label"] for row in results]
models = [sample["model"] for sample in test_data_span]
tasks = [row["task"] for row in results]

# Overall per-model F1: first the "before" method (gpt4o), then the "after" method (triplet).
for preds in (before_labels, after_labels):
    print(compute_metrics_model(preds, true_labels, models))

# Same two rows again, restricted to each task in turn.
for task_name in ("QA", "Data2txt", "Summary"):
    print(task_name)
    for preds in (before_labels, after_labels):
        print(compute_metrics_model(preds, true_labels, models, task_name, tasks))

### Number of Hallucinations

In [None]:
# hallucinated token ratio analysis
import numpy as np
import matplotlib.pyplot as plt

# Index groups over hallucinated (label == 1) cases, split by detector outcome.
after_suc_hal = id_111 + id_011  # hallucinated cases that the after-method detects
after_fai_hal = id_001 + id_101  # hallucinated cases that the after-method misses
before_suc_hal = id_111 + id_101  # hallucinated cases that the before-method detects
before_fai_hal = id_001 + id_011  # hallucinated cases that the before-method misses

# "hal_token" / "all_token" are written to `results[i]` by the token-count cell,
# not to `test_data_span[i]`, so the ratio has to be read from `results`.
after_suc_len = [results[i]["hal_token"] / results[i]["all_token"] for i in after_suc_hal]
after_fai_len = [results[i]["hal_token"] / results[i]["all_token"] for i in after_fai_hal]
before_suc_len = [results[i]["hal_token"] / results[i]["all_token"] for i in before_suc_hal]
before_fai_len = [results[i]["hal_token"] / results[i]["all_token"] for i in before_fai_hal]

# x-axis : hallucination token ratio (five equal-width bins over [0, 1])
bins = np.linspace(0, 1, 6)
bin_centers = (bins[:-1] + bins[1:]) / 2

after_suc_hist, _ = np.histogram(after_suc_len, bins=bins)
after_fai_hist, _ = np.histogram(after_fai_len, bins=bins)
before_suc_hist, _ = np.histogram(before_suc_len, bins=bins)
before_fai_hist, _ = np.histogram(before_fai_len, bins=bins)

# calculate success ratio
# `total` is the number of hallucinated cases per bin. The before-method's
# success + failure groups cover the same set of cases, so the same
# denominator is valid for both methods.
total = after_suc_hist + after_fai_hist
with np.errstate(divide="ignore", invalid="ignore"):
    # Bins without any case keep a ratio of 0 instead of dividing by zero.
    after_ratio = np.divide(after_suc_hist, total, out=np.zeros_like(after_suc_hist, dtype=float), where=total != 0)
    before_ratio = np.divide(before_suc_hist, total, out=np.zeros_like(before_suc_hist, dtype=float), where=total != 0)


# Plotting
fig, ax1 = plt.subplots(figsize=(8, 6))

# left axis: success ratio
ax1.plot(bin_centers, after_ratio, label="triplet (success rate)")
ax1.plot(bin_centers, before_ratio, label="gpt4o (success rate)")
ax1.set_xlabel("hallucination token ratio")
ax1.set_ylabel("success ratio", color="black")
ax1.tick_params(axis="y", labelcolor="black")
ax1.grid(True)

# right axis: case count
ax2 = ax1.twinx()
bar_width = (bins[1] - bins[0]) * 0.7
ax2.bar(bin_centers, total, width=bar_width, alpha=0.3, label="case count")
ax2.set_ylabel("case count", color="black")
ax2.tick_params(axis="y", labelcolor="black")

fig.legend(loc="upper center", bbox_to_anchor=(0.5, 1.05), ncol=2)
plt.tight_layout()
plt.show()

In [None]:
# Number of hallucinations in each case
import numpy as np
import matplotlib.pyplot as plt

# Index groups over hallucinated (label == 1) cases, split by detector outcome.
after_suc_hal = id_111 + id_011   # after-method: detected
after_fai_hal = id_001 + id_101   # after-method: missed
before_suc_hal = id_111 + id_101  # before-method: detected
before_fai_hal = id_001 + id_011  # before-method: missed

# Number of annotated hallucination spans for every case in each group.
after_suc_len = [len(test_data_span[k]["hallucination_id"]) for k in after_suc_hal]
after_fai_len = [len(test_data_span[k]["hallucination_id"]) for k in after_fai_hal]
before_suc_len = [len(test_data_span[k]["hallucination_id"]) for k in before_suc_hal]
before_fai_len = [len(test_data_span[k]["hallucination_id"]) for k in before_fai_hal]

# x-axis : number of hallucinations
# Nine edges from 0.5 to 8.5 give eight unit-wide bins centred on 1..8;
# np.histogram leaves values outside that range out of the counts.
bins = np.linspace(0.5, 8.5, 9)
bin_centers = (bins[:-1] + bins[1:]) / 2

after_suc_hist = np.histogram(after_suc_len, bins=bins)[0]
after_fai_hist = np.histogram(after_fai_len, bins=bins)[0]
before_suc_hist = np.histogram(before_suc_len, bins=bins)[0]
before_fai_hist = np.histogram(before_fai_len, bins=bins)[0]

# calculate success ratio
# Hallucinated cases per bin; used as the denominator for both methods.
total = after_suc_hist + after_fai_hist
nonempty = total != 0
with np.errstate(divide="ignore", invalid="ignore"):
    # Empty bins keep the 0.0 from the pre-filled output array.
    after_ratio = np.divide(
        after_suc_hist, total, out=np.zeros_like(after_suc_hist, dtype=float), where=nonempty
    )
    before_ratio = np.divide(
        before_suc_hist, total, out=np.zeros_like(before_suc_hist, dtype=float), where=nonempty
    )


fig, ax1 = plt.subplots(figsize=(8, 6))

# left axis: success ratio
ax1.plot(bin_centers, after_ratio, label="triplet (success rate)")
ax1.plot(bin_centers, before_ratio, label="gpt4o (success rate)")
ax1.set_xlabel("hallucination num")
ax1.set_ylabel("success ratio", color="black")
ax1.tick_params(axis="y", labelcolor="black")
ax1.grid(True)

# right axis: case count
ax2 = ax1.twinx()
bar_width = (bins[1] - bins[0]) * 0.8
ax2.bar(bin_centers, total, width=bar_width, alpha=0.3, label="case count")
ax2.set_ylabel("case count", color="black")
ax2.tick_params(axis="y", labelcolor="black")

fig.legend(loc="upper center", bbox_to_anchor=(0.5, 1.1), ncol=2)
plt.tight_layout()
plt.show()

### Embedding Space

In [None]:
import torch.nn.functional as F
import torch
import torch.nn as nn


def triplet_loss(anchor_output, positive_output, negative_output, text_logits, labels):
    """Return the classification loss and the cosine-distance triplet loss.

    Args:
        anchor_output: Anchor embeddings.
        positive_output: Positive embeddings (should end up close to the anchor).
        negative_output: Negative embeddings (should end up far from the anchor).
        text_logits: Classification logits passed to ``nn.CrossEntropyLoss``.
        labels: Class targets for ``text_logits``; a sequence of ints or a tensor.

    Returns:
        Tuple ``(classification_loss, triplet_loss)``: the cross-entropy of
        ``text_logits`` against ``labels``, and a
        ``TripletMarginWithDistanceLoss`` with margin 1.0 over the distance
        ``1 - cosine_similarity``.
    """
    # Build the targets on the logits' own device instead of a notebook-global
    # `device` (which is only defined in a later cell). `as_tensor` accepts
    # lists and tensors alike without the copy-construct warning that
    # `torch.tensor(tensor)` emits.
    text_targets = torch.as_tensor(labels, device=text_logits.device)
    classification_loss = nn.CrossEntropyLoss()(text_logits, text_targets)

    triplet_loss_fn = nn.TripletMarginWithDistanceLoss(
        margin=1.0, distance_function=lambda x, y: 1.0 - F.cosine_similarity(x, y)
    )
    triplet_term = triplet_loss_fn(anchor_output, positive_output, negative_output)
    return classification_loss, triplet_term

In [None]:
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel
import numpy as np
import torch
from train_inference_code.models.models_rob import TripletModel

# Option A: plain pretrained RoBERTa, to calculate similarity before training.
# NOTE(review): the `tokenizer` / `model` bound here are overwritten by Option B
# right below, so as written this load has no effect on the rest of the notebook.
# NOTE(review): the repo id is spelled "FacebookAI/roberta-base" in the
# token-count cell; consider using one spelling throughout.
tokenizer = AutoTokenizer.from_pretrained("FacebookAI/RoBERTa-base")
model = AutoModel.from_pretrained("FacebookAI/RoBERTa-base")

# Option B: the triplet method. Wraps a RoBERTa base model in TripletModel and
# loads it from the `name` checkpoint directory, passing `triplet_loss` as its loss.
name = "trained_model/triplet_rob"
tokenizer = AutoTokenizer.from_pretrained("FacebookAI/RoBERTa-base")
base_model = AutoModel.from_pretrained("FacebookAI/RoBERTa-base")
model = TripletModel.from_pretrained(base_model, triplet_loss, name)

# Use the GPU when one is available, otherwise the CPU; `device` is also read
# by calc_sim to place its inputs.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)


def calc_sim(ref, text, label):
    """Cosine similarity between the model's embeddings of ``ref`` and ``text``.

    The model is fed a triplet of inputs. A fixed placeholder string
    ("sample") fills the slot that has no real text for this example:
      * ``label[0] == 1`` (hallucinated): [ref, placeholder, text]
      * ``label[0] == 0`` (faithful):     [ref, text, placeholder]

    Args:
        ref: Reference text.
        text: Generated text to compare against the reference.
        label: Sequence whose first element is 0 or 1; forwarded to the model
            as ``labels``.

    Returns:
        The scalar ``cosine_similarity(output.output[0], output.output[1])[0][0]``.

    Raises:
        ValueError: If ``label[0]`` is neither 0 nor 1.
    """
    # Validate before doing any work: previously an unexpected label left
    # `inputs` unbound and failed later with an UnboundLocalError.
    if label[0] not in (0, 1):
        raise ValueError(f"label[0] must be 0 or 1, got {label[0]!r}")

    ref_tok = tokenizer(ref, padding="longest", truncation=True, return_tensors="pt")
    text_tok = tokenizer(text, padding="longest", truncation=True, return_tensors="pt")
    sample_tok = tokenizer("sample", padding="longest", truncation=True, return_tensors="pt")

    if label[0] == 1:
        # ref, sample, hallucinated text
        ordered = (ref_tok, sample_tok, text_tok)
    else:
        # ref, faithful text, sample text
        ordered = (ref_tok, text_tok, sample_tok)

    inputs = {
        "input_ids": [tok["input_ids"].to(device) for tok in ordered],
        "attention_mask": [tok["attention_mask"].to(device) for tok in ordered],
        "labels": label,
    }

    with torch.no_grad():
        output = model(**inputs)
    ref_output = output.output[0].cpu().numpy()
    # NOTE(review): for label 1 the placeholder sits at position 1 of the input
    # list. Confirm against TripletModel that output.output[1] is the embedding
    # of `text` in both branches and not of the placeholder.
    text_output = output.output[1].cpu().numpy()

    return cosine_similarity(ref_output, text_output)[0][0]

In [None]:
# Quick check on the first example before running the full loop.
first_example = test_data_span[0]
calc_sim(first_example["ref"], first_example["text"], [first_example["labels"]])

In [None]:
from tqdm import tqdm

# Per-task collections of reference/text cosine similarities.
qa_cos, d2t_cos, sum_cos = [], [], []

for d in tqdm(test_data_span):
    sim = calc_sim(d["ref"], d["text"], [d["labels"]])

    # Route the score to its task list; anything that is neither QA nor
    # Data2txt is treated as summarisation.
    kind = d["task_type"]
    if kind == "QA":
        qa_cos.append(sim)
    elif kind == "Data2txt":
        d2t_cos.append(sim)
    else:
        sum_cos.append(sim)

    # Keep the score on the example itself. The key is "sim_triplet" because
    # `model` is the triplet model; store under "sim_before" instead when this
    # loop is run with the untrained baseline.
    d["sim_triplet"] = sim

print("QA", np.mean(qa_cos))
print("Data2txt", np.mean(d2t_cos))
print("Sum", np.mean(sum_cos))

In [None]:
import matplotlib.pyplot as plt
import numpy as np

before_posi = []
before_nega = []

font_size=15

id_all = id_000 + id_001 + id_010 + id_011 + id_100 + id_101 + id_110 + id_111
id_tri_fail = id_001 + id_010 + id_101 + id_110
id_tri_suc = id_000 + id_011 + id_100 + id_111

# Which stored similarity to plot. "sim_triplet" is the key the similarity loop
# writes; switch to "sim_before" only after that loop has been run with the
# baseline model and its result stored under that key.
mode = "sim_triplet"

# Fail with an actionable message instead of a bare KeyError deep in the loop.
missing = [i for i in id_all if mode not in test_data_span[i]]
if missing:
    raise KeyError(
        f"{len(missing)} example(s) have no '{mode}' value; "
        "run the similarity cell for this mode first"
    )

# Split cosine distances by gold label: faithful (0) vs hallucinated (1).
for i in id_all :
    d = test_data_span[i]
    if d["labels"] == 1:
        before_nega.append(1 - d[mode]) # 1 - cosine similarity to convert to distance
    else:
        before_posi.append(1 - d[mode])


mean_posi = np.mean(before_posi)
mean_nega = np.mean(before_nega)


plt.hist(before_posi, bins=100, alpha=0.5, label="Faithful")
plt.hist(before_nega, bins=100, alpha=0.5, label="Hallucinated")


# Dashed vertical lines mark the mean distance of each group.
plt.axvline(mean_posi, color='blue', linestyle='dashed', linewidth=1.8, label=f"Mean Faithful ({mean_posi:.2f})")
plt.axvline(mean_nega, color='orange', linestyle='dashed', linewidth=1.8, label=f"Mean Hallucinated ({mean_nega:.2f})")


plt.xlabel("Cosine distance", fontsize=font_size)
plt.ylabel("Count", fontsize=font_size)
plt.xticks(fontsize=font_size)
plt.yticks(fontsize=font_size)
plt.legend(fontsize=font_size)

plt.tight_layout()
plt.show()