Install Required Packages

In [1]:
# %%capture --no-stderr
# %pip install -U langchain_community tiktoken langchainhub chromadb langchain langgraph nlangchain_ollama langchain_huggingface

In [2]:
local_llm = "llama3.2"

Create Index

In [3]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader

urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

text_spliter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=50
)

doc_splits = text_spliter.split_documents(docs_list)

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [4]:
from transformers import DPRQuestionEncoder, DPRQuestionEncoderTokenizer, DPRContextEncoder, DPRContextEncoderTokenizer
question_encoder = DPRQuestionEncoder.from_pretrained("facebook/dpr-question_encoder-single-nq-base")
question_tokenizer = DPRQuestionEncoderTokenizer.from_pretrained("facebook/dpr-question_encoder-single-nq-base")
context_encoder = DPRContextEncoder.from_pretrained("facebook/dpr-ctx_encoder-single-nq-base")
context_tokenizer = DPRContextEncoderTokenizer.from_pretrained("facebook/dpr-ctx_encoder-single-nq-base")

Some weights of the model checkpoint at facebook/dpr-question_encoder-single-nq-base were not used when initializing DPRQuestionEncoder: ['question_encoder.bert_model.pooler.dense.bias', 'question_encoder.bert_model.pooler.dense.weight']
- This IS expected if you are initializing DPRQuestionEncoder from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DPRQuestionEncoder from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of the model checkpoint at facebook/dpr-ctx_encoder-single-nq-base were not used when initializing DPRContextEncoder: ['ctx_encoder.bert_model.pooler.dense.bias', 'ctx_encoder.bert_model.pooler.dense.weight']
- This IS expected if you are initializing DPRContextEncoder from the

In [5]:
encoded_docs = []
for doc in doc_splits:
    inputs = context_tokenizer(doc.page_content, return_tensors="pt", truncation=True, max_length=512)
    embeddings = context_encoder(**inputs).pooler_output.detach().numpy()
    encoded_docs.append(embeddings[0])

In [6]:
import faiss
import numpy as np

chunk_embeddings = np.array(encoded_docs)
chunk_dimension = chunk_embeddings.shape[1]

faiss_index = faiss.IndexFlatL2(chunk_dimension)
faiss_index.add(chunk_embeddings)

index_to_metadata = {i: doc_splits[i].metadata for i in range(len(doc_splits))}

In [7]:
def question_dpr_embedding(question:str):
    question_inputs = question_tokenizer(question, return_tensors='pt')
    return question_encoder(**question_inputs).pooler_output.detach().numpy()
    

In [8]:
def search(question:str):
    D, I = faiss_index.search(question_dpr_embedding(question=question), k=5)
    for i, idx in enumerate(I[0]):
        print(f'{i+1}: {doc_splits[idx]}')
        print(f'distance {D[0][i]}\n')

In [9]:
search("agent memory")

1: page_content='Short-Term Memory (STM) or Working Memory: It stores information that we are currently aware of and needed to carry out complex cognitive tasks such as learning and reasoning. Short-term memory is believed to have the capacity of about 7 items (Miller 1956) and lasts for 20-30 seconds.


Long-Term Memory (LTM): Long-term memory can store information for a remarkably long time, ranging from a few days to decades, with an essentially unlimited storage capacity. There are two subtypes of LTM:

Explicit / declarative memory: This is memory of facts and events, and refers to those memories that can be consciously recalled, including episodic memory (events and experiences) and semantic memory (facts and concepts).
Implicit / procedural memory: This type of memory is unconscious and involves skills and routines that are performed automatically, like riding a bike or typing on a keyboard.




Fig. 8. Categorization of human memory.
We can roughly consider the following mappin

In [10]:
from langchain.vectorstores import FAISS
from langchain.embeddings.sentence_transformer import SentenceTransformerEmbeddings
from langchain.docstore.in_memory import InMemoryDocstore
from langchain.docstore.document import Document

def embed_query(query):
    return question_dpr_embedding(question=query).flatten()
    

docstore = InMemoryDocstore({str(i): doc for i, doc in enumerate(doc_splits)})
index_to_docstore_id = {i: str(i) for i in range(len(doc_splits))}

vectorstore = FAISS(
    docstore=docstore,
    index_to_docstore_id=index_to_docstore_id,
    embedding_function=embed_query, 
    index=faiss_index
    )

retriever = vectorstore.as_retriever(search_type='similarity', seacrh_kwargs={'k':5})

question = "who is Ezio Auditore da Firenze?"
retrieved_docs = retriever.get_relevant_documents(query=question)

for doc in retrieved_docs:
    print(doc, "\n\n")

`embedding_function` is expected to be an Embeddings object, support for passing in a function will soon be removed.


page_content='Fig. 9. Average attack success rate on "HB (harmful behavior)" instructions, averaging 5 prompts. Two baselines are "HB" prompt only or HB prompt followed by `"Sure here's"` as a suffix. "Concatenation" combines several adversarial suffixes to construct a more powerful attack with a significantly higher success rate in some cases. "Ensemble" tracks if any of 5 prompts and the concatenated one succeeded. (Image source: Zou et al. 2023)
ARCA (“Autoregressive Randomized Coordinate Ascent”; Jones et al. 2023) considers a broader set of optimization problems to find input-output pairs $(\mathbf{x}, \mathbf{y})$ that match certain behavior pattern; such as non-toxic input starting with "Barack Obama" but leading to toxic output. Given an auditing objective $\phi: \mathcal{X} \times \mathcal{Y} \to \mathbb{R}$ that maps a pair of (input prompt, output completion) into scores. Examples of behavior patterns captured by $\phi$ are as follows:' metadata={'source': 'https://lilianwen

  retrieved_docs = retriever.get_relevant_documents(query=question)


In [23]:
def retriever_search(question:str):
    retrieved_docs = retriever.get_relevant_documents(query=question)
    for doc in retrieved_docs:
        print(doc, "\n\n")

In [None]:
retriever_search("assassin's creed brotherhood?")

LLM

Retriever Grader

In [11]:
from langchain.prompts import PromptTemplate
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

llm = ChatOllama(model=local_llm, format="json", temperature=0)

prompt = PromptTemplate(
    template="""You are a grader assessing relevance of a retrieved document to a user question. \n 
    Here is the retrieved document: \n\n {document} \n\n
    Here is the user question: {question} \n
    If the document contains keywords related to the user question, grade it as relevant. \n
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no premable or explanation.""",
    input_variables=["question", "document"]
)

retriever_grader = prompt | llm | JsonOutputParser()
question = "who is Ezio Auditore da Firenze?"
docs = retriever.invoke(question)
doc_text = docs[1].page_content
print(retriever_grader.invoke(
    {
        "question" : question,
        "document" : doc_text
    }
))

{'score': 'no'}


In [None]:
question = "Assassin's Creed: Brotherhood"
docs = retriever.invoke(question)
doc_text = docs[1].page_content
print(retriever_grader.invoke(
    {
        "question" : question,
        "document" : doc_text
    }
))
print(docs[1])

Generate

In [None]:
from langchain import hub
from langchain_core.output_parsers import StrOutputParser

generate_prompt = PromptTemplate(
   template= """
You are an assistant for question-answering tasks.
Use the following pieces of retrieved context to answer the question.
If you don't know the answer, just say that you don't know.
Use three sentences maximum and keep the answer concise.
Question: {question} 
Context: {context} 
Answer:
""",
input_variables=["context", "question"]
)
generate_llm = ChatOllama(model=local_llm, temperature=0)

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = generate_prompt | generate_llm | StrOutputParser()

generation = rag_chain.invoke(
    {
        "context": docs,
        "question" : question
    }
)
print(generation)

Hallucination Grader

In [None]:
hallucination_model = ChatOllama(model=local_llm, format="json", temperature=0)

hallucination_prompt = PromptTemplate(
    template="""You are a grader assessing whether an answer is grounded in / supported by a set of facts. \n 
    Here are the facts:
    \n ------- \n
    {documents} 
    \n ------- \n
    Here is the answer: {generation}
    Give a binary score 'yes' or 'no' score to indicate whether the answer is grounded in / supported by a set of facts. \n
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.""",
    input_variables=["generation", "documents"]
)

hallucination_grader = hallucination_prompt | hallucination_model | JsonOutputParser()
hallucination_grader.invoke(
    {
        "documents" : docs,
        "generation" : generation
    }
)

Answer Grader

In [None]:

grader_model = ChatOllama(model=local_llm, format="json", temperature=0)

grader_prompt = PromptTemplate(
    template="""You are a grader assessing whether an answer is useful to resolve a question. \n 
    Here is the answer:
    \n ------- \n
    {generation} 
    \n ------- \n
    Here is the question: {question}
    Give a binary score 'yes' or 'no' to indicate whether the answer is useful to resolve a question. \n
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.""",
    input_variables=["generation", "question"],
    )

answer_grader = grader_prompt | grader_model | JsonOutputParser()
answer_grader.invoke(
    {
        "question" : question,
        "generation" : generation
    }
)

Question Re-Writer

In [None]:
re_writer_model = ChatOllama(model = local_llm, temperature=0)

re_write_prompt = PromptTemplate(
        template="""You a question re-writer that converts an input question to a better version that is optimized \n 
     for vectorstore retrieval. Look at the initial and formulate an improved question. \n
     Here is the initial question: \n\n {question}. Improved question with no preamble: \n """,
    input_variables=["generation", "question"],
)

question_re_writer = re_write_prompt | re_writer_model | StrOutputParser()
question_re_writer.invoke(
    {
        "question" : question
    }
)

Graph

Graph State

In [14]:
from typing import List
from typing_extensions import TypedDict

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """

    question: str
    generation:str
    documents:List[str]

Nodes

In [15]:
##Retrieve
def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]
    
    documents = retriever.invoke(question)
    return{"documents":documents, "question":question}

##Generate
def generate(state):
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    generation = rag_chain.invoke({
        "context" : documents,
        "question" : question
    })
    return {"documents" : documents,"question" : question,"generation" : generation}

def grade_doduments(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state['question']
    documents = state['documents']

    filtered_docs = []
    for d in documents:
        score = retriever_grader.invoke(
            {
                "question" : question,
                "document" : documents
            }
        )
        grade = score["score"]
        if grade == 'yes':
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
    return {"documents" : filtered_docs, "question" : question}

def transform_query(state):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates question key with a re-phrased question
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]

    #re-write question
    better_question = question_re_writer.invoke({"question":question})
    return {"documents":documents, "question":better_question}

Edges

In [16]:
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    state["question"]
    filtered_documents = state["documents"]

    if not filtered_documents:
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
        )
        return "transform_query"
    else:
        print("---DECISION: GENERATE---")
        return "generate"
    
def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLICINATION---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {
            "documents" : documents,
            "generation" : generation
        }
    )
    grade = score['score']

    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        print("---GRADE GENERATION VS QUESTION---")
        score = answer_grader.invoke({
            "question":question,
            "generation":generation
        })
        grade = score["score"]

        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("___DECISION: GENERATION DOEST NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"


Build Graph

In [17]:
from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

#nodes
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_doduments)
workflow.add_node("generate", generate)
workflow.add_node("transform_query", transform_query)

#edges
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query":"transform_query",
        "generate": "generate"
    }
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported":"generate",
        "useful":END,
        "not useful": "transform_query"
    }
)

app = workflow.compile()

In [None]:
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))

Run

In [None]:
from pprint import pprint

inputs = {
    "question" : "assassin's creed"
}

for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Node '{key}:")
    pprint("\n---\n")
pprint(value["generation"])

In [None]:
from pprint import pprint

inputs = {
    "question" : "what is the gameplay of the game?"
}

for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Node '{key}:")
    pprint("\n---\n")
pprint(value["generation"])