# How to connect a chat bot to your memory service

In [1]:
from langgraph_sdk import get_client

# Update to your URL. Copy this from page of ryour LangGraph Deployment
deployment_url = "https://langmem-dos-ebc056487539596f92a0abf2670dd66f.default.us.langgraph.app" #"http://localhost:56446"

client = get_client(url=deployment_url)

# Create an assistant

In [2]:
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, Literal

class User(BaseModel):
    """Store all important information about a user here."""
    preferred_name: Optional[str] = None
    current_age: Optional[str] = None
    skills: list[str] = Field(description="Various skills the user has.")
    favorite_foods: Optional[list[str]]
    last_updated: datetime
    core_memories: list[str] = Field(descriptions="Important events and memories that shape the user's identity.")
    topics_discussed: list[str] = Field(description="topics the user has discussed previously")
    other_preferences: list[str] = Field(description="Other preferences the user has expressed that informs how you should interact with them.")
    relationships: list[str] = Field(description="Store information about friends, family members, coworkers, and other important relationships the user has here. Include relevant information about htem.")
    
    
                                        

def create_function(model, description: str = ""):
    return {"name": model.__name__, "description": description or model.__doc__ or "", "parameters": model.model_json_schema()}

def create_memory_function(model, description: str = "", custom_instructions: str = "" ,kind: Literal["patch", "insert"] = "patch"):
    return {
        "function": create_function(model, description=description),
        "system_prompt": custom_instructions,
        "update_mode": kind,
    }
mem_func = create_memory_function(User)

In [3]:
assistant = await client.assistants.create("memory", {"configurable": {"schemas": {"User": mem_func}}})
    

In [30]:
# await client.assistants.search()

In [28]:
chat_ass = await client.assistants.create("chat", {"configurable": 
                                                   {"user_id": "a_user", 
                                                    "mem_assistant_id": "d2a72962-2208-4882-a136-d9d495d35038"
                                                   }})
    

In [26]:
assistant['assistant_id']

'd2a72962-2208-4882-a136-d9d495d35038'

## Example Chat Bot

The bot fetches user memories my semantic similarity, templates them, then responds!

In [26]:
import os
import uuid
from datetime import datetime, timezone
from typing import List, Optional

import langsmith
from langchain.chat_models import init_chat_model
from langchain_core.messages import AnyMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint import MemorySaver
from langgraph.graph import START, StateGraph, add_messages
from langgraph_sdk import get_client
from pydantic.v1 import BaseModel, Field
from typing_extensions import Annotated, TypedDict

from memory_service import (
    _constants as constants,
)
from memory_service import (
    _settings as settings,
)
from memory_service import (
    _utils as utils,
)


class ChatState(TypedDict):
    """The state of the chatbot."""

    messages: Annotated[List[AnyMessage], add_messages]
    user_memories: List[dict]


class ChatConfigurable(TypedDict):
    """The configurable fields for the chatbot."""

    user_id: str
    thread_id: str
    memory_service_url: str = ""
    model: str
    delay: Optional[float]


def _ensure_configurable(config: RunnableConfig) -> ChatConfigurable:
    """Ensure the configuration is valid."""
    return ChatConfigurable(
        user_id=config["configurable"]["user_id"],
        thread_id=config["configurable"]["thread_id"],
        mem_assistant_id=config["configurable"]["mem_assistant_id"],
        memory_service_url=config["configurable"].get(
            "memory_service_url", os.environ.get("MEMORY_SERVICE_URL", "")
        ),
        model=config["configurable"].get(
            "model", "accounts/fireworks/models/firefunction-v2"
        ),
        delay=config["configurable"].get("delay", 60),
    )


PROMPT = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful and friendly chatbot. Get to know the user!"
            " Ask questions! Be spontaneous!"
            "{user_info}\n\nSystem Time: {time}",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(
    time=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
)


@langsmith.traceable
def format_query(messages: List[AnyMessage]) -> str:
    """Format the query for the user's memories."""
    # This is quite naive :)
    return " ".join([str(m.content) for m in messages if m.type == "human"][-5:])


async def query_memories(state: ChatState, config: RunnableConfig) -> ChatState:
    """Query the user's memories."""
    configurable: ChatConfigurable = config["configurable"]
    user_id = configurable["user_id"]
    index = utils.get_index()
    embeddings = utils.get_embeddings()

    query = format_query(state["messages"])
    vec = await embeddings.aembed_query(query)
    # You can also filter by memory type, etc. here.
    with langsmith.trace(
        "pinecone_query", inputs={"query": query, "user_id": user_id}
    ) as rt:
        response = index.query(
            vector=vec,
            filter={"user_id": {"$eq": str(user_id)}},
            include_metadata=True,
            top_k=10,
            namespace=settings.SETTINGS.pinecone_namespace,
        )
        rt.outputs["response"] = response
    memories = []
    if matches := response.get("matches"):
        memories = [m["metadata"][constants.PAYLOAD_KEY] for m in matches]
    return {
        "user_memories": memories,
    }


@langsmith.traceable
def format_memories(memories: List[dict]) -> str:
    """Format the user's memories."""
    if not memories:
        return ""
    # Note Bene: You can format better than this....
    memories = "\n".join(str(m) for m in memories)
    return f"""

## Memories

You have noted the following memorable events from previous interactions with the user.
<memories>
{memories}
</memories>
"""


async def bot(state: ChatState, config: RunnableConfig) -> ChatState:
    """Prompt the bot to resopnd to the user, incorporating memories (if provided)."""
    configurable = _ensure_configurable(config)
    model = init_chat_model(configurable["model"])
    chain = PROMPT | model
    memories = format_memories(state["user_memories"])
    m = await chain.ainvoke(
        {
            "messages": state["messages"],
            "user_info": memories,
        },
        config,
    )

    return {
        "messages": [m],
    }


class MemorableEvent(BaseModel):
    """A memorable event."""

    description: str
    participants: List[str] = Field(
        description="Names of participants in the event and their relationship to the user."
    )


async def post_messages(state: ChatState, config: RunnableConfig) -> ChatState:
    """Query the user's memories."""
    configurable = _ensure_configurable(config)
    langgraph_client = get_client(url=configurable["memory_service_url"])
    thread_id = config["configurable"]["thread_id"]
    # Hash "memory_{thread_id}" to get a new uuid5 for the memory id
    memory_thread_id = uuid.uuid5(uuid.NAMESPACE_URL, f"memory_{thread_id}")
    try:
        await langgraph_client.threads.get(thread_id=memory_thread_id)
    except Exception:
        await langgraph_client.threads.create(thread_id=memory_thread_id)

    await langgraph_client.runs.create(
        memory_thread_id,
        assistant_id=configurable["mem_assistant_id"],
        input={
            "messages": state["messages"],  # the service dedupes messages
        },
        config={
            "configurable": {
                "user_id": configurable["user_id"],
            },
        },
        multitask_strategy="rollback",
    )

    return {
        "messages": [],
    }


builder = StateGraph(ChatState, ChatConfigurable)
builder.add_node(query_memories)
builder.add_node(bot)
builder.add_node(post_messages)
builder.add_edge(START, "query_memories")
builder.add_edge("query_memories", "bot")
builder.add_edge("bot", "post_messages")

chat_graph = builder.compile(checkpointer=MemorySaver())

In [27]:
mem_assistant = await client.assistants.create(
    graph_id="memory",
    config={
        "configurable": {
            "delay": 4,  # seconds wait before considering a thread as "completed"
            "schemas": {
                "MemorableEvent": {
                    "system_prompt": "Extract any memorable events from the user's"
                    " messages that you would like to remember.",
                    "update_mode": "insert",
                    "function": MemorableEvent.schema(),
                },
            },
        }
    },
)

In [28]:
# mem_assistant = (await client.assistants.search(graph_id="memory"))[0]

In [29]:
user_id = str(uuid.uuid4())  # more permanent

In [30]:
thread_id = str(uuid.uuid4())  # can adjust
await client.threads.create(thread_id=thread_id)

{'thread_id': '3ff82998-b622-421f-8c8c-4b14d10c17b1',
 'created_at': '2024-06-28T00:42:23.884229+00:00',
 'updated_at': '2024-06-28T00:42:23.884229+00:00',
 'metadata': {},
 'status': 'idle'}

In [31]:
class Chat:
    def __init__(self, user_id: str, thread_id: str):
        self.thread_id = thread_id
        self.user_id = user_id

    async def __call__(self, query: str) -> str:
        chunks = chat_graph.astream_events(
            input={
                "messages": [("user", query)],
            },
            config={
                "configurable": {
                    "user_id": self.user_id,
                    "thread_id": self.thread_id,
                    "memory_service_url": deployment_url,
                    "mem_assistant_id": mem_assistant["assistant_id"],
                    "delay": 4,
                }
            },
            version="v2",
        )
        res = ""
        async for event in chunks:
            if event.get("event") == "on_chat_model_stream":
                tok = event["data"]["chunk"].content
                print(tok, end="")
                res += tok
        return res

In [32]:
chat = Chat(user_id, thread_id)

In [33]:
_ = await chat("Hi there")

Hi! It's nice to meet you. What brings you here today?

In [34]:
_ = await chat(
    "I've been planning a surprise party for my friend steve. "
    "He has been having a rough month and I want it to be special."
)

That's so sweet of you! I'm sure Steve will appreciate the effort you're putting into making him feel special. What's the theme of the party going to be? Has Steve mentioned anything he's been into lately that you could incorporate into the celebration?

In [35]:
_ = await chat(
    "Steve really likes crocheting. Maybe I can do something with that? Or is that dumb... "
)

That's a great idea! Crocheting is a unique interest, and incorporating it into the party could make it really special and personalized to Steve. You could decorate with crocheted items, have a "crochet station" where guests can make their own simple projects, or even have a crochet-themed cake. What do you think Steve's favorite colors or yarn types are? That could help you get started with planning.

In [36]:
_ = await chat("He's also into capoeira...")

Whoa, that's cool! Capoeira is such a dynamic and energetic activity. You could definitely incorporate elements of it into the party to make it more exciting. Maybe you could hire a capoeira instructor to lead a short workshop or demo, or even have a mini "roda" (that's the circle where capoeiristas play, right?) set up for guests to try out some moves. What do you think Steve would think of that?

In [37]:
_ = await chat(
    "Oh that's a cool idea. One time i took classes from this studio nearby. Wonder if they have any recs. "
)

That's a great connection to have! It's always helpful to get recommendations from people who have experience with a particular activity or business. You could reach out to the studio and ask if they know of any instructors who might be available to lead a workshop or demo at the party. They might even have some suggestions for how to incorporate capoeira into the celebration in a way that would be fun and engaging for Steve and the other guests. Do you think you'll reach out to them today, or wait until later in the planning process?

In [38]:
_ = await chat("Idk. Anyways - how are you doing?")

I'm doing great, thanks for asking! I'm just happy to be chatting with you and helping with your party planning. It's always exciting to see people come together to celebrate special occasions. But enough about me - let's get back to Steve's party! What do you think about serving some Brazilian-inspired food and drinks to tie in with the capoeira theme?

In [39]:
_ = await chat("My name is Ken btw")

Nice to meet you, Ken! I'm glad we could chat about Steve's party and get some ideas going. If you need any more help or just want to bounce some ideas off me, feel free to reach out anytime. Good luck with the planning, and I hope Steve has an amazing time!

## Convo 2

Our memory is configured only to consider a thread "ready to process" if has been inactive for a minute.
We'll wait for things to populate

In [40]:
import asyncio

await asyncio.sleep(60)

In [41]:
thread_id_2 = uuid.uuid4()

In [42]:
chat2 = Chat(user_id, thread_id_2)

In [43]:
_ = await chat2("Remember me?")

I remember you! We were planning a surprise party for Steve, and Ken was also involved. How's everything going? Did the party turn out well?

In [44]:
_ = await chat2("wdy remember??")

I remember because I have a special memory book where I keep track of all the fun conversations and events we've shared together! It's like a digital scrapbook, and it helps me remember important details about our chats.

In [45]:
_ = await chat2("Oh planning is going alright!")

That's great to hear! I'm glad to know that the planning is going smoothly. Are there any new developments or updates that you'd like to share about the party? Maybe I can even offer some suggestions or ideas to make it an even more special celebration for Steve!