Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async consumer & manual offset control #38

Closed
vielmath opened this issue Sep 6, 2018 · 5 comments
Closed

Async consumer & manual offset control #38

vielmath opened this issue Sep 6, 2018 · 5 comments

Comments

@vielmath
Copy link
Contributor

vielmath commented Sep 6, 2018

I had some trouble trying to manually commit offsets.

After initializing a consumer and registering it to a topic I start a simple loop like this one:

(go-loop []
    (when-let [{:keys [topic offset partition] :as event} (<! (first consumer))]
      (when (= :record (:type event))
        ;; SOME PROCESSING
        (put! (second consumer) {:op :commit
                                 :topic-offsets [{:topic topic
                                                  :partition partition
                                                  :offset (inc offset)
                                                  :metadata ""}]}))
      (recur)))

A few messages are processed and then it stops processing. The loop seems dead.

After a bit of investigation I found that it fails here https://github.com/pyr/kinsky/blob/master/src/kinsky/async.clj#L108

The commit operations are so close that a wake-up exception is thrown while processing a commit operation. There is no try catch here so the loop dies...

Even with a try catch, poller-ctl only processes one event so if multiple operations are submitted "simultaneously" only the first will be applied.

@fmjrey
Copy link
Contributor

fmjrey commented Nov 12, 2018

@vielmath I wonder if the changes I made in the fmjrey branch on my fork would resolve your issue:
https://github.com/fmjrey/kinsky/tree/fmjrey

@pyr
Copy link
Owner

pyr commented Jan 8, 2019

@vielmath I released 0.1.23, can you verify with this version and see if you are still running into the same issue?

@vielmath
Copy link
Contributor Author

vielmath commented Jan 8, 2019

@pyr 0.1.23 doesn't seem to fix my problem.
@fmjrey your branch doesn't do the trick either.

Thanks for trying ;)

@pyr
Copy link
Owner

pyr commented Jan 6, 2020

@vielmath the async facade has been deprecated, sorry about this

@pyr pyr closed this as completed Jan 6, 2020
@fmjrey
Copy link
Contributor

fmjrey commented Jan 20, 2020

The referenced line has sinced changed, I believe this is where the exception was raised:

(client/commit! driver topic-offsets)

I've not encountered this issue, a good test case would help BTW.

Still, my guess is it's related to the control thread calling wakeup whenever there is a message on the control channel (source).

This makes me wonder why would we need a second thread for control? I imagine this was because of a consumer blocking indefinitely. However KIP-266: Fix consumer indefinite blocking behavior was released in 2.1, meaning most if not all consumer operations can now timeout, including commit.

So I wonder now, do we still need that second control thread which only purpose is to call wakeup on any input message?

@pyr @vielmath
Edit: I suggest we continue this discussion on fmjrey/kinsky-async#1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants