1 change: 1 addition & 0 deletions build.gradle.kts
@@ -60,6 +60,7 @@ allprojects {
showStackTraces = true
exceptionFormat = TestExceptionFormat.FULL
events = setOf(TestLogEvent.PASSED, TestLogEvent.SKIPPED, TestLogEvent.FAILED)
showStandardStreams = true
}
}

KafkaSubsumsjonBruktDataConsumer.kt
@@ -3,6 +3,7 @@ package no.nav.dagpenger.inntekt.subsumsjonbrukt
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.nav.dagpenger.events.Packet
@@ -26,31 +27,24 @@ import kotlin.coroutines.CoroutineContext

internal class KafkaSubsumsjonBruktDataConsumer(
private val config: Configuration,
private val inntektStore: InntektStore
private val inntektStore: InntektStore,
private val graceDuration: Duration = Duration.ofHours(3)
) : CoroutineScope, HealthCheck {

private val logger = KotlinLogging.logger { }
override val coroutineContext: CoroutineContext
get() = Dispatchers.IO + job

val grace by lazy {
Grace()
}

data class Grace(
val duration: Duration = Duration.ofHours(3),
val from: ZonedDateTime = ZonedDateTime.now(ZoneOffset.UTC)
) {
private val expires = from.plus(duration)
fun expired() = ZonedDateTime.now(ZoneOffset.UTC).isAfter(expires)
private val grace by lazy {
Grace(duration = graceDuration)
}

private val job: Job by lazy {
Contributor:
Doesn't this job have to be used in one way or another? Will launch automagically start its jobs as children of it? As far as I understand the docs, you have to use either withContext() or supervisorScope()?

@gtcno (Contributor), Jul 28, 2020:
The class IS a CoroutineScope (line 30), and the scope's CoroutineContext contains the job (line 33).
launch is an extension fun on the scope and therefore inherits the context and the job:
"The coroutine context is inherited from a CoroutineScope. Additional context elements can be specified with context argument. If the context does not have any dispatcher nor any other ContinuationInterceptor, then Dispatchers.Default is used. The parent job is inherited from a CoroutineScope as well, but it can also be overridden with a corresponding context element."
But yes, it could be made a bit more explicit and readable.

Author (Contributor):
Yes, it is a bit clunky; I can make it more readable.

Job()
SupervisorJob()
}
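
A minimal, self-contained Kotlin sketch of the pattern discussed in the review thread above. It is not part of the diff; the Demo class and its messages are illustrative only. It shows that launch on a class implementing CoroutineScope inherits the scope's context (and therefore its job as parent), and that a SupervisorJob keeps sibling coroutines alive when one child fails.

    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.Job
    import kotlinx.coroutines.SupervisorJob
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import kotlin.coroutines.CoroutineContext

    private class Demo : CoroutineScope {
        private val job: Job = SupervisorJob()

        // launch { } below inherits this context, so every coroutine started from the
        // class becomes a child of `job` even though `job` is never referenced directly.
        override val coroutineContext: CoroutineContext
            get() = Dispatchers.IO + job

        fun listen() {
            // Fails, but with SupervisorJob only this child is cancelled;
            // with a plain Job() the failure would cancel the parent and the sibling too.
            launch { error("boom") }
            launch {
                delay(200)
                println("sibling still running; parent active = ${job.isActive}")
            }
        }

        fun stop() = job.cancel()
    }

    fun main() = runBlocking {
        val demo = Demo().apply { listen() }
        delay(500)
        demo.stop()
    }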

fun listen() {
launch {
launch(coroutineContext) {
val creds = config.kafka.user?.let { u ->
config.kafka.password?.let { p ->
KafkaCredential(username = u, password = p)
@@ -97,16 +91,8 @@ internal class KafkaSubsumsjonBruktDataConsumer(
}
}
} catch (e: Exception) {
when (e) {
is RetriableException,
is SQLTransientConnectionException -> {
logger.warn("Retriable exception, looping back", e)
}
else -> {
logger.error("Unexpected exception while consuming messages. Stopping", e)
stop()
}
}
logger.error("Unexpected exception while consuming messages. Stopping consumer, grace period ${grace.duration.seconds / 60} minutes", e)
stop()
}
}
}
@@ -126,4 +112,12 @@ internal class KafkaSubsumsjonBruktDataConsumer(
logger.info { "Stopping ${config.application.id} consumer" }
job.cancel()
}

data class Grace(
val duration: Duration = Duration.ofHours(3),
val from: ZonedDateTime = ZonedDateTime.now(ZoneOffset.UTC)
) {
private val expires = from.plus(duration)
fun expired() = ZonedDateTime.now(ZoneOffset.UTC).isAfter(expires)
}
}
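
A side note on the new graceDuration parameter and the Grace class: the consumer's status() method is not shown in this diff, but the test below expects HealthStatus.DOWN once a 1 ms grace period has elapsed after a failing store call. A rough, self-contained sketch of that presumed behaviour follows; the ConsumerHealth class and its status() logic are assumptions for illustration, not the project's actual implementation.

    import java.time.Duration
    import java.time.ZoneOffset
    import java.time.ZonedDateTime

    // Stand-in for the project's HealthStatus; only UP and DOWN are needed here.
    enum class HealthStatus { UP, DOWN }

    // Same shape as the Grace class added in this diff.
    data class Grace(
        val duration: Duration = Duration.ofHours(3),
        val from: ZonedDateTime = ZonedDateTime.now(ZoneOffset.UTC)
    ) {
        private val expires = from.plus(duration)
        fun expired() = ZonedDateTime.now(ZoneOffset.UTC).isAfter(expires)
    }

    // Assumed health-check logic: report UP while the consumer job runs, keep reporting UP
    // for the length of the grace period after it stops, then report DOWN.
    class ConsumerHealth(private val grace: Grace, private val jobActive: () -> Boolean) {
        fun status(): HealthStatus =
            if (jobActive() || !grace.expired()) HealthStatus.UP else HealthStatus.DOWN
    }

    fun main() {
        val health = ConsumerHealth(Grace(duration = Duration.ofMillis(1)), jobActive = { false })
        Thread.sleep(10)
        println(health.status()) // DOWN once the 1 ms grace period has expired
    }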
KafkaSubsumsjonBruktDataConsumerTest.kt
@@ -2,9 +2,10 @@ package no.nav.dagpenger.inntekt.subsumsjonbrukt

import de.huxhorn.sulky.ulid.ULID
import io.kotest.matchers.shouldBe
import io.mockk.every
import io.mockk.coEvery
import io.mockk.mockk
import io.mockk.verify

import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.nav.dagpenger.events.moshiInstance
@@ -19,12 +20,14 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.Test
import org.testcontainers.containers.KafkaContainer
import java.sql.SQLTransientConnectionException
import java.time.Duration
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.util.concurrent.TimeUnit

private val LOGGER = KotlinLogging.logger { }
class KafkaSubsumsjonBruktDataConsumerTest {
internal class KafkaSubsumsjonBruktDataConsumerTest {
private object Kafka {
val instance by lazy {
KafkaContainer("5.3.1").apply { this.start() }
@@ -46,9 +49,9 @@ class KafkaSubsumsjonBruktDataConsumerTest {
@Test
fun `Should mark inntekt id as used`() {
runBlocking {
val storeMock = mockk<InntektStore>(relaxed = false).apply {
every { this@apply.markerInntektBrukt(any()) } returns 1
}
val inntektId = InntektId(ULID().nextULID())
val storeMock = mockk<InntektStore>(relaxed = false)
coEvery { storeMock.markerInntektBrukt(inntektId) } returns 1
val config = Configuration().run {
copy(kafka = kafka.copy(brokers = Kafka.instance.bootstrapServers, user = null, password = null))
}
@@ -57,8 +60,7 @@ class KafkaSubsumsjonBruktDataConsumerTest {
listen()
}

val id = ULID().nextULID()
val bruktSubsumsjonData = mapOf("faktum" to mapOf("inntektsId" to id))
val bruktSubsumsjonData = mapOf("faktum" to mapOf("inntektsId" to inntektId.id))

val metaData = producer.send(ProducerRecord(config.subsumsjonBruktDataTopic, "test", adapter.toJson(bruktSubsumsjonData)))
.get(5, TimeUnit.SECONDS)
@@ -67,19 +69,17 @@ class KafkaSubsumsjonBruktDataConsumerTest {
TimeUnit.MILLISECONDS.sleep(500)

verify(exactly = 1) {
storeMock.markerInntektBrukt(InntektId(id))
storeMock.markerInntektBrukt(inntektId)
}

consumer.status() shouldBe HealthStatus.UP
consumer.stop()
}
}

@Test
fun `Cannot mark inntekt id as used if not present in faktum`() {
runBlocking {
val storeMock = mockk<InntektStore>(relaxed = false).apply {
every { this@apply.markerInntektBrukt(any()) } returns 1
}
val storeMock = mockk<InntektStore>(relaxed = false)
val config = Configuration().run {
copy(kafka = kafka.copy(brokers = Kafka.instance.bootstrapServers, user = null, password = null))
}
@@ -101,33 +101,33 @@ }
}

consumer.status() shouldBe HealthStatus.UP
consumer.stop()
}
}

@Test
fun `Should have grace period on health status when job is no longer active`() {
runBlocking {
val storeMock = mockk<InntektStore>(relaxed = false).apply {
every { this@apply.markerInntektBrukt(any()) } returns 1
}
val inntektId = InntektId(ULID().nextULID())
val storeMock = mockk<InntektStore>(relaxed = false)
coEvery { storeMock.markerInntektBrukt(inntektId) } throws SQLTransientConnectionException("BLÆ")
val config = Configuration().run {
copy(kafka = kafka.copy(brokers = Kafka.instance.bootstrapServers, user = null, password = null))
}

val consumer = KafkaSubsumsjonBruktDataConsumer(config, storeMock).apply {
val consumer = KafkaSubsumsjonBruktDataConsumer(config = config, inntektStore = storeMock, graceDuration = Duration.ofMillis(1)).apply {
listen()
}

val illegalInntektId = "bla bla bla" // should
val bruktSubsumsjonData = mapOf("faktum" to mapOf("inntektsId" to illegalInntektId))
val bruktSubsumsjonData = mapOf("faktum" to mapOf("inntektsId" to inntektId.id))

val metaData = producer.send(ProducerRecord(config.subsumsjonBruktDataTopic, "test", adapter.toJson(bruktSubsumsjonData)))
.get(5, TimeUnit.SECONDS)
LOGGER.info("Producer produced $bruktSubsumsjonData with meta $metaData")
LOGGER.info("Producer produced $bruktSubsumsjonData with meta $metaData + should fail")

TimeUnit.MILLISECONDS.sleep(500)
TimeUnit.MILLISECONDS.sleep(1500)

consumer.status() shouldBe HealthStatus.UP
consumer.status() shouldBe HealthStatus.DOWN
}
}
