Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
public class EchoServerHandler extends ByteToMessageDecoder {
private static SimpleChannelInboundHandler<FullHttpRequest> httpHandler = new HttpServerHandler();

private static ReactiveSocketServerHandler reactiveSocketHandler = ReactiveSocketServerHandler.create((setupPayload, rs) ->
new RequestHandler.Builder().withRequestResponse(payload -> s -> {
s.onNext(payload);
s.onComplete();
}).build());
private static RequestHandler requestHandler = new RequestHandler.Builder().withRequestResponse(payload -> s -> {
s.onNext(payload);
s.onComplete();
}).build();

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Expand Down Expand Up @@ -61,6 +60,8 @@ private void switchToHttp(ChannelHandlerContext ctx) {

private void switchToReactiveSocket(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
ReactiveSocketServerHandler reactiveSocketHandler =
ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler);
p.addLast(reactiveSocketHandler);
p.remove(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
package io.reactivesocket.transport.tcp.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
Expand All @@ -28,51 +26,51 @@
import io.reactivesocket.LeaseGovernor;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.transport.tcp.MutableDirectByteBuf;
import org.agrona.BitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import static org.agrona.BitUtil.SIZE_OF_INT;

@ChannelHandler.Sharable
public class ReactiveSocketServerHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(ReactiveSocketServerHandler.class);

private ConcurrentHashMap<ChannelId, ServerTcpDuplexConnection> duplexConnections = new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(ReactiveSocketServerHandler.class);
private static final int MAX_FRAME_LENGTH = Integer.MAX_VALUE >> 1;

private ConnectionSetupHandler setupHandler;

private LeaseGovernor leaseGovernor;
private ServerTcpDuplexConnection connection;

protected ReactiveSocketServerHandler(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) {
this.setupHandler = setupHandler;
this.leaseGovernor = leaseGovernor;
this.connection = null;
}

public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHandler) {
return create(setupHandler, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR);
}

public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) {
return new
ReactiveSocketServerHandler(
setupHandler,
leaseGovernor);

return new ReactiveSocketServerHandler(setupHandler, leaseGovernor);
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline cp = ctx.pipeline();
if (cp.get(LengthFieldBasedFrameDecoder.class) == null) {
ctx
.pipeline()
.addBefore(
ctx.name(),
LengthFieldBasedFrameDecoder.class.getName(),
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE >> 1, 0, BitUtil.SIZE_OF_INT, -1 * BitUtil.SIZE_OF_INT, 0));
LengthFieldBasedFrameDecoder frameDecoder =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: It doesn't make much of a difference but we can move this to server initialization i.e. add these handlers upfront using ChannelInitializer instead of doing it on handlerAdded

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, we'll improve that later.

new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, SIZE_OF_INT, -1 * SIZE_OF_INT, 0);
ctx.pipeline()
.addBefore(ctx.name(), LengthFieldBasedFrameDecoder.class.getName(), frameDecoder);
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
connection = new ServerTcpDuplexConnection(ctx);
ReactiveSocket reactiveSocket =
DefaultReactiveSocket.fromServerConnection(connection, setupHandler, leaseGovernor, Throwable::printStackTrace);
// Note: No blocking code here (still it should be refactored)
reactiveSocket.startAndWait();
}

@Override
Expand All @@ -81,29 +79,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
try {
MutableDirectByteBuf mutableDirectByteBuf = new MutableDirectByteBuf(content);
Frame from = Frame.from(mutableDirectByteBuf, 0, mutableDirectByteBuf.capacity());
channelRegistered(ctx);
ServerTcpDuplexConnection connection = duplexConnections.computeIfAbsent(ctx.channel().id(), i -> {
logger.info("No connection found for channel id: " + i + " from host " + ctx.channel().remoteAddress().toString());
ServerTcpDuplexConnection c = new ServerTcpDuplexConnection(ctx);
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromServerConnection(c, setupHandler, leaseGovernor, throwable -> throwable.printStackTrace());
reactiveSocket.startAndWait();
return c;
});

if (connection != null) {
connection
.getSubscribers()
.forEach(o -> o.onNext(from));
connection.getSubscribers().forEach(o -> o.onNext(from));
}
} finally {
content.release();
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,55 +52,52 @@ public class ClientServerTest {
static EventLoopGroup bossGroup = new NioEventLoopGroup(1);
static EventLoopGroup workerGroup = new NioEventLoopGroup(4);

static ReactiveSocketServerHandler serverHandler = ReactiveSocketServerHandler.create((setupPayload, rs) ->
new RequestHandler() {
@Override
public Publisher<Payload> handleRequestResponse(Payload payload) {
return s -> {
//System.out.println("Handling request/response payload => " + s.toString());
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
s.onNext(response);
s.onComplete();
};
}

@Override
public Publisher<Payload> handleRequestStream(Payload payload) {
static RequestHandler requestHandler = new RequestHandler() {
@Override
public Publisher<Payload> handleRequestResponse(Payload payload) {
return s -> {
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
s.onNext(response);
s.onComplete();
};
}

return RxReactiveStreams
.toPublisher(Observable
.range(1, 10)
.map(i -> response));
}
@Override
public Publisher<Payload> handleRequestStream(Payload payload) {
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");

@Override
public Publisher<Payload> handleSubscription(Payload payload) {
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
return RxReactiveStreams
.toPublisher(Observable
.range(1, 10)
.map(i -> response));
}

@Override
public Publisher<Payload> handleSubscription(Payload payload) {
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");

return RxReactiveStreams
.toPublisher(Observable
.range(1, 10)
.map(i -> response)
.repeat());
}

@Override
public Publisher<Void> handleFireAndForget(Payload payload) {
return Subscriber::onComplete;
}

@Override
public Publisher<Payload> handleChannel(Payload initialPayload, Publisher<Payload> inputs) {
return null;
}

return RxReactiveStreams
.toPublisher(Observable
.range(1, 10)
.map(i -> response)
.repeat());
}

@Override
public Publisher<Void> handleFireAndForget(Payload payload) {
return Subscriber::onComplete;
}

@Override
public Publisher<Payload> handleChannel(Payload initialPayload, Publisher<Payload> inputs) {
return null;
}

@Override
public Publisher<Void> handleMetadataPush(Payload payload) {
return null;
}
@Override
public Publisher<Void> handleMetadataPush(Payload payload) {
return null;
}
);
};

@BeforeClass
public static void setup() throws Exception {
Expand All @@ -112,6 +109,8 @@ public static void setup() throws Exception {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ReactiveSocketServerHandler serverHandler =
ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler);
pipeline.addLast(serverHandler);
}
});
Expand All @@ -123,7 +122,7 @@ protected void initChannel(Channel ch) throws Exception {
).toBlocking().single();

client = DefaultReactiveSocket
.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace());
.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), Throwable::printStackTrace);

client.startAndWait();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@

public class Ping {
public static void main(String... args) throws Exception {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);

Publisher<ClientTcpDuplexConnection> publisher = ClientTcpDuplexConnection
.create(InetSocketAddress.createUnresolved("localhost", 7878), new NioEventLoopGroup(1));
.create(InetSocketAddress.createUnresolved("localhost", 7878), eventLoopGroup);

ClientTcpDuplexConnection duplexConnection = RxReactiveStreams.toObservable(publisher).toBlocking().last();
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace());
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8");
ReactiveSocket reactiveSocket =
DefaultReactiveSocket.fromClientConnection(duplexConnection, setupPayload, Throwable::printStackTrace);

reactiveSocket.startAndWait();

Expand Down Expand Up @@ -80,13 +84,13 @@ public ByteBuffer getMetadata() {
.toObservable(
reactiveSocket
.requestResponse(keyPayload))
.doOnError(t -> t.printStackTrace())
.doOnError(Throwable::printStackTrace)
.doOnNext(s -> {
long diff = System.nanoTime() - start;
histogram.recordValue(diff);
});
}, 16)
.doOnError(t -> t.printStackTrace())
.doOnError(Throwable::printStackTrace)
.subscribe(new Subscriber<Payload>() {
@Override
public void onCompleted() {
Expand Down
Loading