Skip to content

Conversation

@robertroeser
Copy link
Member

@robertroeser robertroeser commented Nov 19, 2018

Changes the RScocket client and server to use the a synchronized version of the Agrona Int2ObjectHashMap. The NonBlockingHashMapLong was leading to GC pauses.

Uses SendPublisher that writes to the Netty channel directly - on my laptop the performance increase is approx 200% throughput improvement, with better tail latency on the Ping/Pong test as well our internal code that relies on RSocket.

kbahr and others added 4 commits November 19, 2018 16:34
Signed-off-by: Robert Roeser <rroeserr@gmail.com>
* switching frame type flag back to bits so we don't have to traverse an array when looking up flags

Signed-off-by: Robert Roeser <rroeserr@gmail.com>

* jmh test

Signed-off-by: Robert Roeser <rroeserr@gmail.com>

* Added an EMPTY flag

Signed-off-by: Robert Roeser <rroeserr@gmail.com>
Signed-off-by: Robert Roeser <rroeserr@gmail.com>
…RSocket core protocol

Signed-off-by: Robert Roeser <rroeserr@gmail.com>
Signed-off-by: Robert Roeser <rroeserr@gmail.com>
@yschimke
Copy link
Member

LGTM

I keep expecting the use of libraries like reactor to mean that the engine of RSocket is this shiny simple "only one way to logically write this" chrome engine. I'm still getting surprised that we end up writing so much stream infra code. In this case, things like SendPublisher.

Copy link
Member

@mostroverkhov mostroverkhov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@rdegnan rdegnan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM other than a few methods that should be synchronized -- I don't think that will make much difference to performance.

private final Function<Frame, ? extends Payload> frameDecoder;
private final Consumer<Throwable> errorConsumer;

private final NonBlockingHashMapLong<Subscription> sendingSubscriptions;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think handleSendProcessorError and handleSendProcessorCancel should now be synchronized as they iterate over sendingSubscriptions/channelProcessors (previously this wasn't necessary because of Cliff Click magic).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrapped the map with Collections.synchronizedMap so it should effectively be doing that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections.synchronizedMap doesn't actually help when you iterate over any of its collection views -- per the documentation:

 * It is imperative that the user manually synchronize on the returned
 * map when iterating over any of its collection views:
 * <pre>
 *  Map m = Collections.synchronizedMap(new HashMap());
 *      ...
 *  Set s = m.keySet();  // Needn't be in synchronized block
 *      ...
 *  synchronized (m) {  // Synchronizing on m, not s!
 *      Iterator i = s.iterator(); // Must be in synchronized block
 *      while (i.hasNext())
 *          foo(i.next());
 *  }
 * </pre>
 * Failure to follow this advice may result in non-deterministic behavior.

If we use forEach everywhere instead of a for loop that would also be fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at the code again. I think we are fine - the problem would be if something adds something to map while we're iterating over it. Looking at the non blocking map the same thing would happen. We should probably add something to the request* methods so that you can't add items to the map when handling a cancel or an error, and then this isn't a problem.

private final Function<Frame, ? extends Payload> frameDecoder;
private final Consumer<Throwable> errorConsumer;
private final StreamIdSupplier streamIdSupplier;
private final NonBlockingHashMapLong<LimitableRequestPublisher> senders;
Copy link
Member

@rdegnan rdegnan Nov 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think handleSendProcessorError and handleSendProcessorCancel and terminate should now be synchronized as they iterate over senders/receivers (previously this wasn't necessary because of Cliff Click magic).

@robertroeser
Copy link
Member Author

@yschimke I'm hoping these changes make it back into reactor-*

import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.framing.FrameType;
import io.rsocket.internal.Int2ObjectHashMap;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Netty already has one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to it

Signed-off-by: Robert Roeser <rroeserr@gmail.com>
@robertroeser robertroeser merged commit 4f466f6 into rsocket:1.0.x Nov 22, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants