
Allow overriding prefetch on the client and server side #204

Merged · 11 commits · May 12, 2020
@@ -36,6 +36,15 @@ public AbstractClientStreamObserverAndPublisher(
super(queue, onSubscribe, onTerminate);
}

public AbstractClientStreamObserverAndPublisher(
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe,
Runnable onTerminate,
int prefetch,
int lowTide) {
super(queue, prefetch, lowTide, onSubscribe, onTerminate);
}

@Override
public void beforeStart(ClientCallStreamObserver<T> requestStream) {
super.onSubscribe(requestStream);
@@ -25,11 +25,21 @@ public abstract class AbstractServerStreamObserverAndPublisher<T>

private volatile boolean abandonDelayedCancel;

public AbstractServerStreamObserverAndPublisher(
ServerCallStreamObserver<?> serverCallStreamObserver,
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe) {
super(queue, onSubscribe);
super.onSubscribe(serverCallStreamObserver);
}

public AbstractServerStreamObserverAndPublisher(
ServerCallStreamObserver<?> serverCallStreamObserver,
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe,
int prefetch,
int lowTide) {
super(queue, prefetch, lowTide, onSubscribe);
super.onSubscribe(serverCallStreamObserver);
}

@@ -64,8 +64,8 @@ public void request(long n) {
};


protected static final int DEFAULT_CHUNK_SIZE = 512;
protected static final int TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE = DEFAULT_CHUNK_SIZE * 2 / 3;
public static final int DEFAULT_CHUNK_SIZE = 512;
public static final int TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE = DEFAULT_CHUNK_SIZE * 2 / 3;

private static final int UNSUBSCRIBED_STATE = 0;
private static final int SUBSCRIBED_ONCE_STATE = 1;
25 changes: 25 additions & 0 deletions reactor/README.md
@@ -128,6 +128,31 @@ Two context propagation techniques are:
2. Make use of Reactor's [`subscriberContext()`](https://github.com/reactor/reactor-core/blob/master/docs/asciidoc/advancedFeatures.adoc#adding-a-context-to-a-reactive-sequence)
API to capture the gRPC context in the call chain.

## Configuration of flow control
By default, Reactor gRPC prefetches 512 messages on both the client and the server side. With large
messages this can consume a lot of memory. These defaults can be overridden per call with ReactorCallOptions.

Override the prefetch on the client side (when the client consumes too slowly):

```java
ReactorMyAPIStub api = ReactorMyAPIGrpc.newReactorStub(channel)
        .withOption(ReactorCallOptions.CALL_OPTIONS_PREFETCH, 16)
        .withOption(ReactorCallOptions.CALL_OPTIONS_LOW_TIDE, 4);
```

Override the prefetch on the server side (when the server consumes too slowly):

```java
// Override the getCallOptions method in your *ImplBase service class.
// The methodId parameter can be used for method-specific overrides.
@Override
protected CallOptions getCallOptions(int methodId) {
    return CallOptions.DEFAULT
            .withOption(ReactorCallOptions.CALL_OPTIONS_PREFETCH, 16)
            .withOption(ReactorCallOptions.CALL_OPTIONS_LOW_TIDE, 4);
}
```
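
Because this change also makes the generated `METHODID_*` constants public, the override can be scoped to a single RPC. A minimal sketch: `METHODID_STREAM_DATA` is a hypothetical generated constant (real names follow `METHODID_<METHOD_NAME>`), and returning `null` keeps the library defaults:

```java
@Override
protected CallOptions getCallOptions(int methodId) {
    // Hypothetical constant; use the METHODID_* constant generated for your RPC.
    if (methodId == ReactorMyAPIGrpc.METHODID_STREAM_DATA) {
        return CallOptions.DEFAULT
                .withOption(ReactorCallOptions.CALL_OPTIONS_PREFETCH, 16)
                .withOption(ReactorCallOptions.CALL_OPTIONS_LOW_TIDE, 4);
    }
    return null; // null falls back to the defaults (prefetch 512, low tide 341)
}
```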

Modules
=======

@@ -7,11 +7,11 @@

package com.salesforce.reactorgrpc.stub;

import java.util.function.BiConsumer;
import java.util.function.Function;

import io.grpc.CallOptions;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.function.BiConsumer;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
@@ -31,7 +31,8 @@ private ClientCalls() {
*/
public static <TRequest, TResponse> Mono<TResponse> oneToOne(
Mono<TRequest> monoSource,
BiConsumer<TRequest, StreamObserver<TResponse>> delegate) {
BiConsumer<TRequest, StreamObserver<TResponse>> delegate,
CallOptions options) {
try {
return Mono
.<TResponse>create(emitter -> monoSource.subscribe(
@@ -65,12 +66,17 @@ public void onCompleted() {
*/
public static <TRequest, TResponse> Flux<TResponse> oneToMany(
Mono<TRequest> monoSource,
BiConsumer<TRequest, StreamObserver<TResponse>> delegate) {
BiConsumer<TRequest, StreamObserver<TResponse>> delegate,
CallOptions options) {
try {

final int prefetch = ReactorCallOptions.getPrefetch(options);
final int lowTide = ReactorCallOptions.getLowTide(options);

return monoSource
.flatMapMany(request -> {
ReactorClientStreamObserverAndPublisher<TResponse> consumerStreamObserver =
new ReactorClientStreamObserverAndPublisher<>(null);
new ReactorClientStreamObserverAndPublisher<>(null, null, prefetch, lowTide);

delegate.accept(request, consumerStreamObserver);

@@ -88,7 +94,8 @@ public static <TRequest, TResponse> Flux<TResponse> oneToMany(
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Mono<TResponse> manyToOne(
Flux<TRequest> fluxSource,
Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate) {
Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
CallOptions options) {
try {
ReactorSubscriberAndClientProducer<TRequest> subscriberAndGRPCProducer =
fluxSource.subscribeWith(new ReactorSubscriberAndClientProducer<>());
@@ -113,14 +120,19 @@ public static <TRequest, TResponse> Mono<TResponse> manyToOne(
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Flux<TResponse> manyToMany(
Flux<TRequest> fluxSource,
Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate) {
Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
CallOptions options) {
try {

final int prefetch = ReactorCallOptions.getPrefetch(options);
final int lowTide = ReactorCallOptions.getLowTide(options);

ReactorSubscriberAndClientProducer<TRequest> subscriberAndGRPCProducer =
fluxSource.subscribeWith(new ReactorSubscriberAndClientProducer<>());
ReactorClientStreamObserverAndPublisher<TResponse> observerAndPublisher =
new ReactorClientStreamObserverAndPublisher<>(
s -> subscriberAndGRPCProducer.subscribe((CallStreamObserver<TRequest>) s),
subscriberAndGRPCProducer::cancel
subscriberAndGRPCProducer::cancel, prefetch, lowTide
);
delegate.apply(observerAndPublisher);

@@ -0,0 +1,55 @@ (new file: ReactorCallOptions.java)
/*
* Copyright (c) 2019, Salesforce.com, Inc.
* All rights reserved.
* Licensed under the BSD 3-Clause license.
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/

package com.salesforce.reactorgrpc.stub;

import com.salesforce.reactivegrpc.common.AbstractStreamObserverAndPublisher;
import io.grpc.CallOptions;

/**
* Reactor Call options.
*/
public final class ReactorCallOptions {

private ReactorCallOptions() {
}

/**
* Prefetch size of the queue.
*/
public static final io.grpc.CallOptions.Key<Integer> CALL_OPTIONS_PREFETCH =
io.grpc.CallOptions.Key.createWithDefault("reactivegrpc.internal.PREFETCH",
Integer.valueOf(AbstractStreamObserverAndPublisher.DEFAULT_CHUNK_SIZE));

/**
* Low tide of the prefetch queue.
*/
public static final io.grpc.CallOptions.Key<Integer> CALL_OPTIONS_LOW_TIDE =
io.grpc.CallOptions.Key.createWithDefault("reactivegrpc.internal.LOW_TIDE",
Integer.valueOf(AbstractStreamObserverAndPublisher.TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE));


/**
* Returns the prefetch option, falling back to the default when {@code options} is null.
*/
public static int getPrefetch(final CallOptions options) {
return options == null ? CALL_OPTIONS_PREFETCH.getDefault() : options.getOption(CALL_OPTIONS_PREFETCH);
}

/**
* Returns the low tide option, validating that it is strictly less than the prefetch.
*/
public static int getLowTide(final CallOptions options) {
int prefetch = getPrefetch(options);
int lowTide = options == null ? CALL_OPTIONS_LOW_TIDE.getDefault() : options.getOption(CALL_OPTIONS_LOW_TIDE);
if (lowTide >= prefetch) {
throw new IllegalArgumentException(CALL_OPTIONS_LOW_TIDE + " must be less than " + CALL_OPTIONS_PREFETCH);
}
return lowTide;
}

}
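
A small usage sketch of the helpers above, under the defaults shown (`DEFAULT_CHUNK_SIZE` is 512, so the default low tide evaluates to 341); the override values are arbitrary:

```java
// Unset options fall back to the defaults.
int defaultPrefetch = ReactorCallOptions.getPrefetch(CallOptions.DEFAULT); // 512
int defaultLowTide  = ReactorCallOptions.getLowTide(CallOptions.DEFAULT);  // 341

// Explicit overrides are read back as given.
CallOptions tuned = CallOptions.DEFAULT
        .withOption(ReactorCallOptions.CALL_OPTIONS_PREFETCH, 32)
        .withOption(ReactorCallOptions.CALL_OPTIONS_LOW_TIDE, 8);
int prefetch = ReactorCallOptions.getPrefetch(tuned); // 32
int lowTide  = ReactorCallOptions.getLowTide(tuned);  // 8

// A low tide that is not strictly below the prefetch is rejected.
CallOptions invalid = tuned.withOption(ReactorCallOptions.CALL_OPTIONS_PREFETCH, 8);
ReactorCallOptions.getLowTide(invalid); // throws IllegalArgumentException (8 >= 8)
```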
@@ -30,6 +30,14 @@ class ReactorClientStreamObserverAndPublisher<T>
super(Queues.<T>get(DEFAULT_CHUNK_SIZE).get(), onSubscribe, onTerminate);
}

ReactorClientStreamObserverAndPublisher(
Consumer<CallStreamObserver<?>> onSubscribe,
Runnable onTerminate,
int prefetch,
int lowTide) {
super(Queues.<T>get(DEFAULT_CHUNK_SIZE).get(), onSubscribe, onTerminate, prefetch, lowTide);
}

@Override
public int requestFusion(int requestedMode) {
if ((requestedMode & Fuseable.ASYNC) != 0) {
@@ -24,8 +24,10 @@ class ReactorServerStreamObserverAndPublisher<T>

ReactorServerStreamObserverAndPublisher(
ServerCallStreamObserver<?> serverCallStreamObserver,
Consumer<CallStreamObserver<?>> onSubscribe) {
super(serverCallStreamObserver, Queues.<T>get(DEFAULT_CHUNK_SIZE).get(), onSubscribe);
Consumer<CallStreamObserver<?>> onSubscribe,
int prefetch,
int lowTide) {
super(serverCallStreamObserver, Queues.<T>get(DEFAULT_CHUNK_SIZE).get(), onSubscribe, prefetch, lowTide);
}

@Override
@@ -7,14 +7,14 @@

package com.salesforce.reactorgrpc.stub;

import java.util.function.Function;

import com.google.common.base.Preconditions;
import io.grpc.CallOptions;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@@ -77,9 +77,14 @@ public static <TRequest, TResponse> void oneToMany(
*/
public static <TRequest, TResponse> StreamObserver<TRequest> manyToOne(
StreamObserver<TResponse> responseObserver,
Function<Flux<TRequest>, Mono<TResponse>> delegate) {
Function<Flux<TRequest>, Mono<TResponse>> delegate,
CallOptions options) {

final int prefetch = ReactorCallOptions.getPrefetch(options);
final int lowTide = ReactorCallOptions.getLowTide(options);

ReactorServerStreamObserverAndPublisher<TRequest> streamObserverPublisher =
new ReactorServerStreamObserverAndPublisher<>((ServerCallStreamObserver<TResponse>) responseObserver, null);
new ReactorServerStreamObserverAndPublisher<>((ServerCallStreamObserver<TResponse>) responseObserver, null, prefetch, lowTide);

try {
Mono<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(Flux.from(streamObserverPublisher)));
@@ -112,9 +117,14 @@ public static <TRequest, TResponse> StreamObserver<TRequest> manyToOne(
*/
public static <TRequest, TResponse> StreamObserver<TRequest> manyToMany(
StreamObserver<TResponse> responseObserver,
Function<Flux<TRequest>, Flux<TResponse>> delegate) {
Function<Flux<TRequest>, Flux<TResponse>> delegate,
CallOptions options) {

final int prefetch = ReactorCallOptions.getPrefetch(options);
final int lowTide = ReactorCallOptions.getLowTide(options);

ReactorServerStreamObserverAndPublisher<TRequest> streamObserverPublisher =
new ReactorServerStreamObserverAndPublisher<>((ServerCallStreamObserver<TResponse>) responseObserver, null);
new ReactorServerStreamObserverAndPublisher<>((ServerCallStreamObserver<TResponse>) responseObserver, null, prefetch, lowTide);

try {
Flux<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(Flux.from(streamObserverPublisher)));
13 changes: 9 additions & 4 deletions reactor/reactor-grpc/src/main/resources/ReactorStub.mustache
@@ -51,7 +51,7 @@ public final class {{className}} {
@java.lang.Deprecated
{{/deprecated}}
public {{#isManyOutput}}reactor.core.publisher.Flux{{/isManyOutput}}{{^isManyOutput}}reactor.core.publisher.Mono{{/isManyOutput}}<{{outputType}}> {{methodName}}({{#isManyInput}}reactor.core.publisher.Flux{{/isManyInput}}{{^isManyInput}}reactor.core.publisher.Mono{{/isManyInput}}<{{inputType}}> reactorRequest) {
return com.salesforce.reactorgrpc.stub.ClientCalls.{{reactiveCallsMethodName}}(reactorRequest, delegateStub::{{methodName}});
return com.salesforce.reactorgrpc.stub.ClientCalls.{{reactiveCallsMethodName}}(reactorRequest, delegateStub::{{methodName}}, getCallOptions());
}

{{/methods}}
Expand All @@ -63,7 +63,7 @@ public final class {{className}} {
@java.lang.Deprecated
{{/deprecated}}
public {{#isManyOutput}}reactor.core.publisher.Flux{{/isManyOutput}}{{^isManyOutput}}reactor.core.publisher.Mono{{/isManyOutput}}<{{outputType}}> {{methodName}}({{inputType}} reactorRequest) {
return com.salesforce.reactorgrpc.stub.ClientCalls.{{reactiveCallsMethodName}}(reactor.core.publisher.Mono.just(reactorRequest), delegateStub::{{methodName}});
return com.salesforce.reactorgrpc.stub.ClientCalls.{{reactiveCallsMethodName}}(reactor.core.publisher.Mono.just(reactorRequest), delegateStub::{{methodName}}, getCallOptions());
}

{{/unaryRequestMethods}}
@@ -99,10 +99,15 @@ public final class {{className}} {
{{/methods}}
.build();
}

protected io.grpc.CallOptions getCallOptions(int methodId) {
return null;
}

}

{{#methods}}
private static final int METHODID_{{methodNameUpperUnderscore}} = {{methodNumber}};
public static final int METHODID_{{methodNameUpperUnderscore}} = {{methodNumber}};
{{/methods}}

private static final class MethodHandlers<Req, Resp> implements
@@ -145,7 +150,7 @@ public final class {{className}} {
case METHODID_{{methodNameUpperUnderscore}}:
return (io.grpc.stub.StreamObserver<Req>) com.salesforce.reactorgrpc.stub.ServerCalls.{{reactiveCallsMethodName}}(
(io.grpc.stub.StreamObserver<{{outputType}}>) responseObserver,
serviceImpl::{{methodName}});
serviceImpl::{{methodName}}, serviceImpl.getCallOptions(methodId));
{{/isManyInput}}
{{/methods}}
default:
25 changes: 25 additions & 0 deletions rx-java/README.md
@@ -139,6 +139,31 @@ public class RxContextPropagator {
}
}
```

## Configuration of flow control
By default, Rx gRPC prefetches 512 messages on both the client and the server side. With large
messages this can consume a lot of memory. These defaults can be overridden per call with RxCallOptions.

Override the prefetch on the client side (when the client consumes too slowly):

```java
RxMyAPIStub api = RxMyAPIGrpc.newRxStub(channel)
        .withOption(RxCallOptions.CALL_OPTIONS_PREFETCH, 16)
        .withOption(RxCallOptions.CALL_OPTIONS_LOW_TIDE, 4);
```

Override the prefetch on the server side (when the server consumes too slowly):

```java
// Override the getCallOptions method in your *ImplBase service class.
// The methodId parameter can be used for method-specific overrides.
@Override
protected CallOptions getCallOptions(int methodId) {
    return CallOptions.DEFAULT
            .withOption(RxCallOptions.CALL_OPTIONS_PREFETCH, 16)
            .withOption(RxCallOptions.CALL_OPTIONS_LOW_TIDE, 4);
}
```
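
Both keys are plain `io.grpc.CallOptions` keys, so they compose with other per-call stub settings. One constraint to keep in mind: the low tide has to stay strictly below the prefetch; the Reactor helper in this change rejects anything else, and the Rx helper is assumed to behave the same way. A sketch, with an arbitrary deadline added for illustration:

```java
RxMyAPIStub api = RxMyAPIGrpc.newRxStub(channel)
        .withDeadlineAfter(5, java.util.concurrent.TimeUnit.SECONDS) // standard gRPC stub option
        .withOption(RxCallOptions.CALL_OPTIONS_PREFETCH, 64)
        .withOption(RxCallOptions.CALL_OPTIONS_LOW_TIDE, 16);        // must stay strictly below the prefetch
```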

Modules
=======