Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ subprojects {

dependencyManagement {
imports {
// TODO: Upgrade to latest version
mavenBom 'io.projectreactor:reactor-bom:Bismuth-SR11'
mavenBom 'io.projectreactor:reactor-bom:Californium-RELEASE'
}

dependencies {
Expand Down
1 change: 0 additions & 1 deletion rsocket-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ dependencies {
api 'io.netty:netty-buffer'
api 'io.projectreactor:reactor-core'

implementation 'io.projectreactor.addons:reactor-extra'
implementation 'org.jctools:jctools-core'
implementation 'org.slf4j:slf4j-api'

Expand Down
5 changes: 2 additions & 3 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,9 @@ private void handleStreamZero(FrameType type, Frame frame) {
lifecycle.terminate(error);
errorConsumer.accept(error);
connection.dispose();
break;
case LEASE:
{
break;
}
break;
case KEEPALIVE:
if (keepAliveHandler != null) {
keepAliveHandler.receive(frame);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static io.rsocket.fragmentation.FrameReassembler.createFrameReassembler;
import static io.rsocket.util.AbstractionLeakingFrameUtils.toAbstractionLeakingFrame;
import static reactor.function.TupleUtils.function;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
Expand Down Expand Up @@ -115,7 +114,7 @@ public Flux<Frame> receive() {
return delegate
.receive()
.map(AbstractionLeakingFrameUtils::fromAbstractionLeakingFrame)
.concatMap(function(this::toReassembledFrames));
.concatMap(t2 -> toReassembledFrames(t2.getT1(), t2.getT2()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes me sad, but I get it.

}

@Override
Expand All @@ -125,7 +124,7 @@ public Mono<Void> send(Publisher<Frame> frames) {
return delegate.send(
Flux.from(frames)
.map(AbstractionLeakingFrameUtils::fromAbstractionLeakingFrame)
.concatMap(function(this::toFragmentedFrames)));
.concatMap(t2 -> toFragmentedFrames(t2.getT1(), t2.getT2())));
}

private Flux<Frame> toFragmentedFrames(int streamId, io.rsocket.framing.Frame frame) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.test.TestSubscriber;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.RSocketProxy;
Expand Down Expand Up @@ -83,7 +83,7 @@ public Mono<Payload> requestResponse(Payload payload) {
};
}

private NettyContextCloseable server;
private CloseableChannel server;
private RSocket client;
private AtomicInteger requestCount;
private CountDownLatch disconnectionCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.rsocket.RSocketFactory;
import io.rsocket.test.SlowTest;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
Expand All @@ -23,7 +23,7 @@ public class InteractionsLoadTest {
public void channel() {
TcpServerTransport serverTransport = TcpServerTransport.create(0);

NettyContextCloseable server =
CloseableChannel server =
RSocketFactory.receive()
.acceptor((setup, rsocket) -> Mono.just(new EchoRSocket()))
.transport(serverTransport)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.EmptyPayload;
Expand All @@ -42,7 +42,7 @@
public class TcpIntegrationTest {
private AbstractRSocket handler;

private NettyContextCloseable server;
private CloseableChannel server;

@Before
public void startup() {
Expand Down
9 changes: 8 additions & 1 deletion rsocket-transport-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ plugins {
id 'maven-publish'
id 'com.jfrog.artifactory'
id 'com.jfrog.bintray'
id "com.google.osdetector" version "1.4.0"
}

def os_suffix = ""
if (osdetector.classifier in ["linux-x86_64"] || ["osx-x86_64"] || ["windows-x86_64"]) {
os_suffix = ":" + osdetector.classifier
}

dependencies {
api project(':rsocket-core')
api 'io.projectreactor.ipc:reactor-netty'
api 'io.projectreactor.netty:reactor-netty'

compileOnly 'com.google.code.findbugs:jsr305'

Expand All @@ -35,6 +41,7 @@ dependencies {

testRuntimeOnly 'ch.qos.logback:logback-classic'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testRuntimeOnly 'io.netty:netty-tcnative-boringssl-static:2.0.14.Final' + os_suffix
}

description = 'Reactor Netty RSocket transport implementations (TCP, Websocket)'
Original file line number Diff line number Diff line change
Expand Up @@ -21,50 +21,42 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.netty.Connection;

import java.util.Objects;

/** An implementation of {@link DuplexConnection} that connects via TCP. */
public final class TcpDuplexConnection implements DuplexConnection {

private final NettyContext context;

private final NettyInbound in;

private final NettyOutbound out;
private final Connection connection;

/**
* Creates a new instance
*
* @param in the {@link NettyInbound} to listen on
* @param out the {@link NettyOutbound} to send with
* @param context the {@link NettyContext} to for managing the server
* @param connection the {@link Connection} to for managing the server
*/
public TcpDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
this.in = in;
this.out = out;
this.context = context;
public TcpDuplexConnection(Connection connection) {
this.connection = Objects.requireNonNull(connection, "connection must not be null");
}

@Override
public void dispose() {
context.dispose();
connection.dispose();
}

@Override
public boolean isDisposed() {
return context.isDisposed();
return connection.isDisposed();
}

@Override
public Mono<Void> onClose() {
return context.onClose();
return connection.onDispose();
}

@Override
public Flux<Frame> receive() {
return in.receive().map(buf -> Frame.from(buf.retain()));
return connection.inbound().receive().map(buf -> Frame.from(buf.retain()));
}

@Override
Expand All @@ -74,6 +66,6 @@ public Mono<Void> send(Publisher<Frame> frames) {

@Override
public Mono<Void> sendOne(Frame frame) {
return out.sendObject(frame.content()).then();
return connection.outbound().sendObject(frame.content()).then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import reactor.ipc.netty.tcp.TcpServer;
import reactor.netty.tcp.TcpServer;

/**
* An implementation of {@link UriHandler} that creates {@link TcpClientTransport}s and {@link
Expand Down Expand Up @@ -53,6 +53,9 @@ public Optional<ServerTransport> buildServer(URI uri) {
return Optional.empty();
}

return Optional.of(TcpServerTransport.create(TcpServer.create(uri.getHost(), uri.getPort())));
return Optional.of(TcpServerTransport.create(
TcpServer.create()
.host(uri.getHost())
.port(uri.getPort())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.netty.Connection;

/**
* An implementation of {@link DuplexConnection} that connects via a Websocket.
Expand All @@ -41,46 +39,38 @@
*/
public final class WebsocketDuplexConnection implements DuplexConnection {

private final NettyContext context;

private final NettyInbound in;

private final NettyOutbound out;
private final Connection connection;

/**
* Creates a new instance
*
* @param in the {@link NettyInbound} to listen on
* @param out the {@link NettyOutbound} to send with
* @param context the {@link NettyContext} to for managing the server
* @param connection the {@link Connection} to for managing the server
*/
public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
this.in = Objects.requireNonNull(in, "in must not be null");
this.out = Objects.requireNonNull(out, "out must not be null");
this.context = Objects.requireNonNull(context, "context must not be null");
public WebsocketDuplexConnection(Connection connection) {
this.connection = Objects.requireNonNull(connection, "connection must not be null");
}

@Override
public void dispose() {
context.dispose();
connection.dispose();
}

@Override
public boolean isDisposed() {
return context.isDisposed();
return connection.isDisposed();
}

@Override
public Mono<Void> onClose() {
return context.onClose();
return connection.onDispose();
}

@Override
public Flux<Frame> receive() {
return in.receive()
return connection.inbound().receive()
.map(
buf -> {
CompositeByteBuf composite = context.channel().alloc().compositeBuffer();
CompositeByteBuf composite = connection.channel().alloc().compositeBuffer();
ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]);
FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes());
composite.addComponents(true, length, buf.retain());
Expand All @@ -95,7 +85,7 @@ public Mono<Void> send(Publisher<Frame> frames) {

@Override
public Mono<Void> sendOne(Frame frame) {
return out.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
return connection.outbound().sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
.then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.net.InetSocketAddress;
import java.util.Objects;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpClient;

/**
* An implementation of {@link ClientTransport} that connects to a {@link ServerTransport} via TCP.
Expand All @@ -44,7 +44,7 @@ private TcpClientTransport(TcpClient client) {
* @return a new instance
*/
public static TcpClientTransport create(int port) {
TcpClient tcpClient = TcpClient.create(port);
TcpClient tcpClient = TcpClient.create().port(port);
return create(tcpClient);
}

Expand All @@ -59,7 +59,7 @@ public static TcpClientTransport create(int port) {
public static TcpClientTransport create(String bindAddress, int port) {
Objects.requireNonNull(bindAddress, "bindAddress must not be null");

TcpClient tcpClient = TcpClient.create(bindAddress, port);
TcpClient tcpClient = TcpClient.create().host(bindAddress).port(port);
return create(tcpClient);
}

Expand All @@ -73,7 +73,7 @@ public static TcpClientTransport create(String bindAddress, int port) {
public static TcpClientTransport create(InetSocketAddress address) {
Objects.requireNonNull(address, "address must not be null");

TcpClient tcpClient = TcpClient.create(address.getHostString(), address.getPort());
TcpClient tcpClient = TcpClient.create().addressSupplier(() -> address);
return create(tcpClient);
}

Expand All @@ -92,20 +92,9 @@ public static TcpClientTransport create(TcpClient client) {

@Override
public Mono<DuplexConnection> connect() {
return Mono.create(
sink ->
client
.newHandler(
(in, out) -> {
in.context().addHandler(new RSocketLengthCodec());

TcpDuplexConnection connection =
new TcpDuplexConnection(in, out, in.context());

sink.success(connection);
return connection.onClose();
})
.doOnError(sink::error)
.subscribe());
return client
.doOnConnected(c -> c.addHandlerLast(new RSocketLengthCodec()))
.connect()
.map(TcpDuplexConnection::new);
}
}
Loading