
Critical bug fix: Extra sanity checking when marking offsets as processed #824

Merged · 3 commits · May 20, 2020

Conversation

@theturtle32 (Contributor) commented Apr 22, 2020

This pull request resolves an edge case, but one that represents a critical bug for us: it has caused us a very significant amount of pain over the last couple of years and has been quite elusive to nail down.

We have a system that needs to buffer a certain number of messages and insert them into another system all at once as a batch, in a SQL transaction.

To do this, inside the each_message loop, we add messages to an array until we've accumulated enough for a batch. Once the buffer is large enough, we atomically export the batch to the external system, call mark_message_as_processed for each buffered message, and clear the buffer.

The problem we've encountered several times now is that the consumer group may rebalance and end up with different partition assignments before the buffer is full. Once processing resumes after the rebalancing, the next time the buffer is cleared, it will call mark_message_as_processed with some messages from partitions it was consuming before the rebalance and some from partitions assigned after the rebalance.

Because there's no sanity check in mark_message_as_processed to make sure the partition and offset provided actually correspond to the partitions currently assigned to the group member, it will happily record the offset for the partition it's not supposed to be processing.

The result is that it will keep re-committing that old, stale offset for the partition(s) it used to consume from, along with all the new offsets, as it continues processing.

This creates a race condition where two members of the consumer group constantly commit offsets for the same partition, back and forth: one member keeps committing the correct offset from its up-to-date processing of that partition, while the other keeps committing an old, stale offset from before the consumer-group rebalance.

If the consumer group is shut down or rebalances again, it's a race between the stale consumer and the active consumer as to which one gets the last word committing offsets for the partition they're fighting over. This can result in the consumers suddenly and unpredictably re-processing potentially hundreds of thousands of messages, depending on how long the consumer group remained stable after the last partition assignment.

The fix in this pull request is just to add a simple sanity check to prevent marking offsets as processed for partitions that are not presently assigned to the current consumer group member.

I also added a sanity check to prevent overwriting a newer offset with an older one, in the event that the messages are processed out of order.
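For reference, here is a rough sketch of what the two checks amount to (not the exact diff; it assumes the offset manager can ask the group whether a partition is assigned, as with the group-level assigned_to? helper used further down, and that processed offsets are tracked per topic and partition):

    # Sketch only. @group, @logger and @processed_offsets stand in for the
    # consumer's internal state; @processed_offsets is assumed to map
    # topic => { partition => next offset to commit }.
    def mark_as_processed(topic, partition, offset)
      # Check 1: ignore offsets for partitions this member no longer owns.
      unless @group.assigned_to?(topic, partition)
        @logger.debug "Not marking #{topic}/#{partition}:#{offset} as processed: partition not assigned to this consumer."
        return
      end

      @processed_offsets[topic] ||= {}

      # Check 2: never overwrite a newer offset with an older one, since
      # messages may be marked out of order.
      last = @processed_offsets[topic][partition]
      if last && last > offset + 1
        @logger.debug "Not overwriting newer offset #{last - 1} with older #{offset} for #{topic}/#{partition}."
        return
      end

      @processed_offsets[topic][partition] = offset + 1
    end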

I was able to simplify our use case to the following code, which demonstrates the issue. You have to start several instances of this consumer, then repeatedly start and stop an instance to rebalance the consumer group until the random timing and partition assignments finally trigger the bug.

consumer = kafka.consumer(group_id: 'cg1', offset_commit_interval: 5)
consumer.subscribe('race-test', default_offset: :earliest)

buffer = []
consumer.each_message(automatically_mark_as_processed: false) do |message|
  buffer << message

  # if the consumer group rebalancing occurs before the buffer is flushed,
  # there's a possibility that it may contain messages from before the
  # rebalancing was triggered.
  if buffer.length >= 15000
    # Shuffling the buffer to simulate processing messages out of order
    buffer.shuffle.each do |buffered_message|
      consumer.mark_message_as_processed(buffered_message)
    end

    partitions_list = buffer.map { |message| message.partition }.sort.uniq
    puts "Clearing buffer, messages from partitions: #{partitions_list.join(', ')}"
    buffer.clear
  end
end

@theturtle32 (Contributor Author)

Update: working on updating the specs to match.

@theturtle32 (Contributor Author)

Fixed tests and added two new cases to verify the sanity check.

@theturtle32 (Contributor Author) commented Apr 22, 2020

Seems the Kafka 1.1 integration test failed, but the failure appears to be totally unrelated to this pull request. I suspect it's a fluke that would go away if the CircleCI workflow is simply re-run.

@theturtle32 (Contributor Author) commented Apr 22, 2020

I have an alternate solution that may be more performant: instead of checking the partition assignment every time mark_as_processed is invoked, it uses the partition assignment to filter the list of offsets passed when calling commit_offsets, e.g.:

    def offsets_to_commit(recommit = false)
      # Do not commit offsets for partitions not assigned to this consumer
      @processed_offsets.each do |topic, offsets|
        offsets.keep_if { |partition, _| @group.assigned_to?(topic, partition) }
      end

      if recommit
        offsets_to_recommit.merge!(@processed_offsets) do |_topic, committed, processed|
          committed.merge!(processed)
        end
      else
        @processed_offsets
      end
    end

Let me know if you'd prefer I commit that solution instead. All the tests pass with either implementation.

@0x2c7 (Contributor) left a comment

A rebalance event is detected via two main operations sent to the Kafka server after a message is handled: heartbeat and offset commit calls.

If automatically_mark_as_processed is true, the messages are marked as processed automatically after each each_message or each_batch block is evaluated, and the consumer then sends a heartbeat and commits offsets. When a rebalance event occurs, the consumer clears all messages in the current buffer and resumes consuming from the newly assigned partitions. Therefore, the consumer never marks stale messages as processed, and the sanity-checking code doesn't affect this case.

If automatically_mark_as_processed is false, ruby-kafka lets the user decide when to commit offsets. If a rebalance event occurs while there are still messages in a buffer, sanity checking mark_message_as_processed is reasonable to prevent a race condition between stale and new offsets, but it leads to another issue: some messages in a buffer may be processed again by other consumers. For example:

Consumer 1:
Message 1 => Processed, marked as processed
Message 2 => Processed, marked as processed
Rebalance
Message 3 => Processed, marking as processed is ignored
Message 4 => Processed, marking as processed is ignored

Consumer 2:
Message 3 => Processed, marked as processed
Message 4 => Processed, marked as processed

Originally, the purpose of turning off automatic marking as processed is to store and group related messages in a temporary buffer so they can be processed together. This change may break that use case.

I would like to propose another approach, which solves your issue and also benefits the use case I mentioned above. Let's expose another API on the consumer: Consumer#assigned_to? or Consumer#assignments to check the current assignment.

  • In your use case, just call that API before marking messages as processed (see the sketch after this list).
  • In other use cases, the consumer may be interested in checking the current partition assignment in order to reject a whole batch in the buffer and maintain processing integrity, or just to detect a rebalance at the application layer.
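
For example, a rough sketch of how application code might use such an API (the Consumer#assigned_to?(topic, partition) signature here is only a suggestion, mirroring the group-level helper):

    buffer.each do |buffered_message|
      # Only mark messages from partitions this member still owns; the rest
      # will be picked up by whichever consumer now owns those partitions.
      if consumer.assigned_to?(buffered_message.topic, buffered_message.partition)
        consumer.mark_message_as_processed(buffered_message)
      end
    end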

@theturtle32 (Contributor Author) commented May 13, 2020

I see what you're saying, but unfortunately, your proposed approach does not in fact solve the use case you mentioned above.

The reason is that, after the rebalancing, the other consumer has already been reassigned the partition containing the messages still in the first consumer's buffer, and it will process those messages regardless of whether the first consumer commits them at that time.

Kafka inherently, by design, makes an "at least once" message processing guarantee. All correctly implemented consumer applications must, without exception, be written with the assumption that messages may occasionally be re-processed by another member of the consumer group. Relying on offset committing as the sole means of preventing message re-processing is an error.

If re-processing the same message could lead to negative consequences, as in our case, where we process a stream of orders and insert them into a warehouse management system to be packed and shipped to customers, you must implement some idempotency mechanism that is not reliant upon Kafka and its consumer offset checkpointing. There is no alternative to that.
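
For illustration only, a minimal sketch of such an idempotency guard, assuming a PostgreSQL connection via the pg gem and an orders table with a unique index on kafka_key (both hypothetical, not part of ruby-kafka or this pull request):

    require "pg"

    # Hypothetical idempotency guard, independent of Kafka's offset tracking.
    # A key derived from the message coordinates (or a business key such as
    # an order ID) makes re-delivery of the same message a harmless no-op.
    def insert_order_idempotently(db, message)
      kafka_key = "#{message.topic}/#{message.partition}/#{message.offset}"

      db.exec_params(
        "INSERT INTO orders (kafka_key, payload) VALUES ($1, $2) " \
        "ON CONFLICT (kafka_key) DO NOTHING",
        [kafka_key, message.value]
      )
    end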

The correct default behavior for Kafka-based systems is to err on the side of some possibility of message re-processing.

While I agree that exposing Consumer#assigned_to? or Consumer#assignments would be valuable (in fact, I might not have bothered to open this pull request if they had already been exposed), I still stand by my approach as implementing the kind of default behavior the majority of users would expect out of the box. The ability for a consumer to checkpoint an offset for a partition it is not assigned to process is an error in all cases. It may even make sense for the Kafka broker itself to reject any such attempts by consumers.

The reason your proposed solution will not be sufficient for the use case you mentioned is as follows. I'm updating and annotating your example timeline:

Assume a topic with two partitions and two consumers.

Consumers 1 & 2 join the group.
Initial Assignments:
Partition 1 -> Consumer 1
Partition 2 -> Consumer 2

Consumer 1 begins processing:
Partition 1, Message 1 => Buffered
Partition 1, Message 2 => Buffered
Partition 1, Message 3 => Buffered
-- Buffer is ready to flush with 3 messages --
Partition 1, Message 1 => Processed, marked as processed
Partition 1, Message 2 => Processed, marked as processed
Partition 1, Message 3 => Processed, marked as processed
Partition 1, Message 4 => Buffered
Partition 1, Message 5 => Buffered
--- Rebalance Occurs ---
New Assignments:
Partition 1 -> Consumer 2
Partition 2 -> Consumer 1
(Consumers swap partition responsibilities.)
Consumer 2 will immediately begin processing with partition 1, message 4,
even though that message is already in Consumer 1's buffer. Consumer 1 no
longer has any ability via the mark_message_as_processed mechanism to
affect what messages Consumer 2 will process.

Consumer 1 continues processing:
Partition 2, Message 1 => Buffered
-- Buffer is ready to flush with 3 messages --
Partition 1, Message 4 => Processed, marked as processed
Partition 1, Message 5 => Processed, marked as processed
Partition 2, Message 1 => Processed, marked as processed

---
At this point, consumer 1 has, in error, added offsets to commit for a partition that it is not assigned to process. Even if it wanted to hold those extra messages in its buffer, process them later, and mark them as processed once done, it no longer has the right to do so, because those buffered messages will already be processed by consumer 2 regardless of what consumer 1 does.

The incorrectly marked-as-processed offsets for the partition it is no longer processing will remain in ruby-kafka's @processed_offsets data structure on Consumer 1 until the next time the consumer group is rebalanced. Consumer 1 will keep erroneously re-committing those same stale offsets to the brokers every single time offsets are committed, until the next rebalancing finally clears them from @processed_offsets.

As both consumers continue to process messages, this will result in them both committing their offsets like this:

Consumer 1 commits:

Partition | Committed Offset
1         | 5
2         | 1

Consumer 2 commits:

Partition | Committed Offset
1         | 25

Consumer 1 commits:

Partition | Committed Offset
1         | 5
2         | 25

Consumer 2 commits:

Partition | Committed Offset
1         | 50

Consumer 1 commits:

Partition | Committed Offset
1         | 5
2         | 50

Consumer 2 commits:

Partition | Committed Offset
1         | 75

Consumer 1 commits:

Partition | Committed Offset
1         | 5
2         | 75

This results in a race condition where, if consumer 1 is the last to commit its offsets for partition 1 before a rebalance is triggered, all of the processing progress made by consumer 2 since the previous rebalancing is discarded. At that point, instead of only the two messages originally buffered by consumer 1 before the rebalance being reprocessed, those two messages get reprocessed along with all of the messages processed by consumer 2 since the last rebalancing.

I hope this is clear enough to demonstrate that there is no use case in which it is ever appropriate for a consumer to commit offsets for partitions that it is not currently assigned.

@theturtle32 (Contributor Author) commented May 13, 2020

All that being said, it would be valuable to expose Consumer#assigned_to? or Consumer#assignments to give applications the flexibility to drop messages from their internal buffers for partitions they are no longer assigned.
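
For example, assuming a hypothetical Consumer#assigned_to?(topic, partition), an application could prune its buffer after a rebalance:

    # Keep only buffered messages from partitions this member still owns;
    # whichever consumer now owns the other partitions will process them.
    buffer.select! do |buffered_message|
      consumer.assigned_to?(buffered_message.topic, buffered_message.partition)
    end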

But I maintain that if a member of a consumer group tries to call mark_message_as_processed for a message in a partition no longer assigned to that consumer, that is a bug. Always, and without exception.

@theturtle32 (Contributor Author)

And since this is such a difficult thing to reason about, I strongly feel developers should be helped out here by being prevented from shooting themselves in the foot without realizing it.

@theturtle32 (Contributor Author) commented May 19, 2020

@nguyenquangminh0711 @dasch -- bumping this. Thoughts? I know this can be a pretty complicated topic to reason about.

At Dollar Shave Club we're now running in production on our fork that incorporates this patch.

@dasch (Collaborator) commented May 20, 2020

Sorry for not getting back to this sooner. I've restarted the CI and will merge once they're green 👍

@dasch merged commit 55a7c88 into zendesk:master May 20, 2020
@theturtle32 (Contributor Author)

Awesome, thanks, @dasch!
