When starting new Java server-processes in a loop, the previous one does not get stopped #320

Closed
erl987 opened this issue May 24, 2018 · 6 comments


erl987 commented May 24, 2018

I am having difficulties when repeatedly starting new Java server processes in a loop. See below for a complete minimal example.

If I use launch_gateway(die_on_exit=False, ...), the previous process is closed correctly by gateway.shutdown(). However, in that case the Java server process is not closed if the parent Python process is stopped with Ctrl + C on a Linux command line, which is not acceptable in my case.

That means I am forced to use die_on_exit=True. But why does gateway.shutdown() not stop the Java server process in that case?

Could you please comment on my usage of Py4J? Am I using it the wrong way? Is there a workaround that ensures each new Java server process safely stops the previous one and, at the same time, that the Java server process is always stopped if the Python process dies?

Python code:

from py4j.java_gateway import JavaGateway, GatewayParameters, launch_gateway


def start_api_server():
    port = launch_gateway(classpath="C:\\Users\\User\\Documents\\code\\py4jjavademo\\target\\classes",
                          die_on_exit=True)

    global gateway
    gateway = JavaGateway(gateway_parameters=GatewayParameters(port=port))
    string_generator = gateway.jvm.com.demo.EntryPoint().getStringGenerator()

    return string_generator


def stop_api_server():
    global gateway
    gateway.shutdown()


if __name__ == '__main__':
    for i in range(100):
        try:
            string_generator = start_api_server()

            print(string_generator.get("Content"))
        finally:
            stop_api_server()

Java code:

package com.demo;


public class EntryPoint {

   private StringGenerator stringGenerator;

   public EntryPoint() {
      stringGenerator = new StringGenerator();
   }

   public StringGenerator getStringGenerator() {
      return stringGenerator;
   }
}

package com.demo;


public class StringGenerator {

    public String get(String content) {
        return "A String: " + content;
    }
}

bartdag (Collaborator) commented Jul 9, 2018

Hi,

After investigating, I found that this is the intended behavior, but I'll post a workaround at the end of this reply :-)

The relevant code is here: https://github.com/bartdag/py4j/blob/master/py4j-java/src/main/java/py4j/GatewayServer.java#L820

When die_on_exit is False, the main method starts a GatewayServer instance and returns. As soon as the GatewayServer is shut down, the JVM exits because no threads are left alive.

When die_on_exit is True, the main method starts a GatewayServer and then listens on standard input for a newline. This works well if your Python process ends after you shut down the GatewayServer, but it does not work in your case, where the Python process keeps running and nothing is ever written to the JVM's standard input. Historically, launch_gateway was introduced in the codebase to make it easy to start a single JVM, but anything more complex than that required custom process orchestration.
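
The stdin-based behavior described above can be illustrated without a JVM. The following is a minimal sketch, not Py4J code: a small Python child process (a hypothetical stand-in, `STAND_IN`) blocks on one line of standard input, roughly as the die_on_exit JVM is described to do. The helper names are made up for illustration, and the sketch assumes that the read also returns at end of input, which is what the die_on_exit documentation suggests happens when the parent dies.

```python
import subprocess
import sys

# Hypothetical stand-in for the JVM launched with die_on_exit=True:
# block until one line (or end of input) arrives on stdin, then exit.
STAND_IN = "import sys; sys.stdin.readline()"


def start_stand_in():
    """Start the stand-in child with a pipe connected to its stdin."""
    return subprocess.Popen([sys.executable, "-c", STAND_IN],
                            stdin=subprocess.PIPE)


def send_newline(proc):
    """Explicitly tell the child to exit, as the workaround does."""
    proc.stdin.write(b"\n")
    proc.stdin.flush()


def close_pipe(proc):
    """Simulate the parent going away: the child sees end of input."""
    proc.stdin.close()


if __name__ == "__main__":
    first = start_stand_in()
    print("running before newline:", first.poll() is None)
    send_newline(first)
    print("exit code after newline:", first.wait(timeout=10))

    second = start_stand_in()
    close_pipe(second)
    print("exit code after closed pipe:", second.wait(timeout=10))
```

Calling gateway.shutdown() does neither of these two things, which would explain why the JVM stays alive until the Python process exits.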

Potential Solutions:

  1. Create your own version of launch_gateway and send a newline when you shut down the gateway server. This is the code snippet I added at the end of this reply. I'll add this capability in the next release of Py4J, but releases are a lengthy process and you may not want to wait for that.

  2. On the Java side, create a GatewayServerListener (https://github.com/bartdag/py4j/blob/master/py4j-java/src/main/java/py4j/GatewayServerListener.java) and exit the JVM on serverPostShutdown().

Annotated workaround (see def main() for the basic usage scenario):

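import logging
import os
import time
from collections import deque
from queue import Queue  # Python 3; on Python 2 use "from Queue import Queue"
from subprocess import Popen, PIPE

from py4j.java_gateway import (
    JavaGateway, GatewayParameters, find_jar_path, OutputConsumer,
    ProcessConsumer, get_create_new_process_group_kwargs, quiet_close)
from py4j.protocol import Py4JError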

logger = logging.getLogger("")

# NOTE 1 - This is a copy of py4j.java_gateway.launch_gateway with the addition of the return_proc parameter.
def launch_gateway(port=0, jarpath="", classpath="", javaopts=[],
                   die_on_exit=False, redirect_stdout=None,
                   redirect_stderr=None, daemonize_redirect=True,
                   java_path="java", create_new_process_group=False,
                   enable_auth=False, return_proc=False):
    """Launch a `Gateway` in a new Java process.

    The redirect parameters accept file-like objects, Queue, or deque. When
    text lines are sent to the stdout or stderr of the child JVM, these lines
    are redirected to the file-like object (``write(line)``), the Queue
    (``put(line)``), or the deque (``appendleft(line)``).

    The text line will contain a newline character.

    Only text output is accepted on stdout and stderr. If you wish to
    communicate with the child JVM through bytes, you need to create your own
    helper function.

    :param port: the port to launch the Java Gateway on.  If no port is
        specified then an ephemeral port is used.
    :param jarpath: the path to the Py4J jar.  Only necessary if the jar
        was installed at a non-standard location or if Python is using
        a different `sys.prefix` than the one that Py4J was installed
        under.
    :param classpath: the classpath used to launch the Java Gateway.
    :param javaopts: an array of extra options to pass to Java (the classpath
        should be specified using the `classpath` parameter, not `javaopts`.)
    :param die_on_exit: if `True`, the Java gateway process will die when
        this Python process exits or is killed.
    :param redirect_stdout: where to redirect the JVM stdout. If None (default)
        stdout is redirected to os.devnull. Otherwise accepts a
        file descriptor, a queue, or a deque. Will send one line at a time
        to these objects.
    :param redirect_stderr: where to redirect the JVM stderr. If None (default)
        stderr is redirected to os.devnull. Otherwise accepts a
        file descriptor, a queue, or a deque. Will send one line at a time to
        these objects.
    :param daemonize_redirect: if True, the consumer threads will be daemonized
        and will not prevent the main Python process from exiting. This means
        the file descriptors (stderr, stdout, redirect_stderr, redirect_stdout)
        might not be properly closed. This is not usually a problem, but in
        case of errors related to file descriptors, set this flag to False.
    :param java_path: If None, Py4J will use $JAVA_HOME/bin/java if $JAVA_HOME
        is defined, otherwise it will use "java".
    :param create_new_process_group: If True, the JVM is started in a new
        process group. This ensures that signals sent to the parent Python
        process are not forwarded to the JVM. For example, sending
        Ctrl-C/SIGINT won't interrupt the JVM. If the python process dies, the
        Java process will stay alive, which may be a problem for some scenarios
        though.
    :param enable_auth: If True, the server will require clients to provide an
        authentication token when connecting.
    :param return_proc: If True, returns the Popen object returned when the JVM
        process was created.

    :rtype: the port number of the `Gateway` server or, when auth enabled,
            a 2-tuple with the port number and the auth token. When
            `return_proc` is True, the Popen object is appended as the last
            tuple element.
    """
    popen_kwargs = {}

    if not jarpath:
        jarpath = find_jar_path()

    if not java_path:
        java_home = os.environ.get("JAVA_HOME")
        if java_home:
            java_path = os.path.join(java_home, "bin", "java")
        else:
            java_path = "java"

    # Fail if the jar does not exist.
    if not os.path.exists(jarpath):
        raise Py4JError("Could not find py4j jar at {0}".format(jarpath))

    # Launch the server in a subprocess.
    classpath = os.pathsep.join((jarpath, classpath))
    command = [java_path, "-classpath", classpath] + javaopts + \
              ["py4j.GatewayServer"]
    if die_on_exit:
        command.append("--die-on-broken-pipe")
    if enable_auth:
        command.append("--enable-auth")
    command.append(str(port))
    logger.debug("Launching gateway with command {0}".format(command))

    # stderr redirection
    close_stderr = False
    if redirect_stderr is None:
        stderr = open(os.devnull, "w")
        close_stderr = True
    elif isinstance(redirect_stderr, Queue) or\
            isinstance(redirect_stderr, deque):
        stderr = PIPE
    else:
        stderr = redirect_stderr
        # we don't need this anymore
        redirect_stderr = None

    # stdout redirection
    if redirect_stdout is None:
        redirect_stdout = open(os.devnull, "w")

    if create_new_process_group:
        popen_kwargs.update(get_create_new_process_group_kwargs())

    proc = Popen(command, stdout=PIPE, stdin=PIPE, stderr=stderr,
                 **popen_kwargs)

    # Determine which port the server started on (needed to support
    # ephemeral ports)
    _port = int(proc.stdout.readline())

    # Read the auth token from the server if enabled.
    _auth_token = None
    if enable_auth:
        _auth_token = proc.stdout.readline()[:-1]

    # Start consumer threads so process does not deadlock/hangs
    OutputConsumer(
        redirect_stdout, proc.stdout, daemon=daemonize_redirect).start()
    if redirect_stderr is not None:
        OutputConsumer(
            redirect_stderr, proc.stderr, daemon=daemonize_redirect).start()
    ProcessConsumer(proc, [redirect_stdout], daemon=daemonize_redirect).start()

    if close_stderr:
        # XXX This will quiet ResourceWarning in Python 3.5+
        # This only close the fd in this process, not in the JVM process, which
        # makes sense.
        quiet_close(stderr)

    if enable_auth:
        output = (_port, _auth_token)
    else:
        output = _port

    if return_proc:
        if isinstance(output, tuple):
            output = output + (proc, )
        else:
            output = (_port, proc)

    return output


# NOTE 2: this is a copy of JavaGateway.launch_gateway with the addition of return_proc
def get_java_gateway(
        port=0, jarpath="", classpath="", javaopts=[],
        die_on_exit=False, redirect_stdout=None,
        redirect_stderr=None, daemonize_redirect=True, java_path="java",
        create_new_process_group=False, enable_auth=False):
    _ret = launch_gateway(
        port, jarpath, classpath, javaopts, die_on_exit,
        redirect_stdout=redirect_stdout, redirect_stderr=redirect_stderr,
        daemonize_redirect=daemonize_redirect, java_path=java_path,
        create_new_process_group=create_new_process_group,
        enable_auth=enable_auth, return_proc=True)

    if enable_auth:
        _port, _auth_token, proc = _ret
    else:
        _port, proc, _auth_token = _ret + (None, )
    gateway = JavaGateway(
        gateway_parameters=GatewayParameters(port=_port,
                                                auth_token=_auth_token))
    # NOTE 3: the Popen object is now available with gateway._proc
    gateway._proc = proc
    return gateway


def main():
    for i in range(10):
        gateway = get_java_gateway(die_on_exit=True)
        print(gateway.jvm.System.currentTimeMillis())
        gateway.shutdown()

        # NOTE 4: This is how you can send a newline and terminate the process
        gateway._proc.stdin.write("\n".encode("utf-8"))
        gateway._proc.stdin.flush()
        
        # NOTE 5: Alternatively terminate the process with the terminate signal
        # gateway._proc.terminate()

        # NOTE 6: in a next release, I would probably do it this way:
        # gateway.shutdown(terminate_launched_jvm=True)

    # NOTE 7: just to let you check that no processes are left running at this point...
    time.sleep(40)


if __name__ == "__main__":
    main()

Do not hesitate to ask if you need further clarification.

Disclosure: this question was answered as part of our professional service

erl987 (Author) commented Jul 11, 2018

Hi,

Thanks for the suggestions. Both solutions do solve the problem: the Java process is now always shut down cleanly.

Nevertheless, I prefer the solution of adding a GatewayServerListener, as it requires no patching of the py4j source code.

One issue I encountered: when I redirected the stdout / stderr of the Java process to the Python streams, the streams were also closed when the process terminated. Later iterations of the loop then crashed when trying to print to them. The solution is to pass a copy of each stream, like this:

redirect_stdout = os.fdopen(os.dup(sys.stdout.fileno()), sys.stdout.mode)
launch_gateway(redirect_stdout=redirect_stdout, ...)

This provides a disposable copy to the subprocess and seems to work.
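
The stream-copy trick above can be wrapped in a small helper. This is only a sketch under the assumption that the stream is backed by a real file descriptor (it has a working fileno()); the name `disposable_copy` is made up for illustration and is not part of Py4J.

```python
import os


def disposable_copy(stream):
    """Return a new file object on a duplicate of the stream's descriptor."""
    # os.dup returns a second descriptor for the same open file, so closing
    # the returned object leaves the original stream open and usable.
    return os.fdopen(os.dup(stream.fileno()), stream.mode)
```

With such a helper, each loop iteration could pass disposable_copy(sys.stdout) as redirect_stdout (and likewise for stderr), so that whatever closes the redirected stream only closes the copy.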

@bartdag bartdag closed this as completed Aug 1, 2018
dHannasch added a commit to dHannasch/py4j that referenced this issue Jun 6, 2019
…the Java process is saved on the JavaGateway, and can thus be referenced later, including to terminate it by sending the end-of-line character it is waiting for.

@dHannasch (Contributor)

Just to provide a more minimal example of the Java process not being shut-down-able when die_on_exit=True:

import py4j.java_gateway
gateway = py4j.java_gateway.JavaGateway.launch_gateway(die_on_exit=True)
gateway.close_callback_server(True)
gateway.close(False, True)
gateway.shutdown_callback_server(True)
gateway.shutdown(True)
# Do other things...
# As shown by e.g. ps aux, the Java process is still running!

dHannasch added a commit to dHannasch/py4j that referenced this issue Jul 17, 2019
…object for the Java process is saved on the JavaGateway, and can thus be referenced later, including to terminate it by sending the end-of-line character it is waiting for.

The Popen object is always created on the JavaGateway with name java_process, with a value of None when attaching to an existing Java process so no Popen is available.

Adds tests using the Popen member to test terminating the Java subprocess.

The test waits an arbitrarily-chosen fraction of a second before asserting that the Java process has terminated.
dHannasch added a commit to dHannasch/py4j that referenced this issue Jul 17, 2019
…ess object.

The Popen object is always created on the JavaGateway with name java_process,
with a value of None when no Popen is available because the JavaGateway is
attaching to a preexisting Java process.
It can thus be referenced later, including to terminate it by sending
the end-of-line character it is waiting for.

Adds tests using the Popen member to test terminating the Java subprocess.

The test waits an arbitrarily-chosen fraction of a second
before asserting that the Java process has terminated.
dHannasch added a commit to dHannasch/py4j that referenced this issue Jul 17, 2019
…ess.

The Popen object is always created on the JavaGateway with name java_process,
with a value of None when no Popen is available because the JavaGateway is
attaching to a preexisting Java process.
It can thus be referenced later, including to terminate it by sending
the end-of-line character it is waiting for.

Adds tests using the Popen member to test terminating the Java subprocess.

The test waits an arbitrarily-chosen fraction of a second
before asserting that the Java process has terminated.
@JRosenkranz

I see this thread was closed a while back, but I have a quick question regarding the following:

On the Java side, create a GatewayServerListener (https://github.com/bartdag/py4j/blob/master/py4j-java/src/main/java/py4j/GatewayServerListener.java) and exit the JVM on serverPostShutdown().

If I create a GatewayServerListener and add System.exit(0) to serverPostShutdown(), I run into the following:

Calling

gateway.java_gateway_server.shutdown()

I receive py4j.protocol.Py4JNetworkError: Answer from Java

Calling

gateway.close_callback_server()
gateway.close()
gateway.shutdown_callback_server()
gateway.shutdown()

I receive no error, but nothing seems to happen.

In both cases, I see my Java processes still running (they only die when Python exits). Just to clarify, I created a Gateway with a callback server as well.

Any help would be appreciated, in case there is anything else I have to do in the GatewayServerListener implementation or on the Python side.

dHannasch (Contributor) commented Nov 13, 2019

Currently, shutdown() and the related methods do not stop the Java process when die_on_exit=True.

As a workaround, you can write an end-of-line to the Java stdin, as shown in the test: https://github.com/bartdag/py4j/blob/master/py4j-python/src/py4j/tests/java_gateway_test.py#L1062
The Java process is listening for that and shuts itself down when it receives that end-of-line.

Of course, that might not work if you've customized the listener on the Java side.

Somewhere I have a context manager that automatically launches and shuts down a Java process this way, so it's safe. I can try to get that into Py4J if it would be useful to people.
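
Such a context manager could look roughly like the sketch below. It is not the one mentioned above and not part of Py4J: `stop_launched_jvm` and `launched_gateway` are hypothetical names, and the sketch assumes the gateway object exposes the child's Popen object as `java_process` (None when attached to a preexisting JVM), as the commits referenced in this thread describe. The launcher is passed in as a callable so the sketch does not depend on a particular Py4J version.

```python
import subprocess
from contextlib import contextmanager


def stop_launched_jvm(gateway, timeout=10.0):
    """Shut down the gateway, then end the process it was launched with."""
    gateway.shutdown()
    # Assumption: the Popen object is stored on the gateway as java_process.
    proc = getattr(gateway, "java_process", None)
    if proc is None:
        return None  # attached to a preexisting JVM: nothing to stop
    if proc.poll() is None:
        if proc.stdin is not None:
            try:
                # The die_on_exit JVM waits for an end-of-line on stdin.
                proc.stdin.write(b"\n")
                proc.stdin.flush()
            except (OSError, ValueError):
                pass  # pipe already broken or closed
        else:
            # No stdin pipe available: fall back to the terminate signal.
            proc.terminate()
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        return proc.wait()


@contextmanager
def launched_gateway(launch, **launch_kwargs):
    """Launch a gateway with the given callable and always stop its process."""
    gateway = launch(**launch_kwargs)
    try:
        yield gateway
    finally:
        stop_launched_jvm(gateway)
```

Assuming a Py4J version in which JavaGateway.launch_gateway sets java_process, usage would be along the lines of "with launched_gateway(JavaGateway.launch_gateway, die_on_exit=True) as gateway: ...". As noted above, this might not work if the listener on the Java side has been customized.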

@JRosenkranz

I think die_on_exit=False is doing what I intended. I ran into an issue when trying to use stdin: I got an error saying it did not exist on my java_process. I will let you know if anything is not working; otherwise I think my issue is fixed. Thanks.
