
[Tier 6] Data warehouse export: Snowflake / BigQuery / Databricks / Iceberg + cold tier #62

@dcoln25-writer

Problem

Aperio's SIEM dispatcher (workers/siem-dispatcher.ts + internal/siemdispatcher/) is built for SOC consumption — finding-shaped envelopes pushed in near-real-time to Splunk / Panther / etc. That's correct for alerting, but wrong for two other audiences:

  • Security analysts doing longitudinal queries ("what's our 90-day median time-to-resolve?", "which OAuth scopes most often precede incidents?", "show me cross-tenant trends in our compliance scorecard"). They want SQL-shaped data in a warehouse, not finding-by-finding pushes.
  • Compliance teams, who need append-only, multi-year retention without keeping hot Postgres rows around forever.

Aperio's Postgres is the hot transactional store: running analytical workloads against it competes with production traffic, and growing it indefinitely is not sustainable.

Goals

  1. Data warehouse export adapters for Snowflake, BigQuery, Databricks, and Apache Iceberg on S3.
  2. Cold storage tier — every IngestedEvent, SecurityFinding, RuleRun (see #54, "Product observability: connector health, SIEM delivery, rule-run audit, /metrics"), WorkflowDelivery (see #50, "Workflow & ticketing integration: JIRA, Linear, Slack, Teams, PagerDuty, SLA tracking"), and audit-log row is archived to a customer-controlled object store once it ages out of a configurable retention window in hot Postgres.
  3. Schema-stable analytical views — published table definitions so analyst SQL doesn't break across Aperio versions.
  4. Backfill + CDC — initial backfill of historical data; ongoing change-data-capture for new rows.

Non-goals

  • Not replacing the SIEM path — this is additive for analyst self-serve.
  • Not building a query layer / BI surface (that's the warehouse's job).
  • Not multi-region warehouse routing — single export target per org.

Proposed design

What to reuse

The SIEM dispatcher pattern (internal/siemdispatcher/) is the right template:

  • Durable outbox (SiemDelivery table semantics) — adapt to a new WarehouseDelivery.
  • Adapter interface: Deliver(ctx, payload) with retry/backoff + lease semantics.
  • JSONL file destination is already shipped and is the dirt-simplest analyst export.

New schema

enum WarehouseKind {
  SNOWFLAKE
  BIGQUERY
  DATABRICKS
  ICEBERG_S3
  PARQUET_S3
  JSONL_S3
  POSTGRES_REPLICA
}

enum WarehouseTable {
  EVENTS
  FINDINGS
  RULE_RUNS
  SIEM_DELIVERIES
  WORKFLOW_DELIVERIES
  AUDIT_LOG
  OAUTH_GRANTS
  IDENTITIES
  ASSETS
  BASELINE_SCORES
}

model WarehouseDestination {
  id              String   @id @default(cuid())
  organizationId  String   @map("organization_id")
  kind            WarehouseKind
  name            String   @db.VarChar(160)
  config          Json     // kind-specific: account url, dataset id, warehouse + role, bucket, etc.
  encryptedCredentials String? @map("encrypted_credentials") @db.Text
  tokenKeyVersion String   @default("v1") @map("token_key_version") @db.VarChar(32)
  enabledTables   WarehouseTable[] @default([]) @map("enabled_tables")
  schemaVersion   String   @default("v1") @map("schema_version") @db.VarChar(20)
  status          String   @db.VarChar(20)   // "active" | "paused" | "error"
  lastDeliveryAt  DateTime? @map("last_delivery_at")
  lastError       String?  @map("last_error") @db.VarChar(500)
  createdAt       DateTime @default(now()) @map("created_at")
  updatedAt       DateTime @updatedAt @map("updated_at")
  organization    Organization @relation(...)
  deliveries      WarehouseDelivery[]
  @@index([organizationId, kind, status])
  @@map("warehouse_destinations")
}

model WarehouseDelivery {
  id              String   @id @default(cuid())
  organizationId  String   @map("organization_id")
  destinationId   String   @map("destination_id")
  table           WarehouseTable
  watermarkStart  DateTime @map("watermark_start")
  watermarkEnd    DateTime @map("watermark_end")
  rowCount        Int      @default(0) @map("row_count")
  status          String   @db.VarChar(20)
  attempts        Int      @default(0)
  maxAttempts     Int      @default(5) @map("max_attempts")
  nextAttemptAt   DateTime @default(now()) @map("next_attempt_at")
  leaseOwner      String?  @map("lease_owner") @db.VarChar(180)
  leaseExpiresAt  DateTime? @map("lease_expires_at")
  lastError       String?  @map("last_error") @db.VarChar(500)
  deliveredAt     DateTime? @map("delivered_at")
  createdAt       DateTime @default(now()) @map("created_at")
  destination     WarehouseDestination @relation(fields: [destinationId], references: [id], onDelete: Cascade)
  @@unique([destinationId, table, watermarkStart, watermarkEnd])
  @@index([organizationId, status, nextAttemptAt])
  @@map("warehouse_deliveries")
}

Watermark-based CDC

For each (destination, table) pair, the dispatcher tracks a high-water-mark cursor on (updatedAt, id). It batches rows into fixed time windows (default 15 min), encodes each batch against the table's schema, and ships it to the destination.

Adapters:

  • Snowflake: PUT Parquet files to an internal stage + COPY INTO.
  • BigQuery: bigquery.Inserter streaming inserts for hot rows; load jobs for backfill.
  • Databricks: dbsql driver; tables registered in Unity Catalog.
  • Iceberg on S3: write Parquet + commit to an Iceberg catalog (Glue / Nessie).
  • Parquet on S3: partitioned by org_id / date.
  • JSONL on S3: append-only files (cheapest, dumbest).
  • Postgres replica: for customers that want a logical read replica without a managed warehouse.

Schema stability

Published per-table schemas in warehouse/schemas/<table>/v1.json. Schema changes are additive between minor versions; breaking changes bump major version + run a parallel dual-write window (v1 and v2 tables) before deprecating v1.

Cold-storage tier

When a row in the hot DB ages past Organization.dataRetentionDays:

  1. Verify it's been delivered to at least one WarehouseDestination (or marked "no destination configured" via explicit operator override).
  2. Insert a tombstone with id + delivered_to into a cold_storage_archive audit table.
  3. Drop the hot row.

This bounds Postgres growth without losing any data the customer can prove they have in their warehouse.

Phasing

| Phase | Scope |
|-------|-------|
| P1 | Schema; dispatcher worker; JSONL-on-S3 + Parquet-on-S3 adapters (simplest path); per-table watermark CDC for FINDINGS, EVENTS, AUDIT_LOG; /admin/warehouse CRUD UI |
| P2 | Snowflake + BigQuery adapters; backfill tooling; schema v1 publication |
| P3 | Databricks + Iceberg + Postgres replica adapters; remaining tables (rule runs, workflow deliveries, OAuth grants, baselines) |
| P4 | Cold-storage tier wired to retention policy; tombstoning + recall path; analyst-facing example notebooks in warehouse/notebooks/ |

Open questions

  • How to handle late-arriving updates after watermark advances — re-emit + idempotent merges in the warehouse, or freeze the analytical surface at delivery time?
  • Delivery cadence: per-table or per-destination? Either way, make it operator-tunable.
  • Cold-storage tombstone — is "we deleted this and you have it in your warehouse" enough for SOC 2, or do we need to keep the tombstone for the audit window?
