# Langgraph - Short Term and Long Term memory examples

### Import required packages

In [1]:
# Standard library imports
import os
import json
import uuid
from datetime import datetime
from typing import TypedDict, Annotated, Sequence, Optional, List, Dict, Any, Tuple
from operator import add

# Snowflake imports
import snowflake.connector
from snowflake.connector import DictCursor

# LangGraph imports
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointTuple
from langgraph.store.memory import InMemoryStore
from langgraph.store.base import BaseStore, Item
from langgraph.graph.message import add_messages
# LangChain imports
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, RemoveMessage
from langchain_core.runnables import RunnableConfig
from langchain_core.messages.utils import trim_messages

# For environment variables
from dotenv import load_dotenv

print("All packages imported successfully!")

All packages imported successfully!


In [2]:
# ============================================================================
# SNOWFLAKE CONNECTION SETUP
# ============================================================================

# Load environment variables (if using .env file)
load_dotenv()

# Snowflake connection parameters
SNOWFLAKE_CONFIG = {
    'user': os.getenv('SNOWFLAKE_USER'),
    'password': os.getenv('SNOWFLAKE_PASSWORD'),
    'account': os.getenv('SNOWFLAKE_ACCOUNT'),
    'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE'),
    'database': os.getenv('SNOWFLAKE_DATABASE'),
    'schema': os.getenv('SNOWFLAKE_SCHEMA'),
    'role': os.getenv('SNOWFLAKE_ROLE')
}

def get_snowflake_connection():
    """
    Create and return a Snowflake connection.
    
    Returns:
        snowflake.connector.connection: Active Snowflake connection
    """
    try:
        conn = snowflake.connector.connect(
            user=SNOWFLAKE_CONFIG['user'],
            password=SNOWFLAKE_CONFIG['password'],
            account=SNOWFLAKE_CONFIG['account'],
            warehouse=SNOWFLAKE_CONFIG['warehouse'],
            database=SNOWFLAKE_CONFIG['database'],
            schema=SNOWFLAKE_CONFIG['schema'],
            role=SNOWFLAKE_CONFIG['role']
        )
        return conn
    except Exception as e:
        print(f"Error connecting to Snowflake: {e}")
        raise

# Test the connection
try:
    test_conn = get_snowflake_connection()
    cursor = test_conn.cursor()
    cursor.execute("SELECT CURRENT_VERSION()")
    version = cursor.fetchone()[0]
    print(f"Connected to Snowflake successfully!")
    print(f"   Version: {version}")
    print(f"   Database: {SNOWFLAKE_CONFIG['database']}")
    print(f"   Schema: {SNOWFLAKE_CONFIG['schema']}")
    cursor.close()
    test_conn.close()
except Exception as e:
    print(f"Connection test failed: {e}")

Connected to Snowflake successfully!
   Version: 9.33.1
   Database: LAB_DB
   Schema: PUBLIC


In [3]:
# ============================================================================
# SNOWFLAKE CORTEX LLM - WITH OPTIONS FORMAT
# ============================================================================

class SnowflakeCortexLLM:
    """
    Wrapper for Snowflake Cortex LLM (COMPLETE function).
    """
    
    def __init__(self, model: str = "llama3.1-70b"):
        self.model = model
        self.connection = None
    
    def _get_connection(self):
        """Get or create Snowflake connection."""
        if self.connection is None or self.connection.is_closed():
            self.connection = get_snowflake_connection()
        return self.connection
    
    def invoke(self, messages: List[Any]) -> AIMessage:
        """
        Invoke the LLM with messages.
        """
        # Convert messages to a single prompt string for simplicity
        prompt_parts = []
        for msg in messages:
            if isinstance(msg, HumanMessage):
                prompt_parts.append(f"User: {msg.content}")
            elif isinstance(msg, AIMessage):
                prompt_parts.append(f"Assistant: {msg.content}")
            elif isinstance(msg, SystemMessage):
                prompt_parts.append(f"System: {msg.content}")
            elif isinstance(msg, dict):
                role = msg.get('role', 'user')
                content = msg.get('content', '')
                prompt_parts.append(f"{role.capitalize()}: {content}")
        
        prompt_parts.append("Assistant:")  # Prompt for response
        full_prompt = "\n".join(prompt_parts)
        
        try:
            conn = self._get_connection()
            cursor = conn.cursor(DictCursor)
            
            # Simple string prompt version
            query = "SELECT SNOWFLAKE.CORTEX.COMPLETE(%s, %s) as response"
            
            cursor.execute(query, (self.model, full_prompt))
            result = cursor.fetchone()
            cursor.close()
            
            # Extract response
            response_text = result['RESPONSE']
            
            return AIMessage(content=response_text)
        
        except Exception as e:
            print(f"Error calling Snowflake Cortex: {e}")
            print(f"Prompt sent: {full_prompt[:200]}...")
            raise
    
    def __del__(self):
        """Close connection when object is destroyed."""
        if self.connection and not self.connection.is_closed():
            self.connection.close()

# Re-initialize the LLM
llm = SnowflakeCortexLLM(model="llama3.1-70b")

print("Snowflake Cortex LLM initialized with STRING prompt format!")
print(f"   Model: llama3.1-70b")

Snowflake Cortex LLM initialized with STRING prompt format!
   Model: llama3.1-70b


In [4]:
# Test the LLM
print("Testing Snowflake Cortex LLM with string prompt...")

test_messages = [
    HumanMessage(content="Say 'Hello from Snowflake!' if you can hear me.")
]

try:
    response = llm.invoke(test_messages)
    print(f"\nLLM Response:")
    print(f"   {response.content}")
except Exception as e:
    print(f" Test failed: {e}")

Testing Snowflake Cortex LLM with string prompt...

LLM Response:
   Hello from Snowflake!


In [5]:
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

def print_messages(messages: List[Any], title: str = "Messages"):
    """
    Pretty print messages.
    
    Args:
        messages: List of messages
        title: Title for the output
    """
    print(f"\n{'='*60}")
    print(f"{title}")
    print(f"{'='*60}")
    
    for i, msg in enumerate(messages, 1):
        role = "Unknown"
        content = ""
        
        if isinstance(msg, HumanMessage):
            role = "👤 User"
            content = msg.content
        elif isinstance(msg, AIMessage):
            role = "🤖 Assistant"
            content = msg.content
        elif isinstance(msg, SystemMessage):
            role = "⚙️  System"
            content = msg.content
        elif isinstance(msg, dict):
            role = msg.get('role', 'Unknown')
            content = msg.get('content', '')
        
        print(f"\n{i}. {role}:")
        print(f"   {content}")
    
    print(f"\n{'='*60}\n")


def execute_snowflake_query(query: str, fetch: bool = True) -> Optional[List[Dict]]:
    """
    Execute a Snowflake query and optionally fetch results.
    
    Args:
        query: SQL query to execute
        fetch: Whether to fetch results (default: True)
    
    Returns:
        List of results as dictionaries, or None if fetch=False
    """
    conn = get_snowflake_connection()
    cursor = conn.cursor(DictCursor)
    
    try:
        cursor.execute(query)
        
        if fetch:
            results = cursor.fetchall()
            cursor.close()
            conn.close()
            return results
        else:
            conn.commit()
            cursor.close()
            conn.close()
            return None
    
    except Exception as e:
        print(f"Query execution error: {e}")
        cursor.close()
        conn.close()
        raise

print("Helper functions defined!")

Helper functions defined!


In [6]:
# ============================================================================
# SECTION 1: SHORT-TERM MEMORY (Thread-scoped)
# ============================================================================
# 
# Short-term memory allows an agent to remember the conversation within
# a single session/thread. This is essential for multi-turn conversations.
#
# Key Concepts:
# - Thread: A unique conversation session (identified by thread_id)
# - Checkpoint: A snapshot of the conversation state at a specific point
# - Checkpointer: Stores and retrieves checkpoints
# ============================================================================

In [7]:
# ============================================================================
# 1.1 PROBLEM: Agent Without Memory
# ============================================================================

print("="*70)
print("DEMO 1.1: Agent WITHOUT Memory")
print("="*70)
print("\nLet's see what happens when an agent has NO memory...\n")

# Define a simple state
class State(TypedDict):
    messages: Annotated[list, add_messages]

# Define the chatbot node
def chatbot_node(state: State) -> State:
    """
    Simple chatbot that responds using Snowflake Cortex.
    """
    messages = state["messages"]
    response = llm.invoke(messages)
    return {"messages": [response]}

# Build the graph WITHOUT checkpointer
builder = StateGraph(State)
builder.add_node("chatbot", chatbot_node)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

# Compile WITHOUT checkpointer (no memory!)
graph_no_memory = builder.compile()

print("Graph built WITHOUT checkpointer (no memory)")
print("\nLet's have a conversation...\n")

DEMO 1.1: Agent WITHOUT Memory

Let's see what happens when an agent has NO memory...

Graph built WITHOUT checkpointer (no memory)

Let's have a conversation...



In [8]:
# First message: Introduce yourself
print("-" * 70)
print("USER: My name is Ram and I'm a Data Engineer")
print("-" * 70)

response1 = graph_no_memory.invoke({
    "messages": [HumanMessage(content="My name is Ram and I'm a Data Engineer")]
})

print(f"\nBOT: {response1['messages'][-1].content}\n")

----------------------------------------------------------------------
USER: My name is Ram and I'm a Data Engineer
----------------------------------------------------------------------

BOT: Nice to meet you, Ram! As a Data Engineer, I'm sure you're no stranger to working with large datasets, designing data pipelines, and building scalable data architectures. What kind of projects have you been working on lately? Are you more focused on cloud-based data engineering (e.g. AWS, GCP, Azure) or on-premises solutions?



In [9]:
# Second message: Ask about the name
print("-" * 70)
print("USER: What's my name?")
print("-" * 70)

response2 = graph_no_memory.invoke({
    "messages": [HumanMessage(content="What's my name?")]
})

print(f"\nBOT: {response2['messages'][-1].content}\n")

print("❌ PROBLEM: The bot doesn't remember your name!")
print("   Each invocation is independent - no conversation history.")

----------------------------------------------------------------------
USER: What's my name?
----------------------------------------------------------------------

BOT: I don't know your name. This is the beginning of our conversation, and I don't have any information about you. Would you like to tell me your name?

❌ PROBLEM: The bot doesn't remember your name!
   Each invocation is independent - no conversation history.


In [10]:
# ============================================================================
# 1.2 SOLUTION: Agent WITH Memory (InMemorySaver)
# ============================================================================

print("\n" + "="*70)
print("DEMO 1.2: Agent WITH Memory (InMemorySaver)")
print("="*70)
print("\nNow let's add a checkpointer to enable memory...\n")

# Build the SAME graph but WITH checkpointer
builder = StateGraph(State)
builder.add_node("chatbot", chatbot_node)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

# Compile WITH InMemorySaver checkpointer
checkpointer = InMemorySaver()
graph_with_memory = builder.compile(checkpointer=checkpointer)

print("✅ Graph built WITH InMemorySaver (memory enabled!)")
print("\nLet's have the SAME conversation with memory...\n")


DEMO 1.2: Agent WITH Memory (InMemorySaver)

Now let's add a checkpointer to enable memory...

✅ Graph built WITH InMemorySaver (memory enabled!)

Let's have the SAME conversation with memory...



In [11]:
# Configuration with thread_id
config = {"configurable": {"thread_id": "1"}}

# First message: Introduce yourself
print("-" * 70)
print("USER: My name is Ram and I'm a Data Engineer")
print(f"THREAD ID: {config['configurable']['thread_id']}")
print("-" * 70)

response1 = graph_with_memory.invoke({
    "messages": [HumanMessage(content="My name is Ram and I'm a Data Engineer")]
}, config)

print(f"\nBOT: {response1['messages'][-1].content}\n")

----------------------------------------------------------------------
USER: My name is Ram and I'm a Data Engineer
THREAD ID: 1
----------------------------------------------------------------------

BOT: Nice to meet you, Ram! As a Data Engineer, I'm sure you're no stranger to working with large datasets, designing data pipelines, and building scalable data architectures. What kind of projects have you been working on lately? Are you more focused on cloud-based data engineering (e.g. AWS, GCP, Azure) or on-premises solutions?



In [12]:
# Second message: Ask about the name (same thread)
print("-" * 70)
print("USER: What's my name?")
print(f"THREAD ID: {config['configurable']['thread_id']}")
print("-" * 70)

response2 = graph_with_memory.invoke({
    "messages": [HumanMessage(content="What's my name?")]
}, config)

print(f"\nBOT: {response2['messages'][-1].content}\n")

print("✅ SUCCESS: The bot remembers your name!")
print("   The checkpointer stored the conversation history.")

----------------------------------------------------------------------
USER: What's my name?
THREAD ID: 1
----------------------------------------------------------------------

BOT: Your name is Ram!

✅ SUCCESS: The bot remembers your name!
   The checkpointer stored the conversation history.


In [13]:
# Third message: Ask about occupation
print("-" * 70)
print("USER: What do I do for work?")
print(f"THREAD ID: {config['configurable']['thread_id']}")
print("-" * 70)

response3 = graph_with_memory.invoke({
    "messages": [HumanMessage(content="What do I do for work?")]
}, config)

print(f"\nBOT: {response3['messages'][-1].content}\n")

print("✅ Bot remembers you're a Data Engineer!")

----------------------------------------------------------------------
USER: What do I do for work?
THREAD ID: 1
----------------------------------------------------------------------

BOT: You're a Data Engineer!

✅ Bot remembers you're a Data Engineer!


In [14]:
# ============================================================================
# 1.4 INSPECT STATE: View Checkpoints
# ============================================================================

print("\n" + "="*70)
print("DEMO 1.4: Inspecting State and Checkpoints")
print("="*70)
print("\nLet's look at what's stored in the checkpoints...\n")


DEMO 1.4: Inspecting State and Checkpoints

Let's look at what's stored in the checkpoints...



In [15]:
# Get current state for Thread 1
print("-" * 70)
print("CURRENT STATE - Thread 1")
print("-" * 70)

current_state = graph_with_memory.get_state(config)

print(f"\nThread ID: {config['configurable']['thread_id']}")
print(f"Number of messages: {len(current_state.values['messages'])}")
print(f"\nMessage History:")

for i, msg in enumerate(current_state.values['messages'], 1):
    role = "USER" if isinstance(msg, HumanMessage) else "BOT"
    print(f"\n{i}. {role}: {msg.content[:100]}...")

----------------------------------------------------------------------
CURRENT STATE - Thread 1
----------------------------------------------------------------------

Thread ID: 1
Number of messages: 6

Message History:

1. USER: My name is Ram and I'm a Data Engineer...

2. BOT: Nice to meet you, Ram! As a Data Engineer, I'm sure you're no stranger to working with large dataset...

3. USER: What's my name?...

4. BOT: Your name is Ram!...

5. USER: What do I do for work?...

6. BOT: You're a Data Engineer!...


In [16]:
# View checkpoint history for Thread 1
print("\n" + "-" * 70)
print("CHECKPOINT HISTORY - Thread 1")
print("-" * 70)

history = list(graph_with_memory.get_state_history(config))

print(f"\nTotal checkpoints: {len(history)}")
print("\nCheckpoint Timeline (most recent first):")

for i, checkpoint in enumerate(history[:5], 1):  # Show first 5
    print(f"\n{i}. Checkpoint ID: {checkpoint.config['configurable']['checkpoint_id'][:8]}...")
    print(f"   Messages in state: {len(checkpoint.values.get('messages', []))}")
    print(f"   Next nodes to execute: {checkpoint.next}")
    if checkpoint.metadata:
        print(f"   Step: {checkpoint.metadata.get('step', 'N/A')}")


----------------------------------------------------------------------
CHECKPOINT HISTORY - Thread 1
----------------------------------------------------------------------

Total checkpoints: 9

Checkpoint Timeline (most recent first):

1. Checkpoint ID: 1f0b174e...
   Messages in state: 6
   Next nodes to execute: ()
   Step: 7

2. Checkpoint ID: 1f0b174e...
   Messages in state: 5
   Next nodes to execute: ('chatbot',)
   Step: 6

3. Checkpoint ID: 1f0b174e...
   Messages in state: 4
   Next nodes to execute: ('__start__',)
   Step: 5

4. Checkpoint ID: 1f0b174e...
   Messages in state: 4
   Next nodes to execute: ()
   Step: 4

5. Checkpoint ID: 1f0b174e...
   Messages in state: 3
   Next nodes to execute: ('chatbot',)
   Step: 3


In [17]:
# ============================================================================
# SECTION 2: LONG-TERM MEMORY (Cross-session Memory)
# ============================================================================
# 
# Long-term memory allows an agent to remember information ACROSS different
# conversation threads/sessions. This is essential for:
# - Remembering user preferences across multiple conversations
# - Building persistent user profiles
# - Sharing information across different chat sessions
#
# Key Concepts:
# - Store: Key-value storage for persistent data
# - Namespace: Organization structure (user_id, category)
# - Cross-thread access: Same data available in any thread
# ============================================================================

In [18]:
# ============================================================================
# 2.1 PROBLEM: Short-term Memory is Thread-scoped Only
# ============================================================================

print("\n" + "="*70)
print("DEMO 2.1: The Limitation of Short-term Memory")
print("="*70)
print("\nLet's see what happens when we want to share info across threads...\n")

# Use the graph with memory from Section 1
config_thread_a = {"configurable": {"thread_id": "thread_a"}}
config_thread_b = {"configurable": {"thread_id": "thread_b"}}

print("-" * 70)
print("Thread A: Tell the bot your food preference")
print(f"THREAD ID: {config_thread_a['configurable']['thread_id']}")
print("-" * 70)

response_a = graph_with_memory.invoke({
    "messages": [HumanMessage(content="Remember this: I love Butter chicken!")]
}, config_thread_a)

print(f"\nUSER: Remember this: I love butter chicken!")
print(f"BOT: {response_a['messages'][-1].content}\n")


DEMO 2.1: The Limitation of Short-term Memory

Let's see what happens when we want to share info across threads...

----------------------------------------------------------------------
Thread A: Tell the bot your food preference
THREAD ID: thread_a
----------------------------------------------------------------------

USER: Remember this: I love butter chicken!
BOT: I've taken note: You LOVE Butter Chicken! Would you like some recipe suggestions or restaurant recommendations?



In [19]:
# Now try to access that info from a different thread
print("-" * 70)
print("Thread B: Ask about food preference (DIFFERENT THREAD)")
print(f"THREAD ID: {config_thread_b['configurable']['thread_id']}")
print("-" * 70)

response_b = graph_with_memory.invoke({
    "messages": [HumanMessage(content="What food do I love?")]
}, config_thread_b)

print(f"\nUSER: What food do I love?")
print(f"BOT: {response_b['messages'][-1].content}\n")

print("❌ PROBLEM: The bot doesn't remember!")
print("   Short-term memory (checkpointer) is THREAD-SCOPED.")
print("   Thread B cannot see Thread A's conversation.\n")

print("💡 SOLUTION: We need LONG-TERM MEMORY (Store)!")
print("="*70)

----------------------------------------------------------------------
Thread B: Ask about food preference (DIFFERENT THREAD)
THREAD ID: thread_b
----------------------------------------------------------------------

USER: What food do I love?
BOT: Unfortunately, I'm a large language model, I don't have the ability to read your mind or know your personal preferences. However, I can try to help you figure out what food you might love.

Can you give me some hints? Do you have a favorite cuisine, like Italian, Mexican, or Chinese? Or do you have a sweet tooth? Do you prefer savory, spicy, or comforting foods?

❌ PROBLEM: The bot doesn't remember!
   Short-term memory (checkpointer) is THREAD-SCOPED.
   Thread B cannot see Thread A's conversation.

💡 SOLUTION: We need LONG-TERM MEMORY (Store)!


In [20]:
# ============================================================================
# 2.2 SOLUTION: InMemoryStore for Long-term Memory
# ============================================================================

print("\n" + "="*70)
print("DEMO 2.2: InMemoryStore - Long-term Memory")
print("="*70)
print("\nLet's introduce a Store for cross-thread memory...\n")

# Create an InMemoryStore
store = InMemoryStore()

print("✅ InMemoryStore created!")
print("\nA Store uses:")
print("  - Namespace: Tuple like ('user_123', 'preferences')")
print("  - Key: Unique identifier for each memory")
print("  - Value: Dictionary containing the data")


DEMO 2.2: InMemoryStore - Long-term Memory

Let's introduce a Store for cross-thread memory...

✅ InMemoryStore created!

A Store uses:
  - Namespace: Tuple like ('user_123', 'preferences')
  - Key: Unique identifier for each memory
  - Value: Dictionary containing the data


In [21]:
# Let's manually store and retrieve data
print("\n" + "-" * 70)
print("Manual Store Operations")
print("-" * 70)

# Define a namespace for user preferences
user_id = "user_ram"
namespace = (user_id, "preferences")

print(f"\nNamespace: {namespace}")

# Store a preference
store.put(
    namespace=namespace,
    key="food",
    value={"preference": "Butter Chicken", "notes": "loves Butter chicken"}
)

print("✅ Stored: food preference = Butter chicken")

# Retrieve the preference
item = store.get(namespace=namespace, key="food")

print(f"\n✅ Retrieved: {item.value}")
print(f"   Namespace: {item.namespace}")
print(f"   Key: {item.key}")
print(f"   Created at: {item.created_at}")


----------------------------------------------------------------------
Manual Store Operations
----------------------------------------------------------------------

Namespace: ('user_ram', 'preferences')
✅ Stored: food preference = Butter chicken

✅ Retrieved: {'preference': 'Butter Chicken', 'notes': 'loves Butter chicken'}
   Namespace: ('user_ram', 'preferences')
   Key: food
   Created at: 2025-10-25 07:33:25.650340+00:00


In [22]:
# Store multiple items
print("\n" + "-" * 70)
print("Storing Multiple Preferences")
print("-" * 70)

store.put(namespace, "color", {"preference": "Black"})
store.put(namespace, "hobby", {"preference": "Soccer"})
store.put(namespace, "language", {"preference": "Python"})

print("✅ Stored: color = Black")
print("✅ Stored: hobby = Soccer")
print("✅ Stored: language = Python")

# Search/list all items in namespace
print(f"\n📂 All items in namespace {namespace}:")

items = store.search(namespace)
for item in items:
    print(f"  - {item.key}: {item.value['preference']}")


----------------------------------------------------------------------
Storing Multiple Preferences
----------------------------------------------------------------------
✅ Stored: color = Black
✅ Stored: hobby = Soccer
✅ Stored: language = Python

📂 All items in namespace ('user_ram', 'preferences'):
  - food: Butter Chicken
  - color: Black
  - hobby: Soccer
  - language: Python


In [23]:
# ============================================================================
# 2.3 AGENT WITH STORE: Combining Short-term and Long-term Memory
# ============================================================================

print("\n" + "="*70)
print("DEMO 2.3: Agent with BOTH Checkpointer AND Store")
print("="*70)
print("\nNow let's build an agent that uses both memory types...\n")

# Define agent that reads from store before responding
def chatbot_with_memory(state: MessagesState, config: RunnableConfig, store: BaseStore) -> dict:
    """
    Chatbot that:
    1. Reads user preferences from store (long-term memory)
    2. Uses conversation history from checkpointer (short-term memory)
    3. Responds with context from both
    """
    # Get user_id from config
    user_id = config.get("configurable", {}).get("user_id", "unknown_user")
    namespace = (user_id, "preferences")
    
    # Search for user preferences in store
    preferences_items = store.search(namespace)
    
    # Build context from stored preferences
    if preferences_items:
        prefs = []
        for item in preferences_items:
            prefs.append(f"{item.key}: {item.value.get('preference', 'N/A')}")
        preferences_text = "User preferences: " + ", ".join(prefs)
    else:
        preferences_text = "No user preferences stored yet."
    
    # Get conversation messages (from checkpointer)
    messages = state["messages"]
    
    # Add system message with preferences context
    system_message = HumanMessage(content=f"[System Context: {preferences_text}]")
    
    # Call LLM with both contexts
    full_messages = [system_message] + messages
    response = llm.invoke(full_messages)
    
    # Check if user is sharing a preference to store
    last_user_message = messages[-1].content.lower() if messages else ""
    
    if "remember" in last_user_message or "i love" in last_user_message or "i like" in last_user_message:
        # Simple extraction (in production, use LLM to extract structured data)
        if "butter chicken" in last_user_message:
            store.put(namespace, "food", {"preference": "butter chicken"})
            print(f"   💾 Stored preference: food = butter chicken")
        elif "blue" in last_user_message:
            store.put(namespace, "color", {"preference": "black"})
            print(f"   💾 Stored preference: color = black")
    
    return {"messages": [response]}

# Build graph with BOTH checkpointer and store
builder_with_store = StateGraph(MessagesState)
builder_with_store.add_node("chatbot", chatbot_with_memory)
builder_with_store.add_edge(START, "chatbot")
builder_with_store.add_edge("chatbot", END)

# Compile with BOTH memory systems
checkpointer = InMemorySaver()
store = InMemoryStore()

graph_full_memory = builder_with_store.compile(
    checkpointer=checkpointer,
    store=store
)

print("✅ Graph compiled with:")
print("   - Checkpointer (short-term memory)")
print("   - Store (long-term memory)")


DEMO 2.3: Agent with BOTH Checkpointer AND Store

Now let's build an agent that uses both memory types...

✅ Graph compiled with:
   - Checkpointer (short-term memory)
   - Store (long-term memory)


In [24]:
# ============================================================================
# 2.4 CROSS-THREAD MEMORY: Using Long-term Memory Across Sessions
# ============================================================================

print("\n" + "="*70)
print("DEMO 2.4: Cross-thread Memory in Action")
print("="*70)
print("\nLet's see long-term memory work across different threads...\n")

# Configuration for Thread 1 with user_id
config_session1 = {
    "configurable": {
        "thread_id": "session_1",
        "user_id": "user_ram"
    }
}

print("-" * 70)
print("SESSION 1: Store preferences")
print(f"Thread ID: {config_session1['configurable']['thread_id']}")
print(f"User ID: {config_session1['configurable']['user_id']}")
print("-" * 70)

print("\nUSER: Remember this: I love butter chicken!")

response1 = graph_full_memory.invoke({
    "messages": [HumanMessage(content="Remember this: I love butter chicken!")]
}, config_session1)

print(f"BOT: {response1['messages'][-1].content}\n")


DEMO 2.4: Cross-thread Memory in Action

Let's see long-term memory work across different threads...

----------------------------------------------------------------------
SESSION 1: Store preferences
Thread ID: session_1
User ID: user_ram
----------------------------------------------------------------------

USER: Remember this: I love butter chicken!
   💾 Stored preference: food = butter chicken
BOT: I've taken note of that! You love butter chicken. I'll keep that in mind for our conversation. Would you like some recommendations or recipes for butter chicken, or is there something else I can help you with?



In [25]:
# Now start a DIFFERENT thread but SAME user_id
config_session2 = {
    "configurable": {
        "thread_id": "session_2",  # Different thread!
        "user_id": "user_ram"      # Same user!
    }
}

print("-" * 70)
print("SESSION 2: Different thread, same user")
print(f"Thread ID: {config_session2['configurable']['thread_id']}")
print(f"User ID: {config_session2['configurable']['user_id']}")
print("-" * 70)

print("\nUSER: What food do I love?")

response2 = graph_full_memory.invoke({
    "messages": [HumanMessage(content="What food do I love?")]
}, config_session2)

print(f"BOT: {response2['messages'][-1].content}\n")

print("✅ SUCCESS: Bot remembers across threads!")
print("   Long-term memory (store) is shared across all threads for same user.")

----------------------------------------------------------------------
SESSION 2: Different thread, same user
Thread ID: session_2
User ID: user_ram
----------------------------------------------------------------------

USER: What food do I love?
BOT: You love Butter Chicken!

✅ SUCCESS: Bot remembers across threads!
   Long-term memory (store) is shared across all threads for same user.


In [26]:
# ============================================================================
# 2.5 MULTIPLE NAMESPACES: Organizing Different Types of Memory
# ============================================================================

print("\n" + "="*70)
print("DEMO 2.5: Multiple Namespaces for Organization")
print("="*70)
print("\nStores can organize memory into different namespaces...\n")

user_id = "user_ram"

# Different namespace categories
namespace_profile = (user_id, "profile")
namespace_preferences = (user_id, "preferences")
namespace_notes = (user_id, "notes")

print("-" * 70)
print("Storing data in different namespaces")
print("-" * 70)

# Profile information
store.put(namespace_profile, "name", {"value": "Ram"})
store.put(namespace_profile, "occupation", {"value": "Data Engineer"})
store.put(namespace_profile, "location", {"value": "Boston"})

print(f"\n✅ Profile namespace: {namespace_profile}")
print("   - name: Ram")
print("   - occupation: Data Engineer")
print("   - location: Boston")

# Preferences
store.put(namespace_preferences, "food", {"value": "butter chicken"})
store.put(namespace_preferences, "color", {"value": "blue"})
store.put(namespace_preferences, "language", {"value": "Python"})

print(f"\n✅ Preferences namespace: {namespace_preferences}")
print("   - food: butter chicken")
print("   - color: blue")
print("   - language: Python")

# Notes
store.put(namespace_notes, "note_1", {"content": "Interested in AI/ML"})
store.put(namespace_notes, "note_2", {"content": "Working on LangGraph lab"})

print(f"\n✅ Notes namespace: {namespace_notes}")
print("   - note_1: Interested in AI/ML")
print("   - note_2: Working on LangGraph lab")


DEMO 2.5: Multiple Namespaces for Organization

Stores can organize memory into different namespaces...

----------------------------------------------------------------------
Storing data in different namespaces
----------------------------------------------------------------------

✅ Profile namespace: ('user_ram', 'profile')
   - name: Ram
   - occupation: Data Engineer
   - location: Boston

✅ Preferences namespace: ('user_ram', 'preferences')
   - food: butter chicken
   - color: blue
   - language: Python

✅ Notes namespace: ('user_ram', 'notes')
   - note_1: Interested in AI/ML
   - note_2: Working on LangGraph lab


In [27]:
# Retrieve from specific namespaces
print("\n" + "-" * 70)
print("Retrieving from specific namespaces")
print("-" * 70)

# Get all profile items
profile_items = store.search(namespace_profile)
print(f"\n📂 Profile ({namespace_profile}):")
for item in profile_items:
    print(f"   {item.key}: {item.value['value']}")

# Get all preference items
pref_items = store.search(namespace_preferences)
print(f"\n📂 Preferences ({namespace_preferences}):")
for item in pref_items:
    print(f"   {item.key}: {item.value['value']}")

# Get all notes
note_items = store.search(namespace_notes)
print(f"\n📂 Notes ({namespace_notes}):")
for item in note_items:
    print(f"   {item.key}: {item.value['content']}")


----------------------------------------------------------------------
Retrieving from specific namespaces
----------------------------------------------------------------------

📂 Profile (('user_ram', 'profile')):
   name: Ram
   occupation: Data Engineer
   location: Boston

📂 Preferences (('user_ram', 'preferences')):
   food: butter chicken
   color: blue
   language: Python

📂 Notes (('user_ram', 'notes')):
   note_1: Interested in AI/ML
   note_2: Working on LangGraph lab


In [28]:
# ============================================================================
# SECTION 3: SNOWFLAKE-BACKED MEMORY (Production Setup)
# ============================================================================
# 
# In production, we need persistent storage that survives:
# - Application restarts
# - Server failures
# - Notebook kernel restarts
#
# Snowflake provides:
# - Reliable, scalable database storage
# - SQL access for debugging and monitoring
# - Integration with entire data platform
#
# We'll implement:
# 1. SnowflakeCheckpointer - for short-term memory (conversation history)
# 2. SnowflakeStore - for long-term memory (user profiles, preferences)
# ============================================================================

In [29]:
# ============================================================================
# 3.1 CREATE SNOWFLAKE TABLES
# ============================================================================

print("\n" + "="*70)
print("STEP 3.1: Creating Snowflake Tables for Memory Storage")
print("="*70)
print("\nWe need two tables:")
print("  1. CHECKPOINTS - for conversation history (short-term)")
print("  2. MEMORY_STORE - for persistent data (long-term)")
print()


# SQL to create CHECKPOINTS table
create_checkpoints_table = """
CREATE TABLE IF NOT EXISTS CHECKPOINTS (
    thread_id VARCHAR(500) NOT NULL,
    checkpoint_id VARCHAR(500) NOT NULL,
    parent_checkpoint_id VARCHAR(500),
    checkpoint_data VARIANT,
    metadata VARIANT,
    created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    PRIMARY KEY (thread_id, checkpoint_id)
);
"""

print("Creating CHECKPOINTS table...")
print("-" * 70)
try:
    execute_snowflake_query(create_checkpoints_table, fetch=False)
    print("✅ CHECKPOINTS table created successfully!\n")
except Exception as e:
    print(f"❌ Error creating CHECKPOINTS table: {e}\n")

# SQL to create MEMORY_STORE table
create_memory_store_table = """
CREATE TABLE IF NOT EXISTS MEMORY_STORE (
    namespace VARCHAR(1000) NOT NULL,
    key VARCHAR(500) NOT NULL,
    value VARIANT,
    created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    updated_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    PRIMARY KEY (namespace, key)
);
"""

print("Creating MEMORY_STORE table...")
print("-" * 70)
try:
    execute_snowflake_query(create_memory_store_table, fetch=False)
    print("✅ MEMORY_STORE table created successfully!\n")
except Exception as e:
    print(f"❌ Error creating MEMORY_STORE table: {e}\n")

# Verify tables were created
print("Verifying tables...")
print("-" * 70)

verify_query = "SHOW TABLES LIKE 'CHECKPOINTS';"
result = execute_snowflake_query(verify_query)
if result:
    print("✅ CHECKPOINTS table exists")

verify_query = "SHOW TABLES LIKE 'MEMORY_STORE';"
result = execute_snowflake_query(verify_query)
if result:
    print("✅ MEMORY_STORE table exists")

print("\n✅ All tables created and verified!")


STEP 3.1: Creating Snowflake Tables for Memory Storage

We need two tables:
  1. CHECKPOINTS - for conversation history (short-term)
  2. MEMORY_STORE - for persistent data (long-term)

Creating CHECKPOINTS table...
----------------------------------------------------------------------
✅ CHECKPOINTS table created successfully!

Creating MEMORY_STORE table...
----------------------------------------------------------------------
✅ MEMORY_STORE table created successfully!

Verifying tables...
----------------------------------------------------------------------
✅ CHECKPOINTS table exists
✅ MEMORY_STORE table exists

✅ All tables created and verified!


In [30]:
# ============================================================================
# 3.2 SERIALIZATION HELPERS
# ============================================================================

print("\n" + "="*70)
print("STEP 3.2: Defining Serialization Helpers")
print("="*70)

from typing import Optional, Iterator, Sequence

def serialize_checkpoint(checkpoint: dict) -> str:
    """
    Serialize checkpoint data, handling LangChain message objects.
    """
    def convert_messages(obj):
        if isinstance(obj, (HumanMessage, AIMessage, SystemMessage)):
            return {
                "type": obj.__class__.__name__,
                "content": obj.content,
                "id": getattr(obj, "id", None)
            }
        elif isinstance(obj, dict):
            return {k: convert_messages(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [convert_messages(item) for item in obj]
        else:
            return obj
    
    serializable_checkpoint = convert_messages(checkpoint)
    return json.dumps(serializable_checkpoint)


def deserialize_checkpoint(checkpoint_data) -> dict:
    """
    Deserialize checkpoint data, converting dicts back to message objects.
    """
    def convert_to_messages(obj):
        if isinstance(obj, dict):
            if "type" in obj and obj["type"] in ["HumanMessage", "AIMessage", "SystemMessage"]:
                msg_type = obj["type"]
                content = obj.get("content", "")
                
                if msg_type == "HumanMessage":
                    return HumanMessage(content=content)
                elif msg_type == "AIMessage":
                    return AIMessage(content=content)
                elif msg_type == "SystemMessage":
                    return SystemMessage(content=content)
            else:
                return {k: convert_to_messages(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [convert_to_messages(item) for item in obj]
        else:
            return obj
    
    if isinstance(checkpoint_data, str):
        checkpoint_dict = json.loads(checkpoint_data)
    else:
        checkpoint_dict = checkpoint_data
    
    return convert_to_messages(checkpoint_dict)

print("✅ Serialization helpers defined!")


STEP 3.2: Defining Serialization Helpers
✅ Serialization helpers defined!


In [31]:
# ============================================================================
# 3.3 SNOWFLAKE CHECKPOINTER (With Proper VARIANT Casting)
# ============================================================================

print("\n" + "="*70)
print("STEP 3.3: Building SnowflakeCheckpointer")
print("="*70)

class SnowflakeCheckpointer(BaseCheckpointSaver):
    """
    Checkpointer that stores conversation state in Snowflake.
    Uses PARSE_JSON to properly cast strings to VARIANT.
    """
    
    def __init__(self):
        super().__init__()
        self.connection = None
    
    def _get_connection(self):
        if self.connection is None or self.connection.is_closed():
            self.connection = get_snowflake_connection()
        return self.connection
    
    def put(self, config: RunnableConfig, checkpoint: dict, metadata: dict, new_versions: dict) -> RunnableConfig:
        """Save a checkpoint to Snowflake with proper VARIANT casting."""
        thread_id = config.get("configurable", {}).get("thread_id")
        if not thread_id:
            raise ValueError("thread_id is required in config")
        
        checkpoint_id = str(uuid.uuid4())
        parent_checkpoint_id = config.get("configurable", {}).get("checkpoint_id")
        
        # Serialize to JSON strings
        checkpoint_json_str = serialize_checkpoint(checkpoint)
        metadata_json_str = json.dumps(metadata)
        
        # Use PARSE_JSON in the query to cast string to VARIANT
        insert_query = """
        INSERT INTO CHECKPOINTS (thread_id, checkpoint_id, parent_checkpoint_id, checkpoint_data, metadata)
        SELECT %s, %s, %s, PARSE_JSON(%s), PARSE_JSON(%s)
        """
        
        conn = self._get_connection()
        cursor = conn.cursor()
        
        try:
            cursor.execute(
                insert_query,
                (thread_id, checkpoint_id, parent_checkpoint_id, checkpoint_json_str, metadata_json_str)
            )
            conn.commit()
            cursor.close()
        except Exception as e:
            cursor.close()
            print(f"Error in put: {e}")
            raise e
        
        return {"configurable": {"thread_id": thread_id, "checkpoint_id": checkpoint_id}}
    
    def put_writes(self, config: RunnableConfig, writes: Sequence[tuple], task_id: str) -> None:
        pass
    
    def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        """Retrieve a checkpoint from Snowflake."""
        thread_id = config.get("configurable", {}).get("thread_id")
        checkpoint_id = config.get("configurable", {}).get("checkpoint_id")
        
        if not thread_id:
            return None
        
        conn = self._get_connection()
        cursor = conn.cursor(DictCursor)
        
        try:
            if checkpoint_id:
                query = """
                SELECT checkpoint_id, parent_checkpoint_id, checkpoint_data, metadata, created_at
                FROM CHECKPOINTS WHERE thread_id = %s AND checkpoint_id = %s
                """
                cursor.execute(query, (thread_id, checkpoint_id))
            else:
                query = """
                SELECT checkpoint_id, parent_checkpoint_id, checkpoint_data, metadata, created_at
                FROM CHECKPOINTS WHERE thread_id = %s ORDER BY created_at DESC LIMIT 1
                """
                cursor.execute(query, (thread_id,))
            
            result = cursor.fetchone()
            cursor.close()
            
            if not result:
                return None
            
            # Snowflake returns VARIANT as dict/list automatically
            checkpoint_data = result['CHECKPOINT_DATA']
            checkpoint = deserialize_checkpoint(checkpoint_data)
            
            metadata_data = result['METADATA']
            if isinstance(metadata_data, str):
                metadata = json.loads(metadata_data)
            else:
                metadata = metadata_data if metadata_data else {}
            
            return CheckpointTuple(
                config={"configurable": {"thread_id": thread_id, "checkpoint_id": result['CHECKPOINT_ID']}},
                checkpoint=checkpoint,
                metadata=metadata,
                parent_config={"configurable": {"thread_id": thread_id, "checkpoint_id": result['PARENT_CHECKPOINT_ID']}} if result['PARENT_CHECKPOINT_ID'] else None,
                pending_writes=[]
            )
        
        except Exception as e:
            cursor.close()
            print(f"Error in get_tuple: {e}")
            raise e
    
    def list(self, config: RunnableConfig, *, filter: Optional[dict] = None, before: Optional[RunnableConfig] = None, limit: Optional[int] = None) -> Iterator[CheckpointTuple]:
        """List all checkpoints for a thread."""
        thread_id = config.get("configurable", {}).get("thread_id")
        if not thread_id:
            return
        
        conn = self._get_connection()
        cursor = conn.cursor(DictCursor)
        
        try:
            query = """
            SELECT checkpoint_id, parent_checkpoint_id, checkpoint_data, metadata, created_at
            FROM CHECKPOINTS WHERE thread_id = %s ORDER BY created_at DESC
            """
            if limit:
                query += f" LIMIT {limit}"
            
            cursor.execute(query, (thread_id,))
            results = cursor.fetchall()
            cursor.close()
            
            for result in results:
                checkpoint_data = result['CHECKPOINT_DATA']
                checkpoint = deserialize_checkpoint(checkpoint_data)
                
                metadata_data = result['METADATA']
                if isinstance(metadata_data, str):
                    metadata = json.loads(metadata_data)
                else:
                    metadata = metadata_data if metadata_data else {}
                
                yield CheckpointTuple(
                    config={"configurable": {"thread_id": thread_id, "checkpoint_id": result['CHECKPOINT_ID']}},
                    checkpoint=checkpoint,
                    metadata=metadata,
                    parent_config={"configurable": {"thread_id": thread_id, "checkpoint_id": result['PARENT_CHECKPOINT_ID']}} if result['PARENT_CHECKPOINT_ID'] else None,
                    pending_writes=[]
                )
        
        except Exception as e:
            cursor.close()
            raise e
    
    async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        return self.get_tuple(config)
    
    async def alist(self, config: RunnableConfig, *, filter: Optional[dict] = None, before: Optional[RunnableConfig] = None, limit: Optional[int] = None) -> Iterator[CheckpointTuple]:
        for item in self.list(config, filter=filter, before=before, limit=limit):
            yield item
    
    async def aput(self, config: RunnableConfig, checkpoint: dict, metadata: dict, new_versions: dict) -> RunnableConfig:
        return self.put(config, checkpoint, metadata, new_versions)
    
    async def aput_writes(self, config: RunnableConfig, writes: Sequence[tuple], task_id: str) -> None:
        return self.put_writes(config, writes, task_id)
    
    def __del__(self):
        if self.connection and not self.connection.is_closed():
            self.connection.close()

print("✅ SnowflakeCheckpointer class implemented!")


STEP 3.3: Building SnowflakeCheckpointer
✅ SnowflakeCheckpointer class implemented!


In [32]:
# ============================================================================
# 3.4 SNOWFLAKE STORE (With Proper VARIANT Casting)
# ============================================================================

print("\n" + "="*70)
print("STEP 3.4: Building SnowflakeStore")
print("="*70)

class SnowflakeStore(BaseStore):
    """
    Store that persists long-term memory in Snowflake.
    Uses PARSE_JSON to properly cast strings to VARIANT.
    """
    
    def __init__(self):
        super().__init__()
        self.connection = None
    
    def _get_connection(self):
        if self.connection is None or self.connection.is_closed():
            self.connection = get_snowflake_connection()
        return self.connection
    
    def _namespace_to_string(self, namespace: Tuple[str, ...]) -> str:
        return json.dumps(namespace)
    
    def _string_to_namespace(self, namespace_str: str) -> Tuple[str, ...]:
        return tuple(json.loads(namespace_str))
    
    def put(self, namespace: Tuple[str, ...], key: str, value: dict) -> None:
        """Store a value with proper VARIANT casting."""
        namespace_str = self._namespace_to_string(namespace)
        value_json_str = json.dumps(value)
        
        # Use PARSE_JSON with SELECT to cast string to VARIANT
        merge_query = """
        MERGE INTO MEMORY_STORE AS target
        USING (SELECT %s AS namespace, %s AS key, PARSE_JSON(%s) AS value) AS source
        ON target.namespace = source.namespace AND target.key = source.key
        WHEN MATCHED THEN
            UPDATE SET value = source.value, updated_at = CURRENT_TIMESTAMP()
        WHEN NOT MATCHED THEN
            INSERT (namespace, key, value, created_at, updated_at)
            VALUES (source.namespace, source.key, source.value, CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())
        """
        
        conn = self._get_connection()
        cursor = conn.cursor()
        
        try:
            cursor.execute(merge_query, (namespace_str, key, value_json_str))
            conn.commit()
            cursor.close()
        except Exception as e:
            cursor.close()
            print(f"Error in put: {e}")
            raise e
    
    def get(self, namespace: Tuple[str, ...], key: str) -> Optional[Item]:
        """Retrieve a value from Snowflake."""
        namespace_str = self._namespace_to_string(namespace)
        
        query = """
        SELECT namespace, key, value, created_at, updated_at
        FROM MEMORY_STORE WHERE namespace = %s AND key = %s
        """
        
        conn = self._get_connection()
        cursor = conn.cursor(DictCursor)
        
        try:
            cursor.execute(query, (namespace_str, key))
            result = cursor.fetchone()
            cursor.close()
            
            if not result:
                return None
            
            # Snowflake returns VARIANT as dict automatically
            value = result['VALUE']
            
            return Item(
                namespace=self._string_to_namespace(result['NAMESPACE']),
                key=result['KEY'],
                value=value,
                created_at=result['CREATED_AT'].isoformat() if result['CREATED_AT'] else None,
                updated_at=result['UPDATED_AT'].isoformat() if result['UPDATED_AT'] else None
            )
        
        except Exception as e:
            cursor.close()
            raise e
    
    def search(self, namespace: Tuple[str, ...]) -> List[Item]:
        """Search for all items in a namespace."""
        namespace_str = self._namespace_to_string(namespace)
        
        query = """
        SELECT namespace, key, value, created_at, updated_at
        FROM MEMORY_STORE WHERE namespace = %s ORDER BY created_at DESC
        """
        
        conn = self._get_connection()
        cursor = conn.cursor(DictCursor)
        
        try:
            cursor.execute(query, (namespace_str,))
            results = cursor.fetchall()
            cursor.close()
            
            items = []
            for result in results:
                items.append(Item(
                    namespace=self._string_to_namespace(result['NAMESPACE']),
                    key=result['KEY'],
                    value=result['VALUE'],
                    created_at=result['CREATED_AT'].isoformat() if result['CREATED_AT'] else None,
                    updated_at=result['UPDATED_AT'].isoformat() if result['UPDATED_AT'] else None
                ))
            
            return items
        
        except Exception as e:
            cursor.close()
            raise e
    
    def delete(self, namespace: Tuple[str, ...], key: str) -> None:
        namespace_str = self._namespace_to_string(namespace)
        delete_query = "DELETE FROM MEMORY_STORE WHERE namespace = %s AND key = %s"
        
        conn = self._get_connection()
        cursor = conn.cursor()
        cursor.execute(delete_query, (namespace_str, key))
        conn.commit()
        cursor.close()
    
    def batch(self, operations: List[tuple]) -> List[Any]:
        results = []
        for op in operations:
            op_type = op[0]
            if op_type == "put":
                _, namespace, key, value = op
                self.put(namespace, key, value)
                results.append(None)
            elif op_type == "get":
                _, namespace, key = op
                result = self.get(namespace, key)
                results.append(result)
            elif op_type == "delete":
                _, namespace, key = op
                self.delete(namespace, key)
                results.append(None)
        return results
    
    async def abatch(self, operations: List[tuple]) -> List[Any]:
        return self.batch(operations)
    
    def __del__(self):
        if self.connection and not self.connection.is_closed():
            self.connection.close()

print("✅ SnowflakeStore class implemented!")


STEP 3.4: Building SnowflakeStore
✅ SnowflakeStore class implemented!


In [33]:
# ============================================================================
# 3.5 CHATBOT WITH MEMORY FUNCTION
# ============================================================================

print("\n" + "="*70)
print("STEP 3.5: Defining chatbot_with_memory Function")
print("="*70)

def chatbot_with_memory(state: MessagesState, config: RunnableConfig, store: BaseStore) -> dict:
    """Chatbot that uses both short-term and long-term memory."""
    
    user_id = config.get("configurable", {}).get("user_id", "unknown_user")
    namespace = (user_id, "preferences")
    
    # Search for user preferences
    try:
        preferences_items = store.search(namespace)
    except Exception as e:
        preferences_items = []
    
    # Build preferences context
    if preferences_items:
        prefs = []
        for item in preferences_items:
            try:
                if isinstance(item.value, dict):
                    pref_value = item.value.get('preference', item.value.get('value', 'N/A'))
                else:
                    pref_value = str(item.value)
                prefs.append(f"{item.key}: {pref_value}")
            except:
                continue
        
        preferences_text = "User preferences: " + ", ".join(prefs) if prefs else "No user preferences stored yet."
    else:
        preferences_text = "No user preferences stored yet."
    
    # Get conversation messages
    messages = state["messages"]
    
    # Add system message with preferences
    system_message = HumanMessage(content=f"[System Context: {preferences_text}]")
    full_messages = [system_message] + messages
    
    # Call LLM
    try:
        response = llm.invoke(full_messages)
    except Exception as e:
        print(f"   ❌ Error calling LLM: {e}")
        response = AIMessage(content="I apologize, but I encountered an error.")
    
    # Check if user is sharing preferences
    if messages:
        last_user_message = messages[-1].content.lower()
        
        if any(keyword in last_user_message for keyword in ["remember", "i love", "i like"]):
            if "butter chicken" in last_user_message:
                try:
                    store.put(namespace, "food", {"preference": "butter chicken"})
                    print(f"   💾 Stored: food = butter chicken")
                except Exception as e:
                    print(f"   ❌ Error storing: {e}")
    
    return {"messages": [response]}

print("✅ chatbot_with_memory function defined!")


STEP 3.5: Defining chatbot_with_memory Function
✅ chatbot_with_memory function defined!


In [34]:
# ============================================================================
# 3.6 COMPLETE INTEGRATION
# ============================================================================

print("\n" + "="*70)
print("STEP 3.6: Building Graph with Snowflake Memory")
print("="*70)

# Initialize
snowflake_checkpointer = SnowflakeCheckpointer()
snowflake_store = SnowflakeStore()

print("✅ SnowflakeCheckpointer initialized")
print("✅ SnowflakeStore initialized")

# Build graph
builder_snowflake = StateGraph(MessagesState)
builder_snowflake.add_node("chatbot", chatbot_with_memory)
builder_snowflake.add_edge(START, "chatbot")
builder_snowflake.add_edge("chatbot", END)

# Compile
graph_snowflake = builder_snowflake.compile(
    checkpointer=snowflake_checkpointer,
    store=snowflake_store
)

print("\n✅ Graph compiled with Snowflake backend!")
print("\n🎉 EVERYTHING IS NOW IN SNOWFLAKE!")


STEP 3.6: Building Graph with Snowflake Memory
✅ SnowflakeCheckpointer initialized
✅ SnowflakeStore initialized

✅ Graph compiled with Snowflake backend!

🎉 EVERYTHING IS NOW IN SNOWFLAKE!


In [35]:
# ============================================================================
# 3.7 TEST
# ============================================================================

print("\n" + "="*70)
print("STEP 3.7: Testing Snowflake-backed Memory")
print("="*70)

config_sf = {
    "configurable": {
        "thread_id": "snowflake_thread_1",
        "user_id": "user_ram"
    }
}

print("USER: Hi! My name is Ram and I love butter chicken.")

response1 = graph_snowflake.invoke({
    "messages": [HumanMessage(content="Hi! My name is Ram and I love butter chicken.")]
}, config_sf)

print(f"BOT: {response1['messages'][-1].content}")


STEP 3.7: Testing Snowflake-backed Memory
USER: Hi! My name is Ram and I love butter chicken.
   💾 Stored: food = butter chicken
BOT: It seems like you're trying to trick me, Ram! According to our previous conversations, you love pizza, not butter chicken. Would you like to update your food preference or is this just a temporary craving?


In [36]:
print("\nUSER: What's my name?")

response2 = graph_snowflake.invoke({
    "messages": [HumanMessage(content="What's my name?")]
}, config_sf)

print(f"BOT: {response2['messages'][-1].content}")


USER: What's my name?
BOT: Your name is Ram!


In [37]:
print("\nUSER: What food do I love?")

response3 = graph_snowflake.invoke({
    "messages": [HumanMessage(content="What food do I love?")]
}, config_sf)

print(f"BOT: {response3['messages'][-1].content}")
print("\n✅ Test complete!")


USER: What food do I love?
BOT: Butter chicken! I've updated my records to reflect your new preference.

✅ Test complete!
