Skip to content

Commit

Permalink
Read from opprett sykmelding topic (#234)
Browse files Browse the repository at this point in the history
* Added OpprettSykmeldingConsumer and tried to create sykmelding

Co-authored-by: Andreas <danduras@gmail.com>
Co-authored-by: Helene Arnesen <helene.arnesen@nav.no>

* updated logging

Co-authored-by: Andreas <danduras@gmail.com>
Co-authored-by: Helene Arnesen <helene.arnesen@nav.no>

* more log

Co-authored-by: Joakim Taule Kartveit <joakimkartveit@gmail.com>

* Use Dispatchers.IO

---------

Co-authored-by: Jørn-Are Flaten <ja.flaten91@gmail.com>
Co-authored-by: Andreas <danduras@gmail.com>
Co-authored-by: Helene Arnesen <helene.arnesen@nav.no>
  • Loading branch information
4 people committed Nov 22, 2023
1 parent 1797241 commit f9ded01
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 3 deletions.
12 changes: 11 additions & 1 deletion src/main/kotlin/no/nav/syfo/Bootstrap.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.Properties
import java.util.UUID
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
Expand All @@ -45,6 +46,7 @@ import no.nav.syfo.kafka.aiven.KafkaUtils
import no.nav.syfo.kafka.toConsumerConfig
import no.nav.syfo.kafka.toProducerConfig
import no.nav.syfo.model.ReceivedSykmelding
import no.nav.syfo.opprettsykmelding.startOpprettSykmeldingConsumer
import no.nav.syfo.pdl.PdlFactory
import no.nav.syfo.service.BehandlingService
import no.nav.syfo.service.OppgaveService
Expand Down Expand Up @@ -236,6 +238,14 @@ fun main() {
behandlingService,
)

startOpprettSykmeldingConsumer(
env,
applicationState,
sykmeldingService,
safJournalpostClient,
pdlPersonService
)

applicationServer.start()
}

Expand All @@ -244,7 +254,7 @@ fun createListener(
applicationState: ApplicationState,
action: suspend CoroutineScope.() -> Unit
): Job =
GlobalScope.launch {
GlobalScope.launch(Dispatchers.IO) {
try {
action()
} catch (e: TrackableException) {
Expand Down
1 change: 1 addition & 0 deletions src/main/kotlin/no/nav/syfo/Environment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ data class Environment(
val dokArkivScope: String = getEnvVar("DOK_ARKIV_SCOPE"),
val okSykmeldingTopic: String = "teamsykmelding.ok-sykmelding",
val smregistreringTopic: String = "teamsykmelding.papir-sm-registering",
val opprettSykmeldingTopic: String = "teamsykmelding.opprett-sykmelding",
val pdlGraphqlPath: String = getEnvVar("PDL_GRAPHQL_PATH"),
val pdlScope: String = getEnvVar("PDL_SCOPE"),
val aadAccessTokenV2Url: String = getEnvVar("AZURE_OPENID_CONFIG_TOKEN_ENDPOINT"),
Expand Down
6 changes: 5 additions & 1 deletion src/main/kotlin/no/nav/syfo/client/SafDokumentClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,13 @@ class SafDokumentClient(
): Skanningmetadata? {
return try {
val dokument = hentDokumentFraSaf(journalpostId, dokumentInfoId, msgId, loggingMeta)
log.info("Got document with id: $dokumentInfoId")
safeUnmarshalSkanningmetadata(dokument.byteInputStream(Charsets.UTF_8))
} catch (ex: JAXBException) {
log.warn("Klarte ikke å tolke OCR-dokument, ${fields(loggingMeta)}", ex)
log.warn(
"Klarte ikke å tolke OCR-dokument for dokument $dokumentInfoId, ${fields(loggingMeta)}",
ex
)
PAPIRSM_HENTDOK_FEIL.inc()
null
}
Expand Down
5 changes: 4 additions & 1 deletion src/main/kotlin/no/nav/syfo/client/SafJournalpostClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ fun finnDokumentIdForOcr(dokumentListe: List<Dokument>?, loggingMeta: LoggingMet
dokumentListe?.forEach { dokument ->
dokument.dokumentvarianter.forEach {
if (it.variantformat.name == "ORIGINAL") {
log.info("Fant OCR-dokument {}", fields(loggingMeta))
log.info(
"Fant OCR-dokument dokumentInfoId: ${dokument.dokumentInfoId} {}",
fields(loggingMeta)
)
return dokument.dokumentInfoId
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package no.nav.syfo.opprettsykmelding

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import no.nav.syfo.opprettsykmelding.model.OpprettSykmeldingRecord
import org.apache.kafka.common.serialization.Deserializer

class OpprettSykmeldingDeserializer : Deserializer<OpprettSykmeldingRecord> {
private val objectMapper =
jacksonObjectMapper().apply {
registerModule(JavaTimeModule())
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
}

override fun deserialize(topic: String, data: ByteArray): OpprettSykmeldingRecord {
return objectMapper.readValue(data)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package no.nav.syfo.opprettsykmelding

import java.time.Duration
import java.util.*
import kotlinx.coroutines.DelicateCoroutinesApi
import net.logstash.logback.argument.StructuredArguments.fields
import no.nav.syfo.Environment
import no.nav.syfo.application.ApplicationState
import no.nav.syfo.client.SafJournalpostClient
import no.nav.syfo.createListener
import no.nav.syfo.kafka.aiven.KafkaUtils
import no.nav.syfo.kafka.toConsumerConfig
import no.nav.syfo.opprettsykmelding.model.OpprettSykmeldingRecord
import no.nav.syfo.pdl.service.PdlPersonService
import no.nav.syfo.securelog
import no.nav.syfo.service.SykmeldingService
import no.nav.syfo.util.LoggingMeta
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory

@OptIn(DelicateCoroutinesApi::class)
fun startOpprettSykmeldingConsumer(
environment: Environment,
applicationState: ApplicationState,
sykmeldingService: SykmeldingService,
safJournalpostClient: SafJournalpostClient,
pdlPersonService: PdlPersonService
) {

val consumer = "opprett-sykmelding-consumer"
val consumerProperties =
KafkaUtils.getAivenKafkaConfig(consumer)
.toConsumerConfig(
groupId = consumer,
valueDeserializer = OpprettSykmeldingDeserializer::class
)

val kafkaConsumer =
KafkaConsumer(consumerProperties, StringDeserializer(), OpprettSykmeldingDeserializer())

OpprettSykmeldingService(
kafkaConsumer = kafkaConsumer,
sykmeldingService = sykmeldingService,
env = environment,
applicationState = applicationState,
safJournalpostClient = safJournalpostClient,
pdlPersonService = pdlPersonService,
)
.start()
}

class OpprettSykmeldingService(
private val kafkaConsumer: KafkaConsumer<String, OpprettSykmeldingRecord>,
private val sykmeldingService: SykmeldingService,
private val env: Environment,
private val applicationState: ApplicationState,
private val safJournalpostClient: SafJournalpostClient,
private val pdlPersonService: PdlPersonService
) {
companion object {
private val log = LoggerFactory.getLogger(OpprettSykmeldingService::class.java)
}

private val journalPostQuery =
SafJournalpostClient::class
.java
.getResource("/graphql/findJournalpost.graphql")!!
.readText()
.replace(Regex("[\n\t]"), "")

@DelicateCoroutinesApi
fun start() {
createListener(applicationState) { consumeTopic() }
}

suspend fun consumeTopic() {
kafkaConsumer.subscribe(listOf(env.opprettSykmeldingTopic))
while (applicationState.ready) {
kafkaConsumer.poll(Duration.ofMillis(1000)).forEach { consumerRecord ->
val opprettSykmeldingRecord = consumerRecord.value()
val sykmeldingId = UUID.randomUUID().toString()
val loggingMeta =
LoggingMeta(
sykmeldingId = sykmeldingId,
journalpostId = opprettSykmeldingRecord.data.journalpostId,
hendelsesId = opprettSykmeldingRecord.data.journalpostId,
)
val journalpostId = opprettSykmeldingRecord.data.journalpostId
log.info("received opprett sykmelding for $journalpostId {}", fields(loggingMeta))
securelog.info(
"received opprett sykmelding message $opprettSykmeldingRecord {}",
fields(loggingMeta)
)
val journalpostMetadata =
safJournalpostClient.getJournalpostMetadata(
journalpostId,
journalPostQuery,
loggingMeta,
)
?: throw IllegalStateException(
"Unable to find journalpost with id $journalpostId",
)

val fnrEllerAktorId =
when (journalpostMetadata.bruker.type) {
"ORGNR" -> throw IllegalStateException("Bruker id is ORGNR")
else -> journalpostMetadata.bruker.id
}
?: throw IllegalStateException("Bruker id is null")

val pasient = pdlPersonService.getPdlPerson(fnrEllerAktorId, loggingMeta)
sykmeldingService.behandleSykmelding(
journalpostId = journalpostId,
pasient = pasient,
datoOpprettet = journalpostMetadata.datoOpprettet,
dokumentInfoIdPdf = journalpostMetadata.dokumentInfoIdPdf,
dokumentInfoId = journalpostMetadata.dokumentInfoId,
temaEndret = false,
loggingMeta = loggingMeta,
sykmeldingId = sykmeldingId,
)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package no.nav.syfo.opprettsykmelding.model

data class Metadata(
val type: String = "journalpost",
val source: String = "syk-dig",
)

data class JournalpostMetadata(
val journalpostId: String,
val tema: String,
)

data class OpprettSykmeldingRecord(
val metadata: Metadata,
val data: JournalpostMetadata,
)
9 changes: 9 additions & 0 deletions src/main/kotlin/no/nav/syfo/service/OppgaveService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import no.nav.syfo.log
import no.nav.syfo.metrics.PAPIRSM_FORDELINGSOPPGAVE
import no.nav.syfo.metrics.PAPIRSM_MOTTATT_UTEN_BRUKER
import no.nav.syfo.metrics.PAPIRSM_OPPGAVE
import no.nav.syfo.objectMapper
import no.nav.syfo.securelog
import no.nav.syfo.util.LoggingMeta

class OppgaveService(
Expand Down Expand Up @@ -83,6 +85,13 @@ class OppgaveService(

if (!oppgave.duplikat) {
PAPIRSM_FORDELINGSOPPGAVE.inc()
securelog.info(
"Opprettet fordelingsoppgave med {}, {} {} {}",
StructuredArguments.keyValue("oppgaveId", oppgave.oppgaveId),
StructuredArguments.keyValue("journalpostId", journalpostId),
StructuredArguments.keyValue("oppgave", objectMapper.writeValueAsString(oppgave)),
fields(loggingMeta),
)
log.info(
"Opprettet fordelingsoppgave med {}, {} {}",
StructuredArguments.keyValue("oppgaveId", oppgave.oppgaveId),
Expand Down

0 comments on commit f9ded01

Please sign in to comment.