Skip to content

Commit

Permalink
Prevent context update after/while rebalancing (#161)
Browse files Browse the repository at this point in the history
This call might have been needed before, but it is no longer required
  • Loading branch information
nicolasguyomar authored and mtagle committed Jun 20, 2019
1 parent 5bdf4eb commit 0208081
Showing 1 changed file with 0 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,10 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
} catch (InterruptedException err) {
throw new ConnectException("Interrupted while waiting for write tasks to complete.", err);
}
updateOffsets(offsets);

topicPartitionManager.resumeAll();
}

/**
* This really doesn't do much and I'm not totally clear on whether or not I need it.
* But, in the interest of maintaining old functionality, here we are.
*/
private void updateOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : offsets.entrySet()) {
context.offset(offsetEntry.getKey(), offsetEntry.getValue().offset());
}
}

private PartitionedTableId getRecordTable(SinkRecord record) {
TableId baseTableId = topicsToBaseTableIds.get(record.topic());

Expand Down

0 comments on commit 0208081

Please sign in to comment.