PUBhandler can not be used with multiprocessing #710

Closed
fujiaoliu opened this issue Jul 27, 2015 · 7 comments

Comments

@fujiaoliu

I have a class 'MyLogger' that sends messages to a log server using PUBHandler. An exception is raised when MyLogger is instantiated in the LogWorker.__init__() method (version 2 in the code below), but it works if MyLogger is instantiated in the LogWorker.log_worker() method (version 1). Any suggestions would be appreciated.

import logging
from multiprocessing import Process
import os
import random
import sys
import time

import zmq
from zmq.log.handlers import PUBHandler


class MyLogger(object):
    """Attach a zmq PUBHandler to the root logger, publishing to `port`."""

    def __init__(self, port, handler=None):
        self.port = port
        self.handler = handler or self._construct_sock_handler()
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)
        if not self.logger.handlers:
            self.logger.addHandler(self.handler)


    def _construct_sock_handler(self):
        context = zmq.Context()
        log_sock = context.socket(zmq.PUB)
        log_sock.connect("tcp://127.0.0.1:%i" % self.port)
        time.sleep(0.1)  # give the connection time to come up before the first send
        handler = PUBHandler(log_sock)
        return handler


    def get_logger(self):
        return self.logger


def sub_logger(port, level=logging.DEBUG):
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.bind('tcp://127.0.0.1:%i' % port)
    sub.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to all topics (must be bytes in Python 3)
    logging.basicConfig(level=level)

    while True:
        # PUBHandler sends two frames: the topic (the level name) and the message
        level, message = sub.recv_multipart()
        level = level.decode('utf8')
        message = message.decode('utf8')
        if message.endswith('\n'):
            # trim trailing newline, which will get appended again
            message = message[:-1]
        log = getattr(logging, level.lower())
        log(message)


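class LogWorker(object):

    def __init__(self):
        # version 1 (works): create nothing here; log_worker() creates the
        # logger later, inside the subprocess.
        self.logger = None
        # version 2 (fails): create the zmq Context/socket here, in the parent
        # process, before the fork. Uncomment to reproduce (uses the global `port`):
        # self.logger = MyLogger(port).get_logger()

    def log_worker(self, port):
        if self.logger is None:
            self.logger = MyLogger(port).get_logger()  # version 1
        print("starting logger at %i with level=%s" % (os.getpid(), logging.DEBUG))

        while True:
            level = logging.INFO
            self.logger.log(level, "Hello from %i!" % os.getpid())
            time.sleep(1)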

if __name__ == '__main__':
    if len(sys.argv) > 1:
        n = int(sys.argv[1])
    else:
        n = 2

    port = 5555

    workers = [Process(target=LogWorker().log_worker, args=(port,)) for _ in range(n)]
    for w in workers:
        w.start()

    try:
        sub_logger(port)
    except KeyboardInterrupt:
        pass
    finally:
        for w in workers:
            w.terminate()
@fujiaoliu changed the title from "PUBhandler can't used in" to "PUBhandler can't used in __init__() method" Jul 27, 2015
@fujiaoliu changed the title from "PUBhandler can't used in __init__() method" to "PUBhandler could not be used in __init__() method" Jul 27, 2015
@minrk changed the title from "PUBhandler could not be used in __init__() method" to "PUBhandler could not be used with multiprocessing" Jul 29, 2015
@minrk changed the title from "PUBhandler could not be used with multiprocessing" to "PUBhandler can not be used with multiprocessing" Jul 29, 2015
@minrk
Member

minrk commented Jul 29, 2015

You cannot pass zmq contexts or sockets across the fork boundary that happens when you instantiate a subprocess with multiprocessing. You have to make sure that you create your Context after you are in the subprocess.
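To make the fork boundary concrete, here is a minimal, standard-library-only sketch (no zmq needed) of what happens in the original example: the object is constructed in the parent, and only its bound method runs in the child, so anything __init__ created (such as a zmq Context) was created by the parent process. PidRecorder and run_demo are hypothetical stand-ins for LogWorker, and the "fork" start method is assumed, which is POSIX-only.

```python
import multiprocessing as mp
import os


class PidRecorder:
    """Hypothetical stand-in for LogWorker: remembers which process built it."""

    def __init__(self):
        # In LogWorker (version 2), this is where the zmq Context would be made.
        self.created_in = os.getpid()

    def report(self, queue):
        # Runs in the child process: send back (creator pid, current pid).
        queue.put((self.created_in, os.getpid()))


def run_demo():
    ctx = mp.get_context("fork")  # POSIX-only start method
    queue = ctx.Queue()
    obj = PidRecorder()  # constructed in the parent, like LogWorker()
    proc = ctx.Process(target=obj.report, args=(queue,))
    proc.start()
    created_in, ran_in = queue.get(timeout=10)  # read before join to avoid blocking
    proc.join()
    return created_in, ran_in


if __name__ == "__main__":
    created_in, ran_in = run_demo()
    print("__init__ ran in %i, report() ran in %i" % (created_in, ran_in))
```

The two PIDs differ: whatever __init__ stored belongs to the parent, which is why a Context created there cannot be used from log_worker in the child.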

In your example, you are creating the LogWorker in the main process, then creating a new process to run just one method of the LogWorker. Since it looks like you don't actually have any use for the LogWorker in your main process, I would probably recommend waiting to instantiate the object until you reach the subprocess, e.g.:

def work(port):
    worker = LogWorker()
    worker.log_worker(port)

workers = [ Process(target=work, args=(port,)) for _ in range(n) ]

In this way, it doesn't matter where you instantiate the logger in the LogWorker's lifecycle, because the LogWorker object only exists in the subprocess.

@fujiaoliu
Author

Thanks for your answer! I haven't tested it yet, but I think you are right. A socket is passed to PUBHandler in MyLogger._construct_sock_handler() in the main process; calling a method of the same logger in the subprocess (for example logger.info, which needs that socket to send the record in logger.handler.emit) then causes the problem. To solve this in my project I implemented a handler that takes only a TCP address (not a socket) as its parameter, and it works.

import logging
import time

import zmq


class RgHandler(logging.Handler):
    """Logging handler that takes the log server's address, not a socket."""

    def __init__(self, log_server_addr):
        logging.Handler.__init__(self)
        self.log_server_addr = log_server_addr
        self.sock = None

    # No format() override is needed: logging.Handler.format already falls
    # back to the default formatter when self.formatter is None.

    def make_socket(self):
        context = zmq.Context()
        sock = context.socket(zmq.PUB)
        sock.connect(self.log_server_addr)
        time.sleep(0.1)  # give the connection time to come up before sending
        return sock

    def create_socket(self):
        # Retry up to 5 times, waiting a little longer after each failure.
        for n in range(5):
            try:
                self.sock = self.make_socket()
                break
            except zmq.ZMQError:
                time.sleep(n + 1)

    def _close_socket(self):
        if self.sock is not None:
            self.sock.close()
            self.sock.context.term()
            self.sock = None

    def handleError(self, record):
        if self.sock is not None:
            self._close_socket()
        else:
            logging.Handler.handleError(self, record)

    def emit(self, record):
        # A new Context and socket are created (and destroyed) for every record,
        # in whichever process is logging, so nothing crosses the fork boundary.
        try:
            self.create_socket()
            self.sock.send_pyobj(self.format(record))
            self._close_socket()
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            self.handleError(record)

    def close(self):
        self.acquire()
        try:
            self._close_socket()
        finally:
            self.release()
        logging.Handler.close(self)
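RgHandler avoids the fork problem by building a fresh Context and socket for every record, which costs at least the 0.1 s sleep in make_socket on each log call. A minimal sketch of a cheaper variant with the same address-only design: create the socket lazily on first use and cache it per process, rebuilding it only when os.getpid() changes (that is, after a fork). PerProcessPubHandler, zmq_pub_factory and the socket_factory parameter are hypothetical names, not part of pyzmq; the factory is injectable so the class can be exercised without a running log server.

```python
import logging
import os
import time


def zmq_pub_factory(address):
    """Default factory: a new Context and PUB socket owned by this process."""
    import zmq  # imported lazily so the class can be tested without pyzmq

    sock = zmq.Context().socket(zmq.PUB)
    sock.connect(address)
    time.sleep(0.1)  # as in RgHandler: let the connection come up first
    return sock


class PerProcessPubHandler(logging.Handler):
    """Hypothetical address-only handler that reuses one socket per process."""

    def __init__(self, address, socket_factory=zmq_pub_factory):
        super().__init__()
        self.address = address
        self.socket_factory = socket_factory
        self._sock = None
        self._pid = None  # pid of the process that created self._sock

    def _get_socket(self):
        pid = os.getpid()
        if self._sock is None or self._pid != pid:
            # First use in this process, or we are in a forked child: never
            # use the inherited socket, build a fresh one here instead.
            self._sock = self.socket_factory(self.address)
            self._pid = pid
        return self._sock

    def emit(self, record):
        try:
            # One plain string frame; the subscriber must expect that format.
            self._get_socket().send_string(self.format(record))
        except Exception:
            self.handleError(record)

    def close(self):
        # Only close a socket that this very process created.
        if self._sock is not None and self._pid == os.getpid():
            self._sock.close()
        self._sock = None
        super().close()
```

The trade-off is one long-lived socket per process instead of one per record; the parent's inherited socket is simply dropped in the child rather than closed from there.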

@minrk
Member

minrk commented Jul 29, 2015

Glad it works.

@minrk closed this as completed Jul 29, 2015
@fujiaoliu
Author

thank you :-)

@mirceaulinic

Hi, I ran into the same issue. I was not aware of this behaviour, although it makes total sense after reading @minrk's detailed explanation.

I have a situation where multiple subprocesses need to send messages over the same publisher. What would you recommend in that case? An immediate solution would be to send the messages back to the parent (e.g. through a Pipe), but is it possible to avoid this overhead and use the same publisher instance directly across multiple subprocesses?

Thank you,
--Mircea

@minrk
Member

minrk commented Apr 3, 2017

is it possible to avoid this overhead and have a straight usage of the same publisher instance across multiple subprocesses?

The zmq solution would be to use a proxy device: each subprocess sends messages to a single device, and subscribers connect to that same device. The code of your publishers is identical to what it would be without the device (still a zmq PUB socket), except that each PUB socket connects to the relay rather than binding itself.

For instance:

from multiprocessing import Process
import os
import time

import zmq

N_WORKERS = 4
N_MSGS = 5

PROXY_SUB_URL = 'ipc://proxy-internal'
PROXY_PUB_URL = 'tcp://127.0.0.1:5556'

def publisher():
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.connect(PROXY_SUB_URL)
    pid = os.getpid()
    for i in range(N_MSGS):
        time.sleep(1)
        pub.send(b'%i:%i' % (pid, i))
    print('worker %i done' % pid)

def proxy():
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.subscribe(b'')
    sub.bind(PROXY_SUB_URL)
    
    pub = ctx.socket(zmq.PUB)
    pub.bind(PROXY_PUB_URL)
    try:
        zmq.proxy(sub, pub)
    except KeyboardInterrupt:
        return

def subscriber():
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.subscribe(b'')
    sub.connect(PROXY_PUB_URL)
    for i in range(N_MSGS * N_WORKERS):
        print('subscriber received %s' % sub.recv_string())

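if __name__ == '__main__':
    # The guard keeps child processes from re-running this block when
    # multiprocessing uses the "spawn" start method (Windows, macOS).
    workers = [ Process(target=publisher) for i in range(N_WORKERS) ]
    for w in workers:
        w.start()

    proxy_proc = Process(target=proxy)
    proxy_proc.start()

    subscriber()
    for w in workers:
        w.join()
    proxy_proc.terminate()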
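For comparison, the parent-relay approach mentioned in the question can be sketched with the standard library alone: each worker attaches a logging.handlers.QueueHandler that puts records on a shared multiprocessing queue, and a QueueListener in the parent hands them to one handler, which in practice would be the single PUBHandler whose socket never crosses a fork. ListHandler, worker and run are hypothetical names; ListHandler only stands in for PUBHandler so the sketch runs without zmq, and the "fork" start method assumed here is POSIX-only.

```python
import logging
import logging.handlers
import multiprocessing as mp
import os


class ListHandler(logging.Handler):
    """Hypothetical stand-in for the parent's single PUBHandler."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(self.format(record))


def worker(queue, n_msgs):
    # Child process: every record goes onto the shared queue, nothing else.
    logger = logging.getLogger("worker")
    logger.handlers[:] = [logging.handlers.QueueHandler(queue)]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    for i in range(n_msgs):
        logger.info("%i:%i", os.getpid(), i)


def run(n_workers=4, n_msgs=5):
    ctx = mp.get_context("fork")  # POSIX-only start method
    queue = ctx.Queue()
    sink = ListHandler()
    procs = [ctx.Process(target=worker, args=(queue, n_msgs))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    # Only the listener thread in the parent touches the sink
    # (in real use: the one PUBHandler and its socket).
    listener = logging.handlers.QueueListener(queue, sink)
    listener.start()
    for p in procs:
        p.join()
    listener.stop()  # handles everything already queued, then returns
    return sink.messages


if __name__ == "__main__":
    for line in run():
        print(line)
```

Unlike the proxy device, this keeps exactly one PUB socket in one process, at the cost of pickling every record through a pipe, which is the overhead the question hoped to avoid.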

@mirceaulinic
Copy link

Fantastic, thanks a lot @minrk!
