diff --git a/src/main/kotlin/com/okp4/processor/cosmos/topology.kt b/src/main/kotlin/com/okp4/processor/cosmos/topology.kt index bfa8b88..4d9e2ed 100644 --- a/src/main/kotlin/com/okp4/processor/cosmos/topology.kt +++ b/src/main/kotlin/com/okp4/processor/cosmos/topology.kt @@ -22,9 +22,7 @@ fun topology(props: Properties): Topology { val topicOut = requireNotNull(props.getProperty("topic.out")) { "Option 'topic.out' was not specified." } - val topicError = requireNotNull(props.getProperty("topic.error")) { - "Option 'topic.error' was not specified." - } + val topicError: String? = props.getProperty("topic.error") return StreamsBuilder().apply { stream(topicIn, Consumed.with(Serdes.String(), Serdes.ByteArray()).withName("input")).mapValues( @@ -37,37 +35,43 @@ fun topology(props: Properties): Topology { ks.peek( { k, v -> v.second.onFailure { - logger.warn("Deserialization failed for block with key <$k> (block will be sent to topic $topicError): ${it.message}", it) + logger.warn("Deserialization failed for block with key <$k>: ${it.message}", it) } }, Named.`as`("log-deserialization-failure") ) - .mapValues({ pair -> pair.first }, Named.`as`("extract-original-bytearray")).to( - topicError, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("error") + .mapValues({ pair -> pair.first }, Named.`as`("extract-original-bytearray")) + .apply { + if (!topicError.isNullOrEmpty()) { + logger.info("Failed block will be sent to the topic $topicError") + to( + topicError, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("error") + ) + } + } + } + ).defaultBranch( + Branched.withConsumer { ks -> + ks.mapValues( + { v -> + v.second.getOrThrow() + }, Named.`as`("extract-block") + ).peek( + { _, block -> logger.debug("→ block ${block.header.height} (${block.data.txsCount} txs)") }, + Named.`as`("log-block-extraction") + ).flatMapValues( + { block -> + block.data.txsList + }, Named.`as`("extract-transactions") + ).mapValues( + { tx -> + tx.toByteArray() + }, Named.`as`("convert-transactions-to-bytearray") + ).to( + topicOut, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("output") + ) + } ) - } - ).defaultBranch( - Branched.withConsumer { ks -> - ks.mapValues( - { v -> - v.second.getOrThrow() - }, Named.`as`("extract-block") - ).peek( - { _, block -> logger.debug("→ block ${block.header.height} (${block.data.txsCount} txs)") }, - Named.`as`("log-block-extraction") - ).flatMapValues( - { block -> - block.data.txsList - }, Named.`as`("extract-transactions") - ).mapValues( - { tx -> - tx.toByteArray() - }, Named.`as`("convert-transactions-to-bytearray") - ).to( - topicOut, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("output") - ) - } - ) - }.build() - } - \ No newline at end of file + }.build() + } + \ No newline at end of file