[Tier 4] Cross-connector correlation engine (stateful, time-windowed rules) #55

@dcoln25-writer

Problem

Today's detection (evaluateSecurityRules in workers/ingestion-worker.ts) is single-event, single-provider, and stateless: every rule evaluates one IngestedEvent row in isolation. That design misses some of the most valuable detections for modern SaaS attacks:

  • "Okta admin role granted + GitHub org-admin add + Slack workspace-admin grant — same actor, < 1h" → privilege-escalation chain.
  • "5+ MFA factor resets across distinct users in 10m, same actor" → admin-driven account-takeover precursor.
  • "OAuth grant + immediate first use from a new IP geo for the same identity" → early token-theft pattern.
  • "External user added to private repo + secret committed to that repo, < 5m" → insider/external collusion.

These are stateful, time-windowed conditions, and most of them span multiple providers. The current engine cannot express any of them.

Goals

  1. A stateful rule grammar layered on top of the YAML/CEL substrate from #47 (Detection-as-code: declarative YAML rules + community rule packs).
  2. Sliding-window aggregations (count, distinct, sum, percentile) with bounded memory.
  3. Cross-connector joins keyed by actor / Person (#45) / asset.
  4. A correlator worker that incrementally evaluates open windows against the live event stream, durable across restarts.
  5. Backtest support for stateful rules over historical IngestedEvent data.

Non-goals

  • Not building a full CEP (complex event processing) engine; the surface stays limited to time-windowed aggregations and sequence detection.
  • No real-time SLA tighter than ~1 minute in v1; tail-latency optimization comes later.

Proposed design

Rule grammar extension (YAML, on top of #47)

id: identity.privilege_escalation_chain
version: 1.0.0
type: stateful_sequence

inputs:
  - provider: OKTA
    event_types: [admin_role_assigned]
    alias: okta_admin
  - provider: GITHUB
    event_types: [org_membership_changed]
    alias: gh_admin
    filter: "event.payload.membership.role == 'admin'"
  - provider: SLACK
    event_types: [workspace_admin_granted]
    alias: slack_admin

correlation:
  key: "person.id"          # join on the linked Person (#45) — fall back to email if not linked
  within: 1h
  sequence: [okta_admin, gh_admin, slack_admin]
  min_distinct_aliases: 2

finding:
  title: "Privilege escalation chain: {{ context.person.primary_email }}"
  severity: CRITICAL
  ...
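
The problem statement describes the chain as all three grants, while `min_distinct_aliases: 2` suggests a partial chain can fire. The sketch below assumes one interpretation: the rule fires when at least `min_distinct_aliases` aliases occur in `sequence` order, all within `within` of the first one, for a single partition key. `AliasedEvent`, `longestOrderedChain`, and `sequenceFires` are hypothetical names, not part of the proposal.

```typescript
// Hypothetical: one matched input event for a single partition key.
interface AliasedEvent {
  alias: string; // rule input alias, e.g. "okta_admin"
  occurredAt: number; // epoch milliseconds
}

// Longest chain of events whose aliases appear in strictly increasing
// `sequence` order, all within `withinMs` of the chain's first event.
// Strictly increasing positions imply the aliases are distinct.
// O(n^3) in matched events per partition; fine for small n only.
function longestOrderedChain(
  events: AliasedEvent[],
  sequence: string[],
  withinMs: number,
): number {
  const position = new Map<string, number>();
  sequence.forEach((alias, i) => position.set(alias, i));
  const evs = events
    .filter((e) => position.has(e.alias))
    .sort((a, b) => a.occurredAt - b.occurredAt);

  let best = 0;
  for (let s = 0; s < evs.length; s++) {
    const start = evs[s].occurredAt;
    // len[j] = longest chain starting at evs[s] and ending at evs[j] (0 = none).
    const len = new Array<number>(evs.length).fill(0);
    len[s] = 1;
    best = Math.max(best, 1);
    for (let j = s + 1; j < evs.length && evs[j].occurredAt - start <= withinMs; j++) {
      const pj = position.get(evs[j].alias)!;
      for (let i = s; i < j; i++) {
        if (len[i] > 0 && position.get(evs[i].alias)! < pj) {
          len[j] = Math.max(len[j], len[i] + 1);
        }
      }
      best = Math.max(best, len[j]);
    }
  }
  return best;
}

// Fire decision under the assumed semantics of min_distinct_aliases.
function sequenceFires(
  events: AliasedEvent[],
  sequence: string[],
  withinMs: number,
  minDistinctAliases: number,
): boolean {
  return longestOrderedChain(events, sequence, withinMs) >= minDistinctAliases;
}
```

If the intended semantics are unordered (a P2 option), dropping the position comparison and counting distinct aliases inside the window would be enough.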

id: okta.mfa_reset_burst
version: 1.0.0
type: stateful_window

input:
  provider: OKTA
  event_types: [user_mfa_factor_reset, mfa_factor_reset]

window:
  duration: 10m
  key: "event.actor"
  aggregations:
    distinct_targets: distinct(event.payload.target.id)
    count: count()
  threshold: "distinct_targets >= 5"

finding:
  title: "Okta MFA factor reset burst by {{ event.actor }}"
  severity: HIGH
  ...
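
For a single-input rule such as okta.mfa_reset_burst, here is a minimal sketch of a sliding distinct() aggregation: the window always ends at the newest event and older events are evicted on arrival, so memory per key is bounded by the events inside the window (goal 2). This is a sliding variant, unlike the open/close windows in the domain model below; `ResetEvent` and `SlidingDistinctWindow` are hypothetical, and events are assumed to arrive in occurredAt order.

```typescript
// Hypothetical shape of an Okta reset event after field extraction.
interface ResetEvent {
  actor: string; // event.actor, the window key
  targetId: string; // event.payload.target.id
  occurredAt: number; // epoch ms; assumed non-decreasing per actor
}

// Per-key sliding window over (occurredAt - duration, occurredAt].
class SlidingDistinctWindow {
  private readonly durationMs: number;
  private readonly buffers = new Map<string, ResetEvent[]>();
  // Per key: targetId -> number of buffered events with that target.
  private readonly targetCounts = new Map<string, Map<string, number>>();

  constructor(durationMs: number) {
    this.durationMs = durationMs;
  }

  // Adds an event and returns count() and distinct(target) for its key.
  add(e: ResetEvent): { count: number; distinctTargets: number } {
    const buf = this.buffers.get(e.actor) ?? [];
    const counts = this.targetCounts.get(e.actor) ?? new Map<string, number>();
    buf.push(e);
    counts.set(e.targetId, (counts.get(e.targetId) ?? 0) + 1);
    // Evict events that are at least `durationMs` older than the new one.
    while (buf.length > 0 && buf[0].occurredAt <= e.occurredAt - this.durationMs) {
      const old = buf.shift()!;
      const remaining = (counts.get(old.targetId) ?? 1) - 1;
      if (remaining === 0) counts.delete(old.targetId);
      else counts.set(old.targetId, remaining);
    }
    this.buffers.set(e.actor, buf);
    this.targetCounts.set(e.actor, counts);
    return { count: buf.length, distinctTargets: counts.size };
  }
}
```

The threshold `distinct_targets >= 5` would then be checked against `distinctTargets` after each `add`. Keys that go quiet keep their last buffer until touched again, so a real worker would also need a sweep that drops idle keys.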

New domain model

enum CorrelationWindowStatus {
  OPEN
  CLOSED_FIRED
  CLOSED_EXPIRED
}

model CorrelationWindow {
  id              String   @id @default(cuid())
  organizationId  String   @map("organization_id")
  ruleKey         String   @map("rule_key") @db.VarChar(160)
  ruleVersion     String   @map("rule_version") @db.VarChar(32)
  partitionKey    String   @map("partition_key") @db.VarChar(255) // computed correlation key
  openedAt        DateTime @map("opened_at")
  closesAt        DateTime @map("closes_at")
  status          CorrelationWindowStatus @default(OPEN)
  aggregateState  Json     @map("aggregate_state")  // running counts/distinct sets/etc.
  matchedEventIds String[] @default([]) @map("matched_event_ids")
  firedFindingId  String?  @map("fired_finding_id")
  updatedAt       DateTime @updatedAt @map("updated_at")
  organization    Organization @relation(...)
  @@index([organizationId, ruleKey, partitionKey])
  @@index([organizationId, status, closesAt])
  @@map("correlation_windows")
}
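
aggregateState is a Json column, so a JavaScript Set cannot be stored in it as-is. A minimal sketch, assuming the bounded distinct-set option from the open questions (HyperLogLog is not shown): each distinct() aggregation keeps at most `maxDistinct` values, and an overflow flag records that its count has become a lower bound. `WindowAggregates`, `AggregateStateJson`, and the cap are hypothetical.

```typescript
// Hypothetical JSON shape persisted in CorrelationWindow.aggregateState.
interface AggregateStateJson {
  count: number;
  distinct: { [aggregation: string]: string[] };
  overflow: string[]; // distinct() aggregations that hit the cap
}

class WindowAggregates {
  count = 0;
  private readonly maxDistinct: number;
  private readonly distinct = new Map<string, Set<string>>();
  private readonly overflow = new Set<string>();

  constructor(maxDistinct: number) {
    this.maxDistinct = maxDistinct;
  }

  // Records one matched event; `values` maps aggregation name -> value.
  record(values: { [aggregation: string]: string }): void {
    this.count += 1;
    Object.keys(values).forEach((name) => {
      const set = this.distinct.get(name) ?? new Set<string>();
      const value = values[name];
      if (!set.has(value)) {
        if (set.size < this.maxDistinct) set.add(value);
        else this.overflow.add(name); // value dropped: count is now a lower bound
      }
      this.distinct.set(name, set);
    });
  }

  distinctCount(name: string): number {
    return this.distinct.get(name)?.size ?? 0;
  }

  isOverflowed(name: string): boolean {
    return this.overflow.has(name);
  }

  // Called by JSON.stringify; converts Sets to arrays for the Json column.
  toJSON(): AggregateStateJson {
    const distinct: { [aggregation: string]: string[] } = {};
    this.distinct.forEach((set, name) => {
      distinct[name] = Array.from(set);
    });
    return { count: this.count, distinct, overflow: Array.from(this.overflow) };
  }

  static fromJSON(json: AggregateStateJson, maxDistinct: number): WindowAggregates {
    const agg = new WindowAggregates(maxDistinct);
    agg.count = json.count;
    Object.keys(json.distinct).forEach((name) => {
      agg.distinct.set(name, new Set(json.distinct[name]));
    });
    json.overflow.forEach((name) => agg.overflow.add(name));
    return agg;
  }
}
```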

Correlator worker

A new background worker (internal/correlator/):

  1. Subscribes to the ingestion event bus (or polls IngestedEvent when the event bus is disabled).
  2. For each event, finds the stateful rules whose inputs match it and computes each rule's partition key (actor, Person, or asset).
  3. Loads the OPEN CorrelationWindow for (organization_id, rule_key, partition_key) that covers the event's occurredAt, or opens a new one.
  4. Updates aggregateState (HyperLogLog or a bounded set for distinct(), see Open questions; fixed-window counts for count()) and appends the event to matchedEventIds.
  5. Evaluates the threshold expression; on fire, opens a SecurityFinding linking all matchedEventIds, records its id in firedFindingId, and marks the window CLOSED_FIRED.
  6. A periodic sweeper marks OPEN windows whose closesAt has passed as CLOSED_EXPIRED.
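
The six steps can be sketched in a single process as follows. Assumptions: an in-memory array stands in for the correlation_windows table, events arrive roughly in occurredAt order, `threshold` is a plain function standing in for the CEL expression, and aggregateState is reduced to matchedEventIds. `InMemoryCorrelator`, `StatefulRule`, and `WindowState` are hypothetical names.

```typescript
type WindowStatus = "OPEN" | "CLOSED_FIRED" | "CLOSED_EXPIRED";

interface IncomingEvent {
  id: string;
  organizationId: string;
  actor: string;
  occurredAt: number; // epoch ms
}

interface StatefulRule {
  key: string; // rule_key
  durationMs: number;
  partitionKey: (e: IncomingEvent) => string | null; // null: rule does not apply
  threshold: (matchedEventIds: string[]) => boolean; // stand-in for CEL
}

interface WindowState {
  organizationId: string;
  ruleKey: string;
  partitionKey: string;
  openedAt: number;
  closesAt: number;
  status: WindowStatus;
  matchedEventIds: string[];
}

class InMemoryCorrelator {
  readonly fired: WindowState[] = [];
  private open: WindowState[] = [];
  private readonly rules: StatefulRule[];

  constructor(rules: StatefulRule[]) {
    this.rules = rules;
  }

  onEvent(e: IncomingEvent): void {
    for (const rule of this.rules) {
      const partitionKey = rule.partitionKey(e); // step 2
      if (partitionKey === null) continue;
      // Step 3: OPEN window covering occurredAt for (org, rule, partition).
      let w = this.open.find(
        (x) =>
          x.organizationId === e.organizationId &&
          x.ruleKey === rule.key &&
          x.partitionKey === partitionKey &&
          x.openedAt <= e.occurredAt &&
          e.occurredAt < x.closesAt,
      );
      if (w === undefined) {
        w = {
          organizationId: e.organizationId,
          ruleKey: rule.key,
          partitionKey,
          openedAt: e.occurredAt,
          closesAt: e.occurredAt + rule.durationMs,
          status: "OPEN",
          matchedEventIds: [],
        };
        this.open.push(w);
      }
      w.matchedEventIds.push(e.id); // step 4
      // Step 5: a real worker would create the SecurityFinding here.
      if (rule.threshold(w.matchedEventIds)) {
        w.status = "CLOSED_FIRED";
        this.fired.push(w);
      }
    }
    this.open = this.open.filter((x) => x.status === "OPEN");
  }

  // Step 6: expire OPEN windows whose closesAt is at or before `now`.
  sweep(now: number): void {
    for (const w of this.open) {
      if (w.closesAt <= now) w.status = "CLOSED_EXPIRED";
    }
    this.open = this.open.filter((x) => x.status === "OPEN");
  }

  openCount(): number {
    return this.open.length;
  }
}
```

A persistent version would run steps 3 to 5 in one transaction per event and let the sweeper use the (organization_id, status, closes_at) index rather than a linear scan.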

Backtest

Replay IngestedEvent rows in occurredAt order through the correlator worker against a candidate rule, accumulating the findings it would have fired. This reuses the #47 backtest harness.
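
A hypothetical replay loop for stateful backtests. The point it illustrates is that the sweeper is driven by each event's occurredAt rather than the wall clock, so windows expire during replay exactly as they would have live; ties are broken by id so repeated runs produce identical fires. `ReplayTarget` and `backtest` are illustrative names, not the #47 harness's actual API; any object exposing `onEvent` and `sweep` (for example an in-memory correlator) can be plugged in.

```typescript
interface HistoricalEvent {
  id: string;
  occurredAt: number; // epoch ms
}

interface ReplayTarget<E extends HistoricalEvent> {
  onEvent(e: E): void;
  sweep(now: number): void;
}

// Replays events in occurredAt order (ties broken by id), advancing an
// event-time clock so expiry does not depend on when the backtest runs.
function backtest<E extends HistoricalEvent>(events: E[], target: ReplayTarget<E>): void {
  const ordered = events
    .slice()
    .sort((a, b) => a.occurredAt - b.occurredAt || (a.id < b.id ? -1 : a.id > b.id ? 1 : 0));
  for (const e of ordered) {
    target.sweep(e.occurredAt); // expire windows that closed before this event
    target.onEvent(e);
  }
  target.sweep(Number.POSITIVE_INFINITY); // close whatever is still open
}
```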

Phasing

| Phase | Scope |
|---|---|
| P1 | CorrelationWindow schema; stateful_window rule type (single-input aggregation); correlator worker; backtest support |
| P2 | stateful_sequence rule type (multi-input, ordered/unordered); partition key on Person (#45) |
| P3 | Distributed correlator (multi-instance) for high-throughput tenants; windows sharded by partition_key hash |
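
For P3, one way to route a window to a correlator instance is a stable hash of the org-scoped partition key modulo the shard count. This sketch assumes FNV-1a (32-bit) as the hash; `fnv1a32` and `shardFor` are hypothetical helpers. Including the organization id in the hashed key means identical partition keys in two tenants are routed independently.

```typescript
// FNV-1a, 32-bit, over UTF-16 code units (same as byte-wise FNV-1a for ASCII).
function fnv1a32(input: string): number {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // FNV prime, multiplied mod 2^32
  }
  return hash >>> 0; // reinterpret as unsigned
}

// Picks the correlator shard that owns (organizationId, partitionKey).
function shardFor(organizationId: string, partitionKey: string, shardCount: number): number {
  if (!Number.isInteger(shardCount) || shardCount <= 0) {
    throw new RangeError("shardCount must be a positive integer");
  }
  // NUL separator avoids ambiguity between e.g. ("ab", "c") and ("a", "bc").
  return fnv1a32(organizationId + "\u0000" + partitionKey) % shardCount;
}
```

Plain modulo reassigns most keys whenever the shard count changes, so resharding would mean migrating open windows; consistent or rendezvous hashing reduces that movement.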

Open questions

  • HyperLogLog vs. bounded distinct-set for distinct() — accuracy vs. memory.
  • How to express "anything OAuth + same actor within 5m" without exploding window count for noisy connectors.
  • Cross-tenant evaluation is forbidden by construction: partition_key is always scoped per org.
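
Both the per-org scoping and the Person fallback from the sequence rule (key person.id, fall back to email if not linked) can be enforced in the one function that builds partition keys. A minimal sketch, assuming keys of the form org:kind:value; `buildPartitionKey` and `PartitionInput` are hypothetical, and lowercasing emails is an assumption about how identities are normalized.

```typescript
interface PartitionInput {
  organizationId: string;
  personId?: string | null; // linked Person (#45), if any
  email?: string | null; // fallback identity when no Person is linked
}

// Always embeds the org id, so two tenants never share a window even when
// their person ids or emails collide. The "person"/"email" tag keeps the
// two identity kinds from colliding with each other.
function buildPartitionKey(input: PartitionInput): string | null {
  if (!input.organizationId) throw new Error("organizationId is required");
  if (input.personId) return `${input.organizationId}:person:${input.personId}`;
  if (input.email) {
    return `${input.organizationId}:email:${input.email.trim().toLowerCase()}`;
  }
  return null; // no usable identity: skip the rule for this event
}
```

One consequence worth deciding explicitly: if a Person is linked partway through a window, earlier events are keyed by email and later ones by person id, so they land in different windows.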

References

Metadata

  • Assignees: No one assigned
  • Labels: correlation (Cross-event / cross-connector correlation engine), detection-engineering (Detection rules, rule packs, backtesting), tier-4-detection-sophistication (Tier 4: detection sophistication beyond rules)
  • Type: No type
  • Projects: No projects
  • Milestone: No milestone
  • Relationships: None yet
  • Development: No branches or pull requests
