# Connecting to the task_maistro Deployment

## Deployment

See `creating.ipynb` for the full build and launch walkthrough. In short:

* The [LangGraph CLI](https://docs.langchain.com/langsmith/cli) packages `task_maistro` into a Docker image.
* `docker-compose.yml` launches three containers:
    * `langgraph-redis`: Redis for streaming pub/sub.
    * `langgraph-postgres`: Postgres for persistent state and long-term store.
    * `langgraph-api`: The `task-maistro` server image.

```
$ docker compose --env-file .env up
```

Once running, access the deployment through:

* API: http://localhost:8123
* Docs: http://localhost:8123/docs
* LangGraph Studio: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:8123

## Using the API

LangGraph Server exposes [REST API endpoints](https://github.com/langchain-ai/agent-protocol) grouped into three areas:

* **Runs**: Atomic agent executions
* **Threads**: Multi-turn interactions and human-in-the-loop
* **Store**: Long-term memory

You can test any endpoint directly in the [interactive API docs](http://localhost:8123/docs#tag/thread-runs).

## SDK

The [LangGraph SDKs](https://docs.langchain.com/langsmith/sdk) (Python and JS) provide a developer-friendly interface to interact with the LangGraph Server API presented above.

In [1]:
%%capture --no-stderr
%pip install -U langgraph_sdk

In [2]:
from langgraph_sdk import get_client

# Connect via SDK
url_for_cli_deployment = "http://localhost:8123"
client = get_client(url=url_for_cli_deployment)

## Remote Graph

If you are working in the LangGraph library, [Remote Graph](https://docs.langchain.com/langsmith/use-remote-graph) is also a useful way to connect directly to the graph.

In [3]:
%%capture --no-stderr
%pip install -U langchain_openai langgraph langchain_core

In [4]:
from langgraph.pregel.remote import RemoteGraph
from langchain_core.messages import convert_to_messages
from langchain_core.messages import HumanMessage, SystemMessage

# Connect via remote graph
url = "http://localhost:8123"
graph_name = "task_maistro" 
remote_graph = RemoteGraph(graph_name, url=url)

## Runs

A "run" represents a [single execution](https://github.com/langchain-ai/agent-protocol?tab=readme-ov-file#runs-atomic-agent-executions) of your graph. Each time a client makes a request:

1. The HTTP worker generates a unique run ID
2. This run and its results are stored in PostgreSQL
3. You can query these runs to:
   - Check their status
   - Get their results
   - Track execution history

You can see a full set of How To guides for various types of runs [here](https://langchain-ai.github.io/langgraph/how-tos/#runs).

Let's looks at a few of the interesting things we can do with runs.

### Background Runs

The LangGraph server supports two types of runs: 

* `Fire and forget` - Launch a run in the background, but don’t wait for it to finish
* `Waiting on a reply (blocking or polling)` - Launch a run and wait/stream its output

Background runs and polling are quite useful when working with long-running agents. 

Let's [see](https://docs.langchain.com/langsmith/background-run) how this works:

In [5]:
# Create a thread
thread = await client.threads.create()
thread

{'thread_id': '3619d2d0-f72d-403a-9bb2-2b5e0e2ca342',
 'created_at': '2026-02-17T03:52:00.093043+00:00',
 'updated_at': '2026-02-17T03:52:00.093043+00:00',
 'metadata': {},
 'config': {},
 'error': None,
 'status': 'idle',
 'values': None,
 'interrupts': {}}

In [6]:
# Check any existing runs on a thread
thread = await client.threads.create()
runs = await client.runs.list(thread["thread_id"])
print(runs)

[]


In [7]:
# Ensure we've created some ToDos and saved them to my user_id
user_input = "Add a ToDo to finish booking travel to Hong Kong by end of next week. Also, add a ToDo to call parents back about Thanksgiving plans."
config = {"configurable": {"user_id": "Test"}}
graph_name = "task_maistro" 
run = await client.runs.create(thread["thread_id"], graph_name, input={"messages": [HumanMessage(content=user_input)]}, config=config)

In [8]:
# Kick off a new thread and a new run
thread = await client.threads.create()
user_input = "Give me a summary of all ToDos."
config = {"configurable": {"user_id": "Test"}}
graph_name = "task_maistro" 
run = await client.runs.create(thread["thread_id"], graph_name, input={"messages": [HumanMessage(content=user_input)]}, config=config)

In [9]:
# Check the run status
print(await client.runs.get(thread["thread_id"], run["run_id"]))

{'run_id': '019c69bb-2241-7522-a8a6-85a779715c18', 'thread_id': '6e808947-648c-4cf7-9c78-0e3e393104fe', 'assistant_id': 'ea4ebafa-a81d-5063-a5fa-67c755d98a21', 'created_at': '2026-02-17T03:53:17.634747+00:00', 'updated_at': '2026-02-17T03:53:20.789154+00:00', 'status': 'success', 'metadata': {'assistant_id': 'ea4ebafa-a81d-5063-a5fa-67c755d98a21'}, 'kwargs': {'input': {'messages': [{'id': None, 'name': None, 'type': 'human', 'content': 'Give me a summary of all ToDos.', 'additional_kwargs': {}, 'response_metadata': {}}]}, 'config': {'configurable': {'thread_id': '6e808947-648c-4cf7-9c78-0e3e393104fe', 'graph_id': 'task_maistro', '__request_start_time_ms__': 1771300397633, 'langgraph_request_id': '4fd0df50-4039-44dd-bf4f-51c0fb6002d8', 'langgraph_auth_user': None, 'user_id': 'Test', 'langgraph_auth_user_id': '', 'assistant_id': 'ea4ebafa-a81d-5063-a5fa-67c755d98a21', 'langgraph_auth_permissions': [], '__after_seconds__': 0, 'run_id': '019c69bb-2241-7522-a8a6-85a779715c18'}, 'metadata': 

We can see that it has `'status': 'pending'` because it is still running.

What if we want to wait until the run completes, making it a blocking run?

We can use `client.runs.join` to wait until the run completes.

This ensures that no new runs are started until the current run completes on the thread.

In [10]:
# Wait until the run completes
await client.runs.join(thread["thread_id"], run["run_id"])
print(await client.runs.get(thread["thread_id"], run["run_id"]))

{'run_id': '019c69bb-2241-7522-a8a6-85a779715c18', 'thread_id': '6e808947-648c-4cf7-9c78-0e3e393104fe', 'assistant_id': 'ea4ebafa-a81d-5063-a5fa-67c755d98a21', 'created_at': '2026-02-17T03:53:17.634747+00:00', 'updated_at': '2026-02-17T03:53:20.789154+00:00', 'status': 'success', 'metadata': {'assistant_id': 'ea4ebafa-a81d-5063-a5fa-67c755d98a21'}, 'kwargs': {'input': {'messages': [{'id': None, 'name': None, 'type': 'human', 'content': 'Give me a summary of all ToDos.', 'additional_kwargs': {}, 'response_metadata': {}}]}, 'config': {'configurable': {'thread_id': '6e808947-648c-4cf7-9c78-0e3e393104fe', 'graph_id': 'task_maistro', '__request_start_time_ms__': 1771300397633, 'langgraph_request_id': '4fd0df50-4039-44dd-bf4f-51c0fb6002d8', 'langgraph_auth_user': None, 'user_id': 'Test', 'langgraph_auth_user_id': '', '__after_seconds__': 0, 'langgraph_auth_permissions': [], 'assistant_id': 'ea4ebafa-a81d-5063-a5fa-67c755d98a21', 'run_id': '019c69bb-2241-7522-a8a6-85a779715c18'}, 'metadata': 

Now the run has `'status': 'success'` because it has completed.

We can get run in the api docs as well.

![get_run](../images/get_run.png)

### Streaming Runs

Each time a client makes a streaming request:

1. The HTTP worker generates a unique run ID
2. The Queue worker begins work on the run
3. During execution, the Queue worker publishes updates to Redis
4. The HTTP worker subscribes to those updates and streams them back to the client

This is what enables token-by-token streaming over HTTP.

We use `stream_mode="messages-tuple"` to [stream tokens](https://docs.langchain.com/langsmith/streaming) — useful for production agents that may take a while to respond.

In [11]:
user_input = "What ToDo should I focus on first."
async for chunk in client.runs.stream(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input)]},
    config=config,
    stream_mode="messages-tuple",
):
    if chunk.event == "messages":
        print(
            "".join(data_item["content"] for data_item in chunk.data if "content" in data_item),
            end="",
            flush=True,
        )

Since you have a task that is time-sensitive, I recommend focusing on the following:

1. **Call parents back about Thanksgiving plans**
   - This task has no deadline but is likely important to complete soon, especially if Thanksgiving is approaching.

After that, you can prioritize the travel booking for Hong Kong, as it has a deadline of February 24, 2026. 

Would you like to mark any of these tasks as in progress or completed?

## Threads

Whereas a run is only a single execution of the graph, a thread supports *multi-turn* interactions.

When the client makes a graph execution execution with a `thread_id`, the server will save all [checkpoints](https://docs.langchain.com/oss/python/langgraph/persistence#checkpoints) (steps) in the run to the thread in the Postgres database.

The server allows us <!-- to  [~check the status of created threads~](https://langchain-ai.github.io/langgraph/cloud/how-tos/check_thread_status/) -->
a variety of ways to [work with threads](https://reference.langchain.com/python/langsmith/deployment/sdk/#langgraph_sdk.client.ThreadsClient).

### Check thread state

For example, we can easily access the state [checkpoints](https://docs.langchain.com/oss/python/langgraph/persistence#checkpoints) saved to any specific thread.

In [12]:
thread_state = await client.threads.get_state(thread['thread_id'])
for m in convert_to_messages(thread_state['values']['messages']):
    m.pretty_print()


Give me a summary of all ToDos.

Here’s a summary of your current ToDo list:

1. **Task:** Call parents back about Thanksgiving plans
   - **Status:** Not started
   - **Deadline:** None
   - **Time to Complete:** 30 minutes

2. **Task:** Finish booking travel to Hong Kong
   - **Status:** Not started
   - **Deadline:** February 24, 2026
   - **Solutions:** 
     - Check flight options
     - Book accommodation
     - Plan itinerary
   - **Time to Complete:** 120 minutes

3. **Task:** Finish booking travel to Hong Kong (duplicate entry)
   - **Status:** Not started
   - **Deadline:** February 24, 2026
   - **Solutions:** 
     - Check flight options
     - Book accommodation
     - Plan itinerary
   - **Time to Complete:** 120 minutes

Let me know if you need any changes or updates!

What ToDo should I focus on first.

Since you have a task that is time-sensitive, I recommend focusing on the following:

1. **Call parents back about Thanksgiving plans**
   - This task has no deadline b

### Copy threads

We can also [copy](https://docs.langchain.com/langsmith/use-threads#copy-thread) (i.e. "fork") an existing thread. 

This will keep the existing thread's history, but allow us to create independent runs that do not affect the original thread.

In [13]:
# Copy the thread
copied_thread = await client.threads.copy(thread['thread_id'])

In [14]:
# Check the state of the copied thread
copied_thread_state = await client.threads.get_state(copied_thread['thread_id'])
for m in convert_to_messages(copied_thread_state['values']['messages']):
    m.pretty_print()


Give me a summary of all ToDos.

Here’s a summary of your current ToDo list:

1. **Task:** Call parents back about Thanksgiving plans
   - **Status:** Not started
   - **Deadline:** None
   - **Time to Complete:** 30 minutes

2. **Task:** Finish booking travel to Hong Kong
   - **Status:** Not started
   - **Deadline:** February 24, 2026
   - **Solutions:** 
     - Check flight options
     - Book accommodation
     - Plan itinerary
   - **Time to Complete:** 120 minutes

3. **Task:** Finish booking travel to Hong Kong (duplicate entry)
   - **Status:** Not started
   - **Deadline:** February 24, 2026
   - **Solutions:** 
     - Check flight options
     - Book accommodation
     - Plan itinerary
   - **Time to Complete:** 120 minutes

Let me know if you need any changes or updates!

What ToDo should I focus on first.

Since you have a task that is time-sensitive, I recommend focusing on the following:

1. **Call parents back about Thanksgiving plans**
   - This task has no deadline b

### Human in the loop

The server supports all [human-in-the-loop](https://docs.langchain.com/langsmith/add-human-in-the-loop) features — you can search, edit state, and resume graph execution from any prior checkpoint.

As an example, [we can search, edit, and continue graph execution](https://docs.langchain.com/oss/python/langgraph/persistence#capabilities) from any prior checkpoint.

In [15]:
# Get the history of the thread
states = await client.threads.get_history(thread['thread_id'])

# Pick a state update to fork
to_fork = states[-2]
to_fork['values']

{'messages': [{'content': 'Give me a summary of all ToDos.',
   'additional_kwargs': {},
   'response_metadata': {},
   'type': 'human',
   'name': None,
   'id': 'd08ed8d9-166e-42da-95ce-58604a79c07c'}]}

In [16]:
to_fork['values']['messages'][0]['id']

'd08ed8d9-166e-42da-95ce-58604a79c07c'

In [17]:
to_fork['next']

['task_mAIstro']

In [18]:
to_fork['checkpoint_id']

'1f10bb43-16d7-612e-8000-58aeaab7eb3b'

Let's edit the state. Remember how our reducer on `messages` works: 

* It will append, unless we supply a message ID.
* We supply the message ID to overwrite the message, rather than appending to state!

In [19]:
forked_input = {
    "messages": HumanMessage(
        content="Give me a summary of all ToDos that need to be done in the next week.",
        id=to_fork["values"]["messages"][0]["id"],
    )
}

# Update the state, creating a new checkpoint in the thread
forked_config = await client.threads.update_state(
    thread["thread_id"], forked_input, checkpoint_id=to_fork["checkpoint_id"]
)

In [20]:
# Run the graph from the new checkpoint in the thread
async for chunk in client.runs.stream(
    thread["thread_id"],
    graph_name,
    input=None,
    config=config,
    checkpoint_id=forked_config["checkpoint_id"],
    stream_mode="messages-tuple",
):
    if chunk.event == "messages":
        print(
            "".join(data_item["content"] for data_item in chunk.data if "content" in data_item),
            end="",
            flush=True,
        )

Currently, your ToDo list includes the following tasks:

1. **Call parents back about Thanksgiving plans**
   - Status: Not started
   - Deadline: None
   - Time to complete: 30 minutes

2. **Finish booking travel to Hong Kong**
   - Status: Not started
   - Deadline: February 24, 2026
   - Solutions: Check flight options, Book accommodation, Plan itinerary
   - Time to complete: 120 minutes

None of these tasks have a deadline within the next week. The only task without a deadline is the call to your parents, which you can complete at your convenience. If you have any other tasks or deadlines in mind, feel free to let me know!

## Across-thread memory

The `task_maistro` graph uses the [LangGraph memory `store`](https://docs.langchain.com/oss/python/langgraph/persistence#memory-store) to persist information across threads — such as the user's profile, ToDo list, and custom instructions.

The deployment's Postgres database backs this store, making all memories durable across restarts.

There are several methods [for interacting with the store](https://reference.langchain.com/python/langsmith/deployment/sdk/#langgraph_sdk.client.StoreClient) via the LangGraph SDK.

### Search items

`task_maistro` saves ToDos under the namespace tuple `("todo", todo_category, user_id)`.

The `todo_category` defaults to `"general"` (configurable in `configuration.py`).

Supply this tuple to search for all ToDos for a given user.

In [21]:
items = await client.store.search_items(
    ("todo", "general", "Test"),
    limit=5,
    offset=0
)
items['items']

[{'namespace': ['todo', 'general', 'Test'],
  'key': '149199ac-87af-4ab4-8c51-df5c24331aa8',
  'value': {'task': 'Call parents back about Thanksgiving plans',
   'status': 'not started',
   'deadline': None,
   'solutions': [],
   'time_to_complete': 30},
  'created_at': '2026-02-17T03:53:16.567869+00:00',
  'updated_at': '2026-02-17T03:53:16.567869+00:00',
  'score': None},
 {'namespace': ['todo', 'general', 'Test'],
  'key': 'e4761eec-7bb0-4fe8-974b-a314aa34a0f6',
  'value': {'task': 'Finish booking travel to Hong Kong',
   'status': 'not started',
   'deadline': '2026-02-24T00:00:00',
   'solutions': ['Check flight options',
    'Book accommodation',
    'Plan itinerary'],
   'time_to_complete': 120},
  'created_at': '2026-02-17T03:53:16.567008+00:00',
  'updated_at': '2026-02-17T03:53:16.567008+00:00',
  'score': None},
 {'namespace': ['todo', 'general', 'Test'],
  'key': '566063c5-4ac7-40b3-b582-7b21d6dfa5b8',
  'value': {'task': 'Finish booking travel to Hong Kong',
   'status': 

We can also check in the interactive API docs.

![search_store_items](../images/search_store_items.png)

### Add items

In our graph, we call `put` to add items to the store.

We can use [put](https://reference.langchain.com/python/langsmith/deployment/sdk/#langgraph_sdk.client.StoreClient.put_item) with the SDK if we want to directly add items to the store outside our graph.

In [None]:
from uuid import uuid4

# # Put in same namespace
# await client.store.put_item(
#     ("todo", "high", "Test"),
#     key=str(uuid4()),
#     value={"todo": "Test SDK put_item"},
# )

# Put in different namespace
await client.store.put_item(
    ("testing", "Test"),
    key=str(uuid4()),
    value={"todo": "Test SDK put_item"},
)

In [26]:
items = await client.store.search_items(
    ("testing", "Test"),
    limit=5,
    offset=0
)
items['items']

[{'namespace': ['testing', 'Test'],
  'key': '6284215f-abf3-435c-a124-de17ea811c19',
  'value': {'todo': 'Test SDK put_item'},
  'created_at': '2026-02-17T04:26:46.746637+00:00',
  'updated_at': '2026-02-17T04:26:46.746637+00:00',
  'score': None}]

### Delete items

We can use the SDK to [delete items](https://reference.langchain.com/python/langsmith/deployment/sdk/#langgraph_sdk.client.StoreClient.delete_item) from the store by key.

In [30]:
item_keys = [item['key'] for item in items['items']]
item_keys

['6284215f-abf3-435c-a124-de17ea811c19']

In [31]:
del_keys = item_keys[0]

await client.store.delete_item(
    ("testing", "Test"),
    key=del_keys,
)

In [32]:
items = await client.store.search_items(
    ("testing", "Test"),
    limit=5,
    offset=0
)
items['items']

[]