make iostream multiprocessing aware #2712

Closed
wants to merge 2 commits

4 participants

@piti118

Fixes #2438 and #2422.
The old iostream segfaults under multiprocessing like this, since a forked child tries to reuse the parent's zmq socket:

import multiprocessing as mp

def f(x):
    print 'hello', x

pool = [mp.Process(target=f, args=(i,)) for i in range(10)]
for p in pool: p.start()
for p in pool: p.join()
@piti118

I'd like to get comments on this implementation. Is the multiprocessing.Manager().Queue() implementation a good idea? It might break something else that I'm not aware of. @minrk

@minrk minrk commented on the diff Jan 1, 2013
IPython/zmq/iostream.py
@@ -50,8 +57,15 @@ def flush(self):
             if hasattr(self.pub_socket, 'flush'):
                 # socket itself has flush (presumably ZMQStream)
                 self.pub_socket.flush()
-            self._buffer.close()
-            self._new_buffer()
+            else:
+                pass  # on a forked process, don't flush
+
+    def getpid(self):
+        p = getattr(os, 'getpid', None)
+        if p is not None:
+            return p()
+        else:  # windows
+            return None
@minrk
IPython member
minrk added a note Jan 1, 2013

getpid is available on Windows - I don't think you need any special handling here.

@minrk
IPython member

Other than the unnecessary protection of os.getpid, I don't see anything wrong with this approach.
Do you want to add a test?

@jasongrout
IPython member

IIRC, we've had problems with multiprocessing queues in high-volume situations on the Sage cell server where things would lock up (again, if I recall correctly). It would be interesting to stress-test this with lots of output and see how things are affected, even on a single process.

@kramer314: do you remember exactly what our problems were with the queues?

@kramer314

If I remember correctly (it was a while ago), one caveat with multiprocessing queues is that it's not a good idea to use them in situations where arbitrary amounts of data might be passed to the queue. The discussion at http://bugs.python.org/issue8426#msg143081 is somewhat relevant. You can try to set limits on queue sizes or use a joinable multiprocessing queue, but at least when I messed around with it, that only mitigated the problem.

In our situation (the Sage cell server), output from executing user code would be put onto a shared multiprocessing queue in the process of returning output to the user. One user process executing something like:

while True:
    print "!"

was causing the entire shared queue to deadlock, meaning that output from all other processes executing code on that computer would never be displayed to their respective users. Additionally (and perhaps more importantly), the queue would not free up after resource limits killed the offending process. I don't recall if using a Manager object with a multiprocessing queue fixes the second issue. I never found a solution for our old architecture; the problem went away when we rewrote the entire backend to use ZMQ sockets for message passing.

I'd think that the fact that multiprocessing queues have the potential to deadlock isn't necessarily a huge problem - if one needs to put arbitrary amounts of data onto a shared queue-like object that absolutely can't deadlock, there might be a larger design issue that should be addressed.

@minrk
IPython member

So maybe the queue should be implemented with zmq itself, rather than MP?

@jasongrout
IPython member

I think the issue boils down to: multiprocessing queue blocks when it gets full, and zmq throws things away when its buffer gets full. Which is better?

@jasongrout
IPython member

It seems a shame that we have to give up the very simple/fast StringIO that will apply in the vast majority of the cases. I believe there is a way to hook into multiprocessing to run initialization code when a fork happens. Maybe we could just use that to reinitialize the zmq context, and then each process will be sending its own output independently.
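For what it's worth, multiprocessing does expose such a hook: multiprocessing.util.register_after_fork (long-standing, though undocumented). Here is a minimal, hypothetical sketch of the idea; the ForkSafeStream class and its url are illustrative, not IPython's actual OutStream, and note the hook only fires for processes started via multiprocessing, not a raw os.fork():

from multiprocessing import util
import zmq

class ForkSafeStream(object):
    """Illustrative only: rebuild zmq state in each forked child."""
    def __init__(self, url):
        self.url = url
        self._setup()
        # re-run _setup in every child that multiprocessing forks
        util.register_after_fork(self, ForkSafeStream._setup)

    def _setup(self):
        # a zmq context must never be shared across a fork: make a new one
        self._ctx = zmq.Context()
        self._sock = self._ctx.socket(zmq.PUSH)
        self._sock.connect(self.url)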

@piti118

Hmmm, was it blocking trying to get stuff into the queue, or is it blocking because of this line?

while not self._buffer.empty():
    data += self._buffer.get()

If it's this loop, we could roughly work around it with:

numdump = 0
MAXDUMP = 10000  # or something
while not self._buffer.empty() and numdump < MAXDUMP:
    data += self._buffer.get()
    numdump += 1
@piti118

@minrk How do I write the test for this kind of thing?

@jasongrout
IPython member

I haven't run experiments on your code, so I don't know anything about its real-world performance. That's why I suggest doing some experiments (I haven't) with sending massive amounts of stdout, both from child processes and from just a single process (no forking).

From the link @kramer314 posted above, it seems that the real problem comes in trying to join a process that hasn't finished sending its data to the queue, and the join just stalls while the queue is full. I guess it's not the actual child process, since there is a separate thread writing to the queue for that child process.

Do we really need all of the stdout data to go through the master process? Usually, doesn't each child process write independently to stdout?

@piti118

FYI, a red warning box in the multiprocessing docs says:

Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread()), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
Note that a queue created using a manager does not have this issue. See Programming guidelines.

Seems like we are safe?

@minrk
IPython member

piti118: the test should be in IPython/zmq/tests,
and it should:

  1. start a kernel
  2. send a command that forks a subprocess, which will print
  3. validate that the printed output arrived and that the subprocess did not crash

The other zmq tests should give you a sense of how to do this. If you aren't comfortable figuring it out, I can write the test.
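(For concreteness, a rough sketch of such a test, modeled on the test_stream example quoted further down this thread; flush_channels, execute, KM, validate_message, dec, and nt are the existing helpers in IPython/zmq/tests, and the test name is made up:)

@dec.parametric
def test_subprocess_print():
    flush_channels()

    code = '\n'.join([
        "import multiprocessing as mp",
        "def f():",
        "    print('hi from child')",
        "p = mp.Process(target=f)",
        "p.start()",
        "p.join()",
    ])
    msg_id, reply = execute(code)

    stdout = KM.sub_channel.get_msg(timeout=2)
    for tst in validate_message(stdout, 'stream', msg_id):
        yield tst
    content = stdout['content']
    yield nt.assert_equal(content['name'], u'stdout')
    yield nt.assert_equal(content['data'], u'hi from child\n')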

But the other notes here suggest to me that we should not be using MP to do this, it is too easy to cause problems, and maybe a raw zmq solution is preferable.

@jasongrout
IPython member

Yes, from the docs it seems we are safe. However, it also says, "[Managers] are, however, slower than using shared memory."

So... how much are we giving up by adopting Managers even in single-process situations, and how much are we clogging things up by sending all output through the master process?

@piti118

I don't think we would be in any trouble for the single-process case, though, since executing flush blocks the master process from adding stuff to the queue by definition (right?). That's my guess; I don't fully understand the IPython messaging architecture, though.

The problem only comes when a child process keeps writing to the queue while the master process is trying to flush it all out.

I think a better solution for that loop is

while not self._buffer.empty():
    data += self._buffer.get()
    if len(data) > some_number:
        publish(data)  # send this chunk out now (pseudocode)
        data = ''

This way it prints output while it's emptying the queue, which doesn't give the user the feeling of being hung up.

@minrk I'll try to look for an example there. Shouldn't be too hard. I'll let you know if I fail to understand those examples.

@minrk
IPython member

This way it prints output while it's emptying the queue, which doesn't give the user the feeling of being hung up.

True, but it allows sys.stdout.flush to hang the main process indefinitely, just because the subprocess is printing a lot.

@piti118

Is there a way to get a lock on the queue returned from the manager, telling all other processes to stop adding to or messing with the queue? That would fix all our problems.

@minrk
IPython member

@piti118: lock = manager.Lock()
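(A minimal sketch of how that could guard the shared queue; all names here are illustrative. Writers take the lock per put, and flush holds it while draining, so nothing can be added mid-flush:)

import multiprocessing as mp

manager = mp.Manager()
queue = manager.Queue()
lock = manager.Lock()

def write(data):
    with lock:              # blocks while a flush is draining the queue
        queue.put(data)

def flush():
    parts = []
    with lock:              # stop all writers, then drain what is buffered
        while not queue.empty():
            parts.append(queue.get())
    return ''.join(parts)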

@piti118

I think I found a way to do it right with manager.Lock(). Will make a commit shortly

@dec.parametric
def test_stream():
    flush_channels()

    msg_id, reply = execute("print('hi')")

    stdout = KM.sub_channel.get_msg(timeout=2)
    for tst in validate_message(stdout, 'stream', msg_id):
        yield tst
    content = stdout['content']
    yield nt.assert_equal(content['name'], u'stdout')
    yield nt.assert_equal(content['data'], u'hi\n')

Is this the example I want? What does parametric do?

@minrk
IPython member

For just the simple single-process case that is ~100% of IPython usage:

%%timeit
for i in range(100):
    print i

With this current mp Queue approach, this is almost 100x slower than master (175ms, vs 2ms in master and 1ms in the terminal), and an extra ~10% slower (210ms) if I add locks on write/flush. I think another solution (probably raw zmq) is going to be necessary.

@piti118

I agree that's not acceptable.

@piti118

Can you run a profile on it? Just making sure the slowdown is not caused by something avoidable.
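(For example, one way to profile it with cProfile, on roughly the benchmark above; the output filename is arbitrary:)

import cProfile
import pstats

def bench():
    for i in range(100):
        print(i)

cProfile.run('bench()', 'iostream.prof')
pstats.Stats('iostream.prof').sort_stats('cumulative').print_stats(10)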

@piti118

Let me also try an iostream + manager.Lock() hybrid; if the slowdown is from the queue, this should fix it.

Edited: never mind, iostream isn't shared among processes.

@minrk
IPython member

The profile shows that ~all of the time is spent in _multiprocessing.Connection.recv calls. My guess is that mp.Queue is just a super slow data structure (not surprising, since it's for generic inter-process communication).

@piti118

How would zmq make it faster? I've never used zmq before (I've always wanted to learn it, though). Wouldn't it have to open a pipe or do something similar with large overhead too (which in the end would make it as slow as the queue)?

@minrk
IPython member

The important thing is that it will make it faster for the most common case of no subprocesses.
It may not be faster when there are subprocesses, but that doesn't matter nearly as much.

@piti118

I see. You are right. I was also looking at a pthread_atfork hook that would transform iostream from fast single-process to multiprocessing-aware on the first fork, or something like that. It doesn't seem to be in Python yet, though. That solution would be a bit more complicated than zmq and wouldn't be as good as zmq either.

Let me try to learn zmq quickly, make another PR, and close this one.

@piti118

I have an idea. How about maintaining two buffers: one for the main process (StringIO) and another for everything else (Manager.Queue())?

This would lose the ordering of prints between the main process and child processes, but would be about as fast as before for the single-process use case.
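(A hedged sketch of that two-buffer idea; the class and attribute names are made up. The master pid is recorded at construction, and write() picks the cheap StringIO path only in the original process:)

import os
import multiprocessing as mp
from io import StringIO

class TwoBufferStream(object):
    def __init__(self):
        self.master_pid = os.getpid()
        self._buffer = StringIO()               # fast path: master only
        self._mp_buffer = mp.Manager().Queue()  # shared path for children

    def write(self, s):
        if os.getpid() == self.master_pid:
            self._buffer.write(s)     # cheap, no IPC in the common case
        else:
            self._mp_buffer.put(s)    # forked child: go through the queue

    def flush(self):
        data = self._buffer.getvalue()
        while not self._mp_buffer.empty():
            data += self._mp_buffer.get()
        return data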

@minrk
IPython member

This would lose the ordering of prints between the main process and child processes, but would be about as fast as before for the single-process use case.

That might work.

@piti118

Hmmm, do you know if StringIO is thread-safe?

@minrk
IPython member

I don't.

@piti118

I have a zmq solution with some holes, and I'm not sure what the best course of action is.

The solution is the following (a rough code sketch follows the questions below):

  • Constructor

    • store master_pid = os.getpid()
    • create master_pullsocket
    • create master_listener_thread with target=Listener
    • store this_pid = os.getpid()
  • Listener

    • while True: self._master_write(master_pullsocket.recv_unicode()) (quit on some signal)
  • On write

    • Master process (os.getpid() == master_pid): do the same thing with StringIO. (Threads fall here too, I think.)
    • Child process (os.getpid() != master_pid):
      • Newly forked (os.getpid() != this_pid):
        • set this_pid = os.getpid()
        • create a push socket and connect it to the master socket
        • push whatever it tries to write to the master socket
      • Old child process:
        • push whatever it tries to write to the master socket
  • On flush

    • Master process: do the same thing as before
    • Child process: ignore?? Not even sure what I should do.

Now there are some things I'm not sure about:

  • Does os.getpid return the same pid in a child "thread" as in the parent "thread"? (I'm guessing yes?)
  • Suppose I create a thread from the master process, then fork a process from the master process. Does the newly forked process have a copy of that child attached? (I'm guessing no?)
  • Is push/pull the best pattern to use here? I was trying to find a solution with pub/sub, but I don't think it can be done transparently after forking, since it requires the subscriber in the master process to subscribe to the publisher in the forked process. We could send some signal to the parent process, but we might lose messages.
  • This will screw up the ordering with the main process anyway (but to a lesser extent); maybe the two-buffer solution is best?
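For reference, here is a self-contained sketch of the scheme above; every name in it is illustrative rather than the PR's actual code, and the PUSH sockets go over loopback TCP since inproc endpoints don't survive a fork:

import os
import threading
import zmq

class ForkAwareStream(object):
    def __init__(self):
        self.master_pid = os.getpid()
        self._this_pid = os.getpid()
        self._chunks = []                        # stand-in for the StringIO buffer
        self._ctx = zmq.Context()
        # master binds a PULL socket; forked children connect PUSH sockets to it
        self._pull = self._ctx.socket(zmq.PULL)
        self.port = self._pull.bind_to_random_port('tcp://127.0.0.1')
        listener = threading.Thread(target=self._listen)
        listener.daemon = True
        listener.start()

    def _listen(self):
        # runs in the master: forward child output into the master buffer
        while True:
            self._master_write(self._pull.recv().decode('utf8'))

    def _master_write(self, s):
        self._chunks.append(s)

    def write(self, s):
        if os.getpid() == self.master_pid:
            self._master_write(s)                # threads land here as well
            return
        if os.getpid() != self._this_pid:
            # first write after a fork: fresh context and PUSH socket
            self._this_pid = os.getpid()
            self._ctx = zmq.Context()
            self._push = self._ctx.socket(zmq.PUSH)
            self._push.connect('tcp://127.0.0.1:%i' % self.port)
        self._push.send(s.encode('utf8'))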
@minrk
IPython member

You still need to check pid on both write and flush - you are not guaranteed that a write is called before flush in the subprocess.

Does os.getpid return the same pid in a child "thread" as in the parent "thread"? (I'm guessing yes?)

Yes, threads live in the same process

Suppose I create a thread from the master process, then fork a process from the master process. Does the newly forked process have a copy of that child attached? (I'm guessing no?)

I'm not sure what you mean - children are not 'attached'.

Is push/pull the best pattern to use here?

Yes, I would use push/pull. pub/sub would likely drop messages in this kind of short-lived context.

This will screw up the ordering with the main thread anyway (but to a lesser extent); maybe the two-buffer solution is best?

I don't care about ordering from threads - it is not guaranteed at all.

@piti118

This will screw up the ordering with the main thread anyway (but to a lesser extent); maybe the two-buffer solution is best?
I don't care about ordering from threads - it is not guaranteed at all.

I was thinking of the case where a user uses a semaphore/lock to arrange their output in a certain order; non-atomic push/pull might rearrange it.

But this is probably not relevant anymore; see the new PR.

@piti118

Closing this; see #2734 instead.

@piti118 piti118 closed this Jan 3, 2013