In [1]:
from langgraph.types import Command
from colorama import Fore, Style
from copy import deepcopy
import json
import requests

def serialize_for_js(data):
    """
    Converts a Python object to a clean JSON string that JavaScript can safely parse.

    Values the ``json`` module cannot encode on its own are converted by a
    fallback hook:

    * ``set`` / ``frozenset``  -> JSON array (element order follows set iteration order)
    * objects with ``__dict__`` -> JSON object built from their attributes
    * anything else             -> its ``str()`` representation

    Lists, tuples, dicts, strings, numbers, booleans and ``None`` are encoded
    natively by ``json`` and never reach the hook.

    Args:
        data: The object to serialize (typically a dict of stream data).

    Returns:
        A 2-space-indented JSON string; non-ASCII characters are kept as-is
        (``ensure_ascii=False``).
    """
    def custom_serializer(obj):
        """ Handle non-serializable objects like sets and custom classes. """
        # frozenset is not a subclass of set, so it must be listed explicitly;
        # otherwise it would fall through to the str() fallback.
        if isinstance(obj, (set, frozenset)):
            return list(obj)  # Items are re-encoded by json, which calls this hook again if needed
        if hasattr(obj, '__dict__'):  # Handle custom objects
            return obj.__dict__
        return str(obj)  # Fallback: Convert unknown objects to string

    return json.dumps(data, default=custom_serializer, ensure_ascii=False, indent=2)

# Helper function to send update to the web service
def send_update(mode, data, timeout=5.0):
    """
    Best-effort POST of one stream event to the visualizer's ``/api/update`` endpoint.

    Args:
        mode: Stream mode label sent alongside the payload (e.g. "values", "updates").
        data: Event payload; serialized to a JSON string with ``serialize_for_js``.
        timeout: Seconds to wait for the web service before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the caller
            whenever the visualizer is unresponsive.

    Any failure (serialization, connection, timeout) is reported on stdout and
    otherwise ignored so that streaming can continue without the visualizer.
    """
    try:
        requests.post(
            "http://lg_viz:3002/api/update",
            json={"mode": mode, "data": serialize_for_js(data)},
            timeout=timeout,
        )
    except Exception as e:
        print(f"Error sending update to web interface: {e}")

def send_clear(timeout=5.0):
    """
    Best-effort POST to the visualizer's ``/api/clear`` endpoint with an empty JSON body.

    Args:
        timeout: Seconds to wait for the web service before giving up. Without a
            timeout ``requests`` waits indefinitely if the service never answers.

    Any failure is reported on stdout and otherwise ignored.
    """
    try:
        requests.post("http://lg_viz:3002/api/clear", json={}, timeout=timeout)
    except Exception as e:
        print(f"Error sending clear to web interface: {e}")

def stream_from_app(app_stream, input_buffer=[{"messages": []}], verbose=False, debug=False):
    """
    Drive a graph stream, mirror its events to the web visualizer, and yield message text.

    This is a generator: nothing runs (not even the initial clear) until it is iterated.

    Args:
        app_stream: Callable invoked as ``app_stream(graph_input, stream_mode=[...])``
            that returns an iterable of ``(mode, chunk)`` pairs.
        input_buffer: Stack of graph inputs to run. It is deep-copied, so neither the
            caller's list nor the shared default is ever mutated.
        verbose: Also print values, updates, message metadata and response metadata.
        debug: Also print "debug"-mode chunks (in red).

    Yields:
        ``chunk.content`` of every "messages"-mode chunk whose content is truthy.

    Side effects:
        Calls ``send_clear`` once at start and ``send_update`` for every values /
        updates / debug chunk and for the first message seen per ``checkpoint_ns``.
        When an update carries a resumable interrupt, blocks on ``input()`` and
        queues ``Command(resume=<answer>)`` as the next graph input.
    """
    seen_metas = dict()
    # The mutable default is safe only because of this copy: every pop/append
    # below touches the private copy.
    input_buffer = deepcopy(input_buffer)
    send_clear()
    while input_buffer:
        # pop() takes the most recently queued input, so a resume Command appended
        # while handling an interrupt is the next thing fed to the graph.
        for mode, chunk in app_stream(input_buffer.pop(), stream_mode=["values", "messages", "updates", "debug"]):
            # https://langchain-ai.github.io/langgraph/concepts/streaming/
            if mode == "messages":
                chunk, meta = chunk
                # First chunk for this checkpoint namespace: announce which node is speaking.
                if meta.get("checkpoint_ns") not in seen_metas:
                    caller_node = meta.get('langgraph_node')
                    # str() keeps the header printable even if the metadata has no node name.
                    user_prompt = f"node:{caller_node} -> message" if verbose else str(caller_node).title()
                    send_update("messages", {"meta": meta})
                    if verbose:
                        print(f"\n[node:{meta.get('langgraph_node')}:meta -> message] {meta}")
                    print(f"\n[{user_prompt}] ", end="")
                seen_metas[meta.get("checkpoint_ns")] = meta
                if chunk.content:
                    yield chunk.content
                elif chunk.response_metadata:
                    # Content-less chunk that carries response metadata instead of text.
                    if verbose:
                        print(f"\n\n[message] {chunk.response_metadata=}, {chunk.usage_metadata=}")
            elif mode == "values":
                if verbose:
                    print("\n[value]", chunk)
                send_update("values", chunk)
            elif mode == "updates":
                if verbose:
                    print("\n[update]", chunk)
                send_update("updates", chunk)
                ## Handle the interrupt. If an interrupt happens, then handle the interrupt and re-queue app stream
                interrupts = chunk.get("__interrupt__")
                # Guard against an empty interrupt collection before indexing into it.
                if interrupts and interrupts[0].resumable:
                    # The interrupt payload is not guaranteed to be a string; str() avoids a
                    # TypeError on concatenation and is a no-op for string payloads.
                    user_prompt = "\n[update -> interrupt] " * bool(verbose) + str(interrupts[0].value)
                    input_buffer += [
                        Command(resume=input(user_prompt))
                    ]
            elif mode == "debug":
                send_update("debug", chunk)
                if debug:
                    print(Fore.RED + f"[debug] {chunk}" + Style.RESET_ALL)


In [2]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_nvidia import ChatNVIDIA

# !pip install --upgrade langgraph colorama
# No `model` argument is given, so the client falls back to a default and emits the
# "Set model using model parameter" notice shown in this cell's output. In the recorded
# run the stream metadata reports 'ls_model_name': 'meta/llama-3.1-8b-instruct'.
# NOTE(review): consider passing `model=` explicitly so reruns do not depend on the default.
llm = ChatNVIDIA(temperature=0)

import uuid
from typing import Annotated, Optional
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.graph.message import add_messages
from functools import partial
from colorama import Fore, Style
from copy import deepcopy
import operator

##################################################################
## Define the authoritative state system (environment) for your use-case

class State(TypedDict):
    """The Graph State for your Agent System"""
    # Conversation history shared by all nodes. `add_messages` is attached as the
    # reducer for this key; in the recorded output a ('user', 'No clue') tuple returned
    # by a node shows up in the state as a HumanMessage. Presumably node updates are
    # appended/merged rather than overwriting the list — confirm against the langgraph docs.
    messages: Annotated[list, add_messages]

def get_nth_message(state: "State", n=-1, attr="messages"):
    """Return the ``content`` of the n-th entry of ``state[attr]``, or "" if unavailable.

    Args:
        state: Mapping-like graph state (anything with ``.get``).
        n: Index into the list stored under ``attr``; negative values count from the end.
        attr: State key holding the list of message-like objects.

    Returns:
        The selected entry's ``content`` attribute, or an empty string when the key is
        missing, the list is too short, the entry has no ``content``, or ``state`` is
        not mapping-like.
    """
    try:
        # Honour `attr` instead of always reading "messages".
        return state.get(attr)[n].content
    # Only the expected lookup failures are swallowed; KeyboardInterrupt and other
    # unrelated errors now propagate instead of being silently turned into "".
    except (AttributeError, IndexError, KeyError, TypeError):
        return ""
    
##################################################################
## Define the operations (Nodes) that can happen on your environment

def user(state: State):
    """Stand-in user node: ignores the state and always contributes the user message "No clue"."""
    ## Interactive variant, currently disabled:
    # answer = interrupt("[User]:")
    canned_reply = ("user", "No clue")
    return {"messages": [canned_reply]}

def agent(state: State, config=None):
    """Ask the LLM for a reply to the conversation so far and decide where to go next.

    The reply is always added to ``messages``. If the latest message already in the
    state (i.e. before this reply) contains "stop", only the update is returned;
    otherwise a ``Command`` routes control to the "user" node.
    """
    ## Passing in config will cause connector to stream values to state modification buffer. See graph stream later
    reply = llm.invoke(state.get("messages"), config=config)
    state_update = {"messages": [reply]}
    wants_stop = "stop" in get_nth_message(state, n=-1)
    if not wants_stop:
        return Command(update=state_update, goto="user")
    return state_update  ## Implied goto=END

##################################################################
## Define the system that organizes your nodes (and maybe edges)

## Static wiring: START -> user -> agent. There is no static edge out of "agent";
## its next hop is decided at runtime by what the agent node returns
## (a Command with goto="user", or a plain update).
builder = StateGraph(State)
builder.add_edge(START, "user")  ## A start node is always necessary
builder.add_node("agent", agent)
builder.add_node("user", user)
builder.add_edge("user", "agent")

##################################################################
## A memory management system to keep track of agent state
checkpointer = MemorySaver()
app = builder.compile(checkpointer=checkpointer)
## A new random thread_id is generated every time this cell runs, so each run
## presumably starts from a fresh checkpoint thread — confirm against the checkpointer docs.
config = {"configurable": {"thread_id": uuid.uuid4()}}
## Pre-bind the config so stream_from_app only has to supply the input and stream_mode.
app_stream = partial(app.stream, config=config)

##################################################################
## Simple Invocation Example

## Drain the generator. The yielded tokens are discarded here; everything visible in
## the output comes from stream_from_app's own verbose prints.
## NOTE(review): the `user` node currently returns the canned "No clue" message rather
## than calling interrupt(), and that message never contains "stop", so the agent always
## routes back to `user`. The recorded run below was ended with a KeyboardInterrupt.
for token in stream_from_app(app_stream, verbose=True, debug=False):
    # print(token, end="", flush=True)
    continue

Set model using model parameter. 
To get available models use available_models property.



[value] {'messages': []}

[update] {'user': {'messages': [('user', 'No clue')]}}

[value] {'messages': [HumanMessage(content='No clue', additional_kwargs={}, response_metadata={}, id='58da786f-5709-4e5b-afae-fa106e2c7bc8')]}

[node:agent:meta -> message] {'langgraph_step': 2, 'langgraph_node': 'agent', 'langgraph_triggers': ['user'], 'langgraph_path': ('__pregel_pull', 'agent'), 'langgraph_checkpoint_ns': 'agent:2922bfd9-0a4a-5d19-ad1e-60fda2fec2da', 'checkpoint_ns': 'agent:2922bfd9-0a4a-5d19-ad1e-60fda2fec2da', 'ls_provider': 'NVIDIA', 'ls_model_name': 'meta/llama-3.1-8b-instruct', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024, 'ls_stop': None}

[node:agent -> message] 

[message] chunk.response_metadata={'finish_reason': 'stop', 'model_name': 'meta/llama-3.1-8b-instruct'}, chunk.usage_metadata={'input_tokens': 12, 'output_tokens': 130, 'total_tokens': 142}

[update] {'agent': {'messages': [AIMessage(content="It can be frustrating when you're stuck and don't k

KeyboardInterrupt: 