Skip to content

Commit

Permalink
Remove deprecated total disk usage soft/hard limit types
Browse files Browse the repository at this point in the history
Why:
Acting on the total usage of the cluster makes no sense now that we are
using KIP-827 to determine the usage of all volumes in the active broker
set. Nodes will enter/exit the active set as a matter of course. So the
aggregate usage will fluctuate. Instead we will limit based on the usage
of each volume going forward. IE throttle if any of the volumes is getting
too full.

See Also:
* https://github.com/strimzi/proposals/blob/main/047-cluster-wide-volume-usage-quota-management.md
* [Add min available bytes storage policy](strimzi#33)
  • Loading branch information
robobario committed Feb 15, 2023
1 parent ea44ab3 commit b5cff42
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 259 deletions.
20 changes: 11 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ If you have a max of 40 MB/second, 2 producers, and one of them is producing mes

## Using with Strimzi

**Warning: this version is not included in Strimzi yet, see tag [0.2.0](https://github.com/strimzi/kafka-quotas-plugin/tree/0.2.0)** for current Strimzi-compatible docs.

The plugin is already included in Strimzi and needs to be configured in the `Kafka` custom resource.
The following example shows the configuration:

Expand All @@ -38,13 +40,15 @@ spec:
client.quota.callback.class: io.strimzi.kafka.quotas.StaticQuotaCallback
client.quota.callback.static.produce: 1000000 # 1 MB/s
client.quota.callback.static.fetch: 1000000 # 1 MB/s
client.quota.callback.static.storage.soft: 239538204672 # 80GB
client.quota.callback.static.storage.hard: 249538204672 # 100GB
client.quota.callback.static.storage.per.volume.limit.min.available.bytes : 5368709120 # 5GB
client.quota.callback.static.storage.check-interval: 5 # Check storage every 5 seconds
client.quota.callback.static.excluded.principal.name.list: principal1,principal2 # Optional list of principals not to be subjected to the quota
# ...
```

Strimzi will be responsible for configuring `client.quota.callback.static.kafka.admin.bootstrap.servers` and other admin
client configurations required to communicate with the broker.

## Using with other Apache Kafka clusters

The Quota plugin can be also used with non-Strimzi Apache Kafka clusters.
Expand All @@ -60,10 +64,10 @@ client.quota.callback.class=io.strimzi.kafka.quotas.StaticQuotaCallback
client.quota.callback.static.produce=1000000
client.quota.callback.static.fetch=1000000

# Storage quota settings in bytes. Clients will be throttled linearly between produce quota and 0 after soft limit.
# In this example the soft limit is set to 80GB and the hard limit to 100GB
client.quota.callback.static.storage.soft=80000000000
client.quota.callback.static.storage.hard=100000000000
client.quota.callback.static.kafka.admin.bootstrap.servers=localhost:9092 # required if storage.check-interval is >0
client.quota.callback.static.kafka.admin.ssl.truststore.location=/tmp/trust.jks # optionally configure the admin client further
# Storage quota settings in bytes. Clients will be throttled to 0 when any volume in the cluster has <= 5GB available bytes
client.quota.callback.static.storage.per.volume.limit.min.available.bytes=5368709120

# Check storage usage every 5 seconds
client.quota.callback.static.storage.check-interval=5
Expand All @@ -75,13 +79,11 @@ client.quota.callback.static.excluded.principal.name.list=principal1,principal2
## Metrics

The plugin currently provides the following metrics:
* `io.strimzi.kafka.quotas:type=StorageChecker,name=TotalStorageUsedBytes` shows the current storage usage
* `io.strimzi.kafka.quotas:type=StorageChecker,name=SoftLimitBytes` shows the currently configured soft limit
* `io.strimzi.kafka.quotas:type=StorageChecker,name=HardLimitBytes` shows the currently configured hard limit
* `io.strimzi.kafka.quotas:type=StaticQuotaCallback,name=Produce` shows the currently configured produce quota
* `io.strimzi.kafka.quotas:type=StaticQuotaCallback,name=Fetch` shows the currently configured fetch quota
* `io.strimzi.kafka.quotas:type=StaticQuotaCallback,name=Request` shows the currently configured request quota


## Building

To build the plugin:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,7 @@ public void configure(Map<String, ?> configs) {
throttleFactorPolicy = new AvailableBytesThrottleFactorPolicy(availableBytesLimitConfig.get());
log.info("Available bytes limit {}", availableBytesLimitConfig.get());
} else {
storageQuotaSoft = config.getSoftStorageQuota();
storageQuotaHard = config.getHardStorageQuota();

throttleFactorPolicy = new TotalConsumedThrottleFactorPolicy(storageQuotaHard, storageQuotaSoft);
log.info("Total consumed Storage quota (soft, hard): ({}, {}).", storageQuotaSoft, storageQuotaHard);
throw new IllegalStateException("storageCheckInterval > 0 but no limit type configured");
}
final PolicyBasedThrottle factorNotifier = new PolicyBasedThrottle(throttleFactorPolicy, () -> resetQuota.add(ClientQuotaType.PRODUCE));
throttleFactorSource = factorNotifier;
Expand Down
12 changes: 0 additions & 12 deletions src/main/java/io/strimzi/kafka/quotas/StaticQuotaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ public class StaticQuotaConfig extends AbstractConfig {
static final String FETCH_QUOTA_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".fetch";
static final String REQUEST_QUOTA_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".request";
static final String EXCLUDED_PRINCIPAL_NAME_LIST_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".excluded.principal.name.list";
static final String STORAGE_QUOTA_SOFT_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".storage.soft";
static final String STORAGE_QUOTA_HARD_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".storage.hard";
static final String STORAGE_CHECK_INTERVAL_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".storage.check-interval";
static final String AVAILABLE_BYTES_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".storage.per.volume.limit.min.available.bytes";
static final String ADMIN_BOOTSTRAP_SERVER_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".kafka.admin.bootstrap.servers";
Expand Down Expand Up @@ -70,8 +68,6 @@ public StaticQuotaConfig(Map<String, ?> props, boolean doLog) {
.define(FETCH_QUOTA_PROP, DOUBLE, Double.MAX_VALUE, HIGH, "Consume bandwidth rate quota (in bytes)")
.define(REQUEST_QUOTA_PROP, DOUBLE, Double.MAX_VALUE, HIGH, "Request processing time quota (in seconds)")
.define(EXCLUDED_PRINCIPAL_NAME_LIST_PROP, LIST, List.of(), MEDIUM, "List of principals that are excluded from the quota")
.define(STORAGE_QUOTA_SOFT_PROP, LONG, Long.MAX_VALUE, HIGH, "Hard limit for amount of storage allowed (in bytes)")
.define(STORAGE_QUOTA_HARD_PROP, LONG, Long.MAX_VALUE, HIGH, "Soft limit for amount of storage allowed (in bytes)")
.define(STORAGE_CHECK_INTERVAL_PROP, INT, 0, MEDIUM, "Interval between storage check runs (in seconds, default of 0 means disabled")
.define(AVAILABLE_BYTES_PROP, LONG, null, nullOrGreaterThanZeroValidator(), MEDIUM, "Stop message production if availableBytes <= this value"),
props,
Expand Down Expand Up @@ -102,14 +98,6 @@ Map<ClientQuotaType, Quota> getQuotaMap() {
return m;
}

long getHardStorageQuota() {
return getLong(STORAGE_QUOTA_HARD_PROP);
}

long getSoftStorageQuota() {
return getLong(STORAGE_QUOTA_SOFT_PROP);
}

Optional<Long> getAvailableBytesLimit() {
return Optional.ofNullable(getLong(AVAILABLE_BYTES_PROP));
}
Expand Down

This file was deleted.

31 changes: 0 additions & 31 deletions src/test/java/io/strimzi/kafka/quotas/StaticQuotaCallbackTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,37 +227,6 @@ void quotaResetRequired() {
quotaCallback.close();
}

@Test
void storageCheckerMetrics() {
ArgumentCaptor<VolumeObserver> argument = ArgumentCaptor.forClass(VolumeObserver.class);
when(volumeSourceBuilder.withVolumeObserver(argument.capture())).thenReturn(volumeSourceBuilder);

StaticQuotaCallback quotaCallback = new StaticQuotaCallback(volumeSourceBuilder, backgroundScheduler);

quotaCallback.configure(Map.of(
StaticQuotaConfig.STORAGE_QUOTA_SOFT_PROP, 15L,
StaticQuotaConfig.STORAGE_QUOTA_HARD_PROP, 16L,
StaticQuotaConfig.STORAGE_CHECK_INTERVAL_PROP, 10,
StaticQuotaConfig.ADMIN_BOOTSTRAP_SERVER_PROP, "localhost:9092"
));

long availableBytes = 17;
argument.getValue().observeVolumeUsage(List.of(newVolume(availableBytes)));

SortedMap<MetricName, Metric> group = getMetricGroup("io.strimzi.kafka.quotas.StaticQuotaCallback", "StorageChecker");

assertGaugeMetric(group, "SoftLimitBytes", 15L);
assertGaugeMetric(group, "HardLimitBytes", 16L);
assertGaugeMetric(group, "TotalStorageUsedBytes", VOLUME_CAPACITY - availableBytes);

// the mbean name is part of the public api
MetricName name = group.firstKey();
String expectedMbeanName = String.format("io.strimzi.kafka.quotas:type=StorageChecker,name=%s", name.getName());
assertEquals(expectedMbeanName, name.getMBeanName(), "unexpected mbean name");

quotaCallback.close();
}

@Test
void staticQuotaMetrics() {

Expand Down

This file was deleted.

0 comments on commit b5cff42

Please sign in to comment.