Data Model Platform
Tiana_ edited this page May 30, 2026 · 1 revision
Part of the Data Model reference. Schemas: `platform` and `shared`. Companion to Domain-Model, ADR-0003, Architecture-Security.
| Table / Object | Role |
|---|---|
| `platform.idempotency_keys` | HTTP-level request deduplication (TTL 24h) |
| `platform.outbox_events` | Transactional outbox for Redpanda publishing |
| `platform.webhook_subscriptions` | Registered webhook endpoints |
| `platform.webhook_deliveries` | Per-event delivery attempts |
| `platform.audit_events` | Centralized audit log (range-partitioned by month) |
| `platform.processed_events` | Shared consumer-side event deduplication |
| `shared.set_updated_at()` | Utility trigger function used across all schemas |
-- Lives in the `shared` schema; used by BEFORE UPDATE triggers in all other schemas
CREATE OR REPLACE FUNCTION shared.set_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

`CREATE OR REPLACE FUNCTION` is inherently idempotent. This function is created in migration `001-shared-functions.xml` before any table migrations run.
CREATE TABLE IF NOT EXISTS platform.idempotency_keys (
key VARCHAR(255) PRIMARY KEY,
request_hash VARCHAR(64) NOT NULL, -- SHA-256 hex of request body
response_status INT,
response_body JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_idempotency_keys_expires
ON platform.idempotency_keys(expires_at);
-- Cleanup job: DELETE FROM platform.idempotency_keys WHERE expires_at < NOW();

Query patterns:
- Dedup check: PK lookup on `key` (single B-tree index probe)
- Expiry cleanup: range scan on `idx_idempotency_keys_expires`
Notes:
- TTL = 24 hours. `expires_at = NOW() + INTERVAL '24 hours'` is set at insert time.
- `key` format aligns with the API contract: `minLength: 32`, `maxLength: 128`, `pattern: ^[A-Za-z0-9_-]+$` (decision A8).
- `request_hash` guards against clients replaying the same key with a different request body (conflict returns 422).
- Insert-first-on-conflict is the concurrency gate (not check-then-insert) to prevent TOCTOU races under concurrent requests with the same key (decision A8).
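The insert-first gate described in the notes could look roughly like the sketch below. It assumes the application binds positional parameters (`$1` = idempotency key, `$2` = SHA-256 hex of the request body, `$3`/`$4` = response status and body); these parameter names and the handling of an in-flight duplicate are illustrative assumptions, not part of the documented contract.

```sql
-- Step 1: try to claim the key. The PRIMARY KEY on "key" makes this atomic,
-- so two concurrent requests with the same key cannot both succeed.
INSERT INTO platform.idempotency_keys (key, request_hash, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '24 hours')
ON CONFLICT (key) DO NOTHING
RETURNING key;
-- A row came back: this request owns the key and should execute the operation.
-- No row came back: the key already exists, so inspect the stored row.

-- Step 2 (only when step 1 returned no row): compare against the stored request.
SELECT request_hash, response_status, response_body
FROM platform.idempotency_keys
WHERE key = $1;
-- request_hash <> $2             -> same key, different body: respond 422.
-- response_status IS NOT NULL    -> replay the stored response.
-- response_status IS NULL        -> the first request is still in flight
--                                   (how to respond here is an application choice).

-- Step 3 (owner only): record the response so later replays can return it.
UPDATE platform.idempotency_keys
SET response_status = $3,
    response_body   = $4
WHERE key = $1;
```

A check-then-insert variant would run the `SELECT` first, which leaves a window in which two requests both see no row; leading with the `INSERT` closes that window.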
CREATE TABLE IF NOT EXISTS platform.outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(64) NOT NULL,
aggregate_id VARCHAR(64) NOT NULL,
event_type VARCHAR(128) NOT NULL,
payload JSONB NOT NULL,
status VARCHAR(16) NOT NULL DEFAULT 'PENDING',
attempts INT NOT NULL DEFAULT 0,
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
CONSTRAINT ck_outbox_events_status CHECK (status IN ('PENDING','PUBLISHED','FAILED'))
);
CREATE INDEX IF NOT EXISTS idx_outbox_events_pending
ON platform.outbox_events(status, created_at)
WHERE status = 'PENDING';
CREATE INDEX IF NOT EXISTS idx_outbox_events_aggregate
ON platform.outbox_events(aggregate_type, aggregate_id, created_at);
-- Cleanup job: DELETE FROM platform.outbox_events WHERE status = 'PUBLISHED' AND published_at < NOW() - INTERVAL '30 days';

Query patterns:
- Dispatcher poll (batch, `SKIP LOCKED`): `idx_outbox_events_pending`, p99 < 20ms for 100 rows
- Aggregate event history: `idx_outbox_events_aggregate`
Notes:
- Written in the same local transaction as the aggregate mutation. No `@Version` on the outbox row (decision A3).
- The dispatcher reads `PENDING` rows with `SELECT ... FOR UPDATE SKIP LOCKED` to avoid contention between dispatcher instances.
- See ADR-0003 for the outbox pattern rationale.
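A minimal sketch of one dispatcher poll cycle is shown below. The batch size of 100 matches the latency target quoted above; the `$1`/`$2`/`$3` parameters and the decision to count attempts on both success and failure are assumptions made for illustration.

```sql
BEGIN;

-- Claim a batch of pending rows. Rows already locked by another dispatcher
-- instance are skipped instead of blocking this one.
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM platform.outbox_events
WHERE status = 'PENDING'
ORDER BY created_at
LIMIT 100
FOR UPDATE SKIP LOCKED;

-- ... the application publishes each claimed row to Redpanda here ...

-- Mark rows that were published successfully ($1 = uuid[] of their ids).
UPDATE platform.outbox_events
SET status       = 'PUBLISHED',
    published_at = NOW(),
    attempts     = attempts + 1
WHERE id = ANY($1);

-- Record a failed publish attempt for a single row ($2 = id, $3 = error text).
-- The row stays PENDING so a later poll retries it.
UPDATE platform.outbox_events
SET attempts   = attempts + 1,
    last_error = $3
WHERE id = $2;

COMMIT;
```

The row locks are held until `COMMIT`, so the publish step should stay short; a long-running publish keeps the batch invisible to other dispatcher instances for its whole duration.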
CREATE TABLE IF NOT EXISTS platform.webhook_subscriptions (
id UUID PRIMARY KEY,
owner_id VARCHAR(64) NOT NULL, -- user or service identifier
url VARCHAR(2048) NOT NULL,
events TEXT[] NOT NULL, -- array of event_type strings
secret_encrypted BYTEA NOT NULL, -- AES-256-GCM, DEK from Vault
status VARCHAR(32) NOT NULL DEFAULT 'ACTIVE',
failure_count INT NOT NULL DEFAULT 0,
last_success_at TIMESTAMPTZ,
last_failure_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT ck_webhook_subs_status CHECK (status IN ('ACTIVE','PAUSED','DISABLED_DEAD'))
);
CREATE INDEX IF NOT EXISTS idx_webhook_subs_owner
ON platform.webhook_subscriptions(owner_id);
-- GIN index for array containment: find subscriptions listening to a given event type
CREATE INDEX IF NOT EXISTS idx_webhook_subs_active_events
ON platform.webhook_subscriptions USING GIN (events)
WHERE status = 'ACTIVE';
CREATE OR REPLACE TRIGGER trg_webhook_subs_updated_at
BEFORE UPDATE ON platform.webhook_subscriptions
FOR EACH ROW EXECUTE FUNCTION shared.set_updated_at();

Query patterns:
- Find active subscriptions for an event type: GIN `@>` query on `idx_webhook_subs_active_events`, e.g. `WHERE events @> ARRAY['payment.completed'] AND status = 'ACTIVE'`
- List subscriptions for an owner: `idx_webhook_subs_owner`
Notes:
- `secret_encrypted` stores the HMAC signing secret encrypted at the column level (AES-256-GCM). The plaintext secret is never persisted.
- `events` is a `TEXT[]`. The GIN index supports containment queries (`@>`) to find subscriptions that include a specific event type.
CREATE TABLE IF NOT EXISTS platform.webhook_deliveries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
subscription_id UUID NOT NULL,
event_id UUID NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'PENDING',
attempts INT NOT NULL DEFAULT 0,
next_retry_at TIMESTAMPTZ,
http_status INT,
http_response_body_excerpt VARCHAR(2048),
latency_ms INT,
first_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT fk_webhook_deliveries_sub FOREIGN KEY (subscription_id)
REFERENCES platform.webhook_subscriptions(id) ON DELETE RESTRICT,
CONSTRAINT ck_webhook_deliveries_status CHECK (status IN (
'PENDING','DELIVERED','FAILED','PERMANENTLY_FAILED'
))
);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_sub_status
ON platform.webhook_deliveries(subscription_id, status);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_pending_retry
ON platform.webhook_deliveries(next_retry_at)
WHERE status = 'FAILED' AND next_retry_at IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_event
ON platform.webhook_deliveries(event_id);

Query patterns:
- Retry poll: `idx_webhook_deliveries_pending_retry` (partial, `FAILED` with scheduled retry)
- Delivery history for subscription: `idx_webhook_deliveries_sub_status`
- Delivery lookup by event: `idx_webhook_deliveries_event`
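The two statements below sketch how these indexes might be exercised together. The fan-out step (one delivery row per matching subscription) and the use of `SKIP LOCKED` in the retry poll are assumptions about the delivery worker, as are the parameters `$1` (event id) and `$2` (event type) and the batch size of 50.

```sql
-- Fan-out (assumed flow): create one PENDING delivery per active subscription
-- whose events array contains the event type. Repeating the predicate
-- status = 'ACTIVE' lets the planner use the partial GIN index.
INSERT INTO platform.webhook_deliveries (subscription_id, event_id)
SELECT s.id, $1::uuid
FROM platform.webhook_subscriptions AS s
WHERE s.events @> ARRAY[$2::text]
  AND s.status = 'ACTIVE';

-- Retry poll: pick up failed deliveries whose scheduled retry time has passed.
-- The WHERE clause repeats the partial-index predicate so that
-- idx_webhook_deliveries_pending_retry is eligible.
SELECT id, subscription_id, event_id, attempts
FROM platform.webhook_deliveries
WHERE status = 'FAILED'
  AND next_retry_at IS NOT NULL
  AND next_retry_at <= NOW()
ORDER BY next_retry_at
LIMIT 50
FOR UPDATE SKIP LOCKED;
```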
CREATE TABLE IF NOT EXISTS platform.audit_events (
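id UUID NOT NULL DEFAULT gen_random_uuid(),
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- PostgreSQL requires the primary key of a partitioned table to include the partition key
PRIMARY KEY (id, occurred_at),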
actor_type VARCHAR(16) NOT NULL,
actor_id VARCHAR(64) NOT NULL,
actor_display VARCHAR(255),
actor_ip INET,
actor_user_agent TEXT,
action VARCHAR(128) NOT NULL,
resource_type VARCHAR(64) NOT NULL,
resource_id VARCHAR(64) NOT NULL,
result VARCHAR(16) NOT NULL,
correlation_id UUID,
request_id UUID,
method VARCHAR(8),
path VARCHAR(2048),
status_code INT,
diff_before JSONB,
diff_after JSONB,
CONSTRAINT ck_audit_actor_type CHECK (actor_type IN ('USER','SERVICE','SYSTEM')),
CONSTRAINT ck_audit_result CHECK (result IN ('SUCCESS','FAILURE','DENIED'))
) PARTITION BY RANGE (occurred_at);
-- Initial partitions; subsequent partitions via pg_partman (retain 7 years)
CREATE TABLE IF NOT EXISTS platform.audit_events_2026_04 PARTITION OF platform.audit_events
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
CREATE TABLE IF NOT EXISTS platform.audit_events_2026_05 PARTITION OF platform.audit_events
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
CREATE TABLE IF NOT EXISTS platform.audit_events_2026_06 PARTITION OF platform.audit_events
FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');
CREATE INDEX IF NOT EXISTS idx_audit_actor
ON platform.audit_events(actor_id, occurred_at DESC);
CREATE INDEX IF NOT EXISTS idx_audit_resource
ON platform.audit_events(resource_type, resource_id);
CREATE INDEX IF NOT EXISTS idx_audit_action
ON platform.audit_events(action, occurred_at DESC);
CREATE INDEX IF NOT EXISTS idx_audit_correlation
ON platform.audit_events(correlation_id) WHERE correlation_id IS NOT NULL;

Query patterns:
- Audit history for an actor: `idx_audit_actor`
- Audit history for a resource: `idx_audit_resource`
- Correlation trace: `idx_audit_correlation` (partial)
Notes:
- Regulatory retention: 7 years. Partitioned monthly; `pg_partman` auto-creates future partitions.
- One audit row is written per ledger write (decision A10), using the authenticated principal wired into `actor_id` / `actor_display`.
- `diff_before` and `diff_after` store only the changed fields, not the full row, to minimize storage and avoid logging sensitive data.
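With up to seven years of monthly partitions, audit queries benefit from bounding `occurred_at` so that PostgreSQL can prune partitions outside the window. The sketch below assumes hypothetical parameters `$1` (actor id) and `$2`/`$3` (window start and end); the page size of 100 is likewise illustrative.

```sql
-- Audit history for one actor within a time window.
-- The occurred_at bounds allow partition pruning; within each remaining
-- partition, idx_audit_actor (actor_id, occurred_at DESC) serves the filter
-- and the sort order.
SELECT occurred_at, action, resource_type, resource_id, result, correlation_id
FROM platform.audit_events
WHERE actor_id = $1
  AND occurred_at >= $2::timestamptz
  AND occurred_at <  $3::timestamptz
ORDER BY occurred_at DESC
LIMIT 100;
```

Without the `occurred_at` bounds the same query is still correct, but it has to probe the index in every partition.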
pg_partman configuration:
SELECT partman.create_parent(
p_parent_table => 'platform.audit_events',
p_control => 'occurred_at',
p_type => 'range',
p_interval => '1 month',
p_premake => 3
);

CREATE TABLE IF NOT EXISTS platform.processed_events (
event_id UUID PRIMARY KEY,
consumer_group VARCHAR(64) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_platform_processed_events_processed
ON platform.processed_events(processed_at);
-- Cleanup job: DELETE FROM platform.processed_events WHERE processed_at < NOW() - INTERVAL '30 days';

Notes:
- Mirror of `compliance.processed_events` for centralized/shared consumers.
- Each consumer group records which event IDs it has already processed, enabling effectively-once processing (idempotent handling) over at-least-once delivery.
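Consumer-side deduplication can follow the same insert-first idea as the idempotency table. The sketch below assumes the consumer binds `$1` (event id) and `$2` (consumer group name) and applies its side effects in the same transaction as the insert; both are assumptions about the consumer implementation.

```sql
BEGIN;

-- Try to record the event as processed.
INSERT INTO platform.processed_events (event_id, consumer_group)
VALUES ($1, $2)
ON CONFLICT (event_id) DO NOTHING
RETURNING event_id;
-- A row came back: first delivery, so apply the event's side effects here,
--                  inside this transaction.
-- No row came back: redelivery of an event already handled, so skip it.

COMMIT;
```

Because the marker row and the side effects commit together, a crash before `COMMIT` rolls back both and the redelivered event is processed again from scratch.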
v0.1/
000-bootstrap.xml -- schemas + extensions
001-shared-functions.xml -- shared.set_updated_at()
050-platform-idempotency.xml
051-platform-outbox.xml
052-platform-webhook-subscriptions.xml
053-platform-webhook-deliveries.xml
054-platform-audit-events.xml
055-platform-processed-events.xml
| Table | Column | Method |
|---|---|---|
| `webhook_subscriptions` | `secret_encrypted` | AES-256-GCM column, DEK from Vault |
See Architecture-Security for the full encryption-at-rest summary and DEK rotation schedule.
- Data-Model - conventions, ERD, cross-cutting sections
- ADR-0003 - outbox pattern rationale
- Architecture-Security - encryption, audit
- Domain-Model - platform aggregate definitions