Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ working_dir = "/home/agent"
[pool]
max_sessions = 10
session_ttl_hours = 24
# Hard ceiling (sec) per prompt; see #732. Default: 1800 (30 min).
# prompt_hard_timeout_secs = 1800
# Liveness-check cadence (sec) for the recv loop; see #732. Default: 30.
# liveness_check_secs = 30

[markdown]
tables = "code" # "code" (default) | "bullets" | "off"
Expand Down
25 changes: 24 additions & 1 deletion src/acp/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use tracing::{debug, error, info, trace};

/// Pick the most permissive selectable permission option from ACP options.
fn pick_best_option(options: &[Value]) -> Option<String> {
Expand Down Expand Up @@ -319,6 +319,10 @@ impl AcpConnection {
let _ = tx.send(msg);
continue;
}
// Stale id (#732): pending was already abandoned. Falls through
// to subscriber forwarding; the adapter recv loop filters by
// request_id so it can't leak into the next prompt.
trace!(request_id = id, "stale id-bearing message after abandon");
}

// Notification → forward to subscriber
Expand Down Expand Up @@ -557,6 +561,25 @@ impl AcpConnection {
self.last_active = Instant::now();
}

/// Abandon the in-flight request `request_id`: drop its pending entry and
/// best-effort ask the agent to stop via `session/cancel`.
///
/// Every failure mode is deliberately ignored — the agent process may have
/// exited already, in which case the stdin write fails with no consequence.
/// See #732.
pub async fn abandon_request(&self, request_id: u64) {
    // Clear the pending slot first so a late reply can no longer match it.
    self.pending.lock().await.remove(&request_id);

    // Without an established ACP session there is nobody to cancel.
    let session_id = match self.acp_session_id.as_deref() {
        Some(id) => id,
        None => return,
    };

    // NOTE(review): this carries an "id" (JSON-RPC request form); ACP also
    // specifies session/cancel as a notification — confirm which form the
    // supported agents expect before changing it.
    let cancel = json!({
        "jsonrpc": "2.0",
        "id": self.next_id(),
        "method": "session/cancel",
        "params": {"sessionId": session_id},
    });
    if let Ok(payload) = serde_json::to_string(&cancel) {
        let _ = self.send_raw(&payload).await;
    }
}

/// Return a clone of the stdin handle for lock-free cancel.
pub fn cancel_handle(&self) -> Arc<Mutex<ChildStdin>> {
Arc::clone(&self.stdin)
Expand Down
51 changes: 40 additions & 11 deletions src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,25 @@ pub struct AdapterRouter {
pool: Arc<SessionPool>,
reactions_config: ReactionsConfig,
table_mode: TableMode,
prompt_hard_timeout: std::time::Duration,
/// Polling cadence for the recv-loop liveness check (#732).
liveness_check_interval: std::time::Duration,
}

impl AdapterRouter {
/// Build a router over `pool`.
///
/// `prompt_hard_timeout_secs` is the per-prompt hard ceiling and
/// `liveness_check_secs` the polling cadence of the recv-loop liveness
/// check (#732). The cadence is clamped to at least 1 second: a zero
/// value from the config would make the recv loop's sleep resolve
/// immediately, turning the liveness check into a CPU-spinning busy loop.
pub fn new(
    pool: Arc<SessionPool>,
    reactions_config: ReactionsConfig,
    table_mode: TableMode,
    prompt_hard_timeout_secs: u64,
    liveness_check_secs: u64,
) -> Self {
    Self {
        pool,
        reactions_config,
        table_mode,
        prompt_hard_timeout: std::time::Duration::from_secs(prompt_hard_timeout_secs),
        // max(1): guard against a pathological zero cadence (see doc above).
        liveness_check_interval: std::time::Duration::from_secs(liveness_check_secs.max(1)),
    }
}

Expand Down Expand Up @@ -335,6 +342,8 @@ impl AdapterRouter {
let streaming = adapter.use_streaming(other_bot_present);
let table_mode = self.table_mode;
let tool_display = self.reactions_config.tool_display;
let prompt_hard_timeout = self.prompt_hard_timeout;
let liveness_check_interval = self.liveness_check_interval;

self.pool
.with_connection(thread_key, |conn| {
Expand All @@ -343,7 +352,7 @@ impl AdapterRouter {
let reset = conn.session_reset;
conn.session_reset = false;

let (mut rx, _) = conn.session_prompt(content_blocks).await?;
let (mut rx, request_id) = conn.session_prompt(content_blocks).await?;
reactions.set_thinking().await;

let mut text_buf = String::new();
Expand Down Expand Up @@ -396,20 +405,40 @@ impl AdapterRouter {
(None, None)
};

// Process ACP notifications
// (#732) Liveness-aware recv loop. Filters stale id-bearing
// messages and abandons cleanly on dead agent / hard ceiling
// so late responses cannot leak into the next prompt.
let mut response_error: Option<String> = None;
let recv_timeout = std::time::Duration::from_secs(600);
let prompt_start = std::time::Instant::now();
loop {
let notification = match tokio::time::timeout(recv_timeout, rx.recv()).await
{
Ok(Some(n)) => n,
Ok(None) => break, // channel closed
Err(_) => {
response_error = Some("Agent stopped responding".into());
break;
let notification = tokio::select! {
msg = rx.recv() => match msg {
Some(n) => n,
// Reader saw EOF and already drained pending; nothing to abandon.
None => break,
},
_ = tokio::time::sleep(liveness_check_interval) => {
if !conn.alive() {
response_error = Some("Agent process died".into());
conn.abandon_request(request_id).await;
break;
}
if prompt_start.elapsed() > prompt_hard_timeout {
response_error = Some(format!(
"Agent exceeded hard timeout ({}s)",
prompt_hard_timeout.as_secs(),
));
conn.abandon_request(request_id).await;
break;
}
continue;
}
};
if notification.id.is_some() {
if let Some(notification_id) = notification.id {
if notification_id != request_id {
// Stale response from a previously-abandoned prompt.
continue;
}
if let Some(ref err) = notification.error {
response_error = Some(format_coded_error(err.code, &err.message));
}
Expand Down
22 changes: 22 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,20 @@ pub struct PoolConfig {
pub max_sessions: usize,
#[serde(default = "default_ttl_hours")]
pub session_ttl_hours: u64,
/// Hard ceiling for a single prompt (#732). Once exceeded, the broker
/// abandons the in-flight request, sends `session/cancel` to the agent,
/// and clears the pending entry so late responses cannot leak into the
/// next prompt's subscriber.
///
/// Precision: checked every `liveness_check_secs`, so actual cutoff is
/// ±`liveness_check_secs` from this value.
#[serde(default = "default_prompt_hard_timeout_secs")]
pub prompt_hard_timeout_secs: u64,
/// Polling cadence (seconds) for the recv-loop liveness check (#732).
/// Lower = faster reaction to a dead agent / hard ceiling at the cost of
/// more wakeups while the agent is streaming normally.
#[serde(default = "default_liveness_check_secs")]
pub liveness_check_secs: u64,
}

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -434,6 +448,12 @@ fn default_max_sessions() -> usize {
/// Serde default for `PoolConfig::session_ttl_hours`: 4 hours.
fn default_ttl_hours() -> u64 {
    4
}
/// Serde default for `PoolConfig::prompt_hard_timeout_secs`:
/// 1800 seconds (30 minutes), per #732.
pub(crate) fn default_prompt_hard_timeout_secs() -> u64 {
    1_800
}
/// Serde default for `PoolConfig::liveness_check_secs`:
/// 30-second recv-loop polling cadence, per #732.
pub(crate) fn default_liveness_check_secs() -> u64 {
    30
}
/// Serde default helper for boolean fields that default to `true`.
fn default_true() -> bool {
    true
}
Expand Down Expand Up @@ -481,6 +501,8 @@ impl Default for PoolConfig {
Self {
max_sessions: default_max_sessions(),
session_ttl_hours: default_ttl_hours(),
prompt_hard_timeout_secs: default_prompt_hard_timeout_secs(),
liveness_check_secs: default_liveness_check_secs(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,8 @@ mod tests {
pool,
crate::config::ReactionsConfig::default(),
crate::markdown::TableMode::Off,
crate::config::default_prompt_hard_timeout_secs(),
crate::config::default_liveness_check_secs(),
));
Dispatcher::with_idle_timeout(router, 10, 24_000, grouping, DEFAULT_CONSUMER_IDLE_TIMEOUT)
}
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ async fn main() -> anyhow::Result<()> {
pool.clone(),
cfg.reactions,
cfg.markdown.tables,
cfg.pool.prompt_hard_timeout_secs,
cfg.pool.liveness_check_secs,
));

// Shutdown signal for Slack adapter
Expand Down
Loading