diff --git a/src/main/kotlin/com/okp4/connect/cosmos/CosmosSourceTask.kt b/src/main/kotlin/com/okp4/connect/cosmos/CosmosSourceTask.kt index abf7c23..f959c95 100644 --- a/src/main/kotlin/com/okp4/connect/cosmos/CosmosSourceTask.kt +++ b/src/main/kotlin/com/okp4/connect/cosmos/CosmosSourceTask.kt @@ -6,6 +6,7 @@ import kotlinx.coroutines.flow.* import kotlinx.coroutines.runBlocking import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.errors.RetriableException import org.apache.kafka.connect.source.SourceRecord import org.apache.kafka.connect.source.SourceTask import tendermint.types.BlockOuterClass.Block @@ -53,7 +54,7 @@ class CosmosSourceTask : SourceTask() { .takeWhile { !serviceClient.isClosed() } .map { serviceClient.getBlockByHeight(it) } .map { it.getOrThrow() } - .catch { if (it is StatusException && it.status.code != Status.INVALID_ARGUMENT.code) throw it } + .catch { if (it is StatusException && it.status.code != Status.INVALID_ARGUMENT.code) throw RetriableException(it) } .map { asSourceRecord(it) } .toList() } diff --git a/src/test/kotlin/com/okp4/connect/cosmos/CosmosSourceTaskTest.kt b/src/test/kotlin/com/okp4/connect/cosmos/CosmosSourceTaskTest.kt index 1537c70..640da74 100644 --- a/src/test/kotlin/com/okp4/connect/cosmos/CosmosSourceTaskTest.kt +++ b/src/test/kotlin/com/okp4/connect/cosmos/CosmosSourceTaskTest.kt @@ -8,6 +8,7 @@ import io.kotest.datatest.withData import io.kotest.matchers.shouldBe import io.mockk.* import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.errors.RetriableException import tendermint.types.BlockOuterClass import tendermint.types.Types @@ -115,11 +116,11 @@ class CosmosSourceTaskTest : BehaviorSpec({ cosmosSourceTask.start(props) then("it shall complete exceptionally") { - val thrown = shouldThrow { + val thrown = shouldThrow { cosmosSourceTask.poll() } - thrown.status shouldBe Status.DEADLINE_EXCEEDED + (thrown.cause as StatusException).status shouldBe Status.DEADLINE_EXCEEDED coVerifyOrder { cosmosClient.getBlockByHeight(1)