Skip to content

vy/reactor-pubsub

Repository files navigation

Actions Status Maven Central License

Finally a Google Cloud Pub/Sub Java 8 driver that you can wrap your head around its internals and put the fun (i.e., backpressure-aware, reactive, efficient, batteries-included, and simple!) back into messaging.

  • It is backpressure-aware, because it only pulls batch of messages whenever you need one and tell it to pull one. It doesn’t operate a covert thread (pool) to pull whenever it sees a fit and ram it through the application.

  • It is reactive, because every request is non-blocking, asynchronous, and wired up with the rest of the application using Reactive Streams.

  • It is efficient, because it works in batches, the basic unit of message exchange between your driver and Pub/Sub. You pull() some, you ack() some. One-message-at-a-time is an illusion created by drivers and incurs a significant performance penalty along with operational complexity under the hood.

  • It is batteries-included, because it provides goodies (out of the box metrics integration, an adaptive rate limiter to help you avoid burning money by continuously pulling and nack’ing messages when something is wrong with your consumer, a ScheduledExecutorService implementation with a bounded task queue to mitigate backpressure violating consumption) that assist real-world production deployments.

  • It is simple, because there are 2 dozens of classes where half is used to represent JSON models transmitted over the wire and the rest is just reactive streams gluing. There are no smart retry mechanisms (though thanks to reactive streams, you can introduce yours), there are no message lease time extending background tasks (hence, you better consume your batch before it times out), etc. Spend a minute on the source code and you are ready to write your own Pub/Sub driver!

Due to the constraints on the development resources (read as, "it is just me"), I needed to take some shortcuts to get this project going:

Getting started

You first need to add the reactor-pubsub artifact into your list of dependencies:

<dependency>
    <groupId>com.vlkan</groupId>
    <artifactId>reactor-pubsub</artifactId>
    <version>${reactor-pubsub.version}</version>
</dependency>

(Note that the Java 9 module name is com.vlkan.pubsub.)

You can create a publisher and publish a message as follows:

// Create the publisher.
String projectName = "awesome-project";
PubsubPublisherConfig publisherConfig = PubsubPublisherConfig
        .builder()
        .setProjectName(projectName)
        .setTopicName("awesome-topic")
        .build();
PubsubPublisher publisher = PubsubPublisher
        .builder()
        .setConfig(publisherConfig)
        .build();

// Publish a message.
publisher
        .publishMessage(
                new PubsubDraftedMessage(
                        "Yolo like nobody's watching!"
                                .getBytes(StandardCharsets.UTF_8)))))
        .doOnSuccess(publishResponse ->
                System.out.format(
                        "Published awesome message ids: %s%n",
                        publishResponse.getMessageIds()))
        .subscribe();

Note that PubsubDraftedMessage constructor has two variants:

  • PubsubDraftedMessage(byte[] payload)

  • PubsubDraftedMessage(byte[] payload, Map<String, String> attributes)

PubsubPublisher provides the following auxiliary methods:

  • publishMessages(List<PubsubDraftedMessage> messages)

  • publishMessage(List<PubsubDraftedMessage> message)

  • publish(PubsubPublishRequest publishRequest)

You can create a subscriber and start receiving messages from a subscription as follows:

// Create a puller.
String subscriptionName = "awesome-subscription";
PubsubPullerConfig pullerConfig = PubsubPullerConfig
        .builder()
        .setProjectName(projectName)
        .setSubscriptionName(subscriptionName)
        .build();
PubsubPuller puller = PubsubPuller
        .builder()
        .setConfig(pullerConfig)
        .build();

// Create an acker.
PubsubAckerConfig ackerConfig = PubsubAckerConfig
        .builder()
        .setProjectName(projectName)
        .setSubscriptionName(subscriptionName)
        .build();
PubsubAcker acker = PubsubAcker
        .builder()
        .setConfig(ackerConfig)
        .build();

// Pull and consume continuously.
puller
        .pullAll()
        .concatMap(pullResponse -> {
            int messageCount = pullResponse.getReceivedMessages().size();
            System.out.format("Just got awesome %d messages!%n", messageCount);
            return acker.ackPullResponse(pullResponse);
        })
        .subscribe();

Note that PubsubAcker provides the following auxiliary methods:

  • ackPullResponse(PubsubPullResponse pullResponse)

  • ackMessages(List<PubsubReceivedMessage> messages)

  • ackMessage(PubsubReceivedMessage message)

  • ackIds(List<String> ackIds)

  • ackId(String ackId)

  • ack(PubsubAckRequest ackRequest)

Utilities

The project ships a couple of utilities where you might find them handy in assembling your messaging pipeline. Even though they are optional, we strongly recommend their usage.

Rate limiter

We strongly encourage everyone to employ the provided rate limiter while consuming messages. The rationale is simple: In order to avoid burning GCP bills for nothing, you better cut down the consumption rate if the rest of the system is indicating a failure.

reactor-pubsub provides the following utilities for rate limiting purposes:

  • RateLimiter is a simple (package local) rate limiter.

  • StagedRateLimiter is a rate limiter with multiple stages. Each stage is composed of a success rate and failure rate pair. In the absence of failure acknowledgements, excessive permit claims replace the active stage with the next faster one, if there is any. Likewise, excessive failure acknowledgements replace the active stage with the next slower one, if there is any.

One can employ the StagedRateLimiter for a PubsubPuller as follows:

// Create the staged rate limiter and its reactor decorator.
String stagedRateLimiterName = projectName + '/' + subscriptionName;
StagedRateLimiter stagedRateLimiter = StagedRateLimiter
        .builder()
        .setName(stagedRateLimiterName)
        .setSpec("1/1m:, 1/30s:1/1m, 1/1s:2/1m, :1/3m")     // (default)
        .build();
StagedRateLimiterReactorDecoratorFactory stagedRateLimiterReactorDecoratorFactory =
        StagedRateLimiterReactorDecoratorFactory
                .builder()
                .setStagedRateLimiter(stagedRateLimiter)
                .build();
Function<Flux<PubsubPullResponse>, Flux<PubsubPullResponse>> stagedRateLimiterFluxDecorator =
        stagedRateLimiterReactorDecoratorFactory.ofFlux();

// Employ the staged rate limiter.
puller
        .pullAll()
        .concatMap(pullResponse -> {
            // ...
            return acker.ackPullResponse(pullResponse);
        })
        .transform(stagedRateLimiterFluxDecorator)
        .subscribe();

The stages are described in increasing success rate limit order using a specification format as follows: 1/1m:, 1/30s:1/1m, 1/1s:2/1m, :1/3m. The specification is a comma-separated list of [success rate limit]:[failure rate limit] pairs where, e.g., 1/1h is used to denote a rate limit of a single permit per 1 hour. Temporal unit must be one of h(ours), m(inutes), or s(econds). The initial failure rate limit and the last success rate limit can be omitted to indicate no rate limits.) This example will result in the following stages.

Table 1. StagedRateLimiter stages for specification 1/1m:, 1/30s:1/1m, 1/1s:2/1m, :1/3m.
stage success rate limit failure rate limit

1

1/1m (once per minute)

infinite

2

1/30s (once per 30 second)

1/1m (once per minute)

3

1/1s (once per second)

2/1m (twice per minute)

4

infinite

1/3m (once per 3 minute)

By contract, initially the active stage is set to the one with the slowest success rate limit.

Bounded SchedulerExecutorService

PubsubPuller, PubsubAccessTokenCache, and StagedRateLimiterReactorDecoratorFactory optionally receive either a ScheduledExecutorService or a Reactor Scheduler in their builders for timed invocations. One can explicitly change the implicit scheduler used by any Reactor Mono<T> or Flux<T> as well. (See Threading and Schedulers in Reactor reference manual.) We strongly suggest employing a common dedicated scheduler for all these cases with a bounded task queue. That said, unfortunately neither the default Reactor Schedulers nor the ScheduledExecutorService implementations provided by the Java Standard library allow one to put a bound on the task queue size. This shortcoming is severely prone to hiding backpressure problems. (See the the relevant concurrency-interest discussion.) To mitigate this, we provide BoundedScheduledThreadPoolExecutor wrapper and strongly recommend to employ it in your Reactor assembly line. Even though this will incur an extra thread context switching cost, this is almost negligible for a majority of the use cases and the benefit will overweight this minor expense. The usage is as simple as follows:

// Create the executor.
ScheduledThreadPoolExecutor executor =
        new ScheduledThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors());
BoundedScheduledThreadPoolExecutor boundedExecutor =
        new BoundedScheduledThreadPoolExecutor(100, executor);
Scheduler scheduler = Schedulers.fromExecutorService(boundedExecutor);

// Set the access token cache executor.
PubsubAccessTokenCache
        .builder()
        .setExecutorService(executor)
        // ...
        .build();

// Set the puller scheduler.
PubsubPuller puller = PubsubPuller
        .builder()
        .setScheduler(scheduler)
        // ...
        .build();

// Employ the scheduler in the Reactor pipeline.
puller
        .pullAll()
        .concatMap(pullResponse -> {
            // ...
            return acker.ackPullResponse(pullResponse);
        })
        .flatMap(this::doSomeOtherAsyncIO)
        .subscribeOn(scheduler)
        .subscribe();

F.A.Q

How can I avoid stream termination when pull fails?

It is a common pitfall to build a message consumption pipeline as follows:

puller
        .pullAll()
        .concatMap(pullResponse -> businessLogic
                .execute(pullResponse)
                .then(acker.ackPullResponse(pullResponse)))
        .subscribe();

Here the Flux<PubsubPullResponse> returned by pullAll() will be terminated if any of the methods along the reactive chain (pullAll(), businessLogic.execute(), ack(), etc.) throws an exception. No matter how many doOnError(), onErrorResume() you plaster there, the damage has been done, the subscription has been cancelled, and pullAll() will not continue pulling anymore. Note that this applies to any Flux and nothing new to the way we leverage it here. To prevent such premature stream termination, you need to retry subscribing. While this can be done as simple as calling retry(), you might also want to check out more fancy options like retryBackoff(). As one final remark, make sure you deal (log?) with the error prior to retrying.

How can I retry ack’s?

See How to use retryWhen for exponential backoff? in Reactor reference manual.

How can I change the GCP credentials?

Unless one provided, all PubsubPublisher, PubsubPuller and PubsubAcker classes use the PubsubAccessTokenCache.getDefaultInstance() and PubsubClient.getDefaultInstance() defaults. By default, PubsubAccessTokenCache leverages GoogleCredentials.getApplicationDefault() provided by the google-auth-library-oauth2-http artifact. This function determines the credentials by trying out the following steps in order:

  1. Credentials file pointed to by the GOOGLE_APPLICATION_CREDENTIALS environment variable

  2. Credentials provided by the Google Cloud SDK gcloud auth application-default login command

  3. Google App Engine built-in credentials

  4. Google Cloud Shell built-in credentials

  5. Google Compute Engine built-in credentials

Rather than relying on this mechanism, one can explicitly set the credentials as follows:

// Create the access token cache.
PubsubAccessTokenCache accessTokenCache = PubsubAccessTokenCache
        .builder()
        .setCredentials("awesome-password")     // null falls back to the defaults
        .build();

// Create the client.
PubsubClient client = PubsubClient
        .builder()
        .setAccessTokenCache(accessTokenCache)
        .build();

// Create the puller.
PubsubPuller puller = PubsubPuller
        .builder()
        .setClient(client)
        // ...
        .build();

// Create the ack'er.
PubsubAcker acker = PubsubAcker
        .builder()
        .setClient(client)
        // ...
        .build();

// Create the publisher.
PubsubPublisher publisher = PubsubPublisher
        .builder()
        .setClient(client)
        // ...
        .build();

How can I enable metrics?

Given Micrometer is used for metrics, you first need to have it in your list of dependencies:

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-core</artifactId>
    <version>${micrometer.version}</version>
</dependency>

Both PubsubClient and StagedRateLimiterReactorDecoratorFactory provide means to configure metrics. Each can be simply configured as follows:

// Create a meter registry.
MeterRegistry meterRegistry = ...;

// Pass the meter registry to the Pub/Sub client.
PubsubClient
        .builder()
        .setMeterRegistry(meterRegistry)
        .setMeterNamePrefix("pubsub.client")            // default
        .setMeterTags(Collections.emptyMap())           // default
        // ...
        .build();

// Pass the meter registry to the rate limiter factory.
StagedRateLimiterReactorDecoratorFactory
        .builder()
        .setMeterRegistry(meterRegistry)
        .setMeterNamePrefix("pubsub.stagedRateLimiter") // default
        .setMeterTags(Collections.emptyMap())           // default
        // ...
        .build();

Above will publish metrics with the following footprints:

Name Tags Description

pubsub.client.publish.latency

projectName, topicName, result

publish request latency

pubsub.client.publish.count

projectName, topicName

published message count

pubsub.client.{pull,ack}.latency

projectName, subscriptionName, result

pull and ack request latency

pubsub.client.{pull,ack}.count

projectName, subscriptionName

pulleded/acked message count

pubsub.stagedRateLimiter.permitWaitPeriod

name

permit wait period distribution summary

There are a couple of details that need further elaboration here:

  • When PubsubPullerConfig#pullPeriod is set to zero (default), pull requests will only get completed when there are messages. Hence, one might experience high latencies in queues that frequently become empty.

  • When PubsubPullerConfig#pullPeriod is set to a value greater than zero, repeatedly executed pull requests by PubsubPuller#pullAll() will get followed by a pullPeriod delay after an empty response. Hence the published pubsub.client.pull.latency metrics are a combination of both the full and the empty responses.

  • As of this writing, Pub/Sub blocks every pull requests at least ~1.5 seconds before returning an empty response.

How can I run it against the Pub/Sub emulator?

Pub/Sub provides an emulator to test your applications locally. In order to use it in combination with reactor-pubsub, you need to configure the baseUrl of the PubsubClient as follows:

// Create a custom client.
PubsubClientConfig clientConfig = PubsubClientConfig
        .builder()
        .setBaseUrl("http://localhost:8085")
        .build();
PubsubClient client = PubsubClient
        .builder()
        .setConfig(clientConfig)
        .build();

// Create a publisher.
PubsubPublisher publisher = PubsubPublisher
        .builder()
        .setClient(client)
        // ...
        .build();

// Create a puller.
PubsubPuller puller = PubsubPuller
        .builder()
        .setClient(client)
        // ...
        .build();

// Create an acker.
PubsubAcker acker = PubsubAcker
        .builder()
        .setClient(client)
        // ...
        .build();

How fast is reactor-pubsub?

One of the most frequent questions reactor-pubsub is challenged with is how does it perform given the official Pub/Sub client uses Protobuf over HTTP/2 (gRPC), whereas reactor-pubsub uses JSON over HTTP/1?

Before going into convincing figures to elaborate on reactor-pubsubs performance characteristics, there is one thing that deserves attention in particular: JSON over HTTP/1 is a deliberate design decision for simplicity rather than a fallback due to technical limitations. Even though it is opinionated, reactor-pubsub strives to serve as a Pub/Sub client that leverages frequently used tools (e.g., JSON) and idioms Java developers are accustomed to. Further, in case of failures, it should be trivial to spot the smoking gun using a decent IDE debugger.

reactor-pubsub source code ships a reproducible benchmark along with its results. As shared there, one can retrieve (i.e., pull & ack) a payload of 781 MiB in 2,083 ms using two 2.70GHz CPU cores, pull batch size 50, pull concurrency 5, and message payload length 16 KiB. That is, 11,998 messages per second on a single core!* Do you still need more juice? 🙇 Go ahead and create a ticket with your use case, observed performance, and implementation details.

Benchmark Results

Historical account

I (Volkan Yazıcı) would like to take this opportunity to share the historical account from my perspective to justify the effort and defend it against any potential NIH syndrome accusations.

Why did I feel a need to implement a Pub/Sub Java driver from scratch? At bol.com, we heavily use Pub/Sub. There we started our pursuit like the rest of the Pub/Sub users with the official Java drivers provided by Google. Later on we started bumping into backpressure problems: tasks on the shared ScheduledExecutorService were somehow awkwardly dating back and constantly piling up. That was the point I introduced a BoundedScheduledThreadPoolExecutor and shit hit the fan. I figured the official Pub/Sub driver was ramming the fetched batch of messages through the shared executor. My first reaction was to cut down the pull buffer size and the concurrent pull count. That solved a majority of our backpressure-related problems, though created a new one: efficiency. Then I started examining the source code and wasted quite a lot of time trying to make forsaken FlowControlSettings work. This disappointing inquiry resulted in something remarkable: I understood how Pub/Sub works and amazed by the extent of complexity for such a simple task. I have already been using Reactive Streams (RxJava and Reactor) every single work day in the last five years and compiled a thick collection of lessons and recipes out of it. The more I examined the official Pub/Sub Java driver source code, the more I was convinced that I could very well engineer this into something way more simple. I know how to pump JSON payloads over HTTP via Reactor Netty and enjoy a backpressure-aware, reactive comfort out of the box. But that wasn’t the tipping point I had decided to implement my own Pub/Sub Java driver. I made my mind when I witnessed that Google engineers are clueless about these problems.

Why all the fuss about the rate limiting? One morning I came to the office and read an e-mail from one of the platform teams asking how come we managed to burn hundreds of dollars worth of Pub/Sub messaging in the middle of the night. One of the application (non-critical) databases happened to go down for a couple of hours and during that period nodes constantly sucked up messages and nack’ed them due to the database failure. This is an opinionated Pub/Sub driver and in my opinion you should not relentlessly burn Pub/Sub bills if the rest of the application is shouting out there is something going on wrong. Hence, please configure and use the god damn rate limiter.

Security policy

If you have encountered an unlisted security vulnerability or other unexpected behaviour that has security impact, please report them privately to the volkan@yazi.ci email address.

License

Copyright © 2019-2020 Volkan Yazıcı

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.


1. This could have been replaced with a more lightweight alternative, but given you have already been using Pub/Sub, it is highly likely that you already sold your soul to some other Google Cloud services too. Hence, no need to introduce an extra dependency.
2. Google APIs Client Library already depends on Jackson.