## Test OracleCheckpointSaver

In [1]:
import uuid
import time
from typing import TypedDict
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
from oracle_checkpoint_saver import OracleCheckpointSaver

In [2]:
# Shared state that is passed from node to node in the graph
class State(TypedDict):
    """State dictionary received and returned by every node of the graph."""

    # text supplied by the caller when a run starts
    input: str
    # upper-cased copy of `input`, filled in by process_input
    processed: str

In [3]:
# first node
def process_input(state: State) -> State:
    """Upper-case the input text and store it under ``processed``.

    The function pauses for 10 seconds before doing any work (presumably to
    leave time to interrupt the run and try a restart from the checkpoint --
    confirm with the notebook author).

    Args:
        state: current graph state; only ``state["input"]`` is read.

    Returns:
        A new state dict with the same content as ``state`` except that
        ``processed`` holds the upper-cased input. The dict passed in is
        left unmodified.
    """
    print("Sleep for 10 sec..")
    time.sleep(10)

    # Build a fresh dict instead of writing into the caller's object, so the
    # node has no side effect on the state it was handed.
    updated: State = {**state, "processed": state["input"].upper()}

    print("Execution, first step:", updated)

    return updated

# second node
def middle_node(state: State) -> State:
    """Pass-through step: pause, log progress and return the state unchanged.

    Args:
        state: current graph state (not read or modified here).

    Returns:
        The very same ``state`` object that was passed in.
    """
    pause_seconds = 10
    print("Sleep for 10 sec..")
    time.sleep(pause_seconds)
    print("Execution, middle step...")
    return state
    
# end node
def end_node(state: State) -> State:
    """Final step: pause, print the state reached so far and return it as is.

    Args:
        state: current graph state; it is only printed, never modified.

    Returns:
        The very same ``state`` object that was passed in.
    """
    announcement = "Sleep for 10 sec.."
    print(announcement)
    time.sleep(10)

    summary_label = "Execution completed. Final step:"
    print(summary_label, state)
    return state

In [4]:
# Build the graph: START -> process -> middle -> end -> END
builder = StateGraph(State)

# register the nodes, in execution order
for node_name, node_fn in (
    ("process", process_input),
    ("middle", middle_node),
    ("end", end_node),
):
    builder.add_node(node_name, node_fn)

# connect each step of the linear route to the one that follows it
route = (START, "process", "middle", "end", END)
for source, target in zip(route, route[1:]):
    builder.add_edge(source, target)
# (the original author left builder.set_finish_point("end") commented out
#  next to the explicit edge to END)

# 6. Add the checkpointer, so the graph is compiled with the Oracle saver
saver = OracleCheckpointSaver()
graph = builder.compile(checkpointer=saver)

In [5]:
# Input for a fresh run: the text to process and an empty result slot
initial_state = dict(input="Hello LangGraph!", processed="")

# Fixed thread id, so that re-running the notebook targets the same thread;
# switch to str(uuid.uuid4()) to get a new thread on every run.
thread_id = "00001"

# run configuration handed to graph.invoke
agent_config = {"configurable": {"thread_id": thread_id}}

In [13]:
# 7. Execute the graph.
# Fresh run (start from zero): pass initial_state as the input, i.e. use
# the commented-out call below instead of the active one.
# final_state = graph.invoke(initial_state, config=agent_config)

# Restart: keep the same thread_id in agent_config and pass None as the
# input (per the author's note, this is all that is needed to restart).
final_state = graph.invoke(None, config=agent_config)

2025-04-21 17:21:28,900 - Checkpoint row found!
