Manual commitAsync completes before actual commit #94

Open
voidconductor opened this issue Aug 1, 2019 · 3 comments

@voidconductor
Contributor

In KafkaConsumerObservableManualCommit.scala:56 there's an incorrect implementation of async commit:

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
  Task {
    blocking(consumer.synchronized(consumer.commitAsync(batch.map {
      case (k, v) => k -> new OffsetAndMetadata(v)
    }.asJava, callback)))
  }

Apache Kafka's commitAsync(offsets, callback) returns immediately and invokes the callback when the commit completes.

This task completes when consumer.commitAsync returns, not when the callback is invoked, and it also ignores any commit errors passed through the callback. It should probably be rewritten with Task.async.
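A minimal sketch of what such a rewrite might look like, assuming Monix 3.x (where Task.async takes a callback-registering function) and the standard kafka-clients OffsetCommitCallback interface. The object name CommitSketch and the method name commitAsyncTask are hypothetical and are not part of monix-kafka. Note that the returned task completes only once the Kafka client actually invokes the callback, which, as discussed further down this thread, happens while the consumer is polling.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import monix.eval.Task
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper, not the library's implementation.
object CommitSketch {

  def commitAsyncTask[K, V](
    consumer: Consumer[K, V],
    batch: Map[TopicPartition, Long]
  ): Task[Unit] =
    Task.async[Unit] { cb =>
      val offsets: JMap[TopicPartition, OffsetAndMetadata] =
        batch.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }.asJava

      try {
        consumer.synchronized {
          consumer.commitAsync(
            offsets,
            new OffsetCommitCallback {
              // Invoked by the Kafka client once the commit has finished;
              // `error` is null on success.
              override def onComplete(
                committed: JMap[TopicPartition, OffsetAndMetadata],
                error: Exception
              ): Unit =
                if (error != null) cb.onError(error) else cb.onSuccess(())
            }
          )
        }
      } catch {
        // commitAsync itself may throw, e.g. if the consumer is already closed.
        case NonFatal(e) => cb.onError(e)
      }
    }
}
```

Unlike the original version, a failed commit surfaces as a failed Task instead of being silently dropped.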

Avasil added the bug label Aug 1, 2019
Avasil added this to the 1.0.0 milestone Aug 1, 2019
@Avasil
Collaborator

Avasil commented Aug 1, 2019

Good catch, @voidconductor! Will fix it before the next release.

@voidconductor
Contributor Author

I've recently tried to fix this, and it turned out to be harder than I thought.

It looks like the Apache Kafka client puts callbacks passed to commitAsync into an internal queue and invokes them on the consumer's poll.

So the naive solution with Task.create doesn't even pass the tests, probably because poll won't be called until the commit callback completes and lets the observable run on.
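The behaviour described above can be illustrated with a toy model. This is only a sketch using the Scala standard library: ToyConsumer is a hypothetical stand-in and not the Kafka client, and it assumes nothing beyond what is stated here, namely that commit callbacks are queued by commitAsync and run on the next poll.

```scala
import scala.collection.mutable
import scala.concurrent.Promise

object CommitCallbackModel {

  // Hypothetical stand-in for the consumer: commitAsync only queues the
  // callback, and poll() is what actually runs the queued callbacks.
  final class ToyConsumer {
    private val pending = mutable.Queue.empty[() => Unit]

    def commitAsync(offset: Long, callback: Long => Unit): Unit =
      pending.enqueue(() => callback(offset))

    def poll(): Unit =
      while (pending.nonEmpty) pending.dequeue()()
  }

  // Returns whether the commit had completed (before poll, after poll).
  def demo(): (Boolean, Boolean) = {
    val consumer  = new ToyConsumer
    val committed = Promise[Long]()

    consumer.commitAsync(42L, offset => committed.success(offset))
    val doneBeforePoll = committed.isCompleted

    consumer.poll()
    val doneAfterPoll = committed.isCompleted

    (doneBeforePoll, doneAfterPoll)
  }
}
```

In this model the commit is not completed until poll() runs. If the code that calls poll is itself waiting for the commit to complete before it continues, neither side makes progress, which would match the failing tests.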

@Avasil
Collaborator

Avasil commented Aug 22, 2019

In this case, I think we could either include a comment about it or return a Fiber which can be joined.

I assume we can't cancel a commit, so perhaps even Task[Task[Unit]] instead of Task[Fiber].
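One possible shape for the Task[Task[Unit]] idea, sketched under the assumption of Monix 3.x and the standard kafka-clients API. NestedCommitSketch and commitAsyncNested are hypothetical names, and this is not necessarily how the library ended up resolving the issue. The outer task submits the commit; the inner task completes when the callback fires.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import scala.concurrent.Promise

import monix.eval.Task
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper illustrating the nested-task design.
object NestedCommitSketch {

  def commitAsyncNested[K, V](
    consumer: Consumer[K, V],
    batch: Map[TopicPartition, Long]
  ): Task[Task[Unit]] =
    Task {
      // Completed by the commit callback, whenever the client invokes it.
      val done = Promise[Unit]()

      val offsets: JMap[TopicPartition, OffsetAndMetadata] =
        batch.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }.asJava

      consumer.synchronized {
        consumer.commitAsync(
          offsets,
          new OffsetCommitCallback {
            override def onComplete(
              committed: JMap[TopicPartition, OffsetAndMetadata],
              error: Exception
            ): Unit = {
              if (error != null) done.tryFailure(error) else done.trySuccess(())
              ()
            }
          }
        )
      }

      // Inner task: waits for the commit result without blocking a thread.
      Task.fromFuture(done.future)
    }
}
```

Running the outer task only hands the commit to the client, so the stream can keep polling. A caller that needs confirmation can run the inner task later, for example with flatMap(identity) on the result, and will see a commit failure as a failed Task.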

voidconductor added a commit to voidconductor/monix-kafka that referenced this issue Aug 26, 2019
voidconductor added a commit to voidconductor/monix-kafka that referenced this issue Sep 11, 2019
voidconductor added a commit to voidconductor/monix-kafka that referenced this issue Sep 11, 2019
paualarco added a commit to paualarco/monix-kafka that referenced this issue Jan 23, 2021
Add consumer polling (monix#94)

AutoCommit polling logic (monix#94)

Remove nested task loop

Remove redundant atomic refs

Add test for (monix#101)

Apply changes to older versions
Avasil pushed a commit that referenced this issue Mar 24, 2022
* Async commit completed with callback (#94)

Add consumer polling (#94)

AutoCommit polling logic (#94)

Remove nested task loop

Remove redundant atomic refs

Add test for (#101)

Apply changes to older versions

* Tests passing

* Observable poll heartbeat rate

* Resolves merge conflicts

* Unused imports

* Multiple threads test failing

* Test small poll heartbeat and slow upstream

* ScalafmtAll

* Removes lambda anonymous class for scala 11 compatibility

* Scaladoc fix

* MR Feedback

* Unused import

* Adding poll heartbeat tests

* l

* Improved rebalancing test scenarios

* a

* Improved test

* Monix Kafka benchmarks for consumer, single and sink producer

Block


Fix


Binded connection


First benchmark for kafka producer


Added kafka benchmark strategy plan


Added sink and consumer benchmarks 


Producer results


Akka 


Removed references to akka


a


Final

* Benchmarks

* Fs2

* Heartbeat poll

* Benchmarks updated

* Private poll heartbeat rate

* Some improvements

* Fix unexpected consumed records

* Benchmarks with different pollHeartbeatIntervals

* New benchmark results

* More benchmarking

* Increase consumed records on perf test

* Set kafka-clients dependency to 1.1.1

* RC8 - ProducerBenchmarks

* Provided scala compat

* Downgrade scalacheck dependency

* Clean up

* Fixed topic regex tests

* Typo

* Update embedded kafka dependency

* Port back to older kafka subprojects

* A

* Make poll interval rate private on kafka 10 and 9

* Bit of clean up and updated readme

* Update changelog and trigger pipeline

* Test indentation fix

* Add logback in test

* Add unchanged topic partition test case

* Kafka tests running on github actions

* Renames github workflows folder

* Updates scala action

* Removes tests for kafka 0.9.x


Revmoves travis-ci

* Add benchmark with autocomit async and sync scenarios

* Removed benchmark results file

* Some clean up

* Use pollTimeout and updated benchmark results

* ScalafmtAll

* Ignore Ds_store file

* Removes logback file from main

* Remove unnecessary stuff

* Remove unused import

* Benchmarks readme

* Small correction

* MR Feedback

2 participants