Skip to content

Commit

Permalink
provides ordered stream id issuing
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed May 2, 2020
1 parent 12fd301 commit 98bbcd6
Show file tree
Hide file tree
Showing 13 changed files with 257 additions and 119 deletions.
9 changes: 9 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/DuplexConnection.java
Expand Up @@ -23,6 +23,7 @@
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/** Represents a connection with input/output that the protocol uses. */
public interface DuplexConnection extends Availability, Closeable {
Expand Down Expand Up @@ -86,6 +87,14 @@ default Mono<Void> sendOne(ByteBuf frame) {
*/
ByteBufAllocator alloc();

/**
* Returns associated to this connection {@link Scheduler} that will process all submitted tasks
* in an ordered / serial fashion.
*
* @return events' ordered {@link Scheduler}
*/
Scheduler eventLoopScheduler();

@Override
default double availability() {
return isDisposed() ? 0.0 : 1.0;
Expand Down
242 changes: 125 additions & 117 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Expand Up @@ -208,22 +208,23 @@ private Mono<Void> handleFireAndForget(Payload payload) {

final AtomicBoolean once = new AtomicBoolean();

return Mono.defer(
() -> {
if (once.getAndSet(true)) {
return Mono.error(
new IllegalStateException("FireAndForgetMono allows only a single subscriber"));
}
return Mono.<Void>defer(
() -> {
if (once.getAndSet(true)) {
return Mono.error(
new IllegalStateException("FireAndForgetMono allows only a single subscriber"));
}

final int streamId = streamIdSupplier.nextStreamId(receivers);
final ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
allocator, streamId, payload);
final int streamId = streamIdSupplier.nextStreamId(receivers);
final ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(requestFrame);
sendProcessor.onNext(requestFrame);

return Mono.empty();
});
return Mono.empty();
})
.subscribeOn(connection.eventLoopScheduler());
}

private Mono<Payload> handleRequestResponse(final Payload payload) {
Expand Down Expand Up @@ -284,6 +285,7 @@ public void hookOnTerminal(SignalType signalType) {
receivers.remove(streamId, receiver);
}
}))
.subscribeOn(connection.eventLoopScheduler())
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
});
}
Expand Down Expand Up @@ -356,6 +358,7 @@ void hookOnTerminal(SignalType signalType) {
receivers.remove(streamId);
}
}))
.subscribeOn(connection.eventLoopScheduler())
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
});
}
Expand Down Expand Up @@ -392,120 +395,125 @@ private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Paylo

final UnicastProcessor<Payload> receiver = UnicastProcessor.create();

return receiver.transform(
Operators.<Payload, Payload>lift(
(s, actual) ->
new RequestOperator(actual) {
return receiver
.transform(
Operators.<Payload, Payload>lift(
(s, actual) ->
new RequestOperator(actual) {

final BaseSubscriber<Payload> upstreamSubscriber =
new BaseSubscriber<Payload>() {
final BaseSubscriber<Payload> upstreamSubscriber =
new BaseSubscriber<Payload>() {

boolean first = true;
boolean first = true;

@Override
protected void hookOnSubscribe(Subscription subscription) {
// noops
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
// noops
}

@Override
protected void hookOnNext(Payload payload) {
if (first) {
// need to skip first since we have already sent it
// no need to release it since it was released earlier on the request
// establishment
// phase
first = false;
request(1);
return;
}
if (!PayloadValidationUtils.isValid(mtu, payload)) {
payload.release();
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
// no need to send any errors.
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
receiver.onError(t);
return;
}
final ByteBuf frame =
PayloadFrameFlyweight.encodeNextReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(frame);
}
@Override
protected void hookOnNext(Payload payload) {
if (first) {
// need to skip first since we have already sent it
// no need to release it since it was released earlier on the
// request
// establishment
// phase
first = false;
request(1);
return;
}
if (!PayloadValidationUtils.isValid(mtu, payload)) {
payload.release();
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
// no need to send any errors.
sendProcessor.onNext(
CancelFrameFlyweight.encode(allocator, streamId));
receiver.onError(t);
return;
}
final ByteBuf frame =
PayloadFrameFlyweight.encodeNextReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(frame);
}

@Override
protected void hookOnComplete() {
ByteBuf frame =
PayloadFrameFlyweight.encodeComplete(allocator, streamId);
sendProcessor.onNext(frame);
}

@Override
protected void hookOnError(Throwable t) {
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
sendProcessor.onNext(frame);
receiver.onError(t);
}

@Override
protected void hookOnComplete() {
ByteBuf frame = PayloadFrameFlyweight.encodeComplete(allocator, streamId);
sendProcessor.onNext(frame);
@Override
protected void hookFinally(SignalType type) {
senders.remove(streamId, this);
}
};

@Override
void hookOnFirstRequest(long n) {
final int streamId = streamIdSupplier.nextStreamId(receivers);
this.streamId = streamId;

final ByteBuf frame =
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);

senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);

inboundFlux
.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(upstreamSubscriber);

sendProcessor.onNext(frame);
}

@Override
void hookOnRemainingRequests(long n) {
if (receiver.isDisposed()) {
return;
}

@Override
protected void hookOnError(Throwable t) {
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
sendProcessor.onNext(frame);
receiver.onError(t);
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}

@Override
void hookOnCancel() {
senders.remove(streamId, upstreamSubscriber);
if (receivers.remove(streamId, receiver)) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
}

@Override
protected void hookFinally(SignalType type) {
senders.remove(streamId, this);
@Override
void hookOnTerminal(SignalType signalType) {
if (signalType == SignalType.ON_ERROR) {
upstreamSubscriber.cancel();
}
};

@Override
void hookOnFirstRequest(long n) {
final int streamId = streamIdSupplier.nextStreamId(receivers);
this.streamId = streamId;

final ByteBuf frame =
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);

senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);

inboundFlux
.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(upstreamSubscriber);

sendProcessor.onNext(frame);
}

@Override
void hookOnRemainingRequests(long n) {
if (receiver.isDisposed()) {
return;
}

sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}

@Override
void hookOnCancel() {
senders.remove(streamId, upstreamSubscriber);
if (receivers.remove(streamId, receiver)) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
}

@Override
void hookOnTerminal(SignalType signalType) {
if (signalType == SignalType.ON_ERROR) {
upstreamSubscriber.cancel();
}
receivers.remove(streamId, receiver);
}

@Override
public void cancel() {
upstreamSubscriber.cancel();
super.cancel();
}
}));
receivers.remove(streamId, receiver);
}

@Override
public void cancel() {
upstreamSubscriber.cancel();
super.cancel();
}
}))
.subscribeOn(connection.eventLoopScheduler());
}

private Mono<Void> handleMetadataPush(Payload payload) {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/**
* A {@link DuplexConnection} implementation that reassembles {@link ByteBuf}s.
Expand Down Expand Up @@ -75,6 +76,11 @@ public Flux<ByteBuf> receive() {
});
}

@Override
public Scheduler eventLoopScheduler() {
return delegate.eventLoopScheduler();
}

@Override
public ByteBufAllocator alloc() {
return delegate.alloc();
Expand Down
Expand Up @@ -30,6 +30,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;

/**
* {@link DuplexConnection#receive()} is a single stream on which the following type of frames
Expand Down Expand Up @@ -202,6 +203,11 @@ public Flux<ByteBuf> receive() {
}));
}

@Override
public Scheduler eventLoopScheduler() {
return source.eventLoopScheduler();
}

@Override
public ByteBufAllocator alloc() {
return source.alloc();
Expand Down
Expand Up @@ -33,6 +33,7 @@
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.*;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.Queues;

public class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
Expand Down Expand Up @@ -106,6 +107,11 @@ public ResumableDuplexConnection(
reconnect(duplexConnection);
}

@Override
public Scheduler eventLoopScheduler() {
return curConnection.eventLoopScheduler();
}

@Override
public ByteBufAllocator alloc() {
return curConnection.alloc();
Expand Down

0 comments on commit 98bbcd6

Please sign in to comment.