### LangChain RAG with chat history and streaming

#### Dependencies

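We'll use LangChain with its community and OpenAI integrations, the Pinecone vector-store integration, `python-dotenv` (to load API keys from a `.env` file), and FastAPI (to serve the chain at the end). Install them with the cell below: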

In [1]:
%pip install --upgrade --quiet langchain langchain-community langchain-openai python-dotenv

Note: you may need to restart the kernel to use updated packages.


#### Load API keys into environment variables

In [2]:
from dotenv import load_dotenv

# Copy the variables defined in the project's .env file (the API keys) into os.environ.
dotenv_loaded = load_dotenv()
dotenv_loaded

True

#### Define LLM

In [3]:
from langchain_openai import ChatOpenAI

# A single chat model instance, used both to rewrite follow-up questions and to answer them.
LLM_MODEL = 'gpt-4o'
llm = ChatOpenAI(model=LLM_MODEL)

#### (Optional, disabled) Build a local Chroma retriever from a web page

In [4]:
# Alternative retriever: load a blog post, split it into overlapping chunks and index
# them in a Chroma vector store. Disabled by default because the Pinecone cell below
# provides the retriever actually used. To try it, set USE_LOCAL_CHROMA = True and skip
# the Pinecone cell, which would otherwise rebind `vectorstore` and `retriever`.
# Needs the `langchain-chroma` and `beautifulsoup4` packages, which are not installed above.
USE_LOCAL_CHROMA = False

if USE_LOCAL_CHROMA:
    import bs4
    from langchain_chroma import Chroma
    from langchain_community.document_loaders import WebBaseLoader
    from langchain_openai import OpenAIEmbeddings
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    loader = WebBaseLoader(
        web_paths=('https://lilianweng.github.io/posts/2023-06-23-agent/',),
        bs_kwargs=dict(
            # Keep only the article title/header/body, not the site chrome.
            parse_only=bs4.SoupStrainer(class_=('post-content', 'post-title', 'post-header'))
        ),
    )
    docs = loader.load()

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    splits = text_splitter.split_documents(docs)
    vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())
    retriever = vectorstore.as_retriever()


USER_AGENT environment variable not set, consider setting it to identify your requests.


#### Connect to the Pinecone vector store

In [None]:
import os
from langchain_pinecone import PineconeVectorStore
from langchain_openai import OpenAIEmbeddings
from pinecone import Pinecone

# Presumably the embedding model the 'schh' index was built with; query embeddings
# must come from the same model to be comparable with the stored vectors.
model_name = 'text-embedding-ada-002'
index_name = 'schh'   # existing Pinecone index to query
text_field = 'text'   # metadata field the vector store reads each chunk's text from

pc = Pinecone(api_key=os.getenv('PINECONE_API_KEY'))
index = pc.Index(index_name)
embeddings = OpenAIEmbeddings(model=model_name, openai_api_key=os.getenv('OPENAI_API_KEY'))
# Keyword arguments make it explicit which positional slot each value fills.
vectorstore = PineconeVectorStore(index=index, embedding=embeddings, text_key=text_field)
retriever = vectorstore.as_retriever()


#### Contextualize question

In [10]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.chains import create_history_aware_retriever

# Instruction for the query-rewriting step: turn a follow-up question that leans on
# earlier turns into a self-contained question, without answering it.
contextualize_q_system_prompt = (
    'Given a chat history and the latest user question '
    'which might reference context in the chat history, formulate a standalone question '
    'which can be understood without the chat history. Do NOT answer the question, '
    'just reformulate it if needed and otherwise return it as is.'
)

contextualize_q_prompt = ChatPromptTemplate.from_messages([
    ('system', contextualize_q_system_prompt),
    MessagesPlaceholder(variable_name='chat_history'),
    ('human', '{input}'),
])

# Retriever that asks `llm` to rewrite the latest question using the chat history,
# then queries `retriever` with the result.
history_aware_retriever = create_history_aware_retriever(llm, retriever, contextualize_q_prompt)


#### Answer question

In [11]:
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain

# System prompt for the answering step. '{context}' receives the retrieved documents;
# note that it is separated from the instructions by a single newline.
system_prompt = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer the question. "
    "If you don't know the answer, just say that you don't know. "
    "Use three sentences maximum and keep the answer concise."
    "\n"
    "{context}"
)

qa_prompt = ChatPromptTemplate.from_messages([
    ('system', system_prompt),
    MessagesPlaceholder('chat_history'),
    ('human', '{input}'),
])

# Retrieved documents are stuffed into qa_prompt and sent to the llm.
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
# The history-aware retriever supplies 'context'; the answering chain produces 'answer'.
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)


#### Statefully manage chat history

In [12]:

import json
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory

# Chat histories kept in process memory, keyed by session id.
store = {}

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the chat history for `session_id`, creating an empty one on first use.

    NOTE(review): histories are never evicted, so `store` grows with every new session
    (e.g. every FastAPI request sent without a sessionid). This is fine for a demo only.
    """
    history = store.get(session_id)
    if history is None:
        history = ChatMessageHistory()
        store[session_id] = history
    return history

# Wrap rag_chain so each call loads the session's stored messages into 'chat_history',
# reads the user's question from 'input', and records the 'answer' output.
conversational_rag_chain = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    history_messages_key='chat_history',
    input_messages_key='input',
    output_messages_key='answer',
)
# Tag every run of this chain; generate_chat_events filters stream events on this tag.
conversational_rag_chain = conversational_rag_chain.with_config(tags=['main_chain'])

def print_response(resp):
  """Pretty-print a conversational_rag_chain result as indented JSON.

  Args:
    resp: dict with 'input', 'chat_history' (message objects), 'context'
      (document objects) and 'answer'. Messages and documents are converted
      to plain dicts with their ``model_dump()`` method.
  """
  as_dict = {
    'input': resp['input'],
    'chat_history': [msg.model_dump() for msg in resp['chat_history']],
    'context': [doc.model_dump() for doc in resp['context']],
    'answer': resp['answer'],
  }
  # ensure_ascii=False keeps characters such as bullets readable instead of \u2022 escapes;
  # default=str stops non-JSON metadata values (e.g. dates) from raising TypeError.
  print(json.dumps(as_dict, indent=2, ensure_ascii=False, default=str) + '\n')

#### Stream the answer token by token

In [8]:
from langchain_core.messages import AIMessageChunk

async def generate_chat_events(message, session_id):
  """Stream the answer of `conversational_rag_chain` as text chunks.

  Args:
    message: input dict for the chain, e.g. {'input': 'question'}.
    session_id: key of the chat history to read from and append to.

  Yields:
    Non-empty content strings from the answering LLM, in order.

  Errors raised while streaming are logged with a traceback rather than propagated,
  so a consumer such as a StreamingResponse just sees the stream end.
  """
  import logging

  # An event belongs to the answer only if it carries all of these tags. 'main_chain'
  # is attached to conversational_rag_chain via .with_config(tags=['main_chain']);
  # 'seq:step:3' is LangChain's positional tag for the 3rd step of a RunnableSequence.
  # Here it is assumed to single out the answering LLM rather than the question-rewriting
  # one. Re-check this if the chain structure changes.
  answer_tags = ('seq:step:3', 'main_chain')

  def serialize_aimessagechunk(chunk):
    # Only chat-model message chunks are expected here; anything else means the
    # tag filter matched an unexpected run.
    if isinstance(chunk, AIMessageChunk):
      return chunk.content
    raise TypeError(f'Object of type {type(chunk).__name__} is not correctly formatted for serialization')

  config = {'configurable': {'session_id': session_id}}
  try:
    async for event in conversational_rag_chain.astream_events(message, version='v1', config=config):
      if event['event'] != 'on_chat_model_stream':
        continue
      if not all(tag in event['tags'] for tag in answer_tags):
        continue
      chunk_content = serialize_aimessagechunk(event['data']['chunk'])
      if chunk_content:
        yield chunk_content
  except Exception:
    # Best effort: end the stream instead of crashing the consumer, but keep the
    # traceback and session id so the failure can be diagnosed.
    logging.getLogger(__name__).exception('error while streaming chat events for session %s', session_id)

#### Example usage

In [13]:
prompt_1 = 'tell me about the golf plans'
sessionid = 'abc123'

# Calls that share this session id also share, and extend, one chat history.
session_config = {'configurable': {'session_id': sessionid}}
resp = conversational_rag_chain.invoke({'input': prompt_1}, config=session_config)
print_response(resp)
resp['answer']

{
  "input": "tell me about the golf plans",
  "chat_history": [],
  "context": [
    {
      "id": "f463964dc775b88e7db54b29d7dbe86d286f1c35613b2cea2492105d8d690a32",
      "metadata": {},
      "page_content": "of class changes may result in a fee.\nFitness instruction on all Association property shall be taught by Sun City Hilton Head Community Association staff \nand/or approved contractors only. \nPersons with known medical problems or who are unsure of their physical condition are strongly advised to consult \nwith their physician(s) before engaging in exercise activity.\n5.9. Gazebos/Shade Structures \nThe gazebos are available for use on a first come, first served basis, located near the tennis courts at the Barataria \noutdoor pool deck and at Lake Somerset. All shade structures are available on a first come, first served basis.  \n5.10. Golf Courses \nLocations\n\u2022 Hidden Cypress\n\u2022 Okatie Creek\n\u2022 Argent Lakes \nOnly residents and their invited guests may play 

"The golf courses at Sun City Hilton Head, including Hidden Cypress, Okatie Creek, and Argent Lakes, are accessible only to residents and their invited guests, with no public access. Reservations for tee times can be made through the Chelsea Reservation System on the community's website."

In [12]:
prompt_2 = 'Is there an appeal process for this?'

# Follow-up in the same session: the history-aware retriever rewrites "this"
# using the stored chat history before retrieving.
followup_resp = conversational_rag_chain.invoke(
    {'input': prompt_2},
    config={'configurable': {'session_id': sessionid}},
)
followup_resp["answer"]

'Yes, if an application is denied, homeowners can revise the application or appeal the decision by contacting the Modifications Coordinator listed in their denial letter for next steps.'

#### Example usage with streaming output

In [11]:
async for event in generate_chat_events({'input': prompt_1, 'chat_history': []}, sessionid):
    print(event)


Task
 decomposition
 is
 the
 process
 of
 breaking
 down
 a
 complex
 task
 into
 smaller
,
 simpler
 steps
 to
 make
 it
 more
 manageable
.
 This
 approach
 enhances
 model
 performance
 by
 allowing
 for
 structured
 reasoning
 and
 easier
 execution
 of
 tasks
.
 Techniques
 such
 as
 Chain
 of
 Thought
 (
Co
T
)
 and
 Tree
 of
 Thoughts
 are
 commonly
 used
 to
 facilitate
 this
 process
.


#### Using with FastAPI

In [None]:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse, Response, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
import secrets

# Response header that carries the session id so clients can continue a conversation.
SESSION_HEADER = 'X-Session-Id'

app = FastAPI()
# NOTE(review): wildcard origins combined with allow_credentials=True let any site make
# credentialed requests to this API; restrict allow_origins before deploying.
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
    # Allow browser JavaScript to read the session id header.
    expose_headers=[SESSION_HEADER],
)

@app.get('/')
async def root():
    """Serve index.html (presumably the chat front end)."""
    return FileResponse('index.html')

@app.get('/chat/{prompt}')
async def chat(prompt: str, sessionid: Optional[str] = None, stream: Optional[bool] = False):
    """Answer `prompt` with the RAG chain, keeping chat history per session.

    Query params:
        sessionid: session to continue; a new random id is generated when omitted.
        stream: if true, stream the answer; otherwise return it whole as text/plain.

    The session id used is returned in the X-Session-Id response header.
    """
    # 16 random bytes (32 hex chars). A 4-byte id is small enough to collide or be
    # guessed, which would mix one user's chat history into another user's answers.
    sessionid = sessionid or secrets.token_hex(16)
    headers = {SESSION_HEADER: sessionid}

    if stream:
        # NOTE(review): the media type is text/event-stream, but chunks are sent as raw
        # text without SSE `data:` framing; confirm this matches how index.html reads it.
        return StreamingResponse(
            generate_chat_events({'input': prompt}, sessionid),
            media_type='text/event-stream',
            headers=headers,
        )

    # ainvoke keeps the event loop free while the LLM call is in flight; the synchronous
    # invoke() would block every other request handled by this worker.
    resp = await conversational_rag_chain.ainvoke(
        {'input': prompt},
        config={'configurable': {'session_id': sessionid}},
    )
    return Response(status_code=200, content=resp['answer'], media_type='text/plain', headers=headers)
