In [None]:
import datetime
import json
import os
import random
import string
import time

import nltk
import torch
from datasets import load_dataset
from sentence_transformers import SentenceTransformer, util
from transformers import AutoTokenizer, AutoModelForCausalLM, TextStreamer, BitsAndBytesConfig

# --------------------------
# Global vars
# --------------------------
# Conversation turns ("User: ..." / "Assistant: ...") shared by ask_phi2 and
# send_to_llm; send_to_llm rebinds it to a fresh list for every question.
history = []
# The dataset is loaded at module level; only its "train" split is handed to
# the experiment.
ds = load_dataset("sentence-transformers/natural-questions")
dataset_split = ds["train"]
# Name of the JSON results file (same value as save_results' default).
filename = "TestResults.json"

# --------------------------
# Setup model + tokenizer
# --------------------------
# Checkpoint identifier used for both the tokenizer and the causal LM below.
model_id = "microsoft/phi-2"

# Words that clean_words drops before the word-overlap similarity is computed.
stopwords = {"the", "a", "and", "is", "to", "of", "in", "on"}

# Quantization settings passed to from_pretrained: the model is requested in
# 8-bit, with no skipped modules and fp32 CPU offload disabled.
bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,
    llm_int8_threshold=6.0,
    llm_int8_skip_modules=None,
    llm_int8_enable_fp32_cpu_offload=False
)

# Both objects are created at module level, so importing/running this file
# loads the model immediately.
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    dtype=torch.float16,
    quantization_config=bnb_config
)
# NOTE(review): this streamer is not passed to model.generate in ask_phi2 in
# the visible code, so it currently has no effect on generation.
streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)

# --------------------------
# WordNet setup
# --------------------------
# Probe the WordNet corpus with a throwaway lookup; if NLTK raises
# LookupError, download the "wordnet" and "omw-1.4" resources and import
# again.
try:
    from nltk.corpus import wordnet
    _ = wordnet.synsets("car")
except LookupError:
    nltk.download("wordnet")
    nltk.download("omw-1.4")
    from nltk.corpus import wordnet

# --------------------------
# SentenceTransformer setup
# --------------------------
# Sentence-embedding model used by similarity_semantic; explicitly placed on
# the CPU.
embedder = SentenceTransformer("all-MiniLM-L6-v2", device="cpu")

# --------------------------
# LLM wrapper
# --------------------------
def ask_phi2(user_input, max_new_tokens=256):
    """Send one user turn to the model and return its reply.

    The turn is appended to the module-level ``history``, the whole history
    is rendered as a newline-separated transcript ending in ``Assistant:``,
    and the model samples a continuation. The reply is appended to
    ``history`` as well, so consecutive calls form a running conversation.

    Args:
        user_input: Text of the user's message.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded, whitespace-stripped continuation (prompt excluded).
    """
    history.append(f"User: {user_input}")
    transcript = "\n".join([*history, "Assistant:"])

    encoded = tokenizer(transcript, return_tensors="pt").to(model.device)
    prompt_length = encoded["input_ids"].shape[1]

    sampling_options = {
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "temperature": 0.7,
        "top_p": 0.9,
        "repetition_penalty": 1.2,
        "pad_token_id": tokenizer.eos_token_id,
    }
    generated = model.generate(**encoded, **sampling_options)

    # The output sequence starts with the prompt tokens; keep only what
    # comes after them.
    continuation = generated[0][prompt_length:]
    reply = tokenizer.decode(continuation, skip_special_tokens=True).strip()

    history.append(f"Assistant: {reply}")
    return reply


def send_to_llm(question):
    """Ask *question* in a brand-new conversation and return the reply.

    The module-level ``history`` is rebound to an empty list first, so
    earlier questions never leak into the prompt.
    """
    global history
    history = list()  # fresh conversation for every question
    reply = ask_phi2(question, max_new_tokens=256)
    return reply

# --------------------------
# Text utilities
# --------------------------
def clean_words(text, stop_words=None):
    """Split *text* into lowercase content words.

    The text is split on whitespace, each token is lowercased, and leading
    and trailing ASCII punctuation is stripped so that ``"Paris."`` compares
    equal to ``"paris"`` and ``"The,"`` is still recognized as a stopword.
    Punctuation inside a token (``"state-of-the-art"``) is kept. Tokens that
    are empty after stripping, or that are stopwords, are dropped.

    Args:
        text: The string to tokenize.
        stop_words: Optional collection of lowercase words to drop. When
            omitted, the module-level ``stopwords`` set is used.

    Returns:
        A list of cleaned tokens in their original order (duplicates kept).
    """
    if stop_words is None:
        stop_words = stopwords
    stripped = (raw.lower().strip(string.punctuation) for raw in text.split())
    return [word for word in stripped if word and word not in stop_words]

def expand_with_synonyms(words):
    """Return the set of *words* plus every WordNet lemma name of each word.

    Lemma names are lowercased and their underscores are replaced with
    spaces before being added to the result.
    """
    vocabulary = set(words)
    for word in words:
        vocabulary.update(
            lemma.name().lower().replace("_", " ")
            for synset in wordnet.synsets(word)
            for lemma in synset.lemmas()
        )
    return vocabulary

def similarity_wordnet(answer, response, use_synonyms=True):
    """Score how much of the answer's vocabulary appears in the response.

    Both texts are reduced to content words with ``clean_words``. With
    ``use_synonyms`` enabled, each word set is first widened with
    ``expand_with_synonyms``.

    Returns:
        The percentage (0-100) of the answer's word set that is also in the
        response's word set, or ``0`` when the answer's word set is empty.
    """
    answer_terms = clean_words(answer)
    response_terms = clean_words(response)

    if not use_synonyms:
        expected, observed = set(answer_terms), set(response_terms)
    else:
        expected = expand_with_synonyms(answer_terms)
        observed = expand_with_synonyms(response_terms)

    if expected:
        shared = expected.intersection(observed)
        return len(shared) / len(expected) * 100
    return 0

def similarity_semantic(answer, response):
    """Return the cosine similarity of the two texts' embeddings, times 100.

    Each text is encoded separately with the module-level ``embedder``.
    """
    answer_vec, response_vec = (
        embedder.encode(text, convert_to_tensor=True)
        for text in (answer, response)
    )
    cosine = util.pytorch_cos_sim(answer_vec, response_vec)
    return cosine.item() * 100  # expressed as a percentage

# --------------------------
# Experiment helpers
# --------------------------
def run_experiment(dataset_split, n=20):
    """Query the LLM on a random sample of questions and score each reply.

    Args:
        dataset_split: Object whose ``"query"`` and ``"answer"`` entries are
            parallel sequences of questions and reference answers.
        n: Number of question/answer pairs to sample; capped at the number
            of available pairs.

    Returns:
        A ``(results, duration)`` tuple: ``results`` is a list with one dict
        per sampled question (id, question, expected answer, LLM response
        and both similarity scores rounded to two decimals), and
        ``duration`` is the elapsed time in seconds for querying and
        scoring (sampling is not included).
    """
    qa_pairs = list(zip(dataset_split["query"], dataset_split["answer"]))
    sample_size = min(n, len(qa_pairs))
    chosen = random.sample(qa_pairs, sample_size)

    def evaluate(index, question, expected):
        # Ask the model, then score its reply with both metrics.
        reply = send_to_llm(question)
        overlap_score = similarity_wordnet(expected, reply, use_synonyms=True)
        embedding_score = similarity_semantic(expected, reply)
        return {
            "id": index,
            "question": question,
            "expected_answer": expected,
            "llm_response": reply,
            "similarity_wordnet": round(overlap_score, 2),
            "similarity_semantic": round(embedding_score, 2)
        }

    started = time.time()
    records = [
        evaluate(index, question, expected)
        for index, (question, expected) in enumerate(chosen)
    ]
    elapsed = time.time() - started

    return records, elapsed


def save_results(results, duration, filename="TestResults.json"):
    """Append one test run to the JSON log at *filename* and print a summary.

    The log is a JSON list with one entry per run. Each entry records a
    1-based ``test_number``, a local-time ``timestamp``, the run's
    ``duration_seconds`` (rounded to two decimals) and the ``results``.

    Args:
        results: JSON-serializable list of per-question result dicts.
        duration: Run time of the experiment in seconds.
        filename: Path of the JSON log; created if it does not exist.

    Raises:
        json.JSONDecodeError: If the existing log holds invalid JSON.
        TypeError: If ``results`` is not JSON-serializable; the existing
            log is left untouched in that case.
    """
    all_results = []
    if os.path.exists(filename):
        with open(filename, "r", encoding="utf-8") as f:
            existing = f.read()
        # An empty file (e.g. left behind by an interrupted write) counts as
        # "no previous runs" instead of aborting after the experiment has
        # already been computed.
        if existing.strip():
            all_results = json.loads(existing)

    test_number = len(all_results) + 1
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    all_results.append({
        "test_number": test_number,
        "timestamp": timestamp,
        "duration_seconds": round(duration, 2),
        "results": results
    })

    # Serialize before opening the file for writing: opening with "w"
    # truncates it, so a serialization error afterwards would wipe out every
    # earlier run.
    payload = json.dumps(all_results, indent=2)
    with open(filename, "w", encoding="utf-8") as f:
        f.write(payload)

    print(f"Saved Test #{test_number} ({round(duration,2)}s) with {len(results)} results to {filename}")

# --------------------------
# Main
# --------------------------
def main(n=20):
    """Run one experiment on *n* sampled questions and log the outcome.

    Args:
        n: Number of question/answer pairs to sample from the module-level
            ``dataset_split``.
    """
    results, duration = run_experiment(dataset_split, n=n)
    # Pass the module-level `filename` explicitly; relying on save_results'
    # default would silently ignore any change made to that setting.
    save_results(results, duration, filename=filename)

# Run one experiment with main's default sample size when this file is
# executed as the main module.
# NOTE(review): in a notebook kernel this guard is presumably also true, so
# this cell would run an experiment before the explicit call in the next
# cell - confirm that the double run is intended.
if __name__ == "__main__":
    main()


In [None]:
# Run an experiment on 20 sampled questions and append it to the results log.
main(n=20)