Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A Notebook submitter to run Jupyter Notebook inside cluster #13

Merged
merged 10 commits into from
Oct 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jdk:
- oraclejdk8
env:
- HADOOP_VERSION=3.1.1
script: "./gradlew test --stacktrace --info && ./gradlew ciPerformRelease"
script: "./gradlew test --stacktrace --info"

# safelist
branches:
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ rootProject.name = 'tony'
*/
rootProject.name = 'tony'

def modules = ['tony-azkaban', 'tony-core', 'tony-cli', 'tony-mini']
def modules = ['tony-azkaban', 'tony-core', 'tony-cli', 'tony-mini', 'tony-proxy']

modules.each { module ->
if (!file(module).directory) {
Expand Down
1 change: 1 addition & 0 deletions tony-cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apply plugin: 'com.github.johnrengelman.shadow'

dependencies {
compile project(':tony-core')
compile project(':tony-proxy')
}

shadowJar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* ClusterSubmitter is used to submit a distributed Tony
* job on the cluster.
*
* Example usage:
* Usage:
* java -cp tony-cli-x.x.x-all.jar com.linkedin.tony.cli.ClusterSubmitter
* --src_dir /Users/xxx/hadoop/li-tony_trunk/tony-core/src/test/resources/ \
* --executes /Users/xxx/hadoop/li-tony_trunk/tony/src/test/resources/exit_0_check_env.py \
Expand Down
123 changes: 123 additions & 0 deletions tony-cli/src/main/java/com/linkedin/tony/cli/NotebookSubmitter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* Copyright 2018 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.tony.cli;

import com.linkedin.tony.Constants;
import com.linkedin.tony.TonyClient;
import com.linkedin.tony.TonyConfigurationKeys;
import com.linkedin.tony.Utils;
import com.linkedin.tony.rpc.TaskUrl;
import com.linkedin.tonyproxy.ProxyServer;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.UUID;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.exceptions.YarnException;

import static com.linkedin.tony.Constants.*;

/**
* NotebookSubmitter is used to submit a python pex file (for example Jupyter Notebook) to run inside a cluster.
*
* It would first kick off a container inside the cluster that matches the resource request
* (am GPU/Memory/CPU) and run the specified script inside that node. To make it easier for
* Jupyter Notebook, we also bake in a proxy server in the submitter which would automatically
* proxy the request to that node.
*
* Usage:
* // Suppose you have a folder named bin/ at root directory which contains a notebook pex file: linotebook, you can use
* // this command to start the notebook and follow the output message to visit the jupyter notebook page.
* CLASSPATH=$(${HADOOP_HDFS_HOME}/bin/hadoop classpath --glob):./:/home/khu/notebook/tony-cli-0.1.0-all.jar \
* java com.linkedin.tony.cli.NotebookSubmitter --src_dir bin/ --executes "'bin/linotebook --ip=* $DISABLE_TOKEN'"
*/
public class NotebookSubmitter {
oliverhu marked this conversation as resolved.
Show resolved Hide resolved
private static final Log LOG = LogFactory.getLog(NotebookSubmitter.class);

private NotebookSubmitter() { }

public static void main(String[] args) throws Exception {
LOG.info("Starting NotebookSubmitter..");
String jarPath = new File(NotebookSubmitter.class.getProtectionDomain().getCodeSource().getLocation().toURI()).getPath();
Options opts = Utils.getCommonOptions();
opts.addOption("conf", true, "User specified configuration, as key=val pairs");
opts.addOption("conf_file", true, "Name of user specified conf file, on the classpath");
opts.addOption("src_dir", true, "Name of directory of source files.");

int exitCode = 0;
Path cachedLibPath = null;
Configuration hdfsConf = new Configuration();
try (FileSystem fs = FileSystem.get(hdfsConf)) {
cachedLibPath = new Path(fs.getHomeDirectory(), TONY_FOLDER + Path.SEPARATOR + UUID.randomUUID().toString());
fs.mkdirs(cachedLibPath);
fs.copyFromLocalFile(new Path(jarPath), cachedLibPath);
LOG.info("Copying " + jarPath + " to: " + cachedLibPath);
} catch (IOException e) {
LOG.fatal("Failed to create FileSystem: ", e);
exitCode = -1;
}
if (cachedLibPath == null) {
System.exit(-1);
}

String[] updatedArgs = Arrays.copyOf(args, args.length + 4);
updatedArgs[args.length] = "--hdfs_classpath";
updatedArgs[args.length + 1] = cachedLibPath.toString();
updatedArgs[args.length + 2] = "--conf";
updatedArgs[args.length + 3] = TonyConfigurationKeys.APPLICATION_TIMEOUT + "=" + String.valueOf(24 * 60 * 60 * 1000);

TonyClient client = TonyClient.createClientInstance(updatedArgs, new Configuration());
if (client == null) {
System.exit(-1);
}
Thread clientThread = new Thread(() -> {
try {
client.run();
} catch (IOException | URISyntaxException | YarnException | InterruptedException e) {
LOG.error(e);
}
});
clientThread.start();
while (clientThread.isAlive()) {
if (client.getTaskUrls() != null) {
for (TaskUrl taskUrl : client.getTaskUrls()) {
if (taskUrl.getName().equals(Constants.NOTEBOOK_JOB_NAME)) {
String[] hostPort = taskUrl.getUrl().split(":");
ServerSocket localSocket = new ServerSocket(0);
int localPort = localSocket.getLocalPort();
localSocket.close();
ProxyServer server = new ProxyServer(hostPort[0], Integer.parseInt(hostPort[1]), localPort);
LOG.info("If you are running NotebookSubmitter in your local box, please open [localhost:"
+ String.valueOf(localPort) + "] in your browser to visit the page. Otherwise, if "
+ "you're running NotebookSubmitter in a remote machine (like a gateway), please run"
+ " [ssh -L 18888:localhost:" + String.valueOf(localPort)
+ " name_of_this_host] in your laptop and open [localhost:18888] in your browser to "
+ "visit Jupyter Notebook. If the 18888 port is occupied, replace that number with another number.");
server.start();
break;
}
}
}
oliverhu marked this conversation as resolved.
Show resolved Hide resolved
Thread.sleep(1000);
}
clientThread.join();
System.exit(exitCode);

}
}
5 changes: 3 additions & 2 deletions tony-core/src/main/java/com/linkedin/tony/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ public class Constants {
// Distributed TensorFlow job name, e.g. "ps" or "worker",
// as per https://www.tensorflow.org/deploy/distributed
public static final String JOB_NAME = "JOB_NAME";
public static final String PRE_JOB_SHELL = "PRE_JOB_SHELL";
public static final String POST_JOB_SHELL = "POST_JOB_SHELL";
public static final String SESSION_ID = "SESSION_ID";
public static final String PREPROCESSING_JOB = "PREPROCESSING_JOB";
public static final String PY4JGATEWAY = "PY4J_GATEWAY_PORT";
Expand Down Expand Up @@ -48,6 +46,9 @@ public class Constants {

public static final String WORKER_JOB_NAME = "worker";
public static final String PS_JOB_NAME = "ps";
public static final String NOTEBOOK_JOB_NAME = "notebook";
public static final String DRIVER_JOB_NAME = "driver";

public static final String ATTEMPT_NUMBER = "ATTEMPT_NUMBER";

public static final String TEST_AM_CRASH = "TEST_AM_CRASH";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
public class TonyApplicationMaster {
private static final Log LOG = LogFactory.getLog(TonyApplicationMaster.class);

private ApplicationAttemptId appAttemptID;
private ApplicationAttemptId appAttemptID = null;
private String appIdString;
private String amHostPort;

Expand All @@ -95,6 +95,7 @@ public class TonyApplicationMaster {
private ByteBuffer allTokens;
private Map<String, LocalResource> localResources = new ConcurrentHashMap<>();
private Configuration tonyConf = new Configuration();
private ContainerId containerId;

// The environment set up for the TaskExecutor
private Map<String, String> containerEnv = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -140,11 +141,13 @@ public class TonyApplicationMaster {
private boolean singleNode;
private boolean preprocessFinished = false;
private int preprocessExitCode = 0;
private String proxyUrl;

// Preprocessing job
private boolean enablePreprocessing = false;

// Lifecycle control
private long appTimeout;
private boolean shouldExit = false;

// HeartBeat monitor
Expand Down Expand Up @@ -224,6 +227,8 @@ private boolean init(String[] args) {
cliParser.getOptionValue("executes"),
cliParser.getOptionValue("task_params"));

appTimeout = tonyConf.getInt(TonyConfigurationKeys.APPLICATION_TIMEOUT,
TonyConfigurationKeys.DEFAULT_APPLICATION_TIMEOUT);
workerTimeout = tonyConf.getInt(TonyConfigurationKeys.WORKER_TIMEOUT,
TonyConfigurationKeys.DEFAULT_WORKER_TIMEOUT);
hdfsClasspath = cliParser.getOptionValue("hdfs_classpath");
Expand All @@ -239,7 +244,7 @@ private boolean init(String[] args) {
TonyConfigurationKeys.DEFAULT_SECURITY_ENABLED);
enablePreprocessing = tonyConf.getBoolean(TonyConfigurationKeys.ENABLE_PREPROCESSING_JOB,
TonyConfigurationKeys.DEFAULT_ENABLE_PREPROCESSING_JOB);
ContainerId containerId = ContainerId.fromString(envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()));
containerId = ContainerId.fromString(envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()));
appIdString = containerId.getApplicationAttemptId().getApplicationId().toString();
hbInterval = tonyConf.getInt(TonyConfigurationKeys.TASK_HEARTBEAT_INTERVAL_MS,
TonyConfigurationKeys.DEFAULT_TASK_HEARTBEAT_INTERVAL_MS);
Expand All @@ -251,9 +256,11 @@ private boolean init(String[] args) {
@VisibleForTesting
static String buildBaseTaskCommand(String pythonVenvZip, String pythonBinaryPath, String script,
String taskParams) {
String pythonInterpreter;
String pythonInterpreter = "";
if (pythonVenvZip == null || pythonBinaryPath.startsWith("/")) {
pythonInterpreter = pythonBinaryPath;
if (pythonBinaryPath != null) {
pythonInterpreter = pythonBinaryPath;
}
} else {
// Note that we always extract the Python venv zip to a "venv" (Constants.PYTHON_VENV_DIR) directory.
pythonInterpreter = Constants.PYTHON_VENV_DIR + File.separatorChar + pythonBinaryPath;
Expand Down Expand Up @@ -472,7 +479,20 @@ private boolean monitor() {

int attempt = 0;
containerEnv.put(Constants.ATTEMPT_NUMBER, String.valueOf(attempt));
long expireTime = appTimeout == 0 ? Long.MAX_VALUE : System.currentTimeMillis() + appTimeout;
while (true) {
// Checking timeout
if (System.currentTimeMillis() > expireTime) {
LOG.error("Application times out.");
return false;
}

// Check if client signals we should exit.
if (shouldExit) {
LOG.info("Client signals AM to exit.");
return true;
}

if (preprocessExitCode != 0) {
LOG.info("Preprocess failed with exit code: " + preprocessExitCode);
return false;
Expand Down Expand Up @@ -619,6 +639,7 @@ private int doPreprocessingJob() throws Exception {
int tbPort = tbSocket.getLocalPort();
extraEnv.put(Constants.TB_PORT, String.valueOf(tbPort));
String tbUrl = Utils.getCurrentHostName() + ":" + tbPort;
proxyUrl = tbUrl;
LOG.info("Registering tensorboard url for single node training: " + tbUrl);
registerTensorBoardUrlToRM(tbUrl);
tbSocket.close();
Expand All @@ -632,6 +653,13 @@ private int doPreprocessingJob() throws Exception {
}

extraEnv.put(Constants.PREPROCESSING_JOB, "true");

/**
YARN sets $HOME to /user/yarn which users don't have access to write there.
Unfortunately, some services like Jupyter Notebook wants to write stuff there,
set it to user.dir (root of this container's address).
*/
extraEnv.put("HOME", System.getProperty("user.dir"));
oliverhu marked this conversation as resolved.
Show resolved Hide resolved
extraEnv.put(Constants.PY4JGATEWAY, String.valueOf(gatewayServerPort));
String taskCommand = baseTaskCommand;
LOG.info("Executing command: " + taskCommand);
Expand Down Expand Up @@ -691,7 +719,19 @@ public void reset() {

@Override
public Set<TaskUrl> getTaskUrls() {
if (session != null && session.allTasksScheduled()) {
LOG.info("Client requesting TaskUrls!");

// Special handling for NotebookSubmitter.
if (singleNode && proxyUrl != null) {
HashSet<TaskUrl> additionalTasks = new HashSet<>();
additionalTasks.add(new TaskUrl(Constants.DRIVER_JOB_NAME, "0", Utils.constructContainerUrl(
Utils.getCurrentHostName() + ":"
+ System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name()), containerId)));
additionalTasks.add(new TaskUrl(Constants.NOTEBOOK_JOB_NAME, "0", proxyUrl));
return additionalTasks;
}

if (!singleNode && session != null && session.allTasksScheduled()) {
return session.getTFTasks().values().stream()
.flatMap(tasks -> Arrays.stream(tasks).map(TFTask::getTaskUrl))
.collect(Collectors.toSet());
Expand Down
Loading