Skip to content

Commit

Permalink
Add Javadoc to make the JDK 17 build happy.
Browse files Browse the repository at this point in the history
Signed-off-by: Sam Barker <sbarker@redhat.com>
  • Loading branch information
SamBarker committed Jan 31, 2023
1 parent 9a752ae commit 4abf66e
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,28 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* Determines if the number of available bytes on any given volume falls below the configured limit.
*/
public class AvailableBytesThrottleFactorSupplier implements ThrottleFactorSupplier {
private final List<Runnable> listeners;
private final long availableBytesLimit;

private volatile double factor = 1.0;

/**
* Creates and configures the throttle factor supplier
* @param availableBytesLimit the minimum number of bytes below which the throttle should be applied.
*/
public AvailableBytesThrottleFactorSupplier(long availableBytesLimit) {
this.availableBytesLimit = availableBytesLimit;
listeners = new CopyOnWriteArrayList<>();
}

/**
* Accepts an updated collection of volumes to validate against the configured limit.
* @param volumes the new collection of volumes to be considered
*/
@Override
public void accept(Collection<Volume> volumes) {
double initial = factor;
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/strimzi/kafka/quotas/ClusterVolumeSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,24 @@

import static java.util.stream.Collectors.toSet;


/**
* Leverages <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+log+dirs+total+and+usable+space+via+Kafka+API">KIP-827</a>
* to gather volume usage statistics for each Kafka log dir reported by the cluster.
*
* A listener is registered with this volume source to act on the disk usage information.
*/
public class ClusterVolumeSource implements Runnable {

private final Consumer<Collection<Volume>> volumeConsumer;
private final Admin admin;

private static final Logger log = LoggerFactory.getLogger(ClusterVolumeSource.class);

/**
* @param admin The Kafka Admin client to be used for gathering inf
* @param volumeConsumer the listener to be notified of the volume usage.
*/
public ClusterVolumeSource(Admin admin, Consumer<Collection<Volume>> volumeConsumer) {
this.volumeConsumer = volumeConsumer;
this.admin = admin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public class StaticQuotaCallback implements ClientQuotaCallback {
private final ScheduledExecutorService backgroundScheduler;

/**
* Constructs the Static Quota Callback class
* Default constructor for production use.
* <p>
* It provides a default {@link io.strimzi.kafka.quotas.VolumeSourceBuilder#VolumeSourceBuilder()} and a single threaded executor for running background tasks on a named thread.
*/
public StaticQuotaCallback() {
this(new VolumeSourceBuilder(), Executors.newSingleThreadScheduledExecutor(r -> {
Expand All @@ -58,6 +60,11 @@ public StaticQuotaCallback() {
}));
}

/**
* Secondary constructor visible for testing purposes
* @param localVolumeSource the {@link io.strimzi.kafka.quotas.VolumeSourceBuilder#VolumeSourceBuilder()} to use
* @param backgroundScheduler the scheduler for executing background tasks.
*/
/*test*/ StaticQuotaCallback(VolumeSourceBuilder localVolumeSource, ScheduledExecutorService backgroundScheduler) {
this.volumeSourceBuilder = localVolumeSource;
this.backgroundScheduler = backgroundScheduler;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/strimzi/kafka/quotas/StaticQuotaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public class StaticQuotaConfig extends AbstractConfig {
static final String STORAGE_QUOTA_HARD_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".storage.hard";
static final String STORAGE_CHECK_INTERVAL_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".storage.check-interval";
static final String LOG_DIRS_PROP = "log.dirs";
public static final String AVAILABLE_BYTES_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".storage.per.volume.limit.min.available.bytes";
public static final String ADMIN_BOOTSTRAP_SERVER_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".kafka.admin.bootstrap.servers";
static final String AVAILABLE_BYTES_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".storage.per.volume.limit.min.available.bytes";
static final String ADMIN_BOOTSTRAP_SERVER_PROP = CLIENT_QUOTA_CALLBACK_STATIC_PREFIX + ".kafka.admin.bootstrap.servers";
private final KafkaClientConfig kafkaClientConfig;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
* A value of `0.0` implies that there is no quota available regardless of the defined quota.
*/
public interface ThrottleFactorSupplier extends Supplier<Double>, Consumer<Collection<Volume>> {
/**
* Register a listener to be invoked when the throttle factor is changed.
* @param listener to be executed whenever the factor is updated.
*/
void addUpdateListener(Runnable listener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@

import static io.strimzi.kafka.quotas.StaticQuotaCallback.metricName;

/**
* Backwards compatible ThrottleFactorSupplier which calculates the total aggregate volume usage and compares that against soft and hard limits.
* <p>
* It will progressively increase the throttle factor as the usage grows between the hard and soft limits. Once the usage is greater than or equal to the hard limit the throttle factor will be {@code 0}.
*/
@Deprecated
public class TotalConsumedThrottleFactorSupplier implements ThrottleFactorSupplier, Consumer<Collection<Volume>> {

Expand All @@ -26,6 +31,12 @@ public class TotalConsumedThrottleFactorSupplier implements ThrottleFactorSuppli

private volatile Double throttleFactor = 1.0d;

/**
* Configures the throttle calculation with both a soft and hard limit.
* No validation of the limits is performed.
* @param consumedBytesHardLimit the total number of bytes once reached (or exceeded) the full throttle should apply.
* @param consumedBytesSoftLimit the total number of bytes once passed the throttling should apply.
*/
public TotalConsumedThrottleFactorSupplier(long consumedBytesHardLimit, long consumedBytesSoftLimit) {
this.consumedBytesHardLimit = consumedBytesHardLimit;
this.consumedBytesSoftLimit = consumedBytesSoftLimit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@

import java.util.Collection;

/**
* An implementation of {@link ThrottleFactorSupplier} which applies no limits.
*/
public class UnlimitedThrottleSupplier implements ThrottleFactorSupplier {

/**
* Global singleton instance of the Unlimited supplier.
*/
public static final UnlimitedThrottleSupplier UNLIMITED_QUOTA_SUPPLIER = new UnlimitedThrottleSupplier();

private UnlimitedThrottleSupplier() {
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/strimzi/kafka/quotas/Volume.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,64 @@

import java.util.Objects;

/**
* Represents a single volume on a specific Kafka broker.
*/
public class Volume {
private final String brokerId;
private final String logDir;
private final long capacity;
private final long availableBytes;

/**
*
* @param brokerId which broker does this volume belong too
* @param logDir the specific logDir the volume hosts
* @param capacity How many bytes the volume holds
* @param availableBytes How many available bytes remain on the volume.
*/
public Volume(String brokerId, String logDir, long capacity, long availableBytes) {
this.brokerId = brokerId;
this.logDir = logDir;
this.capacity = capacity;
this.availableBytes = availableBytes;
}

/**
*
* @return The brokerId for the broker holding the volume
*/
public String getBrokerId() {
return brokerId;
}

/**
*
* @return the path identifying the logDir on the broker its hosted by.
*/
public String getLogDir() {
return logDir;
}

/**
* @return The size of the volume in bytes.
*/
public long getCapacity() {
return capacity;
}

/**
*
* @return The number available (free) remaining on the volume.
*/
public long getAvailableBytes() {
return availableBytes;
}

/**
*
* @return The number of bytes on the volume which have been consumed (used).
*/
public long getConsumedSpace() {
return capacity - availableBytes;
}
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/io/strimzi/kafka/quotas/VolumeSourceBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.LogDirDescription;

/**
* A builder which ensures the {@see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+log+dirs+total+and+usable+space+via+Kafka+API">KIP-827 API</a>} is available and will throw exceptions if not.
*/
public class VolumeSourceBuilder implements AutoCloseable {

private final Supplier<Boolean> kip827Available;
Expand All @@ -21,10 +24,19 @@ public class VolumeSourceBuilder implements AutoCloseable {
private StaticQuotaConfig config;
private Consumer<Collection<Volume>> volumesConsumer;

/**
* Default production constructor for production usage.
* Which will lazily create a Kafka admin client using the supplied config.
*/
public VolumeSourceBuilder() {
this(VolumeSourceBuilder::testForKip827, kafkaClientConfig -> AdminClient.create(kafkaClientConfig.getKafkaClientConfig()));
}

/**
* Secondary constructor visible for testing.
* @param kip827Available used to determine if KIP-827 API's are available
* @param adminClientFactory factory function for creating Admin clients with the builders config.
*/
/* test */ VolumeSourceBuilder(Supplier<Boolean> kip827Available, Function<StaticQuotaConfig.KafkaClientConfig, Admin> adminClientFactory) {
this.kip827Available = kip827Available;
this.adminClientFactory = adminClientFactory;
Expand All @@ -39,11 +51,21 @@ public VolumeSourceBuilder() {
}
}

/**
*
* @param config The plug-in configuration to use.
* @return this to allow fluent usage of the builder.
*/
public VolumeSourceBuilder withConfig(StaticQuotaConfig config) {
this.config = config;
return this;
}

/**
*
* @param volumesConsumer The volume consumer to register for updates.
* @return this to allow fluent usage of the builder.
*/
public VolumeSourceBuilder withVolumeConsumer(Consumer<Collection<Volume>> volumesConsumer) {
this.volumesConsumer = volumesConsumer;
return this;
Expand Down

0 comments on commit 4abf66e

Please sign in to comment.