Skip to content

Commit

Permalink
Improved py4j wrappers so that errors are handled more nicely.
Browse files Browse the repository at this point in the history
Improved py4j startup to make it easier to avoid port binding problems.
  • Loading branch information
Mark Granroth-Wilding committed Mar 23, 2016
1 parent b2dd286 commit 9fbf42c
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 47 deletions.
53 changes: 39 additions & 14 deletions src/java/pimlico/core/Py4JGatewayStarter.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package pimlico.core;

import py4j.GatewayServer;
import py4j.Py4JNetworkException;

import java.net.BindException;
import java.util.List;

/**
Expand All @@ -13,21 +15,44 @@ public static void startGateway(Object entryPoint) {
}

public static void startGateway(Object entryPoint, int port, int pythonPort) {
// Create a gateway server, using this as an entry point
GatewayServer gatewayServer;
if (port != 0) {
if (pythonPort == 0)
pythonPort = port + 1;
try {
// Create a gateway server, using this as an entry point
GatewayServer gatewayServer;
if (port != 0) {
if (pythonPort == 0)
pythonPort = port + 1;

gatewayServer = new GatewayServer(entryPoint, port, pythonPort, 0, 0, (List)null);
} else
gatewayServer = new GatewayServer(entryPoint);
gatewayServer = new GatewayServer(entryPoint, port, pythonPort, 0, 0, (List) null);
} else
gatewayServer = new GatewayServer(entryPoint);

// Set the server running
gatewayServer.start();
// Output the port to stdout
int listening_port = gatewayServer.getListeningPort();
System.out.println("" + listening_port);
System.out.flush();
try {
// Set the server running
gatewayServer.start();
} catch (Py4JNetworkException e) {
// Catch the case of a port bind exception, since this is common, and output a useful message
if (e.getCause() instanceof BindException) {
System.err.println(
"could not bind to port " + gatewayServer.getPort() + " (" + e.getCause() + "). " +
"Try connecting on a different port by setting py4j_port in local config file"
);
System.out.println("ERROR");
System.out.flush();
System.exit(1);
} else throw e;
}

// Output the port to stdout
int listening_port = gatewayServer.getListeningPort();
System.out.println("" + listening_port);
System.out.flush();
} catch (RuntimeException e) {
// If we have any errors starting the server, output them on stderr
System.err.println("Error starting server: " + e);
// Also output a line to stdout, so the caller isn't left hanging waiting for something
System.out.println("ERROR");
System.out.flush();
System.exit(1);
}
}
}
14 changes: 4 additions & 10 deletions src/java/pimlico/opennlp/TokenizerGateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import opennlp.tools.tokenize.Tokenizer;
import opennlp.tools.tokenize.TokenizerME;
import opennlp.tools.tokenize.TokenizerModel;
import py4j.GatewayServer;
import pimlico.core.Py4JGatewayStarter;

import java.io.File;

Expand Down Expand Up @@ -58,9 +58,9 @@ public static void main(String[] args) {
argParser.description("Run the OpenNLP tokenizer and sentence detector, providing access to it via Py4J");
argParser.addArgument("sent_model").help("Sentence detection model");
argParser.addArgument("tok_model").help("Tokenization model");
argParser.addArgument("--port").type(Integer.class).help("Specify a port for gateway server to run on");
argParser.addArgument("--port").type(Integer.class).help("Specify a port for gateway server to run on").setDefault(0);
argParser.addArgument("--python-port").type(Integer.class).help("Specify a port for gateway server to use " +
"to response to Python");
"to response to Python").setDefault(0);

Namespace opts = null;
try {
Expand All @@ -76,12 +76,6 @@ public static void main(String[] args) {
// Load the gateway instance
TokenizerGateway entryPoint = new TokenizerGateway(sentModelFilename, tokModelFilename);
// Create a gateway server, using this as an entry point
GatewayServer gatewayServer;
if (opts.getInt("port") != null)
gatewayServer = new GatewayServer(entryPoint, opts.getInt("port"));
else
gatewayServer = new GatewayServer(entryPoint);
// Set the server running
gatewayServer.start();
Py4JGatewayStarter.startGateway(entryPoint, opts.getInt("port"), opts.getInt("python_port"));
}
}
2 changes: 1 addition & 1 deletion src/python/pimlico/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def run_cmd(pipeline, opts):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Main command line interface to PiMLiCo")
parser.add_argument("pipeline_config", help="Config file to load a pipeline from")
parser.add_argument("--debug", help="Output verbose debugging info", action="store_true")
parser.add_argument("--debug", "-d", help="Output verbose debugging info", action="store_true")
subparsers = parser.add_subparsers(help="Select a sub-command")

check = subparsers.add_parser("check",
Expand Down
38 changes: 34 additions & 4 deletions src/python/pimlico/core/external/java.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import os
import time
from subprocess import Popen, PIPE, check_output, STDOUT, CalledProcessError
import fcntl
import sys

from pimlico import JAVA_LIB_DIR, JAVA_BUILD_DIR
from pimlico.core.modules.base import DependencyError
from pimlico.utils.communicate import timeout_process

CLASSPATH = ":".join(["%s/*" % JAVA_LIB_DIR, JAVA_BUILD_DIR])

Expand Down Expand Up @@ -61,12 +65,25 @@ def check_java():


class Py4JInterface(object):
def __init__(self, gateway_class, port=None, python_port=None, gateway_args=[]):
def __init__(self, gateway_class, port=None, python_port=None, gateway_args=[], pipeline=None):
"""
If pipeline is given, configuration is looked for there. If found, this overrides config given
in other kwargs.
"""
self.python_port = python_port
self.gateway_args = gateway_args
self.gateway_class = gateway_class
self.port = port

# Look for config in the pipeline
start_port = pipeline.local_config.get("py4j_port", None)
if start_port is not None:
# Config gives just a single port number
# If it's given, use the following port for the other direction of communication
self.port = int(start_port)
self.python_port = int(start_port) + 1

self.process = None
self.gateway = None

Expand Down Expand Up @@ -122,9 +139,22 @@ def launch_gateway(gateway_class="py4j.GatewayServer", args=[],
proc = Popen(command, stdout=PIPE, stdin=PIPE, stderr=PIPE)

# Determine which port the server started on (needed to support ephemeral ports)
# NB: had an odd problem where the server had an error (socket already in use), but didn't exit, so this hangs
# TODO Look into this at some point
output = proc.stdout.readline()
# Don't hang on an error running the gateway launcher
try:
with timeout_process(proc, 1.0):
output = proc.stdout.readline()
except Exception, e:
# Try reading stderr to see if there's any info there
error_output = proc.stderr.read().strip("\n ")
raise JavaProcessError("error reading first line from gateway process: %s. Error output: %s" %
(e, error_output))
# Check whether there was an error reported
output = output.strip("\n ")
if output == "ERROR":
# Read error output from stderr
error_output = proc.stderr.read().strip("\n ")
raise JavaProcessError("Py4J gateway had an error starting up: %s" % error_output)

try:
port_used = int(output)
except ValueError:
Expand Down
23 changes: 10 additions & 13 deletions src/python/pimlico/modules/opennlp/pos/exec.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from pimlico.core.external.java import Py4JInterface
from pimlico.core.external.java import Py4JInterface, JavaProcessError
from pimlico.core.modules.execute import ModuleExecutionError
from pimlico.core.modules.map import DocumentMapModuleExecutor
from py4j.java_collections import ListConverter


class ModuleExecutor(DocumentMapModuleExecutor):
def preprocess(self, info):
start_port = info.pipeline.local_config.get("py4j_start_port", None)
start_port = int(start_port) if start_port is not None else None
# Start a tokenizer process
self.tagger = StreamTagger(info.model_path, start_port=start_port)
self.tagger.start()
self.tagger = StreamTagger(info.model_path, pipeline=info.pipeline)
try:
self.tagger.start()
except JavaProcessError, e:
raise ModuleExecutionError("error starting tokenizer process: %s" % e)

def process_document(self, filename, doc):
sentences = doc.splitlines()
Expand All @@ -29,8 +31,8 @@ def postprocess(self, info, error=False):


class StreamTagger(object):
def __init__(self, model_path, start_port=None):
self.start_port = start_port
def __init__(self, model_path, pipeline=None):
self.pipeline = pipeline
self.model_path = model_path
self.interface = None

Expand All @@ -39,14 +41,9 @@ def tag(self, sentences):
return list(self.interface.gateway.entry_point.posTag(sentence_list))

def start(self):
if self.start_port is None:
python_port = None
else:
python_port = self.start_port + 1

# Start a tokenizer process running in the background via Py4J
self.interface = Py4JInterface("pimlico.opennlp.PosTaggerGateway", gateway_args=[self.model_path],
port=self.start_port, python_port=python_port)
pipeline=self.pipeline)
self.interface.start()

def stop(self):
Expand Down
17 changes: 12 additions & 5 deletions src/python/pimlico/modules/opennlp/tokenize/exec.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from pimlico.core.external.java import Py4JInterface
from pimlico.core.external.java import Py4JInterface, JavaProcessError
from pimlico.core.modules.execute import ModuleExecutionError
from pimlico.core.modules.map import DocumentMapModuleExecutor


class ModuleExecutor(DocumentMapModuleExecutor):
def preprocess(self, info):
# Start a tokenizer process
self.tokenizer = StreamTokenizer(info.sentence_model_path, info.token_model_path)
self.tokenizer.start()
self.tokenizer = StreamTokenizer(info.sentence_model_path, info.token_model_path,
pipeline=info.pipeline)
try:
self.tokenizer.start()
except JavaProcessError, e:
raise ModuleExecutionError("error starting tokenizer process: %s" % e)

def process_document(self, filename, doc):
# Run tokenization
Expand All @@ -20,7 +25,8 @@ def postprocess(self, info, error=False):


class StreamTokenizer(object):
def __init__(self, sentence_model_path, token_model_path):
def __init__(self, sentence_model_path, token_model_path, pipeline=None):
self.pipeline = pipeline
self.token_model_path = token_model_path
self.sentence_model_path = sentence_model_path

Expand All @@ -32,7 +38,8 @@ def tokenize(self, document):
def start(self):
# Start a tokenizer process running in the background via Py4J
self.interface = Py4JInterface("pimlico.opennlp.TokenizerGateway",
gateway_args=[self.sentence_model_path, self.token_model_path])
gateway_args=[self.sentence_model_path, self.token_model_path],
pipeline=self.pipeline)
self.interface.start()

def stop(self):
Expand Down
22 changes: 22 additions & 0 deletions src/python/pimlico/utils/communicate.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
from contextlib import contextmanager
from threading import Timer


@contextmanager
def timeout_process(proc, timeout):
"""
Context manager for use in a `with` statement. If the with block hasn't completed after the given number
of seconds, the process is killed.
:param proc: process to kill if timeout is reached before end of block
:return:
"""
timer = Timer(timeout, proc.kill)
# Set a timer going
timer.start()
# Continue executing the with block
try:
yield timer
finally:
# Cancel the timer now, if it's still running
timer.cancel()


class StreamCommunicationPacket(object):
Expand Down

0 comments on commit 9fbf42c

Please sign in to comment.