Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #11: Factor out StorageChecker #20

Merged
merged 2 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 27 additions & 94 deletions src/main/java/io/strimzi/kafka/quotas/StaticQuotaCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,18 @@
*/
package io.strimzi.kafka.quotas;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
Expand All @@ -40,16 +34,16 @@ public class StaticQuotaCallback implements ClientQuotaCallback {

private volatile Map<ClientQuotaType, Quota> quotaMap = new HashMap<>();
private final AtomicLong storageUsed = new AtomicLong(0);
private volatile List<Path> logDirs;
private volatile long storageQuotaSoft = Long.MAX_VALUE;
private volatile long storageQuotaHard = Long.MAX_VALUE;
private volatile int storageCheckInterval = Integer.MAX_VALUE;
private volatile List<String> excludedPrincipalNameList = List.of();
private final AtomicBoolean resetQuota = new AtomicBoolean(false);
final StorageChecker storageChecker = new StorageChecker();
private final StorageChecker storageChecker = new StorageChecker();
private final static long LOGGING_DELAY_MS = 1000;
private AtomicLong lastLoggedMessageSoftTimeMs = new AtomicLong(0);
private AtomicLong lastLoggedMessageHardTimeMs = new AtomicLong(0);
private final String scope = "io.strimzi.kafka.quotas.StaticQuotaCallback";

@Override
public Map<String, String> quotaMetricTags(ClientQuotaType quotaType, KafkaPrincipal principal, String clientId) {
Expand Down Expand Up @@ -139,105 +133,44 @@ public void configure(Map<String, ?> configs) {
quotaMap = config.getQuotaMap();
storageQuotaSoft = config.getSoftStorageQuota();
storageQuotaHard = config.getHardStorageQuota();
storageCheckInterval = config.getStorageCheckInterval();
excludedPrincipalNameList = config.getExcludedPrincipalNameList();
logDirs = Arrays.stream(config.getLogDirs().split(",")).map(Paths::get).collect(Collectors.toList());

List<Path> logDirs = config.getLogDirs().stream().map(Paths::get).collect(Collectors.toList());
storageChecker.configure(config.getStorageCheckInterval(),
logDirs,
this::updateUsedStorage);

log.info("Configured quota callback with {}. Storage quota (soft, hard): ({}, {}). Storage check interval: {}", quotaMap, storageQuotaSoft, storageQuotaHard, storageCheckInterval);
if (!excludedPrincipalNameList.isEmpty()) {
log.info("Excluded principals {}", excludedPrincipalNameList);
}
}

class StorageChecker implements Runnable {
private final Thread storageCheckerThread = new Thread(this, "storage-quota-checker");
private AtomicBoolean running = new AtomicBoolean(false);
private String scope = "io.strimzi.kafka.quotas.StaticQuotaCallback";

private void createCustomMetrics() {
createCustomMetrics();
}

Metrics.newGauge(metricName("TotalStorageUsedBytes"), new Gauge<Long>() {
public Long value() {
return storageUsed.get();
}
});
Metrics.newGauge(metricName("SoftLimitBytes"), new Gauge<Long>() {
public Long value() {
return storageQuotaSoft;
}
});
private void updateUsedStorage(Long newValue) {
var oldValue = storageUsed.getAndSet(newValue);
if (oldValue != newValue) {
resetQuota.set(true);
}
}

private MetricName metricName(String name) {
private MetricName metricName(String name) {
String mBeanName = "io.strimzi.kafka.quotas:type=StorageChecker,name=" + name + "";
return new MetricName("io.strimzi.kafka.quotas", "StorageChecker", name, this.scope, mBeanName);
}

String mBeanName = "io.strimzi.kafka.quotas:type=StorageChecker,name=" + name + "";
return new MetricName("io.strimzi.kafka.quotas", "StorageChecker", name, this.scope, mBeanName);
}
private void createCustomMetrics() {

void startIfNecessary() {
if (running.compareAndSet(false, true)) {
createCustomMetrics();
storageCheckerThread.setDaemon(true);
storageCheckerThread.start();
Metrics.newGauge(metricName("TotalStorageUsedBytes"), new Gauge<Long>() {
public Long value() {
return storageUsed.get();
}
}

void stop() throws InterruptedException {
running.set(false);
storageCheckerThread.interrupt();
storageCheckerThread.join();
}

@Override
public void run() {
if (StaticQuotaCallback.this.logDirs != null
&& StaticQuotaCallback.this.storageQuotaSoft > 0
&& StaticQuotaCallback.this.storageQuotaHard > 0
&& StaticQuotaCallback.this.storageCheckInterval > 0) {
try {
log.info("Quota Storage Checker is now starting");
while (running.get()) {
try {
long diskUsage = checkDiskUsage();
long previousUsage = StaticQuotaCallback.this.storageUsed.getAndSet(diskUsage);
if (diskUsage != previousUsage) {
StaticQuotaCallback.this.resetQuota.set(true);
}
log.debug("Storage usage checked: {}", StaticQuotaCallback.this.storageUsed.get());
Thread.sleep(TimeUnit.SECONDS.toMillis(StaticQuotaCallback.this.storageCheckInterval));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.warn("Exception in storage checker thread", e);
}
}
} finally {
log.info("Quota Storage Checker is now finishing");
}
});
Metrics.newGauge(metricName("SoftLimitBytes"), new Gauge<Long>() {
public Long value() {
return storageQuotaSoft;
}
}

long checkDiskUsage() {
return logDirs.stream()
.filter(Files::exists)
.map(path -> apply(() -> Files.getFileStore(path)))
.distinct()
.mapToLong(store -> apply(() -> store.getTotalSpace() - store.getUsableSpace()))
.sum();
}
}

static <T> T apply(IOSupplier<T> supplier) {
try {
return supplier.get();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@FunctionalInterface
interface IOSupplier<T> {
T get() throws IOException;
});
}
}
22 changes: 14 additions & 8 deletions src/main/java/io/strimzi/kafka/quotas/StaticQuotaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;

/**
* Configuration for the static quota plugin.
*/
public class StaticQuotaConfig extends AbstractConfig {
static final String PRODUCE_QUOTA_PROP = "client.quota.callback.static.produce";
static final String FETCH_QUOTA_PROP = "client.quota.callback.static.fetch";
Expand All @@ -31,6 +32,11 @@ public class StaticQuotaConfig extends AbstractConfig {
static final String STORAGE_CHECK_INTERVAL_PROP = "client.quota.callback.static.storage.check-interval";
static final String LOG_DIRS_PROP = "log.dirs";

/**
* Construct a configuration for the static quota plugin.
* @param props the configuration properties
* @param doLog whether the configurations should be logged
*/
public StaticQuotaConfig(Map<String, ?> props, boolean doLog) {
super(new ConfigDef()
.define(PRODUCE_QUOTA_PROP, DOUBLE, Double.MAX_VALUE, HIGH, "Produce bandwidth rate quota (in bytes)")
Expand All @@ -39,8 +45,8 @@ public StaticQuotaConfig(Map<String, ?> props, boolean doLog) {
.define(EXCLUDED_PRINCIPAL_NAME_LIST_PROP, LIST, List.of(), MEDIUM, "List of principals that are excluded from the quota")
.define(STORAGE_QUOTA_SOFT_PROP, LONG, Long.MAX_VALUE, HIGH, "Soft limit for amount of storage allowed (in bytes)")
.define(STORAGE_QUOTA_HARD_PROP, LONG, Long.MAX_VALUE, HIGH, "Hard limit for amount of storage allowed (in bytes)")
.define(STORAGE_CHECK_INTERVAL_PROP, INT, 0, MEDIUM, "Interval between storage check runs (default of 0 means disabled)")
.define(LOG_DIRS_PROP, STRING, "/tmp/kafka-logs", HIGH, "Broker log directory"),
.define(STORAGE_CHECK_INTERVAL_PROP, LONG, 0, MEDIUM, "Interval between storage check runs (default of 0 means disabled)")
.define(LOG_DIRS_PROP, LIST, List.of(), HIGH, "Broker log directories"),
props,
doLog);
}
Expand All @@ -66,12 +72,12 @@ long getSoftStorageQuota() {
return getLong(STORAGE_QUOTA_SOFT_PROP);
}

int getStorageCheckInterval() {
return getInt(STORAGE_CHECK_INTERVAL_PROP);
long getStorageCheckInterval() {
return getLong(STORAGE_CHECK_INTERVAL_PROP);
}

String getLogDirs() {
return getString(LOG_DIRS_PROP);
List<String> getLogDirs() {
return getList(LOG_DIRS_PROP);
}

List<String> getExcludedPrincipalNameList() {
Expand Down
102 changes: 102 additions & 0 deletions src/main/java/io/strimzi/kafka/quotas/StorageChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2020, Red Hat Inc.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.quotas;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
* Periodically reports the total storage used by one or more filesystems.
*/
public class StorageChecker implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(StorageChecker.class);

    private final Thread storageCheckerThread = new Thread(this, "storage-quota-checker");
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicLong storageUsed = new AtomicLong(0);

    private volatile long storageCheckInterval;
    private volatile List<Path> logDirs;
    private volatile Consumer<Long> consumer;

    /**
     * Configures the checker. Must be called before {@link #startIfNecessary()}.
     *
     * @param storageCheckInterval interval between checks, in seconds; a value of 0 or less disables the checker
     * @param logDirs the log directories whose file stores are measured
     * @param consumer invoked with the new total usage whenever it changes between checks
     */
    void configure(long storageCheckInterval, List<Path> logDirs, Consumer<Long> consumer) {
        this.storageCheckInterval = storageCheckInterval;
        this.logDirs = logDirs;
        this.consumer = consumer;
    }

    /**
     * Starts the background checker thread if it is enabled and not already running.
     * Idempotent: subsequent calls while running are no-ops.
     */
    void startIfNecessary() {
        // Check the interval BEFORE consuming the running flag. Evaluating
        // compareAndSet first would flip running to true even when the checker
        // is disabled (interval <= 0), permanently preventing a later start and
        // making stop() interrupt/join a thread that was never started.
        if (storageCheckInterval > 0 && running.compareAndSet(false, true)) {
            storageCheckerThread.setDaemon(true);
            storageCheckerThread.start();
        }
    }

    /**
     * Stops the background checker thread and waits for it to terminate.
     * Safe to call when the checker was never started.
     *
     * @throws InterruptedException if interrupted while waiting for the thread to finish
     */
    void stop() throws InterruptedException {
        if (running.compareAndSet(true, false)) {
            storageCheckerThread.interrupt();
            storageCheckerThread.join();
        }
    }

    @Override
    public void run() {
        if (logDirs != null && !logDirs.isEmpty()) {
            try {
                log.info("Quota Storage Checker is now starting");
                while (running.get()) {
                    try {
                        long diskUsage = checkDiskUsage();
                        long previousUsage = storageUsed.getAndSet(diskUsage);
                        // Only notify when the usage actually changed, to avoid
                        // spurious quota resets downstream.
                        if (diskUsage != previousUsage) {
                            consumer.accept(diskUsage);
                        }
                        log.debug("Storage usage checked: {}", storageUsed.get());
                        Thread.sleep(TimeUnit.SECONDS.toMillis(storageCheckInterval));
                    } catch (InterruptedException e) {
                        // Preserve interrupt status and exit the loop promptly.
                        Thread.currentThread().interrupt();
                        break;
                    } catch (Exception e) {
                        // Keep the checker alive across transient failures
                        // (e.g. a log dir temporarily unreadable).
                        log.warn("Exception in storage checker thread", e);
                    }
                }
            } finally {
                log.info("Quota Storage Checker is now finishing");
            }
        }
    }

    /**
     * Sums the used space (total minus usable) of the distinct file stores
     * backing the configured log directories. Directories that do not exist
     * are skipped; distinct() prevents double-counting when multiple log
     * directories live on the same file store.
     *
     * @return total bytes used across the distinct file stores
     */
    long checkDiskUsage() {
        return logDirs.stream()
                .filter(Files::exists)
                .map(path -> apply(() -> Files.getFileStore(path)))
                .distinct()
                .mapToLong(store -> apply(() -> store.getTotalSpace() - store.getUsableSpace()))
                .sum();
    }

    /** Adapts a checked-IO supplier for use in stream pipelines by wrapping IOException. */
    static <T> T apply(IOSupplier<T> supplier) {
        try {
            return supplier.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A {@link java.util.function.Supplier} variant whose {@code get} may throw {@link IOException}. */
    @FunctionalInterface
    interface IOSupplier<T> {
        T get() throws IOException;
    }
}
44 changes: 0 additions & 44 deletions src/test/java/io/strimzi/kafka/quotas/StaticQuotaCallbackTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@
package io.strimzi.kafka.quotas;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import org.apache.kafka.common.security.auth.KafkaPrincipal;
Expand All @@ -33,45 +28,6 @@ void tearDown() {
target.close();
}

@Test
void testStorageCheckCheckDiskUsageZeroWhenMissing() throws IOException {
Path temp = Files.createTempDirectory("checkDiskUsage");
target.configure(Map.of("log.dirs", temp.toAbsolutePath().toString()));
Files.delete(temp);
assertEquals(0, target.storageChecker.checkDiskUsage());
}

@Test
void testStorageCheckCheckDiskUsageAtLeastFileSize() throws IOException {
Path tempDir = Files.createTempDirectory("checkDiskUsage");
Path tempFile = Files.createTempFile(tempDir, "t", ".tmp");
target.configure(Map.of("log.dirs", tempDir.toAbsolutePath().toString()));

try {
Files.writeString(tempFile, "0123456789");
long minSize = Files.size(tempFile);
assertTrue(target.storageChecker.checkDiskUsage() >= minSize);
} finally {
Files.delete(tempFile);
Files.delete(tempDir);
}
}

@Test
void testStorageCheckCheckDiskUsageNotDoubled() throws IOException {
Path tempDir1 = Files.createTempDirectory("checkDiskUsage");
Path tempDir2 = Files.createTempDirectory("checkDiskUsage");
target.configure(Map.of("log.dirs", String.format("%s,%s", tempDir1.toAbsolutePath(), tempDir2.toAbsolutePath())));

try {
FileStore store = Files.getFileStore(tempDir1);
assertEquals(store.getTotalSpace() - store.getUsableSpace(), target.storageChecker.checkDiskUsage());
} finally {
Files.delete(tempDir1);
Files.delete(tempDir2);
}
}

@Test
void quotaDefaults() {
KafkaPrincipal foo = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "foo");
Expand Down
Loading