Skip to content

Commit

Permalink
provides ordered stream id issuing
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed May 4, 2020
1 parent 12fd301 commit 7461326
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 138 deletions.
Expand Up @@ -42,6 +42,7 @@
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class RSocketConnector {
Expand Down Expand Up @@ -293,7 +294,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(int) keepAliveInterval.toMillis(),
(int) keepAliveMaxLifeTime.toMillis(),
keepAliveHandler,
requesterLeaseHandler);
requesterLeaseHandler,
Schedulers.single(Schedulers.parallel()));

RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);

Expand Down
248 changes: 130 additions & 118 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Expand Up @@ -67,6 +67,7 @@
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.Queues;

/**
Expand Down Expand Up @@ -105,6 +106,7 @@ class RSocketRequester implements RSocket {
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
private volatile Throwable terminationError;
private final MonoProcessor<Void> onClose;
private final Scheduler serialScheduler;

RSocketRequester(
DuplexConnection connection,
Expand All @@ -115,7 +117,8 @@ class RSocketRequester implements RSocket {
int keepAliveTickPeriod,
int keepAliveAckTimeout,
@Nullable KeepAliveHandler keepAliveHandler,
RequesterLeaseHandler leaseHandler) {
RequesterLeaseHandler leaseHandler,
Scheduler serialScheduler) {
this.connection = connection;
this.allocator = connection.alloc();
this.payloadDecoder = payloadDecoder;
Expand All @@ -126,6 +129,7 @@ class RSocketRequester implements RSocket {
this.senders = new SynchronizedIntObjectHashMap<>();
this.receivers = new SynchronizedIntObjectHashMap<>();
this.onClose = MonoProcessor.create();
this.serialScheduler = serialScheduler;

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
this.sendProcessor = new UnboundedProcessor<>();
Expand Down Expand Up @@ -208,22 +212,23 @@ private Mono<Void> handleFireAndForget(Payload payload) {

final AtomicBoolean once = new AtomicBoolean();

return Mono.defer(
() -> {
if (once.getAndSet(true)) {
return Mono.error(
new IllegalStateException("FireAndForgetMono allows only a single subscriber"));
}
return Mono.<Void>defer(
() -> {
if (once.getAndSet(true)) {
return Mono.error(
new IllegalStateException("FireAndForgetMono allows only a single subscriber"));
}

final int streamId = streamIdSupplier.nextStreamId(receivers);
final ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
allocator, streamId, payload);
final int streamId = streamIdSupplier.nextStreamId(receivers);
final ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(requestFrame);
sendProcessor.onNext(requestFrame);

return Mono.empty();
});
return Mono.empty();
})
.subscribeOn(serialScheduler);
}

private Mono<Payload> handleRequestResponse(final Payload payload) {
Expand Down Expand Up @@ -284,6 +289,7 @@ public void hookOnTerminal(SignalType signalType) {
receivers.remove(streamId, receiver);
}
}))
.subscribeOn(serialScheduler)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
});
}
Expand Down Expand Up @@ -356,6 +362,7 @@ void hookOnTerminal(SignalType signalType) {
receivers.remove(streamId);
}
}))
.subscribeOn(serialScheduler, false)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
});
}
Expand Down Expand Up @@ -392,120 +399,125 @@ private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Paylo

final UnicastProcessor<Payload> receiver = UnicastProcessor.create();

return receiver.transform(
Operators.<Payload, Payload>lift(
(s, actual) ->
new RequestOperator(actual) {
return receiver
.transform(
Operators.<Payload, Payload>lift(
(s, actual) ->
new RequestOperator(actual) {

final BaseSubscriber<Payload> upstreamSubscriber =
new BaseSubscriber<Payload>() {
final BaseSubscriber<Payload> upstreamSubscriber =
new BaseSubscriber<Payload>() {

boolean first = true;
boolean first = true;

@Override
protected void hookOnSubscribe(Subscription subscription) {
// noops
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
// noops
}

@Override
protected void hookOnNext(Payload payload) {
if (first) {
// need to skip first since we have already sent it
// no need to release it since it was released earlier on the request
// establishment
// phase
first = false;
request(1);
return;
}
if (!PayloadValidationUtils.isValid(mtu, payload)) {
payload.release();
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
// no need to send any errors.
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
receiver.onError(t);
return;
}
final ByteBuf frame =
PayloadFrameFlyweight.encodeNextReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(frame);
}
@Override
protected void hookOnNext(Payload payload) {
if (first) {
// need to skip first since we have already sent it
// no need to release it since it was released earlier on the
// request
// establishment
// phase
first = false;
request(1);
return;
}
if (!PayloadValidationUtils.isValid(mtu, payload)) {
payload.release();
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
// no need to send any errors.
sendProcessor.onNext(
CancelFrameFlyweight.encode(allocator, streamId));
receiver.onError(t);
return;
}
final ByteBuf frame =
PayloadFrameFlyweight.encodeNextReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(frame);
}

@Override
protected void hookOnComplete() {
ByteBuf frame =
PayloadFrameFlyweight.encodeComplete(allocator, streamId);
sendProcessor.onNext(frame);
}

@Override
protected void hookOnError(Throwable t) {
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
sendProcessor.onNext(frame);
receiver.onError(t);
}

@Override
protected void hookOnComplete() {
ByteBuf frame = PayloadFrameFlyweight.encodeComplete(allocator, streamId);
sendProcessor.onNext(frame);
@Override
protected void hookFinally(SignalType type) {
senders.remove(streamId, this);
}
};

@Override
void hookOnFirstRequest(long n) {
final int streamId = streamIdSupplier.nextStreamId(receivers);
this.streamId = streamId;

final ByteBuf frame =
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);

senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);

inboundFlux
.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(upstreamSubscriber);

sendProcessor.onNext(frame);
}

@Override
void hookOnRemainingRequests(long n) {
if (receiver.isDisposed()) {
return;
}

@Override
protected void hookOnError(Throwable t) {
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
sendProcessor.onNext(frame);
receiver.onError(t);
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}

@Override
void hookOnCancel() {
senders.remove(streamId, upstreamSubscriber);
if (receivers.remove(streamId, receiver)) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
}

@Override
protected void hookFinally(SignalType type) {
senders.remove(streamId, this);
@Override
void hookOnTerminal(SignalType signalType) {
if (signalType == SignalType.ON_ERROR) {
upstreamSubscriber.cancel();
}
};

@Override
void hookOnFirstRequest(long n) {
final int streamId = streamIdSupplier.nextStreamId(receivers);
this.streamId = streamId;

final ByteBuf frame =
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);

senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);

inboundFlux
.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(upstreamSubscriber);

sendProcessor.onNext(frame);
}

@Override
void hookOnRemainingRequests(long n) {
if (receiver.isDisposed()) {
return;
}

sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}

@Override
void hookOnCancel() {
senders.remove(streamId, upstreamSubscriber);
if (receivers.remove(streamId, receiver)) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
}

@Override
void hookOnTerminal(SignalType signalType) {
if (signalType == SignalType.ON_ERROR) {
upstreamSubscriber.cancel();
}
receivers.remove(streamId, receiver);
}

@Override
public void cancel() {
upstreamSubscriber.cancel();
super.cancel();
}
}));
receivers.remove(streamId, receiver);
}

@Override
public void cancel() {
upstreamSubscriber.cancel();
super.cancel();
}
}))
.subscribeOn(serialScheduler, false);
}

private Mono<Void> handleMetadataPush(Payload payload) {
Expand Down
Expand Up @@ -39,6 +39,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public final class RSocketServer {
private static final String SERVER_TAG = "server";
Expand Down Expand Up @@ -222,7 +223,8 @@ private Mono<Void> acceptSetup(
setupPayload.keepAliveInterval(),
setupPayload.keepAliveMaxLifetime(),
keepAliveHandler,
requesterLeaseHandler);
requesterLeaseHandler,
Schedulers.single(Schedulers.parallel()));

RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);

Expand Down

0 comments on commit 7461326

Please sign in to comment.