In [4]:
pip install faiss-cpu

Collecting faiss-cpu
  Downloading faiss_cpu-1.11.0.post1-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.0 kB)
Downloading faiss_cpu-1.11.0.post1-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (31.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.3/31.3 MB[0m [31m70.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faiss-cpu
Successfully installed faiss-cpu-1.11.0.post1


In [14]:
# Requirements:
# pip install transformers accelerate peft sentence-transformers faiss-cpu

import os
import random
import numpy as np
import torch

from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    pipeline
)
from peft import PeftModel
from sentence_transformers import SentenceTransformer
import faiss

# ----------------------------------------
# 1. Model & LoRA adapter loading
# ----------------------------------------
MODEL_NAME = "tiiuae/falcon-7b-instruct"
LORA_WEIGHTS_PATH = "./lora-falcon-instruct"

tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME, trust_remote_code=True
)

base_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True
)

if os.path.isdir(LORA_WEIGHTS_PATH):
    model = PeftModel.from_pretrained(
        base_model,
        LORA_WEIGHTS_PATH,
        torch_dtype=torch.bfloat16,
        device_map="auto"
    )
else:
    print(f"Warning: LoRA directory '{LORA_WEIGHTS_PATH}' not found. Using base model.")
    model = base_model

model.eval()

# ----------------------------------------
# 2. Text generation pipeline
# ----------------------------------------
gen_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True
)

# ----------------------------------------
# 3. Document Retrieval (RAG) setup
# ----------------------------------------
docs = [
    "Cristiano Ronaldo is one of the greatest footballers of all time.",
    "Real Madrid was founded in the early 20th century.",
    "The UEFA Champions League is Europe's premier club competition."
]

embed_model = SentenceTransformer("all-MiniLM-L6-v2")
doc_embeddings = embed_model.encode(docs, convert_to_numpy=True)

faiss_index = faiss.IndexFlatL2(doc_embeddings.shape[1])
faiss_index.add(doc_embeddings)

def retrieve_docs(query: str, top_k: int = 2):
    q_emb = embed_model.encode([query], convert_to_numpy=True)
    distances, indices = faiss_index.search(q_emb, top_k)
    return [docs[i] for i in indices[0]]

# ----------------------------------------
# 4. Dynamic memory summarization
# ----------------------------------------
summarizer = pipeline(
    "summarization",
    model="sshleifer/distilbart-cnn-12-6",
    device_map="auto"
)

MAX_MEMORY_TOKENS = 512

def dynamic_summarize(history: list) -> list:
    text = "\n".join(history)
    summary = summarizer(
        text, max_length=150, min_length=50, do_sample=False
    )[0]['summary_text']
    return [summary]

# ----------------------------------------
# 5. Long-term Memory Bank
# ----------------------------------------
memory_bank = []
memory_embeddings = []
memory_encoder = SentenceTransformer("all-MiniLM-L6-v2")

def extract_memory_detail(conversation: str, new_message: str) -> str | None:
    prompt = (
        f"Conversation so far:\n{conversation}\n\n"
        f"New user message:\n\"{new_message}\"\n\n"
        "Extract any new personal detail or preference. "
        "If none, reply exactly 'None'."
    )
    out = gen_pipeline(
        prompt,
        max_length=64,
        do_sample=False,
        return_full_text=False,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.eos_token_id
    )
    detail = out[0]["generated_text"].strip()
    return detail if detail.lower() != "none" else None

def add_to_memory_bank(detail: str):
    emb = memory_encoder.encode([detail], convert_to_numpy=True)[0]
    memory_bank.append(detail)
    memory_embeddings.append(emb)

def retrieve_memory(query: str, top_k: int = 2) -> list[str]:
    if not memory_bank:
        return []
    q_emb = memory_encoder.encode([query], convert_to_numpy=True)[0]
    sims = np.dot(memory_embeddings, q_emb) / (
        np.linalg.norm(memory_embeddings, axis=1) * np.linalg.norm(q_emb) + 1e-8
    )
    top_idxs = sims.argsort()[-top_k:][::-1]
    return [memory_bank[i] for i in top_idxs]

# ----------------------------------------
# 6. Few-Shot Prompt Engineering
# ----------------------------------------
few_shot_examples = [
    {
        "user": "Who is Ronaldo?",
        "bot": "Cristiano Ronaldo is a Portuguese footballer widely regarded as one of the best."
    },
    {
        "user": "How many Champions League titles does Real Madrid have?",
        "bot": "Real Madrid has won the UEFA Champions League 14 times."
    }
]

def build_few_shot_block(n: int = 2) -> str:
    picks = random.sample(few_shot_examples, n)
    return "\n".join(f"Alice: {ex['user']}\nBob: {ex['bot']}" for ex in picks)

# ----------------------------------------
# 7. Hallucination Monitoring & Content Filtering
# ----------------------------------------
classifier = pipeline(
    "zero-shot-classification",
    model="facebook/bart-large-mnli",
    device_map="auto"
)

HALLUCINATION_LABEL = "contradiction"
BANNED_TERMS = ["insult", "forbidden"]

def is_hallucination(context: str, response: str) -> bool:
    seq = classifier(
        response,
        candidate_labels=["entailment", "neutral", "contradiction"],
        hypothesis_template=context
    )
    score = seq["scores"][seq["labels"].index(HALLUCINATION_LABEL)]
    return score > 0.5

def content_filter(response: str) -> bool:
    return any(term in response for term in BANNED_TERMS)

# ----------------------------------------
# 8. Main Chat Loop
# ----------------------------------------
conversation_history = []

print("Chatbot ready. Press Ctrl+C to exit.\n")

while True:
    user_input = input("Alice: ").strip()
    conversation_history.append(f"Alice: {user_input}")

    # long-term memory extraction
    full_history = "\n".join(conversation_history)
    memory_detail = extract_memory_detail(full_history, user_input)
    if memory_detail:
        add_to_memory_bank(memory_detail)

    # dynamic summarization if too long
    token_count = len(tokenizer("\n".join(conversation_history))["input_ids"])
    if token_count > MAX_MEMORY_TOKENS:
        conversation_history = dynamic_summarize(conversation_history)

    # retrieve memory and docs
    mem_items = retrieve_memory(user_input, top_k=2)
    mem_block = "\n".join(f"[Memory {i+1}]: {m}" for i, m in enumerate(mem_items))

    doc_items = retrieve_docs(user_input, top_k=2)
    doc_block = "\n".join(f"[Context {i+1}]: {d}" for i, d in enumerate(doc_items))

    # build few-shot examples
    few_shot_block = build_few_shot_block(n=2)

    # assemble prompt
    history_block = "\n".join(conversation_history)
    prompt_parts = [mem_block, doc_block, few_shot_block, history_block]
    prompt = "\n".join(part for part in prompt_parts if part) + "\nBob:"

    # generate response
    out = gen_pipeline(
        prompt,
        max_length=512,
        do_sample=True,
        top_k=10,
        return_full_text=False,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.eos_token_id
    )
    bot_response = out[0]["generated_text"].strip()

    # hallucination & filter check
    if is_hallucination(history_block, bot_response) or content_filter(bot_response):
        print("Bob: Sorry, I couldn't generate a reliable answer. Please rephrase your question.")
    else:
        print("Bob:", bot_response)
        conversation_history.append(f"Bob: {bot_response}")

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

ValueError: You are trying to offload the whole model to the disk. Please use the `disk_offload` function instead.