Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 37 additions & 26 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@

package io.rsocket;

import static io.rsocket.util.ExceptionUtil.noStacktrace;

import io.netty.buffer.Unpooled;
import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.exceptions.ConnectionException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.util.PayloadImpl;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.*;

import javax.annotation.Nullable;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Collection;
Expand All @@ -32,11 +37,8 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.*;

import static io.rsocket.util.ExceptionUtil.noStacktrace;

/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
class RSocketClient implements RSocket {
Expand All @@ -52,7 +54,7 @@ class RSocketClient implements RSocket {
private final IntObjectHashMap<Subscriber<Payload>> receivers;
private final AtomicInteger missedAckCounter;

private final FluxProcessor<Frame, Frame> sendProcessor;
private final UnboundedProcessor<Frame> sendProcessor;

private @Nullable Disposable keepAliveSendSub;
private volatile long timeLastTickSentMs;
Expand Down Expand Up @@ -80,8 +82,7 @@ class RSocketClient implements RSocket {
this.missedAckCounter = new AtomicInteger();

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
// connections
this.sendProcessor = EmitterProcessor.<Frame>create().serialize();
this.sendProcessor = new UnboundedProcessor<>();

if (!Duration.ZERO.equals(tickPeriod)) {
long ackTimeoutMs = ackTimeout.toMillis();
Expand All @@ -98,8 +99,15 @@ class RSocketClient implements RSocket {
})
.subscribe();
}

connection.onClose().doFinally(signalType -> cleanup()).doOnError(errorConsumer).subscribe();

connection
.onClose()
.doFinally(
signalType -> {
cleanup();
})
.doOnError(errorConsumer)
.subscribe();

connection
.send(sendProcessor)
Expand Down Expand Up @@ -205,7 +213,7 @@ public Flux<Payload> requestStream(Payload payload) {

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return handleStreamResponse(Flux.from(payloads), FrameType.REQUEST_CHANNEL);
return handleChannel(Flux.from(payloads), FrameType.REQUEST_CHANNEL);
}

@Override
Expand Down Expand Up @@ -255,6 +263,7 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
} else if (contains(streamId) && !receiver.isTerminated()) {
sendProcessor.onNext(Frame.RequestN.from(streamId, l));
}
sendProcessor.drain();
})
.doOnError(
t -> {
Expand All @@ -268,7 +277,10 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
sendProcessor.onNext(Frame.Cancel.from(streamId));
}
})
.doFinally(s -> removeReceiver(streamId));
.doFinally(
s -> {
removeReceiver(streamId);
});
}));
}

Expand All @@ -291,11 +303,14 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
return receiver
.doOnError(t -> sendProcessor.onNext(Frame.Error.from(streamId, t)))
.doOnCancel(() -> sendProcessor.onNext(Frame.Cancel.from(streamId)))
.doFinally(s -> removeReceiver(streamId));
.doFinally(
s -> {
removeReceiver(streamId);
});
}));
}

private Flux<Payload> handleStreamResponse(Flux<Payload> request, FrameType requestType) {
private Flux<Payload> handleChannel(Flux<Payload> request, FrameType requestType) {
return started.thenMany(
Flux.defer(
new Supplier<Flux<Payload>>() {
Expand Down Expand Up @@ -328,6 +343,7 @@ public Flux<Payload> get() {
}

if (_firstRequest) {
AtomicBoolean firstPayload = new AtomicBoolean(true);
Flux<Frame> requestFrames =
request
.transform(
Expand All @@ -345,19 +361,10 @@ public Flux<Payload> get() {
})
.map(
new Function<Payload, Frame>() {
boolean firstPayload = true;

@Override
public Frame apply(Payload payload) {
boolean _firstPayload = false;
synchronized (this) {
if (firstPayload) {
firstPayload = false;
_firstPayload = true;
}
}

if (_firstPayload) {
if (firstPayload.compareAndSet(true, false)) {
return Frame.Request.from(
streamId, requestType, payload, l);
} else {
Expand All @@ -372,6 +379,9 @@ public Frame apply(Payload payload) {
sendOneFrame(
Frame.PayloadFrame.from(
streamId, FrameType.COMPLETE));
if (firstPayload.get()) {
receiver.onComplete();
}
}
});

Expand Down Expand Up @@ -522,6 +532,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
if (sender != null) {
int n = Frame.RequestN.requestN(frame);
sender.increaseRequestLimit(n);
sendProcessor.drain();
}
break;
}
Expand Down
30 changes: 19 additions & 11 deletions rsocket-core/src/main/java/io/rsocket/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,29 @@

package io.rsocket;

import static io.rsocket.Frame.Request.initialRequestN;
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C;
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.exceptions.ApplicationException;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.util.PayloadImpl;
import java.util.Collection;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.function.Consumer;

import static io.rsocket.Frame.Request.initialRequestN;
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C;
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;

/** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */
class RSocketServer implements RSocket {
Expand All @@ -45,7 +50,7 @@ class RSocketServer implements RSocket {
private final IntObjectHashMap<Subscription> sendingSubscriptions;
private final IntObjectHashMap<UnicastProcessor<Payload>> channelProcessors;

private final FluxProcessor<Frame, Frame> sendProcessor;
private final UnboundedProcessor<Frame> sendProcessor;
private Disposable receiveDisposable;

RSocketServer(
Expand All @@ -58,7 +63,7 @@ class RSocketServer implements RSocket {

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
// connections
this.sendProcessor = EmitterProcessor.<Frame>create().serialize();
this.sendProcessor = new UnboundedProcessor<>();

connection
.send(sendProcessor)
Expand Down Expand Up @@ -302,7 +307,10 @@ private Mono<Void> handleRequestResponse(int streamId, Mono<Payload> response) {
.doOnError(errorConsumer)
.onErrorResume(t -> Mono.just(Frame.Error.from(streamId, t)))
.doOnNext(sendProcessor::onNext)
.doFinally(signalType -> removeSubscription(streamId))
.doFinally(
signalType -> {
removeSubscription(streamId);
})
.then();
}

Expand Down
Loading