Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Reactive support for Pub/Sub subscription #1461

Merged
merged 29 commits into from
Mar 22, 2019
Merged

Reactive support for Pub/Sub subscription #1461

merged 29 commits into from
Mar 22, 2019

Conversation

elefeint
Copy link
Contributor

@elefeint elefeint commented Feb 8, 2019

The polling Pub/Sub flux is demand-sensitive, implementing the pull/push strategy.
Unbounded/bounded demand is treated differently:

  • For unlimited demand, the Pub/Sub subscription will be polled regularly, at intervals determined by pollingPeriodMs parameter passed in when creating the Flux.
  • For bounded demand, as many messages as possible (up to the requested number) are delivered immediately, with the remaining messages delivered as they become available.

Fixes #186.

Copy link
Contributor

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some review.
Good stuff, but more questions, than answers 😄

@GetMapping(value = "/getmessages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<? super String> getMessages() {
Flux<ConvertedAcknowledgeablePubsubMessage<String>> flux
= FluxSubscriber.createPolledFlux(this.template, String.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't look like conversion logic propagation is a good idea in terms of Reactive Streams.
You just need to request data and then downstream in the .map() decide to what to convert it into.

* @author Elena Felder
* @since 1.2
*/
public class FluxSubscriber {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the class is misleading. There is already a Subscriber abstraction in the Reactive Streams and Flux just simple deal with them as is.

Essentially what you did here is a factory. so, please, reconsider the class name and I believe something like PubSub must be included in its name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like PubSubSubscriberFactory?
Then we can move the .subscribe() methods into this, as we discussed before for the refactoring.

On the other hand, we already have a SubscriberFactory, so I could just add the Flux creation logic into the existing DefaultSubscriberFactory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can if reactor-core is hard dependency, not optional.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, good point -- it should probably be optional. so it should not sit in a core class. Reactive is not the most common usecase.


LOGGER.info(" Current remaining demand: " + remainingDemand);
List<ConvertedAcknowledgeablePubsubMessage<T>> messages
= this.subscriberOperations.pullAndConvert("reactiveSubscription",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

M-m-m. I think this is not intentional.
Why do you use an explicit, non-configurable subscription name ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely not intentional; I'll move it into a parameter.

return Flux.create(sink -> {
sink.onRequest((numRequested) -> {
// request must be non-blocking.
executorService.submit(new PubSubPullTask<T>(numRequested, sink, subscriberOperations, targetType));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that not your responsibility to do this.
What you, essentially, have is simply covered by the subscribeOn().

Although since underlying API is really not reactive (PullResponse):

this.receivedMessages_.add(input.readMessage(ReceivedMessage.parser(), extensionRegistry));

Does it really make sense to promote this from the Framework perspective?..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that I am making a blocking pull call here, so if I request on the same thread, the code will violate the reactive streams rule that request should be non-obstructing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be a way to do the pull asynchronously without your own ExecutorService.
See here: https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java#L213

subscriber.pullCallable().futureCall()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard for me to object, but I think your thread spawning per subscriber is still bad idea.
Consider to use somehow Schedulers.elastic().

I will try to pull some one from our Reactor team to see what that think on the matter...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! futureCall(). that's an answer! Right., we need to add it into the PubSubSubscriberOperations API and use it from this Flux trick.
There is simply need to be have a flatMap(e -> Mono.fromFuture()) and so on with flatMapIterable().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

futureCall() sounded good in theory, but now we have potentially multiple outstanding calls to PubSub with additional callbacks being spawned.
Flux making an additional request when the data is 75% or so processed, combined with PubSub synchronous pull timing out after 10 minutes makes for messy logic.

I'd almost prefer to take a "poll interval" parameter, accumulate total outstanding demand in the custom flux, and then make "return_immediately=true" requests at an interval.

int numReceived = messages.size();
messages.forEach(m -> {
LOGGER.info("Sending message to subscriber: " + m.getPayload());
sink.next(m);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would send the whole list as a single event, then would have have a flatMapIterable() before returning from the createPolledFlux()

@codecov-io
Copy link

codecov-io commented Feb 15, 2019

Codecov Report

Merging #1461 into master will decrease coverage by 10.13%.
The diff coverage is 89.13%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #1461       +/-   ##
============================================
- Coverage     78.94%   68.8%   -10.14%     
+ Complexity     1989    1692      -297     
============================================
  Files           244     246        +2     
  Lines          6526    6572       +46     
  Branches        662     667        +5     
============================================
- Hits           5152    4522      -630     
- Misses         1094    1755      +661     
- Partials        280     295       +15
Flag Coverage Δ Complexity Δ
#integration ? ?
#unittests 68.8% <89.13%> (+0.14%) 1692 <9> (+9) ⬆️
Impacted Files Coverage Δ Complexity Δ
...toconfigure/pubsub/GcpPubSubAutoConfiguration.java 90.47% <ø> (ø) 38 <0> (ø) ⬇️
...ure/pubsub/GcpPubSubReactiveAutoConfiguration.java 100% <100%> (ø) 3 <3> (?)
...oud/gcp/pubsub/reactive/PubSubReactiveFactory.java 88.37% <88.37%> (ø) 6 <6> (?)
...main/kotlin/com/example/app/PubSubConfiguration.kt 0% <0%> (-100%) 0% <0%> (-5%)
...ample/src/main/java/com/example/WebController.java 0% <0%> (-100%) 0% <0%> (-3%)
...a/spanner/repository/query/SpannerQueryMethod.java 0% <0%> (-100%) 0% <0%> (-5%)
...ple/src/main/java/com/example/MyAppProperties.java 0% <0%> (-100%) 0% <0%> (-5%)
...ple/src/main/java/com/example/HelloController.java 0% <0%> (-100%) 0% <0%> (-2%)
...ample/src/main/java/com/example/BluegrassBand.java 0% <0%> (-100%) 0% <0%> (-2%)
...figure/config/GcpConfigBootstrapConfiguration.java 0% <0%> (-100%) 0% <0%> (-2%)
... and 74 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 72d7464...3ba8683. Read the comment docs.


if (remainingDemand > 0) {
// Ensure requested messages get fulfilled eventually.
produceMessages(subscriptionName, remainingDemand, sink, subscriberOperations);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Between the need to keep onRequest() non-blocking and attempting to fulfill outstanding demand, the code relying on gRPC's futureCall() became messy and with potentially too many simultaneous calls to Pub/Sub. And adding callbacks to ApiFuture requires a separate executor, anyway (I reused the former ackExecutor in the latest implementation).

I would prefer to go back to maintaining a thread pool (but a shared one, not one per flux) in PubSubReactiveFactory, and using non-blocking polling at a customizable interval to satisfy remaining demand.

@artembilan @meltsufin What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the call stack can get huge with this recursive call. So, it might make sense to go back to the synchronous pull with a pooled executor. It doesn't look like we're able to get away from providing our own executor anyway. WDYT @artembilan?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say that's not our problem to deal with remained here and die in the stack overflow.

Please, see Subscription.request() JavaDocs:

* A {@link Publisher} can send less than is requested if the stream ends but
* then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}.

so, our goal here is to produce exactly requested amount or less and then onComplete(): FluxSink.complete().

Not sure why have you gone a bit different way...

Also the rule number 1 of Publisher specification: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md#specification

The total number of onNext´s signalled by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.

I think you are missing the fact that our createPolledPublisher() is just finite and that's already out of our scope to keep it hot and constantly updating.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a cold publisher useful in Pub/Sub context? If the requirement is "get up to N messages and complete", then the client code can just use generate:

Flux.<List<AcknowledgeablePubsubMessage>>generate(
  () -> subscriberOperations.pull(subscriptionName, demand, true)
).flatMapIterable(Function.identity());

It seems to me based on discussion in reactor/reactor-core#314 that onRequest() was added to support our exact usecase of an unlimited flux that still responds to backpressure.

Off-topic, if I start a metal band, "We Die in the Stack Overflow" will be its name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... Then I have no answer or my Reactive knowledge is limited...

I wonder if we can bring someone who is an expert in the area to give us some feedback...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, everything I learned about Reactive I learned in the last month, so I am hardly an expert. Could we maybe set up a meeting with Reactive folks?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this a bit more, I'm not sure what's wrong with just returning whatever we get from Pub/Sub with the return-immediately=true, and not trying to fulfill the full demand. If Pub/Sub returns less than the demand, we just call onComplete assuming there are no more messages for the time being. The client can always resume its requests.
What's our outstanding question for the experts? I think the specification is fairly precise.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, just talked to @elefeint. I see why it's bad to terminate the stream when the demand is unmet by Pub/Sub. It doesn't work for infinite streams. So, it looks like it's better to keep the subscription open knowing that Pub/Sub can eventually provide more messages. We just need to poll it on a regular basis. Ideally, Pub/Sub would support true reactive streaming itself, but for the time being we need to simulate it by polling.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey that'd be great to meet if you folks want to introduce us to this in a zoom or somewhere and we can pair-review together !

public void setAckExecutor(Executor ackExecutor) {
Assert.notNull(ackExecutor, "ackExecutor can't be null.");
this.ackExecutor = ackExecutor;
public void setCallbackExecutor(Executor callbackExecutor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you realize that this is a breaking change? We just can't rename API just because.
Please, consider to deprecate exiting method in favor of a new one.
And let's keep it here at least for the next version!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Point well taken. If this is the direction in which this code will go, I'll add the proper deprecation (I hope it does not go there, though -- per discussion below, I would prefer to revert the changes to PubSubSubscriberTemplate and manage multi threading in PubSubReactiveFactory instead).


if (remainingDemand > 0) {
// Ensure requested messages get fulfilled eventually.
produceMessages(subscriptionName, remainingDemand, sink, subscriberOperations);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say that's not our problem to deal with remained here and die in the stack overflow.

Please, see Subscription.request() JavaDocs:

* A {@link Publisher} can send less than is requested if the stream ends but
* then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}.

so, our goal here is to produce exactly requested amount or less and then onComplete(): FluxSink.complete().

Not sure why have you gone a bit different way...

Also the rule number 1 of Publisher specification: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md#specification

The total number of onNext´s signalled by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.

I think you are missing the fact that our createPolledPublisher() is just finite and that's already out of our scope to keep it hot and constantly updating.

@meltsufin meltsufin added the pubsub GCP PubSub label Feb 22, 2019
Copy link
Contributor Author

@elefeint elefeint left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From in-person review with @smaldini @artembilan @meltsufin


PubSubReactiveSubscription reactiveSubscription = new PubSubReactiveSubscription(subscriptionName);

return Flux.interval(Duration.ofMillis(delayInMilliseconds))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onBackpressureDrop()


return Flux.interval(Duration.ofMillis(delayInMilliseconds))
.doOnRequest(reactiveSubscription::addDemand)
.map(t -> reactiveSubscription.pullMessages())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collapse map and flatmap into one operation

public Publisher<AcknowledgeablePubsubMessage> createPolledPublisher(
String subscriptionName, int delayInMilliseconds) {

PubSubReactiveSubscription reactiveSubscription = new PubSubReactiveSubscription(subscriptionName);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid creating subscription object outside of flux creation.
bottom line: use Flux.create for more control over the process

@elefeint
Copy link
Contributor Author

elefeint commented Mar 6, 2019

@smaldini @artembilan @meltsufin I've changed the Pub/Sub flux creation implementation as we discussed last Monday (with Flux.create and special treatment of unlimited demand). Could you take another look?

I still owe tests.

Copy link
Contributor

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing critical.
The solution looks great! 👍

}

/**
* Creates an infinite stream {@link Publisher} of {@link AcknowledgeablePubsubMessage} objects.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* @param pollingPeriod how frequently to poll the source subscription in case of unlimited demand.
* @return infinite stream of {@link AcknowledgeablePubsubMessage} objects.
*/
public Publisher<AcknowledgeablePubsubMessage> createPolledPublisher(String subscriptionName, long pollingPeriod) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think our (Spring) programming module to expect Reactive Streams interfaces as method arguments, but return just an explicit Reacor types: Flux and Mono.
No reason to narrow it if you deal internally with only Reactor.

messages.forEach(sink::next);
}

return messages;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may end up with the contract just really return an amount we have just pulled, not the whole list of messages.

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<optional>true</optional>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it can be non-optional here already.
This is end-user code, so that is fully OK to have it as a hard dependency.

return Flux.from(flux)
.doOnNext(message -> {
System.out.println("Received a message: " + message.getPubsubMessage().getMessageId());
message.ack();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to document this manual ack'ing or maybe consider an auto-ack mode as well...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Customizable ack mode is a good idea. Would it make sense to move (properly, with deprecation) AckMode from o.s.c.gcp.pubsub.integration up a level, or to simply use a boolean autoAck parameter when creating the Flux?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think boolean autoAck is fully enough, but let's really consider this improvement when we are already done with basics in this PR!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created #1517 to track; it will be a small follow-up PR.

docs/src/main/asciidoc/pubsub.adoc Outdated Show resolved Hide resolved

The `Flux` then represents an infinite stream of GCP Pub/Sub messages coming in through the specified subscription.
The full range of Project Reactor operations can be applied to the stream.
For example, the infinite stream can become a finite one with `limitRequest` operation:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also use simpler language, like "If you only want to fetch 5 messages from Pub/Sub, you can use a finite stream...".

* @param pollingPeriod how frequently to poll the source subscription in case of unlimited demand.
* @return infinite stream of {@link AcknowledgeablePubsubMessage} objects.
*/
public Flux<AcknowledgeablePubsubMessage> createPolledFlux(String subscriptionName, long pollingPeriod) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Units for pollingPeriod should be clarified. I would name the parameter pollingPeriodMs and clarify units in the javadoc.

* @return number of messages retrieved
*/
protected int pullToSink(long demand, boolean block) {
int numMessagesToPull = demand > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) demand;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just only allow int demand as parameter and let the caller deal with conversion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The demand comes from FluxSink.onRequest() parameter, which is a Long. Ultimately, there is tension between Reactor (which operates on Long demand) and PubSub (which operates on int demand), so it has to get resolved somewhere in my code.

Or did you mean to simply move the conversion from PubSubBlockingLimitedDemandPullTask.run() up to PubSubReactiveFactory.createPolledFlux.createPolledFlux()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, hand the responsibility to the caller.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at the code again -- the responsibility for converting from long to int belongs with the polling task, as close to the Pub/Sub polling operation is possible. Consider a request of Integer.MAX_VALUE + 1,000. If we convert to int before creating PubSubBlockingLimitedDemandPullTask, then only a demand of Integer.MAX_VALUE will be fulfilled, leaving the extra demand of 1,000 unmet.

Leaving the conversion responsibility with PubSubBlockingLimitedDemandPullTask allows it to think in terms of long internally, subtracting up to Integer.MAX_VALUE messages at each poll.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the conversion belongs in createPolledFlux. How would the demand of Integer.MAX_VALUE + 1,000 be fulfilled, if you cap it at Integer.MAX_VALUE here? Automatically changing demand > Integer.MAX_VALUE to Integer.MAX_VALUE seems a bug waiting to happen. If the flux ever requests a demand greater than Integer.MAX_VALUE and it's not the special Long.MAX_VALUE we should probably just throw an unsupported exception in createPolledFlux.


private static final Log LOGGER = LogFactory.getLog(PubSubReactiveFactory.class);

private PubSubSubscriberOperations subscriberOperations;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

this.subscriptionName, numMessagesToPull, !block);

if (!this.sink.isCancelled()) {
messages.forEach(sink::next);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what is wrong with sending the whole List<AcknowledgeablePubsubMessage> batch to the sink as a single event and then use a flatMapIterable(Function.identity()) before returning the Flux to end-user...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flatMapIterable changes demand, which could look unintuitive to the consumer (if subscription requests 3 messages, flatMapIterable() will cause 256 to be requested instead).

Also (this is not a good reason, just relevant) when I was testing, I ran into an issue with additional demand not being propagated from HTTP request to the Pub/Sub flux even though the initial request was fulfilled. I'd like to reproduce a minimal case to understand whether this behaves as intended or is a bug.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Then let's deffer it to @smaldini .

I'm not sure how to be with this situation then 😄

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi! Great work @elefeint! We were also thinking of implementing our own Reactor Pubsub client due to improper back-pressure semantics which is giving us quite a lot of headache and thankfully @bsideup pointed this ticket to us. About this specific messages.forEach(sink::next) line, I have a question: If the demand is a big number, wouldn't you be fetching a gigantic batch of messages from Pubsub and choking the consumer via over-production? I would rather cap it by Math.min(demand, maxDemand) (e.g., maxDemand=100 by default) for each request(demand) and wait for any follow up request()s that the consumer will signal at its convenience. Otherwise my impression is that we will be periodically bombarding the consumer with messages regardless of its availability. Is there a detail I am missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the consumer is specifying the large demand, isn't it a signal that it can handle that number of messages? Certainly throttling can be implemented, but it may be unintuitive to consumers that do want a larger batch.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the time consumers do not specify anything and simple subscribe() just triggers a request(Long.MAX_VALUE). Though in the case of a message queue poller, "Apparently the consumer is asking for a cargo ship load of messages." does not sound like a practical assumption to me. Consider the following case:

  1. There are 1e6 messages in the queue.
  2. request(unbounded) triggers a continuous messages.forEach(sink::next) sequence.
  3. Scheduler threads start working on the initial tasks and the rest quickly get piled up in the work queue of the ScheduledExecutorService.
  4. After a while, majority of the messages timeout, work queue starts choking memory, and every task threads pick up turn out to be obsolete.

This is the problem I am dealing for the last couple of months at work. Let me know if there is a way I can work around this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vy I understand now. As a practical matter, would adding limitRequest() or limitRate() operators to the invocation chain work?

@smaldini The conversation about default behavior of subscribe() is an interesting one.
Would it make sense to open an issue on reactor-core? Unbounded demand is not the most common usecase, so perhaps a .subscribeWithInitialDemand() operator makes sense?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elefeint I suppose a combination of limit{Request,Rate}() would do the trick, though it still expects an explicit action from the consumer side. I would still favor shipping a solution with decent defaults minimizing any effort from user side.

I think subscribeWithInitialDemand() is sort of against the Reactor's fluent API encouraging composition. Isn't it just a shortcut for .limitRequest(n).subscribe()?

Nevertheless, @smaldini, we would really appreciate your insights on the general back-pressure issue. (@simonbasle, is this something you can help us out?)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI limitRate only, not limitRequest. limitRequest will cut the requests after the threshold specified as an argument

Copy link
Contributor Author

@elefeint elefeint Apr 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vy Would you mind creating an issue in this project for adding the maxMessagesPerPoll property? I've been thinking about this some more, and for unbounded demand scenario it makes sense to provide a configurable limit, especially if it's combined with a configurable project-wide default for pollingPeriodMs.
The firehose of messages would likely still be the default, but smoothing message arrival by getting N messages per poll seems valuable.

meltsufin and others added 2 commits March 11, 2019 11:19
Co-Authored-By: elefeint <41136058+elefeint@users.noreply.github.com>
@elefeint elefeint changed the title wip reactive polling pubsub Reactive support for Pub/Sub subscription Mar 11, 2019
Spring Operator and others added 2 commits March 18, 2019 11:29
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* http://docs.oracle.com/javaee/7/api/ with 1 occurrences migrated to:
  https://docs.oracle.com/javaee/7/api/ ([https](https://docs.oracle.com/javaee/7/api/) result 200).
* http://docs.oracle.com/javase/7/docs/api/ with 1 occurrences migrated to:
  https://docs.oracle.com/javase/7/docs/api/ ([https](https://docs.oracle.com/javase/7/docs/api/) result 200).
* http://docs.spring.io/spring/docs/4.1.x/javadoc-api/ with 1 occurrences migrated to:
  https://docs.spring.io/spring/docs/4.1.x/javadoc-api/ ([https](https://docs.spring.io/spring/docs/4.1.x/javadoc-api/) result 200).
* http://maven.apache.org/xsd/maven-4.0.0.xsd with 51 occurrences migrated to:
  https://maven.apache.org/xsd/maven-4.0.0.xsd ([https](https://maven.apache.org/xsd/maven-4.0.0.xsd) result 200).
* http://stackoverflow.com/questions/1593051/how-to-programmatically-determine-the-current-checked-out-git-branch with 1 occurrences migrated to:
  https://stackoverflow.com/questions/1593051/how-to-programmatically-determine-the-current-checked-out-git-branch ([https](https://stackoverflow.com/questions/1593051/how-to-programmatically-determine-the-current-checked-out-git-branch) result 200).
* http://stackoverflow.com/questions/29300806/a-bash-script-to-check-if-a-string-is-present-in-a-comma-separated-list-of-strin with 1 occurrences migrated to:
  https://stackoverflow.com/questions/29300806/a-bash-script-to-check-if-a-string-is-present-in-a-comma-separated-list-of-strin ([https](https://stackoverflow.com/questions/29300806/a-bash-script-to-check-if-a-string-is-present-in-a-comma-separated-list-of-strin) result 200).
* http://www.apache.org/licenses/LICENSE-2.0 with 2 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
* http://fasterxml.github.com/jackson-core/javadoc/2.0.0/ with 1 occurrences migrated to:
  https://fasterxml.github.com/jackson-core/javadoc/2.0.0/ ([https](https://fasterxml.github.com/jackson-core/javadoc/2.0.0/) result 301).
* http://www.spring.io with 5 occurrences migrated to:
  https://www.spring.io ([https](https://www.spring.io) result 301).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 102 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 51 occurrences
Copy link
Contributor Author

@elefeint elefeint left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from in-person review

protected static class ReactivePubSubAutoconfiguration {

@Bean
@ConditionalOnMissingBean
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a property preventing this object from instantiating?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For what reason?
Doesn't look like this light object is some kind of overhead for the target application.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it's not necessary to keep it off by default because it's just a wrapper around a SubscriberTemplate.
But I am also going to add a default scheduler bean into autoconfig and extract reactive configuration into a separate file, at which point it will make sense to add an overridable matchIfMissing=true property spring.cloud.gcp.pubsub.reactive.enabled.

// unlimited demand
Disposable task = Schedulers.single().schedulePeriodically(
new PubSubNonBlockingUnlimitedDemandPullTask(subscriptionName, sink), 0, pollingPeriodMs, TimeUnit.MILLISECONDS);
sink.onCancel(task::dispose);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass task directly

* @param pollingPeriodMs how frequently to poll the source subscription in case of unlimited demand, in milliseconds.
* @return infinite stream of {@link AcknowledgeablePubsubMessage} objects.
*/
public Flux<AcknowledgeablePubsubMessage> createPolledFlux(String subscriptionName, long pollingPeriodMs) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to poll(subscriptionName, pollingPeriodMs)

if (numRequested == Long.MAX_VALUE) {
// unlimited demand
Disposable task = Schedulers.single().schedulePeriodically(
new PubSubNonBlockingUnlimitedDemandPullTask(subscriptionName, sink), 0, pollingPeriodMs, TimeUnit.MILLISECONDS);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove PubSub prefix

demand -= pullToSink(intDemand, true);
}
catch (DeadlineExceededException e) {
LOGGER.trace("Blocking pull timed out due to empty subscription " + this.subscriptionName + "; retrying.");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add isTraceEnabled()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason: this is in the catch() block, so only in case of exception.
It's not going to be a big deal to build a logging message in case of this exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exception is actually expected. So, it may happen quiet often.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every 10 minutes by default, which is negligible in computer speed. On the other hand, we might as well add isTraceEnabled() for consistency.

sink.onCancel(task::dispose);
}
else {
Schedulers.single().schedule(new PubSubBlockingLimitedDemandPullTask(subscriptionName, numRequested, sink));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use elastic(), create single worker (don't call scheduler() directly)
create worker before request

also, kill task when sink is cancelled -- 10 minutes is too long

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reuse the worker for both, unlimited and limited demand

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

autoconfigure the scheduler

protected static class ReactivePubSubAutoconfiguration {

@Bean
@ConditionalOnMissingBean
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For what reason?
Doesn't look like this light object is some kind of overhead for the target application.

demand -= pullToSink(intDemand, true);
}
catch (DeadlineExceededException e) {
LOGGER.trace("Blocking pull timed out due to empty subscription " + this.subscriptionName + "; retrying.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason: this is in the catch() block, so only in case of exception.
It's not going to be a big deal to build a logging message in case of this exception.

Copy link
Contributor

@meltsufin meltsufin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes look good!
Would it be possible to add a integration test for the sample, like we have for other samples?

Co-Authored-By: elefeint <41136058+elefeint@users.noreply.github.com>
artembilan
artembilan previously approved these changes Mar 21, 2019
@elefeint
Copy link
Contributor Author

I'll add an integration test in a follow-up PR -- I also want to change the sample, so users can send messages through a web UI and not have to have gcloud installed or multi-task with a third command-line window.

This commit updates URLs to prefer the https protocol.
Copy link

@smaldini smaldini left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice job @elefeint and team!

@elefeint elefeint merged commit 8df2a6b into master Mar 22, 2019
@elefeint elefeint deleted the pubsub-reactive branch March 22, 2019 18:10
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
pubsub GCP PubSub
Development

Successfully merging this pull request may close these issues.

None yet

7 participants