Skip to content

Commit abbf874

Browse files
authored
feat: webhook system with retain.completed event, UI, and docs (#487)
* doc: update cookbook * fix(cookbook): preserve tag keys during sync, strip local .md links - Fix extract_tags_from_readme/notebook to return dict[str,str] preserving sdk/topic keys instead of bare values, preventing topics like "Customer Service" from being misclassified as SDK - Add strip_local_md_links() to remove relative .md references that would cause broken link errors in Docusaurus build * ci: run test-doc-examples independently without waiting for test-rust-cli Build the CLI directly in the job instead of downloading the artifact, so test-doc-examples can start at the beginning in parallel with all other jobs. * feat: webhook system with task-owned retry, retain.completed event, and UI - New webhook system: register per-bank webhooks with HMAC signing, configurable HTTP method/timeout/headers/params (http_config JSONB), and PATCH support - Webhook deliveries run as async_operations (webhook_delivery type) with task-owned retry via RetryTaskAt exception and exponential backoff (60s / 5m / 30m / 2h / 8h, max 6 attempts) - New retain.completed event fires per-document for both sync and async retain - Delivery debug info (status code, response body) stored in result_metadata - Control plane UI: webhooks tab per bank with create/edit/delete and a deliveries table with cursor pagination and expandable response details - 28 webhook tests covering HMAC signing, delivery retries, CRUD endpoints, PATCH update, and retain.completed queuing - Docs page at developer/api/webhooks documenting event payloads and delivery - OpenAPI spec and all client SDKs (Python, TypeScript, Rust, Go) regenerated * fix: update tests for task-owned retry model and guard _webhook_manager attribute - test_worker.py: test_executor_exception_triggers_retry now raises RetryTaskAt (plain exceptions are immediate failures in the new system); rename test_executor_exception_marks_failed_after_max_retries to test_executor_exception_marks_failed_immediately to reflect new semantics - test_batch_api.py: remove max_retries kwarg from WorkerPoller constructor - memory_engine.py: use getattr for _webhook_manager in _fire_retain_webhook to avoid AttributeError when engine is created without __init__ (tests) * fix: remove max_retries from benchmark WorkerPoller call * fix(webhooks): transactional outbox, observations_deleted tracking, sidebar - Queue webhook delivery rows atomically with the primary operation using the transactional outbox pattern — prevents lost events on process crash: - Retain (sync + async): outbox_callback passed into orchestrator.retain_batch and called inside the DB transaction, replacing the post-commit fire call - Consolidation: new _mark_operation_completed_and_fire_webhook combines the status UPDATE and webhook INSERT in one transaction - Added fire_event_with_conn() to WebhookManager for in-connection delivery - Track observations_deleted count in consolidation stats and expose it in the consolidation.completed webhook payload (was always None) - Add Webhooks page to docs sidebar - Document at-least-once delivery guarantee with operation_id dedup guidance * fix(ui): add retain.completed to available webhook event types * feat(ui): add delete confirmation dialog for webhooks * fix(webhooks): include operation_id in task_payload so delivery is marked completed The task_payload JSON was missing the operation_id field, causing execute_task to see operation_id=None and skip _mark_operation_completed — leaving every delivery row stuck in 'pending' forever. Added a test that inserts a real async_operations row and verifies the status transitions to 'completed' after a successful execute_task call. * style: fix prettier formatting in webhooks-view
1 parent 51d2fc5 commit abbf874

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+10887
-240
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
"""Add webhooks table and next_retry_at to async_operations.
2+
3+
Webhook deliveries are handled as async_operations tasks (operation_type='webhook_delivery')
4+
rather than a dedicated webhook_deliveries table.
5+
6+
Revision ID: e4f5a6b7c8d9
7+
Revises: d2e3f4a5b6c7
8+
Create Date: 2026-03-04
9+
"""
10+
11+
from collections.abc import Sequence
12+
13+
from alembic import context, op
14+
15+
revision: str = "e4f5a6b7c8d9"
16+
down_revision: str | Sequence[str] | None = "d2e3f4a5b6c7"
17+
branch_labels: str | Sequence[str] | None = None
18+
depends_on: str | Sequence[str] | None = None
19+
20+
21+
def _get_schema_prefix() -> str:
22+
schema = context.config.get_main_option("target_schema")
23+
return f'"{schema}".' if schema else ""
24+
25+
26+
def upgrade() -> None:
27+
schema = _get_schema_prefix()
28+
29+
op.execute(
30+
f"""
31+
CREATE TABLE IF NOT EXISTS {schema}webhooks (
32+
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
33+
bank_id TEXT,
34+
url TEXT NOT NULL,
35+
secret TEXT,
36+
event_types TEXT[] NOT NULL DEFAULT '{{}}',
37+
enabled BOOLEAN NOT NULL DEFAULT TRUE,
38+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
39+
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
40+
)
41+
"""
42+
)
43+
44+
# Index for bank-scoped webhook lookup
45+
op.execute(f"CREATE INDEX IF NOT EXISTS idx_webhooks_bank_id ON {schema}webhooks(bank_id)")
46+
47+
# Add next_retry_at to async_operations for task-owned retry scheduling
48+
op.execute(f"ALTER TABLE {schema}async_operations ADD COLUMN IF NOT EXISTS next_retry_at TIMESTAMPTZ NULL")
49+
50+
# Index for polling: status + next_retry_at
51+
op.execute(
52+
f"CREATE INDEX IF NOT EXISTS idx_async_operations_status_retry "
53+
f"ON {schema}async_operations(status, next_retry_at)"
54+
)
55+
56+
57+
def downgrade() -> None:
58+
schema = _get_schema_prefix()
59+
op.execute(f"DROP INDEX IF EXISTS {schema}idx_async_operations_status_retry")
60+
op.execute(f"ALTER TABLE {schema}async_operations DROP COLUMN IF EXISTS next_retry_at")
61+
op.execute(f"DROP INDEX IF EXISTS {schema}idx_webhooks_bank_id")
62+
op.execute(f"DROP TABLE IF EXISTS {schema}webhooks")
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
"""Add http_config JSONB column to webhooks table.
2+
3+
Stores HTTP delivery configuration (method, timeout, headers, params) as a
4+
single JSONB column rather than separate columns.
5+
6+
Revision ID: f7g8h9i0j1k2
7+
Revises: e4f5a6b7c8d9
8+
Create Date: 2026-03-04
9+
"""
10+
11+
from collections.abc import Sequence
12+
13+
from alembic import context, op
14+
15+
revision: str = "f7g8h9i0j1k2"
16+
down_revision: str | Sequence[str] | None = "e4f5a6b7c8d9"
17+
branch_labels: str | Sequence[str] | None = None
18+
depends_on: str | Sequence[str] | None = None
19+
20+
21+
def _get_schema_prefix() -> str:
22+
schema = context.config.get_main_option("target_schema")
23+
return f'"{schema}".' if schema else ""
24+
25+
26+
def upgrade() -> None:
27+
schema = _get_schema_prefix()
28+
op.execute(f"ALTER TABLE {schema}webhooks ADD COLUMN IF NOT EXISTS http_config JSONB NOT NULL DEFAULT '{{}}'")
29+
30+
31+
def downgrade() -> None:
32+
schema = _get_schema_prefix()
33+
op.execute(f"ALTER TABLE {schema}webhooks DROP COLUMN IF EXISTS http_config")

0 commit comments

Comments
 (0)