
Stream interface? #69

Closed
aheckmann opened this issue May 23, 2016 · 16 comments

Comments

@aheckmann

Producer / Consumers could benefit from a stream interface to increase interop with any NodeJS stream.

Thoughts?

@oleksiyk
Owner

I'm not sure. A stream interface would add a huge layer of complexity to the code without real benefit.

@oleksiyk
Owner

Maybe in a different library that wraps no-kafka.

@aheckmann
Author

The stream interface is an essential building block for Node apps, and now for Kafka too. We are writing our own stream wrapper for the consumer today, but it's a little harder than it should be because the current consumer doesn't have any public pause/resume mechanism; we have to hack it by not resolving or rejecting the returned promise.

@oleksiyk
Owner

oleksiyk commented May 25, 2016

It's not hard to add pause and resume for a given topic/partition pair. It's just a matter of setting/clearing the pause=true property in the consumer.subscriptions structure: https://github.com/oleksiyk/kafka/blob/master/lib/base_consumer.js#L40

I mean I can add these two methods.
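A rough sketch of what those two methods could look like, assuming a subscriptions structure keyed by topic/partition. All names here are illustrative, not the actual no-kafka internals:

```javascript
// Hypothetical sketch, not the actual no-kafka source: pause()/resume()
// toggle a flag on the subscription entry, and the fetch loop skips
// flagged entries when building the next Fetch request.
class PausableConsumer {
  constructor() {
    this.subscriptions = {}; // keyed by 'topic:partition'
  }
  _key(topic, partition) { return topic + ':' + partition; }
  pause(topic, partition) {
    const s = this.subscriptions[this._key(topic, partition)];
    if (s) { s.paused = true; }
  }
  resume(topic, partition) {
    const s = this.subscriptions[this._key(topic, partition)];
    if (s) { s.paused = false; }
  }
  // subscriptions to include in the next Fetch request
  _fetchable() {
    return Object.values(this.subscriptions).filter((s) => !s.paused);
  }
}
```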

@jorgemsrs

@oleksiyk can you give an example, please?

It seems to me the implementation unpauses it: https://github.com/oleksiyk/kafka/blob/master/lib/base_consumer.js#L71.

Also, because subscribing is async, it's hard to pause it at the right time.

@oleksiyk
Owner

The implementation unpauses it only once the handler has resolved, but the handler won't be called while the topic/partition is paused. I agree it's hard to choose the right moment, so the easiest solution is to just not resolve the handler.

@jorgemsrs

@oleksiyk that causes the lib to read as fast as min(idleTimeout, handler execution speed), so a stream implementation ends up not buffering, which is not necessarily bad...

Will be juggling the concept a little bit more.

@raviten

raviten commented Oct 5, 2016

Hi All,

Can you please suggest whether the following is possible?

When a consumer is started, it fetches data from the latest offset for a single (topic, group), and each message is processed with its offset committed locally. When all offsets are committed and all messages processed, it fetches a new set of messages starting from the last offset obtained in the previous fetch. Locally committed offsets are committed to the Kafka broker periodically.

@jorgemsrs

@black-star yes, it is possible, and the lib's interfaces allow for it. See the base consumer implementation for details.
You'll need to handle the locally committed offsets yourself and pass the offset to the library on reconnection.
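The local-offset bookkeeping described above can be sketched like this. All names are hypothetical; `commitToBroker` stands in for whatever commit call the library exposes, and the periodic flush would be driven by a timer:

```javascript
// Illustrative sketch: track offsets locally as each message is
// processed, then periodically flush the highest processed offset
// per topic/partition to the broker.
class LocalOffsetTracker {
  constructor(commitToBroker) {
    this.offsets = new Map(); // 'topic:partition' -> last processed offset
    this.commitToBroker = commitToBroker;
  }
  markProcessed(topic, partition, offset) {
    const key = topic + ':' + partition;
    const prev = this.offsets.get(key);
    if (prev === undefined || offset > prev) this.offsets.set(key, offset);
  }
  // offset to resume fetching from (e.g. after reconnection)
  nextFetchOffset(topic, partition) {
    const last = this.offsets.get(topic + ':' + partition);
    return last === undefined ? undefined : last + 1;
  }
  // call on a timer, e.g. setInterval(() => tracker.flush(), 5000)
  flush() {
    for (const [key, offset] of this.offsets) {
      const [topic, partition] = key.split(':');
      this.commitToBroker(topic, Number(partition), offset);
    }
  }
}
```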

@koresar

koresar commented Oct 31, 2016

Another benefit of Stream interface.

Currently no-kafka producers never notify when Kafka/Zookeeper is unreachable. If the producer were an EventEmitter, it would be fairly easy to subscribe to an error event to handle connectivity errors, or to a connected event to determine whether Kafka is reachable.

See Mongoose for example: http://mongoosejs.com/docs/api.html#connection_Connection-readyState

Connection#readyState

Connection ready state:

  • 0 = disconnected
  • 1 = connected
  • 2 = connecting
  • 3 = disconnecting
Each state change emits its associated event name.
Example

conn.on('connected', callback);
conn.on('disconnected', callback);

@oleksiyk
Owner

Currently no-kafka producers never notify if Kafka/Zookeeper is unreachable.

Just a side note: no-kafka doesn't have a connection to Zookeeper. It was a part of pre 0.9 Kafka clients.

@oleksiyk
Owner

Anyone interested to try #167 ?

@koresar

koresar commented Apr 22, 2017

Looks cool. I would be interested to try producer writable streams.

@hugebdu

hugebdu commented Sep 18, 2017

@oleksiyk not sure it's totally related, but I have a requirement to pause all consumers (I use GroupConsumer) and then resume them.
So, as suggested here, I simply do not resolve the handler promise.
I do see that the handler isn't called anymore until resumed, but it looks like no-kafka still consumes the messages and just doesn't hand them to my handler function.

Is there a way to really pause the consumers?

10x!

@oleksiyk
Owner

@hugebdu Have a look at https://github.com/oleksiyk/kafka/blob/master/lib/base_consumer.js#L40; as you can see, paused topic/partitions are not fetched (they are omitted from the Fetch request), so it really is paused.

@hugebdu

hugebdu commented Sep 19, 2017

@oleksiyk ok, thanks. Will double check my conclusions :)
