In [1]:
import os
from getpass import getpass
from dotenv import load_dotenv

from langchain_groq import ChatGroq

# Load variables from a local .env file *before* building the client:
# ChatGroq picks up its API key (GROQ_API_KEY) from the environment when it is
# constructed, so on a fresh kernel the key must already be present.
load_dotenv()

# Shared chat model used by every pipeline in this notebook.
# A low temperature keeps the answers close to deterministic.
llm = ChatGroq(model="openai/gpt-oss-120b",
               temperature=0.2)

True

In [2]:
from langchain_core.prompts import ( 
                                    SystemMessagePromptTemplate,
                                    HumanMessagePromptTemplate,
                                    MessagesPlaceholder,
                                    ChatPromptTemplate )

# Fixed instruction placed at the start of every conversation.
system_prompt = "You are a helpful assistant called Zeta."

# Chat prompt layout, in order: system instruction, prior conversation, new user turn.
prompt_template = ChatPromptTemplate.from_messages([
    SystemMessagePromptTemplate.from_template(system_prompt),
    # slot for earlier messages; filled from the "history" input variable
    MessagesPlaceholder(variable_name="history"),
    # the current user message; filled from the "query" input variable
    HumanMessagePromptTemplate.from_template("{query}"),
])

In [3]:
# Chain the prompt into the model: the messages rendered by prompt_template
# become the input of llm.
pipeline = prompt_template | llm

In [4]:
from langchain_core.chat_history import InMemoryChatMessageHistory

# Registry of chat histories, keyed by session ID.
chat_map = {}

def get_chat_history(session_id: str) -> InMemoryChatMessageHistory:
    """Return the chat history for `session_id`, creating an empty one on first use.

    Args:
        session_id: Key identifying the conversation in `chat_map`.

    Returns:
        The `InMemoryChatMessageHistory` stored for this session.
    """
    if session_id in chat_map:
        return chat_map[session_id]

    # unknown session: start a fresh history and remember it
    history = InMemoryChatMessageHistory()
    chat_map[session_id] = history
    return history

In [8]:
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.runnables import ConfigurableFieldSpec

# Wrap the pipeline with per-session message history: the history returned by
# get_chat_history is injected into the prompt under "history", and the user's
# message is read from the "query" key of the input dict.
pipeline_with_history = RunnableWithMessageHistory(
    pipeline,
    get_session_history=get_chat_history,
    input_messages_key="query",
    history_messages_key="history",
    # Declares which config fields are forwarded to get_session_history.
    history_factory_config=[
        ConfigurableFieldSpec(
            id="session_id",
            annotation=str,
            name="Session ID",
            description="The session ID to use for the chat history",
            default="id_default",
        ),
        # Disabled: a second field that would forward a window size `k`.
        # Enable only together with a get_session_history factory that
        # accepts a `k` argument.
        # ConfigurableFieldSpec(
        #     id="k",
        #     annotation=int,
        #     name="k",
        #     description="The number of messages to keep in the history",
        #     default=4,
        # )
    ]
)

In [9]:
# First turn of session "id_123". The session ID goes under "configurable",
# the documented location for fields declared in history_factory_config,
# rather than relying on unknown top-level config keys being promoted.
pipeline_with_history.invoke(
    {"query": "Hi, my name is James"},
    config={"configurable": {"session_id": "id_123"}}
)

AIMessage(content='Nice to meet you again, James! How can I help you today?', additional_kwargs={'reasoning_content': 'The user says "Hi, my name is James". The assistant should respond appropriately, maybe greet again. There\'s no conflict. Just respond.'}, response_metadata={'token_usage': {'completion_tokens': 53, 'prompt_tokens': 141, 'total_tokens': 194, 'completion_time': 0.109363555, 'completion_tokens_details': {'reasoning_tokens': 29}, 'prompt_time': 0.005644656, 'prompt_tokens_details': None, 'queue_time': 0.019078482, 'total_time': 0.115008211}, 'model_name': 'openai/gpt-oss-120b', 'system_fingerprint': 'fp_96d96a151c', 'service_tier': 'on_demand', 'finish_reason': 'stop', 'logprobs': None, 'model_provider': 'groq'}, id='lc_run--019c82c6-34c6-7f82-95be-3874fb9a93dd-0', tool_calls=[], invalid_tool_calls=[], usage_metadata={'input_tokens': 141, 'output_tokens': 53, 'total_tokens': 194, 'output_token_details': {'reasoning': 29}})

In [10]:
# Follow-up turn in the same session: reusing session ID "id_123" makes the
# earlier exchange available to the model. The session ID goes under
# "configurable", the documented location for history_factory_config fields.
pipeline_with_history.invoke(
    {"query": "What is my name again?"},
    config={"configurable": {"session_id": "id_123"}}
)

AIMessage(content='Your name is James.', additional_kwargs={'reasoning_content': 'The user is asking "What is my name again?" The assistant should answer "Your name is James." There\'s no policy violation. Just answer.'}, response_metadata={'token_usage': {'completion_tokens': 44, 'prompt_tokens': 172, 'total_tokens': 216, 'completion_time': 0.090729483, 'completion_tokens_details': {'reasoning_tokens': 30}, 'prompt_time': 0.006564051, 'prompt_tokens_details': None, 'queue_time': 0.017671839, 'total_time': 0.097293534}, 'model_name': 'openai/gpt-oss-120b', 'system_fingerprint': 'fp_87b3c396db', 'service_tier': 'on_demand', 'finish_reason': 'stop', 'logprobs': None, 'model_provider': 'groq'}, id='lc_run--019c82c6-379e-7112-9941-c666ba6784bb-0', tool_calls=[], invalid_tool_calls=[], usage_metadata={'input_tokens': 172, 'output_tokens': 44, 'total_tokens': 216, 'output_token_details': {'reasoning': 30}})

In [11]:
from pydantic import BaseModel, Field
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage

class BufferWindowMessageHistory(BaseChatMessageHistory, BaseModel):
    """Chat history that retains only the `k` most recent messages.

    Attributes:
        messages: The retained messages, oldest first.
        k: Maximum number of messages to keep. A value of zero or less
            means nothing is retained.
    """
    messages: list[BaseMessage] = Field(default_factory=list)
    # field-level default is int() == 0, but __init__ always requires an explicit k
    k: int = Field(default_factory=int)

    def __init__(self, k: int):
        """Create an empty history with a window of `k` messages."""
        super().__init__(k=k)
        print(f"Initializing BufferWindowMessageHistory with k={k}")

    def add_messages(self, messages: list[BaseMessage]) -> None:
        """Add messages to the history, removing any messages beyond
        the last `k` messages.
        """
        self.messages.extend(messages)
        # `-0 == 0`, so `messages[-0:]` would keep *everything*, and a negative k
        # would drop from the front instead of windowing; handle both explicitly.
        self.messages = self.messages[-self.k:] if self.k > 0 else []

    def clear(self) -> None:
        """Clear the history."""
        self.messages = []
        
        
# Registry of windowed chat histories, keyed by session ID.
chat_map = {}
def get_chat_history(session_id: str, k: int = 4) -> BufferWindowMessageHistory:
    """Return the windowed chat history for `session_id`, creating it on first use.

    Args:
        session_id: Key identifying the conversation in `chat_map`.
        k: Window size used when the history is first created. For a session
            that already exists the stored history is returned unchanged and
            `k` is not applied.

    Returns:
        The `BufferWindowMessageHistory` stored for this session.
    """
    print(f"get_chat_history called with session_id={session_id} and k={k}")
    if session_id in chat_map:
        return chat_map[session_id]

    # unknown session: start a fresh windowed history and remember it
    history = BufferWindowMessageHistory(k=k)
    chat_map[session_id] = history
    return history

In [12]:
from langchain_core.messages import SystemMessage


class ConversationSummaryMessageHistory(BaseChatMessageHistory, BaseModel):
    """Chat history that keeps a running LLM-written summary instead of raw messages.

    After every successful `add_messages` call, `messages` holds a single
    `SystemMessage` whose content is the updated conversation summary.

    Attributes:
        messages: The stored history (normally one summary message).
        llm: Chat model used to produce the summary.
    """
    messages: list[BaseMessage] = Field(default_factory=list)
    # field-level default factory is never used: __init__ always requires an llm
    llm: ChatGroq = Field(default_factory=ChatGroq)

    def __init__(self, llm: ChatGroq):
        """Create an empty history that summarises with `llm`."""
        super().__init__(llm=llm)

    def add_messages(self, messages: list[BaseMessage]) -> None:
        """Fold `messages` into the running summary.

        Args:
            messages: New messages to merge into the summary. An empty
                sequence is a no-op (no LLM call is made).
        """
        if not messages:
            return

        # Capture the current summary text *before* appending the new messages.
        # `self.messages` is a list, so the text has to be read from its items.
        existing_summary = "\n".join(str(m.content) for m in self.messages)

        # Keep the raw messages in the history until the summary call succeeds,
        # so nothing is lost if the LLM invocation raises.
        self.messages.extend(messages)

        # construct the summary chat messages
        # =====================================
        summary_prompt = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template(
                "Given the existing conversation summary and the new messages, "
                "generate a new summary of the conversation. Ensuring to maintain "
                "as much relevant information as possible."
            ),
            HumanMessagePromptTemplate.from_template(
                "Existing conversation summary:\n{existing_summary}\n\n"
                "New messages:\n{messages}"
            )
        ])
        # format the messages and invoke the LLM
        new_summary = self.llm.invoke(
            summary_prompt.format_messages(
                existing_summary=existing_summary,
                messages=[x.content for x in messages]
            )
        )
        # replace the existing history with a single system summary message
        self.messages = [SystemMessage(content=new_summary.content)]

    def clear(self) -> None:
        """Clear the history."""
        self.messages = []
        
        
# Registry of summary-based chat histories, keyed by session ID.
chat_map = {}
def get_chat_history(session_id: str, llm: ChatGroq) -> ConversationSummaryMessageHistory:
    """Return the summary-based chat history for `session_id`, creating it on first use.

    Args:
        session_id: Key identifying the conversation in `chat_map`.
        llm: Chat model handed to a newly created history. For a session that
            already exists the stored history is returned unchanged.

    Returns:
        The `ConversationSummaryMessageHistory` stored for this session.
    """
    if session_id in chat_map:
        return chat_map[session_id]

    # unknown session: start a fresh summary history and remember it
    history = ConversationSummaryMessageHistory(llm=llm)
    chat_map[session_id] = history
    return history