# How to handle "double-texting" or concurrent runs in your graph

You might want to start a new run on a thread while the previous run still haven't finished. We call this "double-texting" or multi-tasking.

There are several strategies for handling this:
 
- `reject`: Reject the new run.
- `interrupt`: Interrupt the current run, keeping steps completed until now, and start a new one.
- `rollback`: Cancel and delete the existing run, rolling back the thread to the state before it had started, then start the new run.
- `enqueue`: Queue up the new run to start after the current run finishes.

### Reject

In [1]:
from langgraph_sdk import get_client
from langchain_core.messages import convert_to_messages
import httpx

In [2]:
client = get_client()

In [3]:
assistant = await client.assistants.create("agent")

In [4]:
thread = await client.threads.create()

In [5]:
run = await client.runs.create(
    thread["thread_id"],
    assistant["assistant_id"],
    input={"messages": [{"role": "human", "content": "whats the weather in sf?"}]}
)

In [6]:
try:
    await client.runs.create(
        thread["thread_id"],
        assistant["assistant_id"],
        input={"messages": [{"role": "human", "content": "whats the weather in nyc?"}]},
        multitask_strategy="reject",
    )
except httpx.HTTPStatusError as e:
    print("Failed to start concurrent run", e)

Failed to start concurrent run Client error '409 Conflict' for url 'http://localhost:8123/threads/6645ef70-3f29-424b-bc47-26839d60e49d/runs'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/409


We can verify that the original thread finished executing:

In [7]:
# wait until the original run completes
await client.runs.join(thread["thread_id"], run["run_id"])

In [8]:
state = await client.threads.get_state(thread["thread_id"])

In [9]:
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()


whats the weather in sf?

[{'id': 'toolu_01WkksaEhUW41Qtj5PrcsyGe', 'input': {'query': 'weather in san francisco'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Tool Calls:
  tavily_search_results_json (toolu_01WkksaEhUW41Qtj5PrcsyGe)
 Call ID: toolu_01WkksaEhUW41Qtj5PrcsyGe
  Args:
    query: weather in san francisco
Name: tavily_search_results_json

[{"url": "https://www.weatherapi.com/", "content": "{'location': {'name': 'San Francisco', 'region': 'California', 'country': 'United States of America', 'lat': 37.78, 'lon': -122.42, 'tz_id': 'America/Los_Angeles', 'localtime_epoch': 1718324566, 'localtime': '2024-06-13 17:22'}, 'current': {'last_updated_epoch': 1718324100, 'last_updated': '2024-06-13 17:15', 'temp_c': 16.7, 'temp_f': 62.1, 'is_day': 1, 'condition': {'text': 'Partly cloudy', 'icon': '//cdn.weatherapi.com/weather/64x64/day/116.png', 'code': 1003}, 'wind_mph': 12.5, 'wind_kph': 20.2, 'wind_degree': 270, 'wind_dir': 'W', 'pressure_mb': 1016.0, 'pressure_in': 

### Cancel

In [10]:
import asyncio

In [11]:
thread = await client.threads.create()

In [12]:
# the first run will be interrupted
interrupted_run = await client.runs.create(
    thread["thread_id"], assistant["assistant_id"],
    # NOTE: we add a sleep here on purpose, so that we interrupt the run before the LLM is called
    input={"messages": [{"role": "human", "content": "whats the weather in sf?"}], "sleep": 5},
)
await asyncio.sleep(1)
run = await client.runs.create(
    thread["thread_id"],
    assistant["assistant_id"],
    input={"messages": [{"role": "human", "content": "whats the weather in nyc?"}]},
    multitask_strategy="interrupt",
)

In [13]:
# wait until the second run completes
await client.runs.join(thread["thread_id"], run["run_id"])

We can see that the thread has partial data from the first run + data from the second run

In [14]:
state = await client.threads.get_state(thread["thread_id"])

In [15]:
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()


whats the weather in sf?

whats the weather in nyc?

[{'id': 'toolu_01HbWybk3Hiv4K55QAgrgSVW', 'input': {'query': 'weather in san francisco'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Tool Calls:
  tavily_search_results_json (toolu_01HbWybk3Hiv4K55QAgrgSVW)
 Call ID: toolu_01HbWybk3Hiv4K55QAgrgSVW
  Args:
    query: weather in san francisco
Name: tavily_search_results_json

[{"url": "https://www.weatherapi.com/", "content": "{'location': {'name': 'San Francisco', 'region': 'California', 'country': 'United States of America', 'lat': 37.78, 'lon': -122.42, 'tz_id': 'America/Los_Angeles', 'localtime_epoch': 1718324566, 'localtime': '2024-06-13 17:22'}, 'current': {'last_updated_epoch': 1718324100, 'last_updated': '2024-06-13 17:15', 'temp_c': 16.7, 'temp_f': 62.1, 'is_day': 1, 'condition': {'text': 'Partly cloudy', 'icon': '//cdn.weatherapi.com/weather/64x64/day/116.png', 'code': 1003}, 'wind_mph': 12.5, 'wind_kph': 20.2, 'wind_degree': 270, 'wind_dir': 'W', 'pressure_m

Verify that the original, interrupted run was interrupted

In [16]:
(await client.runs.get(thread["thread_id"], interrupted_run["run_id"]))["status"]

'interrupted'

### Rollback

In [17]:
thread = await client.threads.create()

In [18]:
# the first run will be interrupted
rolled_back_run = await client.runs.create(
    thread["thread_id"], assistant["assistant_id"],
    # NOTE: we add a sleep here on purpose, so that we interrupt the run before the LLM is called
    input={"messages": [{"role": "human", "content": "whats the weather in sf?"}], "sleep": 5},
)
await asyncio.sleep(1)
run = await client.runs.create(
    thread["thread_id"],
    assistant["assistant_id"],
    input={"messages": [{"role": "human", "content": "whats the weather in nyc?"}]},
    multitask_strategy="rollback",
)

In [19]:
# wait until the second run completes
await client.runs.join(thread["thread_id"], run["run_id"])

We can see that the thread has data only from the second run

In [20]:
state = await client.threads.get_state(thread["thread_id"])

In [21]:
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()


whats the weather in nyc?

[{'id': 'toolu_01EmFtimGE2uLKbrVo65zsN1', 'input': {'query': 'weather in nyc'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Tool Calls:
  tavily_search_results_json (toolu_01EmFtimGE2uLKbrVo65zsN1)
 Call ID: toolu_01EmFtimGE2uLKbrVo65zsN1
  Args:
    query: weather in nyc
Name: tavily_search_results_json

[{"url": "https://www.weatherapi.com/", "content": "{'location': {'name': 'New York', 'region': 'New York', 'country': 'United States of America', 'lat': 40.71, 'lon': -74.01, 'tz_id': 'America/New_York', 'localtime_epoch': 1718324657, 'localtime': '2024-06-13 20:24'}, 'current': {'last_updated_epoch': 1718324100, 'last_updated': '2024-06-13 20:15', 'temp_c': 24.4, 'temp_f': 75.9, 'is_day': 1, 'condition': {'text': 'Sunny', 'icon': '//cdn.weatherapi.com/weather/64x64/day/113.png', 'code': 1000}, 'wind_mph': 2.2, 'wind_kph': 3.6, 'wind_degree': 177, 'wind_dir': 'S', 'pressure_mb': 1015.0, 'pressure_in': 29.98, 'precip_mm': 0.0, 'precip_in': 0.

Verify that the original, rolled back run was deleted

In [22]:
try:
    await client.runs.get(thread["thread_id"], rolled_back_run["run_id"])
except httpx.HTTPStatusError as e:
    print("Original run was correctly deleted")

Original run was correctly deleted


### Enqueue

In [23]:
thread = await client.threads.create()

In [24]:
# this run will be interrupted
first_run = await client.runs.create(
    thread["thread_id"],
    assistant["assistant_id"],
    input={"messages": [{"role": "human", "content": "whats the weather in sf?"}]}
)

In [25]:
second_run = await client.runs.create(
    thread["thread_id"],
    assistant["assistant_id"],
    input={"messages": [{"role": "human", "content": "whats the weather in nyc?"}]},
    multitask_strategy="enqueue",
)

Verify that the thread has data from both runs

In [26]:
# wait until the second run completes
await client.runs.join(thread["thread_id"], second_run["run_id"])

In [27]:
state = await client.threads.get_state(thread["thread_id"])

In [28]:
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()


whats the weather in sf?

[{'id': 'toolu_01VFfFcVhgHaYCj8hc4pvx2e', 'input': {'query': 'weather in san francisco'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Tool Calls:
  tavily_search_results_json (toolu_01VFfFcVhgHaYCj8hc4pvx2e)
 Call ID: toolu_01VFfFcVhgHaYCj8hc4pvx2e
  Args:
    query: weather in san francisco
Name: tavily_search_results_json

[{"url": "https://www.weatherapi.com/", "content": "{'location': {'name': 'San Francisco', 'region': 'California', 'country': 'United States of America', 'lat': 37.78, 'lon': -122.42, 'tz_id': 'America/Los_Angeles', 'localtime_epoch': 1718324566, 'localtime': '2024-06-13 17:22'}, 'current': {'last_updated_epoch': 1718324100, 'last_updated': '2024-06-13 17:15', 'temp_c': 16.7, 'temp_f': 62.1, 'is_day': 1, 'condition': {'text': 'Partly cloudy', 'icon': '//cdn.weatherapi.com/weather/64x64/day/116.png', 'code': 1003}, 'wind_mph': 12.5, 'wind_kph': 20.2, 'wind_degree': 270, 'wind_dir': 'W', 'pressure_mb': 1016.0, 'pressure_in': 