From 3ce376937e0b91756526c341f70721f373515737 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Wed, 14 Dec 2016 14:48:38 +0000 Subject: [PATCH 1/2] Update to latest reactor-netty stack --- spring-cloud-function-web/pom.xml | 5 +- .../cloud/function/web/RestConfiguration.java | 63 +++++++++++-------- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/spring-cloud-function-web/pom.xml b/spring-cloud-function-web/pom.xml index 978ee9c17..9a66e2e00 100644 --- a/spring-cloud-function-web/pom.xml +++ b/spring-cloud-function-web/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 spring-cloud-function-web @@ -31,7 +32,7 @@ io.projectreactor.ipc reactor-netty - 0.5.1.RELEASE + 0.6.0.BUILD-SNAPSHOT org.springframework.cloud diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java index 682a2a537..d3a3594e2 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java @@ -16,13 +16,6 @@ package org.springframework.cloud.function.web; -import static org.springframework.http.MediaType.TEXT_PLAIN; -import static org.springframework.web.reactive.function.BodyExtractors.toFlux; -import static org.springframework.web.reactive.function.BodyInserters.fromPublisher; -import static org.springframework.web.reactive.function.RequestPredicates.POST; -import static org.springframework.web.reactive.function.RequestPredicates.contentType; - -import java.util.concurrent.Executors; import java.util.function.Function; import org.reactivestreams.Publisher; @@ -35,22 +28,29 @@ import org.springframework.cloud.function.registry.FunctionRegistry; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.http.MediaType; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.util.StringUtils; -import org.springframework.web.reactive.function.Request; -import org.springframework.web.reactive.function.Response; +import org.springframework.web.reactive.function.RequestPredicates; import org.springframework.web.reactive.function.RouterFunction; import org.springframework.web.reactive.function.RouterFunctions; +import org.springframework.web.reactive.function.ServerRequest; +import org.springframework.web.reactive.function.ServerResponse; + +import static org.springframework.http.codec.BodyExtractors.toFlux; +import static org.springframework.http.codec.BodyInserters.fromPublisher; import reactor.core.publisher.Flux; -import reactor.ipc.netty.http.HttpServer; +import reactor.ipc.netty.NettyContext; +import reactor.ipc.netty.http.server.HttpServer; /** * @author Mark Fisher */ @Configuration -@EnableConfigurationProperties({ FunctionConfigurationProperties.class, WebConfigurationProperties.class }) +@EnableConfigurationProperties({ FunctionConfigurationProperties.class, + WebConfigurationProperties.class }) public class RestConfiguration { @Autowired @@ -69,10 +69,13 @@ public HttpHandler httpHandler(FunctionRegistry registry) { String name = functionProperties.getName(); Function, Flux> function = (name.indexOf(',') == -1) ? registry.lookupFunction(name) - : registry.composeFunction(StringUtils.commaDelimitedListToStringArray(name)); + : registry.composeFunction( + StringUtils.commaDelimitedListToStringArray(name)); FunctionInvokingHandler handler = new FunctionInvokingHandler(function); RouterFunction> route = RouterFunctions.route( - POST(webProperties.getPath()).and(contentType(TEXT_PLAIN)), handler::handleText); + RequestPredicates.POST(webProperties.getPath()) + .and(RequestPredicates.contentType(MediaType.TEXT_PLAIN)), + handler::handleText); return RouterFunctions.toHttpHandler(route); } @@ -89,20 +92,21 @@ private FunctionInvokingHandler(Function, Flux> function) { this.function = function; } - private Response> handleText(Request request) { + private ServerResponse> handleText(ServerRequest request) { Flux input = request.body(toFlux(String.class)); Publisher output = this.function.apply(input); - return Response.ok().body(fromPublisher(output, String.class)); + return ServerResponse.ok().body(fromPublisher(output, String.class)); } } - private static class LifecycleAwareHttpServer implements InitializingBean, DisposableBean { + private static class LifecycleAwareHttpServer + implements InitializingBean, DisposableBean { private final HttpHandler handler; private final int port; - private volatile HttpServer server; + private volatile NettyContext server; private LifecycleAwareHttpServer(HttpHandler handler, int port) { this.handler = handler; @@ -111,27 +115,34 @@ private LifecycleAwareHttpServer(HttpHandler handler, int port) { @Override public void afterPropertiesSet() { - ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(this.handler); + ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter( + this.handler); final HttpServer server = HttpServer.create("localhost", port); - this.server = server; - Executors.newSingleThreadExecutor().submit(new Runnable() { + Thread thread = new Thread(new Runnable() { @Override public void run() { - try { - server.startAndAwait(adapter); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + LifecycleAwareHttpServer.this.server = server.newHandler(adapter) + .block(); + while (LifecycleAwareHttpServer.this.server != null) { + try { + Thread.sleep(100L); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } }); + thread.setDaemon(false); + thread.start(); } @Override public void destroy() { if (this.server != null) { - this.server.shutdown(); + this.server.dispose(); + this.server = null; } } } From 7bcafabd20a7878ead5d3a08ff2bcab7eedf0898 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Thu, 15 Dec 2016 11:28:09 +0000 Subject: [PATCH 2/2] Fix for another API change --- .../cloud/function/web/RestConfiguration.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java index d3a3594e2..35a197f61 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java @@ -32,16 +32,17 @@ import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.util.StringUtils; -import org.springframework.web.reactive.function.RequestPredicates; -import org.springframework.web.reactive.function.RouterFunction; -import org.springframework.web.reactive.function.RouterFunctions; -import org.springframework.web.reactive.function.ServerRequest; -import org.springframework.web.reactive.function.ServerResponse; +import org.springframework.web.reactive.function.server.RequestPredicates; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; -import static org.springframework.http.codec.BodyExtractors.toFlux; -import static org.springframework.http.codec.BodyInserters.fromPublisher; +import static org.springframework.web.reactive.function.BodyExtractors.toFlux; +import static org.springframework.web.reactive.function.BodyInserters.fromPublisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.http.server.HttpServer; @@ -72,7 +73,7 @@ public HttpHandler httpHandler(FunctionRegistry registry) { : registry.composeFunction( StringUtils.commaDelimitedListToStringArray(name)); FunctionInvokingHandler handler = new FunctionInvokingHandler(function); - RouterFunction> route = RouterFunctions.route( + RouterFunction route = RouterFunctions.route( RequestPredicates.POST(webProperties.getPath()) .and(RequestPredicates.contentType(MediaType.TEXT_PLAIN)), handler::handleText); @@ -92,7 +93,7 @@ private FunctionInvokingHandler(Function, Flux> function) { this.function = function; } - private ServerResponse> handleText(ServerRequest request) { + private Mono handleText(ServerRequest request) { Flux input = request.body(toFlux(String.class)); Publisher output = this.function.apply(input); return ServerResponse.ok().body(fromPublisher(output, String.class));