
Failing to commit offset at the end of the Observable #240

Open
fchaillou opened this issue Dec 14, 2020 · 7 comments · May be fixed by #306

Comments

@fchaillou

Hello,
I'm using monix-kafka's KafkaConsumerObservable.manualCommit consumer to consume data from Kafka in a batch application.
We consume data until there is none left (using .timeoutOnSlowUpstreamTo(5.seconds, Observable.empty) to detect the end).
I process the data and, at the end of the Observable (but still as part of it), I commit the offsets with a CommittableOffsetBatch.

The commit fails with:

java.lang.IllegalStateException: This consumer has already been closed.

Here is my main logic FYI :

private def logic(bootstrapServer: String, topic: String) = {
  val kafkaConfig: KafkaConsumerConfig = KafkaConsumerConfig.default.copy(
    bootstrapServers = List(bootstrapServer),
    groupId = "failing-logic",
    autoOffsetReset = AutoOffsetReset.Earliest
  )
  KafkaConsumerObservable
    .manualCommit[String, String](kafkaConfig, List(topic))
    .timeoutOnSlowUpstreamTo(5.seconds, Observable.empty)
    .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) =>
      batch.updated(message.committableOffset)
    }
    .mapEval(completeBatch => completeBatch.commitAsync())
    .headOrElseL(List.empty)
}

And here is a repo with a ready sbt project to reproduce it : https://github.com/fchaillou/monix-kafka-issue

Let me know if there is anything else I can do to help.
Thanks,
Fabien

@paualarco
Member

Hello @fchaillou,

I'm not sure what you were trying to achieve with this example; however, I think it is not formulated correctly.

In the test, the observable is set to fail if the next element is not received within 5 seconds (which will always happen, since you are only sending two messages). On the other hand, the fold operator will keep expecting messages until the downstream sends a Stop, which will never happen because the Kafka consumer does not stop on its own; instead you will receive the error from the timeout clause first.

This is my guess, but probably something is happening there that forces the consumer to close before we get to commit the records.
You could try applying an operator that forces the observable to finish, such as bufferSliding + headL or take(n), in order to trigger the commitAsync.
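For illustration, a hedged sketch of the take(n) approach (expectedCount is an illustrative parameter, not part of the original example; this assumes the same kafkaConfig and topic as above and is untested against a real broker):

```scala
// Sketch only: take(n) completes the observable after n elements,
// so the fold can emit its batch and commitAsync gets a chance to
// run before the stream (and the consumer) is torn down.
KafkaConsumerObservable
  .manualCommit[String, String](kafkaConfig, List(topic))
  .take(expectedCount) // expectedCount is hypothetical
  .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) =>
    batch.updated(message.committableOffset)
  }
  .mapEval(_.commitAsync())
  .completedL
```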

Hope it helps :)

@fchaillou
Author

Hello @paualarco,
This example is just a simplification of code we're using in production with exactly the same logic, and it works as expected:
the observable finishes properly except for the commitAsync. (In production we have worked around the issue by creating a new consumer that only performs the commit, which works but with some limitations.)

Basically, the topic we're reading from is populated in a batch fashion, so we want to consume until nothing more is available; that's what timeoutOnSlowUpstreamTo(5.seconds, Observable.empty) is for.
When we want to continue processing, we simply relaunch the same application, which starts from the committed offset.

The issue here is that the consumer is closed too soon: by the time we reach the commitAsync it is already closed, whereas it should stay open until the whole observable has finished.

@paualarco
Member

paualarco commented Sep 18, 2021

Hi @fchaillou, sorry for the late response; I don't think I understood the issue when I first saw it.
A possible solution to delay the closure of the consumer might be to acquire it through a Resource. In that case we would not close it on completion of the observable, which would allow a final offset commit.

This is how it could look:

def manualCommitResource[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
    K: Deserializer[K],
    V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = {
  for {
    consumer <- Resource.make(createConsumer[K, V](cfg, topics)) { consumer =>
      Task.evalAsync(consumer.synchronized(blocking(consumer.close())))
    }
    observable <- Resource.pure(manualCommit(cfg, consumer))
  } yield observable
}
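Under that design, the caller keeps the consumer open for the lifetime of the `use` block, so the final commit happens before release. A hedged usage sketch (it assumes the `manualCommitResource` signature proposed above and reuses the kafkaConfig/topic names from the issue; untested against a real broker):

```scala
// Sketch only: the consumer is closed when `use` completes,
// i.e. after commitAsync has run inside the block.
manualCommitResource[String, String](kafkaConfig, List(topic)).use { observable =>
  observable
    .timeoutOnSlowUpstreamTo(5.seconds, Observable.empty)
    .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) =>
      batch.updated(message.committableOffset)
    }
    .mapEval(_.commitAsync())
    .completedL
}
```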

@Avasil do you agree this is a better way of releasing the Kafka consumer? If so, should we deprecate the old methods?

@paualarco
Member

It should also fix this #186

paualarco added a commit to paualarco/monix-kafka that referenced this issue Oct 3, 2021
@fchaillou
Author

fchaillou commented Oct 27, 2021

Hello @paualarco,
Sorry for my late reply; this touches a production app that is pretty stable, so I had no reason to work on it until now.

Your suggestion makes a lot of sense, but it doesn't work because the KafkaConsumerObservable still closes the consumer on its own in cancelTask:

private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = {
  // Forced asynchronous boundary
  val cancelTask = Task.evalAsync {
    consumer.synchronized(blocking(consumer.close()))
  }

  // By applying memoization, we are turning this
  // into an idempotent action, such that we are
  // guaranteed that consumer.close() happens
  // at most once
  cancelTask.memoize
}
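The memoization comment above is the heart of the guarantee: however the cancel task ends up being sequenced, consumer.close() runs at most once. A plain-Scala sketch of that idempotency, with no Monix dependency (IdempotentClose and Demo are illustrative names, not monix-kafka API):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// The AtomicBoolean plays the role of Task.memoize here: however
// many times close() is invoked, the wrapped effect runs exactly once.
final class IdempotentClose(onClose: () => Unit) {
  private val closed = new AtomicBoolean(false)
  def close(): Unit =
    if (closed.compareAndSet(false, true)) onClose()
}

object Demo {
  var closeCount = 0
  val closer = new IdempotentClose(() => closeCount += 1)
  def run(): Int = {
    closer.close()
    closer.close() // second call is a no-op
    closeCount
  }
}
```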

I'm wondering whether the best solution would be to make this change:

trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
  protected def config: KafkaConsumerConfig
  protected def consumer: Resource[Task, Consumer[K, V]]
  // ...
}

@paualarco
Member

Hi @fchaillou,
That was only a partial solution; in the end the Resource will be the one in charge of closing the consumer, no longer the Observable.
This is the PR with the proposed change: #306

@fchaillou
Author

Oh, I didn't understand this was related to another PR.
This makes 100% sense :)
