Skip to content

Commit

Permalink
Merge pull request apache#16 from Leemoonsoo/py4jdocker
Browse files Browse the repository at this point in the history
Make python docker interpreter work using py4j
Thank you!!
  • Loading branch information
HyungSung committed Mar 18, 2017
2 parents 8a016c9 + e511ebe commit c9b195b
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;

import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Helps run python interpreter on a docker container
*/
public class PythonDockerInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(PythonDockerInterpreter.class);
Pattern activatePattern = Pattern.compile("activate\\s*(.*)");
Pattern deactivatePattern = Pattern.compile("deactivate");
Pattern helpPattern = Pattern.compile("help");
private File zeppelinHome;

public PythonDockerInterpreter(Properties property) {
super(property);
}

@Override
public void open() {
if (System.getenv("ZEPPELIN_HOME") != null) {
zeppelinHome = new File(System.getenv("ZEPPELIN_HOME"));
} else {
zeppelinHome = Paths.get("..").toAbsolutePath().toFile();
}
}

@Override
public void close() {

}

@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
File pythonScript = new File(getPythonInterpreter().getScriptPath());
InterpreterOutput out = context.out;

Matcher activateMatcher = activatePattern.matcher(st);
Matcher deactivateMatcher = deactivatePattern.matcher(st);
Matcher helpMatcher = helpPattern.matcher(st);

if (st == null || st.isEmpty() || helpMatcher.matches()) {
printUsage(out);
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
} else if (activateMatcher.matches()) {
String image = activateMatcher.group(1);
pull(out, image);

// mount pythonscript dir
String mountPythonScript = "-v " +
pythonScript.getParentFile().getAbsolutePath() +
":/_zeppelin_tmp ";

// mount zeppelin dir
String mountPy4j = "-v " +
zeppelinHome.getAbsolutePath() +
":/_zeppelin ";

// set PYTHONPATH
String pythonPath = ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PY4JPATH + ":" +
":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PYTHON_LIBS;

setPythonCommand("docker run -i --rm " +
mountPythonScript +
mountPy4j +
"-e PYTHONPATH=\"" + pythonPath + "\" " +
image +
" python /_zeppelin_tmp/" + pythonScript.getName());
restartPythonProcess();
out.clear();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated");
} else if (deactivateMatcher.matches()) {
setPythonCommand(null);
restartPythonProcess();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Deactivated");
} else {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Not supported command: " + st);
}
}


public void setPythonCommand(String cmd) {
PythonInterpreter python = getPythonInterpreter();
python.setPythonCommand(cmd);
}

private void printUsage(InterpreterOutput out) {
try {
out.setType(InterpreterResult.Type.HTML);
out.writeResource("output_templates/docker_usage.html");
} catch (IOException e) {
logger.error("Can't print usage", e);
}
}

@Override
public void cancel(InterpreterContext context) {

}

@Override
public FormType getFormType() {
return FormType.NONE;
}

@Override
public int getProgress(InterpreterContext context) {
return 0;
}

/**
* Use python interpreter's scheduler.
* To make sure %python.docker paragraph and %python paragraph runs sequentially
*/
@Override
public Scheduler getScheduler() {
PythonInterpreter pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
return null;
}
}

private void restartPythonProcess() {
PythonInterpreter python = getPythonInterpreter();
python.close();
python.open();
}

protected PythonInterpreter getPythonInterpreter() {
LazyOpenInterpreter lazy = null;
PythonInterpreter python = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());

while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
python = (PythonInterpreter) p;

if (lazy != null) {
lazy.open();
}
return python;
}

public boolean pull(InterpreterOutput out, String image) {
int exit = 0;
try {
exit = runCommand(out, "docker", "pull", image);
} catch (IOException | InterruptedException e) {
logger.error(e.getMessage(), e);
throw new InterpreterException(e);
}
return exit == 0;
}

protected int runCommand(InterpreterOutput out, String... command)
throws IOException, InterruptedException {
ProcessBuilder builder = new ProcessBuilder(command);
builder.redirectErrorStream(true);
Process process = builder.start();
InputStream stdout = process.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(stdout));
String line;
while ((line = br.readLine()) != null) {
out.write(line + "\n");
}
int r = process.waitFor(); // Let the process finish.
return r;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.ServerSocket;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.*;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
Expand Down Expand Up @@ -59,6 +57,7 @@
import org.slf4j.LoggerFactory;

import py4j.GatewayServer;
import py4j.commands.Command;

/**
* Python interpreter for Zeppelin.
Expand All @@ -78,7 +77,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
private String py4jLibPath;
private String pythonLibPath;

private String pythonCommand = DEFAULT_ZEPPELIN_PYTHON;
private String pythonCommand;

private GatewayServer gatewayServer;
private DefaultExecutor executor;
Expand All @@ -95,11 +94,10 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl

Integer statementSetNotifier = new Integer(0);


public PythonInterpreter(Properties property) {
super(property);
try {
File scriptFile = File.createTempFile("zeppelin_python-", ".py");
File scriptFile = File.createTempFile("zeppelin_python-", ".py", new File("/tmp"));
scriptPath = scriptFile.getAbsolutePath();
} catch (IOException e) {
throw new InterpreterException(e);
Expand Down Expand Up @@ -128,6 +126,10 @@ private void createPythonScript() {
logger.info("File {} created", scriptPath);
}

/**
 * Returns the path of the python script file.
 *
 * @return the value of {@code scriptPath}, which the constructor sets to the absolute
 *         path of the temporary {@code zeppelin_python-*.py} file it creates
 */
public String getScriptPath() {
return scriptPath;
}

private void copyFile(File out, String sourceFile) {
ClassLoader classLoader = getClass().getClassLoader();
try {
Expand All @@ -141,7 +143,7 @@ private void copyFile(File out, String sourceFile) {
}
}

private void createGatewayServerAndStartScript() {
private void createGatewayServerAndStartScript() throws UnknownHostException {
createPythonScript();
if (System.getenv("ZEPPELIN_HOME") != null) {
py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PY4JPATH;
Expand All @@ -153,13 +155,28 @@ private void createGatewayServerAndStartScript() {
}

port = findRandomOpenPortOnAllLocalInterfaces();
gatewayServer = new GatewayServer(this, port);
gatewayServer = new GatewayServer(this,
port,
GatewayServer.DEFAULT_PYTHON_PORT,
InetAddress.getByName("0.0.0.0"),
InetAddress.getByName("0.0.0.0"),
GatewayServer.DEFAULT_CONNECT_TIMEOUT,
GatewayServer.DEFAULT_READ_TIMEOUT,
(List) null);

gatewayServer.start();

// Run python shell
CommandLine cmd = CommandLine.parse(getPythonCommand());
cmd.addArgument(scriptPath, false);
String pythonCmd = getPythonCommand();
CommandLine cmd = CommandLine.parse(pythonCmd);

if (!pythonCmd.endsWith(".py")) {
// PythonDockerInterpreter set pythoncmd with script
cmd.addArgument(getScriptPath(), false);
}
cmd.addArgument(Integer.toString(port), false);
cmd.addArgument(getLocalIp(), false);

executor = new DefaultExecutor();
outputStream = new InterpreterOutputStream(logger);
PipedOutputStream ps = new PipedOutputStream();
Expand All @@ -185,6 +202,7 @@ private void createGatewayServerAndStartScript() {
py4jLibPath + File.pathSeparator + pythonLibPath);
}

logger.info("cmd = {}", cmd.toString());
executor.execute(cmd, env, this);
pythonscriptRunning = true;
} catch (IOException e) {
Expand All @@ -207,7 +225,11 @@ public void open() {
registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
}
// Add matplotlib display hook
createGatewayServerAndStartScript();
try {
createGatewayServerAndStartScript();
} catch (UnknownHostException e) {
throw new InterpreterException(e);
}
}

@Override
Expand Down Expand Up @@ -244,25 +266,18 @@ public void close() {
*/
public class PythonInterpretRequest {
public String statements;
public String jobGroup;

public PythonInterpretRequest(String statements, String jobGroup) {
public PythonInterpretRequest(String statements) {
this.statements = statements;
this.jobGroup = jobGroup;
}

public String statements() {
return statements;
}

public String jobGroup() {
return jobGroup;
}
}

public PythonInterpretRequest getStatements() {
synchronized (statementSetNotifier) {

while (pythonInterpretRequest == null && pythonscriptRunning && pythonScriptInitialized) {
try {
statementSetNotifier.wait(1000);
Expand Down Expand Up @@ -350,7 +365,7 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr
return new InterpreterResult(Code.ERROR, errorMessage);
}

pythonInterpretRequest = new PythonInterpretRequest(cmd, null);
pythonInterpretRequest = new PythonInterpretRequest(cmd);
statementOutput = null;

synchronized (statementSetNotifier) {
Expand Down Expand Up @@ -420,16 +435,17 @@ public List<InterpreterCompletion> completion(String buf, int cursor) {
return null;
}

public void setPythonPath(String pythonPath) {
this.pythonPath = pythonPath;
}

/**
 * Stores the command used to start the python process and logs it at INFO level.
 *
 * @param cmd command line to store; when {@code null}, {@code getPythonCommand()}
 *            returns {@code DEFAULT_ZEPPELIN_PYTHON} instead
 */
public void setPythonCommand(String cmd) {
logger.info("Set Python Command : {}", cmd);
pythonCommand = cmd;
}

public String getPythonCommand() {
return pythonCommand;
if (pythonCommand == null) {
return DEFAULT_ZEPPELIN_PYTHON;
} else {
return pythonCommand;
}
}

private Job getRunningJob(String paragraphId) {
Expand Down Expand Up @@ -462,8 +478,14 @@ public GUI getGui() {
return context.getGui();
}

public Integer getPy4jPort() {
return port;
/**
 * Returns the textual IP address of the local host.
 *
 * @return the address reported by {@code getLocalHost()}, or {@code "127.0.0.1"} when
 *         the local host name cannot be resolved (the failure is logged)
 */
String getLocalIp() {
  String address;
  try {
    address = Inet4Address.getLocalHost().getHostAddress();
  } catch (UnknownHostException e) {
    logger.error("can't get local IP", e);
    // fall back to loopback address
    address = "127.0.0.1";
  }
  return address;
}

private int findRandomOpenPortOnAllLocalInterfaces() {
Expand Down

0 comments on commit c9b195b

Please sign in to comment.