Skip to content

Commit

Permalink
feat: make the error topic optional
Browse files Browse the repository at this point in the history
  • Loading branch information
ccamel authored and ingvaar committed Apr 15, 2022
1 parent 7fc8e2f commit 7441099
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions src/main/kotlin/com/okp4/processor/cosmos/topology.kt
Expand Up @@ -22,9 +22,7 @@ fun topology(props: Properties): Topology {
val topicOut = requireNotNull(props.getProperty("topic.out")) {
"Option 'topic.out' was not specified."
}
val topicError = requireNotNull(props.getProperty("topic.error")) {
"Option 'topic.error' was not specified."
}
val topicError: String? = props.getProperty("topic.error")

return StreamsBuilder().apply {
stream(topicIn, Consumed.with(Serdes.String(), Serdes.ByteArray()).withName("input")).mapValues(
Expand All @@ -37,37 +35,43 @@ fun topology(props: Properties): Topology {
ks.peek(
{ k, v ->
v.second.onFailure {
logger.warn("Deserialization failed for block with key <$k> (block will be sent to topic $topicError): ${it.message}", it)
logger.warn("Deserialization failed for block with key <$k>: ${it.message}", it)
}
},
Named.`as`("log-deserialization-failure")
)
.mapValues({ pair -> pair.first }, Named.`as`("extract-original-bytearray")).to(
topicError, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("error")
.mapValues({ pair -> pair.first }, Named.`as`("extract-original-bytearray"))
.apply {
if (!topicError.isNullOrEmpty()) {
logger.info("Failed block will be sent to the topic $topicError")
to(
topicError, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("error")
)
}
}
}
).defaultBranch(
Branched.withConsumer { ks ->
ks.mapValues(
{ v ->
v.second.getOrThrow()
}, Named.`as`("extract-block")
).peek(
{ _, block -> logger.debug("→ block ${block.header.height} (${block.data.txsCount} txs)") },
Named.`as`("log-block-extraction")
).flatMapValues(
{ block ->
block.data.txsList
}, Named.`as`("extract-transactions")
).mapValues(
{ tx ->
tx.toByteArray()
}, Named.`as`("convert-transactions-to-bytearray")
).to(
topicOut, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("output")
)
}
)
}
).defaultBranch(
Branched.withConsumer { ks ->
ks.mapValues(
{ v ->
v.second.getOrThrow()
}, Named.`as`("extract-block")
).peek(
{ _, block -> logger.debug("→ block ${block.header.height} (${block.data.txsCount} txs)") },
Named.`as`("log-block-extraction")
).flatMapValues(
{ block ->
block.data.txsList
}, Named.`as`("extract-transactions")
).mapValues(
{ tx ->
tx.toByteArray()
}, Named.`as`("convert-transactions-to-bytearray")
).to(
topicOut, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("output")
)
}
)
}.build()
}

}.build()
}

0 comments on commit 7441099

Please sign in to comment.