Skip to content

Commit

Permalink
[FLINK-9386] Embed netty router
Browse files Browse the repository at this point in the history
This commit replaces netty-router dependency with our own version of it, which is
simplified and adds guarantees about order of matching router patterns.

This is a prerequisite for FLINK-3952. netty-router 1.10 is incompatible with
Netty 4.1, while netty-router 2.2.0 brakes a compatibility in a way that we
were unable to use it.

This closes apache#6031.
  • Loading branch information
pnowojski authored and sampath s committed Jul 26, 2018
1 parent ff6da5b commit 4391c0c
Show file tree
Hide file tree
Showing 23 changed files with 1,377 additions and 135 deletions.
Expand Up @@ -25,6 +25,9 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;

import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
Expand All @@ -47,9 +50,6 @@
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedStream;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
Expand Down Expand Up @@ -135,7 +135,7 @@ public MesosArtifactServer(String prefix, String serverHostname, int configuredP

@Override
protected void initChannel(SocketChannel ch) {
Handler handler = new Handler(router);
RouterHandler handler = new RouterHandler(router, new HashMap<>());

// SSL should be the first handler in the pipeline
if (serverSSLContext != null) {
Expand All @@ -148,7 +148,7 @@ protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(handler.name(), handler)
.addLast(handler.getName(), handler)
.addLast(new UnknownFileHandler());
}
};
Expand Down Expand Up @@ -221,7 +221,7 @@ public synchronized URL addPath(Path path, Path remoteFile) throws IOException,
throw new IllegalArgumentException("not expecting an absolute path");
}
URL fileURL = new URL(baseURL, remoteFile.toString());
router.ANY(fileURL.getPath(), new VirtualFileServerHandler(path));
router.addAny(fileURL.getPath(), new VirtualFileServerHandler(path));

paths.put(remoteFile, fileURL);

Expand All @@ -236,7 +236,7 @@ public synchronized void removePath(Path remoteFile) {
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
router.removePath(fileURL.getPath());
router.removePathPattern(fileURL.getPath());
}
}

Expand Down Expand Up @@ -267,7 +267,7 @@ public synchronized void stop() throws Exception {
* Handle HEAD and GET requests for a specific file.
*/
@ChannelHandler.Sharable
public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<Routed> {
public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<RoutedRequest> {

private FileSystem fs;
private Path path;
Expand All @@ -284,9 +284,9 @@ public VirtualFileServerHandler(Path path) throws IOException {
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
protected void channelRead0(ChannelHandlerContext ctx, RoutedRequest routedRequest) throws Exception {

HttpRequest request = routed.request();
HttpRequest request = routedRequest.getRequest();

if (LOG.isDebugEnabled()) {
LOG.debug("{} request for file '{}'", request.getMethod(), path);
Expand Down
Expand Up @@ -25,18 +25,19 @@
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.WebHandler;
import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
import org.apache.flink.runtime.rest.handler.router.RouteResult;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
import org.apache.flink.runtime.rest.handler.util.KeepAliveWrite;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -79,7 +80,7 @@ public RuntimeMonitorHandler(
localJobManagerAddressFuture,
retriever,
timeout,
Collections.singletonMap(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, cfg.getAllowOrigin()));
Collections.singletonMap(Names.ACCESS_CONTROL_ALLOW_ORIGIN, cfg.getAllowOrigin()));
this.handler = checkNotNull(handler);
this.allowOrigin = cfg.getAllowOrigin();
}
Expand All @@ -89,19 +90,20 @@ public String[] getPaths() {
}

@Override
protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, JobManagerGateway jobManagerGateway) {
CompletableFuture<FullHttpResponse> responseFuture;
RouteResult<?> result = routedRequest.getRouteResult();

try {
// we only pass the first element in the list to the handlers.
Map<String, String> queryParams = new HashMap<>();
for (String key : routed.queryParams().keySet()) {
queryParams.put(key, routed.queryParam(key));
for (String key : result.queryParams().keySet()) {
queryParams.put(key, result.queryParam(key));
}

Map<String, String> pathParams = new HashMap<>(routed.pathParams().size());
for (String key : routed.pathParams().keySet()) {
pathParams.put(key, URLDecoder.decode(routed.pathParams().get(key), ENCODING.toString()));
Map<String, String> pathParams = new HashMap<>(result.pathParams().size());
for (String key : result.pathParams().keySet()) {
pathParams.put(key, URLDecoder.decode(result.pathParams().get(key), ENCODING.toString()));
}

queryParams.put(WEB_MONITOR_ADDRESS_KEY, localAddressFuture.get());
Expand Down Expand Up @@ -131,8 +133,8 @@ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobMana
finalResponse = httpResponse;
}

finalResponse.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
KeepAliveWrite.flush(ctx, routed.request(), finalResponse);
finalResponse.headers().set(Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
KeepAliveWrite.flush(ctx, routedRequest.getRequest(), finalResponse);
});
}
}
Expand Up @@ -72,6 +72,7 @@
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarAccessDeniedHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarDeleteHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarListHandler;
Expand All @@ -87,7 +88,6 @@
import org.apache.flink.util.ShutdownHookUtil;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -320,14 +320,14 @@ public WebRuntimeMonitor(

router
// log and stdout
.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
.addGet("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
new StaticFileServerHandler<>(
retriever,
localRestAddress,
timeout,
logFiles.logFile))

.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
.addGet("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile));

// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
Expand Down Expand Up @@ -376,7 +376,7 @@ public WebRuntimeMonitor(
}

// this handler serves all the static contents
router.GET("/:*", new StaticFileServerHandler<>(
router.addGet("/:*", new StaticFileServerHandler<>(
retriever,
localRestAddress,
timeout,
Expand Down Expand Up @@ -515,7 +515,7 @@ private void get(Router router, RequestHandler handler) {

private static <T extends ChannelInboundHandler & WebHandler> void get(Router router, T handler) {
for (String path : handler.getPaths()) {
router.GET(path, handler);
router.addGet(path, handler);
}
}

Expand All @@ -525,7 +525,7 @@ private void delete(Router router, RequestHandler handler) {

private static <T extends ChannelInboundHandler & WebHandler> void delete(Router router, T handler) {
for (String path : handler.getPaths()) {
router.DELETE(path, handler);
router.addDelete(path, handler);
}
}

Expand All @@ -535,7 +535,7 @@ private void post(Router router, RequestHandler handler) {

private static <T extends ChannelInboundHandler & WebHandler> void post(Router router, T handler) {
for (String path : handler.getPaths()) {
router.POST(path, handler);
router.addPost(path, handler);
}
}

Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
Expand All @@ -38,8 +39,6 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -216,7 +215,7 @@ void start() throws IOException, InterruptedException {
LOG.info("Using directory {} as local cache.", webDir);

Router router = new Router();
router.GET("/:*", new HistoryServerStaticFileServerHandler(webDir));
router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));

if (!webDir.exists() && !webDir.mkdirs()) {
throw new IOException("Failed to create local directory " + webDir.getAbsoluteFile() + ".");
Expand Down
Expand Up @@ -27,6 +27,7 @@
*****************************************************************************/

import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
Expand All @@ -40,7 +41,6 @@
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;

Expand Down Expand Up @@ -81,7 +81,7 @@
* page is prevented.
*/
@ChannelHandler.Sharable
public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<Routed> {
public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<RoutedRequest> {

/** Default logger, if none is specified. */
private static final Logger LOG = LoggerFactory.getLogger(HistoryServerStaticFileServerHandler.class);
Expand All @@ -100,10 +100,10 @@ public HistoryServerStaticFileServerHandler(File rootPath) throws IOException {
// ------------------------------------------------------------------------

@Override
public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
String requestPath = routed.path();
public void channelRead0(ChannelHandlerContext ctx, RoutedRequest routedRequest) throws Exception {
String requestPath = routedRequest.getPath();

respondWithFile(ctx, routed.request(), requestPath);
respondWithFile(ctx, routedRequest.getRequest(), requestPath);
}

/**
Expand Down
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
import org.apache.flink.runtime.webmonitor.PipelineErrorHandler;
import org.apache.flink.util.Preconditions;
Expand All @@ -33,8 +35,6 @@
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;

Expand All @@ -47,6 +47,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.HashMap;

/**
* This classes encapsulates the boot-strapping of netty for the web-frontend.
Expand Down Expand Up @@ -77,7 +78,7 @@ public WebFrontendBootstrap(

@Override
protected void initChannel(SocketChannel ch) {
Handler handler = new Handler(WebFrontendBootstrap.this.router);
RouterHandler handler = new RouterHandler(WebFrontendBootstrap.this.router, new HashMap<>());

// SSL should be the first handler in the pipeline
if (serverSSLContext != null) {
Expand All @@ -91,7 +92,7 @@ protected void initChannel(SocketChannel ch) {
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(new HttpRequestHandler(uploadDir))
.addLast(handler.name(), handler)
.addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log));
}
};
Expand Down
Expand Up @@ -22,7 +22,10 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
import org.apache.flink.runtime.rest.handler.util.KeepAliveWrite;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
Expand All @@ -31,9 +34,6 @@
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;

import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -90,7 +90,7 @@ public void testRedirectHandler() throws Exception {
gatewayRetriever,
timeout);

router.GET(restPath, testingHandler);
router.addGet(restPath, testingHandler);
WebFrontendBootstrap bootstrap = new WebFrontendBootstrap(
router,
log,
Expand Down Expand Up @@ -148,10 +148,10 @@ protected TestingHandler(
}

@Override
protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, RestfulGateway gateway) throws Exception {
protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, RestfulGateway gateway) throws Exception {
Assert.assertTrue(channelHandlerContext.channel().eventLoop().inEventLoop());
HttpResponse response = HandlerRedirectUtils.getResponse(HttpResponseStatus.OK, RESPONSE_MESSAGE);
KeepAliveWrite.flush(channelHandlerContext.channel(), routed.request(), response);
KeepAliveWrite.flush(channelHandlerContext.channel(), routedRequest.getRequest(), response);
}
}

Expand Down
Expand Up @@ -19,10 +19,9 @@
package org.apache.flink.runtime.webmonitor.history;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -43,7 +42,7 @@ public class HistoryServerStaticFileServerHandlerTest {
public void testRespondWithFile() throws Exception {
File webDir = tmp.newFolder("webDir");
Router router = new Router()
.GET("/:*", new HistoryServerStaticFileServerHandler(webDir));
.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));
WebFrontendBootstrap webUI = new WebFrontendBootstrap(
router,
LoggerFactory.getLogger(HistoryServerStaticFileServerHandlerTest.class),
Expand Down

0 comments on commit 4391c0c

Please sign in to comment.