Skip to content

Commit

Permalink
Merge pull request #16 from okp4/fix/keep-offset-state
Browse files Browse the repository at this point in the history
Fix/keep offset state
  • Loading branch information
amimart committed Apr 8, 2022
2 parents 4773e75 + a6e4829 commit be238b8
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 8 deletions.
18 changes: 11 additions & 7 deletions src/main/kotlin/com/okp4/connect/cosmos/CosmosSourceTask.kt
Expand Up @@ -20,13 +20,7 @@ class CosmosSourceTask : SourceTask() {

private lateinit var serviceClient: CosmosServiceClient

private val lastBlockHeightFromOffsetStorage: Long
get() =
context
.offsetStorageReader()
.offset(sourcePartition)
?.get(HEIGHT_FIELD) as Long?
?: 0L
private var lastBlockHeightFromOffsetStorage: Long = 0

override fun version(): String = CosmosSourceConnector.VERSION

Expand All @@ -43,6 +37,12 @@ class CosmosSourceTask : SourceTask() {
NODE_FIELD to nodeAddress
)
serviceClient = CosmosServiceClient.with(nodeAddress, nodePort, tlsEnable)

lastBlockHeightFromOffsetStorage = context
.offsetStorageReader()
.offset(sourcePartition)
?.get(HEIGHT_FIELD) as Long?
?: 0L
}

@Throws(InterruptedException::class)
Expand All @@ -57,6 +57,10 @@ class CosmosSourceTask : SourceTask() {
.map { asSourceRecord(it) }
.toList()
}
}.also {
it.lastOrNull()?.run {
lastBlockHeightFromOffsetStorage = sourceOffset()[HEIGHT_FIELD] as Long
}
}

override fun stop() {
Expand Down
25 changes: 24 additions & 1 deletion src/test/kotlin/com/okp4/connect/cosmos/CosmosSourceTaskTest.kt
Expand Up @@ -129,7 +129,7 @@ class CosmosSourceTaskTest : BehaviorSpec({
}
}

When("the client is closed calling during poll") {
When("the client is closed during poll") {
mockForPoll(null, 4, null, 3)
props[CosmosSourceConnector.MAX_POLL_LENGTH_CONFIG] = "10"
cosmosSourceTask.start(props)
Expand Down Expand Up @@ -159,6 +159,29 @@ class CosmosSourceTaskTest : BehaviorSpec({
verify(exactly = 1) { cosmosClient.close() }
}
}

When("poll is called multiple times") {
mockForPoll(10, 50, null, null)
props[CosmosSourceConnector.MAX_POLL_LENGTH_CONFIG] = "2"
cosmosSourceTask.start(props)

val resp1 = cosmosSourceTask.poll()
val resp2 = cosmosSourceTask.poll()
then("the offset is not reset") {
resp1.size shouldBe 2
resp2.size shouldBe 2

resp1.last().sourceOffset() shouldBe mapOf(CosmosSourceTask.HEIGHT_FIELD to 12)
resp2.last().sourceOffset() shouldBe mapOf(CosmosSourceTask.HEIGHT_FIELD to 14)

coVerifyOrder {
cosmosClient.getBlockByHeight(11)
cosmosClient.getBlockByHeight(12)
cosmosClient.getBlockByHeight(13)
cosmosClient.getBlockByHeight(14)
}
}
}
}

given("A config") {
Expand Down

0 comments on commit be238b8

Please sign in to comment.