Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual commits support #64

Merged
merged 12 commits into from
Feb 19, 2019
Merged

Manual commits support #64

merged 12 commits into from
Feb 19, 2019

Conversation

poslegm
Copy link
Contributor

@poslegm poslegm commented Jan 15, 2019

Added support for manual offsets committing (inspired by alpakka and fs2-kafka).

Reasons and solution was described in #24

I rely on a sctrict code review because I guess that not all my decisions are correct. I could be wrong both in code style and in performance.

Short example of consumer with manual commits:

KafkaConsumerObservable.manualCommit[String,String](consumerCfg, List(topicName))
  .map(message => message.record.value() -> message.committableOffset)
  .mapTask { case (value, offset) => performBusinessLogic(value).map(_ => offset) }
  .bufferTimedAndCounted(1.second, 1000)
  .mapTask(offsets => CommittableOffsetBatch(offsets).commit()) 

Unit test for this looks synthetic because I didn't find a way to discover current consumer group offset in Embedded Kafka topic. Maybe someone knows how to do it?

@poslegm
Copy link
Contributor Author

poslegm commented Jan 15, 2019

Did I do the right thing by adding new feature to all subprojects (monix-kafka-9, monix-kafka-10 and other) or should I do it to last version only?

@Avasil
Copy link
Collaborator

Avasil commented Jan 15, 2019

Many thanks @poslegm this is very useful feature
though I will have to ask you for patience because my next two weeks are very intense and I would like to play a bit with PR. I'll try to at least read it in the meantime. :)

Unit test for this looks synthetic because I didn't find a way to discover current consumer group offset in Embedded Kafka topic. Maybe someone knows how to do it?

Perhaps consumer.position(topicPartition) could work? Or maybe tools like ZKAdmin could work with EmbeddedKafka, I'm not sure

Did I do the right thing by adding new feature to all subprojects (monix-kafka-9, monix-kafka-10 and other) or should I do it to last version only?

Yeah, we want it for all versions but it would be better to just start with one so in case of any changes you don't have to apply it 4 times. :)

Btw, I'm not convinced we still need this separation but that's just something I'll have to deal with when I find more time.

@poslegm
Copy link
Contributor Author

poslegm commented Jan 16, 2019

Perhaps consumer.position(topicPartition) could work?

The main problem is that KafkaConsumerObservable incapsulates consumer object. I tried to write something like this:

val consumer = KafkaConsumerObservable.createConsumer[String,String](consumerCfg, List(topicName))
val source = KafkaConsumerObservable.manualCommit[String,String](consumerCfg, consumer)

...

Await.result(consumer.map(c => c.position(new TopicPartition(topicName, 0))).runToFuture, 60.seconds)

But I receive java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access because KafkaConsumerObservable manipulates with consumer not in main thread.

Also I tried to create new kafka consumer but its position method returns knowingly wrong offset and kafka logs exception about consumer instances duplication.

@Avasil
Copy link
Collaborator

Avasil commented Jan 28, 2019

@poslegm FYI I'm about to have more time to spend on OSS and this PR is the first on my list so you can expect some feedback in a couple of days!

@poslegm
Copy link
Contributor Author

poslegm commented Jan 29, 2019

It's a really good news!

Copy link
Collaborator

@Avasil Avasil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once again - sorry it took so long, thank you for your time and patience. :)

Overall I like the API but I think we'll have to be a bit more careful about those commit functions. Hopefully we can think of something!

Copy link
Collaborator

@Avasil Avasil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks once again, I like it! Just left few suggestions.
So I guess now I'll try to find something for testing offset

@leandrob13
Would you have a time to take a brief look?
I think it looks good but there's a good chance I've missed something. :)

kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala Outdated Show resolved Hide resolved
val asyncCb = Callback.forked(cb)
val cancelable = BooleanCancelable()

val commit: Commit = new Commit {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably create it outside this method and reuse

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need KafkaConsumer[K, V] in scope for Commit creation.

I can move it out to private def commit(consumer: KafkaConsumer[K, V]): Commit = ???, but I have no ideas how to create it once and reuse later.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right, I've missed it. I'll let you know if anything comes to my mind

@Avasil Avasil merged commit bd91a37 into monix:master Feb 19, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants