Skip to content

Commit

Permalink
Address comments & clean ups
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverhu committed Sep 28, 2018
1 parent 1fc98fd commit eaa5d28
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 82 deletions.
29 changes: 19 additions & 10 deletions tony-cli/src/main/java/com/linkedin/tony/cli/NotebookSubmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
*/
package com.linkedin.tony.cli;

import com.linkedin.tony.Constants;
import com.linkedin.tony.TonyClient;
import com.linkedin.tony.TonyConfigurationKeys;
import com.linkedin.tony.Utils;
import com.linkedin.tony.rpc.TaskUrl;
import com.linkedin.tonyproxy.ProxyServer;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -65,7 +67,7 @@ public static void main(String[] args) throws Exception {
updatedArgs[args.length] = "--hdfs_classpath";
updatedArgs[args.length + 1] = cachedLibPath.toString();
updatedArgs[args.length + 2] = "--conf";
updatedArgs[args.length + 3] = TonyConfigurationKeys.APPLICATION_TIMEOUT + "=" + String.valueOf(24 * 3600 *1000);
updatedArgs[args.length + 3] = TonyConfigurationKeys.APPLICATION_TIMEOUT + "=" + String.valueOf(24 * 60 * 60 * 1000);

TonyClient client = TonyClient.createClientInstance(updatedArgs, new Configuration());
if (client == null) {
Expand All @@ -80,16 +82,23 @@ public static void main(String[] args) throws Exception {
});
clientThread.start();
while (clientThread.isAlive()) {
if (client.notebookUrl != null) {
String[] hostPort = client.notebookUrl.split(":");
ServerSocket localSocket = new ServerSocket(0);
int localPort = localSocket.getLocalPort();
localSocket.close();
ProxyServer server = new ProxyServer(hostPort[0], Integer.parseInt(hostPort[1]), localPort);
LOG.info("Please run [ssh -L 8080:localhost:" + String.valueOf(localPort) + " name_of_this_host] in your laptop and open [localhost:8080] in your browser to visit Notebook");
server.start();
break;
if (client.taskUrls != null) {
for (TaskUrl taskUrl : client.taskUrls) {
if (taskUrl.getName().equals(Constants.NOTEBOOK_JOB_NAME)) {
String[] hostPort = taskUrl.getUrl().split(":");
ServerSocket localSocket = new ServerSocket(0);
int localPort = localSocket.getLocalPort();
localSocket.close();
ProxyServer server = new ProxyServer(hostPort[0], Integer.parseInt(hostPort[1]), localPort);
LOG.info("Please run [ssh -L 18888:localhost:" + String.valueOf(localPort)
+ " name_of_this_host] in your laptop and open [localhost:18888] in your browser to "
+ "visit Jupyter Notebook. If the 18888 port is occupied, replace that number with another number");
server.start();
break;
}
}
}
Thread.sleep(1000);
}
clientThread.join();
System.exit(exitCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
public class TonyApplicationMaster {
private static final Log LOG = LogFactory.getLog(TonyApplicationMaster.class);

private ApplicationAttemptId appAttemptID;
private ApplicationAttemptId appAttemptID = null;
private String appIdString;
private String amHostPort;

Expand Down Expand Up @@ -151,6 +151,7 @@ public class TonyApplicationMaster {
private boolean enablePreprocessing = false;

// Lifecycle control
private long appTimeout;
private boolean shouldExit = false;

// HeartBeat monitor
Expand Down Expand Up @@ -226,6 +227,8 @@ private boolean init(String[] args) {

String psMemoryString = tonyConf.get(TonyConfigurationKeys.PS_MEMORY,
TonyConfigurationKeys.DEFAULT_PS_MEMORY);
appTimeout = tonyConf.getInt(TonyConfigurationKeys.APPLICATION_TIMEOUT,
TonyConfigurationKeys.DEFAULT_APPLICATION_TIMEOUT);
psMemory = Long.parseLong(Utils.parseMemoryString(psMemoryString));
psVCores = tonyConf.getInt(TonyConfigurationKeys.PS_VCORES,
TonyConfigurationKeys.DEFAULT_PS_VCORES);
Expand Down Expand Up @@ -512,7 +515,14 @@ private boolean monitor() {

int attempt = 0;
containerEnv.put(Constants.ATTEMPT_NUMBER, String.valueOf(attempt));
long expireTime = System.currentTimeMillis() + appTimeout;
while (true) {
// Checking timeout
if (System.currentTimeMillis() > expireTime) {
LOG.error("Application times out.");
return false;
}

if (preprocessExitCode != 0) {
LOG.info("Preprocess failed with exit code: " + preprocessExitCode);
return false;
Expand Down Expand Up @@ -675,6 +685,12 @@ private int doPreprocessingJob() throws Exception {
}

extraEnv.put(Constants.PREPROCESSING_JOB, "true");

/**
YARN sets $HOME to /user/yarn, which users do not have permission to write to.
Unfortunately, some services such as Jupyter Notebook want to write files there,
so set it to user.dir (the root of this container's address) instead.
*/
extraEnv.put("HOME", System.getProperty("user.dir"));
extraEnv.put(Constants.PY4JGATEWAY, String.valueOf(gatewayServerPort));
String taskCommand = baseTaskCommand;
Expand Down Expand Up @@ -737,12 +753,12 @@ public void reset() {
public Set<TaskUrl> getTaskUrls() {
LOG.info("Client requesting TaskUrls!");
if (singleNode && proxyUrl != null) {
return new HashSet<TaskUrl>(){{
add(new TaskUrl(Constants.DRIVER_JOB_NAME, "0", Utils.constructContainerUrl(
HashSet<TaskUrl> additionalTasks = new HashSet<>();
additionalTasks.add(new TaskUrl(Constants.DRIVER_JOB_NAME, "0", Utils.constructContainerUrl(
Utils.getCurrentHostName() + ":"
+ System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name()), containerId)));
add(new TaskUrl(Constants.NOTEBOOK_JOB_NAME, "0", proxyUrl));
}};
additionalTasks.add(new TaskUrl(Constants.NOTEBOOK_JOB_NAME, "0", proxyUrl));
return additionalTasks;
}

if (!singleNode && session != null && session.allTasksScheduled()) {
Expand Down
37 changes: 15 additions & 22 deletions tony-core/src/main/java/com/linkedin/tony/TonyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -88,7 +89,6 @@ public class TonyClient {
private int amRpcPort;
private boolean amRpcServerInitialized = false;
private ApplicationRpcClient amRpcClient;
private boolean hasPrintedTaskUrls = false;

private String hdfsConfAddress = null;
private String yarnConfAddress = null;
Expand All @@ -105,15 +105,17 @@ public class TonyClient {
private String srcDir;
private Map<String, String> shellEnv = new HashMap<>();
private Map<String, String> containerEnv = new HashMap<>();

private static final String ARCHIVE_PATH = "tf_archive.zip";
private Configuration tonyConf;
private final long clientStartTime = System.currentTimeMillis();
private Path appResourcesPath;
private int hbInterval;
private int maxHbMisses;

// Used to expose notebookUrl for TonyCLI
public String notebookUrl;
// For access from CLI.
public Set<TaskUrl> taskUrls = new HashSet<>();


public TonyClient() {
this(new Configuration(false));
Expand Down Expand Up @@ -343,6 +345,7 @@ private boolean init(String[] args) throws ParseException {
String[] containerEnvs = cliParser.getOptionValues("container_env");
containerEnv.putAll(Utils.parseKeyValue(containerEnvs));
}
createYarnClient();
return true;
}

Expand Down Expand Up @@ -615,7 +618,7 @@ private boolean monitorApplication(ApplicationId appId)
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
initRpcClient(report);
printTaskUrls();
getApplicationTaskUrls();
if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
LOG.info("Application has completed successfully. "
Expand All @@ -627,8 +630,7 @@ private boolean monitorApplication(ApplicationId appId)
+ ". Breaking monitoring loop : ApplicationId:" + appId.getId());
return false;
}
} else if (YarnApplicationState.KILLED == state
|| YarnApplicationState.FAILED == state) {
} else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) {
LOG.info("Application did not finish."
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
+ ". Breaking monitoring loop : ApplicationId:" + appId.getId());
Expand Down Expand Up @@ -668,22 +670,16 @@ private void addClientToAMTokenToUGI(ApplicationReport report) throws IOExceptio
}
}

private void printTaskUrls() throws IOException, YarnException {
if (amRpcServerInitialized && !hasPrintedTaskUrls) {
Set<TaskUrl> taskUrls = amRpcClient.getTaskUrls();
private void getApplicationTaskUrls() throws IOException, YarnException {
if (amRpcServerInitialized && taskUrls.isEmpty()) {
taskUrls = amRpcClient.getTaskUrls();
if (!taskUrls.isEmpty()) {
new TreeSet<TaskUrl>(taskUrls).forEach(task -> {
if (task.getName().equals(Constants.NOTEBOOK_JOB_NAME)) {
notebookUrl = task.getUrl();
}
Utils.printTaskUrl(task, LOG);
});
hasPrintedTaskUrls = true;
// Print TaskUrls
taskUrls.forEach(task -> Utils.printTaskUrl(task, LOG));
}
}
}


/**
* Kill a submitted application by sending a call to the ASM
* @param appId Application Id to be killed.
Expand Down Expand Up @@ -717,7 +713,6 @@ public static TonyClient createClientInstance(String[] args, Configuration conf)
TonyClient client;
client = new TonyClient(conf);
boolean sanityCheck = client.init(args);
client.createYarnClient();
if (!sanityCheck) {
LOG.fatal("Failed to init client.");
return null;
Expand All @@ -734,10 +729,8 @@ public static int start(String[] args, Configuration conf) {
boolean result = false;
TonyClient client = null;
try {
client = new TonyClient(conf);
boolean sanityCheck = client.init(args);
client.createYarnClient();
if (!sanityCheck) {
client = createClientInstance(args, conf);
if (client == null) {
LOG.fatal("Failed to init client.");
System.exit(-1);
}
Expand Down
20 changes: 14 additions & 6 deletions tony-core/src/test/java/com/linkedin/tony/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,21 @@ public void testIsArchive() {
}

@Test
public void testRenameFile() throws IOException {
public void testIsNotArchive() {
ClassLoader classLoader = getClass().getClassLoader();
File file1 = new File(classLoader.getResource("file_move.txt").getFile());
boolean result = Utils.renameFile(file1.getAbsolutePath(),
file1.getAbsolutePath() + "bak");
assertTrue(Files.exists(Paths.get(file1.getAbsolutePath() + "bak")));
File file1 = new File(classLoader.getResource("exit_0.py").getFile());
assertFalse(Utils.isArchive(file1.getAbsolutePath()));
}


@Test
public void testRenameFile() throws IOException {
File tempFile = File.createTempFile("testRenameFile-", "-suffix");
tempFile.deleteOnExit();
boolean result = Utils.renameFile(tempFile.getAbsolutePath(),
tempFile.getAbsolutePath() + "bak");
assertTrue(Files.exists(Paths.get(tempFile.getAbsolutePath() + "bak")));
assertTrue(result);
Files.deleteIfExists(Paths.get(file1.getAbsolutePath() + "bak"));
Files.deleteIfExists(Paths.get(tempFile.getAbsolutePath() + "bak"));
}
}
61 changes: 22 additions & 39 deletions tony-proxy/src/main/java/com/linkedin/tonyproxy/ProxyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@
public class ProxyServer {

private static final Log LOG = LogFactory.getLog(ProxyServer.class);
private String host;
private String remoteHost;
private int remotePort;
private int localPort;
public ProxyServer(String host, int remotePort, int localPort) {
this.host = host;
public ProxyServer(String remoteHost, int remotePort, int localPort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.localPort = localPort;
}
public void start() throws IOException {
LOG.info("Starting proxy for " + host + ":" + remotePort
LOG.info("Starting proxy for " + remoteHost + ":" + remotePort
+ " on port " + localPort);
ServerSocket server = new ServerSocket(localPort);
while (true) {
new Proxy(server.accept(), host, remotePort).start();
new Proxy(server.accept(), remoteHost, remotePort).start();
}
}

Expand All @@ -52,57 +52,40 @@ static class Proxy extends Thread {

@Override
public void run() {
try {
try (Socket server = new Socket(serverHost, serverPort);
final InputStream inFromClient = clientSocket.getInputStream();
final OutputStream outToClient = clientSocket.getOutputStream();
final InputStream inFromServer = server.getInputStream();
final OutputStream outToServer = server.getOutputStream();
) {
final byte[] request = new byte[1024];
byte[] reply = new byte[4096];
final InputStream inFromClient = clientSocket.getInputStream();
final OutputStream outToClient = clientSocket.getOutputStream();
Socket server;
try {
server = new Socket(serverHost, serverPort);
} catch (IOException e) {
PrintWriter out = new PrintWriter(new OutputStreamWriter(
outToClient));
out.flush();
throw new RuntimeException(e);
}
final InputStream inFromServer = server.getInputStream();
final OutputStream outToServer = server.getOutputStream();
new Thread(() -> {
int bytes_read;
try {
while ((bytes_read = inFromClient.read(request)) != -1) {
outToServer.write(request, 0, bytes_read);
outToServer.flush();
}
} catch (IOException e) {
LOG.error(e);
}
try {
outToServer.close();
} catch (IOException e) {
e.printStackTrace();
LOG.error(e);
}
}).start();
int bytes_read;
try {
while ((bytes_read = inFromServer.read(reply)) != -1) {
outToClient.write(reply, 0, bytes_read);
outToClient.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
while ((bytes_read = inFromServer.read(reply)) != -1) {
outToClient.write(reply, 0, bytes_read);
outToClient.flush();
}
outToClient.close();
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
try {
clientSocket.close();
} catch (IOException e) {
LOG.error(e);
}
}
}

Expand Down

0 comments on commit eaa5d28

Please sign in to comment.