In [None]:
# create a pip3 install command that will install all packages required to run this notebook
!pip3 install -U --force-reinstall --no-cache-dir \
    torch == 2.6.0 \
    transformers \
    datasets \
    vllm \
    pandas \
    tqdm \
    scikit-learn

In [None]:
import datetime
import json
import os
import re
import time
from typing import List

from datasets import load_dataset
from sklearn.metrics import classification_report
from tqdm import tqdm
from tqdm.notebook import tqdm
from transformers import AutoTokenizer
from vllm import LLM, SamplingParams

# Test performance

In [None]:
# Scratch root under which this notebook writes its run artifacts.
TMP_DIR = "/tmp/"

# Dataset split names to evaluate.
# Every row ends with a comma: two adjacent string literals without one are silently
# concatenated by Python into a single (non-existent) split name.
# Each split is listed exactly once so that no split is evaluated twice in a run.
data_names = [
    '10k_seed', '8k_seed', '497k_seed', 'def14a_seed',
    '10k_20000tokens_beginning', '10k_20000tokens_end', '10k_20000tokens_middle',
    '10k_2000tokens_beginning', '10k_2000tokens_end', '10k_2000tokens_middle',
    # NOTE(review): these three lack the '10k_' / 'def14A_' prefix every other
    # windowed split carries -- confirm against the dataset's actual file names.
    '50000tokens_beginning', '50000tokens_end', '50000tokens_middle',
    '10k_5000tokens_beginning', '10k_5000tokens_end', '10k_5000tokens_middle',
    '10k_30000tokens_end', '10k_30000tokens_beginning', '10k_30000tokens_middle',
    '10k_10000tokens_beginning', '10k_10000tokens_end', '10k_10000tokens_middle',
    'def14A_10000tokens_beginning', 'def14A_10000tokens_end', 'def14A_10000tokens_middle',
    'def14A_2000tokens_middle', 'def14A_2000tokens_beginning', 'def14A_2000tokens_end',
    'def14A_5000tokens_middle', 'def14A_5000tokens_beginning', 'def14A_5000tokens_end',
    'def14A_20000tokens_middle', 'def14A_20000tokens_end', 'def14A_20000tokens_beginning',
    'def14A_30000tokens_end', 'def14A_30000tokens_beginning', 'def14A_30000tokens_middle',
]


# Model identifiers of the detectors to benchmark, one full pass over data_names each.
detector_model_list = ['deepseek-ai/DeepSeek-R1-Distill-Qwen-7B', 'deepseek-ai/DeepSeek-R1-Distill-Qwen-14B', 'deepseek-ai/DeepSeek-R1-Distill-Qwen-32B',
                       "Qwen/Qwen2.5-14B-Instruct", "Qwen/Qwen2.5-32B-Instruct", "Qwen/Qwen2.5-7B-Instruct", "meta-llama/Llama-3.3-70B-Instruct",
                       "deepseek-ai/DeepSeek-R1-Distill-Llama-70B"]

In [None]:
# Calendar date (names the results folder) and wall-clock time of this run.
today = datetime.date.today()
time_now = datetime.datetime.now()

# One results folder per calendar day: <TMP_DIR>/results/<date>.
folder_name = f"{today}"
path = os.path.join(TMP_DIR, "results", folder_name)

try:
    os.makedirs(path, exist_ok=True)
    print(f"Directory '{folder_name}' created or already exists.")
except OSError as err:
    # The failure is reported, not re-raised; the cell still completes.
    print(f"Error creating directory: {err}")

# Every detector model identifier joined with underscores into one string.
model_names_str = "_".join(detector_model_list)


In [None]:
"""
Put your prompt for hallucination detection in the function below (extend HALLUCINATION_PROMPT string).

We follow these guidelines when creating the hallucination detection prompt:
	1. Detect any deviation in the answer from the context.
	2. Detect implied or implicit information in the answer that is not present in the context.
	3. Detect timing in the answer that does not align with the timing given in the context.
	4. Detect any important details in the context that are missing from the answer.
	5. Ask the model to provide reasoning (i.e., chain of thought).
	6. Ask for a pass/fail decision, depending on whether the model finds the answer faithful to the context or not.
"""
def create_detection_prompt(query, chunk, answer):
    """Fill the hallucination-detection prompt template for one example.

    Args:
        query: Text inserted under the ``QUESTION:`` heading.
        chunk: Text inserted under the ``CONTEXT:`` heading.
        answer: Text inserted under the ``ANSWER:`` heading.

    Returns:
        str: The rendered prompt. The template's leading newline and the
        eight-space indentation of its lines are part of the returned text.
    """
    return f"""
        **Input Format:**
        --
        QUESTION:
        {query}
        CONTEXT:
        {chunk}
        ANSWER:
        {answer}
        --

        **Example output:**
        {{"REASONING": [...], "SCORE": "PASS" or "FAIL"}}
        """

In [None]:
class VllmModel:
    """
    A class for interacting with the language model using vLLM for fast inference.
    """

    def __init__(self, model_path: str = "Qwen/Qwen2.5-7B-Instruct", tensor_parallel_size: int = 2,
                 max_token_length=32000):
        """Loads the tokenizer and the vLLM engine for the given model.

        Args:
            model_path (str, optional): The path or identifier of the model to load.
                Defaults to "Qwen/Qwen2.5-7B-Instruct".
            tensor_parallel_size (int, optional): Forwarded to ``LLM`` as its
                ``tensor_parallel_size`` argument. Defaults to 2.
            max_token_length (int, optional): Upper bound, in tokens, on each
                chat-formatted prompt; longer prompts are shortened before generation.
                Defaults to 32000.
        """
        self.model_path = model_path
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.sampling_params = SamplingParams(
            temperature=0.7,
            top_p=0.8,
            repetition_penalty=1.05,
            max_tokens=8000
        )
        self.llm = LLM(model=model_path, tensor_parallel_size=tensor_parallel_size)
        self.max_token_length = max_token_length

    def _render_chat(self, content: str) -> str:
        """Wraps ``content`` as a single user turn and renders it with the chat template."""
        messages = [{"role": "user", "content": content}]
        return self.tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )

    def _token_count(self, text: str) -> int:
        """Returns how many tokens the tokenizer produces for ``text``."""
        return len(self.tokenizer.encode(text, truncation=False))

    def _fit_prompt(self, prompt: str) -> str:
        """Returns the chat-formatted prompt, shortened to ``max_token_length`` tokens if needed.

        Only the user content is shortened (from its end) and the chat template is
        re-applied afterwards. Cutting the already-formatted text instead would drop
        the template's closing markers and the generation prompt, and decoding that
        cut with ``skip_special_tokens=True`` would strip the remaining markers too.
        """
        chat_text = self._render_chat(prompt)
        overflow = self._token_count(chat_text) - self.max_token_length
        if overflow <= 0:
            return chat_text

        content_ids = self.tokenizer.encode(prompt, add_special_tokens=False, truncation=False)
        keep = len(content_ids) - overflow
        # Decoding and re-encoding is not guaranteed to round-trip to the same token
        # count, so re-measure after each cut and trim further while still too long.
        while keep > 0:
            shortened = self.tokenizer.decode(content_ids[:keep], skip_special_tokens=True)
            chat_text = self._render_chat(shortened)
            overflow = self._token_count(chat_text) - self.max_token_length
            if overflow <= 0:
                return chat_text
            keep -= overflow

        # The template scaffolding alone exceeds the budget: fall back to a raw cut of
        # the formatted text so that the returned prompt is still bounded.
        encoded_input = self.tokenizer.encode(self._render_chat(prompt), truncation=False)
        return self.tokenizer.decode(encoded_input[: self.max_token_length], skip_special_tokens=True)

    def generate_prompts(self, prompts: List[str]):
        """Chat-formats the prompts, bounds their length, and runs generation on all of them.

        Args:
            prompts (List[str]): A list of prompt strings.

        Returns:
            The value returned by ``self.llm.generate`` for the prepared prompts.
        """
        model_input = [
            self._fit_prompt(prompt)
            for prompt in tqdm(prompts, desc="Generating outputs")
        ]
        return self.llm.generate(model_input, self.sampling_params)

    def generate_responses(self, prompts: List[str], return_answer_only: bool = True):
        """Generates responses for the provided prompts.
        Optionally returns only the answer text from a full output object.

        Args:
            prompts (List[str]): A list of prompt strings.
            return_answer_only (bool, optional): Whether to return only the answer text.
                Defaults to True.

        Returns:
            A list with the text of the first completion of every prompt when
            ``return_answer_only`` is True, otherwise the unmodified result of
            ``generate_prompts``.
        """
        outputs = self.generate_prompts(prompts)
        if return_answer_only:
            outputs = [output.outputs[0].text for output in outputs]
        return outputs


In [None]:
re_map = {
    "pass": "not hallucination",
    "fail": "hallucination"
}

# Label returned whenever no usable verdict can be extracted from a response.
_DEFAULT_LABEL = "not hallucination"

# Finds `"SCORE": "<word>"` anywhere in free text, ignoring case.
_SCORE_PATTERN = re.compile(r'"SCORE"\s*:\s*"(\w+)"', re.IGNORECASE)


def _score_from_json(response):
    """Return the lower-cased, stripped SCORE value if `response` is a JSON object with one, else None."""
    try:
        parsed = json.loads(response)
    except (TypeError, ValueError):
        # Not a string, or not valid JSON (JSONDecodeError is a ValueError).
        return None
    if not isinstance(parsed, dict):
        return None
    for key, value in parsed.items():
        # The key is matched case-insensitively so "SCORE" and "score" both work.
        if key.strip().lower() == "score" and isinstance(value, str):
            return value.strip().lower()
    return None


def get_label(response):
    """
    Parses a string containing JSON-like model response to extract the 'SCORE' and determine if it indicates a hallucination.

    The response is first parsed as JSON. If that yields no recognised score, the
    text is searched for a `"SCORE": "<value>"` pair, and failing that for a bare
    `"FAIL"` token.

    Args:
        response: A string containing JSON-like data.

    Returns:
        "not hallucination" if the SCORE is "PASS", "hallucination" if the SCORE is "FAIL".
        Returns "not hallucination" as well when the SCORE cannot be determined
        (including when `response` is not a string).
    """
    score = _score_from_json(response)
    if score in re_map:
        return re_map[score]

    if not isinstance(response, str):
        return _DEFAULT_LABEL

    match = _SCORE_PATTERN.search(response)
    if match:
        # A matched but unrecognised value maps to the default label.
        return re_map.get(match.group(1).lower(), _DEFAULT_LABEL)

    if '"FAIL"' in response:
        return 'hallucination'
    return _DEFAULT_LABEL

In [None]:
## RUN EXPERIMENT
# Formatted once so that every file written by this run carries the same timestamp.
run_stamp = time_now.strftime("%Y-%m-%d_%H:%M")

for detector_model_name in detector_model_list:
    model = VllmModel(detector_model_name, tensor_parallel_size=4, max_token_length=32000)

    # A "/" in the model identifier would act as a path separator inside a file name.
    model_tag = detector_model_name.replace("/", "__")
    output_col = f'{detector_model_name}_output'
    label_col = f'{detector_model_name}_label'

    for data_name in data_names:
        print(data_name)

        print(f'DATA: Phantom {data_name}')
        print(f'MODELS: {detector_model_list}')

        ## LOAD DATA
        df = load_dataset("seyled/Phantom_Hallucination_Detection", data_files=f"PhantomDataset/Phantom_{data_name}.csv")
        df_final = df["train"].to_pandas()

        output_file_name = f'{path}/outputs_{model_tag}_{data_name}_{run_stamp}.csv'
        result_file_name = f'{path}/results_{model_tag}_{data_name}_{run_stamp}.txt'
        print(f'''Results stored in '{output_file_name}', '{result_file_name}' ''')

        print('\nStart testing ', detector_model_name)
        start_time = time.time()

        # One detection prompt per row, built from the query / context / answer columns.
        prompt_list = [
            create_detection_prompt(query, chunk, answer)
            for query, chunk, answer in zip(df_final['query'], df_final['context'], df_final['answer'])
        ]

        df_final[output_col] = model.generate_responses(prompt_list)
        print('\nFinished testing ', detector_model_name)
        print("Process time: ", round(time.time() - start_time, 2), " seconds.")

        ## SAVE FILE
        # Checkpoint the raw model outputs before parsing them.
        df_final.to_csv(output_file_name)

        ### PARSE OUTPUT
        print('\nStart parsing ', detector_model_name, ' results.')
        df_final[label_col] = df_final[output_col].map(get_label)
        print('\nFinished parsing ', detector_model_name)

        # Rewrite the CSV so the parsed labels are stored next to the raw outputs.
        df_final.to_csv(output_file_name)

        ### OUTPUT
        results = classification_report(df_final['ground_truth_label'], df_final[label_col],
                                        digits=3)
        with open(result_file_name, 'w') as f:
            f.write(f'Phantom {data_name} results:')
            # The trailing newline keeps this header off the report's first line.
            f.write(f'\n\n{detector_model_name} results.\n')
            f.write(results)
        print(f'Phantom {data_name} results:')
        print(f'\n\n{detector_model_name} results.')
        print(results)

    # NOTE(review): dropping the reference may not by itself release the GPU memory
    # held by the vLLM engine before the next model is loaded -- confirm.
    del model
