In [1]:
# short-term memory: thread-based using InMemorySaver
# long-term memory: application level using InMemoryStore


In [2]:
# memory types
# semantic - facts
# episodic - experiences
# procedural - instructions

In [1]:
from dotenv import load_dotenv
import os

load_dotenv()

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

#### Add short term memory

In [22]:
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.checkpoint.memory import InMemorySaver
import uuid

checkpoint = InMemorySaver()

def call_model(state: MessagesState):
    state["messages"] = "hello, you call call_model functions"
    return {"messages": state["messages"]}

workflow = (
    StateGraph(MessagesState)
    .add_node("call_model", call_model)
    .add_edge(START, "call_model")
    .add_edge("call_model", END)
    .compile(checkpointer=checkpoint)
)

In [23]:
config = {
    "configurable":{
        "thread_id" : str(uuid.uuid4())
    }
}

In [39]:
for chunk in workflow.stream({"messages":[{"role": "user", "content": "what's my name?"}]}, config=config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()


what's my name?

hello, you call call_model functions


#### subgraph memory management - short memory management 

In [40]:
from langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict

class State(TypedDict):
    foo: str

# Subgraph

def subgraph_node_1(state: State):
    return {"foo": state["foo"] + "bar"}

subgraph_builder = StateGraph(State)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph = subgraph_builder.compile(checkpointer=True) # define internal memory and checkpoint management 

# Parent graph

builder = StateGraph(State)
builder.add_node("node_1", subgraph)
builder.add_edge(START, "node_1")

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

#### Read short term memory 

In [44]:
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.checkpoint.memory import InMemorySaver

checkpoint = InMemorySaver()

class CustomState(MessagesState):
    user_id: str

def call_model(state: CustomState):
    print(state["user_id"])
    return {"messages": f"hello {state["user_id"]}"}

workflow = (
    StateGraph(CustomState)
    .add_node("call_model", call_model)
    .add_edge(START, "call_model")
    .compile(checkpointer=checkpoint)
)


workflow.invoke({"messages":["hello"], "user_id": "user12"}, config=config)


user12


{'messages': [HumanMessage(content='hello', additional_kwargs={}, response_metadata={}, id='f7274c10-d8ae-44f0-9f4d-f0808b035fe2'),
  HumanMessage(content='hello user12', additional_kwargs={}, response_metadata={}, id='7cdf1dcd-c90e-44a2-942a-f801437fe01b')],
 'user_id': 'user12'}

#### Write short term memory

In [45]:
# def update_user_info(
#     tool_call_id: Annotated[str, InjectedToolCallId],
#     config: RunnableConfig
# ) -> Command:
#     """Look up and update user info."""
#     user_id = config["configurable"].get("user_id")
#     name = "John Smith" if user_id == "user_123" else "Unknown user"
#     return Command(update={
#         "user_name": name,
#         # update the message history
#         "messages": [
#             ToolMessage(
#                 "Successfully looked up user information",
#                 tool_call_id=tool_call_id
#             )
#         ]
#     })

#### Add long term memory 

In [48]:
# use inmemorystore 
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

# use in compile graph
# graph.compile(store=store)

#### Read long term memory

In [77]:
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_store

checkpoint = InMemorySaver()
store = InMemoryStore()


store.put(  
    ("users",),  
    "1",  
    {
        "name": "John Smith",
        "language": "English",
    } 
)

config = {
    "configurable":{
        "thread_id": str(uuid.uuid4()),
        "user_id": "1"
    }
}


def call_model(state: MessagesState):
    store = get_store() 
    user_id = config["configurable"].get("user_id")
    user_info = store.get(("users",), user_id) 
    print(user_info)
    return {"messages": f"hello, call_model {state["messages"][-1].content}"}

graph = (
    StateGraph(MessagesState)
    .add_node("call_model", call_model)
    .add_edge(START, "call_model")
    .compile(store=store, checkpointer=checkpoint)
)

In [78]:
for chunk in graph.stream({"messages": "call fucntion"}, config=config, stream_mode='updates'):
    print(chunk)

Item(namespace=['users'], key='1', value={'name': 'John Smith', 'language': 'English'}, created_at='2025-08-05T08:40:37.062542+00:00', updated_at='2025-08-05T08:40:37.062543+00:00')
{'call_model': {'messages': 'hello, call_model call fucntion'}}


#### Write long term memory 

In [80]:
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
from typing import Annotated, Literal
from langgraph.config import RunnableConfig, get_store

checkpoint = InMemorySaver()
store = InMemoryStore()

class CustomState(MessagesState):
    node_choise: str
    save_obj: str

def selector(state: CustomState):
    print(f"choise: {state["node_choise"]}")
    return {"messages": "selector is execute", "node_choise": state["node_choise"]}

def write_in_memory(state: CustomState, config: RunnableConfig):
    store = get_store() 
    user_id = config["configurable"].get("user_id")
    store.put(("users",), user_id, state["save_obj"]) 
    return {"messages": "write in memory"}

def read_from_memory(state: CustomState, config: RunnableConfig):
    store = get_store() 
    user_id = config["configurable"].get("user_id")
    user_info = store.get(("users",), user_id) 
    print(user_info)
    return {"messages": "read from memory"}

def redirect_node(state: CustomState) -> Literal["write_in_memory", "read_from_memory"]:
    return state["node_choise"]

graph = (
    StateGraph(CustomState)
    .add_node("selector", selector)
    .add_node("write_in_memory", write_in_memory)
    .add_node("read_from_memory", read_from_memory)
    .add_edge(START, "selector")
    .add_conditional_edges("selector", redirect_node)
    .compile(checkpointer=checkpoint, store=store)
)

In [81]:
config = {
    "configurable":{
        "thread_id": str(uuid.uuid4),
        "user_id": "1"
    }
}

In [82]:
for chunk in graph.stream({"messages":["hello"], "node_choise":"write_in_memory", "save_obj":"sample_save"}, config=config, stream_mode="updates"):
    print(chunk)

choise: write_in_memory
{'selector': {'messages': 'selector is execute', 'node_choise': 'write_in_memory'}}
{'write_in_memory': {'messages': 'write in memory'}}


In [83]:
for chunk in graph.stream({"messages":["hello"], "node_choise":"read_from_memory"}, config=config, stream_mode="updates"):
    print(chunk)

choise: read_from_memory
{'selector': {'messages': 'selector is execute', 'node_choise': 'read_from_memory'}}
Item(namespace=['users'], key='1', value='sample_save', created_at='2025-08-05T09:09:49.718009+00:00', updated_at='2025-08-05T09:09:49.718010+00:00')
{'read_from_memory': {'messages': 'read from memory'}}


#### Use Semantic Search 

In [None]:
from langchain_google_genai import GoogleGenerativeAIEmbeddings

embeddings = GoogleGenerativeAIEmbeddings(model="models/gemini-embedding-001", google_api_key=GEMINI_API_KEY)

store = InMemoryStore(
    index={
        "embed": embeddings,
        "dims": 3072,
    }
)


In [None]:
store.put(("user_123", "memories"), "1", {"text": "I hate pizza"})
store.put(("user_123", "memories"), "2", {"text": "I am a plumber, and i like apple"})

In [99]:
items = store.search(
    ("user_123", "memories"), query="I'm hungry", limit=2
)
items

[Item(namespace=['user_123', 'memories'], key='2', value={'text': 'I like apple'}, created_at='2025-08-05T10:01:32.237361+00:00', updated_at='2025-08-05T10:01:32.237365+00:00', score=0.7750916233947487),
 Item(namespace=['user_123', 'memories'], key='1', value={'text': 'I hate pizza'}, created_at='2025-08-05T10:01:30.989564+00:00', updated_at='2025-08-05T10:01:30.989566+00:00', score=0.7625800174036598)]

#### Manage short-term memory

In [None]:
# trim messages
from langchain_core.messages.utils import (
    trim_messages, count_tokens_approximately
)

def pre_model_processing(state):
    trimmed_messages = trim_messages(
        state["messages"],
        strategy="last",
        token_counter=count_tokens_approximately,
        max_tokens=384,
        start_on="human",
        end_on=("human", "tool")
    )

    return {"llm_input_messages": trimmed_messages}

# custom strategies - message filtering

In [100]:
# delete messages
from langchain_core.messages import RemoveMessage

# to remove specific message
def delete_message(state):
    messages = state["messages"]
    if len(messages) > 2:
        return {"messages": [RemoveMessage(id=m.id) for m in messages[:2]]}
    
from langgraph.graph.message import REMOVE_ALL_MESSAGES

# remove all messages
def delete_all_messages(state):
    return {"messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES)]}

In [None]:
# summarize messages
from langgraph.graph import MessagesState
from langchain_core.messages import HumanMessage

class State(MessagesState):
    summary: str

def summarize_conversation(state: State):
    summary = state.get("summary", "")

    if summary:
        summary_message = (
            f"This is a summary of the conversation to date: {summary}\n\n"
            "Extend the summary by taking into account the new messages above:"
        )
    else:
        summary_message = "Create a summary of the conversation above:"

    messages = state["messages"] + [HumanMessage(content=summary_message)]
    # response = llm.invoke(messages)
    response = "demo"

    delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
    return {"summary": response.content, "messages": delete_messages}


# for agent 
# from langmem.short_term import SummarizationNode, RunningSummary

# class State(AgentState):
    # context: dict[str, RunningSummary]

# summarization_node = SummarizationNode( 
#     token_counter=count_tokens_approximately,
#     model=model,
#     max_tokens=384,
#     max_summary_tokens=128,
#     output_messages_key="llm_input_messages",
# )

In [102]:
# manage checkpoints

# func api
# graph.get_state(config)
# list(graph.get_state_history(config))

# checkpoint api
# checkpointer.get_tuple(config)
# list(checkpointer.list(config))

# delete all checkpoint for thread
# checkpointer.delete_thread(thread_id)

