Skip to content

Commit

Permalink
Restructure RSocket packages and polish
Browse files Browse the repository at this point in the history
Polish code and relocate `RSocketServerBootstrap` from `server` to
`context` since it's really an `ApplicationContext` concern.

Closes spring-projectsgh-18391
  • Loading branch information
philwebb committed Sep 29, 2019
1 parent de393ab commit 615c6d4
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static class Server {
/**
* RSocket transport protocol.
*/
private RSocketServer.TRANSPORT transport = RSocketServer.TRANSPORT.TCP;
private RSocketServer.Transport transport = RSocketServer.Transport.TCP;

/**
* Path under which RSocket handles requests (only works with websocket
Expand All @@ -75,11 +75,11 @@ public void setAddress(InetAddress address) {
this.address = address;
}

public RSocketServer.TRANSPORT getTransport() {
public RSocketServer.Transport getTransport() {
return this.transport;
}

public void setTransport(RSocketServer.TRANSPORT transport) {
public void setTransport(RSocketServer.Transport transport) {
this.transport = transport;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.rsocket.context.RSocketServerBootstrap;
import org.springframework.boot.rsocket.netty.NettyRSocketServerFactory;
import org.springframework.boot.rsocket.server.RSocketServerBootstrap;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer;
import org.springframework.context.annotation.Bean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.rsocket.context.RSocketPortInfoApplicationContextInitializer;
import org.springframework.boot.rsocket.server.RSocketServerBootstrap;
import org.springframework.boot.rsocket.context.RSocketServerBootstrap;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private static class Listener implements ApplicationListener<RSocketServerInitia

@Override
public void onApplicationEvent(RSocketServerInitializedEvent event) {
setPortProperty(this.applicationContext, event.getrSocketServer().address().getPort());
setPortProperty(this.applicationContext, event.getServer().address().getPort());
}

private void setPortProperty(ApplicationContext context, int port) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
* limitations under the License.
*/

package org.springframework.boot.rsocket.server;
package org.springframework.boot.rsocket.context;

import io.rsocket.SocketAcceptor;

import org.springframework.boot.rsocket.context.RSocketServerInitializedEvent;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;

/**
* Bootstrap an {@link RSocketServer} and start it with the application context.
Expand All @@ -31,33 +33,34 @@
*/
public class RSocketServerBootstrap implements ApplicationEventPublisherAware, SmartLifecycle {

private final RSocketServer rSocketServer;
private final RSocketServer server;

private ApplicationEventPublisher applicationEventPublisher;
private ApplicationEventPublisher eventPublisher;

public RSocketServerBootstrap(RSocketServerFactory serverFactoryProvider, SocketAcceptor socketAcceptor) {
this.rSocketServer = serverFactoryProvider.create(socketAcceptor);
public RSocketServerBootstrap(RSocketServerFactory serverFactory, SocketAcceptor socketAcceptor) {
Assert.notNull(serverFactory, "ServerFactory must not be null");
this.server = serverFactory.create(socketAcceptor);
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
this.eventPublisher = applicationEventPublisher;
}

@Override
public void start() {
this.rSocketServer.start();
this.applicationEventPublisher.publishEvent(new RSocketServerInitializedEvent(this.rSocketServer));
this.server.start();
this.eventPublisher.publishEvent(new RSocketServerInitializedEvent(this.server));
}

@Override
public void stop() {
this.rSocketServer.stop();
this.server.stop();
}

@Override
public boolean isRunning() {
RSocketServer server = this.rSocketServer;
RSocketServer server = this.server;
if (server != null) {
return server.address() != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,17 @@
* @author Brian Clozel
* @since 2.2.0
*/
@SuppressWarnings("serial")
public class RSocketServerInitializedEvent extends ApplicationEvent {

public RSocketServerInitializedEvent(RSocketServer rSocketServer) {
super(rSocketServer);
public RSocketServerInitializedEvent(RSocketServer server) {
super(server);
}

/**
* Access the {@link RSocketServer}.
* @return the embedded RSocket server
*/
public RSocketServer getrSocketServer() {
public RSocketServer getServer() {
return getSource();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,13 @@ public InetSocketAddress address() {

@Override
public void start() throws RSocketServerException {
if (this.lifecycleTimeout != null) {
this.channel = this.starter.block(this.lifecycleTimeout);
}
else {
this.channel = this.starter.block();
}
this.channel = block(this.starter, this.lifecycleTimeout);
logger.info("Netty RSocket started on port(s): " + address().getPort());
startDaemonAwaitThread(this.channel);
}

private void startDaemonAwaitThread(CloseableChannel channel) {
Thread awaitThread = new Thread("rsocket") {

@Override
public void run() {
channel.onClose().block();
}

};
Thread awaitThread = new Thread(() -> channel.onClose().block(), "rsocket");
awaitThread.setContextClassLoader(getClass().getClassLoader());
awaitThread.setDaemon(false);
awaitThread.start();
Expand All @@ -93,4 +81,8 @@ public void stop() throws RSocketServerException {
}
}

private <T> T block(Mono<T> mono, Duration timeout) {
return (timeout != null) ? mono.block(timeout) : mono.block();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur

private InetAddress address;

private RSocketServer.TRANSPORT transport = RSocketServer.TRANSPORT.TCP;
private RSocketServer.Transport transport = RSocketServer.Transport.TCP;

private ReactorResourceFactory resourceFactory;

Expand All @@ -73,7 +73,7 @@ public void setAddress(InetAddress address) {
}

@Override
public void setTransport(RSocketServer.TRANSPORT transport) {
public void setTransport(RSocketServer.Transport transport) {
this.transport = transport;
}

Expand Down Expand Up @@ -126,26 +126,28 @@ public NettyRSocketServer create(SocketAcceptor socketAcceptor) {
}

private ServerTransport<CloseableChannel> createTransport() {
if (this.transport == RSocketServer.TRANSPORT.WEBSOCKET) {
if (this.resourceFactory != null) {
HttpServer httpServer = HttpServer.create().tcpConfiguration((tcpServer) -> tcpServer
.runOn(this.resourceFactory.getLoopResources()).addressSupplier(this::getListenAddress));
return WebsocketServerTransport.create(httpServer);
}
else {
return WebsocketServerTransport.create(getListenAddress());
}
if (this.transport == RSocketServer.Transport.WEBSOCKET) {
return createWebSocketTransport();
}
else {
if (this.resourceFactory != null) {
TcpServer tcpServer = TcpServer.create().runOn(this.resourceFactory.getLoopResources())
.addressSupplier(this::getListenAddress);
return TcpServerTransport.create(tcpServer);
}
else {
return TcpServerTransport.create(getListenAddress());
}
return createTcpTransport();
}

private ServerTransport<CloseableChannel> createWebSocketTransport() {
if (this.resourceFactory != null) {
HttpServer httpServer = HttpServer.create().tcpConfiguration((tcpServer) -> tcpServer
.runOn(this.resourceFactory.getLoopResources()).addressSupplier(this::getListenAddress));
return WebsocketServerTransport.create(httpServer);
}
return WebsocketServerTransport.create(getListenAddress());
}

private ServerTransport<CloseableChannel> createTcpTransport() {
if (this.resourceFactory != null) {
TcpServer tcpServer = TcpServer.create().runOn(this.resourceFactory.getLoopResources())
.addressSupplier(this::getListenAddress);
return TcpServerTransport.create(tcpServer);
}
return TcpServerTransport.create(getListenAddress());
}

private InetSocketAddress getListenAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ public interface ConfigurableRSocketServerFactory {
* Set the transport that the RSocket server should use.
* @param transport the transport protocol to use
*/
void setTransport(RSocketServer.TRANSPORT transport);
void setTransport(RSocketServer.Transport transport);

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,17 @@ public interface RSocketServer {
/**
* Choice of transport protocol for the RSocket server.
*/
enum TRANSPORT {
enum Transport {

TCP, WEBSOCKET
/**
* TCP transport protocol.
*/
TCP,

/**
* WebSocket transport protocol.
*/
WEBSOCKET

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
* @author Brian Clozel
* @since 2.2.0
*/
@SuppressWarnings("serial")
public class RSocketServerException extends RuntimeException {

public RSocketServerException(String message, Throwable cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,17 @@
*/
class NettyRSocketServerFactoryTests {

private NettyRSocketServer rSocketServer;
private NettyRSocketServer server;

private RSocketRequester requester;

private static final Duration TIMEOUT = Duration.ofSeconds(3);

@AfterEach
void tearDown() {
if (this.rSocketServer != null) {
if (this.server != null) {
try {
this.rSocketServer.stop();
this.server.stop();
}
catch (Exception ex) {
// Ignore
Expand All @@ -89,47 +89,44 @@ void specificPort() {
NettyRSocketServerFactory factory = getFactory();
int specificPort = SocketUtils.findAvailableTcpPort(41000);
factory.setPort(specificPort);
this.rSocketServer = factory.create(new EchoRequestResponseAcceptor());
this.rSocketServer.start();
this.server = factory.create(new EchoRequestResponseAcceptor());
this.server.start();
this.requester = createRSocketTcpClient();
String payload = "test payload";
String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT);

assertThat(this.rSocketServer.address().getPort()).isEqualTo(specificPort);
assertThat(this.server.address().getPort()).isEqualTo(specificPort);
assertThat(response).isEqualTo(payload);
assertThat(this.rSocketServer.address().getPort()).isEqualTo(specificPort);
assertThat(this.server.address().getPort()).isEqualTo(specificPort);
}

@Test
void websocketTransport() {
NettyRSocketServerFactory factory = getFactory();
factory.setTransport(RSocketServer.TRANSPORT.WEBSOCKET);
this.rSocketServer = factory.create(new EchoRequestResponseAcceptor());
this.rSocketServer.start();
factory.setTransport(RSocketServer.Transport.WEBSOCKET);
this.server = factory.create(new EchoRequestResponseAcceptor());
this.server.start();
this.requester = createRSocketWebSocketClient();
String payload = "test payload";
String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT);

assertThat(response).isEqualTo(payload);
}

@Test
void websocketTransportWithReactorResource() {
NettyRSocketServerFactory factory = getFactory();
factory.setTransport(RSocketServer.TRANSPORT.WEBSOCKET);
factory.setTransport(RSocketServer.Transport.WEBSOCKET);
ReactorResourceFactory resourceFactory = new ReactorResourceFactory();
resourceFactory.afterPropertiesSet();
factory.setResourceFactory(resourceFactory);
int specificPort = SocketUtils.findAvailableTcpPort(41000);
factory.setPort(specificPort);
this.rSocketServer = factory.create(new EchoRequestResponseAcceptor());
this.rSocketServer.start();
this.server = factory.create(new EchoRequestResponseAcceptor());
this.server.start();
this.requester = createRSocketWebSocketClient();
String payload = "test payload";
String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT);

assertThat(response).isEqualTo(payload);
assertThat(this.rSocketServer.address().getPort()).isEqualTo(specificPort);
assertThat(this.server.address().getPort()).isEqualTo(specificPort);
}

@Test
Expand All @@ -142,22 +139,22 @@ void serverCustomizers() {
.will((invocation) -> invocation.getArgument(0));
}
factory.setServerCustomizers(Arrays.asList(customizers[0], customizers[1]));
this.rSocketServer = factory.create(new EchoRequestResponseAcceptor());
this.server = factory.create(new EchoRequestResponseAcceptor());
InOrder ordered = inOrder((Object[]) customizers);
for (ServerRSocketFactoryCustomizer customizer : customizers) {
ordered.verify(customizer).apply(any(RSocketFactory.ServerRSocketFactory.class));
}
}

private RSocketRequester createRSocketTcpClient() {
Assertions.assertThat(this.rSocketServer).isNotNull();
InetSocketAddress address = this.rSocketServer.address();
Assertions.assertThat(this.server).isNotNull();
InetSocketAddress address = this.server.address();
return createRSocketRequesterBuilder().connectTcp(address.getHostString(), address.getPort()).block();
}

private RSocketRequester createRSocketWebSocketClient() {
Assertions.assertThat(this.rSocketServer).isNotNull();
InetSocketAddress address = this.rSocketServer.address();
Assertions.assertThat(this.server).isNotNull();
InetSocketAddress address = this.server.address();
return createRSocketRequesterBuilder().connect(WebsocketClientTransport.create(address)).block();
}

Expand Down

0 comments on commit 615c6d4

Please sign in to comment.