Architecture Event Flow
Kafka topics, event schemas, producers, consumers, outbox dispatcher mechanics, dedup strategy, retry topics, dead-letter queues. Companion to Architecture-Overview, Architecture-Services, User-Flows.
FinCore is a financial system - every action has cross-service downstream effects:
- A posted transaction → AML evaluation, balance update, webhook delivery, notification
- A KYC approval → user activation, compliance case auto-resolve, welcome email
- A decision rule activation → cache invalidation, audit log
Doing these synchronously in the request path:
- Couples services
- Blows out latency budgets
- Creates a single point of failure (one slow consumer kills the request)
Doing them via events:
- Each consumer scales independently
- Failures of consumers don't kill the API
- Audit trail naturally falls out of the event log
- New consumers can be added without touching producers
But events come with costs: ordering, duplication, eventual consistency. The patterns below address all of them.
Why Redpanda as the default:
- Single binary, no ZooKeeper, no JVM
- Sub-second startup (vs ~30s for Kafka + ZK)
- ~200 MB memory footprint vs ~1.5 GB
- Kafka API compatible - production deployments swap for Apache Kafka, Strimzi, MSK, Confluent without code changes
- Excellent for `docker compose up` DX (Killer Feature: 30-second sandbox)
Why Kafka API specifically:
- Industry-standard for log-based event streaming
- Mature ecosystem (Kafka Streams, Kafka Connect, schema registries)
- Production-grade tooling everyone in fintech recognizes
- Offset-based consumer model maps perfectly to our idempotent consumer pattern
See ADR-0006 for the full decision.
Production swap:
```yaml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:redpanda:9092}
```

Change one env var, deploy, done.
Topic naming convention: `<context>.events[.<sub-stream>]`

- `<context>` - bounded context (`ledger`, `payment`, `compliance`, `decision`, `kyc`, `aml`)
- `events` - primary stream of domain events
- `<sub-stream>` - optional, for retry/DLQ variants (`retry`, `dlq`)
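A minimal sketch of checking topic names against this convention, for example in a startup check or a CI lint. `parseTopic`, `TopicName`, and the two sets are hypothetical names, not existing FinCore code; the sets simply mirror the lists above.

```kotlin
// Hypothetical helper: checks "<context>.events[.<sub-stream>]" and returns its parts.
val KNOWN_CONTEXTS = setOf("ledger", "payment", "compliance", "decision", "kyc", "aml")
val KNOWN_SUB_STREAMS = setOf("retry", "dlq")

data class TopicName(val context: String, val subStream: String?)

fun parseTopic(name: String): TopicName? {
    val parts = name.split(".")
    // Exactly <context>.events or <context>.events.<sub-stream>
    if (parts.size !in 2..3 || parts[1] != "events") return null
    if (parts[0] !in KNOWN_CONTEXTS) return null
    val subStream = parts.getOrNull(2)
    if (subStream != null && subStream !in KNOWN_SUB_STREAMS) return null
    return TopicName(parts[0], subStream)
}
```

System topics such as `system.audit` follow a separate scheme, so this check returns `null` for them.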
| Topic | Producer | Consumers | Partitions | Retention | Compaction |
|---|---|---|---|---|---|
| `ledger.events` | Outbox Dispatcher (Ledger) | Compliance, Webhook, Reporting, Analytics | 16 | 7 days hot, 1 year cold | by `aggregate_id` |
| `payment.events` | Outbox Dispatcher (Payment) | Webhook, Compliance, Reporting | 16 | 7 days hot, 1 year cold | by `aggregate_id` |
| `compliance.events` | Outbox Dispatcher (Compliance) | Webhook, Reporting | 8 | 7 days hot, 7 years cold | none (regulatory append-only) |
| `decision.events` | Outbox Dispatcher (Decision) | All services (cache invalidation) | 4 | 7 days hot | by `rule_id` |
| `kyc.events` | Outbox Dispatcher (Compliance) | Webhook, Reporting | 4 | 7 days hot, 1 year cold | by `aggregate_id` |
| Pattern | Example |
|---|---|
| `<topic>.retry` | `payment.events.retry` |
| `<topic>.dlq` | `payment.events.dlq` |
DLQ topics retain forever (until manually cleared via runbook).
| Topic | Description |
|---|---|
| `system.audit` | All audit-significant events for SIEM ingestion (signed, sealed) |
| `system.heartbeat` | Service liveness signal (1/min) for monitoring |
Every event flowing through Kafka uses the same envelope:
```json
{
  "id": "evt_01HXAB...",
  "type": "transaction.posted",
  "version": "1.0",
  "specversion": "1.0",
  "source": "fincore-engine/ledger-service@v0.1.0",
  "occurredAt": "2026-04-25T10:00:00.123Z",
  "publishedAt": "2026-04-25T10:00:00.245Z",
  "aggregateType": "Transaction",
  "aggregateId": "tx_01HX...",
  "correlationId": "01HX...",
  "causationId": "evt_01HX...",
  "actor": {
    "type": "USER",
    "id": "usr_01H..."
  },
  "tenantId": null,
  "data": {
    "transactionId": "tx_01HX...",
    "reference": "demo-001",
    "entries": [
      {
        "accountId": "acc_01H...",
        "amount": "-100.00",
        "currency": "EUR",
        "direction": "DEBIT"
      },
      {
        "accountId": "acc_01H...",
        "amount": "100.00",
        "currency": "EUR",
        "direction": "CREDIT"
      }
    ],
    "postedAt": "2026-04-25T10:00:00.123Z"
  }
}
```

Aligned with the CloudEvents 1.0 required attributes (`id`, `source`, `specversion`, `type`) for ecosystem compatibility (Knative, Argo Events, etc.).
- `id` - globally unique, UUID v7 prefixed with `evt_`
- `type` - dotted noun.verb (past tense): `transaction.posted`, `payment.completed`; never `transaction-post` or `postTransaction`
- `version` - payload schema version (1.0, 1.1, ...; breaking changes get a major bump)
- `specversion` - CloudEvents spec version, independent of `version`
- `aggregateId` - Kafka partition key, which preserves per-aggregate ordering
- `correlationId` - propagated from the inbound HTTP request; ties together cause and effect
- `causationId` - the event that caused this event (for chains)
- `actor` - who initiated it (user, service, system/cron)
- `tenantId` - `null` in v0.1 (single-tenant deployments); reserved for v1.5+ multi-tenancy
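A trimmed-down sketch of the envelope as a Kotlin type, showing how `correlationId` is inherited and `causationId` is set when one event causes another (for example `aml.flagged` caused by `transaction.posted`). `EventEnvelope`, `Actor`, `spawnChild`, and `isValidEventType` are hypothetical names. The random UUID is a stand-in because UUID v7 generation is not in the JDK, and the regex only checks the dotted lowercase shape, not the past tense.

```kotlin
import java.time.Instant
import java.util.UUID

// Hypothetical, trimmed-down envelope (source, publishedAt, tenantId omitted for brevity).
data class Actor(val type: String, val id: String)

data class EventEnvelope(
    val id: String,
    val type: String,
    val version: String,
    val aggregateType: String,
    val aggregateId: String,
    val correlationId: String,
    val causationId: String?,
    val actor: Actor,
    val occurredAt: Instant,
    val data: Map<String, Any?>,
)

// Dotted lowercase segments such as "transaction.posted"; past tense stays a review rule.
val EVENT_TYPE_PATTERN = Regex("^[a-z]+(\\.[a-z]+)+$")

fun isValidEventType(type: String): Boolean = EVENT_TYPE_PATTERN.matches(type)

// Builds a follow-up event caused by this one.
fun EventEnvelope.spawnChild(
    type: String,
    aggregateType: String,
    aggregateId: String,
    actor: Actor,
    data: Map<String, Any?>,
): EventEnvelope {
    require(isValidEventType(type)) { "invalid event type: $type" }
    return EventEnvelope(
        id = "evt_" + UUID.randomUUID(), // stand-in: random (v4) UUID, not UUID v7
        type = type,
        version = "1.0",
        aggregateType = aggregateType,
        aggregateId = aggregateId,
        correlationId = correlationId, // inherited from the parent event
        causationId = id, // the parent event caused this one
        actor = actor,
        occurredAt = Instant.now(),
        data = data,
    )
}
```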
- Additive only within a version: new optional fields OK, mandatory new fields require new major version
- Removing fields: requires new major version + deprecation period
- Type changes: requires new major version
- Old versions retained: consumers can opt into specific schema versions
- Schema registry: planned for v0.2+ via Confluent Schema Registry (Kafka-compatible) or Apicurio; v0.1 ships plain JSON without a registry
Producers in FinCore are not services directly - they're the Outbox Dispatcher reading committed outbox rows.
The dual-write problem: if a service writes to DB and then to Kafka, what if Kafka write fails after DB commits?
- Either you lose the event (silent data loss)
- Or you publish before (or inside) the DB transaction and risk announcing state that is then rolled back
- Or you wrap both in 2PC (complex, slow, fragile)
Outbox pattern sidesteps this entirely:
- Service writes business state + outbox row in one DB transaction
- Background dispatcher reads committed outbox rows and publishes them to Kafka
- Dispatcher marks rows as `PUBLISHED` (idempotent: if the marker write fails, the row is simply re-published)
- Consumers dedup via `event.id`

Guarantee: at-least-once delivery to Kafka, and the outbox table is the source of truth for "what events have happened", so no committed event is ever lost.
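A minimal in-memory sketch of the single-transaction write. `InMemoryDb.transaction` is a hypothetical stand-in for a PostgreSQL transaction (in a Spring service this would typically be a `@Transactional` method): writes are staged and applied together, so a failed business rule leaves neither the balance change nor the outbox row behind. `postTransfer` and `OutboxRow` are likewise illustrative names.

```kotlin
// Hypothetical in-memory stand-in for PostgreSQL: a transaction stages writes and applies
// them all at the end, or none of them if the block throws.
data class OutboxRow(
    val eventId: String,
    val topic: String,
    val aggregateId: String,
    val payload: String,
    var status: String = "PENDING",
)

class InMemoryDb {
    val balances = mutableMapOf<String, Long>()
    val outbox = mutableListOf<OutboxRow>()

    class Tx {
        val balanceWrites = mutableMapOf<String, Long>()
        val outboxWrites = mutableListOf<OutboxRow>()
    }

    @Synchronized
    fun transaction(block: (Tx) -> Unit) {
        val tx = Tx()
        block(tx) // if this throws, staged writes are discarded ("rollback")
        balances.putAll(tx.balanceWrites) // "commit": state and outbox row land together
        outbox.addAll(tx.outboxWrites)
    }
}

// Business state + outbox row in ONE transaction; nothing is sent to Kafka here.
fun postTransfer(db: InMemoryDb, eventId: String, txId: String, from: String, to: String, amountCents: Long) {
    db.transaction { tx ->
        val fromBalance = db.balances.getOrDefault(from, 0L)
        require(fromBalance >= amountCents) { "insufficient funds on $from" }
        tx.balanceWrites[from] = fromBalance - amountCents
        tx.balanceWrites[to] = db.balances.getOrDefault(to, 0L) + amountCents
        tx.outboxWrites.add(
            OutboxRow(eventId, "ledger.events", aggregateId = txId, payload = """{"transactionId":"$txId"}"""),
        )
    }
}
```

Publishing to Kafka is left entirely to the dispatcher, which only ever sees rows that were committed.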
See Architecture-Resilience for dispatcher mechanics.
```mermaid
sequenceDiagram
    participant Worker as Outbox Worker<br/>(per-pod, lease-based)
    participant DB
    participant Kafka
    loop every 100ms
        Worker->>DB: BEGIN<br/>SELECT * FROM outbox_events<br/>WHERE status='PENDING'<br/>ORDER BY created_at<br/>LIMIT 100 FOR UPDATE SKIP LOCKED
        Worker->>Worker: Group by topic
        loop per event
            Worker->>Kafka: PUBLISH(topic, key=aggregateId, payload, headers)
            alt success
                Worker->>DB: UPDATE outbox_events SET status='PUBLISHED', published_at=now() WHERE id=:id
            else failure
                Worker->>DB: UPDATE outbox_events SET attempts=attempts+1, last_error=:err WHERE id=:id
            end
        end
        Worker->>DB: COMMIT
    end
```
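Key properties:
- `SKIP LOCKED` enables multiple dispatcher pods without leader election
- Per-aggregate ordering relies on the Kafka partition key (`aggregateId`) and on one aggregate's rows being published in `created_at` order; a failed row followed by a successful later row, or two pods publishing rows of the same aggregate concurrently, can still reorder them unless the dispatcher guards against both
- Crash recovery is automatic: rows whose `PUBLISHED` update never committed are simply re-published on restart, and consumer dedup absorbs the duplicates
- Backlog visibility via the `outbox.events.pending` metric per schema

During a Kafka outage:
- The outbox table grows in PostgreSQL; no events are lost
- Alert fires when `outbox.events.pending > 1000` for > 5 min
- Once Kafka recovers, the dispatcher catches up at full speed
- Risk: outbox table growth can stress Postgres if the outage lasts days. Mitigation: size the budget for 3× normal 24h volume.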
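A sketch of one dispatcher pass, modeling the batch that the `SELECT ... FOR UPDATE SKIP LOCKED` query returns. `PendingEvent`, `Publisher`, `dispatchBatch`, and `pendingCount` are hypothetical; `Publisher` stands in for a Kafka producer send keyed by `aggregateId`. Holding back an aggregate's later rows after a failure is an assumed design choice for keeping per-aggregate order, not something the diagram above specifies, and it does not cover two pods holding rows of the same aggregate at once.

```kotlin
// Hypothetical row model for the batch returned by SELECT ... FOR UPDATE SKIP LOCKED.
data class PendingEvent(
    val id: String,
    val topic: String,
    val aggregateId: String,
    val createdAt: Long,
    var status: String = "PENDING",
    var attempts: Int = 0,
    var lastError: String? = null,
)

// Stand-in for a Kafka producer send keyed by aggregateId; throws on failure.
fun interface Publisher {
    fun publish(topic: String, key: String, eventId: String)
}

fun dispatchBatch(rows: List<PendingEvent>, publisher: Publisher, batchSize: Int = 100) {
    val blockedAggregates = mutableSetOf<String>()
    val batch = rows.filter { it.status == "PENDING" }
        .sortedBy { it.createdAt } // ORDER BY created_at
        .take(batchSize) // LIMIT 100
    for (row in batch) {
        // Assumed design choice: after one failure, hold back the aggregate's later events
        // so they cannot overtake it on the partition.
        if (row.aggregateId in blockedAggregates) continue
        try {
            publisher.publish(row.topic, row.aggregateId, row.id)
            row.status = "PUBLISHED"
        } catch (e: Exception) {
            row.attempts += 1
            row.lastError = e.message
            blockedAggregates.add(row.aggregateId)
        }
    }
}

// Backs the outbox.events.pending gauge that the backlog alert watches.
fun pendingCount(rows: List<PendingEvent>): Int = rows.count { it.status == "PENDING" }
```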
Every Kafka consumer in FinCore follows this pattern:
```mermaid
sequenceDiagram
    participant Kafka
    participant Consumer
    participant DB
    Kafka->>Consumer: poll() returns batch (max 50)
    loop for each record
        Consumer->>DB: SELECT 1 FROM processed_events<br/>WHERE event_id=:id
        alt Already processed
            Consumer->>Consumer: skip (or short-circuit ack)
        else New event
            rect rgb(255, 243, 224)
                note right of DB: Single DB transaction
                Consumer->>Consumer: handle(event) [pure business logic]
                Consumer->>DB: INSERT INTO processed_events (event_id, processed_at)
                Consumer->>DB: business writes
                Consumer->>DB: COMMIT
            end
        end
    end
    Consumer->>Kafka: commitSync(offsets) [AFTER DB commit]
```
- Business writes + `processed_events` insert in one DB transaction
- Kafka offset commit AFTER DB commit
- Crash between DB commit and Kafka commit: reprocessing finds the `processed_events` row and skips it
- Crash between `handle()` and DB commit: DB rolls back, the message is redelivered and processed cleanly
- No "exactly-once" promised: at-least-once delivery + idempotent processing = effectively-once
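A minimal in-memory sketch of this consumer pattern: the dedup marker and the business write are applied together per record, and the offset is recorded only after the batch is handled. `IdempotentConsumer` and `ConsumedRecord` are hypothetical; the two collections stand in for the `processed_events` table and a business table inside one PostgreSQL transaction.

```kotlin
// Hypothetical in-memory model; the collections stand in for tables in one PostgreSQL transaction.
data class ConsumedRecord(val offset: Long, val eventId: String, val accountId: String, val amountCents: Long)

class IdempotentConsumer {
    val processedEvents = mutableSetOf<String>() // processed_events table
    val balances = mutableMapOf<String, Long>() // business state
    var committedOffset = -1L // last offset committed to Kafka

    fun onBatch(batch: List<ConsumedRecord>) {
        for (record in batch) {
            if (record.eventId in processedEvents) continue // already processed: skip
            // "One DB transaction": dedup marker and business write are applied together.
            val newBalance = balances.getOrDefault(record.accountId, 0L) + record.amountCents
            processedEvents.add(record.eventId)
            balances[record.accountId] = newBalance
        }
        // Offset commit only AFTER the DB work; a crash before this point means redelivery.
        val maxOffset = batch.maxOfOrNull { it.offset }
        if (maxOffset != null) committedOffset = maxOffset
    }
}
```

Feeding the same batch twice, as happens after a crash between the DB commit and the offset commit, leaves the balance unchanged the second time.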
The KIP-98 transactional producer/consumer is impressive but:
- Adds significant complexity (transactional IDs, fencing, abort handling)
- Couples consumer commit to Kafka cluster availability
- Doesn't help with cross-system writes (Kafka + Postgres)
The DB-side dedup pattern is simpler, robust, and works across heterogeneous systems.
```yaml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    consumer:
      group-id: ${spring.application.name}
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 50
      isolation-level: read_committed # only see committed transactional writes
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.fincore.events.EventEnvelopeDeserializer
      properties:
        # raw Kafka client settings, passed through as-is
        "[max.poll.interval.ms]": 300000
        "[session.timeout.ms]": 30000
    listener:
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 4
      poll-timeout: 1s
      type: SINGLE # @RetryableTopic (non-blocking retries) does not support batch listeners
```

When a consumer fails to process an event (transient error: provider timeout, DB deadlock, etc.), naive infinite retry burns through the retry budget and blocks the partition. Better:
```mermaid
flowchart LR
    Topic[ledger.events] --> Consumer
    Consumer -- "transient fail" --> Retry[ledger.events.retry<br/>delay 30s]
    Retry --> Consumer
    Consumer -- "fail again" --> DLQ[ledger.events.dlq]
    DLQ --> Operator[Operator runbook]
```
Tier 1 - In-memory retry inside consumer: 3 attempts with 50ms / 200ms / 500ms backoff. Handles transient blips without leaving the partition.
Tier 2 - Retry topic with delay: still failing? Publish to <topic>.retry with delay header (30s). A scheduled re-consumer picks up after delay and retries main flow.
Tier 3 - Dead-letter queue: still failing after retry topic? Publish to <topic>.dlq with error context, alert fires, operator inspects and either fixes data + replays or marks as known-bad.
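A plain-Kotlin sketch of the tier decision for a single event, independent of Spring Kafka. It assumes Tier 1 means an initial attempt plus one in-memory retry after each of the 50/200/500 ms delays; `handleWithTiers`, `Outcome`, and the injectable `sleep` are hypothetical, and actually producing to `<topic>.retry` or `<topic>.dlq` is left to the caller.

```kotlin
// Hypothetical sketch of the three tiers for one event (not Spring Kafka's implementation).
enum class Outcome { PROCESSED, SEND_TO_RETRY_TOPIC, SEND_TO_DLQ }

val TIER1_BACKOFF_MS = listOf(50L, 200L, 500L)

fun handleWithTiers(
    fromRetryTopic: Boolean,
    process: () -> Unit,
    sleep: (Long) -> Unit = { Thread.sleep(it) },
): Outcome {
    // Tier 1: first attempt immediately, then one in-memory retry after each backoff.
    for (delayMs in listOf(0L) + TIER1_BACKOFF_MS) {
        if (delayMs > 0) sleep(delayMs)
        try {
            process()
            return Outcome.PROCESSED
        } catch (e: Exception) {
            // This sketch treats every exception as transient and tries again.
        }
    }
    // Tier 2 when the main topic exhausts Tier 1; Tier 3 if the retry-topic pass fails too.
    return if (fromRetryTopic) Outcome.SEND_TO_DLQ else Outcome.SEND_TO_RETRY_TOPIC
}
```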
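```kotlin
import com.fincore.events.EventEnvelope
import io.micrometer.core.instrument.MeterRegistry
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.DltHandler
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.annotation.RetryableTopic
import org.springframework.kafka.retrytopic.DltStrategy
import org.springframework.kafka.support.Acknowledgment
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.retry.annotation.Backoff
import org.springframework.stereotype.Component

@Component
class LedgerEventsListener(
    private val meterRegistry: MeterRegistry,
    private val alertService: AlertService, // FinCore alerting bean (not shown)
) {
    private val log = LoggerFactory.getLogger(javaClass)

    @KafkaListener(topics = ["ledger.events"], containerFactory = "ledgerEventsContainer")
    @RetryableTopic(
        attempts = "2", // main topic + one pass through the retry topic (Tier 2)
        backoff = Backoff(delay = 30_000), // Tier 2: 30s delay; Kotlin nested annotations take no '@'
        retryTopicSuffix = ".retry", // align with the <topic>.retry convention
        dltTopicSuffix = ".dlq", // align with the <topic>.dlq convention (Tier 3)
        dltStrategy = DltStrategy.FAIL_ON_ERROR,
        autoCreateTopics = "true",
    )
    fun handle(event: EventEnvelope, ack: Acknowledgment) {
        // Process here. Tier 1 (in-memory 50/200/500 ms retries) is NOT provided by
        // @RetryableTopic; configure blocking retries separately or retry around the business call.
        ack.acknowledge()
    }

    @DltHandler
    fun handleDlt(event: EventEnvelope, @Header(KafkaHeaders.DLT_EXCEPTION_FQCN) cause: String) {
        log.error("DLQ received: {}, cause={}", event.id, cause)
        meterRegistry.counter("kafka.dlq.received", "topic", "ledger.events").increment()
        alertService.notifyDlq(event, cause)
    }
}
```

Runbook (`runbooks/dlq-inspection.md`):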
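- List DLQ entries with their headers: `kafka-console-consumer --bootstrap-server "$KAFKA_BOOTSTRAP_SERVERS" --topic ledger.events.dlq --from-beginning --property print.headers=true`
- For each entry:
  - Read the exception FQCN header (`kafka_dlt-exception-fqcn` when written by Spring's DLT publishing) and the original payload
  - Determine: data bug, code bug, or transient (replay)
- If replay-safe: produce it back to the original topic with a new event_id (a new id also bypasses dedup in every other consumer group on that topic)
- If data bug: fix data, replay
- If code bug: deploy fix, replay
- Document the resolution in the incident log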
Example flow: a posted transaction triggering AML evaluation:

```mermaid
sequenceDiagram
    participant Ledger
    participant LedgerOutbox as Ledger Outbox
    participant Kafka
    participant Compliance as Compliance Consumer
    participant DecisionEngine
    participant ComplianceDB as Compliance DB
    participant ComplianceOutbox as Compliance Outbox
    Ledger->>LedgerOutbox: INSERT outbox_events<br/>(transaction.posted)<br/>[in tx with business writes]
    LedgerOutbox->>Kafka: PUBLISH ledger.events
    Kafka-->>Compliance: poll batch
    Compliance->>Compliance: dedup processed_events
    Compliance->>DecisionEngine: evaluate(amlRuleSet, txnContext)
    DecisionEngine-->>Compliance: matchedRules
    alt rules matched
        Compliance->>ComplianceDB: INSERT aml_alerts, compliance_cases
        Compliance->>ComplianceOutbox: INSERT outbox_events<br/>(aml.flagged)
    end
    Compliance->>ComplianceDB: INSERT processed_events
    Compliance->>ComplianceDB: COMMIT
    Compliance-->>Kafka: commit offset
    ComplianceOutbox->>Kafka: PUBLISH compliance.events
```
Example flow: webhook delivery for any event:

```mermaid
sequenceDiagram
    participant AnyService
    participant Outbox
    participant Kafka
    participant WebhookConsumer
    participant Subscriber
    AnyService->>Outbox: outbox_events row [in tx]
    Outbox->>Kafka: publish event
    Kafka-->>WebhookConsumer: poll
    WebhookConsumer->>WebhookConsumer: dedup by event.id
    WebhookConsumer->>WebhookConsumer: query subscriptions matching event.type
    loop per subscription
        WebhookConsumer->>WebhookConsumer: sign payload (HMAC subscription.secret)
        WebhookConsumer->>Subscriber: POST URL with X-Signature
        alt 2xx
            WebhookConsumer->>WebhookConsumer: record delivery DELIVERED
        else fail
            WebhookConsumer->>WebhookConsumer: schedule retry with backoff
        end
    end
    WebhookConsumer-->>Kafka: commit offset
```
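A sketch of the signing step, assuming HMAC-SHA256 over the raw request body, hex-encoded into `X-Signature`. The algorithm, the encoding, and the function names `signPayload` and `verifySignature` are assumptions, since the diagram only says "HMAC subscription.secret". The receiver-side check compares in constant time.

```kotlin
import java.security.MessageDigest
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

// Hypothetical: HMAC-SHA256 over the exact body bytes, lowercase hex, sent as X-Signature.
fun signPayload(secret: String, body: String): String {
    val mac = Mac.getInstance("HmacSHA256")
    mac.init(SecretKeySpec(secret.toByteArray(Charsets.UTF_8), "HmacSHA256"))
    return mac.doFinal(body.toByteArray(Charsets.UTF_8)).joinToString("") { "%02x".format(it) }
}

// Subscriber side: recompute and compare in constant time to avoid timing leaks.
fun verifySignature(secret: String, body: String, header: String): Boolean =
    MessageDigest.isEqual(
        signPayload(secret, body).toByteArray(Charsets.UTF_8),
        header.toByteArray(Charsets.UTF_8),
    )
```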
| What | Guarantee |
|---|---|
| Events for the same `aggregateId` | Strictly ordered (Kafka partition key = `aggregateId`) |
| Events for the same `correlationId` | Causal order recoverable from the `causationId` chain, not from delivery order (replay tools support this) |
| Events across `aggregateId`s | No ordering guarantee (consumers must not assume any) |
| Events across topics | No ordering guarantee |
If a consumer needs cross-aggregate ordering (rare), it must materialize via a state machine that handles arbitrary order.
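One way to build such a state machine, sketched under the assumption that events carry `causationId`: apply an event only once its cause has been applied, and park it otherwise. `CausalBuffer` and `CausalEvent` are hypothetical; a production version would also need persistence and a timeout for causes that never arrive here (for example causes published on another topic).

```kotlin
// Hypothetical: only the fields needed for causal ordering.
data class CausalEvent(val id: String, val causationId: String?)

class CausalBuffer(private val onApply: (CausalEvent) -> Unit) {
    private val applied = mutableSetOf<String>()
    private val parked = mutableMapOf<String, MutableList<CausalEvent>>() // causationId -> waiting events

    fun onEvent(event: CausalEvent) {
        val cause = event.causationId
        // A null cause (e.g. an event triggered directly by an HTTP request) is always satisfied.
        if (cause != null && cause !in applied) {
            parked.getOrPut(cause) { mutableListOf() }.add(event)
            return
        }
        val ready = ArrayDeque(listOf(event))
        while (ready.isNotEmpty()) {
            val next = ready.removeFirst()
            if (!applied.add(next.id)) continue // duplicate delivery: already applied
            onApply(next)
            parked.remove(next.id)?.let { ready.addAll(it) } // release events waiting on this one
        }
    }

    fun parkedCount(): Int = parked.values.sumOf { it.size }
}
```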
To add a new event type:
- Add the `eventType` constant to the producer
- Define the schema (JSON Schema)
- Document it in `wiki/Events-Catalog.md`
- Existing consumers ignore unknown types, so this is not a breaking change
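A sketch of the consumer side of the last point: look the type up in a handler map and skip anything unknown instead of failing. `EventRouter` and `ignoredTypes` are hypothetical names; tracking ignored types is an assumed addition that makes new producer types visible.

```kotlin
// Hypothetical consumer-side router: known types go to a handler, unknown types are skipped.
class EventRouter(private val handlers: Map<String, (String) -> Unit>) {
    val ignoredTypes = mutableSetOf<String>() // e.g. exported as a metric to spot new producer types

    // Returns true if handled, false if the type is unknown to this consumer.
    fun route(type: String, payload: String): Boolean {
        val handler = handlers[type]
        if (handler == null) {
            ignoredTypes.add(type)
            return false // not an error: the offset can still be committed
        }
        handler(payload)
        return true
    }
}
```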
| Change | Compatible? | Action |
|---|---|---|
| Add optional field | Yes | Bump minor version (1.0 → 1.1) |
| Add required field | No | New major version, dual-publish |
| Remove field | No | Deprecation cycle 6+ months |
| Rename field | No | Add new + deprecate old |
| Change type | No | New major version |
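A sketch that applies this table mechanically to two versions of a payload schema, modeled as field name → (type, required). `FieldSpec`, `Bump`, and `requiredBump` are hypothetical. A rename shows up as a removal plus an addition and is therefore major; changes the table does not list, such as flipping an existing field from optional to required, are not checked.

```kotlin
// Hypothetical schema model: field name -> type and required flag.
data class FieldSpec(val type: String, val required: Boolean)

enum class Bump { NONE, MINOR, MAJOR }

fun requiredBump(previous: Map<String, FieldSpec>, next: Map<String, FieldSpec>): Bump {
    val removed = previous.keys - next.keys // "Remove field" (or the old half of a rename)
    val added = next.keys - previous.keys
    val typeChanged = previous.keys.intersect(next.keys)
        .any { previous.getValue(it).type != next.getValue(it).type } // "Change type"
    return when {
        removed.isNotEmpty() || typeChanged -> Bump.MAJOR
        added.any { next.getValue(it).required } -> Bump.MAJOR // "Add required field"
        added.isNotEmpty() -> Bump.MINOR // "Add optional field"
        else -> Bump.NONE
    }
}
```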
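```kotlin
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Propagation
import org.springframework.transaction.annotation.Transactional

// Producer publishes both v1 and v2 for 3 months.
// OutboxWriter, Transaction and toEventV1/toEventV2 are defined elsewhere (not shown).
@Service
class TransactionEventPublisher(private val outbox: OutboxWriter) {
    // MANDATORY: both rows must join the caller's business transaction (outbox pattern)
    @Transactional(propagation = Propagation.MANDATORY)
    fun publishPosted(tx: Transaction) {
        outbox.write(tx.toEventV1()) // existing consumers
        outbox.write(tx.toEventV2()) // new consumers
    }
}
```

After all consumers have migrated, drop v1.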
| Panel | Query |
|---|---|
| Production rate | rate(kafka_messages_in_total[1m]) |
| Consumer lag (per group) | kafka_consumer_lag |
| DLQ depth | kafka_topic_partition_log_size{topic=~".*\\.dlq"} |
| Outbox pending depth (per schema) | outbox_events_pending |
| End-to-end latency p99 | histogram_quantile(0.99, rate(event_e2e_latency_bucket[5m])) |
| Retry topic depth | kafka_topic_partition_log_size{topic=~".*\\.retry"} |
| Failed publishes | rate(outbox_publish_failures_total[5m]) |
Dashboard JSON in deploy/grafana/dashboards/event-flow.json.
| Alert | Condition | Action |
|---|---|---|
| Outbox backlog | `outbox_events_pending > 1000` for 5m | PagerDuty |
| Consumer lag | `kafka_consumer_lag > 10000` for 5m | PagerDuty |
| DLQ non-zero | `kafka_topic_partition_log_size{topic=~".*\\.dlq"} > 0` | Slack #compliance |
| Outbox failed | `outbox_events_failed > 0` | PagerDuty |
| End-to-end latency | `event_e2e_p99 > 30s` for 5m | Slack #engineering |
For v0.1: events serialized as JSON in Kafka, no registry. Sufficient for MVP.
For v0.2+: introduce Confluent Schema Registry (Kafka-compatible) or Apicurio for:
- Centralized schema management
- Producer/consumer compatibility checks
- Avro/Protobuf options for performance-critical streams (e.g. `ledger.events` if volume warrants)
Decision deferred to ADR after v0.1 production usage data.
- TLS required between services and brokers in production
- SASL/SCRAM authentication per service identity
- Topic ACLs: producer roles can only publish to their own topics; consumer roles can only consume topics they're authorized for
- Encryption at rest: broker-level (Redpanda native, Kafka via tiered storage encryption)
- PII never serialized in events: only IDs, hashes, references. PII pulled via direct API call by consumer if needed (with audit).
- Audit topic (`system.audit`) is signed (event-level signing) for forensic integrity
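A sketch of event-level signing for `system.audit`, assuming Ed25519 (JDK 15+) over the exact serialized bytes that are published. The source only says the topic is signed, so the algorithm and the names `newAuditKeyPair`, `signAuditEvent`, and `verifyAuditEvent` are assumptions. An asymmetric scheme lets forensic verifiers hold only the public key; key storage and rotation are out of scope here.

```kotlin
import java.security.KeyPair
import java.security.KeyPairGenerator
import java.security.PrivateKey
import java.security.PublicKey
import java.security.Signature

// Hypothetical helpers; a production private key would typically live in a KMS/HSM.
fun newAuditKeyPair(): KeyPair = KeyPairGenerator.getInstance("Ed25519").generateKeyPair()

// Sign the exact bytes published to system.audit (never a re-serialized copy).
fun signAuditEvent(serializedEvent: ByteArray, privateKey: PrivateKey): ByteArray {
    val signer = Signature.getInstance("Ed25519")
    signer.initSign(privateKey)
    signer.update(serializedEvent)
    return signer.sign()
}

fun verifyAuditEvent(serializedEvent: ByteArray, signature: ByteArray, publicKey: PublicKey): Boolean {
    val verifier = Signature.getInstance("Ed25519")
    verifier.initVerify(publicKey)
    verifier.update(serializedEvent)
    return verifier.verify(signature)
}
```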
- Outbox is the source of truth: even if Kafka is wiped, all events can be re-published from PostgreSQL, provided published outbox rows are retained rather than purged
- Topic snapshots to S3 (compaction + tiered storage) for 90-day cold replay
- Consumer offsets backed up nightly (kafka-consumer-groups CLI)
- DLQ never auto-purged - operator manually clears after resolution
- No Kafka Streams or KSQL (overkill for v0.1)
- No Avro/Protobuf (JSON simpler for early dev; revisit at scale)
- No schema registry (revisit when stabilizing schema versions)
- No multi-region Kafka mirror (v1.5+)
- No cross-cluster ordering (single-region only)
- No event sourcing as a system pattern (the journal is the only event-sourced aggregate; the rest is state-stored with outbox)
These show up in the roadmap when actual demand emerges.
- Architecture-Overview - high-level architecture
- Architecture-Resilience - caching, sagas, fault tolerance
- Domain-Model - aggregates that produce events
- User-Flows - end-to-end flows
- ADR-0003 - why outbox over direct Kafka
- ADR-0006 - broker choice