diff --git a/docs/feishu.md b/docs/feishu.md index f1139d08..81669687 100644 --- a/docs/feishu.md +++ b/docs/feishu.md @@ -200,6 +200,22 @@ To start a threaded conversation: reply to any bot message in a group chat (long Streaming (typewriter) mode works in threads — edits target the same message regardless of thread context. +## Agent-Controlled Reply-To + +Agents can reply to a specific message using the `[[reply_to:message_id]]` output directive (see [docs/output-directives.md](output-directives.md)). The gateway sends the reply via Feishu's native Reply API, showing a quote reference in the UI. + +``` +Agent output: + [[reply_to:om_xxx]] + This is my reply to that specific message. +``` + +**How agents get message IDs:** Every incoming message includes `message_id` in the `SenderContext` injected into the agent prompt. Agents can store and reference these IDs to reply to specific messages. + +**Fallback:** If the specified message ID is invalid or the Reply API fails, the gateway automatically falls back to a plain send (no quote). + +**Use case:** In multi-bot threads, each bot can reply to a different message, creating clear visual conversation threads within a Feishu thread. + ## Bot-to-Bot Collaboration (Gateway-Side Only) The gateway adapter includes bot identification and filtering scaffolding (`AllowBots` enum, `FEISHU_TRUSTED_BOT_IDS`, `FEISHU_MAX_BOT_TURNS` with human-reset safety valve), matching Discord's `allow_bot_messages` design. diff --git a/docs/output-directives.md b/docs/output-directives.md index 79711658..9b5876ac 100644 --- a/docs/output-directives.md +++ b/docs/output-directives.md @@ -35,6 +35,7 @@ Here is my reply to that specific message. **Behavior**: - Discord: sends with `message_reference`, showing the native "replying to..." UI +- Feishu: sends via Reply API (`POST /im/v1/messages/{id}/reply`), showing native quote UI - Invalid/non-existent message ID: silently falls back to plain send - Works in both streaming and send-once modes @@ -73,4 +74,4 @@ This creates clear visual conversation threads within a Discord thread — essen | Hermes Agent | `DISCORD_REPLY_TO_MODE` env var | ❌ Platform decides, always to trigger msg | | **OAB** | `[[reply_to:message_id]]` directive | ✅ Agent chooses any message | -> **Note:** `reply_to` is currently implemented for Discord only. Slack message IDs (ts format like `1234567890.123456`) are accepted by the parser but the Slack adapter does not yet send threaded replies via this directive — it falls back to plain send. Slack support can be added in a future PR. +> **Note:** `reply_to` is currently implemented for Discord and Feishu (gateway). Slack message IDs (ts format like `1234567890.123456`) are accepted by the parser but the Slack adapter does not yet send threaded replies via this directive — it falls back to plain send. Slack support can be added in a future PR. diff --git a/gateway/src/adapters/feishu.rs b/gateway/src/adapters/feishu.rs index 09e97fe0..061777cc 100644 --- a/gateway/src/adapters/feishu.rs +++ b/gateway/src/adapters/feishu.rs @@ -106,6 +106,9 @@ pub struct FeishuConfig { /// tracking entirely — all messages will require @mention. /// Converted from `FEISHU_SESSION_TTL_HOURS` (user-facing, in hours) to seconds internally. pub session_ttl_secs: u64, + /// Override the API base URL. Used in tests to point at a mock server. + /// Always None in production (not read from env). + pub api_base_override: Option, } impl FeishuConfig { @@ -192,11 +195,15 @@ impl FeishuConfig { dedupe_ttl_secs, message_limit, session_ttl_secs, + api_base_override: None, }) } /// API base URL for the configured domain. pub fn api_base(&self) -> String { + if let Some(ref base) = self.api_base_override { + return base.clone(); + } if self.domain == "lark" { "https://open.larksuite.com".into() } else { @@ -1904,6 +1911,9 @@ pub async fn handle_reply( let api_base = adapter.config.api_base(); let text = &reply.content.text; let limit = adapter.config.message_limit; + // quote_message_id (agent-controlled reply-to) takes priority over thread_id + let reply_target = reply.quote_message_id.as_deref() + .or(reply.channel.thread_id.as_deref()); let thread_id = reply.channel.thread_id.as_deref(); // Split long messages; store sent message_ids in dedupe to prevent @@ -1911,7 +1921,15 @@ pub async fn handle_reply( // Use post (rich text) format for markdown rendering. // When in a thread (thread_id present), use reply API to stay in the same thread. if text.len() <= limit { - match send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, thread_id, text).await { + let result = send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, reply_target, text).await; + // Fallback: if quote_message_id caused failure, retry without it + let result = if result.is_none() && reply.quote_message_id.is_some() { + tracing::warn!(quote_message_id = ?reply.quote_message_id, channel_id = %reply.channel.id, "reply-to failed, falling back to plain send"); + send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, thread_id, text).await + } else { + result + }; + match result { Some(msg_id) => { adapter.dedupe.is_duplicate(&msg_id); // Record thread participation for mention bypass @@ -1953,11 +1971,21 @@ pub async fn handle_reply( } else { let mut sent_any = false; for chunk in split_text(text, limit) { - if let Some(msg_id) = send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, thread_id, chunk).await { + if let Some(msg_id) = send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, reply_target, chunk).await { adapter.dedupe.is_duplicate(&msg_id); sent_any = true; } } + // Fallback: if quote_message_id caused all chunks to fail, retry without it + if !sent_any && reply.quote_message_id.is_some() { + tracing::warn!(quote_message_id = ?reply.quote_message_id, channel_id = %reply.channel.id, "chunked reply-to failed, falling back to plain send"); + for chunk in split_text(text, limit) { + if let Some(msg_id) = send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, thread_id, chunk).await { + adapter.dedupe.is_duplicate(&msg_id); + sent_any = true; + } + } + } if sent_any { if let Some(tid) = thread_id { record_participation(&adapter.participated_threads, tid, adapter.config.session_ttl_secs); @@ -2318,6 +2346,7 @@ mod tests { dedupe_ttl_secs: 300, message_limit: 4000, session_ttl_secs: 86400, + api_base_override: None, } } @@ -2942,4 +2971,148 @@ mod tests { // (caller would pass false because Mentions mode always returns false) assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_none()); } + + #[test] + fn quote_message_id_takes_priority_over_thread_id() { + use crate::schema::{GatewayReply, ReplyChannel, Content}; + let reply = GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_123".into(), + platform: "feishu".into(), + channel: ReplyChannel { + id: "chat_123".into(), + thread_id: Some("om_root".into()), + }, + content: Content { + content_type: "text".into(), + text: "hello".into(), + attachments: vec![], + }, + command: None, + request_id: None, + quote_message_id: Some("om_specific".into()), + }; + // quote_message_id should take priority + let reply_target = reply.quote_message_id.as_deref() + .or(reply.channel.thread_id.as_deref()); + assert_eq!(reply_target, Some("om_specific")); + } + + #[test] + fn reply_target_falls_back_to_thread_id_when_no_quote() { + use crate::schema::{GatewayReply, ReplyChannel, Content}; + let reply = GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_123".into(), + platform: "feishu".into(), + channel: ReplyChannel { + id: "chat_123".into(), + thread_id: Some("om_root".into()), + }, + content: Content { + content_type: "text".into(), + text: "hello".into(), + attachments: vec![], + }, + command: None, + request_id: None, + quote_message_id: None, + }; + let reply_target = reply.quote_message_id.as_deref() + .or(reply.channel.thread_id.as_deref()); + assert_eq!(reply_target, Some("om_root")); + } + + #[test] + fn reply_target_is_none_when_both_absent() { + use crate::schema::{GatewayReply, ReplyChannel, Content}; + let reply = GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_123".into(), + platform: "feishu".into(), + channel: ReplyChannel { + id: "chat_123".into(), + thread_id: None, + }, + content: Content { + content_type: "text".into(), + text: "hello".into(), + attachments: vec![], + }, + command: None, + request_id: None, + quote_message_id: None, + }; + let reply_target = reply.quote_message_id.as_deref() + .or(reply.channel.thread_id.as_deref()); + assert_eq!(reply_target, None); + } + + #[tokio::test] + async fn quote_message_id_fallback_on_reply_failure() { + // Tests the actual handle_reply fallback path: when quote_message_id + // is set and the reply API fails, handle_reply retries as plain send. + let server = MockServer::start().await; + + // Token endpoint + Mock::given(method("POST")) + .and(path("/open-apis/auth/v3/tenant_access_token/internal")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, + "tenant_access_token": "t-test", + "expire": 7200 + }))) + .mount(&server) + .await; + + // Reply API returns 400 (invalid quote_message_id) + Mock::given(method("POST")) + .and(path("/open-apis/im/v1/messages/om_invalid/reply")) + .respond_with(ResponseTemplate::new(400).set_body_string("invalid message_id")) + .expect(1) + .named("reply_api_fail") + .mount(&server) + .await; + + // Plain send endpoint succeeds (fallback path) + Mock::given(method("POST")) + .and(path("/open-apis/im/v1/messages")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, + "data": {"message_id": "om_fallback_ok"} + }))) + .expect(1) + .named("plain_send_fallback") + .mount(&server) + .await; + + let mut config = test_config(); + config.api_base_override = Some(server.uri()); + let adapter = FeishuAdapter::new(config); + + let (event_tx, _rx) = tokio::sync::broadcast::channel(16); + + let reply = crate::schema::GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_123".into(), + platform: "feishu".into(), + channel: crate::schema::ReplyChannel { + id: "oc_chat1".into(), + thread_id: None, + }, + content: crate::schema::Content { + content_type: "text".into(), + text: "hello from fallback test".into(), + attachments: vec![], + }, + command: None, + request_id: None, + quote_message_id: Some("om_invalid".into()), + }; + + handle_reply(&reply, &adapter, &event_tx).await; + // wiremock expect(1) on both mocks verifies: + // 1. Reply API was called (and failed) + // 2. Plain send was called (fallback triggered by quote_message_id.is_some() guard) + } } diff --git a/gateway/src/adapters/googlechat.rs b/gateway/src/adapters/googlechat.rs index 73787089..20a884c3 100644 --- a/gateway/src/adapters/googlechat.rs +++ b/gateway/src/adapters/googlechat.rs @@ -1375,6 +1375,7 @@ mod tests { }, command: None, request_id: Some("req_123".into()), + quote_message_id: None, }; adapter.handle_reply(&reply, &event_tx).await; @@ -1418,6 +1419,7 @@ mod tests { }, command: None, request_id: Some("req_fail".into()), + quote_message_id: None, }; adapter.handle_reply(&reply, &event_tx).await; @@ -1465,6 +1467,7 @@ mod tests { }, command: None, request_id: Some("req_empty".into()), + quote_message_id: None, }; adapter.handle_reply(&reply, &event_tx).await; @@ -1509,6 +1512,7 @@ mod tests { }, command: None, request_id: Some("req_multi_fail".into()), + quote_message_id: None, }; adapter.handle_reply(&reply, &event_tx).await; @@ -1543,6 +1547,7 @@ mod tests { }, command: None, request_id: Some("req_notoken".into()), + quote_message_id: None, }; adapter.handle_reply(&reply, &event_tx).await; @@ -1588,6 +1593,7 @@ mod tests { }, command: Some("edit_message".into()), request_id: None, + quote_message_id: None, }; adapter.handle_reply(&reply, &event_tx).await; @@ -1630,6 +1636,7 @@ mod tests { }, command: None, request_id: Some("req_multi".into()), + quote_message_id: None, }; adapter.handle_reply(&reply, &event_tx).await; @@ -1687,6 +1694,7 @@ mod tests { }, command: None, request_id: Some("req_partial".into()), + quote_message_id: None, }; adapter.handle_reply(&reply, &event_tx).await; diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 3df4ab1a..eaaf6999 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -427,6 +427,7 @@ mod tests { }, command: None, request_id: None, + quote_message_id: None, } } diff --git a/gateway/src/schema.rs b/gateway/src/schema.rs index a38554df..560648a0 100644 --- a/gateway/src/schema.rs +++ b/gateway/src/schema.rs @@ -64,6 +64,12 @@ pub struct GatewayReply { pub command: Option, #[serde(default)] pub request_id: Option, + /// When set, send this message as a reply/quote to the specified platform message ID. + /// Unlike `reply_to` (which identifies the triggering event for routing/dedup), + /// this field controls the visual reply/quote UI on the platform. + /// If quoting fails, the gateway MUST fall back to sending without quoting. + #[serde(default)] + pub quote_message_id: Option, } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/src/gateway.rs b/src/gateway.rs index c0f3077c..9557f323 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -10,6 +10,9 @@ use tokio::sync::Mutex; use tokio_tungstenite::tungstenite::Message; use tracing::{error, info, warn}; +/// Timeout for waiting on gateway reply acknowledgement. +const GATEWAY_REPLY_TIMEOUT_SECS: u64 = 5; + // --- Gateway event/reply schemas (mirrors gateway service) --- #[derive(Clone, Debug, Deserialize)] @@ -77,6 +80,11 @@ struct GatewayReply { command: Option, #[serde(skip_serializing_if = "Option::is_none")] request_id: Option, + /// When set, the gateway should send this message as a reply/quote to the specified message ID. + /// Unlike `reply_to` (routing/dedup identifier for the triggering event), this field controls + /// the visual reply/quote UI on the platform. Falls back to plain send on failure. + #[serde(skip_serializing_if = "Option::is_none")] + quote_message_id: Option, } #[derive(Serialize)] @@ -139,6 +147,74 @@ impl GatewayAdapter { streaming, } } + + /// Internal helper for send_message / send_message_with_reply. + async fn send_gateway_reply( + &self, + channel: &ChannelRef, + content: &str, + quote_message_id: Option<&str>, + ) -> Result { + let req_id = if self.streaming { + Some(format!("req_{}", uuid::Uuid::new_v4())) + } else { + None + }; + let pending_rx = if let Some(ref id) = req_id { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.pending.lock().await.insert(id.clone(), tx); + Some(rx) + } else { + None + }; + let reply = GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: channel.origin_event_id.clone().unwrap_or_default(), + platform: channel.platform.clone(), + channel: ReplyChannel { + id: channel.channel_id.clone(), + thread_id: channel.thread_id.clone(), + }, + content: ReplyContent { + content_type: "text".into(), + text: content.into(), + }, + command: None, + request_id: req_id.clone(), + quote_message_id: quote_message_id.map(|s| s.to_string()), + }; + let json = serde_json::to_string(&reply)?; + if let Err(e) = self.ws_tx.lock().await.send(Message::Text(json)).await { + if let Some(ref id) = req_id { + self.pending.lock().await.remove(id); + } + return Err(e.into()); + } + let msg_id = if let (Some(rx), Some(ref id)) = (pending_rx, &req_id) { + match tokio::time::timeout(std::time::Duration::from_secs(GATEWAY_REPLY_TIMEOUT_SECS), rx).await { + Ok(Ok(resp)) if resp.success => resp.message_id.unwrap_or_else(|| "gw_sent".into()), + Ok(Ok(_resp)) => { + tracing::warn!(request_id = %id, "gateway replied with failure"); + "gw_sent".into() + } + Ok(Err(_)) => { + tracing::warn!(request_id = %id, "gateway response channel closed"); + "gw_sent".into() + } + Err(_) => { + tracing::warn!(request_id = %id, "gateway reply timed out"); + self.pending.lock().await.remove(id); + "gw_sent".into() + } + } + } else { + "gw_sent".into() + }; + Ok(MessageRef { + channel: channel.clone(), + message_id: msg_id, + }) + } } /// Send a fire-and-forget reply via the shared WebSocket (no request-response). @@ -162,6 +238,7 @@ async fn send_fire_and_forget( }, command: None, request_id: None, + quote_message_id: None, }; let json = serde_json::to_string(&reply)?; ws_tx.lock().await.send(Message::Text(json)).await?; @@ -305,56 +382,16 @@ impl ChatAdapter for GatewayAdapter { } async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result { - let req_id = if self.streaming { - Some(format!("req_{}", uuid::Uuid::new_v4())) - } else { - None - }; - - let pending_rx = if let Some(ref id) = req_id { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.pending.lock().await.insert(id.clone(), tx); - Some(rx) - } else { - None - }; - - let reply = GatewayReply { - schema: "openab.gateway.reply.v1".into(), - reply_to: channel.origin_event_id.clone().unwrap_or_default(), - platform: channel.platform.clone(), - channel: ReplyChannel { - id: channel.channel_id.clone(), - thread_id: channel.thread_id.clone(), - }, - content: ReplyContent { - content_type: "text".into(), - text: content.into(), - }, - command: None, - request_id: req_id.clone(), - }; - let json = serde_json::to_string(&reply)?; - self.ws_tx.lock().await.send(Message::Text(json)).await?; - - // When streaming is enabled, wait for gateway to return real message_id - // (needed for edit_message). Otherwise fire-and-forget. - let msg_id = if let (Some(rx), Some(ref id)) = (pending_rx, &req_id) { - match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await { - Ok(Ok(resp)) if resp.success => resp.message_id.unwrap_or_else(|| "gw_sent".into()), - _ => { - self.pending.lock().await.remove(id); - "gw_sent".into() - } - } - } else { - "gw_sent".into() - }; + self.send_gateway_reply(channel, content, None).await + } - Ok(MessageRef { - channel: channel.clone(), - message_id: msg_id, - }) + async fn send_message_with_reply( + &self, + channel: &ChannelRef, + content: &str, + reply_to_message_id: &str, + ) -> Result { + self.send_gateway_reply(channel, content, Some(reply_to_message_id)).await } async fn create_thread( @@ -382,6 +419,7 @@ impl ChatAdapter for GatewayAdapter { }, command: Some("create_topic".into()), request_id: Some(req_id.clone()), + quote_message_id: None, }; let json = serde_json::to_string(&reply)?; self.ws_tx.lock().await.send(Message::Text(json)).await?; @@ -421,6 +459,7 @@ impl ChatAdapter for GatewayAdapter { text: emoji.into(), }, command: Some("add_reaction".into()), + quote_message_id: None, request_id: None, }; let json = serde_json::to_string(&reply)?; @@ -442,6 +481,7 @@ impl ChatAdapter for GatewayAdapter { text: emoji.into(), }, command: Some("remove_reaction".into()), + quote_message_id: None, request_id: None, }; let json = serde_json::to_string(&reply)?; @@ -463,6 +503,7 @@ impl ChatAdapter for GatewayAdapter { text: content.into(), }, command: Some("edit_message".into()), + quote_message_id: None, request_id: None, }; let json = serde_json::to_string(&reply)?;