Error committing offsets: Kafka::UnknownMemberId potentially threading related #561

Closed
rhinon opened this issue Apr 10, 2018 · 10 comments

@rhinon rhinon commented Apr 10, 2018

Not a bug report; I'm looking for some direction on debugging an issue we're seeing. I don't believe this to be a bug in ruby-kafka.

  • Version of Ruby: 2.2.3
  • Version of Kafka: 0.10.2.0
  • Version of ruby-kafka: 0.3.16
Basic outline of what we do
kafka = Kafka.new(kafka_args)

kafka_consumer = kafka.consumer(
  group_id: @consumer_group_id,
  offset_commit_interval: 10,
  offset_commit_threshold: 1
)

kafka_consumer.subscribe(topic, start_from_beginning: true, max_bytes_per_partition: (40 * 1e6.to_i))

kafka_consumer.each_message do |message|
  process_message(message.value)
end
Context

As you can see above, we use ruby-kafka to do some basic consumption of messages. #process_message processes messages one at a time and handles duplicates, as each message contains a unique id we can use to ensure we don't double-process.

Whenever we process a message, some threading/asynchronous behavior takes place. None of it is critical to considering a message fully processed; if those threaded behaviors fail, we don't care for that purpose. One of the things we do asynchronously is realtime messaging via pubnub. This had been working without any issues until recently.
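
Roughly, the handler looks like this (heavily simplified; decode, already_processed?, handle_event, record_as_processed, and publish_realtime_update are stand-ins for our real code):

def process_message(value)
  event = decode(value) # stand-in for however we deserialize the payload

  # each event carries a unique id, so duplicates are safe to skip
  return if already_processed?(event.id)

  handle_event(event)
  record_as_processed(event.id)

  # fire-and-forget side work (e.g. the pubnub publish); if this thread
  # fails we don't care, the message still counts as processed
  Thread.new do
    begin
      publish_realtime_update(event)
    rescue StandardError => e
      warn("async side work failed: #{e.message}")
    end
  end
end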

Question

We recently upgraded the pubnub gem (3.7 -> 4.x), where they internally switched from EventMachine to Celluloid (I'm not super familiar with Celluloid). When we deployed this, we started seeing Kafka::UnknownMemberId errors when committing offsets. After reading the README section on [Thread Safety](https://github.com/zendesk/ruby-kafka#thread-safety), I'm wondering if we're doing something unexpected or dangerous here. Originally I thought we would be okay, as we are tolerant of any failures in our asynchronous code, which is why we don't block on it.

We are also looking into updating to the latest version as we're quite a bit behind now.

@dasch dasch commented Apr 11, 2018

Please start by updating to the most recent version of ruby-kafka. UnknownMemberId typically means that the consumer has been kicked out of the group because it hasn't sent a heartbeat in a while, so it could be due to your process_message method taking too long. But start by upgrading the gem.
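
To see whether processing time is the culprit, it's enough to time your own handler inside the consume loop; something like this (the 30-second threshold is just an example, compare it against your session timeout):

kafka_consumer.each_message do |message|
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  process_message(message.value)
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

  # anything approaching the consumer group's session timeout is a red flag
  warn("slow message: #{elapsed.round(1)}s") if elapsed > 30
end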

@filiptepper filiptepper commented May 28, 2018

It's not related to threading - it's process_message taking too long.

What would be extremely useful is some kind of notice / log that a consumer has been kicked out of the group.

@nguyenquangminh0711 nguyenquangminh0711 commented May 29, 2018

@filiptepper The group operations are handled on the Kafka server side, including kicking a member out of the group. The clients are only notified via an error code in the responses, which raises UnknownMemberId in this case. So I'm afraid there is no way to surface this other than capturing that exception.
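
For example, something along these lines around the consume loop (a rough sketch; whether the exception actually propagates out of each_message depends on where the commit fails and on the gem version):

begin
  kafka_consumer.each_message do |message|
    process_message(message.value)
  end
rescue Kafka::UnknownMemberId => e
  # the coordinator no longer recognizes this member, usually because it
  # missed heartbeats for longer than the session timeout; log it and
  # rejoin the group by re-entering the consume loop
  warn("kicked out of consumer group: #{e.class}")
  retry
end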

@stouset stouset commented May 30, 2018

Jumping in on this because it took me forever to diagnose this issue for where I work.

If you process a batch of messages and processing takes longer than the session_timeout for the consumer group, your client will time out of the group and its work will be assigned to another consumer. When the current consumer checks back in, it will be told (via UnknownMemberId) that it was assumed to be dead and that its work has already been reassigned to someone else.

When you have a partition (or worse, multiple partitions) with data on it that reliably causes this to happen, it can cause your entire consumer group to grind to a halt as consumers keep waiting on one another to check back in before they can proceed with additional work. Essentially, every cycle at least one of the members times out and everyone is forced to wait while this happens. Even if your work stays just below the threshold of timing out, the entire group is regularly forced to wait for every member to check back in, and can't proceed until they do. So if the work takes 5 minutes to complete, everyone has to sit around idle for 5 minutes before being assigned more work.

There are a few hacky solutions and one real solution. Short-term, what we chose to do is subdivide the work given to each consumer into smaller "microbatches" to ensure we check in for the heartbeat much more regularly:

begin
  consumer.each_batch(
    automatically_mark_as_processed: false,
    min_bytes:     MIN_SAMSA_BATCH_SIZE,
    max_wait_time: MAX_SAMSA_WAIT_INTERVAL
  ) do |batch|
    begin
      batch.messages.each_slice(MICROBATCH_SIZE) do |microbatch|
        microbatch.each do |message|
          # process message here
        end

        # FIXME: this hack has to reach into the internal kafka
        # consumer in order to inform it that we've processed messages
        # and send a heartbeat
        #
        # if this fails (due to a rebalance), we should stop processing
        # the current batch and have the consumer try again
        consumer.mark_message_as_processed(microbatch.last)
        consumer.send_heartbeat_if_necessary
      end

    # Kafka::HeartbeatErrors happen if there's a rebalance that happens
    # between us receiving a batch and committing our work done on that
    # batch; if this happens, someone else has been handed the batch we
    # were working on, so we want to just jump right back in line to get
    # another batch
    rescue Kafka::HeartbeatError, Kafka::RebalanceInProgress
      # `next` is the correct looping construct to use here: `break`
      # would exit us out of the `each_batch` loop, and `retry` would
      # simply attempt the logic over again without checking back in
      # with the server; `next` ends this loop and jumps back in line
      # to ask the server to send us our next batch of entries
      next
    end
  end
  
# general processing errors should be rescued from and retried after
# a short delay, instead of attempting to repeatedly hammer
# the broker 
rescue Kafka::ProcessingError => e
  # we don't want this to be too short, since errors cause the
  # consumer loop to exit and rejoin the group, potentially causing
  # an infinite rebalancing loop
  sleep REJOIN_INTERVAL

  retry
end

Note: this code was extracted ad-hoc from an internal wrapper class for the sake of posting this comment, so it might not work as-is due to errors on my part. Some of the begin/end blocks are artifacts of pulling this out of smaller methods, but it's critically important that the rescues remain logically in the same locations; don't try to move the inner rescue to the outside.

This workaround is only effective if you can guarantee that you can process MICROBATCH_SIZE elements within the session_timeout passed when creating the consumer. We use heartbeat_interval: 10.seconds, session_timeout: 100.seconds, so a client has to miss 10 heartbeats before it's assumed by the group to have gone offline.
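
For concreteness, those two settings go on the consumer itself; the broker list and group name below are placeholders, and the values are plain seconds:

kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-app")

consumer = kafka.consumer(
  group_id:           "my-consumer-group",
  session_timeout:    100, # the group assumes we're dead after this much silence
  heartbeat_interval: 10   # aim to heartbeat roughly this often
)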

The real solution is for the client to heartbeat on a background thread. This allows it to heartbeat regularly, on time every time, even if the thread doing the work takes a minute, an hour, or a day to finish its work. Ideally, with every message processed, the worker thread would call mark_message_as_processed so that whenever the next heartbeat_interval period ticks, the client on the background thread can inform the leader about the offset of the most recently processed message on the partition.

@dasch dasch commented May 31, 2018

@stouset thanks for the writeup. As for sending heartbeats in a background thread – the Java library does something like this, although there's still a strict timeout on processing code in order to guard against that code being stuck. I'd be open to something like this, but would like to stabilize the v0.7 series first.

@stouset stouset commented May 31, 2018

Sure thing. For an immediate improvement, I'd put up a big bold warning that processing a batch should not take more time than heartbeat_interval and must not take longer than session_timeout (and document what the defaults for those values are), or you run the risk of consumers stepping over one another.

Medium-term, I'd have the each method actually process in batches behind the scenes (round-tripping every message is very slow and almost never what you want), implemented something like this:

def each(min_bytes: DEFAULT_MIN_BYTES, max_wait_time: DEFAULT_MAX_WAIT)
  self.each_batch(
    automatically_mark_as_processed: false,
    min_bytes:     min_bytes,
    max_wait_time: max_wait_time,
  ) do |batch|
    begin
      batch.messages.each do |message|
      yield message

        # it's been a while, but I think calling these doesn't round-trip back to
        # the broker unless the `heartbeat_interval` has passed, so doing this
        # between every message won't kill performance, but at least allows
        # consumers to checkpoint in even before they're completely finished
        # with a batch
        self.mark_message_as_processed(message)
        self.send_heartbeat_if_necessary
      end
    rescue Kafka::HeartbeatError, Kafka::RebalanceInProgress
      # if we can't heartbeat because we've been kicked out of the
      # consumer group, `next` will cause us to check back in with
      # the server and attempt to rejoin the group and get more
      # messages to process
      next
    end
  end
end

This way, we tell the client about every message we've processed, and we give the client a chance to heartbeat as necessary between each iteration. Now this will only cause a cascading failure if processing a single message takes longer than session_timeout (in which case no consumer will ever be able to successfully process that message). There's probably something we can do here to clearly surface this situation to the end user, perhaps just by logging a warning in the rescue Kafka::HeartbeatError, Kafka::RebalanceInProgress clause that lets the user know the message is taking too long and may never be successfully processed.
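
For example, the rescue clause in the sketch above could become something like this (logger standing in for whatever logger the consumer already holds):

rescue Kafka::HeartbeatError, Kafka::RebalanceInProgress => e
  # the group rebalanced underneath us; if this keeps happening at the same
  # offsets, processing a single message is probably exceeding session_timeout
  logger.warn("#{e.class} mid-batch; processing may be exceeding session_timeout")
  next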

Then the "ideal" long-term fix is to have the heartbeat happen on a background thread, with (like you said) an ultimate timeout for the processing thread to guard against it being completely wedged.

@klippx klippx commented Jun 1, 2018

@stouset stouset commented Jun 1, 2018

Agreed, but for what it's worth we had metrics around a lot of this and could see that the cluster had ground to a halt, yet it still took us ages to diagnose the underlying problem. Hopefully my writeup helps others who find themselves in our situation!

@ransombriggs ransombriggs commented Aug 14, 2019

@stouset thanks for the fix! Our batch performance degraded and we ran into this issue, where the consumer continually read the same batch over and over, emitting Kafka::UnknownMemberId each time. The work you did allowed us to recover while I fixed the performance degradation. Kudos!

@github-actions github-actions bot commented Oct 14, 2019

Issue has been marked as stale due to a lack of activity.
