Skip to content

Commit

Permalink
[FLINK-4918] add SSL handler to artifact server
Browse files Browse the repository at this point in the history
This closes apache#2734.
This closes apache#2900.
  • Loading branch information
vijikarthi authored and joseprupi committed Feb 12, 2017
1 parent 8e00b3a commit 311feb8
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,11 @@ public final class ConfigConstants {
public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_NAME =
"mesos.resourcemanager.tasks.container.image.name";

/**
* Config parameter to override SSL support for the Artifact Server
*/
public static final String MESOS_ARTIFACT_SERVER_SSL_ENABLED = "mesos.resourcemanager.artifactserver.ssl.enabled";

/**
* The type of container to use for task managers. Valid values are
* {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS} or
Expand Down Expand Up @@ -1181,6 +1186,9 @@ public final class ConfigConstants {

public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";

/** Default value to override SSL support for the Artifact Server */
public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;

public static final String DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE = "mesos";

// ------------------------ File System Behavior ------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.Hardware;
Expand All @@ -70,6 +70,7 @@
import java.net.UnknownHostException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -145,16 +146,15 @@ protected int run(String[] args) {

final Configuration configuration = createConfiguration(workingDir, dynamicProperties);

SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
sc.setFlinkConfiguration(configuration);
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
SecurityContext.install(sc);
SecurityUtils.install(sc);

LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName());

return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
@Override
public Integer run() {
public Integer call() {
return runPrivileged(configuration);
}
});
Expand Down Expand Up @@ -279,7 +279,7 @@ protected int runPrivileged(Configuration config) {
LOG.debug("Starting Artifact Server");
final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort);
artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort, config);

// ----------------- (3) Generate the configuration for the TaskManagers -------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;

import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;

Expand Down Expand Up @@ -114,19 +115,18 @@ else if (tmpDirs != null) {
String hadoopConfDir = envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR);
LOG.info("hadoopConfDir: {}", hadoopConfDir);

SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
sc.setFlinkConfiguration(configuration);
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
}

try {
SecurityContext.install(sc);
SecurityUtils.install(sc);
LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Object>() {
SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
@Override
public Object run() throws Exception {
public Object call() throws Exception {
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
return null;
}
Expand All @@ -138,4 +138,4 @@ public Object run() throws Exception {
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,20 @@
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Routed;
import io.netty.handler.codec.http.router.Router;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.net.SSLUtils;
import org.jets3t.service.utils.Mimetypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -87,11 +94,31 @@ public class MesosArtifactServer {

private URL baseURL;

public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception {
private final SSLContext serverSSLContext;

public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort, Configuration config)
throws Exception {
if (configuredPort < 0 || configuredPort > 0xFFFF) {
throw new IllegalArgumentException("File server port is invalid: " + configuredPort);
}

// Config to enable https access to the web-ui
boolean enableSSL = config.getBoolean(
ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
SSLUtils.getSSLEnabled(config);

if (enableSSL) {
LOG.info("Enabling ssl for the artifact server");
try {
serverSSLContext = SSLUtils.createSSLServerContext(config);
} catch (Exception e) {
throw new IOException("Failed to initialize SSLContext for the artifact server", e);
}
} else {
serverSSLContext = null;
}

router = new Router();

ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
Expand All @@ -100,6 +127,13 @@ public MesosArtifactServer(String sessionID, String serverHostname, int configur
protected void initChannel(SocketChannel ch) {
Handler handler = new Handler(router);

// SSL should be the first handler in the pipeline
if (serverSSLContext != null) {
SSLEngine sslEngine = serverSSLContext.createSSLEngine();
sslEngine.setUseClientMode(false);
ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
}

ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(handler.name(), handler)
Expand All @@ -123,9 +157,11 @@ protected void initChannel(SocketChannel ch) {
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();

baseURL = new URL("http", serverHostname, port, "/" + sessionID + "/");
String httpProtocol = (serverSSLContext != null) ? "https": "http";

baseURL = new URL(httpProtocol, serverHostname, port, "/" + sessionID + "/");

LOG.info("Mesos artifact server listening at {}:{}", address, port);
LOG.info("Mesos Artifact Server Base URL: {}, listening at {}:{}", baseURL, address, port);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.runtime.security;

import java.util.concurrent.Callable;
LOG.info("Hadoop security is enabled");

/**
* A security context with may be required to run a Callable.
Expand Down

0 comments on commit 311feb8

Please sign in to comment.