diff --git a/docs/06-cli.md b/docs/06-cli.md index 4f0009b..4d64966 100644 --- a/docs/06-cli.md +++ b/docs/06-cli.md @@ -44,6 +44,28 @@ wasila gateway add customer webhook Enables the first generic HTTP webhook gateway for customer conversations. +Now supported customer gateway types are: + +```bash +wasila gateway add customer webhook +wasila gateway add customer telegram +wasila gateway add customer whatsapp +``` + +When customer gateway type is `telegram` or `whatsapp`, daemon also accepts dedicated endpoints: + +```bash +POST /webhook/telegram +POST /webhook/whatsapp +``` + +and fallback endpoint: + +```bash +POST /webhook/customer +POST /customer +``` + ### Add Owner Gateway ```bash @@ -60,6 +82,7 @@ wasila gateway add owner hermes ``` OpenClaw and Hermes are configured as dedicated gateway types and currently resolve through the same webhook transport for bootstrap compatibility. +Customer gateway expansion (`telegram`, `whatsapp`) is now supported in Stage 2 and currently shares the same webhook transport for bootstrap compatibility. ### Start Daemon diff --git a/src/wasila/cli/main.py b/src/wasila/cli/main.py index 2080d96..388cf26 100644 --- a/src/wasila/cli/main.py +++ b/src/wasila/cli/main.py @@ -72,7 +72,11 @@ def build_parser() -> argparse.ArgumentParser: gateway_sub = gateway_parser.add_subparsers(dest="gateway_command", required=True) gateway_add = gateway_sub.add_parser("add", help="Set gateway type and metadata.") gateway_add.add_argument("role", choices=["customer", "owner"]) - gateway_add.add_argument("type", choices=["webhook", "openclaw", "hermes"]) + gateway_add.add_argument( + "type", + choices=["webhook", "telegram", "whatsapp", "openclaw", "hermes"], + help="Gateway type to use for the selected role.", + ) gateway_add.add_argument("--metadata", action="append", default=[], help="KEY=VALUE metadata") gateway_add.set_defaults(func=handle_gateway_add) @@ -163,8 +167,10 @@ def handle_provider_set(args: argparse.Namespace) -> None: def handle_gateway_add(args: argparse.Namespace) -> None: config = _load_or_default(args.config) metadata = _parse_metadata_args(args.metadata) - if args.role == "customer" and args.type not in {"webhook"}: - raise SystemExit("customer gateway currently only supports webhook in Stage 1") + if args.role == "customer" and args.type not in {"webhook", "telegram", "whatsapp"}: + raise SystemExit("customer gateway only supports webhook, telegram, or whatsapp") + if args.role == "owner" and args.type not in {"webhook", "openclaw", "hermes"}: + raise SystemExit("owner gateway only supports webhook, openclaw, or hermes") if args.role == "customer": config.customer_gateway = GatewayConfig( type=args.type, @@ -190,8 +196,8 @@ def handle_daemon_start(args: argparse.Namespace) -> None: config.customer_gateway.metadata, ) - def process(event_payload: dict[str, Any]) -> dict[str, Any]: - event = gateway.normalize(event_payload) + def process(event_payload: dict[str, Any] | CustomerEvent) -> dict[str, Any]: + event = event_payload if isinstance(event_payload, CustomerEvent) else gateway.normalize(event_payload) result = workflow.run(event) return { "customer_response": result.customer_response, @@ -200,7 +206,17 @@ def process(event_payload: dict[str, Any]) -> dict[str, Any]: "gateway": event.gateway, } - daemon = WebhookDaemon(handler=process, gateway=gateway, host=args.host, port=args.port) + route_gateways = {} + if gateway.name in {"telegram", "whatsapp"}: + route_gateways[f"/webhook/{gateway.name}"] = gateway + + daemon = WebhookDaemon( + handler=process, + gateway=gateway, + route_gateways=route_gateways, + host=args.host, + port=args.port, + ) daemon.start() diff --git a/src/wasila/gateways/__init__.py b/src/wasila/gateways/__init__.py index aff3391..d1c28db 100644 --- a/src/wasila/gateways/__init__.py +++ b/src/wasila/gateways/__init__.py @@ -2,13 +2,179 @@ from __future__ import annotations -from typing import Any - +from wasila.core.contracts import CustomerEvent from wasila.core.ports import CustomerGateway, OwnerGateway from wasila.gateways.webhook import WebhookCustomerGateway, WebhookOwnerGateway +class TelegramCustomerGateway(WebhookCustomerGateway): + """Customer gateway adapter for Telegram payloads. + + Stage 2 keeps Telegram transport compatible with webhook payload ingestion. + """ + + def __init__(self, metadata: dict[str, str] | None = None) -> None: + super().__init__(metadata=metadata) + self.name = "telegram" + + def normalize(self, payload: dict) -> CustomerEvent: + message = payload.get("message") + raw_text = message if isinstance(message, str) else ( + message.get("text") + if isinstance(message, dict) + else "" + ) + if raw_text == "" and isinstance(message, dict): + raw_text = message.get("body") or message.get("caption") or "" + if raw_text == "": + raw_text = payload.get("text") or payload.get("body") or "" + + if not isinstance(raw_text, str): + raw_text = "" + + sender = payload.get("from") + if not isinstance(sender, dict): + sender = payload.get("sender") + if not isinstance(sender, dict) and isinstance(message, dict): + sender = message.get("from") + + event_id = payload.get("event_id") + if event_id is not None and not isinstance(event_id, str): + event_id = str(event_id) + sender_id = "" + sender_name = "" + if isinstance(sender, dict): + sender_id = str( + sender.get("id") + or sender.get("username") + or sender.get("first_name") + or "" + ) + sender_name = str(sender.get("first_name") or sender.get("username") or "") + + return CustomerEvent( + gateway=payload.get("gateway", self.metadata.get("id", "telegram")), + gateway_role=payload.get("gateway_role", "customer"), + external_conversation_id=str( + payload.get("external_conversation_id") + or (payload.get("chat") or {}).get("id") + or payload.get("conversation_id") + or "" + ), + external_customer_id=str(payload.get("external_customer_id") or payload.get("customer_id") or sender_id or ""), + message_text=raw_text, + message_timestamp=payload.get("message_timestamp") or "", + id=event_id, + customer_id=payload.get("customer_id"), + metadata_json={ + "name": sender_name or payload.get("name") or payload.get("display_name"), + "source": "telegram", + "raw": payload, + }, + ) + + +class WhatsAppCustomerGateway(WebhookCustomerGateway): + """Customer gateway adapter for WhatsApp payloads. + + Stage 2 keeps WhatsApp transport compatible with webhook payload ingestion. + """ + + def __init__(self, metadata: dict[str, str] | None = None) -> None: + super().__init__(metadata=metadata) + self.name = "whatsapp" + + def normalize(self, payload: dict) -> CustomerEvent: + raw_text = payload.get("message") or payload.get("body") or "" + raw_msg: dict | None = None + if isinstance(payload.get("entry"), list) and payload["entry"]: + first_entry = payload["entry"][0] + if isinstance(first_entry, dict): + changes = first_entry.get("changes") + if isinstance(changes, list) and changes: + raw_msg = changes[0] + if isinstance(raw_msg, dict): + value = raw_msg.get("value") + if isinstance(value, dict): + messages = value.get("messages") + if isinstance(messages, list) and messages: + raw_msg = messages[0] + if isinstance(raw_msg, dict): + text_candidate = ( + raw_msg.get("text") + if isinstance(raw_msg.get("text"), str) + else ( + raw_msg.get("text", {}).get("body") + if isinstance(raw_msg.get("text"), dict) + else None + ) + ) + if isinstance(text_candidate, str): + raw_text = text_candidate + else: + raw_text = raw_msg.get("message") or raw_msg.get("body") or raw_text + + payload["from"] = raw_msg.get("from", first_entry.get("from")) + + if isinstance(raw_text, dict): + raw_text = "" + if not isinstance(raw_text, str): + raw_text = str(raw_text) + + event_id = payload.get("event_id") + if event_id is not None and not isinstance(event_id, str): + event_id = str(event_id) + + sender = payload.get("from") + if isinstance(raw_msg, dict) and isinstance(raw_msg.get("from"), dict): + sender = raw_msg.get("from") + + sender_id = "" + sender_name = "" + if isinstance(sender, dict): + sender_id = str( + sender.get("id") + or sender.get("wa_id") + or sender.get("phone") + or sender.get("username") + or "" + ) + sender_name = str(sender.get("name") or sender.get("profile_name") or sender_id) + else: + sender_id = str(sender or "") + + if event_id is None and isinstance(raw_msg, dict): + msg_event_id = raw_msg.get("id") + if msg_event_id is not None and not isinstance(msg_event_id, str): + msg_event_id = str(msg_event_id) + if msg_event_id is not None: + event_id = msg_event_id + + return CustomerEvent( + gateway=payload.get("gateway", self.metadata.get("id", "whatsapp")), + gateway_role=payload.get("gateway_role", "customer"), + external_conversation_id=str( + payload.get("external_conversation_id") + or (raw_msg or {}).get("conversation") + or payload.get("conversation_id") + or payload.get("wa_id") + or sender_id + or "" + ), + external_customer_id=str(payload.get("external_customer_id") or payload.get("customer_id") or sender_id or ""), + message_text=raw_text, + message_timestamp=payload.get("message_timestamp") or "", + id=event_id, + customer_id=payload.get("customer_id"), + metadata_json={ + "name": sender_name or payload.get("name") or payload.get("display_name"), + "source": "whatsapp", + "raw": payload, + }, + ) + + class OpenClawOwnerGateway(WebhookOwnerGateway): """Owner gateway adapter for OpenClaw. @@ -35,6 +201,10 @@ def __init__(self, metadata: dict[str, str] | None = None) -> None: def build_customer_gateway(gateway_type: str, metadata: dict[str, str] | None = None) -> CustomerGateway: if gateway_type == "webhook": return WebhookCustomerGateway(metadata=metadata) + if gateway_type == "telegram": + return TelegramCustomerGateway(metadata=metadata) + if gateway_type == "whatsapp": + return WhatsAppCustomerGateway(metadata=metadata) raise ValueError(f"unsupported customer gateway type: {gateway_type}") @@ -52,6 +222,8 @@ def build_owner_gateway(gateway_type: str, metadata: dict[str, str] | None = Non __all__ = [ "WebhookCustomerGateway", "WebhookOwnerGateway", + "TelegramCustomerGateway", + "WhatsAppCustomerGateway", "OpenClawOwnerGateway", "HermesOwnerGateway", "build_customer_gateway", diff --git a/src/wasila/gateways/webhook.py b/src/wasila/gateways/webhook.py index 4127823..01cd8a2 100644 --- a/src/wasila/gateways/webhook.py +++ b/src/wasila/gateways/webhook.py @@ -76,15 +76,18 @@ def deliver(self, notification: OwnerNotification) -> None: class WebhookDaemon: def __init__( self, - handler: Callable[[dict[str, Any]], dict[str, Any]], + handler: Callable[[Any], dict[str, Any]], gateway: CustomerGateway | None = None, + route_gateways: dict[str, CustomerGateway] | None = None, host: str = "127.0.0.1", port: int = 8000, ): self.handler = handler self.gateway = gateway or WebhookCustomerGateway() + self.route_gateways = route_gateways or {} self.host = host self.port = port + self._httpd: ThreadingHTTPServer | None = None def start(self) -> None: gateway = self.gateway @@ -105,27 +108,56 @@ def do_GET(inner_self): # noqa: N802 inner_self.end_headers() def do_POST(inner_self): # noqa: N802 - if inner_self.path not in {"/webhook/customer", "/customer"}: + path = inner_self.path.split("?", 1)[0] + request_gateway = route_gateways_for_path(path) + if request_gateway is None: inner_self.send_response(404) inner_self.end_headers() return length = int(inner_self.headers.get("Content-Length", "0") or "0") - raw = inner_self.rfile.read(length) + raw = inner_self.rfile.read(length) if length > 0 else b"" + + if not raw: + inner_self.send_response(400) + inner_self.send_header("Content-Type", "application/json") + response = { + "ok": False, + "error": "invalid json: empty body", + } + raw_response = json.dumps(response).encode("utf-8") + inner_self.send_header("Content-Length", str(len(raw_response))) + inner_self.end_headers() + inner_self.wfile.write(raw_response) + return + try: payload = json.loads(raw.decode("utf-8")) except json.JSONDecodeError as exc: inner_self.send_response(400) - inner_self.end_headers() + inner_self.send_header("Content-Type", "application/json") response = {"ok": False, "error": f"invalid json: {exc}"} raw_response = json.dumps(response).encode("utf-8") + inner_self.send_header("Content-Length", str(len(raw_response))) + inner_self.end_headers() + inner_self.wfile.write(raw_response) + return + except UnicodeDecodeError as exc: + inner_self.send_response(400) + inner_self.send_header("Content-Type", "application/json") + response = {"ok": False, "error": f"invalid body encoding: {exc}"} + raw_response = json.dumps(response).encode("utf-8") + inner_self.send_header("Content-Length", str(len(raw_response))) + inner_self.end_headers() inner_self.wfile.write(raw_response) return try: - event = gateway.normalize(payload) - outcome = self.handler(event.__dict__) + event = request_gateway.normalize(payload) + outcome = self.handler(event) except Exception as exc: inner_self.send_response(500) + inner_self.send_header("Content-Type", "application/json") + inner_self.send_header("Content-Length", str(len(json.dumps({"ok": False, "error": str(exc)}).encode("utf-8")))) inner_self.end_headers() response = {"ok": False, "error": str(exc)} raw_response = json.dumps(response).encode("utf-8") @@ -147,7 +179,24 @@ def do_POST(inner_self): # noqa: N802 print(f"starting webhook daemon on {self.host}:{self.port}") httpd = ThreadingHTTPServer((self.host, self.port), _RequestHandler) + self._httpd = httpd + + def route_gateways_for_path(path: str) -> CustomerGateway | None: + if path in {"/webhook/customer", "/customer"}: + return self.gateway + + if path.startswith("/webhook/"): + return self.route_gateways.get(path) + + return None + try: httpd.serve_forever() finally: httpd.server_close() + + def stop(self) -> None: + if self._httpd is not None: + self._httpd.shutdown() + self._httpd.server_close() + self._httpd = None diff --git a/tests/test_gateway_normalization.py b/tests/test_gateway_normalization.py new file mode 100644 index 0000000..c1fdad5 --- /dev/null +++ b/tests/test_gateway_normalization.py @@ -0,0 +1,97 @@ +import unittest + +from wasila.gateways import TelegramCustomerGateway, WhatsAppCustomerGateway + + +class GatewayNormalizationTests(unittest.TestCase): + def test_telegram_normalization_parses_nested_payload(self): + gateway = TelegramCustomerGateway(metadata={"id": "telegram-webhook"}) + payload = { + "message": {"text": "How to reset password?"}, + "chat": {"id": "conv_123"}, + "from": {"id": "cust_007", "first_name": "Ari"}, + } + + event = gateway.normalize(payload) + self.assertEqual(event.gateway, "telegram-webhook") + self.assertEqual(event.message_text, "How to reset password?") + self.assertEqual(event.external_customer_id, "cust_007") + self.assertEqual(event.external_conversation_id, "conv_123") + self.assertEqual(event.metadata_json["name"], "Ari") + + def test_telegram_normalization_handles_nested_sender_and_external_conversation(self): + gateway = TelegramCustomerGateway(metadata={"id": "telegram-webhook"}) + payload = { + "external_conversation_id": "canonical_conv", + "message": {"text": "Status update", "from": {"id": "cust_nested", "first_name": "Bima"}}, + "chat": None, + } + + event = gateway.normalize(payload) + self.assertEqual(event.external_customer_id, "cust_nested") + self.assertEqual(event.external_conversation_id, "canonical_conv") + self.assertEqual(event.message_text, "Status update") + + def test_whatsapp_normalization_parses_nested_messages(self): + gateway = WhatsAppCustomerGateway(metadata={"id": "wa-webhook"}) + payload = { + "entry": [ + { + "id": "123", + "changes": [ + { + "value": { + "messages": [ + { + "from": "6281234567", + "text": {"body": "Need invoice"}, + } + ], + } + } + ], + } + ] + } + + event = gateway.normalize(payload) + self.assertEqual(event.gateway, "wa-webhook") + self.assertEqual(event.message_text, "Need invoice") + self.assertEqual(event.external_customer_id, "6281234567") + self.assertEqual(event.external_conversation_id, "6281234567") + + def test_whatsapp_normalization_uses_nested_event_id_and_external_conversation(self): + gateway = WhatsAppCustomerGateway(metadata={"id": "wa-webhook"}) + payload = { + "external_conversation_id": "wa_conv_custom", + "entry": [ + { + "changes": [ + { + "value": { + "messages": [ + { + "id": "wamid.12345", + "from": { + "id": "6281234567", + "profile": {"name": "Budi"}, + }, + "text": {"body": "Tagihan belum dibayar"}, + } + ], + } + } + ], + } + ], + } + + event = gateway.normalize(payload) + self.assertEqual(event.id, "wamid.12345") + self.assertEqual(event.external_conversation_id, "wa_conv_custom") + self.assertEqual(event.external_customer_id, "6281234567") + self.assertEqual(event.message_text, "Tagihan belum dibayar") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_gateway_types.py b/tests/test_gateway_types.py index 23420c5..6b66073 100644 --- a/tests/test_gateway_types.py +++ b/tests/test_gateway_types.py @@ -4,10 +4,16 @@ class GatewayTypeTests(unittest.TestCase): - def test_build_customer_gateway_only_supports_webhook_now(self): + def test_build_customer_gateway_supports_webhook_and_stage2_customer_types(self): customer_gateway = build_customer_gateway("webhook", {"id": "sbx"}) + customer_telegram = build_customer_gateway("telegram", {"id": "tg"}) + customer_whatsapp = build_customer_gateway("whatsapp", {"id": "wa"}) self.assertEqual(customer_gateway.name, "webhook") + self.assertEqual(customer_telegram.name, "telegram") + self.assertEqual(customer_whatsapp.name, "whatsapp") self.assertTrue(callable(getattr(customer_gateway, "normalize", None))) + self.assertTrue(callable(getattr(customer_telegram, "normalize", None))) + self.assertTrue(callable(getattr(customer_whatsapp, "normalize", None))) with self.assertRaises(ValueError): build_customer_gateway("openclaw", {}) diff --git a/tests/test_webhook_daemon.py b/tests/test_webhook_daemon.py new file mode 100644 index 0000000..e85d41a --- /dev/null +++ b/tests/test_webhook_daemon.py @@ -0,0 +1,114 @@ +import json +import socket +import threading +import time +import unittest +from contextlib import closing +from urllib.error import HTTPError +from urllib.request import Request, urlopen + +from wasila.core.contracts import CustomerEvent +from wasila.gateways.webhook import WebhookCustomerGateway, WebhookDaemon + + +def _free_port() -> int: + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + sock.bind(("127.0.0.1", 0)) + return sock.getsockname()[1] + + +def _wait_for_service(port: int) -> None: + deadline = time.time() + 5 + while time.time() < deadline: + try: + with urlopen(f"http://127.0.0.1:{port}/health", timeout=0.5): + return + except Exception: + time.sleep(0.05) + raise AssertionError("daemon did not become healthy in time") + + +class WebhookDaemonTests(unittest.TestCase): + def test_webhook_customer_endpoint_accepts_message(self) -> None: + captured: list[CustomerEvent] = [] + gateway = WebhookCustomerGateway(metadata={"id": "webhook-it"}) + port = _free_port() + + def handle(event: CustomerEvent) -> dict[str, object]: + captured.append(event) + return { + "customer_response": "ok", + "metadata": {"received": event.external_customer_id}, + "customer_id": event.customer_id, + } + + daemon = WebhookDaemon( + handler=handle, + gateway=gateway, + host="127.0.0.1", + port=port, + ) + thread = threading.Thread(target=daemon.start, daemon=True) + thread.start() + + try: + _wait_for_service(port) + request = Request( + f"http://127.0.0.1:{port}/webhook/customer", + method="POST", + data=json.dumps( + { + "message": "Hello", + "customer_id": "cust_001", + "external_customer_id": "cust_ext_001", + }, + ).encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + with urlopen(request, timeout=2) as response: + body = json.loads(response.read().decode("utf-8")) + + self.assertEqual(response.status, 200) + self.assertTrue(body["ok"]) + self.assertEqual(body["customer_id"], "cust_001") + self.assertEqual(len(captured), 1) + self.assertEqual(captured[0].external_customer_id, "cust_ext_001") + finally: + daemon.stop() + thread.join(timeout=1.0) + + def test_webhook_customer_endpoint_rejects_empty_body(self) -> None: + gateway = WebhookCustomerGateway(metadata={"id": "webhook-it"}) + port = _free_port() + + def handle(event: CustomerEvent) -> dict[str, object]: + self.fail("handler should not be called for empty body") + return {} + + daemon = WebhookDaemon( + handler=handle, + gateway=gateway, + host="127.0.0.1", + port=port, + ) + thread = threading.Thread(target=daemon.start, daemon=True) + thread.start() + + try: + _wait_for_service(port) + request = Request( + f"http://127.0.0.1:{port}/webhook/customer", + method="POST", + data=b"", + headers={"Content-Type": "application/json"}, + ) + with self.assertRaises(HTTPError) as context: + urlopen(request, timeout=2) + self.assertEqual(context.exception.code, 400) + finally: + daemon.stop() + thread.join(timeout=1.0) + + +if __name__ == "__main__": + unittest.main()