Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.softwaremill.okapi.http

import com.fasterxml.jackson.core.JsonProcessingException
import com.softwaremill.okapi.core.DeliveryResult
import com.softwaremill.okapi.core.MessageDeliverer
import com.softwaremill.okapi.core.OutboxEntry
import java.io.IOException
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
Expand All @@ -17,7 +19,10 @@ import java.time.Duration
* - 5xx, 429, 408 → [DeliveryResult.RetriableFailure] (configurable via [retriableStatusCodes])
* - other → [DeliveryResult.PermanentFailure]
*
* Connection errors are treated as retriable.
* Exception classification:
* - [IOException] (connection/timeout) → [DeliveryResult.RetriableFailure]
* - [InterruptedException] → [DeliveryResult.RetriableFailure] (interrupt flag restored)
* - other (corrupt metadata, unknown service, malformed URI) → [DeliveryResult.PermanentFailure]
*/
class HttpMessageDeliverer @JvmOverloads constructor(
private val urlResolver: ServiceUrlResolver,
Expand All @@ -26,36 +31,42 @@ class HttpMessageDeliverer @JvmOverloads constructor(
) : MessageDeliverer {
override val type: String = HttpDeliveryInfo.TYPE

override fun deliver(entry: OutboxEntry): DeliveryResult {
override fun deliver(entry: OutboxEntry): DeliveryResult = try {
val info = HttpDeliveryInfo.deserialize(entry.deliveryMetadata)
val url = urlResolver.resolve(info.serviceName) + info.endpointPath

return try {
val request =
HttpRequest
.newBuilder()
.uri(URI.create(url))
.timeout(REQUEST_TIMEOUT)
.header("Content-Type", "application/json")
.method(
info.httpMethod.name,
HttpRequest.BodyPublishers.ofString(entry.payload),
)
.apply { info.headers.forEach { (k, v) -> header(k, v) } }
.build()
val request =
HttpRequest
.newBuilder()
.uri(URI.create(url))
.timeout(REQUEST_TIMEOUT)
.header("Content-Type", "application/json")
.method(
info.httpMethod.name,
HttpRequest.BodyPublishers.ofString(entry.payload),
)
.apply { info.headers.forEach { (k, v) -> header(k, v) } }
.build()

val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
val status = response.statusCode()
val body = response.body()
val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
val status = response.statusCode()
val body = response.body()

when {
status in 200..299 -> DeliveryResult.Success
status in retriableStatusCodes -> DeliveryResult.RetriableFailure("HTTP $status: $body")
else -> DeliveryResult.PermanentFailure("HTTP $status: $body")
}
} catch (e: Exception) {
DeliveryResult.RetriableFailure(e.message ?: "Connection failed")
when {
status in 200..299 -> DeliveryResult.Success
status in retriableStatusCodes -> DeliveryResult.RetriableFailure("HTTP $status: $body")
else -> DeliveryResult.PermanentFailure("HTTP $status: $body")
}
} catch (e: JsonProcessingException) {
// Subtype of IOException, but corrupt metadata won't fix itself — classify before IOException.
DeliveryResult.PermanentFailure(e.message ?: e.javaClass.simpleName)
} catch (e: IOException) {
DeliveryResult.RetriableFailure(e.message ?: "Connection failed")
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
DeliveryResult.RetriableFailure(e.message ?: "Interrupted")
} catch (e: Exception) {
DeliveryResult.PermanentFailure(e.message ?: e.javaClass.simpleName)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,15 @@ class HttpMessageDelivererTest : FunSpec({
)
deliverer.deliver(entry()).shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
}

test("corrupt delivery metadata -> PermanentFailure (does not throw)") {
val poisoned = entry().copy(deliveryMetadata = "{not valid json")
deliverer.deliver(poisoned).shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
}

test("urlResolver throwing -> PermanentFailure (does not throw)") {
val throwingResolver = ServiceUrlResolver { name -> error("Unknown service: $name") }
val d = HttpMessageDeliverer(throwingResolver)
d.deliver(entry()).shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
}
})