Add throttle factor resilience #37
Conversation
…sults Why: We are going to introduce a fallback throttle factor that will be used if we fail to obtain a valid throttle factor for some period of time. To drive this logic we will need to tell the observer when there is some failure during observation. Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
With this change a throttle factor generated from a successful volume usage observation will be eligible for expiry 5 minutes after it is applied in PolicyBasedThrottle. After 5 minutes, if there have been no successful volume usage observations in the intervening time, then the throttle factor will be set to 0.0. This occurs as part of the scheduled job in reaction to a failed observation. Why: Failures are part of life and we can expect calls from the adminclient to fail at some point. We need to build in some resilience or we risk blocking all message production to a cluster based on some spurious errors. The other side of the coin is the user may have no appetite for the risk of filling a volume while we are failing to observe the cluster disk usage. Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
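The lifecycle described in this commit can be sketched roughly as follows (class, field, and constant names here are illustrative, not the PR's actual code, which wires this into PolicyBasedThrottle and its scheduled jobs):

```java
import java.time.Duration;
import java.time.Instant;

// Rough sketch of the expiry behaviour described above. A factor produced by a
// successful observation remembers when it was applied; if the validity window
// passes with no fresh successful observation, the fallback factor (0.0 here)
// is used instead. Names are illustrative, not the PR's actual classes.
class ExpiringThrottleFactor {
    static final double FALLBACK_FACTOR = 0.0;
    static final Duration VALIDITY = Duration.ofMinutes(5);

    private final double factor;
    private final Instant appliedAt;

    ExpiringThrottleFactor(double factor, Instant appliedAt) {
        this.factor = factor;
        this.appliedAt = appliedAt;
    }

    // Called by the scheduled job when an observation fails: returns the stored
    // factor while it is still valid, and the fallback once it has expired.
    double effectiveFactor(Instant now) {
        return now.isAfter(appliedAt.plus(VALIDITY)) ? FALLBACK_FACTOR : factor;
    }
}
```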
We execute a second runnable that is only responsible for checking the validity. Why: If a volume usage result is not successful, we trigger an expiry check on the current throttle factor. This means a long delay in getting the usage result could prolong how long the cluster operates with the stale throttle factor. Note that we need another thread in the pool or a separate thread for this job as the other job is executing a blocking `get` on a future. Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
Why: We now schedule two jobs with a fixed delay, one to observe the volumes and update the throttle factor if required and a second to check if the factor has expired and set a fallback throttle factor if required. This second job is supposed to be a failsafe in case the volume observation is taking a long time, delaying the application of stale checking. If we drive both jobs on one thread then the volume observation job may block the throttle expiration check job. By running with 2 threads the expiration check can run. The factor can now be updated from multiple threads so we need better control over when it is read and written to. With this change we will be sure that it has been changed in our thread. Synchronised should be fine as it's a relatively infrequent operation and we've synchronized some simple computation. Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
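A minimal sketch of the threading model this commit describes (pool size matches the two jobs, but the delays, names, and structure are assumptions for illustration, not the plugin's actual scheduling code):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

// Two fixed-delay jobs share a two-thread pool so the expiry check can still
// run while the observation job blocks on its future; the shared factor is
// read and written under synchronized. All names and delays are illustrative.
class ThrottleScheduling {
    private double throttleFactor = 1.0;

    // Single synchronized entry point for mutating the factor from either job.
    synchronized double updateFactor(UnaryOperator<Double> updater) {
        throttleFactor = updater.apply(throttleFactor);
        return throttleFactor;
    }

    ScheduledExecutorService start(Runnable observeVolumes, Runnable checkExpiry) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        executor.scheduleWithFixedDelay(observeVolumes, 0, 30, TimeUnit.SECONDS);
        executor.scheduleWithFixedDelay(checkExpiry, 0, 10, TimeUnit.SECONDS);
        return executor;
    }
}
```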
Adds config properties: - client.quota.callback.static.throttle.factor.fallback Defaults to 1.0, valid values are (0.0, 1.0) - client.quota.callback.static.throttle.factor.validity.duration Defaults to PT5M (5 minutes), any ISO 8601 duration string is valid Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
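For reference, the validity property's values are plain ISO 8601 durations as understood by `java.time`; a quick illustration of how such a value parses (the property names above are from the PR, but this snippet is a standalone sketch):

```java
import java.time.Duration;

// Illustrative only: ISO 8601 duration strings, as accepted by the validity
// duration property, parse directly with java.time.Duration.
class DurationConfigExample {
    static Duration parseValidity(String iso8601) {
        return Duration.parse(iso8601);  // throws DateTimeParseException if invalid
    }
}
```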
Fixing spotbugs now
- guard against passing reference to mutable collection - remove throwable getter and use throwable only internally for the toString Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
src/main/java/io/strimzi/kafka/quotas/VolumeUsageObservation.java
@@ -73,7 +77,9 @@ public StaticQuotaConfig(Map<String, ?> props, boolean doLog) {
                .define(EXCLUDED_PRINCIPAL_NAME_LIST_PROP, LIST, List.of(), MEDIUM, "List of principals that are excluded from the quota")
                .define(STORAGE_CHECK_INTERVAL_PROP, INT, 0, MEDIUM, "Interval between storage check runs (in seconds, default of 0 means disabled")
                .define(AVAILABLE_BYTES_PROP, LONG, null, nullOrInRangeValidator(atLeast(0)), MEDIUM, "Stop message production if availableBytes <= this value")
                .define(AVAILABLE_RATIO_PROP, DOUBLE, null, nullOrInRangeValidator(between(0.0, 1.0)), MEDIUM, "Stop message production if availableBytes / capacityBytes <= this value")
                .define(THROTTLE_FALLBACK_VALIDITY_DURATION, STRING, "PT5M", iso8601DurationValidator(), MEDIUM, "Stop message production if availableBytes / capacityBytes <= this value")
It also makes me wonder if we should change the definition of (or replace) STORAGE_CHECK_INTERVAL_PROP with one that uses ISO 8601 durations instead of a fixed number of seconds?
src/main/java/io/strimzi/kafka/quotas/VolumeUsageObservation.java
src/main/java/io/strimzi/kafka/quotas/throttle/PolicyBasedThrottle.java
src/main/java/io/strimzi/kafka/quotas/throttle/fallback/FixedDurationExpiryPolicy.java
src/test/java/io/strimzi/kafka/quotas/StaticQuotaCallbackTest.java
Why: It's a more typical way to name a success value or failure Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
Why: We don't need to tie it to the storage check interval and it should be cheap to run often, keeping in mind that it info logs on each run Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
Describe the states we expect result to be in Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
Why: Ticking makes it sound like it can act independently when it is only driven by clients calling tick. Signed-off-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
Thanks for the PR! It looks good overall, I left a few minor suggestions.
src/main/java/io/strimzi/kafka/quotas/throttle/ThrottleFactor.java
src/main/java/io/strimzi/kafka/quotas/throttle/PolicyBasedThrottle.java
Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
@tombentley @ppatierno It looks like you volunteered to review this PR on the community call ;-)
There are about 45000 people who can attest I was not on the community call but ... at the stadium that time :-D
I had a first pass and left some comments ...
        final Set<Integer> allBrokerIds = nodes.stream().map(Node::id).collect(toSet());

        admin.describeLogDirs(allBrokerIds)
                .allDescriptions()
                .whenComplete((logDirsPerBroker, throwable) -> {
                    if (throwable != null) {
                        promise.completeExceptionally(throwable);
                        promise.complete(VolumeUsageResult.failure(VolumeSourceObservationStatus.DESCRIBE_LOG_DIR_ERROR, throwable));
Using just `failure` as in the other places, or the other way around, using the full form everywhere?
fixed.
log.debug("Successfully described cluster: " + nodes); | ||
} | ||
//Deliberately stay on the adminClient thread as the next thing we do is another admin API call | ||
onDescribeClusterSuccess(nodes, volumeUsageResultPromise); |
I would add a comment explaining that this method is going to complete the promise, or just remove the method and put the code here to have a complete picture of what's happening. The `onDescribeClusterSuccess` method is not that big, but it hides the logic imho. Of course, it's a personal preference; others could see it differently.
While @robobario and I quite like `onDescribeClusterSuccess` and the fact it's "hiding" things, we did take your point about inconsistent completion of the promise, so have refactored things in 2ff1bae so that everything chains a lot better.
Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
Chain calls together in a fashion which handles errors better. Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
…nstead of the full stacktrace. Co-authored-by: Robert Young <robeyoun@redhat.com> Signed-off-by: Sam Barker <sbarker@redhat.com>
Thanks for the PR. I made another pass and it looks good to me.
I left a few comments for very minor issues.
        try {
            log.info("Updating cluster volume usage.");
            log.debug("Attempting to describe cluster");
Do we need 2 log lines here?
Debatable: the info entry feels useful for normal operations, and I think the debug logging becomes useful when trying to troubleshoot issues with the plug-in.
My preference is to keep both, but happy to remove the debug entry if others prefer.
Your comment did prompt me to go and check the rest of the flow; the debug logging wasn't as comprehensive as intended, so I've fixed that.
The thing is that both log messages take no parameters, so it's not like the debug one is really adding extra information. If you think it's really important to log that you're doing a describe you could just `log.debug("Updating cluster volume usage; attempting to describe cluster")`.
        }

        /**
         * The cause of the error or {@code null} in case of sucess.
nit: success
     * @return the path identifying the logDir on the broker its hosted by.
     */
    public String getLogDir() {
        return logDir;
    }

    /**
     * The capacity of the underlying Volume.
Is this called from anywhere? If not should we remove it?
It's not currently, but I think it is needed in the next PR adding metrics, which will include reporting available capacity.
     * @return The number available (free) remaining on the volume.
     */
    public long getAvailableBytes() {
        return availableBytes;
    }

    /**
     * The consumed capacity of the underlying volume.
Is this called from anywhere? If not should we remove it?
Same answer as above: we will need it for the metrics at least, and potentially for other future limit policies.
I'm somewhat on the fence about whether including it for future policies is a good choice or not...
Signed-off-by: Sam Barker <sbarker@redhat.com>
Signed-off-by: Sam Barker <sbarker@redhat.com>
        return ConfigDef.LambdaValidator.with((name, value) -> {
            String duration = (String) value;
            try {
                Duration.parse(duration);
I do wonder about using an ISO 8601 duration. Something like `throttle.fallback.validity.seconds` would be much more Kafkaesque, and just as clear, imho.
     * @param throwable the optional throwable
     */
    public Result(T result, Class<? extends Throwable> throwable) {
        this.value = result;
Might be worth throwing IAE if value != null && throwable != null
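A sketch of the suggested guard (the shape follows the quoted snippet, but the class and message here are illustrative, not the project's actual `Result`):

```java
// Reject construction when both a success value and a failure are supplied,
// as suggested above. Illustrative sketch, not the project's actual class.
class GuardedResult<T> {
    final T value;
    final Class<? extends Throwable> throwable;

    GuardedResult(T value, Class<? extends Throwable> throwable) {
        if (value != null && throwable != null) {
            throw new IllegalArgumentException("a result is either a value or a failure, not both");
        }
        this.value = value;
        this.throwable = throwable;
    }
}
```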
     * @return The number of bytes on the volume which have been consumed (used).
     */
    public long getConsumedSpace() {
        return capacity - availableBytes;
    }

    /**
     * Expresses the available space as a percentage.
The mismatch between method name and javadoc is confusing. I'd expect a percentage to be in [0, 100] and a ratio in [0, 1], so can we be consistent in using one term or the other?
fixed in b03dca7
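The distinction the reviewer is drawing can be illustrated like this (hypothetical class, not the fix that landed in b03dca7):

```java
// Keeps the terms consistent: a ratio lives in [0, 1], a percentage in
// [0, 100], and the method names say which one you get. Hypothetical sketch.
class VolumeSpace {
    private final long capacity;
    private final long availableBytes;

    VolumeSpace(long capacity, long availableBytes) {
        this.capacity = capacity;
        this.availableBytes = availableBytes;
    }

    double availableRatio() {
        return (double) availableBytes / capacity;
    }

    double availablePercentage() {
        return availableRatio() * 100.0;
    }
}
```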
    /**
     * execution exception while attempting to observe the cluster
     */
    EXECUTION_EXCEPTION
Do we need this? How does it differ from DESCRIBE_CLUSTER_ERROR or DESCRIBE_LOG_DIR_ERROR?
Looking more closely, it appears the only cases we need to distinguish are success and failure. io.strimzi.kafka.quotas.VolumeUsageResult#failure already captures the cause of failure via its `Class<? extends Throwable>` parameter. So we could actually just use a `boolean` rather than declaring an `enum`.
`EXECUTION_EXCEPTION`, `SAFETY_TIMEOUT`, and `INTERRUPTED` are all classifications for exceptions that can be thrown when waiting on the `Future` from the `run` method. They do sound generic, which is misleading; maybe they should be combined into something like `VOLUME_USAGE_EXCEPTION` for any failure in the top `run` method. This might be more resistant to refactoring too.
The enum is looking forward to failure metrics. Having a stable classification for each outcome means we can emit 0 immediately for all cases, heading off issues with alerting on `rate`/`increase` if we create the metric dynamically on first failure. Passing back the exception could lose context about which operation is failing. We also have a direct path from the metric name to an enum usage in code when trying to trace why it was emitted.
So it depends what we want in the metrics. If we want counters that break down into different failure reasons then I'd prefer to have them enumerated up front. If we instead go for just success and failure counts and use logs to debug why it's failing, we don't need the enum.
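The metrics argument here, pre-registering a zero counter per enumerated status, can be sketched like so (the outcome names mirror those discussed in the thread, but the counter class itself is hypothetical):

```java
import java.util.EnumMap;
import java.util.Map;

// With the failure reasons enumerated up front, every counter can be
// registered at 0 immediately, so rate/increase style alerts never see a
// series appear mid-flight. Hypothetical sketch, not the plugin's metrics.
enum ObservationOutcome { SUCCESS, DESCRIBE_CLUSTER_ERROR, DESCRIBE_LOG_DIR_ERROR, EXECUTION_EXCEPTION }

class ObservationCounters {
    private final Map<ObservationOutcome, Long> counts = new EnumMap<>(ObservationOutcome.class);

    ObservationCounters() {
        for (ObservationOutcome o : ObservationOutcome.values()) {
            counts.put(o, 0L);  // emit 0 for every outcome from the start
        }
    }

    void record(ObservationOutcome o) {
        counts.merge(o, 1L, Long::sum);
    }

    long count(ObservationOutcome o) {
        return counts.get(o);
    }
}
```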
Signed-off-by: Sam Barker <sbarker@redhat.com>
Signed-off-by: Sam Barker <sbarker@redhat.com>
Signed-off-by: Sam Barker <sbarker@redhat.com>
I left a few more nits, and noted a possible problem which we might want to investigate a little more, but otherwise this LGTM. Thanks!
public class FixedDurationExpiryPolicy implements ExpiryPolicy {

    private final Duration expireAfter;
`expireAfter` is a bit confusing to me. It makes it sound like this is an instant rather than a duration. `validityDuration` might be clearer.
We switched to `validFor` in a61ef71 as we felt that conveyed the intent better.
src/main/java/io/strimzi/kafka/quotas/throttle/fallback/ExpiryPolicy.java
 * Copyright Strimzi authors.
 * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
 */
package io.strimzi.kafka.quotas.throttle.fallback;
I'm not sure we really need a separate package for the policies, and even if we did it's not clear why that should be called `fallback`. Yes, it's currently being used for a policy on fallback throttle factors, but that's an aspect of how the abstraction happens to be being used; it's not intrinsic to the abstraction itself.
fixed in 27aba26
        }
    }

    private ThrottleFactor calculateThrottleFactorWithExpiry(VolumeUsageResult observedVolumes) {
I'm all for small methods, but I think this one could be inlined into `getNewFactor` and the code would be clearer.
Inlined in 12ebcc1.
I think we had split it out as it gave some explanation as to what that code was doing and gave it a sense of symmetry with the `maybeFallback` case, but given feedback that it would be clearer inline we have done so.
    private synchronized boolean updateFactorAndCheckIfChanged(Function<ThrottleFactor, ThrottleFactor> throttleFactorUpdater) {
        ThrottleFactor currentFactor = this.throttleFactor;
        throttleFactor = throttleFactorUpdater.apply(currentFactor);
        boolean changed = !Objects.equals(currentFactor.getThrottleFactor(), throttleFactor.getThrottleFactor());
If you're intentionally using `Objects.equals` to convert the primitive to a `Double` here then it deserves a comment. Note that `Double.equals` is not exactly the same as `==` on two doubles (NaN and ±0 have special handling in `Double.equals`), so converting to `Double` might avoid some edge cases.
Hmmm, I'm not entirely sure we really want to use true equality with doubles given floating point representation. Then again, we are only really talking about comparing combinations of `0.0` and `1.0` (which are defined in constants), so equality is probably good enough (see Stackoverflow on the subject).
Given the expected range of values, dealing in primitive doubles is probably good enough.
Switched to primitive comparisons in c762819
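The edge cases being traded off in this thread are easy to demonstrate with a standalone snippet (unrelated to the project's code):

```java
// Double.equals compares bit patterns (via doubleToLongBits), while primitive
// == follows IEEE 754: the two disagree exactly for NaN and signed zero.
class DoubleComparisonDemo {
    static boolean boxedEquals(double a, double b) {
        return Double.valueOf(a).equals(b);  // b autoboxes to Double
    }

    static boolean primitiveEquals(double a, double b) {
        return a == b;
    }
}
```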
        try {
            log.info("Updating cluster volume usage.");
            CompletableFuture<VolumeUsageResult> volumeUsagePromise = toResultStage(admin.describeCluster().nodes())
I know the logic here hasn't changed, but I realised there is possibly a flaw here.
- `admin.describeCluster().nodes()` returns the currently alive nodes in the cluster.
- So the logic here can't account for brokers which are not currently alive.
- So a broker with a nearly full disk that restarts will be forgotten about:
  - Maybe broker 0 got to 99% full and the quota kicks in, preventing writes to any brokers in the cluster. What happens if broker 0 is now restarted?
  - It will disappear from the `admin.describeCluster()` result (for a time at least), which will release the throttling, allowing appends to the other brokers. Broker 0 might be down for a while if it needed to do log recovery on restart.
  - When broker 0 does rejoin, there is a window before the quotas will get applied again. In the mean time there are appends it needs to replicate, so it fetches and appends and fills its disks.

I think this is all pretty unlikely. In practice it requires a number of things to align, but I think this is possible.
Yes, and we also haven't covered a case mentioned in the proposal yet: if a node drops out of the active set between the describeCluster and describeLogDirs calls, we could consider this an inconsistency and not act on that data. We'll handle this in another PR.
I'll make an issue for this problem since it shouldn't block this PR.
LGTM
Makes it consistent with usage at call sites. Co-authored-by: Tom Bentley <tombentley@users.noreply.github.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk>
Addresses: strimzi#37 (comment) Signed-off-by: Sam Barker <sbarker@redhat.com>
Addresses: strimzi#37 (comment) Given the range of values involved concerns about NaN and positive and negative infinity are moot. Signed-off-by: Sam Barker <sbarker@redhat.com>
Signed-off-by: Sam Barker <sbarker@redhat.com>
Signed-off-by: Sam Barker <sbarker@redhat.com>
We have finished addressing comments
@robobario So, is it ready to merge?
Yes please, thanks @scholzj
Thanks for the PR.
To make the throttle factor logic more resilient in the face of transient failures, we want to continue using a previously calculated valid throttle factor for some duration, rather than immediately using the throttle factor fallback after a single failure.

This PR introduces the concept of a Throttle Factor Fallback, that is, the factor we apply after a previously calculated factor expires. It is configured with the optional property `client.quota.callback.static.throttle.factor.fallback`, with default value `1.0`, accepting values in the range `(0.0, 1.0)`.

This PR introduces the concept of a Throttle Factor Validity Duration, that is, how long a throttle factor generated from a successful cluster observation can be used before we fall back to the Throttle Factor Fallback. It is configured with the optional property `client.quota.callback.static.throttle.factor.validity.duration`, with default value `PT5M`, accepting any valid ISO 8601 duration string.

We apply this fallback logic in two places: