Merged
2 changes: 1 addition & 1 deletion gradle.properties
@@ -12,4 +12,4 @@
# limitations under the License.
#

version=0.11.13.BUILD-SNAPSHOT
version=0.11.14.BUILD-SNAPSHOT
21 changes: 13 additions & 8 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
@@ -16,22 +16,27 @@

package io.rsocket;

import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.framing.FrameType;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;

import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jctools.maps.NonBlockingHashMapLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.*;

/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
class RSocketClient implements RSocket {
@@ -40,8 +45,8 @@ class RSocketClient implements RSocket {
private final Function<Frame, ? extends Payload> frameDecoder;
private final Consumer<Throwable> errorConsumer;
private final StreamIdSupplier streamIdSupplier;
private final NonBlockingHashMapLong<LimitableRequestPublisher> senders;
@rdegnan (Member) commented on Nov 20, 2018:
I think handleSendProcessorError, handleSendProcessorCancel, and terminate should now be synchronized, as they iterate over senders/receivers (previously this wasn't necessary because of Cliff Click magic).

private final NonBlockingHashMapLong<UnicastProcessor<Payload>> receivers;
private final Map<Integer, LimitableRequestPublisher> senders;
private final Map<Integer, UnicastProcessor<Payload>> receivers;
private final UnboundedProcessor<Frame> sendProcessor;
private KeepAliveHandler keepAliveHandler;
private final Lifecycle lifecycle = new Lifecycle();
@@ -69,8 +74,8 @@ class RSocketClient implements RSocket {
this.frameDecoder = frameDecoder;
this.errorConsumer = errorConsumer;
this.streamIdSupplier = streamIdSupplier;
this.senders = new NonBlockingHashMapLong<>(256);
this.receivers = new NonBlockingHashMapLong<>(256);
this.senders = Collections.synchronizedMap(new IntObjectHashMap<>());
this.receivers = Collections.synchronizedMap(new IntObjectHashMap<>());

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
this.sendProcessor = new UnboundedProcessor<>();
30 changes: 18 additions & 12 deletions rsocket-core/src/main/java/io/rsocket/RSocketServer.java
@@ -16,23 +16,29 @@

package io.rsocket;

import static io.rsocket.Frame.Request.initialRequestN;
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C;
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;

import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.framing.FrameType;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jctools.maps.NonBlockingHashMapLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;

import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import static io.rsocket.Frame.Request.initialRequestN;
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C;
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;

/** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */
class RSocketServer implements RSocket {
@@ -42,8 +48,8 @@ class RSocketServer implements RSocket {
private final Function<Frame, ? extends Payload> frameDecoder;
private final Consumer<Throwable> errorConsumer;

private final NonBlockingHashMapLong<Subscription> sendingSubscriptions;
Member commented:
I think handleSendProcessorError and handleSendProcessorCancel should now be synchronized as they iterate over sendingSubscriptions/channelProcessors (previously this wasn't necessary because of Cliff Click magic).

Member Author replied:
I wrapped the map with Collections.synchronizedMap so it should effectively be doing that.

Member replied:
Collections.synchronizedMap doesn't actually help when you iterate over any of its collection views -- per the documentation:

 * It is imperative that the user manually synchronize on the returned
 * map when iterating over any of its collection views:
 * <pre>
 *  Map m = Collections.synchronizedMap(new HashMap());
 *      ...
 *  Set s = m.keySet();  // Needn't be in synchronized block
 *      ...
 *  synchronized (m) {  // Synchronizing on m, not s!
 *      Iterator i = s.iterator(); // Must be in synchronized block
 *      while (i.hasNext())
 *          foo(i.next());
 *  }
 * </pre>
 * Failure to follow this advice may result in non-deterministic behavior.

If we use forEach everywhere instead of a for loop, that would also be fine.
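
To make the distinction concrete, here is a minimal sketch (not code from this PR; the Runnable-valued map and the method names are placeholders for the senders/receivers and subscription fields discussed above) contrasting unguarded iteration with the two patterns that are safe on a Collections.synchronizedMap wrapper:

```java
import io.netty.util.collection.IntObjectHashMap;
import java.util.Collections;
import java.util.Map;

public class SynchronizedIterationSketch {

  // Stand-in for fields like senders/receivers or sendingSubscriptions/channelProcessors.
  private final Map<Integer, Runnable> registry =
      Collections.synchronizedMap(new IntObjectHashMap<>());

  // Risky: iterating a collection view without holding the wrapper's lock can
  // misbehave if another thread mutates the map concurrently.
  void terminateUnguarded() {
    for (Runnable action : registry.values()) {
      action.run();
    }
  }

  // Safe option 1: hold the wrapper's lock for the whole traversal.
  void terminateWithLock() {
    synchronized (registry) {
      for (Runnable action : registry.values()) {
        action.run();
      }
    }
  }

  // Safe option 2: Map.forEach -- the synchronizedMap wrapper runs the whole
  // traversal while holding its internal mutex.
  void terminateWithForEach() {
    registry.forEach((streamId, action) -> action.run());
  }
}
```

Both safe variants serialize the traversal against concurrent put/remove calls on the same wrapper, which is exactly the guarantee the plain for loop over a collection view lacks.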

Member Author replied:
I looked at the code again. I think we are fine: the problem would only arise if something added an entry to the map while we're iterating over it, and looking at the non-blocking map, the same thing would happen there. We should probably add something to the request* methods so that items can't be added to the map while a cancel or an error is being handled, and then this isn't a problem.
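
As a rough illustration of that suggestion (hypothetical names, not this PR's implementation), a terminated flag checked on the registration path could keep late entries out of the map while it is being drained:

```java
import io.netty.util.collection.IntObjectHashMap;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class GuardedRegistrationSketch {

  private final Map<Integer, Runnable> registry =
      Collections.synchronizedMap(new IntObjectHashMap<>());
  private final AtomicBoolean terminated = new AtomicBoolean();

  // Called from a hypothetical request* path: refuse new registrations once the
  // connection is terminating, and re-check after the put to close the race
  // with a concurrent terminate().
  void register(int streamId, Runnable cancelAction) {
    if (terminated.get()) {
      cancelAction.run();
      return;
    }
    registry.put(streamId, cancelAction);
    if (terminated.get() && registry.remove(streamId) != null) {
      // terminate() flipped the flag after our first check; clean up the late entry.
      cancelAction.run();
    }
  }

  // Called on connection error/close: flip the flag first so no new entries can
  // slip in unnoticed, then drain while holding the wrapper's lock.
  void terminate() {
    if (!terminated.compareAndSet(false, true)) {
      return;
    }
    synchronized (registry) {
      registry.values().forEach(Runnable::run);
      registry.clear();
    }
  }
}
```

The re-check after the put handles the race where terminate() flips the flag between the first check and the insertion, so a late stream is cancelled instead of leaking.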

private final NonBlockingHashMapLong<UnicastProcessor<Payload>> channelProcessors;
private final Map<Integer, Subscription> sendingSubscriptions;
private final Map<Integer, UnicastProcessor<Payload>> channelProcessors;

private final UnboundedProcessor<Frame> sendProcessor;
private KeepAliveHandler keepAliveHandler;
@@ -69,8 +75,8 @@ class RSocketServer implements RSocket {
this.requestHandler = requestHandler;
this.frameDecoder = frameDecoder;
this.errorConsumer = errorConsumer;
this.sendingSubscriptions = new NonBlockingHashMapLong<>();
this.channelProcessors = new NonBlockingHashMapLong<>();
this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>());

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
// connections
rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java
@@ -17,10 +17,6 @@
package io.rsocket.internal;

import io.netty.util.ReferenceCountUtil;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
@@ -32,6 +28,11 @@
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/**
* A Processor implementation that takes a custom queue and allows only a single subscriber.
*
14 changes: 11 additions & 3 deletions rsocket-test/src/main/java/io/rsocket/test/TransportTest.java
@@ -33,6 +33,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

public interface TransportTest {
@@ -164,16 +165,23 @@ default void requestChannel3() {
        .expectComplete()
        .verify(getTimeout());
  }

  @DisplayName("makes 1 requestChannel request with 512 payloads")
  @Test
  default void requestChannel512() {
    Flux<Payload> payloads = Flux.range(0, 512).map(this::createTestPayload);

    Flux.range(0, 1024)
        .flatMap(v -> Mono.fromRunnable(() -> check(payloads)).subscribeOn(Schedulers.elastic()), 12)
        .blockLast();
  }

  default void check(Flux<Payload> payloads) {
    getClient()
        .requestChannel(payloads)
        .as(StepVerifier::create)
        .expectNextCount(512)
        .as("expected 512 items")
        .expectComplete()
        .verify(getTimeout());
  }
@@ -233,7 +241,7 @@ default void requestStream10_000() {
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestStream request and receives 5 responses")
@Test
default void requestStream5() {