
Data Model Platform

Tiana_ edited this page May 30, 2026 · 1 revision

Data Model - Platform Schema

Part of the Data Model reference. Schemas: platform and shared. Companion to Domain-Model, ADR-0003, and Architecture-Security.


Tables in this schema

Table / Object                  Role
platform.idempotency_keys       HTTP-level request deduplication (TTL 24h)
platform.outbox_events          Transactional outbox for Redpanda publishing
platform.webhook_subscriptions  Registered webhook endpoints
platform.webhook_deliveries     Per-event delivery attempts
platform.audit_events           Centralized audit log (range-partitioned by month)
platform.processed_events       Shared consumer-side event deduplication
shared.set_updated_at()         Utility trigger function used across all schemas

Shared utility function

-- Lives in the `shared` schema; used by BEFORE UPDATE triggers in all other schemas
CREATE OR REPLACE FUNCTION shared.set_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION is idempotent, so re-running the migration is safe. The function is created in migration 001-shared-functions.xml, before any table migrations run.


Table: idempotency_keys

CREATE TABLE IF NOT EXISTS platform.idempotency_keys (
    key             VARCHAR(255) PRIMARY KEY,
    request_hash    VARCHAR(64)  NOT NULL,     -- SHA-256 hex of request body
    response_status INT,
    response_body   JSONB,
    created_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    expires_at      TIMESTAMPTZ  NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_idempotency_keys_expires
    ON platform.idempotency_keys(expires_at);
-- Cleanup job: DELETE FROM platform.idempotency_keys WHERE expires_at < NOW();

Query patterns:

  • Dedup check: primary-key lookup on key (B-tree, O(log n))
  • Expiry cleanup: range scan on idx_idempotency_keys_expires

Notes:

  • TTL = 24 hours. expires_at = NOW() + INTERVAL '24 hours' set at insert time.
  • key format aligns with the API contract: minLength: 32, maxLength: 128, pattern: ^[A-Za-z0-9_-]+$ (decision A8).
  • request_hash guards against clients replaying the same key with a different request body (conflict returns 422).
  • The concurrency gate is insert-first (INSERT ... ON CONFLICT (key) DO NOTHING), not check-then-insert, which prevents a TOCTOU race between concurrent requests carrying the same key (decision A8).
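The request flow implied by these notes can be sketched as follows. This is a minimal sketch, not the service's implementation: an in-memory dict guarded by a lock stands in for platform.idempotency_keys, and insert_if_absent plays the role of INSERT ... ON CONFLICT (key) DO NOTHING. The names InMemoryIdempotencyStore, begin_request and the outcome strings are hypothetical, as is the "in_progress" outcome for a key whose original request has not stored a response yet. Expiry cleanup is not modeled.

```python
import hashlib
import re
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple

# API contract: minLength 32, maxLength 128, pattern ^[A-Za-z0-9_-]+$ (decision A8)
KEY_PATTERN = re.compile(r"[A-Za-z0-9_-]{32,128}")
TTL = timedelta(hours=24)


@dataclass
class IdempotencyRow:
    request_hash: str                       # SHA-256 hex of the request body (64 chars)
    expires_at: datetime
    response_status: Optional[int] = None   # filled in once the original request completes
    response_body: Optional[Dict[str, Any]] = None


class InMemoryIdempotencyStore:
    """Hypothetical in-memory stand-in for platform.idempotency_keys."""

    def __init__(self) -> None:
        self._rows: Dict[str, IdempotencyRow] = {}
        self._lock = threading.Lock()

    def insert_if_absent(self, key: str, row: IdempotencyRow) -> bool:
        # Plays the role of INSERT ... ON CONFLICT (key) DO NOTHING:
        # the existence check and the write happen atomically under one lock.
        with self._lock:
            if key in self._rows:
                return False
            self._rows[key] = row
            return True

    def get(self, key: str) -> Optional[IdempotencyRow]:
        with self._lock:
            return self._rows.get(key)


def begin_request(store: InMemoryIdempotencyStore, key: str, body: bytes,
                  now: datetime) -> Tuple[str, Optional[Tuple[int, Any]]]:
    if KEY_PATTERN.fullmatch(key) is None:
        return "invalid_key", None
    request_hash = hashlib.sha256(body).hexdigest()
    # Insert first; only the request whose insert succeeds proceeds.
    if store.insert_if_absent(key, IdempotencyRow(request_hash, now + TTL)):
        return "proceed", None
    existing = store.get(key)
    if existing.request_hash != request_hash:
        return "conflict_422", None          # same key replayed with a different body
    if existing.response_status is None:
        return "in_progress", None           # hypothetical: original request still running
    return "replay", (existing.response_status, existing.response_body)
```

Because the insert is the gate, two concurrent requests with the same key cannot both see "key absent" and both proceed; the loser always falls through to the hash comparison.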

Table: outbox_events

CREATE TABLE IF NOT EXISTS platform.outbox_events (
    id              UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(64) NOT NULL,
    aggregate_id    VARCHAR(64) NOT NULL,
    event_type      VARCHAR(128) NOT NULL,
    payload         JSONB       NOT NULL,
    status          VARCHAR(16) NOT NULL DEFAULT 'PENDING',
    attempts        INT         NOT NULL DEFAULT 0,
    last_error      TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,

    CONSTRAINT ck_outbox_events_status CHECK (status IN ('PENDING','PUBLISHED','FAILED'))
);

CREATE INDEX IF NOT EXISTS idx_outbox_events_pending
    ON platform.outbox_events(status, created_at)
    WHERE status = 'PENDING';
CREATE INDEX IF NOT EXISTS idx_outbox_events_aggregate
    ON platform.outbox_events(aggregate_type, aggregate_id, created_at);
-- Cleanup job: DELETE FROM platform.outbox_events WHERE status = 'PUBLISHED' AND published_at < NOW() - INTERVAL '30 days';

Query patterns:

  • Dispatcher poll (batch, SKIP LOCKED): idx_outbox_events_pending, p99 < 20ms for 100 rows
  • Aggregate event history: idx_outbox_events_aggregate

Notes:

  • Written in the same local transaction as the aggregate mutation. No @Version on the outbox row (decision A3).
  • The dispatcher reads PENDING rows with SELECT ... FOR UPDATE SKIP LOCKED to avoid contention between dispatcher instances.
  • See ADR-0003 for the outbox pattern rationale.
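One dispatcher iteration could look like the sketch below, under stated assumptions. POLL_SQL is the kind of query that idx_outbox_events_pending serves (the %(batch_size)s placeholder assumes a psycopg-style driver). The row updates are meant to run inside the same transaction that holds the SKIP LOCKED row locks, committed after the batch. The publish callable stands in for a Redpanda producer, and MAX_ATTEMPTS plus the rule "row becomes FAILED once attempts reach MAX_ATTEMPTS" are hypothetical; this page does not specify the PENDING to FAILED transition.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

POLL_SQL = """
SELECT id, event_type, payload, attempts
FROM platform.outbox_events
WHERE status = 'PENDING'
ORDER BY created_at
LIMIT %(batch_size)s
FOR UPDATE SKIP LOCKED
"""

MAX_ATTEMPTS = 10  # hypothetical; not specified on this page


@dataclass
class OutboxRow:
    id: str
    event_type: str
    payload: Dict[str, Any]
    attempts: int = 0
    status: str = "PENDING"
    last_error: Optional[str] = None
    published_at: Optional[datetime] = None


def dispatch_batch(rows: List[OutboxRow],
                   publish: Callable[[str, Dict[str, Any]], None],
                   now: datetime,
                   max_attempts: int = MAX_ATTEMPTS) -> List[OutboxRow]:
    """Publish each locked row and compute its new state (written back before COMMIT)."""
    for row in rows:
        row.attempts += 1
        try:
            publish(row.event_type, row.payload)
        except Exception as exc:
            # Broker error: the row stays PENDING unless it has exhausted max_attempts.
            row.last_error = str(exc)
            if row.attempts >= max_attempts:
                row.status = "FAILED"
        else:
            row.status = "PUBLISHED"
            row.published_at = now
            row.last_error = None
    return rows
```

Rows locked by one dispatcher instance are skipped, not waited on, by the others, so several instances can poll concurrently without blocking each other.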

Table: webhook_subscriptions

CREATE TABLE IF NOT EXISTS platform.webhook_subscriptions (
    id               UUID          PRIMARY KEY,
    owner_id         VARCHAR(64)   NOT NULL,     -- user or service identifier
    url              VARCHAR(2048) NOT NULL,
    events           TEXT[]        NOT NULL,      -- array of event_type strings
    secret_encrypted BYTEA         NOT NULL,      -- AES-256-GCM, DEK from Vault
    status           VARCHAR(32)   NOT NULL DEFAULT 'ACTIVE',
    failure_count    INT           NOT NULL DEFAULT 0,
    last_success_at  TIMESTAMPTZ,
    last_failure_at  TIMESTAMPTZ,
    created_at       TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMPTZ   NOT NULL DEFAULT NOW(),

    CONSTRAINT ck_webhook_subs_status CHECK (status IN ('ACTIVE','PAUSED','DISABLED_DEAD'))
);

CREATE INDEX IF NOT EXISTS idx_webhook_subs_owner
    ON platform.webhook_subscriptions(owner_id);
-- GIN index for array containment: find subscriptions listening to a given event type
CREATE INDEX IF NOT EXISTS idx_webhook_subs_active_events
    ON platform.webhook_subscriptions USING GIN (events)
    WHERE status = 'ACTIVE';

CREATE OR REPLACE TRIGGER trg_webhook_subs_updated_at
    BEFORE UPDATE ON platform.webhook_subscriptions
    FOR EACH ROW EXECUTE FUNCTION shared.set_updated_at();

Query patterns:

  • Find active subscriptions for an event type: GIN @> query on idx_webhook_subs_active_events, e.g. WHERE events @> ARRAY['payment.completed'] AND status = 'ACTIVE'
  • List subscriptions for an owner: idx_webhook_subs_owner

Notes:

  • secret_encrypted stores the HMAC signing secret encrypted at the column level (AES-256-GCM). The plaintext secret is never persisted.
  • events is a TEXT[]. The GIN index supports containment queries (@>) to find subscriptions that include a specific event type.
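This page only says that secret_encrypted holds an HMAC signing secret. The sketch below shows how such a secret is typically used once decrypted (decryption via the Vault-issued DEK is not shown). The signed-message layout (timestamp, ".", raw body), the choice of SHA-256, the 300-second tolerance, and the function names are all assumptions for illustration, not the platform's actual webhook contract.

```python
import hashlib
import hmac


def sign_payload(secret: bytes, timestamp: int, body: bytes) -> str:
    # Hypothetical scheme: HMAC-SHA256 over b"<timestamp>." + body, hex-encoded.
    message = str(timestamp).encode() + b"." + body
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def verify_signature(secret: bytes, timestamp: int, body: bytes,
                     signature: str, now: int, tolerance_s: int = 300) -> bool:
    # Reject stale timestamps to limit replay of captured deliveries.
    if abs(now - timestamp) > tolerance_s:
        return False
    expected = sign_payload(secret, timestamp, body)
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, signature)
```

Signing the timestamp together with the body lets receivers bound replay windows without the platform storing anything beyond the secret.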

Table: webhook_deliveries

CREATE TABLE IF NOT EXISTS platform.webhook_deliveries (
    id                         UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    subscription_id            UUID        NOT NULL,
    event_id                   UUID        NOT NULL,
    status                     VARCHAR(32) NOT NULL DEFAULT 'PENDING',
    attempts                   INT         NOT NULL DEFAULT 0,
    next_retry_at              TIMESTAMPTZ,
    http_status                INT,
    http_response_body_excerpt VARCHAR(2048),
    latency_ms                 INT,
    first_attempt_at           TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_attempt_at            TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    CONSTRAINT fk_webhook_deliveries_sub FOREIGN KEY (subscription_id)
        REFERENCES platform.webhook_subscriptions(id) ON DELETE RESTRICT,
    CONSTRAINT ck_webhook_deliveries_status CHECK (status IN (
        'PENDING','DELIVERED','FAILED','PERMANENTLY_FAILED'
    ))
);

CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_sub_status
    ON platform.webhook_deliveries(subscription_id, status);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_pending_retry
    ON platform.webhook_deliveries(next_retry_at)
    WHERE status = 'FAILED' AND next_retry_at IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_event
    ON platform.webhook_deliveries(event_id);

Query patterns:

  • Retry poll: idx_webhook_deliveries_pending_retry (partial, FAILED with scheduled retry)
  • Delivery history for subscription: idx_webhook_deliveries_sub_status
  • Delivery lookup by event: idx_webhook_deliveries_event
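The retry schedule itself is not specified here. The sketch below assumes exponential backoff with jitter to show how status and next_retry_at line up with the partial index idx_webhook_deliveries_pending_retry: FAILED rows with a next_retry_at are picked up by the retry poll, while PERMANENTLY_FAILED rows get no next_retry_at and drop out of that index. BASE_DELAY, MAX_DELAY, MAX_ATTEMPTS and next_state are hypothetical.

```python
import random
from datetime import datetime, timedelta
from typing import Callable, Optional, Tuple

BASE_DELAY = timedelta(seconds=30)  # hypothetical
MAX_DELAY = timedelta(hours=6)      # hypothetical
MAX_ATTEMPTS = 8                    # hypothetical


def next_state(attempts: int, http_status: Optional[int], now: datetime,
               rng: Callable[[], float] = random.random) -> Tuple[str, Optional[datetime]]:
    """Return (status, next_retry_at); attempts counts the attempt just made."""
    if http_status is not None and 200 <= http_status < 300:
        return "DELIVERED", None
    if attempts >= MAX_ATTEMPTS:
        return "PERMANENTLY_FAILED", None
    # Exponential backoff capped at MAX_DELAY: 30s, 60s, 120s, ...
    delay = min(BASE_DELAY * (2 ** (attempts - 1)), MAX_DELAY)
    # Jitter: wait between 50% and 100% of the delay to spread retries out.
    return "FAILED", now + delay * (0.5 + 0.5 * rng())
```

A missing http_status (for example a connection timeout) is treated like any non-2xx response and scheduled for retry.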

Table: audit_events (range-partitioned by month)

CREATE TABLE IF NOT EXISTS platform.audit_events (
    id               UUID        NOT NULL DEFAULT gen_random_uuid(),
    occurred_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    actor_type       VARCHAR(16) NOT NULL,
    actor_id         VARCHAR(64) NOT NULL,
    actor_display    VARCHAR(255),
    actor_ip         INET,
    actor_user_agent TEXT,
    action           VARCHAR(128) NOT NULL,
    resource_type    VARCHAR(64)  NOT NULL,
    resource_id      VARCHAR(64)  NOT NULL,
    result           VARCHAR(16)  NOT NULL,
    correlation_id   UUID,
    request_id       UUID,
    method           VARCHAR(8),
    path             VARCHAR(2048),
    status_code      INT,
    diff_before      JSONB,
    diff_after       JSONB,

    -- On a partitioned table, the primary key must include the partition key (occurred_at)
    CONSTRAINT pk_audit_events PRIMARY KEY (id, occurred_at),
    CONSTRAINT ck_audit_actor_type CHECK (actor_type IN ('USER','SERVICE','SYSTEM')),
    CONSTRAINT ck_audit_result     CHECK (result     IN ('SUCCESS','FAILURE','DENIED'))
) PARTITION BY RANGE (occurred_at);

-- Initial partitions; subsequent partitions via pg_partman (retain 7 years)
CREATE TABLE IF NOT EXISTS platform.audit_events_2026_04 PARTITION OF platform.audit_events
    FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');

CREATE TABLE IF NOT EXISTS platform.audit_events_2026_05 PARTITION OF platform.audit_events
    FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');

CREATE TABLE IF NOT EXISTS platform.audit_events_2026_06 PARTITION OF platform.audit_events
    FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');

CREATE INDEX IF NOT EXISTS idx_audit_actor
    ON platform.audit_events(actor_id, occurred_at DESC);
CREATE INDEX IF NOT EXISTS idx_audit_resource
    ON platform.audit_events(resource_type, resource_id);
CREATE INDEX IF NOT EXISTS idx_audit_action
    ON platform.audit_events(action, occurred_at DESC);
CREATE INDEX IF NOT EXISTS idx_audit_correlation
    ON platform.audit_events(correlation_id) WHERE correlation_id IS NOT NULL;

Query patterns:

  • Audit history for an actor: idx_audit_actor
  • Audit history for a resource: idx_audit_resource
  • Audit history for an action type: idx_audit_action
  • Correlation trace: idx_audit_correlation (partial)

Notes:

  • Regulatory retention: 7 years. Partitioned monthly; pg_partman auto-creates future partitions.
  • Every ledger write produces exactly one audit row (decision A10); actor_id and actor_display are populated from the authenticated principal.
  • diff_before and diff_after hold only the changed fields, not the full row, which minimizes storage and limits how much potentially sensitive data reaches the audit log.
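Computing diff_before and diff_after from the row state before and after a write can be sketched as below. The field-level comparison follows the note above; the sensitive parameter and the "***" redaction marker are hypothetical additions showing one way to keep a changed secret value out of the log, since a changed sensitive field would otherwise still appear in the diff.

```python
from typing import Any, Dict, Iterable, Tuple

REDACTED = "***"      # hypothetical redaction marker
_MISSING = object()   # distinguishes "key absent" from "value is None"


def field_diff(before: Dict[str, Any], after: Dict[str, Any],
               sensitive: Iterable[str] = ()) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (diff_before, diff_after) containing only fields whose value changed."""
    sensitive = set(sensitive)
    diff_before: Dict[str, Any] = {}
    diff_after: Dict[str, Any] = {}
    for key in sorted(before.keys() | after.keys()):
        old = before.get(key, _MISSING)
        new = after.get(key, _MISSING)
        if old == new:
            continue  # unchanged fields are not logged
        if old is not _MISSING:
            diff_before[key] = REDACTED if key in sensitive else old
        if new is not _MISSING:
            diff_after[key] = REDACTED if key in sensitive else new
    return diff_before, diff_after
```

Both dictionaries serialize directly to the JSONB columns; a field added by the write appears only in diff_after, and a removed field only in diff_before.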

pg_partman configuration:

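-- pg_partman 5.x signature (p_type => 'range'). Maintenance must run periodically
-- (background worker or CALL partman.run_maintenance_proc()) to premake future partitions.
SELECT partman.create_parent(
    p_parent_table  => 'platform.audit_events',
    p_control       => 'occurred_at',
    p_type          => 'range',
    p_interval      => '1 month',
    p_premake       => 3
);

-- 7-year retention; retention_keep_table defaults to true (expired partitions are detached, not dropped)
UPDATE partman.part_config
SET retention = '7 years'
WHERE parent_table = 'platform.audit_events';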
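The three bootstrap partitions (audit_events_2026_04 through audit_events_2026_06) follow a simple pattern: one partition per calendar month with half-open bounds [first of month, first of next month). The hypothetical helper below regenerates that DDL, for example when preparing a bootstrap migration. It only reproduces the naming used in the initial migration; pg_partman applies its own naming to the partitions it creates. Note that a bare date bound on a TIMESTAMPTZ column is interpreted in the session's TimeZone when the DDL runs.

```python
from datetime import date
from typing import List


def month_partitions(parent: str, first: date, count: int) -> List[str]:
    """Build CREATE TABLE ... PARTITION OF statements for `count` months from `first`."""
    schema, table = parent.split(".")
    year, month = first.year, first.month
    statements = []
    for _ in range(count):
        # Upper bound is the first day of the following month (exclusive).
        next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)
        lower = date(year, month, 1)
        upper = date(next_year, next_month, 1)
        statements.append(
            f"CREATE TABLE IF NOT EXISTS {schema}.{table}_{year:04d}_{month:02d} "
            f"PARTITION OF {parent}\n"
            f"    FOR VALUES FROM ('{lower.isoformat()}') TO ('{upper.isoformat()}');"
        )
        year, month = next_year, next_month
    return statements
```

Half-open bounds mean a row stamped exactly at midnight on the first of a month lands in that month's partition and never in two.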

Table: processed_events (centralized consumer deduplication)

CREATE TABLE IF NOT EXISTS platform.processed_events (
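    event_id        UUID        NOT NULL,
    consumer_group  VARCHAR(64) NOT NULL,
    processed_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Composite key: each consumer group deduplicates the same event independently
    CONSTRAINT pk_platform_processed_events PRIMARY KEY (consumer_group, event_id)
);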

CREATE INDEX IF NOT EXISTS idx_platform_processed_events_processed
    ON platform.processed_events(processed_at);
-- Cleanup job: DELETE FROM platform.processed_events WHERE processed_at < NOW() - INTERVAL '30 days';

Notes:

  • Mirrors compliance.processed_events for centralized (shared) consumers.
  • Each consumer group records the event IDs it has already processed. Writing that record in the same transaction as the consumer's own state change gives effectively-once processing on top of at-least-once delivery.
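The consumer-side pattern can be sketched with Python's built-in sqlite3 standing in for PostgreSQL (an assumption made only so the example runs without a server; SQLite 3.24+ accepts the same ON CONFLICT ... DO NOTHING form). The sketch assumes a composite (consumer_group, event_id) key so that each consumer group deduplicates independently. The balances table and the handle function are hypothetical.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE processed_events (
            event_id       TEXT NOT NULL,
            consumer_group TEXT NOT NULL,
            processed_at   TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (consumer_group, event_id)
        );
        -- Hypothetical consumer state changed by each event
        CREATE TABLE balances (account TEXT PRIMARY KEY, amount INTEGER NOT NULL);
        INSERT INTO balances VALUES ('acct-1', 0);
    """)
    return conn


def handle(conn: sqlite3.Connection, consumer_group: str, event_id: str,
           account: str, delta: int) -> bool:
    """Apply an event at most once per consumer group; return False for a duplicate."""
    with conn:  # one transaction: dedup marker and side effect commit or roll back together
        cur = conn.execute(
            "INSERT INTO processed_events (event_id, consumer_group) VALUES (?, ?) "
            "ON CONFLICT (consumer_group, event_id) DO NOTHING",
            (event_id, consumer_group),
        )
        if cur.rowcount == 0:
            return False  # already processed by this group: redelivery is a no-op
        conn.execute("UPDATE balances SET amount = amount + ? WHERE account = ?",
                     (delta, account))
        return True
```

If the side effect raises, the marker row rolls back with it, so the redelivered event is processed again rather than silently skipped.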

Liquibase changelogs for this schema

v0.1/
  000-bootstrap.xml           -- schemas + extensions
  001-shared-functions.xml    -- shared.set_updated_at()
  050-platform-idempotency.xml
  051-platform-outbox.xml
  052-platform-webhook-subscriptions.xml
  053-platform-webhook-deliveries.xml
  054-platform-audit-events.xml
  055-platform-processed-events.xml

Encryption summary for this schema

Table                  Column            Method
webhook_subscriptions  secret_encrypted  AES-256-GCM column encryption, DEK from Vault

See Architecture-Security for the full encryption-at-rest summary and DEK rotation schedule.

