From e8a217e121d456989f2583348908aa081b78d7da Mon Sep 17 00:00:00 2001 From: masami-agent Date: Fri, 24 Apr 2026 23:04:13 +0000 Subject: [PATCH 1/3] feat(gateway): add MS Teams adapter + modular refactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor gateway into modules and add Teams as the third adapter: - schema.rs: shared gateway event/reply types - adapters/telegram.rs: Telegram adapter (extracted, all features preserved including secret_token, mentions, reactions, create_topic, parse_mode) - adapters/line.rs: LINE adapter (extracted, HMAC signature validation and push message reply preserved) - adapters/teams.rs: NEW — JWT validation (issuer check + JWKS cache-miss fallback), OAuth2 client credentials, Bot Framework REST API, tenant allowlist, sovereign cloud support, 16 unit tests - main.rs: multi-platform reply routing by reply.platform field, Teams service_url cache with TTL cleanup Zero OAB core changes. Closes #548 --- gateway/Cargo.toml | 1 + gateway/src/adapters/line.rs | 155 +++++++ gateway/src/adapters/mod.rs | 3 + gateway/src/adapters/teams.rs | 669 ++++++++++++++++++++++++++++++ gateway/src/adapters/telegram.rs | 255 ++++++++++++ gateway/src/main.rs | 673 +++++++------------------------ gateway/src/schema.rs | 98 +++++ 7 files changed, 1322 insertions(+), 532 deletions(-) create mode 100644 gateway/src/adapters/line.rs create mode 100644 gateway/src/adapters/mod.rs create mode 100644 gateway/src/adapters/teams.rs create mode 100644 gateway/src/adapters/telegram.rs create mode 100644 gateway/src/schema.rs diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index dd5e78ae..67bb94c6 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -19,3 +19,4 @@ chrono = { version = "0.4", features = ["serde"] } hmac = "0.12" sha2 = "0.10" base64 = "0.22" +jsonwebtoken = "9" diff --git a/gateway/src/adapters/line.rs b/gateway/src/adapters/line.rs new file mode 100644 index 00000000..dc0f3a58 --- /dev/null +++ b/gateway/src/adapters/line.rs @@ -0,0 +1,155 @@ +use crate::schema::*; +use axum::extract::State; +use serde::Deserialize; +use std::sync::Arc; +use tracing::{error, info, warn}; + +// --- LINE types --- + +#[derive(Debug, Deserialize)] +pub struct LineWebhookBody { + events: Vec, +} + +#[derive(Debug, Deserialize)] +struct LineEvent { + #[serde(rename = "type")] + event_type: String, + source: Option, + message: Option, + #[serde(rename = "replyToken")] + reply_token: Option, +} + +#[derive(Debug, Deserialize)] +struct LineSource { + #[serde(rename = "type")] + source_type: String, + #[serde(rename = "userId")] + user_id: Option, + #[serde(rename = "groupId")] + group_id: Option, + #[serde(rename = "roomId")] + room_id: Option, +} + +#[derive(Debug, Deserialize)] +struct LineMessage { + id: String, + #[serde(rename = "type")] + message_type: String, + text: Option, +} + +// --- Webhook handler --- + +pub async fn webhook( + State(state): State>, + headers: axum::http::HeaderMap, + body: axum::body::Bytes, +) -> axum::http::StatusCode { + // Validate X-Line-Signature + if let Some(ref channel_secret) = state.line_channel_secret { + use base64::Engine; + use hmac::{Hmac, Mac}; + use sha2::Sha256; + + let signature = headers + .get("x-line-signature") + .and_then(|v| v.to_str().ok()); + let Some(signature) = signature else { + warn!("LINE webhook rejected: missing X-Line-Signature"); + return axum::http::StatusCode::UNAUTHORIZED; + }; + + let mut mac = Hmac::::new_from_slice(channel_secret.as_bytes()).expect("HMAC key"); + mac.update(&body); + let expected = + base64::engine::general_purpose::STANDARD.encode(mac.finalize().into_bytes()); + if signature != expected { + warn!("LINE webhook rejected: invalid signature"); + return axum::http::StatusCode::UNAUTHORIZED; + } + } + + let webhook_body: LineWebhookBody = match serde_json::from_slice(&body) { + Ok(b) => b, + Err(e) => { + warn!("LINE webhook parse error: {e}"); + return axum::http::StatusCode::BAD_REQUEST; + } + }; + + for event in webhook_body.events { + if event.event_type != "message" { + continue; + } + let Some(ref msg) = event.message else { + continue; + }; + if msg.message_type != "text" { + continue; + } + let Some(ref text) = msg.text else { + continue; + }; + if text.trim().is_empty() { + continue; + } + + let source = event.source.as_ref(); + let (channel_id, channel_type) = match source { + Some(s) if s.source_type == "group" => { + (s.group_id.clone().unwrap_or_default(), "group".to_string()) + } + Some(s) if s.source_type == "room" => { + (s.room_id.clone().unwrap_or_default(), "room".to_string()) + } + Some(s) => (s.user_id.clone().unwrap_or_default(), "user".to_string()), + None => continue, + }; + let user_id = source + .and_then(|s| s.user_id.as_deref()) + .unwrap_or("unknown"); + + let gateway_event = GatewayEvent::new( + "line", + ChannelInfo { + id: channel_id.clone(), + channel_type, + thread_id: None, + }, + SenderInfo { + id: user_id.into(), + name: user_id.into(), + display_name: user_id.into(), + is_bot: false, + }, + text, + &msg.id, + vec![], + ); + + let json = serde_json::to_string(&gateway_event).unwrap(); + info!(channel = %channel_id, sender = %user_id, "line → gateway"); + let _ = state.event_tx.send(json); + } + + axum::http::StatusCode::OK +} + +// --- Reply handler --- + +pub async fn handle_reply(reply: &GatewayReply, access_token: &str, client: &reqwest::Client) { + info!(to = %reply.channel.id, "gateway → line"); + let _ = client + .post("https://api.line.me/v2/bot/message/push") + .bearer_auth(access_token) + .json(&serde_json::json!({ + "to": reply.channel.id, + "messages": [{"type": "text", "text": reply.content.text}] + })) + .send() + .await + .map_err(|e| error!("line send error: {e}")); +} diff --git a/gateway/src/adapters/mod.rs b/gateway/src/adapters/mod.rs new file mode 100644 index 00000000..61c8f110 --- /dev/null +++ b/gateway/src/adapters/mod.rs @@ -0,0 +1,3 @@ +pub mod line; +pub mod teams; +pub mod telegram; diff --git a/gateway/src/adapters/teams.rs b/gateway/src/adapters/teams.rs new file mode 100644 index 00000000..04f82e78 --- /dev/null +++ b/gateway/src/adapters/teams.rs @@ -0,0 +1,669 @@ +use crate::schema::*; +use axum::extract::State; +use axum::http::{HeaderMap, StatusCode}; +use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; +use serde::Deserialize; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{debug, error, info, warn}; + +// --- Bot Framework activity types --- + +#[allow(dead_code)] // Bot Framework schema fields — needed for future features +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Activity { + #[serde(rename = "type")] + pub activity_type: String, + pub id: Option, + pub timestamp: Option, + pub service_url: Option, + pub channel_id: Option, + pub from: Option, + pub conversation: Option, + pub text: Option, + pub tenant: Option, +} + +#[allow(dead_code)] +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ChannelAccount { + pub id: Option, + pub name: Option, + pub aad_object_id: Option, +} + +#[allow(dead_code)] +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ConversationAccount { + pub id: Option, + pub conversation_type: Option, + pub is_group: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TenantInfo { + pub id: Option, +} + +// --- OpenID configuration --- + +#[derive(Debug, Deserialize)] +struct OpenIdConfig { + jwks_uri: String, +} + +#[derive(Debug, Deserialize)] +struct JwksResponse { + keys: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +struct JwkKey { + kid: Option, + n: String, + e: String, + kty: String, +} + +// --- OAuth token --- + +#[derive(Debug, Deserialize)] +struct TokenResponse { + access_token: String, + expires_in: u64, +} + +struct CachedToken { + token: String, + expires_at: std::time::Instant, +} + +// --- Teams adapter config --- + +pub struct TeamsConfig { + pub app_id: String, + pub app_secret: String, + pub oauth_endpoint: String, + pub openid_metadata: String, + pub allowed_tenants: Vec, +} + +impl TeamsConfig { + pub fn from_env() -> Option { + let app_id = std::env::var("TEAMS_APP_ID").ok()?; + let app_secret = std::env::var("TEAMS_APP_SECRET").ok()?; + Some(Self { + app_id, + app_secret, + oauth_endpoint: std::env::var("TEAMS_OAUTH_ENDPOINT").unwrap_or_else(|_| { + "https://login.microsoftonline.com/botframework.com/oauth2/v2.0/token".into() + }), + openid_metadata: std::env::var("TEAMS_OPENID_METADATA").unwrap_or_else(|_| { + "https://login.botframework.com/v1/.well-known/openidconfiguration".into() + }), + allowed_tenants: std::env::var("TEAMS_ALLOWED_TENANTS") + .unwrap_or_default() + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(), + }) + } +} + +// --- Teams adapter state --- + +pub struct TeamsAdapter { + config: TeamsConfig, + client: reqwest::Client, + token_cache: RwLock>, + jwks_cache: RwLock, std::time::Instant)>>, +} + +const JWKS_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(3600); +const TOKEN_REFRESH_MARGIN: std::time::Duration = std::time::Duration::from_secs(300); + +impl TeamsAdapter { + pub fn new(config: TeamsConfig) -> Self { + Self { + config, + client: reqwest::Client::new(), + token_cache: RwLock::new(None), + jwks_cache: RwLock::new(None), + } + } + + /// Get a valid OAuth bearer token, refreshing if needed. + async fn get_token(&self) -> anyhow::Result { + // Check cache + { + let cache = self.token_cache.read().await; + if let Some(ref cached) = *cache { + if cached.expires_at > std::time::Instant::now() + TOKEN_REFRESH_MARGIN { + return Ok(cached.token.clone()); + } + } + } + + // Fetch new token + let resp: TokenResponse = self + .client + .post(&self.config.oauth_endpoint) + .form(&[ + ("grant_type", "client_credentials"), + ("client_id", &self.config.app_id), + ("client_secret", &self.config.app_secret), + ("scope", "https://api.botframework.com/.default"), + ]) + .send() + .await? + .json() + .await?; + + let token = resp.access_token.clone(); + *self.token_cache.write().await = Some(CachedToken { + token: resp.access_token, + expires_at: std::time::Instant::now() + std::time::Duration::from_secs(resp.expires_in), + }); + info!("teams OAuth token refreshed"); + Ok(token) + } + + /// Fetch and cache JWKS signing keys from Microsoft's OpenID metadata. + async fn get_jwks(&self) -> anyhow::Result> { + { + let cache = self.jwks_cache.read().await; + if let Some((ref keys, fetched_at)) = *cache { + if fetched_at.elapsed() < JWKS_CACHE_TTL { + return Ok(keys.clone()); + } + } + } + + let config: OpenIdConfig = self + .client + .get(&self.config.openid_metadata) + .send() + .await? + .json() + .await?; + + let jwks: JwksResponse = self + .client + .get(&config.jwks_uri) + .send() + .await? + .json() + .await?; + + let keys = jwks.keys; + *self.jwks_cache.write().await = Some((keys.clone(), std::time::Instant::now())); + info!(count = keys.len(), "teams JWKS keys refreshed"); + Ok(keys) + } + + /// Force-refresh JWKS keys, bypassing cache TTL. Called on cache miss (kid not found). + async fn refresh_jwks(&self) -> anyhow::Result> { + // Invalidate cache so get_jwks fetches fresh + *self.jwks_cache.write().await = None; + self.get_jwks().await + } + + /// Validate the JWT bearer token from an inbound Bot Framework request. + pub async fn validate_jwt(&self, auth_header: &str) -> anyhow::Result<()> { + let token = auth_header + .strip_prefix("Bearer ") + .ok_or_else(|| anyhow::anyhow!("missing Bearer prefix"))?; + + // Decode header to get kid + let header = jsonwebtoken::decode_header(token)?; + let kid = header + .kid + .ok_or_else(|| anyhow::anyhow!("no kid in JWT header"))?; + + let keys = self.get_jwks().await?; + let key = match keys.iter().find(|k| k.kid.as_deref() == Some(&kid)) { + Some(k) => k.clone(), + None => { + // Cache miss: Microsoft may have rotated keys. Force refresh and retry. + let refreshed = self.refresh_jwks().await?; + refreshed + .into_iter() + .find(|k| k.kid.as_deref() == Some(&kid)) + .ok_or_else(|| anyhow::anyhow!("no matching JWK for kid={kid} after refresh"))? + } + }; + + if key.kty != "RSA" { + anyhow::bail!("unsupported key type: {}", key.kty); + } + + let decoding_key = DecodingKey::from_rsa_components(&key.n, &key.e)?; + let mut validation = Validation::new(Algorithm::RS256); + validation.set_audience(&[&self.config.app_id]); + // Bot Framework tokens can use RS256 or RS384 + validation.algorithms = vec![Algorithm::RS256, Algorithm::RS384]; + // Bot Framework issuer per auth spec + validation.set_issuer(&["https://api.botframework.com"]); + validation.validate_aud = true; + validation.validate_exp = true; + validation.validate_nbf = false; + + decode::(token, &decoding_key, &validation)?; + Ok(()) + } + + /// Check tenant allowlist. + fn check_tenant(&self, activity: &Activity) -> bool { + if self.config.allowed_tenants.is_empty() { + return true; + } + activity + .tenant + .as_ref() + .and_then(|t| t.id.as_deref()) + .is_some_and(|tid| self.config.allowed_tenants.iter().any(|a| a == tid)) + } + + /// Send a reply via Bot Framework REST API. + pub async fn send_activity( + &self, + service_url: &str, + conversation_id: &str, + text: &str, + ) -> anyhow::Result { + let token = self.get_token().await?; + let url = format!( + "{}v3/conversations/{}/activities", + ensure_trailing_slash(service_url), + conversation_id + ); + + let body = serde_json::json!({ + "type": "message", + "text": text, + }); + + let resp = self + .client + .post(&url) + .bearer_auth(&token) + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + anyhow::bail!("Bot Framework API error {status}: {body}"); + } + + let result: serde_json::Value = resp.json().await?; + Ok(result["id"].as_str().unwrap_or("").to_string()) + } + + /// Edit an existing activity (for streaming updates). + pub async fn update_activity( + &self, + service_url: &str, + conversation_id: &str, + activity_id: &str, + text: &str, + ) -> anyhow::Result<()> { + let token = self.get_token().await?; + let url = format!( + "{}v3/conversations/{}/activities/{}", + ensure_trailing_slash(service_url), + conversation_id, + activity_id + ); + + let body = serde_json::json!({ + "type": "message", + "text": text, + }); + + let resp = self + .client + .put(&url) + .bearer_auth(&token) + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + anyhow::bail!("Bot Framework update error {status}: {body}"); + } + Ok(()) + } +} + +fn ensure_trailing_slash(url: &str) -> String { + if url.ends_with('/') { + url.to_string() + } else { + format!("{url}/") + } +} + +// --- Webhook handler --- + +pub async fn webhook( + State(state): State>, + headers: HeaderMap, + body: String, +) -> StatusCode { + let teams = match &state.teams { + Some(t) => t, + None => return StatusCode::NOT_FOUND, + }; + + // JWT validation + if let Some(auth) = headers.get("authorization").and_then(|v| v.to_str().ok()) { + if let Err(e) = teams.validate_jwt(auth).await { + warn!(error = %e, "teams JWT validation failed"); + return StatusCode::UNAUTHORIZED; + } + } else { + warn!("teams webhook: missing authorization header"); + return StatusCode::UNAUTHORIZED; + } + + // Parse activity + let activity: Activity = match serde_json::from_str(&body) { + Ok(a) => a, + Err(e) => { + warn!(error = %e, "teams: invalid activity JSON"); + return StatusCode::BAD_REQUEST; + } + }; + + // Only handle message activities + if activity.activity_type != "message" { + debug!(activity_type = %activity.activity_type, "teams: ignoring non-message activity"); + return StatusCode::OK; + } + + // Tenant check + if !teams.check_tenant(&activity) { + let tid = activity + .tenant + .as_ref() + .and_then(|t| t.id.as_deref()) + .unwrap_or("unknown"); + warn!(tenant = tid, "teams: tenant not in allowlist"); + return StatusCode::FORBIDDEN; + } + + let text = match activity.text.as_deref() { + Some(t) if !t.trim().is_empty() => t.trim(), + _ => return StatusCode::OK, + }; + + let conversation_id = activity + .conversation + .as_ref() + .and_then(|c| c.id.as_deref()) + .unwrap_or(""); + let conversation_type = activity + .conversation + .as_ref() + .and_then(|c| c.conversation_type.as_deref()) + .unwrap_or("personal"); + let service_url = activity.service_url.as_deref().unwrap_or(""); + let sender_id = activity + .from + .as_ref() + .and_then(|f| f.id.as_deref()) + .unwrap_or(""); + let sender_name = activity + .from + .as_ref() + .and_then(|f| f.name.as_deref()) + .unwrap_or("Unknown"); + let activity_id = activity.id.as_deref().unwrap_or(""); + + let event = GatewayEvent::new( + "teams", + ChannelInfo { + id: conversation_id.to_string(), + channel_type: conversation_type.to_string(), + thread_id: None, // Teams conversations don't have sub-threads in the same way + }, + SenderInfo { + id: sender_id.to_string(), + name: sender_name.to_string(), + display_name: sender_name.to_string(), + is_bot: false, + }, + text, + activity_id, + vec![], // Teams @mentions parsing deferred to future PR + ); + + // Store service_url for reply routing + state.teams_service_urls.lock().await.insert( + conversation_id.to_string(), + (service_url.to_string(), std::time::Instant::now()), + ); + + let json = serde_json::to_string(&event).unwrap(); + info!( + conversation = conversation_id, + sender = sender_name, + "teams → gateway" + ); + let _ = state.event_tx.send(json); + + StatusCode::OK +} + +// --- Reply handler --- + +pub async fn handle_reply( + reply: &GatewayReply, + teams: &TeamsAdapter, + service_urls: &tokio::sync::Mutex< + std::collections::HashMap, + >, +) { + let service_url = { + let urls = service_urls.lock().await; + match urls.get(&reply.channel.id) { + Some((url, _)) => url.clone(), + None => { + error!(conversation = %reply.channel.id, "teams: no service_url for conversation"); + return; + } + } + }; + + info!(conversation = %reply.channel.id, "gateway → teams"); + match teams + .send_activity(&service_url, &reply.channel.id, &reply.content.text) + .await + { + Ok(id) => debug!(activity_id = %id, "teams activity sent"), + Err(e) => error!(error = %e, "teams send error"), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // --- ensure_trailing_slash --- + + #[test] + fn trailing_slash_adds_when_missing() { + assert_eq!( + ensure_trailing_slash("https://example.com"), + "https://example.com/" + ); + } + + #[test] + fn trailing_slash_keeps_when_present() { + assert_eq!( + ensure_trailing_slash("https://example.com/"), + "https://example.com/" + ); + } + + #[test] + fn trailing_slash_empty_string() { + assert_eq!(ensure_trailing_slash(""), "/"); + } + + // --- check_tenant --- + + fn make_config(tenants: Vec<&str>) -> TeamsConfig { + TeamsConfig { + app_id: "test-app".into(), + app_secret: "test-secret".into(), + oauth_endpoint: "https://example.com/token".into(), + openid_metadata: "https://example.com/openid".into(), + allowed_tenants: tenants.into_iter().map(|s| s.to_string()).collect(), + } + } + + fn make_activity_with_tenant(tenant_id: Option<&str>) -> Activity { + Activity { + activity_type: "message".into(), + id: Some("act1".into()), + timestamp: None, + service_url: Some("https://smba.trafficmanager.net/".into()), + channel_id: Some("msteams".into()), + from: None, + conversation: None, + text: Some("hello".into()), + tenant: tenant_id.map(|id| TenantInfo { + id: Some(id.into()), + }), + } + } + + #[test] + fn tenant_allowed_when_list_empty() { + let adapter = TeamsAdapter::new(make_config(vec![])); + let activity = make_activity_with_tenant(Some("any-tenant")); + assert!(adapter.check_tenant(&activity)); + } + + #[test] + fn tenant_allowed_when_in_list() { + let adapter = TeamsAdapter::new(make_config(vec!["tenant-a", "tenant-b"])); + let activity = make_activity_with_tenant(Some("tenant-b")); + assert!(adapter.check_tenant(&activity)); + } + + #[test] + fn tenant_rejected_when_not_in_list() { + let adapter = TeamsAdapter::new(make_config(vec!["tenant-a"])); + let activity = make_activity_with_tenant(Some("tenant-x")); + assert!(!adapter.check_tenant(&activity)); + } + + #[test] + fn tenant_rejected_when_no_tenant_info() { + let adapter = TeamsAdapter::new(make_config(vec!["tenant-a"])); + let activity = make_activity_with_tenant(None); + assert!(!adapter.check_tenant(&activity)); + } + + #[test] + fn tenant_allowed_when_no_tenant_and_empty_list() { + let adapter = TeamsAdapter::new(make_config(vec![])); + let activity = make_activity_with_tenant(None); + assert!(adapter.check_tenant(&activity)); + } + + // --- validate_jwt error paths --- + + #[tokio::test] + async fn jwt_rejects_missing_bearer_prefix() { + let adapter = TeamsAdapter::new(make_config(vec![])); + let result = adapter.validate_jwt("NotBearer xyz").await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Bearer")); + } + + #[tokio::test] + async fn jwt_rejects_empty_bearer() { + let adapter = TeamsAdapter::new(make_config(vec![])); + let result = adapter.validate_jwt("Bearer ").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn jwt_rejects_garbage_token() { + let adapter = TeamsAdapter::new(make_config(vec![])); + let result = adapter.validate_jwt("Bearer not.a.valid.jwt").await; + assert!(result.is_err()); + } + + // --- Activity deserialization --- + + #[test] + fn deserialize_minimal_activity() { + let json = r#"{"type": "message"}"#; + let activity: Activity = serde_json::from_str(json).unwrap(); + assert_eq!(activity.activity_type, "message"); + assert!(activity.text.is_none()); + assert!(activity.from.is_none()); + } + + #[test] + fn deserialize_full_activity() { + let json = r#"{ + "type": "message", + "id": "act123", + "serviceUrl": "https://smba.trafficmanager.net/", + "channelId": "msteams", + "from": {"id": "user1", "name": "Alice", "aadObjectId": "aad-123"}, + "conversation": {"id": "conv1", "conversationType": "personal", "isGroup": false}, + "text": "hello bot", + "tenant": {"id": "tenant-abc"} + }"#; + let activity: Activity = serde_json::from_str(json).unwrap(); + assert_eq!(activity.activity_type, "message"); + assert_eq!(activity.text.as_deref(), Some("hello bot")); + assert_eq!( + activity.from.as_ref().unwrap().name.as_deref(), + Some("Alice") + ); + assert_eq!( + activity.tenant.as_ref().unwrap().id.as_deref(), + Some("tenant-abc") + ); + } + + #[test] + fn deserialize_non_message_activity() { + let json = r#"{"type": "conversationUpdate"}"#; + let activity: Activity = serde_json::from_str(json).unwrap(); + assert_eq!(activity.activity_type, "conversationUpdate"); + } + + #[test] + fn deserialize_invalid_json_fails() { + let result = serde_json::from_str::("not json"); + assert!(result.is_err()); + } + + // --- TeamsConfig::from_env --- + + #[test] + fn config_from_env_returns_none_without_vars() { + // Ensure the env vars are not set (they shouldn't be in test) + std::env::remove_var("TEAMS_APP_ID"); + std::env::remove_var("TEAMS_APP_SECRET"); + assert!(TeamsConfig::from_env().is_none()); + } +} diff --git a/gateway/src/adapters/telegram.rs b/gateway/src/adapters/telegram.rs new file mode 100644 index 00000000..c6074531 --- /dev/null +++ b/gateway/src/adapters/telegram.rs @@ -0,0 +1,255 @@ +use crate::schema::*; +use axum::extract::State; +use axum::Json; +use serde::Deserialize; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +// --- Telegram types --- + +#[derive(Debug, Deserialize)] +pub struct TelegramUpdate { + message: Option, +} + +#[derive(Debug, Deserialize)] +struct TelegramMessage { + message_id: i64, + message_thread_id: Option, + chat: TelegramChat, + from: Option, + text: Option, + #[serde(default)] + entities: Vec, +} + +#[derive(Debug, Deserialize)] +struct TelegramEntity { + #[serde(rename = "type")] + entity_type: String, + offset: usize, + length: usize, +} + +#[derive(Debug, Deserialize)] +struct TelegramChat { + id: i64, + #[serde(rename = "type")] + chat_type: String, +} + +#[derive(Debug, Deserialize)] +struct TelegramUser { + id: i64, + first_name: String, + last_name: Option, + username: Option, + is_bot: bool, +} + +// --- Webhook handler --- + +pub async fn webhook( + State(state): State>, + headers: axum::http::HeaderMap, + Json(update): Json, +) -> axum::http::StatusCode { + if let Some(ref expected) = state.telegram_secret_token { + let provided = headers + .get("x-telegram-bot-api-secret-token") + .and_then(|v| v.to_str().ok()); + if provided != Some(expected.as_str()) { + warn!("webhook rejected: invalid or missing secret_token"); + return axum::http::StatusCode::UNAUTHORIZED; + } + } + + let Some(msg) = update.message else { + return axum::http::StatusCode::OK; + }; + let Some(text) = msg.text.as_deref() else { + return axum::http::StatusCode::OK; + }; + if text.trim().is_empty() { + return axum::http::StatusCode::OK; + } + + let from = msg.from.as_ref(); + let sender_name = from + .and_then(|u| u.username.as_deref()) + .unwrap_or("unknown"); + let display_name = from + .map(|u| { + let mut n = u.first_name.clone(); + if let Some(last) = &u.last_name { + n.push(' '); + n.push_str(last); + } + n + }) + .unwrap_or_else(|| "Unknown".into()); + + let mentions: Vec = msg + .entities + .iter() + .filter(|e| e.entity_type == "mention") + .filter_map(|e| { + text.get(e.offset..e.offset + e.length) + .map(|s| s.trim_start_matches('@').to_string()) + }) + .collect(); + + let event = GatewayEvent::new( + "telegram", + ChannelInfo { + id: msg.chat.id.to_string(), + channel_type: msg.chat.chat_type.clone(), + thread_id: msg.message_thread_id.map(|id| id.to_string()), + }, + SenderInfo { + id: from.map(|u| u.id.to_string()).unwrap_or_default(), + name: sender_name.into(), + display_name, + is_bot: from.map(|u| u.is_bot).unwrap_or(false), + }, + text, + &msg.message_id.to_string(), + mentions, + ); + + let json = serde_json::to_string(&event).unwrap(); + info!(chat_id = %msg.chat.id, sender = %sender_name, "telegram → gateway"); + let _ = state.event_tx.send(json); + axum::http::StatusCode::OK +} + +// --- Reply handler --- + +pub async fn handle_reply( + reply: &GatewayReply, + bot_token: &str, + client: &reqwest::Client, + event_tx: &tokio::sync::broadcast::Sender, + reaction_state: &Arc>>>, +) { + // Handle create_topic command + if reply.command.as_deref() == Some("create_topic") { + let req_id = reply.request_id.clone().unwrap_or_default(); + info!(chat_id = %reply.channel.id, "creating forum topic"); + let url = format!("https://api.telegram.org/bot{bot_token}/createForumTopic"); + let resp = client + .post(&url) + .json(&serde_json::json!({"chat_id": reply.channel.id, "name": reply.content.text})) + .send() + .await; + let gw_resp = match resp { + Ok(r) => { + let body: serde_json::Value = r.json().await.unwrap_or_default(); + if body["ok"].as_bool() == Some(true) { + let tid = body["result"]["message_thread_id"] + .as_i64() + .map(|id| id.to_string()); + info!(thread_id = ?tid, "forum topic created"); + GatewayResponse { + schema: "openab.gateway.response.v1".into(), + request_id: req_id, + success: true, + thread_id: tid, + error: None, + } + } else { + let err = body["description"] + .as_str() + .unwrap_or("unknown error") + .to_string(); + warn!(err = %err, "createForumTopic failed"); + GatewayResponse { + schema: "openab.gateway.response.v1".into(), + request_id: req_id, + success: false, + thread_id: None, + error: Some(err), + } + } + } + Err(e) => GatewayResponse { + schema: "openab.gateway.response.v1".into(), + request_id: req_id, + success: false, + thread_id: None, + error: Some(e.to_string()), + }, + }; + let json = serde_json::to_string(&gw_resp).unwrap(); + let _ = event_tx.send(json); + return; + } + + // Handle add_reaction / remove_reaction + if reply.command.as_deref() == Some("add_reaction") + || reply.command.as_deref() == Some("remove_reaction") + { + let msg_key = format!("{}:{}", reply.channel.id, reply.reply_to); + let emoji = &reply.content.text; + let tg_emoji = match emoji.as_str() { + "🆗" => "👍", + other => other, + }; + let is_add = reply.command.as_deref() == Some("add_reaction"); + { + let mut reactions = reaction_state.lock().await; + let set = reactions.entry(msg_key.clone()).or_default(); + if is_add { + if !set.contains(&tg_emoji.to_string()) { + set.push(tg_emoji.to_string()); + } + } else { + set.retain(|e| e != tg_emoji); + } + } + let current: Vec = { + let reactions = reaction_state.lock().await; + reactions + .get(&msg_key) + .map(|v| { + v.iter() + .map(|e| serde_json::json!({"type": "emoji", "emoji": e})) + .collect() + }) + .unwrap_or_default() + }; + let url = format!("https://api.telegram.org/bot{bot_token}/setMessageReaction"); + let _ = client + .post(&url) + .json(&serde_json::json!({ + "chat_id": reply.channel.id, + "message_id": reply.reply_to, + "reaction": current, + })) + .send() + .await + .map_err(|e| error!("telegram reaction error: {e}")); + return; + } + + // Normal send_message + info!( + chat_id = %reply.channel.id, + thread_id = ?reply.channel.thread_id, + "gateway → telegram" + ); + let url = format!("https://api.telegram.org/bot{bot_token}/sendMessage"); + let _ = client + .post(&url) + .json(&serde_json::json!({ + "chat_id": reply.channel.id, + "text": reply.content.text, + "message_thread_id": reply.channel.thread_id, + "parse_mode": "Markdown", + })) + .send() + .await + .map_err(|e| error!("telegram send error: {e}")); +} diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 0c9b948b..40ccf950 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -1,375 +1,48 @@ +mod adapters; +mod schema; + use anyhow::Result; use axum::{ extract::State, response::IntoResponse, routing::{get, post}, - Json, Router, + Router, }; use futures_util::{SinkExt, StreamExt}; -use serde::{Deserialize, Serialize}; +use schema::GatewayReply; +use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{broadcast, Mutex}; -use tracing::{error, info, warn}; - -// --- Event schema (ADR openab.gateway.event.v1) --- - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct GatewayEvent { - pub schema: String, - pub event_id: String, - pub timestamp: String, - pub platform: String, - pub event_type: String, - pub channel: ChannelInfo, - pub sender: SenderInfo, - pub content: Content, - pub mentions: Vec, - pub message_id: String, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ChannelInfo { - pub id: String, - #[serde(rename = "type")] - pub channel_type: String, - pub thread_id: Option, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct SenderInfo { - pub id: String, - pub name: String, - pub display_name: String, - pub is_bot: bool, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Content { - #[serde(rename = "type")] - pub content_type: String, - pub text: String, -} - -// --- Reply schema (ADR openab.gateway.reply.v1) --- - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct GatewayReply { - pub schema: String, - pub reply_to: String, - pub platform: String, - pub channel: ReplyChannel, - pub content: Content, - #[serde(default)] - pub command: Option, - #[serde(default)] - pub request_id: Option, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ReplyChannel { - pub id: String, - pub thread_id: Option, -} - -/// Response from gateway back to OAB for commands (e.g. create_topic) -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct GatewayResponse { - pub schema: String, - pub request_id: String, - pub success: bool, - pub thread_id: Option, - pub error: Option, -} - -// --- Telegram types (minimal) --- - -#[derive(Debug, Deserialize)] -struct TelegramUpdate { - message: Option, -} - -#[derive(Debug, Deserialize)] -struct TelegramMessage { - message_id: i64, - message_thread_id: Option, - chat: TelegramChat, - from: Option, - text: Option, - #[serde(default)] - entities: Vec, -} - -#[derive(Debug, Deserialize)] -struct TelegramEntity { - #[serde(rename = "type")] - entity_type: String, - offset: usize, - length: usize, -} - -#[derive(Debug, Deserialize)] -struct TelegramChat { - id: i64, - #[serde(rename = "type")] - chat_type: String, - is_forum: Option, -} - -#[derive(Debug, Deserialize)] -struct TelegramUser { - id: i64, - first_name: String, - last_name: Option, - username: Option, - is_bot: bool, -} - -// --- App state --- - -struct AppState { - bot_token: String, - secret_token: Option, - ws_token: Option, - line_channel_secret: Option, - line_access_token: Option, - /// Broadcast channel: gateway → OAB (events) - event_tx: broadcast::Sender, -} - -// --- Telegram webhook handler --- - -async fn telegram_webhook( - State(state): State>, - headers: axum::http::HeaderMap, - Json(update): Json, -) -> axum::http::StatusCode { - // Validate secret_token if configured - if let Some(ref expected) = state.secret_token { - let provided = headers - .get("x-telegram-bot-api-secret-token") - .and_then(|v| v.to_str().ok()); - if provided != Some(expected.as_str()) { - warn!("webhook rejected: invalid or missing secret_token"); - return axum::http::StatusCode::UNAUTHORIZED; - } - } - let Some(msg) = update.message else { - return axum::http::StatusCode::OK; - }; - let Some(text) = msg.text.as_deref() else { - return axum::http::StatusCode::OK; - }; - // Skip empty messages - if text.trim().is_empty() { - return axum::http::StatusCode::OK; - } - - let from = msg.from.as_ref(); - let sender_name = from - .and_then(|u| u.username.as_deref()) - .unwrap_or("unknown"); - let display_name = from - .map(|u| { - let mut n = u.first_name.clone(); - if let Some(last) = &u.last_name { - n.push(' '); - n.push_str(last); - } - n - }) - .unwrap_or_else(|| "Unknown".into()); - - // Extract @mentions from entities - let mentions: Vec = msg - .entities - .iter() - .filter(|e| e.entity_type == "mention") - .filter_map(|e| { - text.get(e.offset..e.offset + e.length) - .map(|s| s.trim_start_matches('@').to_string()) - }) - .collect(); - - let event = GatewayEvent { - schema: "openab.gateway.event.v1".into(), - event_id: format!("evt_{}", uuid::Uuid::new_v4()), - timestamp: chrono::Utc::now().to_rfc3339(), - platform: "telegram".into(), - event_type: "message".into(), - channel: ChannelInfo { - id: msg.chat.id.to_string(), - channel_type: msg.chat.chat_type.clone(), - thread_id: msg.message_thread_id.map(|id| id.to_string()), - }, - sender: SenderInfo { - id: from.map(|u| u.id.to_string()).unwrap_or_default(), - name: sender_name.into(), - display_name, - is_bot: from.map(|u| u.is_bot).unwrap_or(false), - }, - content: Content { - content_type: "text".into(), - text: text.into(), - }, - mentions, - message_id: msg.message_id.to_string(), - }; - - let json = serde_json::to_string(&event).unwrap(); - info!(chat_id = %msg.chat.id, sender = %sender_name, "telegram → gateway"); - let _ = state.event_tx.send(json); - axum::http::StatusCode::OK -} - -// --- LINE types --- - -#[derive(Debug, Deserialize)] -struct LineWebhookBody { - events: Vec, -} - -#[derive(Debug, Deserialize)] -struct LineEvent { - #[serde(rename = "type")] - event_type: String, - source: Option, - message: Option, - #[serde(rename = "replyToken")] - reply_token: Option, -} - -#[derive(Debug, Deserialize)] -struct LineSource { - #[serde(rename = "type")] - source_type: String, - #[serde(rename = "userId")] - user_id: Option, - #[serde(rename = "groupId")] - group_id: Option, - #[serde(rename = "roomId")] - room_id: Option, -} - -#[derive(Debug, Deserialize)] -struct LineMessage { - id: String, - #[serde(rename = "type")] - message_type: String, - text: Option, -} - -// --- LINE webhook handler --- - -async fn line_webhook( - State(state): State>, - headers: axum::http::HeaderMap, - body: axum::body::Bytes, -) -> axum::http::StatusCode { - // Validate X-Line-Signature - if let Some(ref channel_secret) = state.line_channel_secret { - use base64::Engine; - use hmac::{Hmac, Mac}; - use sha2::Sha256; - - let signature = headers - .get("x-line-signature") - .and_then(|v| v.to_str().ok()); - let Some(signature) = signature else { - warn!("LINE webhook rejected: missing X-Line-Signature"); - return axum::http::StatusCode::UNAUTHORIZED; - }; - - let mut mac = Hmac::::new_from_slice(channel_secret.as_bytes()).expect("HMAC key"); - mac.update(&body); - let expected = - base64::engine::general_purpose::STANDARD.encode(mac.finalize().into_bytes()); - if signature != expected { - warn!("LINE webhook rejected: invalid signature"); - return axum::http::StatusCode::UNAUTHORIZED; - } - } - - let webhook_body: LineWebhookBody = match serde_json::from_slice(&body) { - Ok(b) => b, - Err(e) => { - warn!("LINE webhook parse error: {e}"); - return axum::http::StatusCode::BAD_REQUEST; - } - }; - - for event in webhook_body.events { - if event.event_type != "message" { - continue; - } - let Some(ref msg) = event.message else { - continue; - }; - if msg.message_type != "text" { - continue; - } - let Some(ref text) = msg.text else { - continue; - }; - if text.trim().is_empty() { - continue; - } - - let source = event.source.as_ref(); - let (channel_id, channel_type) = match source { - Some(s) if s.source_type == "group" => { - (s.group_id.clone().unwrap_or_default(), "group".to_string()) - } - Some(s) if s.source_type == "room" => { - (s.room_id.clone().unwrap_or_default(), "room".to_string()) - } - Some(s) => (s.user_id.clone().unwrap_or_default(), "user".to_string()), - None => continue, - }; - let user_id = source - .and_then(|s| s.user_id.as_deref()) - .unwrap_or("unknown"); - - let gateway_event = GatewayEvent { - schema: "openab.gateway.event.v1".into(), - event_id: format!("evt_{}", uuid::Uuid::new_v4()), - timestamp: chrono::Utc::now().to_rfc3339(), - platform: "line".into(), - event_type: "message".into(), - channel: ChannelInfo { - id: channel_id.clone(), - channel_type, - thread_id: None, - }, - sender: SenderInfo { - id: user_id.into(), - name: user_id.into(), - display_name: user_id.into(), - is_bot: false, - }, - content: Content { - content_type: "text".into(), - text: text.clone(), - }, - mentions: vec![], - message_id: msg.id.clone(), - }; - - let json = serde_json::to_string(&gateway_event).unwrap(); - info!(channel = %channel_id, sender = %user_id, "line → gateway"); - let _ = state.event_tx.send(json); - } - - axum::http::StatusCode::OK +use tracing::{info, warn}; + +// --- App state (shared across all adapters) --- + +pub struct AppState { + /// Telegram bot token (None if Telegram disabled) + pub telegram_bot_token: Option, + /// Telegram webhook secret token for request validation + pub telegram_secret_token: Option, + /// LINE channel secret for signature validation + pub line_channel_secret: Option, + /// LINE channel access token for reply API + pub line_access_token: Option, + /// WebSocket authentication token + pub ws_token: Option, + /// Teams adapter (None if Teams disabled) + pub teams: Option, + /// service_url cache for Teams reply routing (conversation_id → (service_url, last_seen)) + pub teams_service_urls: Mutex>, + /// Broadcast channel: gateway → OAB (events from all platforms) + pub event_tx: broadcast::Sender, } // --- WebSocket handler (OAB connects here) --- async fn ws_handler( State(state): State>, - query: axum::extract::Query>, + query: axum::extract::Query>, ws: axum::extract::WebSocketUpgrade, ) -> axum::response::Response { - // Validate WS token if configured if let Some(ref expected) = state.ws_token { let provided = query.get("token").map(|s| s.as_str()); if provided != Some(expected.as_str()) { @@ -386,9 +59,6 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: let (mut ws_tx, mut ws_rx) = socket.split(); let mut event_rx = state.event_tx.subscribe(); - // Channel for replies from this OAB client - let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel::(64); - info!("OAB client connected via WebSocket"); // Forward gateway events → OAB @@ -400,173 +70,64 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: break; } } - // No reply forwarding needed on this path — replies go to Telegram directly } } }); - // Receive OAB replies → Telegram - let bot_token = state.bot_token.clone(); - let line_access_token = state.line_access_token.clone(); - let event_tx_for_recv = state.event_tx.clone(); + // Receive OAB replies → route to correct platform + let state_for_recv = state.clone(); // Track per-message reaction state (Telegram replaces all reactions atomically) - let reaction_state: Arc>>> = - Arc::new(Mutex::new(std::collections::HashMap::new())); + let reaction_state: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); let recv_task = tokio::spawn(async move { let client = reqwest::Client::new(); while let Some(Ok(msg)) = ws_rx.next().await { if let Message::Text(text) = msg { - match serde_json::from_str::(&text) { - Ok(reply) => { - // Handle create_topic command - if reply.command.as_deref() == Some("create_topic") { - let req_id = reply.request_id.clone().unwrap_or_default(); - info!(chat_id = %reply.channel.id, "creating forum topic"); - let url = format!( - "https://api.telegram.org/bot{}/createForumTopic", - bot_token - ); - let resp = client - .post(&url) - .json(&serde_json::json!({ - "chat_id": reply.channel.id, - "name": reply.content.text, - })) - .send() + // Check if it's a response to a pending command + if let Ok(resp) = serde_json::from_str::(&*text) { + if resp.schema == "openab.gateway.response.v1" { + let _ = state_for_recv.event_tx.send(text.to_string()); + continue; + } + } + + match serde_json::from_str::(&*text) { + Ok(reply) => match reply.platform.as_str() { + "telegram" => { + if let Some(ref token) = state_for_recv.telegram_bot_token { + adapters::telegram::handle_reply( + &reply, + token, + &client, + &state_for_recv.event_tx, + &reaction_state, + ) .await; - let gw_resp = match resp { - Ok(r) => { - let body: serde_json::Value = - r.json().await.unwrap_or_default(); - if body["ok"].as_bool() == Some(true) { - let tid = body["result"]["message_thread_id"] - .as_i64() - .map(|id| id.to_string()); - info!(thread_id = ?tid, "forum topic created"); - GatewayResponse { - schema: "openab.gateway.response.v1".into(), - request_id: req_id, - success: true, - thread_id: tid, - error: None, - } - } else { - let err = body["description"] - .as_str() - .unwrap_or("unknown error") - .to_string(); - warn!(err = %err, "createForumTopic failed"); - GatewayResponse { - schema: "openab.gateway.response.v1".into(), - request_id: req_id, - success: false, - thread_id: None, - error: Some(err), - } - } - } - Err(e) => GatewayResponse { - schema: "openab.gateway.response.v1".into(), - request_id: req_id, - success: false, - thread_id: None, - error: Some(e.to_string()), - }, - }; - // Send response back — need to use event_tx broadcast - let json = serde_json::to_string(&gw_resp).unwrap(); - let _ = event_tx_for_recv.send(json); - continue; + } else { + warn!("reply for telegram but adapter not configured"); + } } - - // Handle add_reaction / remove_reaction - // Telegram setMessageReaction replaces ALL reactions, so we track state - if reply.command.as_deref() == Some("add_reaction") - || reply.command.as_deref() == Some("remove_reaction") - { - let msg_key = format!("{}:{}", reply.channel.id, reply.reply_to); - let emoji = &reply.content.text; - // Map unsupported emojis to Telegram-compatible ones - let tg_emoji = match emoji.as_str() { - "🆗" => "👍", - other => other, - }; - let is_add = reply.command.as_deref() == Some("add_reaction"); - { - let mut reactions = reaction_state.lock().await; - let set = reactions.entry(msg_key.clone()).or_insert_with(Vec::new); - if is_add { - if !set.contains(&tg_emoji.to_string()) { - set.push(tg_emoji.to_string()); - } - } else { - set.retain(|e| e != tg_emoji); - } + "line" => { + if let Some(ref token) = state_for_recv.line_access_token { + adapters::line::handle_reply(&reply, token, &client).await; + } else { + warn!("reply for line but adapter not configured"); } - let current: Vec = { - let reactions = reaction_state.lock().await; - reactions - .get(&msg_key) - .map(|v| { - v.iter() - .map(|e| { - serde_json::json!({"type": "emoji", "emoji": e}) - }) - .collect() - }) - .unwrap_or_default() - }; - let url = format!( - "https://api.telegram.org/bot{}/setMessageReaction", - bot_token - ); - let _ = client - .post(&url) - .json(&serde_json::json!({ - "chat_id": reply.channel.id, - "message_id": reply.reply_to, - "reaction": current, - })) - .send() - .await - .map_err(|e| error!("telegram reaction error: {e}")); - continue; } - - // Normal send_message — route by platform - if reply.platform == "line" { - // LINE Push Message API - if let Some(ref token) = line_access_token { - info!(to = %reply.channel.id, "gateway → line"); - let _ = client - .post("https://api.line.me/v2/bot/message/push") - .bearer_auth(token) - .json(&serde_json::json!({ - "to": reply.channel.id, - "messages": [{"type": "text", "text": reply.content.text}] - })) - .send() - .await - .map_err(|e| error!("line send error: {e}")); + "teams" => { + if let Some(ref teams) = state_for_recv.teams { + adapters::teams::handle_reply( + &reply, + teams, + &state_for_recv.teams_service_urls, + ) + .await; + } else { + warn!("reply for teams but adapter not configured"); } - } else { - // Telegram sendMessage - info!(chat_id = %reply.channel.id, thread_id = ?reply.channel.thread_id, "gateway → telegram"); - let url = - format!("https://api.telegram.org/bot{}/sendMessage", bot_token); - let _ = client - .post(&url) - .json(&serde_json::json!({ - "chat_id": reply.channel.id, - "text": reply.content.text, - "message_thread_id": reply.channel.thread_id, - "parse_mode": "Markdown", - })) - .send() - .await - .map_err(|e| error!("telegram send error: {e}")); } - } + other => warn!(platform = other, "unknown reply platform"), + }, Err(e) => warn!("invalid reply from OAB: {e}"), } } @@ -580,8 +141,6 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: info!("OAB client disconnected"); } -// --- Health check --- - async fn health() -> &'static str { "ok" } @@ -594,41 +153,91 @@ async fn main() -> Result<()> { ) .init(); - let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").expect("TELEGRAM_BOT_TOKEN must be set"); - let secret_token = std::env::var("TELEGRAM_SECRET_TOKEN").ok(); - let ws_token = std::env::var("GATEWAY_WS_TOKEN").ok(); - let line_channel_secret = std::env::var("LINE_CHANNEL_SECRET").ok(); - let line_access_token = std::env::var("LINE_CHANNEL_ACCESS_TOKEN").ok(); let listen_addr = std::env::var("GATEWAY_LISTEN").unwrap_or_else(|_| "0.0.0.0:8080".into()); - let webhook_path = - std::env::var("TELEGRAM_WEBHOOK_PATH").unwrap_or_else(|_| "/webhook/telegram".into()); + let ws_token = std::env::var("GATEWAY_WS_TOKEN").ok(); - if secret_token.is_none() { - warn!("TELEGRAM_SECRET_TOKEN not set — webhook requests are NOT validated (insecure)"); - } if ws_token.is_none() { warn!("GATEWAY_WS_TOKEN not set — WebSocket connections are NOT authenticated (insecure)"); } let (event_tx, _) = broadcast::channel::(256); + // --- Initialize adapters --- + + let mut app = Router::new() + .route("/ws", get(ws_handler)) + .route("/health", get(health)); + + // Telegram adapter + let telegram_bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok(); + let telegram_secret_token = std::env::var("TELEGRAM_SECRET_TOKEN").ok(); + if telegram_bot_token.is_some() { + let webhook_path = + std::env::var("TELEGRAM_WEBHOOK_PATH").unwrap_or_else(|_| "/webhook/telegram".into()); + if telegram_secret_token.is_none() { + warn!("TELEGRAM_SECRET_TOKEN not set — webhook requests are NOT validated (insecure)"); + } + info!(path = %webhook_path, "telegram adapter enabled"); + app = app.route(&webhook_path, post(adapters::telegram::webhook)); + } + + // LINE adapter + let line_channel_secret = std::env::var("LINE_CHANNEL_SECRET").ok(); + let line_access_token = std::env::var("LINE_CHANNEL_ACCESS_TOKEN").ok(); + if line_access_token.is_some() { + info!("line adapter enabled"); + app = app.route("/webhook/line", post(adapters::line::webhook)); + } + + // Teams adapter + let teams = adapters::teams::TeamsConfig::from_env().map(|config| { + info!("teams adapter enabled"); + adapters::teams::TeamsAdapter::new(config) + }); + if teams.is_some() { + let webhook_path = + std::env::var("TEAMS_WEBHOOK_PATH").unwrap_or_else(|_| "/webhook/teams".into()); + info!(path = %webhook_path, "teams webhook registered"); + app = app.route(&webhook_path, post(adapters::teams::webhook)); + } + + if telegram_bot_token.is_none() && line_access_token.is_none() && teams.is_none() { + warn!("no adapters configured — set TELEGRAM_BOT_TOKEN, LINE_CHANNEL_ACCESS_TOKEN, and/or TEAMS_APP_ID + TEAMS_APP_SECRET"); + } + let state = Arc::new(AppState { - bot_token, - secret_token, - ws_token, + telegram_bot_token, + telegram_secret_token, line_channel_secret, line_access_token, + ws_token, + teams, + teams_service_urls: Mutex::new(HashMap::new()), event_tx, }); - let app = Router::new() - .route(&webhook_path, post(telegram_webhook)) - .route("/webhook/line", post(line_webhook)) - .route("/ws", get(ws_handler)) - .route("/health", get(health)) - .with_state(state); + let app = app.with_state(state.clone()); + + // Periodic cleanup of stale Teams service_url entries (TTL: 4 hours) + tokio::spawn(async move { + let ttl = std::time::Duration::from_secs(4 * 3600); + loop { + tokio::time::sleep(std::time::Duration::from_secs(300)).await; + let mut urls = state.teams_service_urls.lock().await; + let before = urls.len(); + urls.retain(|_, (_, ts)| ts.elapsed() < ttl); + let evicted = before - urls.len(); + if evicted > 0 { + info!( + evicted, + remaining = urls.len(), + "teams service_url cache cleanup" + ); + } + } + }); - info!(addr = %listen_addr, webhook = %webhook_path, "gateway starting"); + info!(addr = %listen_addr, "gateway starting"); let listener = tokio::net::TcpListener::bind(&listen_addr).await?; axum::serve(listener, app).await?; Ok(()) diff --git a/gateway/src/schema.rs b/gateway/src/schema.rs new file mode 100644 index 00000000..339ea4c6 --- /dev/null +++ b/gateway/src/schema.rs @@ -0,0 +1,98 @@ +use serde::{Deserialize, Serialize}; + +// --- Event schema (ADR openab.gateway.event.v1) --- + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct GatewayEvent { + pub schema: String, + pub event_id: String, + pub timestamp: String, + pub platform: String, + pub event_type: String, + pub channel: ChannelInfo, + pub sender: SenderInfo, + pub content: Content, + pub mentions: Vec, + pub message_id: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ChannelInfo { + pub id: String, + #[serde(rename = "type")] + pub channel_type: String, + pub thread_id: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SenderInfo { + pub id: String, + pub name: String, + pub display_name: String, + pub is_bot: bool, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Content { + #[serde(rename = "type")] + pub content_type: String, + pub text: String, +} + +// --- Reply schema (ADR openab.gateway.reply.v1) --- + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct GatewayReply { + pub schema: String, + pub reply_to: String, + pub platform: String, + pub channel: ReplyChannel, + pub content: Content, + #[serde(default)] + pub command: Option, + #[serde(default)] + pub request_id: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ReplyChannel { + pub id: String, + pub thread_id: Option, +} + +/// Response from gateway back to OAB for commands (e.g. create_topic) +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct GatewayResponse { + pub schema: String, + pub request_id: String, + pub success: bool, + pub thread_id: Option, + pub error: Option, +} + +impl GatewayEvent { + pub fn new( + platform: &str, + channel: ChannelInfo, + sender: SenderInfo, + text: &str, + message_id: &str, + mentions: Vec, + ) -> Self { + Self { + schema: "openab.gateway.event.v1".into(), + event_id: format!("evt_{}", uuid::Uuid::new_v4()), + timestamp: chrono::Utc::now().to_rfc3339(), + platform: platform.into(), + event_type: "message".into(), + channel, + sender, + content: Content { + content_type: "text".into(), + text: text.into(), + }, + mentions, + message_id: message_id.into(), + } + } +} From 4fa8f77439e496975ae76d5496f81236dc65f89e Mon Sep 17 00:00:00 2001 From: masami-agent Date: Sat, 25 Apr 2026 01:38:13 +0000 Subject: [PATCH 2/3] fix(gateway/teams): improve outbound reply path - Add textFormat: markdown to send_activity for proper Teams rendering - Add replyToId to send_activity so replies thread correctly in Teams - Handle add_reaction/remove_reaction commands gracefully (no-op, Teams has limited reaction API support) - Refresh service_url TTL on outbound reply to prevent expiry during active conversations --- gateway/src/adapters/teams.rs | 37 ++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/gateway/src/adapters/teams.rs b/gateway/src/adapters/teams.rs index 04f82e78..01171c68 100644 --- a/gateway/src/adapters/teams.rs +++ b/gateway/src/adapters/teams.rs @@ -275,6 +275,7 @@ impl TeamsAdapter { service_url: &str, conversation_id: &str, text: &str, + reply_to_id: Option<&str>, ) -> anyhow::Result { let token = self.get_token().await?; let url = format!( @@ -283,10 +284,14 @@ impl TeamsAdapter { conversation_id ); - let body = serde_json::json!({ + let mut body = serde_json::json!({ "type": "message", "text": text, + "textFormat": "markdown", }); + if let Some(id) = reply_to_id { + body["replyToId"] = serde_json::Value::String(id.to_string()); + } let resp = self .client @@ -473,10 +478,21 @@ pub async fn handle_reply( std::collections::HashMap, >, ) { + // Reactions are not supported on Teams — silently ignore + if reply.command.as_deref() == Some("add_reaction") + || reply.command.as_deref() == Some("remove_reaction") + { + return; + } + let service_url = { - let urls = service_urls.lock().await; - match urls.get(&reply.channel.id) { - Some((url, _)) => url.clone(), + let mut urls = service_urls.lock().await; + match urls.get_mut(&reply.channel.id) { + Some((url, ts)) => { + // Refresh timestamp on reply to prevent TTL expiry during active conversations + *ts = std::time::Instant::now(); + url.clone() + } None => { error!(conversation = %reply.channel.id, "teams: no service_url for conversation"); return; @@ -484,9 +500,20 @@ pub async fn handle_reply( } }; + let reply_to_id = if reply.reply_to.is_empty() { + None + } else { + Some(reply.reply_to.as_str()) + }; + info!(conversation = %reply.channel.id, "gateway → teams"); match teams - .send_activity(&service_url, &reply.channel.id, &reply.content.text) + .send_activity( + &service_url, + &reply.channel.id, + &reply.content.text, + reply_to_id, + ) .await { Ok(id) => debug!(activity_id = %id, "teams activity sent"), From 72b9c7c0068195cedcc26dd6250d38b40746cca1 Mon Sep 17 00:00:00 2001 From: masami-agent Date: Sat, 25 Apr 2026 01:42:24 +0000 Subject: [PATCH 3/3] fix(gateway/teams): add required Activity.From field to outbound replies Bot Framework returns 400 Bad Request without Activity.From. Add from: { id: app_id } to both send_activity() and update_activity(). --- gateway/src/adapters/teams.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gateway/src/adapters/teams.rs b/gateway/src/adapters/teams.rs index 01171c68..a8f03231 100644 --- a/gateway/src/adapters/teams.rs +++ b/gateway/src/adapters/teams.rs @@ -286,6 +286,7 @@ impl TeamsAdapter { let mut body = serde_json::json!({ "type": "message", + "from": { "id": &self.config.app_id }, "text": text, "textFormat": "markdown", }); @@ -329,6 +330,7 @@ impl TeamsAdapter { let body = serde_json::json!({ "type": "message", + "from": { "id": &self.config.app_id }, "text": text, });