diff --git a/docs/adr/line-adapter.md b/docs/adr/line-adapter.md index f899a747..88460530 100644 --- a/docs/adr/line-adapter.md +++ b/docs/adr/line-adapter.md @@ -1,8 +1,9 @@ # ADR: LINE Messaging API Adapter -- **Status:** Proposed +- **Status:** Accepted - **Date:** 2026-04-22 -- **Author:** @chaodu-agent +- **Last Updated:** 2026-04-28 +- **Author:** @chaodu-agent, @iamninihuang --- @@ -101,17 +102,57 @@ LINE is not ideal when: - Group → line:{groupId} - Room → line:{roomId} 6. Message is routed to AdapterRouter → ACP Session Pool → kiro-cli process -7. Agent response is sent back via LINE Push Message API +7. Agent response is sent back via LINE Reply API (free) or Push Message API (fallback) ``` -### Reply Strategy: Push Messages +### Hybrid Reply/Push Dispatch Flow -LINE offers two reply mechanisms: -- **Reply message**: uses a reply token, but the token expires in 1 minute -- **Push message**: no time limit, can send to any user/group at any time +``` +LINE User Gateway OAB Core + │ │ │ + │ message + replyToken │ │ + │ ─────────────────────────▶ │ │ + │ │ 1. Verify HMAC signature │ + │ │ 2. Generate event_id (UUID) │ + │ │ 3. Cache: │ + │ │ event_id → replyToken │ + │ │ (TTL 50s, max 10k) │ + │ │ │ + │ │ GatewayEvent { event_id } │ + │ │ ─────────────────────────────▶│ + │ │ │ Store event_id in + │ │ │ ChannelRef.origin_event_id + │ │ │ + │ │ │ Agent processes... + │ │ │ + │ │ GatewayReply { │ + │ │ reply_to: event_id │ + │ │ } │ + │ │ ◀─────────────────────────────│ + │ │ │ + │ │ 4. Lookup cache(event_id) │ + │ │ ├─ HIT + fresh │ + │ Reply API (FREE) ✅ │ │ → Reply API │ + │ ◀──────────────────────────│ │ │ + │ │ ├─ HIT + expired │ + │ Push API (quota) 💰 │ │ → Push API fallback │ + │ ◀──────────────────────────│ │ │ + │ │ └─ MISS │ + │ Push API (quota) 💰 │ → Push API fallback │ + │ ◀──────────────────────────│ │ +``` -OpenAB uses **push messages** because agent processing typically exceeds the 1-minute reply token window. The trade-off is that push messages count against the monthly messaging quota on free-tier LINE accounts. +### Reply Strategy: Hybrid Reply/Push Messages +LINE offers two reply mechanisms: +- **Reply message**: uses a reply token, but the token expires in 1 minute (free). +- **Push message**: no time limit, can send to any user/group at any time (consumes quota). + +Historically, OpenAB relied solely on **push messages** because agent processing can exceed the 1-minute reply token window. To optimize costs for free-tier accounts, OpenAB now uses a **Hybrid Strategy** implemented at the gateway level: +1. The gateway caches incoming `replyToken`s keyed by `event_id` with a 50-second TTL. +2. When OAB replies with a non-empty `reply_to` that matches a cached entry, the gateway routes the message via the free **Reply API**. +3. If the token is expired, missing, or `reply_to` is empty, the gateway falls back to the **Push API**. +4. A background task sweeps expired cache entries to prevent memory growth. --- ## 3. Architectural Differences from Discord/Slack @@ -381,7 +422,7 @@ For v1: - LINE users can interact with OpenAB agents without switching to Discord or Slack - The inbound webhook pattern opens the door for future webhook-based platforms (Telegram, WhatsApp, etc.) - Using `axum` for the HTTP server provides a solid foundation for a general-purpose webhook gateway -- Push message strategy avoids the 1-minute reply token limitation, enabling long-running agent tasks +- Hybrid reply/push strategy optimizes cost: the gateway opportunistically uses the free Reply API when the agent responds within the token TTL, falling back to Push API for longer-running tasks ### Negative @@ -414,8 +455,9 @@ To ensure this ADR is followed in implementation and future changes: ## Notes -- **Version:** 0.1 +- **Version:** 0.2 - **Changelog:** + - 0.2 (2026-04-28): Hybrid Reply/Push strategy implemented (#608). Updated status to Accepted. Added dispatch flow diagram. Reply strategy section rewritten from Push-only to hybrid. Core propagates `event_id` via `ChannelRef.origin_event_id` (#619). - 0.1 (2026-04-22): Initial proposed version --- diff --git a/gateway/Cargo.lock b/gateway/Cargo.lock index d09bac1a..e0d445d5 100644 --- a/gateway/Cargo.lock +++ b/gateway/Cargo.lock @@ -26,6 +26,16 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -199,6 +209,24 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" +[[package]] +name = "deadpool" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b" +dependencies = [ + "deadpool-runtime", + "lazy_static", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "digest" version = "0.10.7" @@ -243,6 +271,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foldhash" version = "0.1.5" @@ -258,6 +292,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -265,6 +314,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -273,6 +323,23 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + [[package]] name = "futures-macro" version = "0.3.32" @@ -302,10 +369,13 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -360,6 +430,25 @@ dependencies = [ "wasip3", ] +[[package]] +name = "h2" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.15.5" @@ -381,6 +470,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hmac" version = "0.12.1" @@ -445,6 +540,7 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2", "http", "http-body", "httparse", @@ -775,6 +871,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -800,6 +906,7 @@ dependencies = [ "tracing", "tracing-subscriber", "uuid", + "wiremock", ] [[package]] @@ -1018,6 +1125,18 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + [[package]] name = "regex-automata" version = "0.4.14" @@ -1506,6 +1625,19 @@ dependencies = [ "tungstenite 0.29.0", ] +[[package]] +name = "tokio-util" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tower" version = "0.5.3" @@ -2103,6 +2235,29 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "wiremock" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08db1edfb05d9b3c1542e521aea074442088292f00b5f28e435c714a98f85031" +dependencies = [ + "assert-json-diff", + "base64", + "deadpool", + "futures", + "http", + "http-body-util", + "hyper", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index dd5e78ae..d430ff4a 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -19,3 +19,6 @@ chrono = { version = "0.4", features = ["serde"] } hmac = "0.12" sha2 = "0.10" base64 = "0.22" + +[dev-dependencies] +wiremock = "0.6" diff --git a/gateway/README.md b/gateway/README.md index 6e2f65dd..2a06e49d 100644 --- a/gateway/README.md +++ b/gateway/README.md @@ -47,12 +47,15 @@ url = "ws://gateway:8080/ws" | `TELEGRAM_BOT_TOKEN` | (required) | Telegram Bot API token | | `GATEWAY_LISTEN` | `0.0.0.0:8080` | Listen address | | `TELEGRAM_WEBHOOK_PATH` | `/webhook/telegram` | Webhook endpoint path | +| `LINE_CHANNEL_SECRET` | (optional) | LINE channel secret for webhook HMAC signature verification | +| `LINE_CHANNEL_ACCESS_TOKEN` | (optional) | LINE channel access token for Reply/Push API | ### Endpoints | Path | Description | |---|---| | `POST /webhook/telegram` | Telegram webhook receiver | +| `POST /webhook/line` | LINE webhook receiver | | `GET /ws` | WebSocket server (OAB connects here) | | `GET /health` | Health check | diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 0c9b948b..7cd04912 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -8,6 +8,7 @@ use axum::{ use futures_util::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use std::time::Instant; use tokio::sync::{broadcast, Mutex}; use tracing::{error, info, warn}; @@ -126,6 +127,19 @@ struct TelegramUser { // --- App state --- +/// Cache entry for LINE reply tokens: (replyToken, insertion_time). +/// Uses std::sync::Mutex — critical sections are short (insert/remove/retain) +/// and never held across .await, so async Mutex overhead is unnecessary. +type ReplyTokenCache = Arc>>; + +/// Maximum age (in seconds) before a cached reply token is considered expired. +/// LINE tokens are valid for ~1 minute; we use 50s as a conservative margin. +const REPLY_TOKEN_TTL_SECS: u64 = 50; + +/// Maximum number of cached reply tokens. Prevents unbounded memory growth +/// if webhooks arrive faster than OAB can reply (e.g. OAB offline, spam burst). +const REPLY_TOKEN_CACHE_MAX: usize = 10_000; + struct AppState { bot_token: String, secret_token: Option, @@ -134,6 +148,11 @@ struct AppState { line_access_token: Option, /// Broadcast channel: gateway → OAB (events) event_tx: broadcast::Sender, + /// Cache: event_id → (LINE replyToken, timestamp). + /// Global across all OAB WebSocket clients. LINE reply tokens are single-use: + /// the first client to `remove()` a token wins the free Reply API call; + /// other clients for the same event naturally fall back to Push API. + reply_token_cache: ReplyTokenCache, } // --- Telegram webhook handler --- @@ -329,9 +348,22 @@ async fn line_webhook( .and_then(|s| s.user_id.as_deref()) .unwrap_or("unknown"); + let event_id = format!("evt_{}", uuid::Uuid::new_v4()); + + // Cache the reply token for hybrid Reply/Push dispatch + if let Some(ref reply_token) = event.reply_token { + let mut cache = state.reply_token_cache.lock().unwrap_or_else(|e| e.into_inner()); + if cache.len() >= REPLY_TOKEN_CACHE_MAX { + warn!(size = cache.len(), "reply token cache full, skipping insert"); + } else { + cache.insert(event_id.clone(), (reply_token.clone(), Instant::now())); + info!(event_id = %event_id, "cached LINE replyToken"); + } + } + let gateway_event = GatewayEvent { schema: "openab.gateway.event.v1".into(), - event_id: format!("evt_{}", uuid::Uuid::new_v4()), + event_id, timestamp: chrono::Utc::now().to_rfc3339(), platform: "line".into(), event_type: "message".into(), @@ -400,7 +432,6 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: break; } } - // No reply forwarding needed on this path — replies go to Telegram directly } } }); @@ -408,6 +439,7 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: // Receive OAB replies → Telegram let bot_token = state.bot_token.clone(); let line_access_token = state.line_access_token.clone(); + let reply_cache = state.reply_token_cache.clone(); let event_tx_for_recv = state.event_tx.clone(); // Track per-message reaction state (Telegram replaces all reactions atomically) let reaction_state: Arc>>> = @@ -535,19 +567,15 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: // Normal send_message — route by platform if reply.platform == "line" { - // LINE Push Message API - if let Some(ref token) = line_access_token { - info!(to = %reply.channel.id, "gateway → line"); - let _ = client - .post("https://api.line.me/v2/bot/message/push") - .bearer_auth(token) - .json(&serde_json::json!({ - "to": reply.channel.id, - "messages": [{"type": "text", "text": reply.content.text}] - })) - .send() - .await - .map_err(|e| error!("line send error: {e}")); + if let Some(ref access_token) = line_access_token { + dispatch_line_reply( + &client, + access_token, + &reply_cache, + &reply, + LINE_API_BASE, + ) + .await; } } else { // Telegram sendMessage @@ -580,6 +608,90 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: info!("OAB client disconnected"); } +/// Base URL for LINE Messaging API. Overridden in tests via the `api_base` parameter. +const LINE_API_BASE: &str = "https://api.line.me"; + +/// Dispatch a reply to LINE using the hybrid Reply/Push strategy. +/// +/// Returns `true` if Reply API was used (or assumed used), `false` if Push API was used. +async fn dispatch_line_reply( + client: &reqwest::Client, + access_token: &str, + reply_cache: &ReplyTokenCache, + reply: &GatewayReply, + api_base: &str, +) -> bool { + // Extract token from cache (drop lock before HTTP call) + let cached_token = { + let mut cache = reply_cache.lock().unwrap_or_else(|e| e.into_inner()); + cache + .remove(&reply.reply_to) + .and_then(|(token, cached_at)| { + if cached_at.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS { + Some(token) + } else { + info!("LINE replyToken expired, using Push API"); + None + } + }) + }; + + // Try Reply API first (free, no quota consumed) + let mut used_reply = false; + if let Some(reply_token) = cached_token { + info!(to = %reply.channel.id, "gateway → line (reply API)"); + let resp = client + .post(format!("{}/v2/bot/message/reply", api_base)) + .bearer_auth(access_token) + .json(&serde_json::json!({ + "replyToken": reply_token, + "messages": [{"type": "text", "text": reply.content.text}] + })) + .send() + .await; + match resp { + Ok(r) if r.status().is_success() => { + used_reply = true; + } + Ok(r) => { + let status = r.status(); + let body = r.text().await.unwrap_or_default(); + let body_lower = body.to_lowercase(); + let token_unusable = status.as_u16() == 400 + && ((body_lower.contains("invalid") && body_lower.contains("reply token")) + || body_lower.contains("expired")); + if token_unusable { + warn!(status = %status, body = %body, "LINE reply token unusable, falling back to Push"); + } else { + error!(status = %status, body = %body, "LINE Reply API error, NOT falling back to Push (possible duplicate risk)"); + used_reply = true; + } + } + Err(e) => { + error!(err = %e, "LINE Reply API network error, NOT falling back to Push (possible duplicate risk)"); + used_reply = true; + } + } + } + + // Fallback to Push API + if !used_reply { + info!(to = %reply.channel.id, "gateway → line (push API)"); + let _ = client + .post(format!("{}/v2/bot/message/push", api_base)) + .bearer_auth(access_token) + .json(&serde_json::json!({ + "to": reply.channel.id, + "messages": [{"type": "text", "text": reply.content.text}] + })) + .send() + .await + .map_err(|e| error!("line push error: {e}")); + } + + used_reply +} + // --- Health check --- async fn health() -> &'static str { @@ -611,6 +723,8 @@ async fn main() -> Result<()> { } let (event_tx, _) = broadcast::channel::(256); + let reply_token_cache: ReplyTokenCache = + Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())); let state = Arc::new(AppState { bot_token, @@ -619,8 +733,30 @@ async fn main() -> Result<()> { line_channel_secret, line_access_token, event_tx, + reply_token_cache, }); + // Background task: sweep expired reply tokens every REPLY_TOKEN_TTL_SECS + { + let cache_state = state.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(REPLY_TOKEN_TTL_SECS)).await; + let mut cache = cache_state.reply_token_cache.lock().unwrap_or_else(|e| e.into_inner()); + let before = cache.len(); + cache.retain(|_, (_, t)| t.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS); + let after = cache.len(); + if before != after { + info!( + removed = before - after, + remaining = after, + "reply token cache sweep" + ); + } + } + }); + } + let app = Router::new() .route(&webhook_path, post(telegram_webhook)) .route("/webhook/line", post(line_webhook)) @@ -633,3 +769,179 @@ async fn main() -> Result<()> { axum::serve(listener, app).await?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use std::time::{Duration, Instant}; + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + fn make_reply(event_id: &str) -> GatewayReply { + GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: event_id.into(), + platform: "line".into(), + channel: ReplyChannel { id: "U1234".into(), thread_id: None }, + content: Content { content_type: "text".into(), text: "hello".into() }, + command: None, + request_id: None, + } + } + + fn make_cache() -> ReplyTokenCache { + Arc::new(std::sync::Mutex::new(HashMap::new())) + } + + /// Cache hit: uses Reply API, does NOT call Push API. + #[tokio::test] + async fn cache_hit_uses_reply_api() { + let server = MockServer::start().await; + let _reply = Mock::given(method("POST")) + .and(path("/v2/bot/message/reply")) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount_as_scoped(&server) + .await; + let _push = Mock::given(method("POST")) + .and(path("/v2/bot/message/push")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount_as_scoped(&server) + .await; + + let cache = make_cache(); + cache.lock().unwrap().insert("evt_1".into(), ("tok_abc".into(), Instant::now())); + + let client = reqwest::Client::new(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_1"), &server.uri()).await; + + assert!(used, "should report Reply API was used"); + // Scoped mocks auto-verify expect(N) on drop + } + + /// Cache miss: falls back to Push API. + #[tokio::test] + async fn cache_miss_uses_push_api() { + let server = MockServer::start().await; + let _reply = Mock::given(method("POST")) + .and(path("/v2/bot/message/reply")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount_as_scoped(&server) + .await; + let _push = Mock::given(method("POST")) + .and(path("/v2/bot/message/push")) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount_as_scoped(&server) + .await; + + let cache = make_cache(); + + let client = reqwest::Client::new(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_miss"), &server.uri()).await; + + assert!(!used, "should report Push API was used (no reply token)"); + } + + /// Expired cached token: falls back to Push API. + #[tokio::test] + async fn expired_token_uses_push_api() { + let server = MockServer::start().await; + let _reply = Mock::given(method("POST")) + .and(path("/v2/bot/message/reply")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount_as_scoped(&server) + .await; + let _push = Mock::given(method("POST")) + .and(path("/v2/bot/message/push")) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount_as_scoped(&server) + .await; + + let cache = make_cache(); + let expired_time = Instant::now() - Duration::from_secs(REPLY_TOKEN_TTL_SECS + 10); + cache.lock().unwrap().insert("evt_exp".into(), ("tok_old".into(), expired_time)); + + let client = reqwest::Client::new(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_exp"), &server.uri()).await; + + assert!(!used, "should report Push API was used (expired token)"); + } + + /// Reply API 400 with invalid/expired reply token: falls back to Push API. + #[tokio::test] + async fn reply_400_invalid_token_falls_back_to_push() { + let server = MockServer::start().await; + let _reply = Mock::given(method("POST")) + .and(path("/v2/bot/message/reply")) + .respond_with( + ResponseTemplate::new(400) + .set_body_string(r#"{"message":"Invalid reply token"}"#), + ) + .expect(1) + .mount_as_scoped(&server) + .await; + let _push = Mock::given(method("POST")) + .and(path("/v2/bot/message/push")) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(1) + .mount_as_scoped(&server) + .await; + + let cache = make_cache(); + cache.lock().unwrap().insert("evt_400".into(), ("tok_bad".into(), Instant::now())); + + let client = reqwest::Client::new(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_400"), &server.uri()).await; + + assert!(!used, "should fall back to Push on 400 invalid token"); + } + + /// Reply API 5xx: does NOT fall back to Push (duplicate risk). + #[tokio::test] + async fn reply_5xx_does_not_fallback() { + let server = MockServer::start().await; + let _reply = Mock::given(method("POST")) + .and(path("/v2/bot/message/reply")) + .respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error")) + .expect(1) + .mount_as_scoped(&server) + .await; + let _push = Mock::given(method("POST")) + .and(path("/v2/bot/message/push")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount_as_scoped(&server) + .await; + + let cache = make_cache(); + cache.lock().unwrap().insert("evt_5xx".into(), ("tok_5xx".into(), Instant::now())); + + let client = reqwest::Client::new(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_5xx"), &server.uri()).await; + + assert!(used, "should NOT fall back to Push on 5xx"); + } + + /// Reply API network/timeout error: does NOT fall back to Push (duplicate risk). + #[tokio::test] + async fn reply_network_error_does_not_fallback() { + let bad_base = "http://127.0.0.1:1"; + + let cache = make_cache(); + cache.lock().unwrap().insert("evt_net".into(), ("tok_net".into(), Instant::now())); + + let client = reqwest::Client::builder() + .timeout(Duration::from_millis(100)) + .build() + .unwrap(); + let used = dispatch_line_reply(&client, "token", &cache, &make_reply("evt_net"), bad_base).await; + + assert!(used, "should NOT fall back to Push on network error"); + } +}