From a027c6807bfcdafa1e5039ab901a0118a793eb29 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Sat, 14 Oct 2017 16:11:35 -0700 Subject: [PATCH] make sure frames are emitted in the correct order --- .../main/java/io/rsocket/RSocketClient.java | 23 +++++++++---------- .../main/java/io/rsocket/RSocketServer.java | 11 ++++++--- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index a1017e62c..b7ac9642f 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -16,18 +16,14 @@ package io.rsocket; +import static io.rsocket.util.ExceptionUtil.noStacktrace; + import io.netty.buffer.Unpooled; import io.netty.util.collection.IntObjectHashMap; import io.rsocket.exceptions.ConnectionException; import io.rsocket.exceptions.Exceptions; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.util.PayloadImpl; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import reactor.core.Disposable; -import reactor.core.publisher.*; - -import javax.annotation.Nullable; import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.Collection; @@ -36,8 +32,11 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; - -import static io.rsocket.util.ExceptionUtil.noStacktrace; +import javax.annotation.Nullable; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.Disposable; +import reactor.core.publisher.*; /** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */ class RSocketClient implements RSocket { @@ -53,7 +52,7 @@ class RSocketClient implements RSocket { private final IntObjectHashMap> receivers; private final AtomicInteger missedAckCounter; - private final EmitterProcessor sendProcessor; + private final FluxProcessor sendProcessor; private @Nullable Disposable keepAliveSendSub; private volatile long timeLastTickSentMs; @@ -82,7 +81,7 @@ class RSocketClient implements RSocket { // DO NOT Change the order here. The Send processor must be subscribed to before receiving // connections - this.sendProcessor = EmitterProcessor.create(); + this.sendProcessor = EmitterProcessor.create().serialize(); if (!Duration.ZERO.equals(tickPeriod)) { long ackTimeoutMs = ackTimeout.toMillis(); @@ -91,7 +90,7 @@ class RSocketClient implements RSocket { started .thenMany(Flux.interval(tickPeriod)) .doOnSubscribe(s -> timeLastTickSentMs = System.currentTimeMillis()) - .flatMap(i -> sendKeepAlive(ackTimeoutMs, missedAcks)) + .concatMap(i -> sendKeepAlive(ackTimeoutMs, missedAcks)) .doOnError( t -> { errorConsumer.accept(t); @@ -425,7 +424,7 @@ protected void cleanup() { synchronized (RSocketClient.this) { subscribers = receivers.values(); publishers = senders.values(); - + senders.clear(); receivers.clear(); } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index d6138aace..d3481625f 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -45,7 +45,7 @@ class RSocketServer implements RSocket { private final IntObjectHashMap sendingSubscriptions; private final IntObjectHashMap> channelProcessors; - private final EmitterProcessor sendProcessor; + private final FluxProcessor sendProcessor; private Disposable receiveDisposable; RSocketServer( @@ -58,7 +58,7 @@ class RSocketServer implements RSocket { // DO NOT Change the order here. The Send processor must be subscribed to before receiving // connections - this.sendProcessor = EmitterProcessor.create(); + this.sendProcessor = EmitterProcessor.create().serialize(); connection .send(sendProcessor) @@ -67,7 +67,12 @@ class RSocketServer implements RSocket { .subscribe(); this.receiveDisposable = - connection.receive().flatMap(this::handleFrame).doOnError(errorConsumer).then().subscribe(); + connection + .receive() + .flatMapSequential(this::handleFrame) + .doOnError(errorConsumer) + .then() + .subscribe(); this.connection .onClose()