diff --git a/src/main/kotlin/no/nav/syfo/Bootstrap.kt b/src/main/kotlin/no/nav/syfo/Bootstrap.kt index 1ece8a2..5d67c9f 100644 --- a/src/main/kotlin/no/nav/syfo/Bootstrap.kt +++ b/src/main/kotlin/no/nav/syfo/Bootstrap.kt @@ -285,6 +285,7 @@ fun launchListeners( applicationState = applicationState, aivenConsumer = kafkaConsumerJournalfoeringHendelseAiven, behandlingService = behandlingService, + env = env, ) } } @@ -293,23 +294,32 @@ suspend fun blockingApplicationLogic( applicationState: ApplicationState, aivenConsumer: KafkaConsumer, behandlingService: BehandlingService, + env: Environment, ) { while (applicationState.ready) { aivenConsumer.poll(Duration.ofMillis(1000)).forEach { consumerRecord -> - val journalfoeringHendelseRecord = consumerRecord.value() - val sykmeldingId = UUID.randomUUID().toString() - val loggingMeta = - LoggingMeta( + try { + val journalfoeringHendelseRecord = consumerRecord.value() + val sykmeldingId = UUID.randomUUID().toString() + val loggingMeta = + LoggingMeta( + sykmeldingId = sykmeldingId, + journalpostId = journalfoeringHendelseRecord.journalpostId.toString(), + hendelsesId = journalfoeringHendelseRecord.hendelsesId, + ) + + behandlingService.handleJournalpost( + journalfoeringEvent = journalfoeringHendelseRecord, + loggingMeta = loggingMeta, sykmeldingId = sykmeldingId, - journalpostId = journalfoeringHendelseRecord.journalpostId.toString(), - hendelsesId = journalfoeringHendelseRecord.hendelsesId, ) - - behandlingService.handleJournalpost( - journalfoeringEvent = journalfoeringHendelseRecord, - loggingMeta = loggingMeta, - sykmeldingId = sykmeldingId, - ) + } catch (e: Exception) { + if (env.cluster != "dev-gcp") { + throw e + } else { + log.warn("Skipping error in dev", e) + } + } } } } diff --git a/src/main/kotlin/no/nav/syfo/opprettsykmelding/OpprettSykmeldingService.kt b/src/main/kotlin/no/nav/syfo/opprettsykmelding/OpprettSykmeldingService.kt index c108919..87d6b32 100644 --- a/src/main/kotlin/no/nav/syfo/opprettsykmelding/OpprettSykmeldingService.kt +++ b/src/main/kotlin/no/nav/syfo/opprettsykmelding/OpprettSykmeldingService.kt @@ -15,6 +15,7 @@ import no.nav.syfo.pdl.service.PdlPersonService import no.nav.syfo.securelog import no.nav.syfo.service.SykmeldingService import no.nav.syfo.util.LoggingMeta +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.StringDeserializer import org.slf4j.LoggerFactory @@ -78,49 +79,62 @@ class OpprettSykmeldingService( kafkaConsumer.subscribe(listOf(env.opprettSykmeldingTopic)) while (applicationState.ready) { kafkaConsumer.poll(Duration.ofMillis(1000)).forEach { consumerRecord -> - val opprettSykmeldingRecord = consumerRecord.value() - val sykmeldingId = UUID.randomUUID().toString() - val loggingMeta = - LoggingMeta( - sykmeldingId = sykmeldingId, - journalpostId = opprettSykmeldingRecord.data.journalpostId, - hendelsesId = opprettSykmeldingRecord.data.journalpostId, - ) - val journalpostId = opprettSykmeldingRecord.data.journalpostId - log.info("received opprett sykmelding for $journalpostId {}", fields(loggingMeta)) - securelog.info( - "received opprett sykmelding message $opprettSykmeldingRecord {}", - fields(loggingMeta) - ) - val journalpostMetadata = - safJournalpostClient.getJournalpostMetadata( - journalpostId, - journalPostQuery, - loggingMeta, - ) - ?: throw IllegalStateException( - "Unable to find journalpost with id $journalpostId", - ) - - val fnrEllerAktorId = - when (journalpostMetadata.bruker.type) { - "ORGNR" -> throw IllegalStateException("Bruker id is ORGNR") - else -> journalpostMetadata.bruker.id - } - ?: throw IllegalStateException("Bruker id is null") + try { + handleOpprettSykmelding(consumerRecord) + }catch (e: Exception) { + log.error("error in handeling message", e) + if (env.cluster == "dev-gcp"){ + log.warn("skipping error in dev") + } else { + throw e + } + } + } + } + } - val pasient = pdlPersonService.getPdlPerson(fnrEllerAktorId, loggingMeta) - sykmeldingService.behandleSykmelding( - journalpostId = journalpostId, - pasient = pasient, - datoOpprettet = journalpostMetadata.datoOpprettet, - dokumentInfoIdPdf = journalpostMetadata.dokumentInfoIdPdf, - dokumentInfoId = journalpostMetadata.dokumentInfoId, - temaEndret = false, - loggingMeta = loggingMeta, - sykmeldingId = sykmeldingId, + private suspend fun handleOpprettSykmelding(consumerRecord: ConsumerRecord) { + val opprettSykmeldingRecord = consumerRecord.value() + val sykmeldingId = UUID.randomUUID().toString() + val loggingMeta = + LoggingMeta( + sykmeldingId = sykmeldingId, + journalpostId = opprettSykmeldingRecord.data.journalpostId, + hendelsesId = opprettSykmeldingRecord.data.journalpostId, + ) + val journalpostId = opprettSykmeldingRecord.data.journalpostId + log.info("received opprett sykmelding for $journalpostId {}", fields(loggingMeta)) + securelog.info( + "received opprett sykmelding message $opprettSykmeldingRecord {}", + fields(loggingMeta) + ) + val journalpostMetadata = + safJournalpostClient.getJournalpostMetadata( + journalpostId, + journalPostQuery, + loggingMeta, + ) + ?: throw IllegalStateException( + "Unable to find journalpost with id $journalpostId", ) + + val fnrEllerAktorId = + when (journalpostMetadata.bruker.type) { + "ORGNR" -> throw IllegalStateException("Bruker id is ORGNR") + else -> journalpostMetadata.bruker.id } - } + ?: throw IllegalStateException("Bruker id is null") + + val pasient = pdlPersonService.getPdlPerson(fnrEllerAktorId, loggingMeta) + sykmeldingService.behandleSykmelding( + journalpostId = journalpostId, + pasient = pasient, + datoOpprettet = journalpostMetadata.datoOpprettet, + dokumentInfoIdPdf = journalpostMetadata.dokumentInfoIdPdf, + dokumentInfoId = journalpostMetadata.dokumentInfoId, + temaEndret = false, + loggingMeta = loggingMeta, + sykmeldingId = sykmeldingId, + ) } }