## Create PostgresSaver manually

In [1]:
import os
from psycopg import Connection
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph, START
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list[str]

DB_URI = os.environ.get("DATABASE_URI")

# Use psycopg pool with autocommit and row_factory=dict_row
pool = ConnectionPool(conninfo=DB_URI)

with pool.connection() as conn:  # or using PostgresSaver.from_conn_string
    conn.row_factory = "dict_row"
    conn.autocommit = True  # Enable autocommit for CREATE INDEX CONCURRENTLY
    
    checkpointer = PostgresSaver(conn)
    # On first run, create needed tables
    checkpointer.setup()

    def chat_node(state: State) -> State:
        state["messages"].append("Echo: " + state["messages"][-1])
        return state

    builder = StateGraph(State)
    builder.add_node("chat", chat_node)
    builder.add_edge(START, "chat")
    graph = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "thread-1"}}
    input_state = {"messages": ["DTT work with PostgresSaver"]}
    result = graph.invoke(input_state, config=config)
    print(result)

    print(graph.get_state(config = {"configurable": {"thread_id": "thread-1"}}))


{'messages': ['DTT work with PostgresSaver', 'Echo: DTT work with PostgresSaver']}
StateSnapshot(values={'messages': ['DTT work with PostgresSaver', 'Echo: DTT work with PostgresSaver']}, next=(), config={'configurable': {'thread_id': 'thread-1', 'checkpoint_ns': '', 'checkpoint_id': '1f07365b-cdcf-643e-8004-e9b0c2825353'}}, metadata={'step': 4, 'source': 'loop', 'parents': {}}, created_at='2025-08-07T08:08:44.596351+00:00', parent_config={'configurable': {'thread_id': 'thread-1', 'checkpoint_ns': '', 'checkpoint_id': '1f07365b-cdc3-6094-8003-30e3b1c44185'}}, tasks=(), interrupts=())


## Create PostgresSave from PostgresSaver.from_conn_string

In [2]:
import os
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph, START
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list[str]

DB_URI = os.environ.get("DATABASE_URI")

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup() # Ensure tables are created

    def chat_node(state: State) -> State:
        state["messages"].append("Echo: " + state["messages"][-1])
        return state

    builder = StateGraph(State)
    builder.add_node("chat", chat_node)
    builder.add_edge(START, "chat")
    graph = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "thread-1"}}
    input_state = {"messages": ["DTT work with PostgresSaver"]}
    result = graph.invoke(input_state, config=config)
    print(result)

    print(graph.get_state(config = {"configurable": {"thread_id": "thread-1"}}))


{'messages': ['DTT work with PostgresSaver', 'Echo: DTT work with PostgresSaver']}
StateSnapshot(values={'messages': ['DTT work with PostgresSaver', 'Echo: DTT work with PostgresSaver']}, next=(), config={'configurable': {'thread_id': 'thread-1', 'checkpoint_ns': '', 'checkpoint_id': '1f07365b-f890-6e04-8007-7c7ff607011c'}}, metadata={'step': 7, 'source': 'loop', 'parents': {}}, created_at='2025-08-07T08:08:49.080678+00:00', parent_config={'configurable': {'thread_id': 'thread-1', 'checkpoint_ns': '', 'checkpoint_id': '1f07365b-f887-61c5-8006-97be7274228a'}}, tasks=(), interrupts=())


## Checkpoint with SQLLite

In [17]:
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph

builder = StateGraph(int)
builder.add_node("add_one", lambda x: x + 1)
builder.set_entry_point("add_one")
builder.set_finish_point("add_one")
# Create a new SqliteSaver instance
# Note: check_same_thread=False is OK as the implementation uses a lock
# to ensure thread safety.
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(conn)
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)
result = graph.invoke(3, config)
graph.get_state(config)


StateSnapshot(values=4, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f073643-ec9a-6f18-8001-f62d1ef86600'}}, metadata={'source': 'loop', 'step': 1, 'parents': {}}, created_at='2025-08-07T07:58:03.581416+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f073643-ec95-67d2-8000-0081759ed203'}}, tasks=(), interrupts=())

## Combine checkpoint and store

In [17]:
from langgraph.store.base import BaseStore
from langgraph.config import RunnableConfig
from langgraph.graph import MessagesState
import uuid

def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Namespace the memory
    namespace = (user_id, "user_profile")

    # ... Analyze conversation and create a new memory

    # Create a new memory ID
    memory_id = str(uuid.uuid4())

    # We create a new memory
    store.put(namespace, memory_id, {"username": "dtt", "full_name": "Dinh Trung Thao"})

In [18]:
from langgraph.store.base import BaseStore
from langgraph.config import RunnableConfig
from langgraph.graph import MessagesState

def get_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Namespace the memory
    namespace = (user_id, "user_profile")
    
    # We create a new memory
    userprofile = store.search(namespace, query={"username": "dtt", "full_name": "Dinh Trung Thao"})
    print(userprofile)

In [None]:
import os
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool
from langgraph.store.postgres import PostgresStore
import uuid

# Step 1: Define shared state
class State(TypedDict):
    messages: list[str]

# Step 2: Define your DB URI
DB_URI = os.environ.get("DATABASE_URI")

# Step 3: Set up psycopg3 connection pool
pool = ConnectionPool(conninfo=DB_URI)

# Step 5: Define a LangGraph node that uses memory store
def chat_node(state: State, ) -> State:
    last_msg = state["messages"][-1]

    # Store the last message (as a dummy memory entry)
    record_id = uuid.uuid4()
    store.put(("user_id", "user_profile"), record_id, {"text": last_msg})
    store.mset([(f"user:msg:{last_msg}", {"text": last_msg})])

    # Echo and return new state
    echo = f"Echo: {last_msg}"
    state["messages"].append(echo)
    return state

# Step 4: Use connection to set up both Saver and Store
with pool.connection() as conn:
    conn.row_factory = "dict_row"

    # LangGraph checkpointer
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()

    # LangChain vector/memory store
    with PostgresStore.from_conn_string(DB_URI) as store:
        store.setup()  # Create memory table if needed


        workflow = StateGraph(MessagesState)
        workflow.add_node(update_memory)
        workflow.add_node(get_memory)
        workflow.add_edge(START, "update_memory")
        workflow.add_edge("update_memory", "get_memory")
        workflow.add_edge("get_memory", END)

        graph = workflow.compile(checkpointer=checkpointer, store=store)

        user_id = "1"
        config = {"configurable": {"thread_id": "1", "user_id": user_id}}

        # First let's just say hi to the AI
        for update in graph.stream(
            {"messages": [{"role": "user", "content": "Hello store"}]}, config, stream_mode="updates"
        ):
            print(update)
