Skip to content

Commit

Permalink
feat(api): allow to configure maxBlockingTime on EventStream
Browse files Browse the repository at this point in the history
This commit adds the new EventStream.maxBlockingTime method.
  • Loading branch information
fhussonnois committed Oct 11, 2020
1 parent 471c3d1 commit acb82f5
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 16 deletions.
Expand Up @@ -41,11 +41,13 @@
*/
public class BasicBlockingRecordQueue<K, V> implements BlockingRecordQueue<K, V> {

private static final int DEFAULT_QUEUE_SIZE_LIMIT = 10_000;
static final int DEFAULT_QUEUE_SIZE_LIMIT = 10_000;

static final Duration DEFAULT_MAX_BLOCK_DURATION = Duration.ofMillis(100);

private final BlockingQueue<KV<K, V>> blockingQueue;

private final Duration offerWaitDuration;
private final Duration maxBlockDuration;

private final AtomicBoolean isClosed = new AtomicBoolean(false);

Expand All @@ -64,22 +66,35 @@ public BasicBlockingRecordQueue() {
* @param queueSizeLimit the queue max capacity.
*/
public BasicBlockingRecordQueue(final int queueSizeLimit) {
this(queueSizeLimit, Duration.ofMillis(100), LimitHandlers.NO_OP);
this(queueSizeLimit, DEFAULT_MAX_BLOCK_DURATION);
}

/**
* Creates a new {@link BasicBlockingRecordQueue} instance.
*
* @param queueSizeLimit the queue max capacity.
* @param maxBlockDuration the maximum duration to wait to wait before giving up, when the queue is full.
*/
public BasicBlockingRecordQueue(final int queueSizeLimit, final Duration maxBlockDuration) {
this(queueSizeLimit, maxBlockDuration, LimitHandlers.NO_OP);
}


/**
* Creates a new {@link BasicBlockingRecordQueue} instance.
*
* @param queueSizeLimit the blocking size limit;
* @param offerWaitDuration the time to wait if necessary for space to become available.
* @param maxBlockDuration the maximum duration to wait to wait before giving up, when the queue is full.
* @param limitHandler the {@link LimitHandler} to invoke after {@code maxBlockDuration} has been reached.
*/
private BasicBlockingRecordQueue(final int queueSizeLimit,
final Duration offerWaitDuration,
final LimitHandler limitHandler) {
public BasicBlockingRecordQueue(final int queueSizeLimit,
final Duration maxBlockDuration,
final LimitHandler limitHandler) {
if (queueSizeLimit <= 0)
throw new IllegalArgumentException("queueSizeLimit must be superior to 0, was :" + queueSizeLimit);

this.blockingQueue = new LinkedBlockingQueue<>(queueSizeLimit);
this.offerWaitDuration = offerWaitDuration;
this.maxBlockDuration = maxBlockDuration;
this.callback = new LimitedQueueCallback(queueSizeLimit);
this.callback.setQueue(this);
this.callback.setLimitHandler(limitHandler);
Expand Down Expand Up @@ -193,7 +208,7 @@ public void send(final KV<K, V> record) {

try {
while (!isClosed.get()) {
if (blockingQueue.offer(record, offerWaitDuration.toMillis(), TimeUnit.MILLISECONDS)) {
if (blockingQueue.offer(record, maxBlockDuration.toMillis(), TimeUnit.MILLISECONDS)) {
if (callback != null) callback.onQueued();
break;
}
Expand Down
Expand Up @@ -53,7 +53,7 @@ public interface BlockingRecordQueue<K, V> {
/**
* @see BlockingQueue#poll(long, TimeUnit)
*/
KV<K, V> poll(final Duration timeout) throws InterruptedException;
KV<K, V> poll(final Duration tmaxBlockingTimeimeout) throws InterruptedException;

/**
* @see BlockingQueue#poll()
Expand Down
Expand Up @@ -20,7 +20,9 @@

import io.streamthoughts.azkarra.api.model.KV;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -43,6 +45,7 @@ public static class Builder {
private String eventType;
private Integer queueSize;
private LimitHandler queueLimitHandler;
private Duration maxBlockingTime;

/**
* Creates a new {@link Builder} instance.
Expand All @@ -53,23 +56,39 @@ public Builder(final String eventType) {
this.eventType = eventType;
}

/**
* @param queueSize the queue size.
* @return the {@link Builder}
*/
public Builder withQueueSize(final int queueSize) {
this.queueSize = queueSize;
return this;
}

/**
* @param maxBlockingTime the maximum duration to wait to wait before giving up when the queue is full.
* @return the {@link Builder}
*/
public Builder withMaxBlockingTime(final Duration maxBlockingTime) {
this.maxBlockingTime = maxBlockingTime;
return this;
}

/**
* @param queueLimitHandler the handler to be invoked when the limit queue is reached.
* @return the {@link Builder}
*/
public Builder withQueueLimitHandler(final LimitHandler queueLimitHandler) {
this.queueLimitHandler = queueLimitHandler;
return this;
}

public <K, V> EventStream<K, V> build() {
var queue = queueSize != null ?
new BasicBlockingRecordQueue<K, V>(queueSize) :
new BasicBlockingRecordQueue<K, V>();

if (queueLimitHandler != null)
queue.setLimitHandler(queueLimitHandler);
var queue = new BasicBlockingRecordQueue<K, V>(
Optional.ofNullable(queueSize).orElse(BasicBlockingRecordQueue.DEFAULT_QUEUE_SIZE_LIMIT),
Optional.ofNullable(maxBlockingTime).orElse(BasicBlockingRecordQueue.DEFAULT_MAX_BLOCK_DURATION),
Optional.ofNullable(queueLimitHandler).orElse(LimitHandlers.NO_OP)
);
return new EventStream<>(eventType, queue);
}
}
Expand Down

0 comments on commit acb82f5

Please sign in to comment.