From a590ec7ce4b732612ca7d940ef94cacc5dcf988a Mon Sep 17 00:00:00 2001 From: Shaun Tsai Date: Tue, 26 May 2026 23:06:16 +0800 Subject: [PATCH] fix: harden agy-acp session binding --- agy-acp/src/main.rs | 211 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 171 insertions(+), 40 deletions(-) diff --git a/agy-acp/src/main.rs b/agy-acp/src/main.rs index 51ede1e4..0c5333d1 100644 --- a/agy-acp/src/main.rs +++ b/agy-acp/src/main.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::io::{self, BufRead, Write}; use std::path::PathBuf; use tokio::process::Command; @@ -34,8 +34,8 @@ struct JsonRpcNotification { struct Session { /// agy conversation ID (from conversations directory) conversation_id: Option, - /// cumulative stdout length from previous turns - prev_output_len: usize, + /// full stdout from the previous turn for prefix-checked delta extraction + prev_output: String, } struct Adapter { @@ -52,19 +52,56 @@ impl Adapter { working_dir: std::env::current_dir() .map(|p| p.to_string_lossy().to_string()) .unwrap_or_else(|_| "/tmp".to_string()), - conversations_dir: PathBuf::from(&home) - .join(".gemini/antigravity-cli/conversations"), + conversations_dir: PathBuf::from(&home).join(".gemini/antigravity-cli/conversations"), } } - /// Find the most recently modified conversation ID from agy's data dir. - fn latest_conversation_id(&self) -> Option { - let entries = std::fs::read_dir(&self.conversations_dir).ok()?; + fn conversation_snapshot(&self) -> HashSet { + let Ok(entries) = std::fs::read_dir(&self.conversations_dir) else { + return HashSet::new(); + }; + entries .filter_map(|e| e.ok()) - .filter(|e| e.path().extension().map(|x| x == "pb").unwrap_or(false)) - .max_by_key(|e| e.metadata().ok().and_then(|m| m.modified().ok())) - .and_then(|e| e.path().file_stem().map(|s| s.to_string_lossy().to_string())) + .filter_map(|e| { + let path = e.path(); + if path.extension().map(|x| x == "pb").unwrap_or(false) { + path.file_stem().map(|s| s.to_string_lossy().to_string()) + } else { + None + } + }) + .collect() + } + + fn new_conversation_id(&self, before: &HashSet) -> Option { + let after = self.conversation_snapshot(); + let mut created = after.difference(before); + let first = created.next()?.to_string(); + if created.next().is_some() { + eprintln!( + "[agy-acp] WARN: multiple new agy conversation files appeared; \ + refusing to bind this ACP session by global heuristic" + ); + return None; + } + Some(first) + } + + fn extract_delta(prev_output: &str, full_text: &str, conversation_bound: bool) -> String { + if !conversation_bound || prev_output.is_empty() { + return full_text.to_string(); + } + + if let Some(delta) = full_text.strip_prefix(prev_output) { + return delta.trim_start().to_string(); + } + + eprintln!( + "[agy-acp] WARN: agy stdout was not append-only for the bound conversation; \ + sending full output for this turn and resetting delta baseline" + ); + full_text.to_string() } fn handle_initialize(&self, id: u64) -> JsonRpcResponse { @@ -82,10 +119,13 @@ impl Adapter { fn handle_session_new(&mut self, id: u64) -> JsonRpcResponse { let session_id = Uuid::new_v4().to_string(); - self.sessions.insert(session_id.clone(), Session { - conversation_id: None, - prev_output_len: 0, - }); + self.sessions.insert( + session_id.clone(), + Session { + conversation_id: None, + prev_output: String::new(), + }, + ); JsonRpcResponse { jsonrpc: "2.0", id, @@ -95,7 +135,10 @@ impl Adapter { } async fn handle_session_prompt(&mut self, id: u64, params: &Value) -> Vec { - let session_id = params.get("sessionId").and_then(|v| v.as_str()).unwrap_or(""); + let session_id = params + .get("sessionId") + .and_then(|v| v.as_str()) + .unwrap_or(""); let prompt_text = params .get("prompt") .and_then(|v| v.as_array()) @@ -108,6 +151,17 @@ impl Adapter { .unwrap_or_default(); let clean_prompt = prompt_text.trim(); + let conversation_snapshot = if self + .sessions + .get(session_id) + .map(|s| s.conversation_id.is_none()) + .unwrap_or(false) + { + Some(self.conversation_snapshot()) + } else { + None + }; + // Build args: use --conversation for subsequent turns let mut args: Vec = Vec::new(); // Always add working dir as workspace so agy reads AGENTS.md/GEMINI.md @@ -141,31 +195,36 @@ impl Adapter { Ok(output) => { let full_text = String::from_utf8_lossy(&output.stdout).to_string(); - // Extract only the new content (delta) - let prev_len = self.sessions.get(session_id) - .map(|s| s.prev_output_len) - .unwrap_or(0); - let new_text = if prev_len < full_text.len() { - full_text[prev_len..].trim_start().to_string() - } else { - full_text.clone() - }; + let prev_output = self + .sessions + .get(session_id) + .map(|s| s.prev_output.clone()) + .unwrap_or_default(); + let conversation_bound = self + .sessions + .get(session_id) + .map(|s| s.conversation_id.is_some()) + .unwrap_or(false); + let new_text = Self::extract_delta(&prev_output, &full_text, conversation_bound); - // Update session state - let conv_id = if self.sessions.get(session_id) - .map(|s| s.conversation_id.is_none()) - .unwrap_or(false) - { - self.latest_conversation_id() - } else { - None - }; + let conv_id = conversation_snapshot + .as_ref() + .and_then(|before| self.new_conversation_id(before)); if let Some(session) = self.sessions.get_mut(session_id) { - session.prev_output_len = full_text.len(); if session.conversation_id.is_none() { session.conversation_id = conv_id; } + if session.conversation_id.is_some() { + session.prev_output = full_text; + } else { + session.prev_output.clear(); + eprintln!( + "[agy-acp] WARN: could not bind an agy conversation ID; \ + this ACP session will run in single-turn mode until a \ + conversation can be bound" + ); + } } let notification = serde_json::to_string(&JsonRpcNotification { @@ -178,13 +237,24 @@ impl Adapter { "content": { "type": "text", "text": new_text }, }, }), - }).unwrap(); + }) + .unwrap(); output_lines.push(notification); - let resp = JsonRpcResponse { jsonrpc: "2.0", id, result: Some(json!({ "stopReason": "end_turn" })), error: None }; + let resp = JsonRpcResponse { + jsonrpc: "2.0", + id, + result: Some(json!({ "stopReason": "end_turn" })), + error: None, + }; output_lines.push(serde_json::to_string(&resp).unwrap()); } Err(e) => { - let resp = JsonRpcResponse { jsonrpc: "2.0", id, result: None, error: Some(json!({"code":-32000,"message":format!("failed to run agy: {e}")})) }; + let resp = JsonRpcResponse { + jsonrpc: "2.0", + id, + result: None, + error: Some(json!({"code":-32000,"message":format!("failed to run agy: {e}")})), + }; output_lines.push(serde_json::to_string(&resp).unwrap()); } } @@ -236,11 +306,23 @@ async fn main() { adapter.handle_session_prompt(id, ¶ms).await } Some("session/cancel") => { - let r = JsonRpcResponse { jsonrpc: "2.0", id, result: Some(json!({})), error: None }; + let r = JsonRpcResponse { + jsonrpc: "2.0", + id, + result: Some(json!({})), + error: None, + }; vec![serde_json::to_string(&r).unwrap()] } Some(method) => { - let r = JsonRpcResponse { jsonrpc: "2.0", id, result: None, error: Some(json!({"code":-32601,"message":format!("method not found: {method}")})) }; + let r = JsonRpcResponse { + jsonrpc: "2.0", + id, + result: None, + error: Some( + json!({"code":-32601,"message":format!("method not found: {method}")}), + ), + }; vec![serde_json::to_string(&r).unwrap()] } None => continue, @@ -252,3 +334,52 @@ async fn main() { let _ = stdout.flush(); } } + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + + #[test] + fn extract_delta_returns_full_text_without_bound_conversation() { + let output = Adapter::extract_delta("old", "oldnew", false); + assert_eq!(output, "oldnew"); + } + + #[test] + fn extract_delta_returns_only_appended_output_for_bound_conversation() { + let output = + Adapter::extract_delta("first response\n", "first response\nsecond response", true); + assert_eq!(output, "second response"); + } + + #[test] + fn extract_delta_falls_back_when_output_is_not_append_only() { + let output = Adapter::extract_delta("old response", "fresh response", true); + assert_eq!(output, "fresh response"); + } + + #[test] + fn new_conversation_id_uses_snapshot_diff_instead_of_latest_file() { + let root = std::env::temp_dir().join(format!("agy-acp-test-{}", Uuid::new_v4())); + let conversations_dir = root.join("conversations"); + fs::create_dir_all(&conversations_dir).unwrap(); + fs::write(conversations_dir.join("old.pb"), b"old").unwrap(); + + let adapter = Adapter { + sessions: HashMap::new(), + working_dir: root.to_string_lossy().to_string(), + conversations_dir: conversations_dir.clone(), + }; + + let before = adapter.conversation_snapshot(); + fs::write(conversations_dir.join("new.pb"), b"new").unwrap(); + + assert_eq!( + adapter.new_conversation_id(&before), + Some("new".to_string()) + ); + + let _ = fs::remove_dir_all(root); + } +}