Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverhu committed Oct 2, 2018
1 parent ef35c19 commit eb5ee9e
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jdk:
- oraclejdk8
env:
- HADOOP_VERSION=3.1.1
script: "./gradlew test --stacktrace --info && ./gradlew ciPerformRelease"
script: "./gradlew test --stacktrace --info"

# safelist
branches:
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ subprojects {
// Set the year in the license
ext.year = Calendar.getInstance().get(Calendar.YEAR)
skipExistingHeaders = false
excludes(["com/linkedin/tony/rpc/proto/", "**/*.properties", "**/*.txt"])
excludes(["com/linkedin/tony/rpc/proto/", "**/*.properties"])
}
configurations {
hadoopRuntime.extendsFrom(runtime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* ClusterSubmitter is used to submit a distributed Tony
* job on the cluster.
*
* Example usage:
* Usage:
* java -cp tony-cli-x.x.x-all.jar com.linkedin.tony.cli.ClusterSubmitter
* --src_dir /Users/xxx/hadoop/li-tony_trunk/tony-core/src/test/resources/ \
* --executes /Users/xxx/hadoop/li-tony_trunk/tony/src/test/resources/exit_0_check_env.py \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,20 @@

import static com.linkedin.tony.Constants.*;


/**
* NotebookSubmitter is used to submit a python pex file (for example Jupyter Notebook) to run inside a cluster.
*
* It first launches a container inside the cluster that matches the resource request
* (AM GPU/memory/CPU) and runs the specified script on that node. To make Jupyter Notebook
* easier to reach, the submitter also embeds a proxy server that automatically forwards
* requests to that node.
*
* Usage:
* // Suppose you have a folder named bin/ in the root directory that contains a notebook pex file, linotebook. You can use
* // this command to start the notebook, then follow the output message to visit the Jupyter Notebook page.
* CLASSPATH=$(${HADOOP_HDFS_HOME}/bin/hadoop classpath --glob):./:/home/khu/notebook/tony-cli-0.1.0-all.jar \
* java com.linkedin.tony.cli.NotebookSubmitter --src_dir bin/ --executes "'bin/linotebook --ip=* $DISABLE_TOKEN'"
*/
public class NotebookSubmitter {
private static final Log LOG = LogFactory.getLog(NotebookSubmitter.class);

Expand Down Expand Up @@ -82,17 +95,20 @@ public static void main(String[] args) throws Exception {
});
clientThread.start();
while (clientThread.isAlive()) {
if (client.taskUrls != null) {
for (TaskUrl taskUrl : client.taskUrls) {
if (client.getTaskUrls() != null) {
for (TaskUrl taskUrl : client.getTaskUrls()) {
if (taskUrl.getName().equals(Constants.NOTEBOOK_JOB_NAME)) {
String[] hostPort = taskUrl.getUrl().split(":");
ServerSocket localSocket = new ServerSocket(0);
int localPort = localSocket.getLocalPort();
localSocket.close();
ProxyServer server = new ProxyServer(hostPort[0], Integer.parseInt(hostPort[1]), localPort);
LOG.info("Please run [ssh -L 18888:localhost:" + String.valueOf(localPort)
LOG.info("If you are running NotebookSubmitter in your local box, please open [localhost:"
+ String.valueOf(localPort) + "] in your browser to visit the page. Otherwise, if "
+ "you're running NotebookSubmitter in a remote machine (like a gateway), please run"
+ " [ssh -L 18888:localhost:" + String.valueOf(localPort)
+ " name_of_this_host] in your laptop and open [localhost:18888] in your browser to "
+ "visit Jupyter Notebook. If the 18888 port is occupied, replace that number with another number");
+ "visit Jupyter Notebook. If the 18888 port is occupied, replace that number with another number.");
server.start();
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,12 @@ private boolean monitor() {
return false;
}

// Check if client signals we should exit.
if (shouldExit) {
LOG.info("Client signals AM to exit.");
return true;
}

if (preprocessExitCode != 0) {
LOG.info("Preprocess failed with exit code: " + preprocessExitCode);
return false;
Expand Down
30 changes: 17 additions & 13 deletions tony-core/src/main/java/com/linkedin/tony/TonyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package com.linkedin.tony;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import com.linkedin.tony.rpc.TaskUrl;
import com.linkedin.tony.rpc.impl.ApplicationRpcClient;
import java.io.File;
Expand Down Expand Up @@ -114,8 +116,7 @@ public class TonyClient {
private int maxHbMisses;

// For access from CLI.
public Set<TaskUrl> taskUrls = new HashSet<>();

private Set<TaskUrl> taskUrls = new HashSet<>();

public TonyClient() {
this(new Configuration(false));
Expand All @@ -126,6 +127,10 @@ public TonyClient(Configuration conf) {
tonyConf = conf;
}

/**
 * Returns an immutable copy of the task URLs currently held by this client.
 *
 * The copy is taken at call time, so callers cannot modify the client's
 * internal set through the returned value.
 *
 * @return an {@link ImmutableSet} containing the current task URLs
 */
public ImmutableSet<TaskUrl> getTaskUrls() {
  final ImmutableSet<TaskUrl> snapshot = ImmutableSet.copyOf(taskUrls);
  return snapshot;
}

public boolean run() throws IOException, InterruptedException, URISyntaxException, YarnException {
LOG.info("Starting client..");
yarnClient.start();
Expand Down Expand Up @@ -594,7 +599,16 @@ private boolean monitorApplication(ApplicationId appId)
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
initRpcClient(report);
getApplicationTaskUrls();

// Query AM for taskUrls if taskUrls is empty.
if (amRpcServerInitialized && taskUrls.isEmpty()) {
taskUrls = amRpcClient.getTaskUrls();
if (!taskUrls.isEmpty()) {
// Print TaskUrls
taskUrls.forEach(task -> Utils.printTaskUrl(task, LOG));
}
}

if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
LOG.info("Application has completed successfully. "
Expand Down Expand Up @@ -646,16 +660,6 @@ private void addClientToAMTokenToUGI(ApplicationReport report) throws IOExceptio
}
}

/**
 * Fetches the task URLs from the AM RPC client and logs each one.
 *
 * Does nothing if the AM RPC server has not been initialized yet or if
 * task URLs have already been obtained.
 *
 * @throws IOException declared by the RPC call to the AM
 * @throws YarnException declared by the RPC call to the AM
 */
private void getApplicationTaskUrls() throws IOException, YarnException {
  // Only query the AM while we are still missing task URLs.
  if (!amRpcServerInitialized || !taskUrls.isEmpty()) {
    return;
  }
  taskUrls = amRpcClient.getTaskUrls();
  // Print TaskUrls (an empty result simply prints nothing).
  for (TaskUrl url : taskUrls) {
    Utils.printTaskUrl(url, LOG);
  }
}

/**
* Kill a submitted application by sending a call to the ASM
* @param appId Application Id to be killed.
Expand Down
Empty file.

0 comments on commit eb5ee9e

Please sign in to comment.