Skip to content

Commit

Permalink
feat: add dead letter queue to store faulty blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
ingvaar committed Apr 13, 2022
1 parent 6cd73c7 commit c57d36b
Showing 1 changed file with 40 additions and 19 deletions.
59 changes: 40 additions & 19 deletions src/main/kotlin/com/okp4/processor/cosmos/topology.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.*
import org.slf4j.LoggerFactory
import tendermint.types.BlockOuterClass
import java.util.*
Expand All @@ -22,34 +20,57 @@ fun topology(props: Properties): Topology {
val topicOut = requireNotNull(props.getProperty("topic.out")) {
"Option 'topic.out' was not specified."
}
val topicError = requireNotNull(props.getProperty("topic.error")) {
"Option 'topic.error' was not specified."
}

return StreamsBuilder()
.apply {
stream(topicIn, Consumed.with(Serdes.String(), Serdes.ByteArray()).withName("input"))
.map(
{ k, v ->
try {
KeyValue(k, BlockOuterClass.Block.parseFrom(v))
KeyValue(k, Pair(BlockOuterClass.Block.parseFrom(v).toByteArray(), null))
} catch (e: Exception) {
logger.error("Deserialization failed for block with key $k: ${e.message}")
KeyValue(k, BlockOuterClass.Block.getDefaultInstance())
KeyValue(k, Pair(v, e.message))
}
}, Named.`as`("block-deserialization")
)
.split()
.branch({ _, v -> !v.second.isNullOrEmpty() }, Branched.withConsumer { ks ->
ks.mapValues { pair ->
pair.first
}
.to(
topicError, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("error")
)
})
.defaultBranch(Branched.withConsumer { ks ->
ks.mapValues(
{ pair ->
BlockOuterClass.Block.parseFrom(pair.first)
},
Named.`as`("extract-block")
)
.peek(
{ _, block -> logger.debug("→ block ${block.header.height} (${block.data.txsCount} txs)") },
Named.`as`("log")
).flatMapValues(
{ block ->
block.data.txsList
}, Named.`as`("extract-transactions")
).mapValues(
.peek(
{ _, block -> logger.debug("→ block ${block.header.height} (${block.data.txsCount} txs)") },
Named.`as`("log")
)
.flatMapValues(
{ block ->
block.data.txsList
}, Named.`as`("extract-transactions")
)
.mapValues(
{ tx ->
tx.toByteArray()
}, Named.`as`("convert-transactions-to-bytearray")
).to(
topicOut, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("output")
)
}.build()
}

)
.to(
topicOut, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("output")
)
})

}.build()
}

0 comments on commit c57d36b

Please sign in to comment.