From b6f3e47a219b4c8435f5cba1e6c1ead1731ff17f Mon Sep 17 00:00:00 2001 From: iamninihuang Date: Tue, 28 Apr 2026 10:42:17 +0800 Subject: [PATCH 01/16] feat(gateway): implement hybrid LINE reply/push strategy This implementation adds a stateful token cache in the gateway to prioritize the free LINE Reply API. It also includes an auto-fill logic for the reply_to field to avoid modifying the OAB core. --- docs/adr/line-adapter.md | 15 +-- docs/feature-requests/line-reply-api.md | 102 +++++++++++++++++++ gateway/src/main.rs | 125 +++++++++++++++++++++--- run_gateway.sh | 12 +++ 4 files changed, 232 insertions(+), 22 deletions(-) create mode 100644 docs/feature-requests/line-reply-api.md create mode 100755 run_gateway.sh diff --git a/docs/adr/line-adapter.md b/docs/adr/line-adapter.md index f899a747..01c922bb 100644 --- a/docs/adr/line-adapter.md +++ b/docs/adr/line-adapter.md @@ -104,14 +104,17 @@ LINE is not ideal when: 7. Agent response is sent back via LINE Push Message API ``` -### Reply Strategy: Push Messages +### Reply Strategy: Hybrid Reply/Push Messages LINE offers two reply mechanisms: -- **Reply message**: uses a reply token, but the token expires in 1 minute -- **Push message**: no time limit, can send to any user/group at any time - -OpenAB uses **push messages** because agent processing typically exceeds the 1-minute reply token window. The trade-off is that push messages count against the monthly messaging quota on free-tier LINE accounts. - +- **Reply message**: uses a reply token, but the token expires in 1 minute (free). +- **Push message**: no time limit, can send to any user/group at any time (consumes quota). + +Historically, OpenAB relied solely on **push messages** because agent processing can exceed the 1-minute reply token window. To optimize costs for free-tier accounts, OpenAB now uses a **Hybrid Strategy** implemented at the gateway level: +1. The gateway caches incoming `replyToken`s for 50 seconds. +2. It tracks the last `event_id` sent to each OAB client to auto-fill the `reply_to` context. +3. If an agent replies within the window, the gateway intercepts and routes it via the free **Reply API**. +4. If the token is expired or missing, it gracefully falls back to the **Push API**. --- ## 3. Architectural Differences from Discord/Slack diff --git a/docs/feature-requests/line-reply-api.md b/docs/feature-requests/line-reply-api.md new file mode 100644 index 00000000..80f8f33c --- /dev/null +++ b/docs/feature-requests/line-reply-api.md @@ -0,0 +1,102 @@ +# Feature Request: Hybrid LINE Reply/Push API Strategy + +**Title**: `feat(gateway): implement hybrid LINE Reply/Push API strategy` + +**Labels**: `feature` + +**GitHub Issue**: [#607](https://github.com/openabdev/openab/issues/607) + +## 1. Description + +Add a hybrid reply strategy to the LINE adapter in `openab-gateway`. +When an agent response arrives within the 1-minute Reply API window, +the gateway uses the free Reply API; otherwise it falls back to the +existing Push Message API. This saves messaging quota on free-tier +LINE accounts without modifying OAB core. + +## 2. Use Case + +As a LINE bot operator on a free-tier plan (200 push messages/month), +I want the bot to prefer the Reply API so that routine Q&A exchanges +do not consume my push quota. + +- **Problem**: The current Push-only strategy (`docs/adr/line-adapter.md` + Section 3, "Reply Strategy: Push Messages") exhausts the 200-message + limit within days of active development or personal use. +- **Trigger**: Any 1:1 or group message that receives an agent response + within 1 minute of the webhook event. +- **Beneficiaries**: Individual developers, testers, and users in + LINE-dominant regions (Taiwan, Japan, Thailand). + +## 3. Proposed Solution + +Implement a **Stateful Token Cache** inside `gateway/src/main.rs`. +The cache maps `event_id` to the LINE `replyToken` received in the +webhook payload. OAB core remains completely unmodified — it already +returns `reply_to: "evt_..."` in every `GatewayReply`. + +```text ++--------------+ OAB Reply +------------------+ +| openab |---------------------->| Custom Gateway | +| (Rust) |<--------------------->| (Stateful) | ++--------------+ Gateway WS +--------+---------+ + | + V [Logic] + 1. Match reply_to with Cache + 2. If exists -> Use Reply API (Free) + 3. Else -> Fallback to Push API +``` + +### Implementation Details + +1. **Cache storage**: When `line_webhook()` processes a message event, + extract the `replyToken` from the LINE payload and insert it into + a thread-safe `HashMap` keyed by the + generated `event_id`, with a TTL of 50 seconds (conservative + margin within LINE's 1-minute limit). + +2. **Zero core modification (Stateful Auto-fill)**: + While the original proposal assumed OAB core sends `reply_to`, current observation shows `reply_to` is often empty in standard `send_message` calls. To maintain "Zero Core Modification", the gateway now implements a **Per-Client Last Event Tracker**: + - The gateway tracks the most recent `event_id` sent to each connected OAB client. + - When a reply arrives with an empty `reply_to`, the gateway automatically injects the last tracked `event_id` for that client before performing the cache lookup. + - This ensures the Reply API works seamlessly even with legacy or unmodified OAB versions. + +3. **Hybrid dispatch** (in the reply handler, ~line 537-551): + - Look up `reply.reply_to` in the token cache. + - **Hit + fresh**: call `POST v2/bot/message/reply` with the cached + `replyToken`. On success, done (free, no quota consumed). + - **Hit + reply API returns 400**: token expired; fall through. + - **Miss or fallback**: call `POST v2/bot/message/push` (existing + logic, consumes quota). + +4. **Cache cleanup**: A background `tokio::spawn` task sweeps expired + entries every 60 seconds to prevent memory growth. + +### Alignment with Existing Architecture + +- `docs/adr/custom-gateway.md` Section 3 (line 167) already lists + `reply_context` as a deferred schema concern: _"Reply token, quote + target, original message reference."_ This implementation stays + entirely within the gateway, avoiding premature schema changes + while addressing the concrete cost problem now. + +- `docs/adr/line-adapter.md` Section 3 (line 107-113) documents the + Push-only decision and its trade-off. This feature preserves Push + as the guaranteed fallback while opportunistically using Reply. + +## 4. Prior Art + +- **OpenAB ADR (line-adapter.md)**: Explicitly chose Push API because + "agent processing typically exceeds the 1-minute reply token window." + This proposal respects that decision by keeping Push as fallback. +- **OpenAB ADR (custom-gateway.md)**: Lists `reply_context` as a + known deferred concern, confirming this is an anticipated extension. +- **OpenClaw**: Manages LINE bridging via a plugin architecture with + buffered responses and loading animations. +- **LINE Official Docs**: `replyToken` is valid for ~1 minute; + webhook must respond with HTTP 200 within ~2 seconds. + +## 5. Related Issues + +None found. This complements the architecture in `docs/adr/line-adapter.md` +and `docs/adr/custom-gateway.md`. diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 0c9b948b..d5dba050 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -8,6 +8,7 @@ use axum::{ use futures_util::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use std::time::Instant; use tokio::sync::{broadcast, Mutex}; use tracing::{error, info, warn}; @@ -126,6 +127,13 @@ struct TelegramUser { // --- App state --- +/// Cache entry for LINE reply tokens: (replyToken, insertion_time) +type ReplyTokenCache = Arc>>; + +/// Maximum age (in seconds) before a cached reply token is considered expired. +/// LINE tokens are valid for ~1 minute; we use 50s as a conservative margin. +const REPLY_TOKEN_TTL_SECS: u64 = 50; + struct AppState { bot_token: String, secret_token: Option, @@ -134,6 +142,8 @@ struct AppState { line_access_token: Option, /// Broadcast channel: gateway → OAB (events) event_tx: broadcast::Sender, + /// Cache: event_id → (LINE replyToken, timestamp) + reply_token_cache: ReplyTokenCache, } // --- Telegram webhook handler --- @@ -329,9 +339,18 @@ async fn line_webhook( .and_then(|s| s.user_id.as_deref()) .unwrap_or("unknown"); + let event_id = format!("evt_{}", uuid::Uuid::new_v4()); + + // Cache the reply token for hybrid Reply/Push dispatch + if let Some(ref reply_token) = event.reply_token { + let mut cache = state.reply_token_cache.lock().await; + cache.insert(event_id.clone(), (reply_token.clone(), Instant::now())); + info!(event_id = %event_id, "cached LINE replyToken"); + } + let gateway_event = GatewayEvent { schema: "openab.gateway.event.v1".into(), - event_id: format!("evt_{}", uuid::Uuid::new_v4()), + event_id, timestamp: chrono::Utc::now().to_rfc3339(), platform: "line".into(), event_type: "message".into(), @@ -391,16 +410,26 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: info!("OAB client connected via WebSocket"); + let last_event_id = Arc::new(Mutex::new(None)); + // Forward gateway events → OAB + let last_event_id_for_send = last_event_id.clone(); let send_task = tokio::spawn(async move { loop { tokio::select! { Ok(event_json) = event_rx.recv() => { + // Track the last event ID sent to this client + if let Ok(v) = serde_json::from_str::(&event_json) { + if let Some(eid) = v["event_id"].as_str() { + let mut last = last_event_id_for_send.lock().await; + *last = Some(eid.to_string()); + } + } + if ws_tx.send(Message::Text(event_json.into())).await.is_err() { break; } } - // No reply forwarding needed on this path — replies go to Telegram directly } } }); @@ -408,16 +437,26 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: // Receive OAB replies → Telegram let bot_token = state.bot_token.clone(); let line_access_token = state.line_access_token.clone(); + let reply_cache = state.reply_token_cache.clone(); let event_tx_for_recv = state.event_tx.clone(); // Track per-message reaction state (Telegram replaces all reactions atomically) let reaction_state: Arc>>> = Arc::new(Mutex::new(std::collections::HashMap::new())); + let last_event_id_for_recv = last_event_id.clone(); let recv_task = tokio::spawn(async move { let client = reqwest::Client::new(); while let Some(Ok(msg)) = ws_rx.next().await { if let Message::Text(text) = msg { match serde_json::from_str::(&text) { - Ok(reply) => { + Ok(mut reply) => { + // Auto-fill reply_to if empty using the last event sent to this client + if reply.reply_to.is_empty() { + let last = last_event_id_for_recv.lock().await; + if let Some(ref eid) = *last { + reply.reply_to = eid.clone(); + } + } + // Handle create_topic command if reply.command.as_deref() == Some("create_topic") { let req_id = reply.request_id.clone().unwrap_or_default(); @@ -535,19 +574,54 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: // Normal send_message — route by platform if reply.platform == "line" { - // LINE Push Message API - if let Some(ref token) = line_access_token { - info!(to = %reply.channel.id, "gateway → line"); - let _ = client - .post("https://api.line.me/v2/bot/message/push") - .bearer_auth(token) - .json(&serde_json::json!({ - "to": reply.channel.id, - "messages": [{"type": "text", "text": reply.content.text}] - })) - .send() - .await - .map_err(|e| error!("line send error: {e}")); + if let Some(ref access_token) = line_access_token { + // Try Reply API first (free, no quota consumed) + let mut used_reply = false; + { + let mut cache = reply_cache.lock().await; + let entry: Option<(String, Instant)> = cache.remove(&reply.reply_to); + if let Some((reply_token, cached_at)) = entry { + if cached_at.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS { + info!(to = %reply.channel.id, "gateway → line (reply API)"); + let resp = client + .post("https://api.line.me/v2/bot/message/reply") + .bearer_auth(access_token) + .json(&serde_json::json!({ + "replyToken": reply_token, + "messages": [{"type": "text", "text": reply.content.text}] + })) + .send() + .await; + match resp { + Ok(r) if r.status().is_success() => { + used_reply = true; + } + Ok(r) => { + warn!(status = %r.status(), "LINE Reply API failed, falling back to Push"); + } + Err(e) => { + warn!(err = %e, "LINE Reply API error, falling back to Push"); + } + } + } else { + info!("LINE replyToken expired, using Push API"); + } + } + } + // Fallback to Push API + if !used_reply { + info!(to = %reply.channel.id, "gateway → line (push API)"); + let _ = client + .post("https://api.line.me/v2/bot/message/push") + .bearer_auth(access_token) + .json(&serde_json::json!({ + "to": reply.channel.id, + "messages": [{"type": "text", "text": reply.content.text}] + })) + .send() + .await + .map_err(|e| error!("line push error: {e}")); + } } } else { // Telegram sendMessage @@ -611,6 +685,7 @@ async fn main() -> Result<()> { } let (event_tx, _) = broadcast::channel::(256); + let reply_token_cache: ReplyTokenCache = Arc::new(Mutex::new(std::collections::HashMap::new())); let state = Arc::new(AppState { bot_token, @@ -619,8 +694,26 @@ async fn main() -> Result<()> { line_channel_secret, line_access_token, event_tx, + reply_token_cache, }); + // Background task: sweep expired reply tokens every 60 seconds + { + let cache_state = state.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + let mut cache = cache_state.reply_token_cache.lock().await; + let before = cache.len(); + cache.retain(|_, (_, t)| t.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS + 10); + let after = cache.len(); + if before != after { + info!(removed = before - after, remaining = after, "reply token cache sweep"); + } + } + }); + } + let app = Router::new() .route(&webhook_path, post(telegram_webhook)) .route("/webhook/line", post(line_webhook)) diff --git a/run_gateway.sh b/run_gateway.sh new file mode 100755 index 00000000..75017336 --- /dev/null +++ b/run_gateway.sh @@ -0,0 +1,12 @@ +#!/bin/bash +# Set listen port +export GATEWAY_LISTEN="0.0.0.0:9090" +export RUST_LOG=info + +echo "Starting OpenAB Gateway on port 9090..." +# Use allexport to export all vars from the file +set -a +source secrets.env +set +a + +./gateway/target/release/openab-gateway From 50a3b94f28f99e722fb1085d2fc7b2fdce92dad9 Mon Sep 17 00:00:00 2001 From: iamninihuang Date: Tue, 28 Apr 2026 11:02:02 +0800 Subject: [PATCH 02/16] fix(gateway): track last_event_id per channel to prevent cross-group race conditions --- gateway/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/gateway/src/main.rs b/gateway/src/main.rs index d5dba050..0b34a42a 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -410,7 +410,7 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: info!("OAB client connected via WebSocket"); - let last_event_id = Arc::new(Mutex::new(None)); + let last_event_id: Arc>> = Arc::new(Mutex::new(std::collections::HashMap::new())); // Forward gateway events → OAB let last_event_id_for_send = last_event_id.clone(); @@ -420,9 +420,9 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: Ok(event_json) = event_rx.recv() => { // Track the last event ID sent to this client if let Ok(v) = serde_json::from_str::(&event_json) { - if let Some(eid) = v["event_id"].as_str() { + if let (Some(eid), Some(cid)) = (v["event_id"].as_str(), v["channel"]["id"].as_str()) { let mut last = last_event_id_for_send.lock().await; - *last = Some(eid.to_string()); + last.insert(cid.to_string(), eid.to_string()); } } @@ -452,7 +452,7 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: // Auto-fill reply_to if empty using the last event sent to this client if reply.reply_to.is_empty() { let last = last_event_id_for_recv.lock().await; - if let Some(ref eid) = *last { + if let Some(eid) = last.get(&reply.channel.id) { reply.reply_to = eid.clone(); } } From 1058a6694f5cf7fdeb9b02e3d18c6e1f6f55d5d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 17:19:54 +0000 Subject: [PATCH 03/16] fix(gateway): address PR #608 review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove auto-fill reply_to mechanism that could attach replies to wrong LINE messages under concurrent traffic (blocker from shaun-agent, 超渡法師, 普渡法師). If OAB sends empty reply_to, gateway now correctly falls back to Push API instead of guessing the latest event. - Extract reply token from cache before making HTTP call so Mutex is not held across network I/O (blocker from shaun-agent, 超渡法師, 普渡法師). - Unify sweep TTL with handler TTL (REPLY_TOKEN_TTL_SECS) to eliminate dead-weight cache entries in 50-60s range (普渡法師). - Add secrets.env to .gitignore (shaun-agent, 超渡法師). - Run rustfmt --edition 2021 (shaun-agent). - Update ADR Consequences to reflect hybrid reply/push strategy (超渡法師). --- .gitignore | 1 + docs/adr/line-adapter.md | 2 +- gateway/src/main.rs | 95 +++++++++++++++++++--------------------- 3 files changed, 46 insertions(+), 52 deletions(-) diff --git a/.gitignore b/.gitignore index 26834d0d..ae5c0382 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ config.toml *.swp .DS_Store .env +secrets.env .kiro/ CLAUDE.md gateway/target/ diff --git a/docs/adr/line-adapter.md b/docs/adr/line-adapter.md index 01c922bb..f7d0d7da 100644 --- a/docs/adr/line-adapter.md +++ b/docs/adr/line-adapter.md @@ -384,7 +384,7 @@ For v1: - LINE users can interact with OpenAB agents without switching to Discord or Slack - The inbound webhook pattern opens the door for future webhook-based platforms (Telegram, WhatsApp, etc.) - Using `axum` for the HTTP server provides a solid foundation for a general-purpose webhook gateway -- Push message strategy avoids the 1-minute reply token limitation, enabling long-running agent tasks +- Hybrid reply/push strategy optimizes cost: the gateway opportunistically uses the free Reply API when the agent responds within the token TTL, falling back to Push API for longer-running tasks ### Negative diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 0b34a42a..5e937601 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -410,22 +410,11 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: info!("OAB client connected via WebSocket"); - let last_event_id: Arc>> = Arc::new(Mutex::new(std::collections::HashMap::new())); - // Forward gateway events → OAB - let last_event_id_for_send = last_event_id.clone(); let send_task = tokio::spawn(async move { loop { tokio::select! { Ok(event_json) = event_rx.recv() => { - // Track the last event ID sent to this client - if let Ok(v) = serde_json::from_str::(&event_json) { - if let (Some(eid), Some(cid)) = (v["event_id"].as_str(), v["channel"]["id"].as_str()) { - let mut last = last_event_id_for_send.lock().await; - last.insert(cid.to_string(), eid.to_string()); - } - } - if ws_tx.send(Message::Text(event_json.into())).await.is_err() { break; } @@ -442,21 +431,12 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: // Track per-message reaction state (Telegram replaces all reactions atomically) let reaction_state: Arc>>> = Arc::new(Mutex::new(std::collections::HashMap::new())); - let last_event_id_for_recv = last_event_id.clone(); let recv_task = tokio::spawn(async move { let client = reqwest::Client::new(); while let Some(Ok(msg)) = ws_rx.next().await { if let Message::Text(text) = msg { match serde_json::from_str::(&text) { - Ok(mut reply) => { - // Auto-fill reply_to if empty using the last event sent to this client - if reply.reply_to.is_empty() { - let last = last_event_id_for_recv.lock().await; - if let Some(eid) = last.get(&reply.channel.id) { - reply.reply_to = eid.clone(); - } - } - + Ok(reply) => { // Handle create_topic command if reply.command.as_deref() == Some("create_topic") { let req_id = reply.request_id.clone().unwrap_or_default(); @@ -575,39 +555,48 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: // Normal send_message — route by platform if reply.platform == "line" { if let Some(ref access_token) = line_access_token { - // Try Reply API first (free, no quota consumed) - let mut used_reply = false; - { + // Extract token from cache (drop lock before HTTP call) + let cached_token = { let mut cache = reply_cache.lock().await; - let entry: Option<(String, Instant)> = cache.remove(&reply.reply_to); - if let Some((reply_token, cached_at)) = entry { - if cached_at.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS { - info!(to = %reply.channel.id, "gateway → line (reply API)"); - let resp = client - .post("https://api.line.me/v2/bot/message/reply") - .bearer_auth(access_token) - .json(&serde_json::json!({ - "replyToken": reply_token, - "messages": [{"type": "text", "text": reply.content.text}] - })) - .send() - .await; - match resp { - Ok(r) if r.status().is_success() => { - used_reply = true; - } - Ok(r) => { - warn!(status = %r.status(), "LINE Reply API failed, falling back to Push"); - } - Err(e) => { - warn!(err = %e, "LINE Reply API error, falling back to Push"); - } + cache + .remove(&reply.reply_to) + .and_then(|(token, cached_at)| { + if cached_at.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS + { + Some(token) + } else { + info!("LINE replyToken expired, using Push API"); + None } - } else { - info!("LINE replyToken expired, using Push API"); + }) + }; + + // Try Reply API first (free, no quota consumed) + let mut used_reply = false; + if let Some(reply_token) = cached_token { + info!(to = %reply.channel.id, "gateway → line (reply API)"); + let resp = client + .post("https://api.line.me/v2/bot/message/reply") + .bearer_auth(access_token) + .json(&serde_json::json!({ + "replyToken": reply_token, + "messages": [{"type": "text", "text": reply.content.text}] + })) + .send() + .await; + match resp { + Ok(r) if r.status().is_success() => { + used_reply = true; + } + Ok(r) => { + warn!(status = %r.status(), "LINE Reply API failed, falling back to Push"); + } + Err(e) => { + warn!(err = %e, "LINE Reply API error, falling back to Push"); } } } + // Fallback to Push API if !used_reply { info!(to = %reply.channel.id, "gateway → line (push API)"); @@ -705,10 +694,14 @@ async fn main() -> Result<()> { tokio::time::sleep(std::time::Duration::from_secs(60)).await; let mut cache = cache_state.reply_token_cache.lock().await; let before = cache.len(); - cache.retain(|_, (_, t)| t.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS + 10); + cache.retain(|_, (_, t)| t.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS); let after = cache.len(); if before != after { - info!(removed = before - after, remaining = after, "reply token cache sweep"); + info!( + removed = before - after, + remaining = after, + "reply token cache sweep" + ); } } }); From 7feb47bb0d0a3272b6ac27a4a76886340f75709c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 17:20:44 +0000 Subject: [PATCH 04/16] docs(gateway): clarify global reply_token_cache vs per-connection semantics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LINE reply tokens are single-use. Document that the first OAB client to remove() a cached token gets the free Reply API call; others fall back to Push API. This is correct behavior but was not documented, which could confuse maintainers (普渡法師). --- gateway/src/main.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 5e937601..58c5fb03 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -142,7 +142,10 @@ struct AppState { line_access_token: Option, /// Broadcast channel: gateway → OAB (events) event_tx: broadcast::Sender, - /// Cache: event_id → (LINE replyToken, timestamp) + /// Cache: event_id → (LINE replyToken, timestamp). + /// Global across all OAB WebSocket clients. LINE reply tokens are single-use: + /// the first client to `remove()` a token wins the free Reply API call; + /// other clients for the same event naturally fall back to Push API. reply_token_cache: ReplyTokenCache, } From 35a1f0f40e4b2ffe915a4689b1a9d1d8b81c9f4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 17:44:49 +0000 Subject: [PATCH 05/16] docs: update feature request to reflect removal of auto-fill mechanism MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Per-Client Last Event Tracker was removed during review because it caused incorrect reply-token association in group chats. Update the feature request doc to match the actual implementation: gateway does not infer reply_to, it falls back to Push API when reply_to is empty or not found in cache (普渡法師 doc nit). --- docs/feature-requests/line-reply-api.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/feature-requests/line-reply-api.md b/docs/feature-requests/line-reply-api.md index 80f8f33c..028d8325 100644 --- a/docs/feature-requests/line-reply-api.md +++ b/docs/feature-requests/line-reply-api.md @@ -55,11 +55,11 @@ returns `reply_to: "evt_..."` in every `GatewayReply`. generated `event_id`, with a TTL of 50 seconds (conservative margin within LINE's 1-minute limit). -2. **Zero core modification (Stateful Auto-fill)**: - While the original proposal assumed OAB core sends `reply_to`, current observation shows `reply_to` is often empty in standard `send_message` calls. To maintain "Zero Core Modification", the gateway now implements a **Per-Client Last Event Tracker**: - - The gateway tracks the most recent `event_id` sent to each connected OAB client. - - When a reply arrives with an empty `reply_to`, the gateway automatically injects the last tracked `event_id` for that client before performing the cache lookup. - - This ensures the Reply API works seamlessly even with legacy or unmodified OAB versions. +2. **Zero core modification**: OAB core is not modified. If OAB sends + a `reply_to` matching a cached `event_id`, the gateway uses the + Reply API. If `reply_to` is empty or not found in the cache, the + gateway falls back to the Push API. No auto-fill or inference is + performed — the gateway never guesses which event a reply belongs to. 3. **Hybrid dispatch** (in the reply handler, ~line 537-551): - Look up `reply.reply_to` in the token cache. @@ -70,7 +70,8 @@ returns `reply_to: "evt_..."` in every `GatewayReply`. logic, consumes quota). 4. **Cache cleanup**: A background `tokio::spawn` task sweeps expired - entries every 60 seconds to prevent memory growth. + entries using the same `REPLY_TOKEN_TTL_SECS` constant to prevent + memory growth. ### Alignment with Existing Architecture From 1b86667c65283bbd9df7d99c330d66603f11fa51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 17:46:03 +0000 Subject: [PATCH 06/16] docs(adr): remove stale auto-fill description from hybrid strategy The 4-step hybrid strategy description still referenced the removed Per-Client Last Event Tracker auto-fill mechanism. Updated to accurately describe the actual implementation: event_id-keyed cache lookup with empty reply_to falling back to Push API. --- docs/adr/line-adapter.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/adr/line-adapter.md b/docs/adr/line-adapter.md index f7d0d7da..b929e6ca 100644 --- a/docs/adr/line-adapter.md +++ b/docs/adr/line-adapter.md @@ -111,10 +111,10 @@ LINE offers two reply mechanisms: - **Push message**: no time limit, can send to any user/group at any time (consumes quota). Historically, OpenAB relied solely on **push messages** because agent processing can exceed the 1-minute reply token window. To optimize costs for free-tier accounts, OpenAB now uses a **Hybrid Strategy** implemented at the gateway level: -1. The gateway caches incoming `replyToken`s for 50 seconds. -2. It tracks the last `event_id` sent to each OAB client to auto-fill the `reply_to` context. -3. If an agent replies within the window, the gateway intercepts and routes it via the free **Reply API**. -4. If the token is expired or missing, it gracefully falls back to the **Push API**. +1. The gateway caches incoming `replyToken`s keyed by `event_id` with a 50-second TTL. +2. When OAB replies with a non-empty `reply_to` that matches a cached entry, the gateway routes the message via the free **Reply API**. +3. If the token is expired, missing, or `reply_to` is empty, the gateway falls back to the **Push API**. +4. A background task sweeps expired cache entries to prevent memory growth. --- ## 3. Architectural Differences from Discord/Slack From 7fdeac03fe57627ab97afcf165fa9ee899ba1edd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 17:52:41 +0000 Subject: [PATCH 07/16] refactor(gateway): address review nits on PR #608 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Switch ReplyTokenCache from tokio::sync::Mutex to std::sync::Mutex; critical sections are short and never held across .await (超渡法師). - Log LINE Reply API 400 response body for easier debugging (超渡法師). - Guard secrets.env source in run_gateway.sh with existence check (超渡法師). - Align sweep interval with REPLY_TOKEN_TTL_SECS (50s) (超渡法師). --- gateway/src/main.rs | 23 ++++++++++++++--------- run_gateway.sh | 10 +++++++--- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 58c5fb03..ad05385d 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -127,8 +127,10 @@ struct TelegramUser { // --- App state --- -/// Cache entry for LINE reply tokens: (replyToken, insertion_time) -type ReplyTokenCache = Arc>>; +/// Cache entry for LINE reply tokens: (replyToken, insertion_time). +/// Uses std::sync::Mutex — critical sections are short (insert/remove/retain) +/// and never held across .await, so async Mutex overhead is unnecessary. +type ReplyTokenCache = Arc>>; /// Maximum age (in seconds) before a cached reply token is considered expired. /// LINE tokens are valid for ~1 minute; we use 50s as a conservative margin. @@ -346,7 +348,7 @@ async fn line_webhook( // Cache the reply token for hybrid Reply/Push dispatch if let Some(ref reply_token) = event.reply_token { - let mut cache = state.reply_token_cache.lock().await; + let mut cache = state.reply_token_cache.lock().unwrap(); cache.insert(event_id.clone(), (reply_token.clone(), Instant::now())); info!(event_id = %event_id, "cached LINE replyToken"); } @@ -560,7 +562,7 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: if let Some(ref access_token) = line_access_token { // Extract token from cache (drop lock before HTTP call) let cached_token = { - let mut cache = reply_cache.lock().await; + let mut cache = reply_cache.lock().unwrap(); cache .remove(&reply.reply_to) .and_then(|(token, cached_at)| { @@ -592,7 +594,9 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: used_reply = true; } Ok(r) => { - warn!(status = %r.status(), "LINE Reply API failed, falling back to Push"); + let status = r.status(); + let body = r.text().await.unwrap_or_default(); + warn!(status = %status, body = %body, "LINE Reply API failed, falling back to Push"); } Err(e) => { warn!(err = %e, "LINE Reply API error, falling back to Push"); @@ -677,7 +681,8 @@ async fn main() -> Result<()> { } let (event_tx, _) = broadcast::channel::(256); - let reply_token_cache: ReplyTokenCache = Arc::new(Mutex::new(std::collections::HashMap::new())); + let reply_token_cache: ReplyTokenCache = + Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())); let state = Arc::new(AppState { bot_token, @@ -689,13 +694,13 @@ async fn main() -> Result<()> { reply_token_cache, }); - // Background task: sweep expired reply tokens every 60 seconds + // Background task: sweep expired reply tokens every REPLY_TOKEN_TTL_SECS { let cache_state = state.clone(); tokio::spawn(async move { loop { - tokio::time::sleep(std::time::Duration::from_secs(60)).await; - let mut cache = cache_state.reply_token_cache.lock().await; + tokio::time::sleep(std::time::Duration::from_secs(REPLY_TOKEN_TTL_SECS)).await; + let mut cache = cache_state.reply_token_cache.lock().unwrap(); let before = cache.len(); cache.retain(|_, (_, t)| t.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS); let after = cache.len(); diff --git a/run_gateway.sh b/run_gateway.sh index 75017336..50f132b2 100755 --- a/run_gateway.sh +++ b/run_gateway.sh @@ -5,8 +5,12 @@ export RUST_LOG=info echo "Starting OpenAB Gateway on port 9090..." # Use allexport to export all vars from the file -set -a -source secrets.env -set +a +if [ -f secrets.env ]; then + set -a + source secrets.env + set +a +else + echo "Warning: secrets.env not found — skipping. Create it with LINE_CHANNEL_SECRET, LINE_CHANNEL_ACCESS_TOKEN, etc." +fi ./gateway/target/release/openab-gateway From 46626f61982ff6809baded7c6bcc96ab14c35c7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 18:01:33 +0000 Subject: [PATCH 08/16] fix(gateway): prevent duplicate delivery on Reply API 5xx/network error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Only fallback to Push API on 4xx (token invalid/expired). For 5xx and network errors, the message may have already been delivered — do NOT fallback to avoid duplicate delivery (擺渡法師 major finding). - Update ADR message flow step 7 to reflect hybrid Reply/Push strategy instead of Push-only (擺渡法師 doc nit). --- docs/adr/line-adapter.md | 2 +- gateway/src/main.rs | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/docs/adr/line-adapter.md b/docs/adr/line-adapter.md index b929e6ca..66660764 100644 --- a/docs/adr/line-adapter.md +++ b/docs/adr/line-adapter.md @@ -101,7 +101,7 @@ LINE is not ideal when: - Group → line:{groupId} - Room → line:{roomId} 6. Message is routed to AdapterRouter → ACP Session Pool → kiro-cli process -7. Agent response is sent back via LINE Push Message API +7. Agent response is sent back via LINE Reply API (free) or Push Message API (fallback) ``` ### Reply Strategy: Hybrid Reply/Push Messages diff --git a/gateway/src/main.rs b/gateway/src/main.rs index ad05385d..0beef001 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -593,13 +593,25 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: Ok(r) if r.status().is_success() => { used_reply = true; } + Ok(r) if r.status().is_client_error() => { + // 4xx = token invalid/expired, safe to fallback + let status = r.status(); + let body = r.text().await.unwrap_or_default(); + warn!(status = %status, body = %body, "LINE Reply API client error, falling back to Push"); + } Ok(r) => { + // 5xx / other = message may have been delivered, + // do NOT fallback to Push to avoid duplicate delivery let status = r.status(); let body = r.text().await.unwrap_or_default(); - warn!(status = %status, body = %body, "LINE Reply API failed, falling back to Push"); + error!(status = %status, body = %body, "LINE Reply API server error, NOT falling back to Push (possible duplicate risk)"); + used_reply = true; // prevent Push fallback } Err(e) => { - warn!(err = %e, "LINE Reply API error, falling back to Push"); + // Network error = request may or may not have reached LINE, + // do NOT fallback to avoid duplicate delivery + error!(err = %e, "LINE Reply API network error, NOT falling back to Push (possible duplicate risk)"); + used_reply = true; // prevent Push fallback } } } From b6d80bf30d8edaaa18c3d74beec621cc6af36826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 18:02:56 +0000 Subject: [PATCH 09/16] fix(gateway): narrow Reply API fallback to explicit token-unusable only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Only fallback to Push API when LINE's 400 response body explicitly indicates the reply token is invalid or expired. All other failures (5xx, other 4xx, network/timeout errors) are treated as ambiguous delivery — logged as error and dropped to prevent duplicate delivery (擺渡法師 refined major finding, 普渡法師 concurred). --- gateway/src/main.rs | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 0beef001..38feddb0 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -593,25 +593,31 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: Ok(r) if r.status().is_success() => { used_reply = true; } - Ok(r) if r.status().is_client_error() => { - // 4xx = token invalid/expired, safe to fallback - let status = r.status(); - let body = r.text().await.unwrap_or_default(); - warn!(status = %status, body = %body, "LINE Reply API client error, falling back to Push"); - } Ok(r) => { - // 5xx / other = message may have been delivered, - // do NOT fallback to Push to avoid duplicate delivery let status = r.status(); let body = r.text().await.unwrap_or_default(); - error!(status = %status, body = %body, "LINE Reply API server error, NOT falling back to Push (possible duplicate risk)"); - used_reply = true; // prevent Push fallback + // Only fallback to Push when LINE explicitly says + // the reply token is unusable (invalid/expired). + // LINE returns "Invalid reply token" or "expired" + // in the error body for token-specific failures. + let body_lower = body.to_lowercase(); + let token_unusable = status.as_u16() == 400 + && (body_lower.contains("invalid") + && body_lower.contains("reply token") + || body_lower.contains("expired")); + if token_unusable { + warn!(status = %status, body = %body, "LINE reply token unusable, falling back to Push"); + } else { + // Ambiguous: 5xx, other 4xx, or unrecognized 400. + // Message may have been delivered — do NOT fallback. + error!(status = %status, body = %body, "LINE Reply API error, NOT falling back to Push (possible duplicate risk)"); + used_reply = true; + } } Err(e) => { - // Network error = request may or may not have reached LINE, - // do NOT fallback to avoid duplicate delivery + // Network/timeout error: delivery ambiguous, do NOT fallback error!(err = %e, "LINE Reply API network error, NOT falling back to Push (possible duplicate risk)"); - used_reply = true; // prevent Push fallback + used_reply = true; } } } From f7536ca4d21393b5530418e7ab7311436b1e7a28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 19:07:56 +0000 Subject: [PATCH 10/16] fix(gateway): address review findings on PR #608 - Fix operator precedence bug in Reply API 400 fallback: add explicit parentheses so (invalid AND reply token) OR expired is evaluated correctly - Change lock().unwrap() to lock().unwrap_or_else(|e| e.into_inner()) at all 3 cache lock sites to recover from mutex poison instead of panicking - Add REPLY_TOKEN_CACHE_MAX (10k) bound: skip insert and log warning when cache is full, gracefully degrading to Push API --- gateway/src/main.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 38feddb0..67fb2b34 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -136,6 +136,10 @@ type ReplyTokenCache = Arc, @@ -348,9 +352,13 @@ async fn line_webhook( // Cache the reply token for hybrid Reply/Push dispatch if let Some(ref reply_token) = event.reply_token { - let mut cache = state.reply_token_cache.lock().unwrap(); - cache.insert(event_id.clone(), (reply_token.clone(), Instant::now())); - info!(event_id = %event_id, "cached LINE replyToken"); + let mut cache = state.reply_token_cache.lock().unwrap_or_else(|e| e.into_inner()); + if cache.len() >= REPLY_TOKEN_CACHE_MAX { + warn!(size = cache.len(), "reply token cache full, skipping insert"); + } else { + cache.insert(event_id.clone(), (reply_token.clone(), Instant::now())); + info!(event_id = %event_id, "cached LINE replyToken"); + } } let gateway_event = GatewayEvent { @@ -562,7 +570,7 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: if let Some(ref access_token) = line_access_token { // Extract token from cache (drop lock before HTTP call) let cached_token = { - let mut cache = reply_cache.lock().unwrap(); + let mut cache = reply_cache.lock().unwrap_or_else(|e| e.into_inner()); cache .remove(&reply.reply_to) .and_then(|(token, cached_at)| { @@ -602,8 +610,8 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: // in the error body for token-specific failures. let body_lower = body.to_lowercase(); let token_unusable = status.as_u16() == 400 - && (body_lower.contains("invalid") - && body_lower.contains("reply token") + && ((body_lower.contains("invalid") + && body_lower.contains("reply token")) || body_lower.contains("expired")); if token_unusable { warn!(status = %status, body = %body, "LINE reply token unusable, falling back to Push"); @@ -718,7 +726,7 @@ async fn main() -> Result<()> { tokio::spawn(async move { loop { tokio::time::sleep(std::time::Duration::from_secs(REPLY_TOKEN_TTL_SECS)).await; - let mut cache = cache_state.reply_token_cache.lock().unwrap(); + let mut cache = cache_state.reply_token_cache.lock().unwrap_or_else(|e| e.into_inner()); let before = cache.len(); cache.retain(|_, (_, t)| t.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS); let after = cache.len(); From 3da68d79611ff8df9f3bf94a1f25b5cff393264d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 19:15:06 +0000 Subject: [PATCH 11/16] docs: remove feature-requests/line-reply-api.md Feature is now implemented and documented in docs/adr/line-adapter.md. Keeping a feature request doc for a shipped feature is misleading. --- docs/feature-requests/line-reply-api.md | 103 ------------------------ 1 file changed, 103 deletions(-) delete mode 100644 docs/feature-requests/line-reply-api.md diff --git a/docs/feature-requests/line-reply-api.md b/docs/feature-requests/line-reply-api.md deleted file mode 100644 index 028d8325..00000000 --- a/docs/feature-requests/line-reply-api.md +++ /dev/null @@ -1,103 +0,0 @@ -# Feature Request: Hybrid LINE Reply/Push API Strategy - -**Title**: `feat(gateway): implement hybrid LINE Reply/Push API strategy` - -**Labels**: `feature` - -**GitHub Issue**: [#607](https://github.com/openabdev/openab/issues/607) - -## 1. Description - -Add a hybrid reply strategy to the LINE adapter in `openab-gateway`. -When an agent response arrives within the 1-minute Reply API window, -the gateway uses the free Reply API; otherwise it falls back to the -existing Push Message API. This saves messaging quota on free-tier -LINE accounts without modifying OAB core. - -## 2. Use Case - -As a LINE bot operator on a free-tier plan (200 push messages/month), -I want the bot to prefer the Reply API so that routine Q&A exchanges -do not consume my push quota. - -- **Problem**: The current Push-only strategy (`docs/adr/line-adapter.md` - Section 3, "Reply Strategy: Push Messages") exhausts the 200-message - limit within days of active development or personal use. -- **Trigger**: Any 1:1 or group message that receives an agent response - within 1 minute of the webhook event. -- **Beneficiaries**: Individual developers, testers, and users in - LINE-dominant regions (Taiwan, Japan, Thailand). - -## 3. Proposed Solution - -Implement a **Stateful Token Cache** inside `gateway/src/main.rs`. -The cache maps `event_id` to the LINE `replyToken` received in the -webhook payload. OAB core remains completely unmodified — it already -returns `reply_to: "evt_..."` in every `GatewayReply`. - -```text -+--------------+ OAB Reply +------------------+ -| openab |---------------------->| Custom Gateway | -| (Rust) |<--------------------->| (Stateful) | -+--------------+ Gateway WS +--------+---------+ - | - V [Logic] - 1. Match reply_to with Cache - 2. If exists -> Use Reply API (Free) - 3. Else -> Fallback to Push API -``` - -### Implementation Details - -1. **Cache storage**: When `line_webhook()` processes a message event, - extract the `replyToken` from the LINE payload and insert it into - a thread-safe `HashMap` keyed by the - generated `event_id`, with a TTL of 50 seconds (conservative - margin within LINE's 1-minute limit). - -2. **Zero core modification**: OAB core is not modified. If OAB sends - a `reply_to` matching a cached `event_id`, the gateway uses the - Reply API. If `reply_to` is empty or not found in the cache, the - gateway falls back to the Push API. No auto-fill or inference is - performed — the gateway never guesses which event a reply belongs to. - -3. **Hybrid dispatch** (in the reply handler, ~line 537-551): - - Look up `reply.reply_to` in the token cache. - - **Hit + fresh**: call `POST v2/bot/message/reply` with the cached - `replyToken`. On success, done (free, no quota consumed). - - **Hit + reply API returns 400**: token expired; fall through. - - **Miss or fallback**: call `POST v2/bot/message/push` (existing - logic, consumes quota). - -4. **Cache cleanup**: A background `tokio::spawn` task sweeps expired - entries using the same `REPLY_TOKEN_TTL_SECS` constant to prevent - memory growth. - -### Alignment with Existing Architecture - -- `docs/adr/custom-gateway.md` Section 3 (line 167) already lists - `reply_context` as a deferred schema concern: _"Reply token, quote - target, original message reference."_ This implementation stays - entirely within the gateway, avoiding premature schema changes - while addressing the concrete cost problem now. - -- `docs/adr/line-adapter.md` Section 3 (line 107-113) documents the - Push-only decision and its trade-off. This feature preserves Push - as the guaranteed fallback while opportunistically using Reply. - -## 4. Prior Art - -- **OpenAB ADR (line-adapter.md)**: Explicitly chose Push API because - "agent processing typically exceeds the 1-minute reply token window." - This proposal respects that decision by keeping Push as fallback. -- **OpenAB ADR (custom-gateway.md)**: Lists `reply_context` as a - known deferred concern, confirming this is an anticipated extension. -- **OpenClaw**: Manages LINE bridging via a plugin architecture with - buffered responses and loading animations. -- **LINE Official Docs**: `replyToken` is valid for ~1 minute; - webhook must respond with HTTP 200 within ~2 seconds. - -## 5. Related Issues - -None found. This complements the architecture in `docs/adr/line-adapter.md` -and `docs/adr/custom-gateway.md`. From 970018d905e1f492eb7c36841304ea4e361628a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 19:18:20 +0000 Subject: [PATCH 12/16] revert: remove .gitignore change (secrets.env not needed) --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index ae5c0382..26834d0d 100644 --- a/.gitignore +++ b/.gitignore @@ -3,7 +3,6 @@ config.toml *.swp .DS_Store .env -secrets.env .kiro/ CLAUDE.md gateway/target/ From df2693d8c1de94e1d464e8a979f0b9bc11c274a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 19:19:24 +0000 Subject: [PATCH 13/16] docs(adr): update line-adapter with hybrid dispatch diagram and changelog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Status: Proposed → Accepted - Add sequence diagram showing hybrid Reply/Push dispatch flow - Update changelog to v0.2 with #608 and #619 references - Add @iamninihuang as co-author --- docs/adr/line-adapter.md | 46 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/docs/adr/line-adapter.md b/docs/adr/line-adapter.md index 66660764..29672e35 100644 --- a/docs/adr/line-adapter.md +++ b/docs/adr/line-adapter.md @@ -1,8 +1,8 @@ # ADR: LINE Messaging API Adapter -- **Status:** Proposed -- **Date:** 2026-04-22 -- **Author:** @chaodu-agent +- **Status:** Accepted +- **Date:** 2026-04-28 +- **Author:** @chaodu-agent, @iamninihuang --- @@ -104,6 +104,43 @@ LINE is not ideal when: 7. Agent response is sent back via LINE Reply API (free) or Push Message API (fallback) ``` +### Hybrid Reply/Push Dispatch Flow + +``` +LINE User Gateway OAB Core + │ │ │ + │ message + replyToken │ │ + │ ─────────────────────────▶ │ │ + │ │ 1. Verify HMAC signature │ + │ │ 2. Generate event_id (UUID) │ + │ │ 3. Cache: │ + │ │ event_id → replyToken │ + │ │ (TTL 50s, max 10k) │ + │ │ │ + │ │ GatewayEvent { event_id } │ + │ │ ─────────────────────────────▶│ + │ │ │ Store event_id in + │ │ │ ChannelRef.origin_event_id + │ │ │ + │ │ │ Agent processes... + │ │ │ + │ │ GatewayReply { │ + │ │ reply_to: event_id │ + │ │ } │ + │ │ ◀─────────────────────────────│ + │ │ │ + │ │ 4. Lookup cache(event_id) │ + │ │ ├─ HIT + fresh │ + │ Reply API (FREE) ✅ │ │ → Reply API │ + │ ◀──────────────────────────│ │ │ + │ │ ├─ HIT + expired │ + │ Push API (quota) 💰 │ │ → Push API fallback │ + │ ◀──────────────────────────│ │ │ + │ │ └─ MISS │ + │ Push API (quota) 💰 │ → Push API fallback │ + │ ◀──────────────────────────│ │ +``` + ### Reply Strategy: Hybrid Reply/Push Messages LINE offers two reply mechanisms: @@ -417,8 +454,9 @@ To ensure this ADR is followed in implementation and future changes: ## Notes -- **Version:** 0.1 +- **Version:** 0.2 - **Changelog:** + - 0.2 (2026-04-28): Hybrid Reply/Push strategy implemented (#608). Updated status to Accepted. Added dispatch flow diagram. Reply strategy section rewritten from Push-only to hybrid. Core propagates `event_id` via `ChannelRef.origin_event_id` (#619). - 0.1 (2026-04-22): Initial proposed version --- From 01f12f8b911901c0212b0a9f8e0f1cf5c3f1cd88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 19:22:29 +0000 Subject: [PATCH 14/16] docs: remove run_gateway.sh, add LINE env vars to gateway README - Remove run_gateway.sh (README already documents how to run; script used port 9090 conflicting with README default 8080) - Add LINE_CHANNEL_SECRET and LINE_CHANNEL_ACCESS_TOKEN to env vars table - Add POST /webhook/line to endpoints table --- gateway/README.md | 3 +++ run_gateway.sh | 16 ---------------- 2 files changed, 3 insertions(+), 16 deletions(-) delete mode 100755 run_gateway.sh diff --git a/gateway/README.md b/gateway/README.md index 6e2f65dd..2a06e49d 100644 --- a/gateway/README.md +++ b/gateway/README.md @@ -47,12 +47,15 @@ url = "ws://gateway:8080/ws" | `TELEGRAM_BOT_TOKEN` | (required) | Telegram Bot API token | | `GATEWAY_LISTEN` | `0.0.0.0:8080` | Listen address | | `TELEGRAM_WEBHOOK_PATH` | `/webhook/telegram` | Webhook endpoint path | +| `LINE_CHANNEL_SECRET` | (optional) | LINE channel secret for webhook HMAC signature verification | +| `LINE_CHANNEL_ACCESS_TOKEN` | (optional) | LINE channel access token for Reply/Push API | ### Endpoints | Path | Description | |---|---| | `POST /webhook/telegram` | Telegram webhook receiver | +| `POST /webhook/line` | LINE webhook receiver | | `GET /ws` | WebSocket server (OAB connects here) | | `GET /health` | Health check | diff --git a/run_gateway.sh b/run_gateway.sh deleted file mode 100755 index 50f132b2..00000000 --- a/run_gateway.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/bash -# Set listen port -export GATEWAY_LISTEN="0.0.0.0:9090" -export RUST_LOG=info - -echo "Starting OpenAB Gateway on port 9090..." -# Use allexport to export all vars from the file -if [ -f secrets.env ]; then - set -a - source secrets.env - set +a -else - echo "Warning: secrets.env not found — skipping. Create it with LINE_CHANNEL_SECRET, LINE_CHANNEL_ACCESS_TOKEN, etc." -fi - -./gateway/target/release/openab-gateway From 22f882655bbef72f6bdf392019ad99a83c7bb84e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 19:22:46 +0000 Subject: [PATCH 15/16] docs(adr): restore original Date, add Last Updated field --- docs/adr/line-adapter.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/adr/line-adapter.md b/docs/adr/line-adapter.md index 29672e35..88460530 100644 --- a/docs/adr/line-adapter.md +++ b/docs/adr/line-adapter.md @@ -1,7 +1,8 @@ # ADR: LINE Messaging API Adapter - **Status:** Accepted -- **Date:** 2026-04-28 +- **Date:** 2026-04-22 +- **Last Updated:** 2026-04-28 - **Author:** @chaodu-agent, @iamninihuang --- From 72c4365e7311162e1da11147dba829656f7480ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Tue, 28 Apr 2026 19:31:47 +0000 Subject: [PATCH 16/16] test(gateway): add coverage for hybrid LINE reply/push dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract dispatch_line_reply() into a testable function and add 6 tests covering the full decision matrix: - cache hit → Reply API only - cache miss → Push API fallback - expired token → Push API fallback - Reply 400 invalid token → Push API fallback - Reply 5xx → no fallback (duplicate risk) - network error → no fallback (duplicate risk) Uses wiremock for HTTP mocking with scoped expectations. Closes #620 --- gateway/Cargo.lock | 155 ++++++++++++++++++++ gateway/Cargo.toml | 3 + gateway/src/main.rs | 344 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 426 insertions(+), 76 deletions(-) diff --git a/gateway/Cargo.lock b/gateway/Cargo.lock index d09bac1a..e0d445d5 100644 --- a/gateway/Cargo.lock +++ b/gateway/Cargo.lock @@ -26,6 +26,16 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -199,6 +209,24 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" +[[package]] +name = "deadpool" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b" +dependencies = [ + "deadpool-runtime", + "lazy_static", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "digest" version = "0.10.7" @@ -243,6 +271,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foldhash" version = "0.1.5" @@ -258,6 +292,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -265,6 +314,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -273,6 +323,23 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + [[package]] name = "futures-macro" version = "0.3.32" @@ -302,10 +369,13 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -360,6 +430,25 @@ dependencies = [ "wasip3", ] +[[package]] +name = "h2" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.15.5" @@ -381,6 +470,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hmac" version = "0.12.1" @@ -445,6 +540,7 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2", "http", "http-body", "httparse", @@ -775,6 +871,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -800,6 +906,7 @@ dependencies = [ "tracing", "tracing-subscriber", "uuid", + "wiremock", ] [[package]] @@ -1018,6 +1125,18 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + [[package]] name = "regex-automata" version = "0.4.14" @@ -1506,6 +1625,19 @@ dependencies = [ "tungstenite 0.29.0", ] +[[package]] +name = "tokio-util" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tower" version = "0.5.3" @@ -2103,6 +2235,29 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "wiremock" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08db1edfb05d9b3c1542e521aea074442088292f00b5f28e435c714a98f85031" +dependencies = [ + "assert-json-diff", + "base64", + "deadpool", + "futures", + "http", + "http-body-util", + "hyper", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index dd5e78ae..d430ff4a 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -19,3 +19,6 @@ chrono = { version = "0.4", features = ["serde"] } hmac = "0.12" sha2 = "0.10" base64 = "0.22" + +[dev-dependencies] +wiremock = "0.6" diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 67fb2b34..7cd04912 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -568,82 +568,14 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: // Normal send_message — route by platform if reply.platform == "line" { if let Some(ref access_token) = line_access_token { - // Extract token from cache (drop lock before HTTP call) - let cached_token = { - let mut cache = reply_cache.lock().unwrap_or_else(|e| e.into_inner()); - cache - .remove(&reply.reply_to) - .and_then(|(token, cached_at)| { - if cached_at.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS - { - Some(token) - } else { - info!("LINE replyToken expired, using Push API"); - None - } - }) - }; - - // Try Reply API first (free, no quota consumed) - let mut used_reply = false; - if let Some(reply_token) = cached_token { - info!(to = %reply.channel.id, "gateway → line (reply API)"); - let resp = client - .post("https://api.line.me/v2/bot/message/reply") - .bearer_auth(access_token) - .json(&serde_json::json!({ - "replyToken": reply_token, - "messages": [{"type": "text", "text": reply.content.text}] - })) - .send() - .await; - match resp { - Ok(r) if r.status().is_success() => { - used_reply = true; - } - Ok(r) => { - let status = r.status(); - let body = r.text().await.unwrap_or_default(); - // Only fallback to Push when LINE explicitly says - // the reply token is unusable (invalid/expired). - // LINE returns "Invalid reply token" or "expired" - // in the error body for token-specific failures. - let body_lower = body.to_lowercase(); - let token_unusable = status.as_u16() == 400 - && ((body_lower.contains("invalid") - && body_lower.contains("reply token")) - || body_lower.contains("expired")); - if token_unusable { - warn!(status = %status, body = %body, "LINE reply token unusable, falling back to Push"); - } else { - // Ambiguous: 5xx, other 4xx, or unrecognized 400. - // Message may have been delivered — do NOT fallback. - error!(status = %status, body = %body, "LINE Reply API error, NOT falling back to Push (possible duplicate risk)"); - used_reply = true; - } - } - Err(e) => { - // Network/timeout error: delivery ambiguous, do NOT fallback - error!(err = %e, "LINE Reply API network error, NOT falling back to Push (possible duplicate risk)"); - used_reply = true; - } - } - } - - // Fallback to Push API - if !used_reply { - info!(to = %reply.channel.id, "gateway → line (push API)"); - let _ = client - .post("https://api.line.me/v2/bot/message/push") - .bearer_auth(access_token) - .json(&serde_json::json!({ - "to": reply.channel.id, - "messages": [{"type": "text", "text": reply.content.text}] - })) - .send() - .await - .map_err(|e| error!("line push error: {e}")); - } + dispatch_line_reply( + &client, + access_token, + &reply_cache, + &reply, + LINE_API_BASE, + ) + .await; } } else { // Telegram sendMessage @@ -676,6 +608,90 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: info!("OAB client disconnected"); } +/// Base URL for LINE Messaging API. Overridden in tests via the `api_base` parameter. +const LINE_API_BASE: &str = "https://api.line.me"; + +/// Dispatch a reply to LINE using the hybrid Reply/Push strategy. +/// +/// Returns `true` if Reply API was used (or assumed used), `false` if Push API was used. +async fn dispatch_line_reply( + client: &reqwest::Client, + access_token: &str, + reply_cache: &ReplyTokenCache, + reply: &GatewayReply, + api_base: &str, +) -> bool { + // Extract token from cache (drop lock before HTTP call) + let cached_token = { + let mut cache = reply_cache.lock().unwrap_or_else(|e| e.into_inner()); + cache + .remove(&reply.reply_to) + .and_then(|(token, cached_at)| { + if cached_at.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS { + Some(token) + } else { + info!("LINE replyToken expired, using Push API"); + None + } + }) + }; + + // Try Reply API first (free, no quota consumed) + let mut used_reply = false; + if let Some(reply_token) = cached_token { + info!(to = %reply.channel.id, "gateway → line (reply API)"); + let resp = client + .post(format!("{}/v2/bot/message/reply", api_base)) + .bearer_auth(access_token) + .json(&serde_json::json!({ + "replyToken": reply_token, + "messages": [{"type": "text", "text": reply.content.text}] + })) + .send() + .await; + match resp { + Ok(r) if r.status().is_success() => { + used_reply = true; + } + Ok(r) => { + let status = r.status(); + let body = r.text().await.unwrap_or_default(); + let body_lower = body.to_lowercase(); + let token_unusable = status.as_u16() == 400 + && ((body_lower.contains("invalid") && body_lower.contains("reply token")) + || body_lower.contains("expired")); + if token_unusable { + warn!(status = %status, body = %body, "LINE reply token unusable, falling back to Push"); + } else { + error!(status = %status, body = %body, "LINE Reply API error, NOT falling back to Push (possible duplicate risk)"); + used_reply = true; + } + } + Err(e) => { + error!(err = %e, "LINE Reply API network error, NOT falling back to Push (possible duplicate risk)"); + used_reply = true; + } + } + } + + // Fallback to Push API + if !used_reply { + info!(to = %reply.channel.id, "gateway → line (push API)"); + let _ = client + .post(format!("{}/v2/bot/message/push", api_base)) + .bearer_auth(access_token) + .json(&serde_json::json!({ + "to": reply.channel.id, + "messages": [{"type": "text", "text": reply.content.text}] + })) + .send() + .await + .map_err(|e| error!("line push error: {e}")); + } + + used_reply +} + // --- Health check --- async fn health() -> &'static str { @@ -753,3 +769,179 @@ async fn main() -> Result<()> { axum::serve(listener, app).await?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use std::time::{Duration, Instant}; + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + fn make_reply(event_id: &str) -> GatewayReply { + GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: event_id.into(), + platform: "line".into(), + channel: ReplyChannel { id: "U1234".into(), thread_id: None }, + content: Content { content_type: "text".into(), text: "hello".into() }, + command: None, + request_id: None, + } + } + + fn make_cache() -> ReplyTokenCache { + Arc::new(std::sync::Mutex::new(HashMap::new())) + } + + /// Cache hit: uses Reply API, does NOT call Push API. + #[tokio::test] + async fn cache_hit_uses_reply_api() { + let server = MockServer::start().await; + let _reply = Mock::given(method("POST")) + .and(path("/v2/bot/message/reply")) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount_as_scoped(&server) + .await; + let _push = Mock::given(method("POST")) + .and(path("/v2/bot/message/push")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount_as_scoped(&server) + .await; + + let cache = make_cache(); + cache.lock().unwrap().insert("evt_1".into(), ("tok_abc".into(), Instant::now())); + + let client = reqwest::Client::new(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_1"), &server.uri()).await; + + assert!(used, "should report Reply API was used"); + // Scoped mocks auto-verify expect(N) on drop + } + + /// Cache miss: falls back to Push API. + #[tokio::test] + async fn cache_miss_uses_push_api() { + let server = MockServer::start().await; + let _reply = Mock::given(method("POST")) + .and(path("/v2/bot/message/reply")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount_as_scoped(&server) + .await; + let _push = Mock::given(method("POST")) + .and(path("/v2/bot/message/push")) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount_as_scoped(&server) + .await; + + let cache = make_cache(); + + let client = reqwest::Client::new(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_miss"), &server.uri()).await; + + assert!(!used, "should report Push API was used (no reply token)"); + } + + /// Expired cached token: falls back to Push API. + #[tokio::test] + async fn expired_token_uses_push_api() { + let server = MockServer::start().await; + let _reply = Mock::given(method("POST")) + .and(path("/v2/bot/message/reply")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount_as_scoped(&server) + .await; + let _push = Mock::given(method("POST")) + .and(path("/v2/bot/message/push")) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount_as_scoped(&server) + .await; + + let cache = make_cache(); + let expired_time = Instant::now() - Duration::from_secs(REPLY_TOKEN_TTL_SECS + 10); + cache.lock().unwrap().insert("evt_exp".into(), ("tok_old".into(), expired_time)); + + let client = reqwest::Client::new(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_exp"), &server.uri()).await; + + assert!(!used, "should report Push API was used (expired token)"); + } + + /// Reply API 400 with invalid/expired reply token: falls back to Push API. + #[tokio::test] + async fn reply_400_invalid_token_falls_back_to_push() { + let server = MockServer::start().await; + let _reply = Mock::given(method("POST")) + .and(path("/v2/bot/message/reply")) + .respond_with( + ResponseTemplate::new(400) + .set_body_string(r#"{"message":"Invalid reply token"}"#), + ) + .expect(1) + .mount_as_scoped(&server) + .await; + let _push = Mock::given(method("POST")) + .and(path("/v2/bot/message/push")) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount_as_scoped(&server) + .await; + + let cache = make_cache(); + cache.lock().unwrap().insert("evt_400".into(), ("tok_bad".into(), Instant::now())); + + let client = reqwest::Client::new(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_400"), &server.uri()).await; + + assert!(!used, "should fall back to Push on 400 invalid token"); + } + + /// Reply API 5xx: does NOT fall back to Push (duplicate risk). + #[tokio::test] + async fn reply_5xx_does_not_fallback() { + let server = MockServer::start().await; + let _reply = Mock::given(method("POST")) + .and(path("/v2/bot/message/reply")) + .respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error")) + .expect(1) + .mount_as_scoped(&server) + .await; + let _push = Mock::given(method("POST")) + .and(path("/v2/bot/message/push")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount_as_scoped(&server) + .await; + + let cache = make_cache(); + cache.lock().unwrap().insert("evt_5xx".into(), ("tok_5xx".into(), Instant::now())); + + let client = reqwest::Client::new(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_5xx"), &server.uri()).await; + + assert!(used, "should NOT fall back to Push on 5xx"); + } + + /// Reply API network/timeout error: does NOT fall back to Push (duplicate risk). + #[tokio::test] + async fn reply_network_error_does_not_fallback() { + let bad_base = "http://127.0.0.1:1"; + + let cache = make_cache(); + cache.lock().unwrap().insert("evt_net".into(), ("tok_net".into(), Instant::now())); + + let client = reqwest::Client::builder() + .timeout(Duration::from_millis(100)) + .build() + .unwrap(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_net"), bad_base).await; + + assert!(used, "should NOT fall back to Push on network error"); + } +}