Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-2063: Address Jason's comments, add tests, fix bugs, merge trunk #3

Merged
merged 22 commits into from Sep 16, 2016

Conversation

ijuma
Copy link

@ijuma ijuma commented Sep 16, 2016

@nepal, can you please review and merge if you're happy?

Tim-Brooks and others added 20 commits September 12, 2016 20:28
Here is the patch on github ijuma.

Acquiring the consumer lock (the single thread access controls) requires that the consumer be open. I changed the closed variable to be volatile so that another thread's writes will visible to the reading thread.

Additionally, there was an additional check if the consumer was closed after the lock was acquired. This check is no longer necessary.

This is my original work and I license it to the project under the project's open source license.

Author: Tim Brooks <tim@uncontended.net>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes apache#1637 from tbrooks8/KAFKA-2311
Author: Dong Lin <lindong28@gmail.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Jiangjie Qin <becket.qin@gmail.com>

Closes apache#1851 from lindong28/KAFKA-4158
Followed the same naming pattern as the producer sender thread.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson

Closes apache#1854 from ijuma/heartbeat-thread-name
Author: David Chen <mvjome@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#1853 from mvj3/KAFKA-4162
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes apache#1857 from hachikuji/KAFKA-4172
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes apache#1855 from hachikuji/KAFKA-4160
Set the NUM_STREAM_THREADS_CONFIG = 1 in SmokeTestClient as we get locking issues when we have NUM_STREAM_THREADS_CONFIG > 1 and we have Standby Tasks, i.e., replicas. This is because the Standby Tasks can be assigned to the same KafkaStreams instance as the active task, hence the directory is locked

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes apache#1861 from dguy/fix-smoketest
…ch-response-size

* apache/trunk:
  HOTFIX: fix KafkaStreams SmokeTest
  KAFKA-4160: Ensure rebalance listener not called with coordinator lock
  KAFKA-4172; Ensure fetch responses contain the requested partitions
  KAFKA-4162: Fixed typo "rebalance"
  MINOR: Give a name to the coordinator heartbeat thread
  KAFKA-4158; Reset quota to default value if quota override is deleted
  KAFKA-2311; Make KafkaConsumer's ensureNotClosed method thread-safe
…tion

Fix for bug outlined in KAFKA-4131

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes apache#1843 from bbejeck/KAFKA-4131_mulitple_regex_consumers_cause_npe
This applies to Replication Quotas
based on KIP-73 [(link)](https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas) originally motivated by KAFKA-1464.

System Tests Run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/544/

**This first PR demonstrates the approach**.

**_Overview of Change_**
The guts of this change are relatively small. Throttling occurs on both leader and follower sides. A single class tracks the throttled throughput in and out of each broker (**_ReplicationQuotaManager_**).

On the follower side, the Follower Throttled Rate is calculated as fetch responses arrive. Then, before the next fetch request is sent, we check to see if the quota is violated, removing throttled partitions from the request if it is. This is all encapsulated in a few lines of code in the **_ReplicaFetcherThread_**. There is existing code to handle temporal back off, if the request ends up being empty.

On the leader side it's a little more complex. When a fetch request arrives in the leader, it is built, partition by partition, in **_ReplicaManager.readFromLocalLog_**. As we put each partition into the fetch response, we check if the total size fits in the current quota. If the quota is exceeded, the partition will not be added to the fetch response. Importantly, we don't increase the quota at this point, we just check to see if the bytes will fit.

Now, if there aren't enough bytes to send the response immediately, which is common if we're catching up and throttled, then the request will be put in purgatory. I've added some simple code to **_DelayedFetch_** to handle throttled partitions (throttled partitions are checked against the quota, rather than the messages available in the log).

When the delayed fetch completes, and exits purgatory, _**ReplicaManager.readFromLocalLog**_ will be called again. This is why _**ReplicaManager.readFromLocalLog**_ does not actually increase the quota, it just checks whether enough bytes are available for a partition.

Finally, when there are enough bytes to be sent, or the delayed fetch times out, the response will be sent. Before it is sent the throttled-outbound-rate is increased, based on the size of throttled partitions being sent. This is at the end of _**KafkaApis.handleFetchRequest**_, exactly where client quotas are recorded.

There is an acceptance test which asserts the whole throttling process stabilises on the desired value. This covers a number of use cases including many-to-many replication. See **_ReplicationQuotaTest_**.

Note:
It should be noted that this protocol can over-request. The request is built, based on the quota at time t1 (_ReplicaManager.readFromLocalLog_). The bytes in the response are recorded at time t2 (end of _KafkaApis.handleFetchRequest_), where t2 > t1. For this reason I originally included an OverRequestedRate as a JMX metric, but testing has not seen revealed any obvious issue. Over-requesting is quickly compensated by subsequent requests, stabilising close to the quota value.

_**Main stuff left to do:**_
- The fetch size is currently unbounded. This will be addressed in KIP-74, but we need to ensure this ensures requests don’t go beyond the throttle window.
- There are two failures showing up in the system tests on this branch:  StreamsSmokeTest.test_streams (which looks like it fails regularly) and OffsetValidationTest.test_broker_rolling_bounce (which I need to look into)

_**Stuff left to do that could be deferred:**_
- Add the extra metrics specified in the KIP.
- There are no system tests.
- There is no validation for the cluster size / throttle combination that could lead to ISR dropouts

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Apurva Mehta <apurva@confluent.io>, Jun Rao <junrao@gmail.com>

Closes apache#1776 from benstopford/rep-quotas-v2
…ch-response-size

* apache/trunk:
  KAFKA-1464; Add a throttling option to the Kafka replication
  KAFKA-4131; Multiple Regex KStream-Consumers cause Null pointer exception
…sage size

It is now consistent with maxSize=0.
@ijuma ijuma changed the title KAFKA-2063: Address Jason's comments, add tests and fix bugs KAFKA-2063: Address Jason's comments, add tests, fix bugs, merge trunk Sep 16, 2016
@ijuma ijuma changed the base branch from trunk to kip-74 September 16, 2016 11:22
@ijuma ijuma force-pushed the kafka-2063-bound-fetch-response-size branch 3 times, most recently from 8305cf4 to bafadf1 Compare September 16, 2016 11:30
The behaviour in brokers changed so it should never happen.
@ijuma ijuma force-pushed the kafka-2063-bound-fetch-response-size branch from bafadf1 to f5eb8d5 Compare September 16, 2016 11:31
@nepal nepal merged commit 6bc8783 into nepal:kip-74 Sep 16, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
9 participants