diff --git a/build.gradle.kts b/build.gradle.kts index 64b24e9e..86c30171 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -60,6 +60,7 @@ allprojects { showStackTraces = true exceptionFormat = TestExceptionFormat.FULL events = setOf(TestLogEvent.PASSED, TestLogEvent.SKIPPED, TestLogEvent.FAILED) + showStandardStreams = true } } diff --git a/dp-inntekt-api/src/main/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumer.kt b/dp-inntekt-api/src/main/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumer.kt index 05f47d29..eb91909a 100644 --- a/dp-inntekt-api/src/main/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumer.kt +++ b/dp-inntekt-api/src/main/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumer.kt @@ -3,6 +3,7 @@ package no.nav.dagpenger.inntekt.subsumsjonbrukt import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import mu.KotlinLogging import no.nav.dagpenger.events.Packet @@ -26,31 +27,24 @@ import kotlin.coroutines.CoroutineContext internal class KafkaSubsumsjonBruktDataConsumer( private val config: Configuration, - private val inntektStore: InntektStore + private val inntektStore: InntektStore, + private val graceDuration: Duration = Duration.ofHours(3) ) : CoroutineScope, HealthCheck { private val logger = KotlinLogging.logger { } override val coroutineContext: CoroutineContext get() = Dispatchers.IO + job - val grace by lazy { - Grace() - } - - data class Grace( - val duration: Duration = Duration.ofHours(3), - val from: ZonedDateTime = ZonedDateTime.now(ZoneOffset.UTC) - ) { - private val expires = from.plus(duration) - fun expired() = ZonedDateTime.now(ZoneOffset.UTC).isAfter(expires) + private val grace by lazy { + Grace(duration = graceDuration) } private val job: Job by lazy { - Job() + SupervisorJob() } fun listen() { - launch { + launch(coroutineContext) { val creds = config.kafka.user?.let { u -> config.kafka.password?.let { p -> KafkaCredential(username = u, password = p) @@ -97,16 +91,8 @@ internal class KafkaSubsumsjonBruktDataConsumer( } } } catch (e: Exception) { - when (e) { - is RetriableException, - is SQLTransientConnectionException -> { - logger.warn("Retriable exception, looping back", e) - } - else -> { - logger.error("Unexpected exception while consuming messages. Stopping", e) - stop() - } - } + logger.error("Unexpected exception while consuming messages. Stopping consumer, grace period ${grace.duration.seconds / 60} minutes", e) + stop() } } } @@ -126,4 +112,12 @@ internal class KafkaSubsumsjonBruktDataConsumer( logger.info { "Stopping ${config.application.id} consumer" } job.cancel() } + + data class Grace( + val duration: Duration = Duration.ofHours(3), + val from: ZonedDateTime = ZonedDateTime.now(ZoneOffset.UTC) + ) { + private val expires = from.plus(duration) + fun expired() = ZonedDateTime.now(ZoneOffset.UTC).isAfter(expires) + } } diff --git a/dp-inntekt-api/src/test/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumerTest.kt b/dp-inntekt-api/src/test/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumerTest.kt index dbfe1d92..b9b00832 100644 --- a/dp-inntekt-api/src/test/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumerTest.kt +++ b/dp-inntekt-api/src/test/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumerTest.kt @@ -2,9 +2,10 @@ package no.nav.dagpenger.inntekt.subsumsjonbrukt import de.huxhorn.sulky.ulid.ULID import io.kotest.matchers.shouldBe -import io.mockk.every +import io.mockk.coEvery import io.mockk.mockk import io.mockk.verify + import kotlinx.coroutines.runBlocking import mu.KotlinLogging import no.nav.dagpenger.events.moshiInstance @@ -19,12 +20,14 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.StringSerializer import org.junit.jupiter.api.Test import org.testcontainers.containers.KafkaContainer +import java.sql.SQLTransientConnectionException +import java.time.Duration import java.time.ZoneOffset import java.time.ZonedDateTime import java.util.concurrent.TimeUnit private val LOGGER = KotlinLogging.logger { } -class KafkaSubsumsjonBruktDataConsumerTest { +internal class KafkaSubsumsjonBruktDataConsumerTest { private object Kafka { val instance by lazy { KafkaContainer("5.3.1").apply { this.start() } @@ -46,9 +49,9 @@ class KafkaSubsumsjonBruktDataConsumerTest { @Test fun `Should mark inntekt id as used`() { runBlocking { - val storeMock = mockk(relaxed = false).apply { - every { this@apply.markerInntektBrukt(any()) } returns 1 - } + val inntektId = InntektId(ULID().nextULID()) + val storeMock = mockk(relaxed = false) + coEvery { storeMock.markerInntektBrukt(inntektId) } returns 1 val config = Configuration().run { copy(kafka = kafka.copy(brokers = Kafka.instance.bootstrapServers, user = null, password = null)) } @@ -57,8 +60,7 @@ class KafkaSubsumsjonBruktDataConsumerTest { listen() } - val id = ULID().nextULID() - val bruktSubsumsjonData = mapOf("faktum" to mapOf("inntektsId" to id)) + val bruktSubsumsjonData = mapOf("faktum" to mapOf("inntektsId" to inntektId.id)) val metaData = producer.send(ProducerRecord(config.subsumsjonBruktDataTopic, "test", adapter.toJson(bruktSubsumsjonData))) .get(5, TimeUnit.SECONDS) @@ -67,19 +69,17 @@ class KafkaSubsumsjonBruktDataConsumerTest { TimeUnit.MILLISECONDS.sleep(500) verify(exactly = 1) { - storeMock.markerInntektBrukt(InntektId(id)) + storeMock.markerInntektBrukt(inntektId) } - consumer.status() shouldBe HealthStatus.UP + consumer.stop() } } @Test fun `Cannot mark inntekt id as used if not present in faktum`() { runBlocking { - val storeMock = mockk(relaxed = false).apply { - every { this@apply.markerInntektBrukt(any()) } returns 1 - } + val storeMock = mockk(relaxed = false) val config = Configuration().run { copy(kafka = kafka.copy(brokers = Kafka.instance.bootstrapServers, user = null, password = null)) } @@ -101,33 +101,33 @@ class KafkaSubsumsjonBruktDataConsumerTest { } consumer.status() shouldBe HealthStatus.UP + consumer.stop() } } @Test fun `Should have grace period on health status when job is no longer active`() { runBlocking { - val storeMock = mockk(relaxed = false).apply { - every { this@apply.markerInntektBrukt(any()) } returns 1 - } + val inntektId = InntektId(ULID().nextULID()) + val storeMock = mockk(relaxed = false) + coEvery { storeMock.markerInntektBrukt(inntektId) } throws SQLTransientConnectionException("BLÆ") val config = Configuration().run { copy(kafka = kafka.copy(brokers = Kafka.instance.bootstrapServers, user = null, password = null)) } - val consumer = KafkaSubsumsjonBruktDataConsumer(config, storeMock).apply { + val consumer = KafkaSubsumsjonBruktDataConsumer(config = config, inntektStore = storeMock, graceDuration = Duration.ofMillis(1)).apply { listen() } - val illegalInntektId = "bla bla bla" // should - val bruktSubsumsjonData = mapOf("faktum" to mapOf("inntektsId" to illegalInntektId)) + val bruktSubsumsjonData = mapOf("faktum" to mapOf("inntektsId" to inntektId.id)) val metaData = producer.send(ProducerRecord(config.subsumsjonBruktDataTopic, "test", adapter.toJson(bruktSubsumsjonData))) .get(5, TimeUnit.SECONDS) - LOGGER.info("Producer produced $bruktSubsumsjonData with meta $metaData") + LOGGER.info("Producer produced $bruktSubsumsjonData with meta $metaData + should fail") - TimeUnit.MILLISECONDS.sleep(500) + TimeUnit.MILLISECONDS.sleep(1500) - consumer.status() shouldBe HealthStatus.UP + consumer.status() shouldBe HealthStatus.DOWN } }