Revert f691cee and add BoundedScheduledThreadPoolExecutor back.
My personal experience with Reactor BoundedElasticScheduler has been
nothing but suffering[1]. I am not happy with packaging something that
should have been provided by Reactor, but this single file is still way
simpler, more maintainable, *and* without any catches compared to the
BoundedElasticScheduler.

[1] reactor/reactor-core#1992
vy committed Jan 15, 2020
1 parent 945a712 commit 39aefdb
Showing 4 changed files with 323 additions and 26 deletions.
2 changes: 0 additions & 2 deletions CHANGELOG.adoc
@@ -8,8 +8,6 @@
- Avoid cloning request payload bytes.
- Remove `BoundedScheduledThreadPoolExecutor`. (#10)
- Extend `PubsubPublisher` with auxiliary methods.
- Make sure both message payload and attributes cannot be empty.
77 changes: 68 additions & 9 deletions README.adoc
@@ -24,7 +24,9 @@ into messaging.
- It is *batteries-included*, because it provides goodies (out of the box
metrics integration, an adaptive rate limiter to help you avoid burning money
by continuously pulling and nack'ing messages when something is wrong with
your consumer) that assist real-world production deployments.
your consumer, a `ScheduledExecutorService` implementation with a bounded task
queue to mitigate backpressure-violating consumption) that assist real-world
production deployments.
- It is *simple*, because there are two dozen classes, half of which are used to
represent JSON models transmitted over the wire and the rest is just reactive
@@ -239,6 +241,63 @@ stages.
By contract, initially the active stage is set to the one with the slowest
success rate limit.

=== Bounded `ScheduledExecutorService`

`PubsubPuller`, `PubsubAccessTokenCache`, and
`StagedRateLimiterReactorDecoratorFactory` optionally receive either a
`ScheduledExecutorService` or a Reactor `Scheduler` in their builders for timed
invocations. One can explicitly change the implicit scheduler used by any
Reactor `Mono<T>` or `Flux<T>` as well. (See
https://projectreactor.io/docs/core/release/reference/#schedulers[Threading and
Schedulers] in Reactor reference manual.) We strongly suggest employing a common
dedicated scheduler for all these cases with a _bounded task queue_. That said,
unfortunately neither the default Reactor ``Scheduler``s nor the
`ScheduledExecutorService` implementations provided by the Java Standard library
allow one to put a bound on the task queue size. This shortcoming is severely
prone to hiding backpressure problems. (See
http://cs.oswego.edu/pipermail/concurrency-interest/2019-April/016861.html[the
relevant concurrency-interest discussion].) To mitigate this, we provide the
`BoundedScheduledThreadPoolExecutor` wrapper and strongly recommend employing it
in your Reactor assembly line. Even though this incurs an extra thread
context-switching cost, that cost is almost negligible for the majority of use
cases and the benefit outweighs this minor expense. The usage is as simple
as follows:

```java
// Create the executor.
ScheduledThreadPoolExecutor executor =
        new ScheduledThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors());
BoundedScheduledThreadPoolExecutor boundedExecutor =
        new BoundedScheduledThreadPoolExecutor(100, executor);
Scheduler scheduler = Schedulers.fromExecutorService(boundedExecutor);

// Set the access token cache executor. Pass the bounded wrapper rather than
// the raw executor, otherwise the queue bound is bypassed.
PubsubAccessTokenCache
        .builder()
        .setExecutorService(boundedExecutor)
        // ...
        .build();

// Set the puller scheduler.
PubsubPuller puller = PubsubPuller
        .builder()
        .setScheduler(scheduler)
        // ...
        .build();

// Employ the scheduler in the Reactor pipeline.
puller
        .pullAll()
        .concatMap(pullResponse -> {
            // ...
            return acker.ackPullResponse(pullResponse);
        })
        .flatMap(this::doSomeOtherAsyncIO)
        .subscribeOn(scheduler)
        .subscribe();
```
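
To see why an unbounded task queue hides backpressure problems, the following minimal sketch blocks the only worker of a plain `ScheduledThreadPoolExecutor` and keeps submitting tasks. It uses only JDK classes. The class and method names (`UnboundedQueueDemo`, `queuedTaskCount`) are made up for illustration and are not part of this project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical demo class, not part of the library.
final class UnboundedQueueDemo {

    // Submits `taskCount` no-op tasks while the single worker is blocked and
    // returns how many of them are waiting in the executor's queue.
    static int queuedTaskCount(int taskCount) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the only worker thread, imitating a slow consumer.
            executor.execute(() -> {
                workerBusy.countDown();
                try {
                    release.await();
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
            });
            workerBusy.await();
            // None of these submissions is rejected or blocked; the producer
            // gets no signal that the consumer cannot keep up.
            for (int i = 0; i < taskCount; i++) {
                executor.execute(() -> {});
            }
            return executor.getQueue().size();
        } catch (InterruptedException error) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", error);
        } finally {
            release.countDown();
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + queuedTaskCount(10_000));
    }
}
```

Every submitted task ends up in the queue, so the backlog only becomes visible as growing latency or memory usage. A bounded queue turns the same situation into an immediate `RejectedExecutionException` at the submission site.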

== F.A.Q

=== How can I avoid stream termination when pull fails?
@@ -492,14 +551,14 @@ https://cloud.google.com/pubsub/docs/quickstart-client-libraries[the official
Java drivers] provided by Google. Later on we started bumping into backpressure
problems: tasks on the shared `ScheduledExecutorService` were somehow awkwardly
dating back and constantly piling up. That was the point I introduced a
`BoundedScheduledThreadPoolExecutor` (which is later on removed from the sources
after Reactor started providing a similar scheduler) and shit hit the fan. I
figured the official Pub/Sub driver was ramming the fetched batch of messages
through the shared executor. My first reaction was to cut down the pull buffer
size and the concurrent pull count. That solved a majority of our
backpressure-related problems, though created a new one: efficiency. Then I
started examining the source code and wasted quite a lot of time trying to make
forsaken https://github.com/googleapis/gax-java/blob/master/gax/src/main/java/com/google/api/gax/batching/FlowControlSettings.java[FlowControlSettings]
link:src/main/java/com/vlkan/pubsub/util/BoundedScheduledThreadPoolExecutor.java[BoundedScheduledThreadPoolExecutor]
and shit hit the fan. I figured the official Pub/Sub driver was ramming the
fetched batch of messages through the shared executor. My first reaction was to
cut down the pull buffer size and the concurrent pull count. That solved a
majority of our backpressure-related problems, though created a new one:
efficiency. Then I started examining the source code and wasted quite a lot of
time trying to make forsaken
https://github.com/googleapis/gax-java/blob/master/gax/src/main/java/com/google/api/gax/batching/FlowControlSettings.java[FlowControlSettings]
work. This disappointing inquiry resulted in something remarkable: I understood
how Pub/Sub works and was amazed by the extent of complexity for such a simple task.
I have already been using Reactive Streams (RxJava and Reactor) every single
33 changes: 18 additions & 15 deletions src/main/java/com/vlkan/pubsub/PubsubAccessTokenCache.java
@@ -18,6 +18,7 @@

import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.vlkan.pubsub.util.BoundedScheduledThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -41,21 +42,23 @@ public class PubsubAccessTokenCache {
private static final class DefaultExecutorServiceHolder {

private static final ScheduledExecutorService INSTANCE =
new ScheduledThreadPoolExecutor(1, new ThreadFactory() {

private final AtomicInteger threadCounter = new AtomicInteger(0);

@Override
public Thread newThread(Runnable runnable) {
String name = String.format(
"PubsubAccessTokenCacheWorker-%02d",
threadCounter.incrementAndGet());
Thread thread = new Thread(runnable, name);
thread.setDaemon(true);
return thread;
}

});
new BoundedScheduledThreadPoolExecutor(
100,
new ScheduledThreadPoolExecutor(1, new ThreadFactory() {

private final AtomicInteger threadCounter = new AtomicInteger(0);

@Override
public Thread newThread(Runnable runnable) {
String name = String.format(
"PubsubAccessTokenCacheWorker-%02d",
threadCounter.incrementAndGet());
Thread thread = new Thread(runnable, name);
thread.setDaemon(true);
return thread;
}

}));

}

@@ -0,0 +1,237 @@
/*
* Copyright 2019 Volkan Yazıcı
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.vlkan.pubsub.util;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

/**
* A {@link ScheduledThreadPoolExecutor} wrapper enforcing a bound on the task
* queue size. Excessive task queue growth yields {@link
* RejectedExecutionException} errors. {@link RejectedExecutionHandler}s are
* not supported since they expect a {@link ThreadPoolExecutor} in their
* arguments.
*
* <p>The Java standard library unfortunately doesn't provide any {@link
* ScheduledExecutorService} implementation that allows one to put a bound on
* the task queue. This shortcoming is prone to hiding backpressure problems. See
* <a href="http://cs.oswego.edu/pipermail/concurrency-interest/2019-April/016861.html">the
* relevant concurrency-interest discussion</a> for {@link java.util.concurrent}
* lead Doug Lea's tip for enforcing a bound via {@link
* ScheduledThreadPoolExecutor#getQueue()}.
*/
@SuppressWarnings("NullableProblems")
public class BoundedScheduledThreadPoolExecutor implements ScheduledExecutorService {

private final int queueCapacity;

private final ScheduledThreadPoolExecutor executor;

public BoundedScheduledThreadPoolExecutor(
int queueCapacity,
ScheduledThreadPoolExecutor executor) {
if (queueCapacity < 1) {
throw new IllegalArgumentException(
"was expecting a non-zero positive queue capacity");
}
this.queueCapacity = queueCapacity;
this.executor = executor;
}

/**
* {@inheritDoc}
*/
@Override
public synchronized ScheduledFuture<?> schedule(
Runnable command,
long delay,
TimeUnit unit) {
ensureQueueCapacity(1);
return executor.schedule(command, delay, unit);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized <V> ScheduledFuture<V> schedule(
Callable<V> callable,
long delay,
TimeUnit unit) {
ensureQueueCapacity(1);
return executor.schedule(callable, delay, unit);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized ScheduledFuture<?> scheduleAtFixedRate(
Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
ensureQueueCapacity(1);
return executor.scheduleAtFixedRate(command, initialDelay, period, unit);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
ensureQueueCapacity(1);
return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

/**
* {@inheritDoc}
*/
@Override
public void shutdown() {
executor.shutdown();
}

/**
* {@inheritDoc}
*/
@Override
public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}

/**
* {@inheritDoc}
*/
@Override
public boolean isShutdown() {
return executor.isShutdown();
}

/**
* {@inheritDoc}
*/
@Override
public boolean isTerminated() {
return executor.isTerminated();
}

/**
* {@inheritDoc}
*/
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return executor.awaitTermination(timeout, unit);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized <T> Future<T> submit(Callable<T> task) {
ensureQueueCapacity(1);
return executor.submit(task);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized <T> Future<T> submit(Runnable task, T result) {
ensureQueueCapacity(1);
return executor.submit(task, result);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized Future<?> submit(Runnable task) {
ensureQueueCapacity(1);
return executor.submit(task);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks)
throws InterruptedException {
ensureQueueCapacity(tasks.size());
return executor.invokeAll(tasks);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit)
throws InterruptedException {
ensureQueueCapacity(tasks.size());
return executor.invokeAll(tasks, timeout, unit);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
ensureQueueCapacity(tasks.size());
return executor.invokeAny(tasks);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized <T> T invokeAny(
Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
ensureQueueCapacity(tasks.size());
return executor.invokeAny(tasks, timeout, unit);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized void execute(Runnable command) {
ensureQueueCapacity(1);
executor.execute(command);
}

private void ensureQueueCapacity(int taskCount) {
int queueSize = executor.getQueue().size();
if ((queueSize + taskCount) > queueCapacity) {
throw new RejectedExecutionException();
}
}

}
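
The core of the wrapper above is a check against `ScheduledThreadPoolExecutor#getQueue()` performed under a lock right before delegating. A stripped-down sketch of that idea, with a caller that counts rejections, could look as follows. `QueueBoundSketch` and `acceptedCount` are hypothetical names used only here, and the sketch assumes that the capacity denotes the maximum number of queued tasks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical, reduced stand-in for the wrapper; only submit(Runnable) is covered.
final class QueueBoundSketch {

    private final int queueCapacity;

    private final ScheduledThreadPoolExecutor executor;

    QueueBoundSketch(int queueCapacity, ScheduledThreadPoolExecutor executor) {
        this.queueCapacity = queueCapacity;
        this.executor = executor;
    }

    // The check and the submission happen under one lock, so two callers going
    // through this wrapper cannot both pass the check for the last free slot.
    synchronized Future<?> submit(Runnable task) {
        if (executor.getQueue().size() >= queueCapacity) {
            throw new RejectedExecutionException(
                    "task queue is full (capacity=" + queueCapacity + ")");
        }
        return executor.submit(task);
    }

    // Blocks the single worker, then reports how many of `attempts`
    // submissions were accepted before the bound kicked in.
    static int acceptedCount(int queueCapacity, int attempts) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        QueueBoundSketch bounded = new QueueBoundSketch(queueCapacity, executor);
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            bounded.submit(() -> {
                workerBusy.countDown();
                try {
                    release.await();
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
            });
            workerBusy.await();
            int accepted = 0;
            for (int i = 0; i < attempts; i++) {
                try {
                    bounded.submit(() -> {});
                    accepted++;
                } catch (RejectedExecutionException rejection) {
                    // This is where a producer would slow down, drop, or retry.
                }
            }
            return accepted;
        } catch (InterruptedException error) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", error);
        } finally {
            release.countDown();
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptedCount(3, 10));
    }
}
```

Note that the queue of a `ScheduledThreadPoolExecutor` also holds delayed and periodic tasks that are not yet due, so they count against the bound as well.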
