
Publisher hangs when doing multiprocessing, but not when threading #1232

Closed
RobertLucian opened this issue Oct 10, 2018 · 3 comments

@RobertLucian

So I've got the following three classes that play the roles of PUB (publisher) and SUB (subscriber).

from zmq.log.handlers import PUBHandler
from logging import handlers
from threading import Thread, Event
from time import sleep
import logging
import socket
import zmq
import sys
import os

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

class PUBLogger:
    def __init__(self, host, port = 5555, level = logging.DEBUG):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(level)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        # self.pub.setsockopt(zmq.LINGER, 0)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

class SUBLogger:
    def __init__(self, 
        ip, 
        port = 5555, 
        output_dir = '', 
        logfile_name = 'output.log', 
        stdout = False,
        level = logging.DEBUG):

        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(level)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{0}'.format(port))  # binds on all interfaces; the ip argument is unused
        self._sub.setsockopt_string(zmq.SUBSCRIBE, "")

        formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(funcName)s - %(message)s")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, logfile_name), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(level)
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

        if stdout:
            handler_stdout = logging.StreamHandler(sys.stdout)
            handler_stdout.setLevel(level)
            handler_stdout.setFormatter(formatter)
            self._logger.addHandler(handler_stdout)

    @property
    def sub(self):
        return self._sub

    @property
    def logger(self):
        return self._logger

class ThreadedLoggerSUB(Thread):
    def __init__(self, ip, port = 5555, stdout = False, level = logging.DEBUG):
        super(ThreadedLoggerSUB, self).__init__()
        self._sub_logger = SUBLogger(ip, port, stdout = stdout, level = level)
        self._event = Event()
        self._ready = Event()
    
    def run(self):
        self._ready.set()
        while not self._event.is_set():
            try:
                topic, message = self._sub_logger.sub.recv_multipart(flags = zmq.NOBLOCK)
                # recv_multipart returns bytes on Python 3: decode before dispatching
                topic = topic.decode('utf8')
                log_msg = getattr(logging, topic.lower())
                log_msg(message.decode('utf8'))
            except zmq.ZMQError as zmq_error:
                if zmq_error.errno != zmq.EAGAIN:
                    raise
                sleep(0.01)  # yield briefly instead of busy-waiting
        self._sub_logger.sub.close()

    def stop(self):
        self._event.set()

    def wait(self):
        self._ready.wait()

In the main process, where everything is coordinated, I start a ThreadedLoggerSUB thread that runs for the lifetime of the program.
During its runtime, I spawn a separate process using a manager (from the multiprocessing module). In the constructor of the class instantiated in that process (let's call it ThreadToBeRunInSeparateProcess; it basically inherits from Thread), a PUBLogger object is created and its logger object is returned.

The problem is that in roughly 50% of cases, the logger object in this separate process hangs when I try to log something. When it doesn't hang, I see the logs in the main process.

What's interesting is that if I run a ThreadToBeRunInSeparateProcess thread in the main process instead, it works just fine: I never see it hang, which tells me something must be going on with the managers.

Do you have any idea why pyzmq is having this problem?

Thank you!

Robert

@minrk
Member

minrk commented Oct 10, 2018

Do you have a complete example that reproduces the problem? There are a few different cases that could look like a hang:

  1. is the process stuck in a blocking call?
  2. is it calling send, but not actually transmitting (no subscribers)?
  3. is it sending, but the receiver isn't waking? (sub-side polling problem?)

etc.

A complete example with some diagnostics showing what's really happening would help get to the bottom of it.
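One low-overhead way to check whether the process is stuck in a blocking call (case 1) is Python's built-in faulthandler module, which can dump every thread's stack on demand. A minimal sketch (the SIGUSR1 registration is Unix-only; the one-off dump works anywhere):

```python
import faulthandler
import signal
import tempfile

# On Unix, register a signal so a seemingly hung process can be told to
# dump every thread's stack with `kill -USR1 <pid>`
if hasattr(signal, "SIGUSR1"):
    faulthandler.register(signal.SIGUSR1)

# A one-off dump to a real file object, shown here for demonstration
with tempfile.TemporaryFile(mode="w+") as f:
    faulthandler.dump_traceback(file=f)
    f.seek(0)
    snapshot = f.read()

print("Current thread" in snapshot)  # the dump names each running thread
```

If the dump shows a thread parked inside a zmq recv or send call, that points directly at which of the cases above applies.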

@RobertLucian
Author

Here's a complete example.

from zmq.log.handlers import PUBHandler
from logging import handlers
from threading import Thread, Event
from time import sleep
from multiprocessing.managers import BaseManager
import logging
import zmq
import socket
import os
import sys

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

class PUBLogger:
    def __init__(self, host, port = 5555, level = logging.DEBUG):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(level)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        # self.pub.setsockopt(zmq.LINGER, 0)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

class SUBLogger:
    def __init__(self, 
        ip, 
        port = 5555, 
        output_dir = '', 
        logfile_name = 'output.log', 
        stdout = False,
        level = logging.DEBUG):

        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(level)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{0}'.format(port))  # binds on all interfaces; the ip argument is unused
        self._sub.setsockopt_string(zmq.SUBSCRIBE, "")

        formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(funcName)s - %(message)s")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, logfile_name), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(level)
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

        if stdout:
            handler_stdout = logging.StreamHandler(sys.stdout)
            handler_stdout.setLevel(level)
            handler_stdout.setFormatter(formatter)
            self._logger.addHandler(handler_stdout)

    @property
    def sub(self):
        return self._sub

    @property
    def logger(self):
        return self._logger

class ThreadedLoggerSUB(Thread):
    def __init__(self, ip, port = 5555, stdout = False, level = logging.DEBUG):
        super(ThreadedLoggerSUB, self).__init__()
        self._sub_logger = SUBLogger(ip, port, stdout = stdout, level = level)
        self._event = Event()
        self._ready = Event()
    
    def run(self):
        self._ready.set()
        while not self._event.is_set():
            try:
                topic, message = self._sub_logger.sub.recv_multipart(flags = zmq.NOBLOCK)
                # recv_multipart returns bytes on Python 3: decode before dispatching
                topic = topic.decode('utf8')
                log_msg = getattr(logging, topic.lower())
                log_msg(message.decode('utf8'))
            except zmq.ZMQError as zmq_error:
                if zmq_error.errno != zmq.EAGAIN:
                    raise
                sleep(0.01)  # yield briefly instead of busy-waiting
        self._sub_logger.sub.close()

    def stop(self):
        self._event.set()

    def wait(self):
        self._ready.wait()

class TestLogger():
    def __init__(self):
        self._logger = PUBLogger('localhost').logger
    
    def test_log(self):
        self._logger.debug('generic log message 0')

class MyManager(BaseManager):
    pass

MyManager.register('TestLogger', TestLogger)

def main():
    logger_sub = ThreadedLoggerSUB('localhost', stdout=True)
    logger_sub.start()
    logger_sub.wait()

    logger_main = PUBLogger('localhost').logger
    logger_main.debug('generic log message 1')

    with MyManager() as manager:
        logger1 = manager.TestLogger()
        logger1.test_log()

    logger_sub.stop()
    logger_sub.join()

if __name__ == "__main__":
    main()

So if you run this script repeatedly, 10 to 20 times, at some point the program will not exit and will just hang. This is different from what I originally told you, where the program would hang in about 50% of cases. I'm seeing a pattern here: the more there is to run, the sooner and more often it hangs.

To avoid this, just get rid of the main publisher logger, logger_main. If you do that, the program won't hang no matter how many times you run it. This holds for as many separate TestLogger processes as you want to run; the example above has only one, logger1.

Another way to avoid it is to move everything in the main thread/process into a separate process and do the logging there. Doing this seems to keep the program from blocking.

Do you have any idea what's going on? Could something be badly implemented in these logging classes?

Thank you!

@minrk
Member

minrk commented Oct 15, 2018

I think the root of the problem is that you are trying to use a single PUB socket created in one process from another process, which doesn't work. I'm a little surprised it hangs instead of segfaulting, but I'm not surprised it doesn't work. I suspect this is not intentional; it occurs because you register the PUBHandler during the creation of PUBLogger. Here's what's happening:

  1. logger_main = PUBLogger('localhost').logger creates the __main__ logger and registers a PUBHandler. logging.getLogger(__name__).handlers is now a list containing one PUBHandler, created in the main process.
  2. with MyManager() as manager creates a forked process. Notably, the child's __main__ logger inherits the configured logger object with the PUBHandler registered. This handler will no longer work and must not be used.
  3. logger1 = manager.TestLogger() creates a new PUBHandler and adds it to the existing logger, but does not replace the defunct, inherited handler. There are now two PUBHandler objects registered with the same logger in the subprocess. One will work; the other will fail in weird ways, including hanging.

The simplest fix in your code is to replace:

self._logger.addHandler(handler)

with

# register my handler to the exclusion of any existing handlers:
self._logger.handlers = [handler]

to ensure that the parent's PUBHandler is not inherited. There are other ways to accomplish the same thing, but the root is always the same: make sure that a PUBHandler created in the main process is not used by forked subprocesses.
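The replace-don't-append pattern can be sketched with plain handlers (NullHandler again stands in for PUBHandler, and the logger name is arbitrary):

```python
import logging

logger = logging.getLogger("pub_demo")  # hypothetical logger name
stale = logging.NullHandler()           # stands in for the inherited, defunct PUBHandler
logger.addHandler(stale)

fresh = logging.NullHandler()           # stands in for the handler the child creates

# addHandler would leave both attached; assignment drops the stale one
logger.handlers = [fresh]
print(logger.handlers == [fresh])  # True: only the working handler remains
```

Assigning the list wholesale is what guarantees the parent's handler is gone, regardless of how many handlers were inherited.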

@minrk minrk closed this as completed Feb 28, 2024