Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 171 additions & 40 deletions agy-acp/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::io::{self, BufRead, Write};
use std::path::PathBuf;
use tokio::process::Command;
Expand Down Expand Up @@ -34,8 +34,8 @@ struct JsonRpcNotification {
struct Session {
/// agy conversation ID (from conversations directory)
conversation_id: Option<String>,
/// cumulative stdout length from previous turns
prev_output_len: usize,
/// full stdout from the previous turn for prefix-checked delta extraction
prev_output: String,
}

struct Adapter {
Expand All @@ -52,19 +52,56 @@ impl Adapter {
working_dir: std::env::current_dir()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| "/tmp".to_string()),
conversations_dir: PathBuf::from(&home)
.join(".gemini/antigravity-cli/conversations"),
conversations_dir: PathBuf::from(&home).join(".gemini/antigravity-cli/conversations"),
}
}

/// Find the most recently modified conversation ID from agy's data dir.
fn latest_conversation_id(&self) -> Option<String> {
let entries = std::fs::read_dir(&self.conversations_dir).ok()?;
fn conversation_snapshot(&self) -> HashSet<String> {
let Ok(entries) = std::fs::read_dir(&self.conversations_dir) else {
return HashSet::new();
};

entries
.filter_map(|e| e.ok())
.filter(|e| e.path().extension().map(|x| x == "pb").unwrap_or(false))
.max_by_key(|e| e.metadata().ok().and_then(|m| m.modified().ok()))
.and_then(|e| e.path().file_stem().map(|s| s.to_string_lossy().to_string()))
.filter_map(|e| {
let path = e.path();
if path.extension().map(|x| x == "pb").unwrap_or(false) {
path.file_stem().map(|s| s.to_string_lossy().to_string())
} else {
None
}
})
.collect()
}

fn new_conversation_id(&self, before: &HashSet<String>) -> Option<String> {
let after = self.conversation_snapshot();
let mut created = after.difference(before);
let first = created.next()?.to_string();
if created.next().is_some() {
eprintln!(
"[agy-acp] WARN: multiple new agy conversation files appeared; \
refusing to bind this ACP session by global heuristic"
);
return None;
}
Some(first)
}

fn extract_delta(prev_output: &str, full_text: &str, conversation_bound: bool) -> String {
if !conversation_bound || prev_output.is_empty() {
return full_text.to_string();
}

if let Some(delta) = full_text.strip_prefix(prev_output) {
return delta.trim_start().to_string();
}

eprintln!(
"[agy-acp] WARN: agy stdout was not append-only for the bound conversation; \
sending full output for this turn and resetting delta baseline"
);
full_text.to_string()
}

fn handle_initialize(&self, id: u64) -> JsonRpcResponse {
Expand All @@ -82,10 +119,13 @@ impl Adapter {

fn handle_session_new(&mut self, id: u64) -> JsonRpcResponse {
let session_id = Uuid::new_v4().to_string();
self.sessions.insert(session_id.clone(), Session {
conversation_id: None,
prev_output_len: 0,
});
self.sessions.insert(
session_id.clone(),
Session {
conversation_id: None,
prev_output: String::new(),
},
);
JsonRpcResponse {
jsonrpc: "2.0",
id,
Expand All @@ -95,7 +135,10 @@ impl Adapter {
}

async fn handle_session_prompt(&mut self, id: u64, params: &Value) -> Vec<String> {
let session_id = params.get("sessionId").and_then(|v| v.as_str()).unwrap_or("");
let session_id = params
.get("sessionId")
.and_then(|v| v.as_str())
.unwrap_or("");
let prompt_text = params
.get("prompt")
.and_then(|v| v.as_array())
Expand All @@ -108,6 +151,17 @@ impl Adapter {
.unwrap_or_default();
let clean_prompt = prompt_text.trim();

let conversation_snapshot = if self
.sessions
.get(session_id)
.map(|s| s.conversation_id.is_none())
.unwrap_or(false)
{
Some(self.conversation_snapshot())
} else {
None
};

// Build args: use --conversation <ID> for subsequent turns
let mut args: Vec<String> = Vec::new();
// Always add working dir as workspace so agy reads AGENTS.md/GEMINI.md
Expand Down Expand Up @@ -141,31 +195,36 @@ impl Adapter {
Ok(output) => {
let full_text = String::from_utf8_lossy(&output.stdout).to_string();

// Extract only the new content (delta)
let prev_len = self.sessions.get(session_id)
.map(|s| s.prev_output_len)
.unwrap_or(0);
let new_text = if prev_len < full_text.len() {
full_text[prev_len..].trim_start().to_string()
} else {
full_text.clone()
};
let prev_output = self
.sessions
.get(session_id)
.map(|s| s.prev_output.clone())
.unwrap_or_default();
let conversation_bound = self
.sessions
.get(session_id)
.map(|s| s.conversation_id.is_some())
.unwrap_or(false);
let new_text = Self::extract_delta(&prev_output, &full_text, conversation_bound);

// Update session state
let conv_id = if self.sessions.get(session_id)
.map(|s| s.conversation_id.is_none())
.unwrap_or(false)
{
self.latest_conversation_id()
} else {
None
};
let conv_id = conversation_snapshot
.as_ref()
.and_then(|before| self.new_conversation_id(before));

if let Some(session) = self.sessions.get_mut(session_id) {
session.prev_output_len = full_text.len();
if session.conversation_id.is_none() {
session.conversation_id = conv_id;
}
if session.conversation_id.is_some() {
session.prev_output = full_text;
} else {
session.prev_output.clear();
eprintln!(
"[agy-acp] WARN: could not bind an agy conversation ID; \
this ACP session will run in single-turn mode until a \
conversation can be bound"
);
}
}

let notification = serde_json::to_string(&JsonRpcNotification {
Expand All @@ -178,13 +237,24 @@ impl Adapter {
"content": { "type": "text", "text": new_text },
},
}),
}).unwrap();
})
.unwrap();
output_lines.push(notification);
let resp = JsonRpcResponse { jsonrpc: "2.0", id, result: Some(json!({ "stopReason": "end_turn" })), error: None };
let resp = JsonRpcResponse {
jsonrpc: "2.0",
id,
result: Some(json!({ "stopReason": "end_turn" })),
error: None,
};
output_lines.push(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let resp = JsonRpcResponse { jsonrpc: "2.0", id, result: None, error: Some(json!({"code":-32000,"message":format!("failed to run agy: {e}")})) };
let resp = JsonRpcResponse {
jsonrpc: "2.0",
id,
result: None,
error: Some(json!({"code":-32000,"message":format!("failed to run agy: {e}")})),
};
output_lines.push(serde_json::to_string(&resp).unwrap());
}
}
Expand Down Expand Up @@ -236,11 +306,23 @@ async fn main() {
adapter.handle_session_prompt(id, &params).await
}
Some("session/cancel") => {
let r = JsonRpcResponse { jsonrpc: "2.0", id, result: Some(json!({})), error: None };
let r = JsonRpcResponse {
jsonrpc: "2.0",
id,
result: Some(json!({})),
error: None,
};
vec![serde_json::to_string(&r).unwrap()]
}
Some(method) => {
let r = JsonRpcResponse { jsonrpc: "2.0", id, result: None, error: Some(json!({"code":-32601,"message":format!("method not found: {method}")})) };
let r = JsonRpcResponse {
jsonrpc: "2.0",
id,
result: None,
error: Some(
json!({"code":-32601,"message":format!("method not found: {method}")}),
),
};
vec![serde_json::to_string(&r).unwrap()]
}
None => continue,
Expand All @@ -252,3 +334,52 @@ async fn main() {
let _ = stdout.flush();
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::fs;

#[test]
fn extract_delta_returns_full_text_without_bound_conversation() {
let output = Adapter::extract_delta("old", "oldnew", false);
assert_eq!(output, "oldnew");
}

#[test]
fn extract_delta_returns_only_appended_output_for_bound_conversation() {
let output =
Adapter::extract_delta("first response\n", "first response\nsecond response", true);
assert_eq!(output, "second response");
}

#[test]
fn extract_delta_falls_back_when_output_is_not_append_only() {
let output = Adapter::extract_delta("old response", "fresh response", true);
assert_eq!(output, "fresh response");
}

#[test]
fn new_conversation_id_uses_snapshot_diff_instead_of_latest_file() {
let root = std::env::temp_dir().join(format!("agy-acp-test-{}", Uuid::new_v4()));
let conversations_dir = root.join("conversations");
fs::create_dir_all(&conversations_dir).unwrap();
fs::write(conversations_dir.join("old.pb"), b"old").unwrap();

let adapter = Adapter {
sessions: HashMap::new(),
working_dir: root.to_string_lossy().to_string(),
conversations_dir: conversations_dir.clone(),
};

let before = adapter.conversation_snapshot();
fs::write(conversations_dir.join("new.pb"), b"new").unwrap();

assert_eq!(
adapter.new_conversation_id(&before),
Some("new".to_string())
);

let _ = fs::remove_dir_all(root);
}
}
Loading