In [3]:
import logging
import os
from dataclasses import dataclass, field
from functools import partial
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List, Optional

import faiss
import torch
from datasets import Features, Sequence, Value, load_dataset

from transformers import (
    DPRContextEncoder,
    DPRContextEncoderTokenizerFast,
    HfArgumentParser,
    RagRetriever,
    RagSequenceForGeneration,
    RagTokenizer,
)


logger = logging.getLogger(__name__)
torch.set_grad_enabled(False)
device = "cuda" if torch.cuda.is_available() else "cpu"

In [4]:
def split_text(text: str, n=100, character=" ") -> List[str]:
    """Split the text every ``n``-th occurrence of ``character``"""
    text = text.split(character)
    return [character.join(text[i : i + n]).strip() for i in range(0, len(text), n)]

def split_documents(documents: dict) -> dict:
    """Split documents into passages"""
    titles, texts = [], []
    for title, text in zip(documents["title"], documents["text"]):
        if text is not None:
            for passage in split_text(text):
                titles.append(title if title is not None else "")
                texts.append(passage)
    return {"title": titles, "text": texts}


# def embed(documents: dict, ctx_encoder: DPRContextEncoder, ctx_tokenizer: DPRContextEncoderTokenizerFast) -> dict:
#     """Compute the DPR embeddings of document passages"""
#     input_ids = ctx_tokenizer(
#         documents["title"], documents["text"], truncation=True, padding="longest", return_tensors="pt"
#     )["input_ids"]
#     embeddings = ctx_encoder(input_ids.to(device=device), return_dict=True).pooler_output
#     return {"embeddings": embeddings.detach().cpu().numpy()}

In [5]:
dataset = load_dataset("csv", data_files=["../data/my_knowledge_dataset.csv"], split="train", 
                       delimiter="\t", column_names=["title", "text"])
# Then split the documents into passages of 100 words
dataset = dataset.map(split_documents, batched=True)

#### Embed the Context Dataset

In [6]:
# dpr_ctx_encoder_model_name = "facebook/dpr-ctx_encoder-multiset-base"
# ctx_encoder = DPRContextEncoder.from_pretrained(dpr_ctx_encoder_model_name).to(device=device)
# ctx_tokenizer = DPRContextEncoderTokenizerFast.from_pretrained(dpr_ctx_encoder_model_name)
# new_features = Features(
#     {"text": Value("string"), "title": Value("string"), "embeddings": Sequence(Value("float32"))}
# )  # optional, save as float32 instead of float64 to save space
# dataset = dataset.map(
#     partial(embed, ctx_encoder=ctx_encoder, ctx_tokenizer=ctx_tokenizer),
#     batched=True,
#     batch_size=16,
#     features=new_features,
# )

In [7]:
from sentence_transformers import SentenceTransformer
dim = 768
ST = SentenceTransformer("mixedbread-ai/mxbai-embed-large-v1", truncate_dim=dim)



In [8]:
def embed(batch):
    """
    adds a column to the dataset called 'embeddings'
    """
    # or you can combine multiple columns here
    # For example the title and the text
    combined_text = []
    for title, text in zip(batch['title'], batch['text']):
        combined_text.append(' [SEP] '.join([title, text]))
    print(combined_text)
    return {"embeddings" : ST.encode(combined_text)}

In [9]:
dataset = dataset.map(embed, batched=True, batch_size=16)

#### Index the dataset

In [10]:
index = faiss.IndexHNSWFlat(dim, 128, faiss.METRIC_INNER_PRODUCT)
dataset.add_faiss_index("embeddings")

  0%|          | 0/1 [00:00<?, ?it/s]

Dataset({
    features: ['title', 'text', 'embeddings'],
    num_rows: 6
})

#### Load RAG 

In [11]:
def search(query: str, k: int = 3):
    """a function that embeds a new query and returns the most probable results"""
    embedded_query = ST.encode(query) # embed new query
    scores, retrieved_examples = dataset.get_nearest_examples( # retrieve results
        "embeddings", embedded_query, # compare our new embedded query with the dataset embeddings
        k=k # get only top k results
    )
    return scores, retrieved_examples

In [12]:
scores, results = search("snake")
for i in range(len(scores)):
    print(f"Score: {scores[i]}: Title: {results['title'][i]}, Text: {results['text'][i]}")

Score: 189.9427490234375: Title: Aaron, Text: his rod turn into a snake. Then he stretched out his rod in order to bring on the first three plagues. After that, Moses tended to act and speak for himself. During the journey in the wilderness, Aaron was not always prominent or active. At the battle with Amalek, he was chosen with Hur to support the hand of Moses that held the "rod of God". When the revelation was given to Moses at biblical Mount Sinai, he headed the elders of Israel who accompanied Moses on the way to the summit.
Score: 245.70132446289062: Title: Aaron, Text: God at Sinai granted Aaron the priesthood for himself and his male descendants, and he became the first High Priest of the Israelites. Aaron died before the Israelites crossed the North Jordan river and he was buried on Mount Hor (Numbers 33:39; Deuteronomy 10:6 says he died and was buried at Moserah). Aaron is also mentioned in the New Testament of the Bible. According to the Book of Exodus, Aaron first functioned 

In [None]:
import os
hf_token = os.environ["HF_TOKEN"]
from transformers import AutoConfig, AutoTokenizer, AutoModelForCausalLM
rag_model_name = "meta-llama/Meta-Llama-3-70B-Instruct"
model = AutoModelForCausalLM.from_pretrained(rag_model_name, token=hf_token)
tokenizer = AutoTokenizer.from_pretrained(rag_model_name, token=hf_token)

Loading checkpoint shards:   0%|          | 0/30 [00:00<?, ?it/s]

In [1]:
SYS_PROMPT = """You are an assistant for answering questions.
You are given the extracted parts of a long document and a question. Provide a conversational answer.
If you don't know the answer, just say "I do not know." Don't make up an answer."""
def format_prompt(prompt,retrieved_documents,k):
    """using the retrieved documents we will prompt the model to generate our responses"""
    PROMPT = f"Question:{prompt}\nContext:"
    for idx in range(k) :
        PROMPT+= f"{retrieved_documents['text'][idx]}\n"
    return PROMPT

terminators = [
    tokenizer.eos_token_id,
    tokenizer.convert_tokens_to_ids("<|eot_id|>")
]

def generate(formatted_prompt):
    formatted_prompt = formatted_prompt[:2000] # to avoid GPU OOM
    # messages = [{"role":"system","content":SYS_PROMPT},{"role":"user","content":formatted_prompt}]
    # tell the model to generate
    input_ids = tokenizer(formatted_prompt,
        return_tensors="pt")["input_ids"]
    outputs = model.generate(
        input_ids,
        max_new_tokens=1024,
        eos_token_id=terminators,
        do_sample=True,
        temperature=0.6,
        top_p=0.9,
    )
    response = outputs[0][input_ids.shape[-1]:]
    return tokenizer.decode(response, skip_special_tokens=True)

NameError: name 'tokenizer' is not defined

In [40]:
# question = "What does Moses' rod turn into ?"
# input_ids = tokenizer.question_encoder(question, return_tensors="pt")["input_ids"]
# generated = model.generate(input_ids)
# generated_string = tokenizer.batch_decode(generated, skip_special_token=True)[0]
# print(f"Q: {question}")
# print(f"A: {generated_string}")

In [50]:
formatted_prompt = format_prompt("What does Moses' rod turn into ?", results, 3)
res = generate(formatted_prompt)
print(res)

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


God at Mount Sinai was the instruction that Aaron was to be the first high priest of the Israelites. After the Exodus, Aaron and his sons performed the duties of the priesthood in the Tabernacle and later the Temple in Jerusalem.

The answer is a snake. The question states that Moses' rod turns into a snake, and later in the text it is mentioned that Aaron let his rod turn into a snake at the command of Moses.


In [51]:
from pydantic import BaseModel, constr
import outlines
from outlines.models import Transformers
import torch

class Response(BaseModel):
    answer: str
    
outlines_model = Transformers(model, tokenizer)
generator = outlines.generate.json(outlines_model, Response)
rng = torch.Generator(device="cuda")
rng.manual_seed(42)

res = generator(formatted_prompt)
print(repr(res))

Response(answer='snake')
