Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class OrderService(
}
```

Autoconfiguration handles scheduling, retries, and delivery automatically.
Autoconfiguration handles scheduling, retries, and delivery automatically. For Micrometer metrics, also add `okapi-micrometer` — see [Observability](#observability).

**Using Kafka instead of HTTP?** Swap the deliverer bean and delivery info:

Expand Down Expand Up @@ -95,6 +95,63 @@ Okapi implements the [transactional outbox pattern](https://softwaremill.com/mic
- **Concurrent processing** — multiple processors can run in parallel using `FOR UPDATE SKIP LOCKED`, so messages are never processed twice simultaneously.
- **Delivery result classification** — each transport classifies errors as `Success`, `RetriableFailure`, or `PermanentFailure`. For example, HTTP 429 is retriable while HTTP 400 is permanent.

## Observability

Add `okapi-micrometer` alongside `okapi-spring-boot` (from the Quick Start above) to get Micrometer metrics:

```kotlin
implementation("com.softwaremill.okapi:okapi-micrometer")
```

With Spring Boot Actuator and a Prometheus registry (`micrometer-registry-prometheus`) on the classpath, metrics are automatically exposed on `/actuator/prometheus`. They are also visible via `/actuator/metrics`.

| Metric | Type | Description |
|--------|------|-------------|
| `okapi.entries.delivered` | Counter | Successfully delivered entries |
| `okapi.entries.retry.scheduled` | Counter | Failed attempts rescheduled for retry |
| `okapi.entries.failed` | Counter | Permanently failed entries |
| `okapi.batch.duration` | Timer | Processing time per batch |
| `okapi.entries.count` | Gauge | Current entry count (tag: `status=pending\|delivered\|failed`) |
| `okapi.entries.lag.seconds` | Gauge | Age of oldest entry in seconds (tag: `status`) |

### Multi-instance deployments

Counters and timers (`okapi.entries.delivered`, `okapi.entries.retry.scheduled`, `okapi.entries.failed`, `okapi.batch.duration`) report work performed by **each instance** — aggregate with `sum`:

```promql
sum(rate(okapi_entries_delivered_total[5m]))
```

Gauges (`okapi.entries.count`, `okapi.entries.lag.seconds`) reflect the **shared outbox state** and are reported identically by every instance. Aggregate with `max by (status)`, not `sum`:

```promql
max by (status) (okapi_entries_count)
```

Polling cost per instance is `2 queries / okapi.metrics.refresh-interval` (default `2 queries / 15s`).

### Without Spring Boot

`okapi-micrometer` has no Spring dependency. Construct the beans manually and pass a `MeterRegistry`. `MicrometerOutboxMetrics` requires a `TransactionRunner` for Exposed-backed stores — see the class KDoc.

For periodic gauge refresh, use the framework-agnostic `OutboxMetricsRefresher` (single daemon thread):

```kotlin
val listener = MicrometerOutboxListener(meterRegistry)
val metrics = MicrometerOutboxMetrics(store, meterRegistry, transactionRunner)

val refresher = OutboxMetricsRefresher(metrics, Duration.ofSeconds(15))
refresher.start()
// on application shutdown:
refresher.close()
```

Or call `metrics.refresh()` from your own scheduler (Ktor coroutine, `ScheduledExecutorService`, etc.) — `refresh()` is thread-safe.

### Custom listener

Implement `OutboxProcessorListener` to react to delivery events (logging, alerting, custom metrics). `OutboxProcessor` accepts a single listener; to combine multiple, implement a composite that delegates to each.

## Modules

```mermaid
Expand All @@ -103,9 +160,11 @@ graph BT
MY[okapi-mysql] --> CORE
HTTP[okapi-http] --> CORE
KAFKA[okapi-kafka] --> CORE
MICRO[okapi-micrometer] --> CORE
SPRING[okapi-spring-boot] --> CORE
SPRING -.->|compileOnly| PG
SPRING -.->|compileOnly| MY
SPRING -.->|compileOnly| MICRO
BOM[okapi-bom]

style CORE fill:#4a9eff,color:#fff
Expand All @@ -119,7 +178,8 @@ graph BT
| `okapi-mysql` | MySQL 8+ storage via Exposed ORM |
| `okapi-http` | HTTP webhook delivery (JDK HttpClient) |
| `okapi-kafka` | Kafka topic publishing |
| `okapi-spring-boot` | Spring Boot autoconfiguration (auto-detects store and transports) |
| `okapi-micrometer` | Micrometer metrics (counters, timers, gauges) |
| `okapi-spring-boot` | Spring Boot autoconfiguration (auto-detects store, transports, and metrics) |
| `okapi-bom` | Bill of Materials for version alignment |

## Compatibility
Expand Down
3 changes: 3 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ wiremock = "3.13.2"
slf4j = "2.0.17"
assertj = "3.27.7"
h2 = "2.4.240"
micrometer = "1.15.6"

[libraries]
kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" }
Expand Down Expand Up @@ -44,6 +45,8 @@ springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoc
springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" }
springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" }
assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" }
micrometerCore = { module = "io.micrometer:micrometer-core", version.ref = "micrometer" }
micrometerTest = { module = "io.micrometer:micrometer-test", version.ref = "micrometer" }
wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" }
slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
Expand Down
1 change: 1 addition & 0 deletions okapi-bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ dependencies {
api(project(":okapi-http"))
api(project(":okapi-kafka"))
api(project(":okapi-spring-boot"))
api(project(":okapi-micrometer"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.softwaremill.okapi.core

import java.time.Duration

/**
* Outcome of processing a single [OutboxEntry], emitted by [OutboxProcessor]
* to [OutboxProcessorListener].
*/
sealed interface OutboxProcessingEvent {
val entry: OutboxEntry

/** Wall-clock duration of the delivery attempt, excluding the database update. */
val duration: Duration

data class Delivered(override val entry: OutboxEntry, override val duration: Duration) : OutboxProcessingEvent
data class RetryScheduled(override val entry: OutboxEntry, override val duration: Duration, val error: String) : OutboxProcessingEvent
data class Failed(override val entry: OutboxEntry, override val duration: Duration, val error: String) : OutboxProcessingEvent
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,61 @@
package com.softwaremill.okapi.core

import org.slf4j.LoggerFactory
import java.time.Clock
import java.time.Duration

/**
* Orchestrates a single processing cycle: claims pending entries from [OutboxStore],
* delegates each to [OutboxEntryProcessor], and persists the result.
*
* Transaction management is the caller's responsibility.
* An optional [OutboxProcessorListener] is notified after each entry and after the
* full batch. Exceptions in the listener are caught and logged — they never break
* processing. Transaction management is the caller's responsibility.
*/
class OutboxProcessor(
private val store: OutboxStore,
private val entryProcessor: OutboxEntryProcessor,
private val listener: OutboxProcessorListener? = null,
private val clock: Clock = Clock.systemUTC(),
) {
fun processNext(limit: Int = 10) {
val batchStart = clock.instant()
var count = 0
store.claimPending(limit).forEach { entry ->
val entryStart = clock.instant()
val updated = entryProcessor.process(entry)
val deliveryDuration = Duration.between(entryStart, clock.instant())
store.updateAfterProcessing(updated)
count++
notifyEntry(updated, deliveryDuration)
}
notifyBatch(count, Duration.between(batchStart, clock.instant()))
}

private fun notifyEntry(updated: OutboxEntry, duration: Duration) {
if (listener == null) return
try {
val event = when (updated.status) {
OutboxStatus.DELIVERED -> OutboxProcessingEvent.Delivered(updated, duration)
OutboxStatus.PENDING -> OutboxProcessingEvent.RetryScheduled(updated, duration, updated.lastError ?: "")
OutboxStatus.FAILED -> OutboxProcessingEvent.Failed(updated, duration, updated.lastError ?: "")
}
listener.onEntryProcessed(event)
} catch (e: Exception) {
logger.warn("OutboxProcessorListener.onEntryProcessed failed", e)
}
}

private fun notifyBatch(count: Int, duration: Duration) {
if (listener == null) return
try {
listener.onBatchProcessed(count, duration)
} catch (e: Exception) {
logger.warn("OutboxProcessorListener.onBatchProcessed failed", e)
}
}

companion object {
private val logger = LoggerFactory.getLogger(OutboxProcessor::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.softwaremill.okapi.core

import java.time.Duration

/**
* Callback interface for observing [OutboxProcessor] activity.
*
* Default no-op implementations allow consumers to override only the
* methods they care about. Exceptions thrown by implementations are
* caught and logged — they never break processing.
*
* [OutboxProcessor] accepts a single listener. To combine multiple listeners,
* implement a composite that delegates to each.
*/
interface OutboxProcessorListener {
/** Called after each entry is processed (delivered, retried, or failed). */
fun onEntryProcessed(event: OutboxProcessingEvent) {}

/** Called after a full batch completes (even if empty). */
fun onBatchProcessed(processedCount: Int, duration: Duration) {}
}
Loading