iostream fix for multiprocessing with no performance hit to single process #2734
Conversation
So, what this does is keep two buffers. Whenever multiprocessing is detected, it switches to multiprocess mode and uses a Queue as the buffer instead of a StringIO. This way there is no performance hit for the single-process use case.

```python
%%timeit
for i in range(100):
    print i
```

Before any kind of fork this gives ~2 ms/loop. After something like

```python
import multiprocessing as mp

def f(x):
    print 'hello', x

pool = [mp.Process(target=f, args=(i,)) for i in range(10)]
for p in pool: p.start()
for p in pool: p.join()
```

the same %%timeit loop runs at around 200 ms/loop. |
Still couldn't quite figure out how to write a test for this though.... |
Why switch modes for the master process? Why doesn't the master process only write to the regular buffer, and just use the Queue for subprocesses? I know it ensures ordering of print statements between the master process and the subprocesses, but that's not a guarantee that I feel like we need to make (it's not even a guarantee that the terminal makes, really). |
To illustrate the ordering, here the subprocesses take turns printing, coordinated through a shared value:

```python
import multiprocessing as mp

manager = mp.Manager()
v = manager.Value('i', 9)

def f(x, v):
    while v.value != x:
        pass                  # spin until it is this process's turn
    print 'hello', x
    v.value -= 1

pool = [mp.Process(target=f, args=(i, v)) for i in range(10)]
for p in pool: p.start()
for p in pool: p.join()
```

It's not 100% guaranteed (since the buffer is merged only when the master process knows what's going on), but it definitely gives a more sensible ordering. |
I think this approach makes sense. If you make any print statements from a forked process you will trigger (forever) the super slow MP-safe approach. It's unfortunate that it will stick around forever, even while no subprocesses are running, but I think it's acceptable for now. Things to check:
One alternative would be to only write to the Queue on flush, rather than on every write. This would change the sync point for interleaved output, but would result in many fewer Queue items. |
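Sketched out, that flush-time variant might look like this (hypothetical class and names, not the PR's code):

```python
class QueueOnFlushStream(object):
    """Collect writes locally and push one joined item per flush,
    so the Queue sees far fewer items and interleaving happens
    at flush granularity only."""
    def __init__(self, queue):
        self._queue = queue
        self._parts = []

    def write(self, s):
        self._parts.append(s)   # cheap local append, no Queue traffic

    def flush(self):
        if self._parts:
            self._queue.put(''.join(self._parts))   # one Queue item
            self._parts = []
```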
Could you write one simple test to show how it's done? I can implement the others, but I'm not sure where to start with writing tests. |
Sure, no problem. |
Found one bug with the current implementation, though. If I let the forked process run and kill the IPython notebook server before it joins... it segfaults. I'm guessing it's trying to do something funny with that socket. |
tests new mp.Process behavior for PR ipython#2734
If you pull from my '2734' branch: https://github.com/minrk/ipython/tree/2734, you will get a few basic tests. |
Hm, there is an issue with the mp Manager, though. mp.Manager creates subprocesses, and they don't get properly cleaned up if the master process is terminated:

```python
import os, signal
from multiprocessing import Manager

m = Manager()
os.kill(os.getpid(), signal.SIGTERM)
```

This will always leave the manager's process running. It will have to be addressed (it may be another reason that using a Manager will simply not be acceptable). |
Thanks, will pull that soon. As for the issue you raise, I'm not sure about the inner workings of IPython's signal handling and the Manager, but wouldn't something like this work?

```python
def __del__(self):
    self._manager.shutdown()
```

Not sure what happens if the old process is still trying to write to it, though. |
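For illustration, the same cleanup idea hung off a SIGTERM handler rather than `__del__` (a sketch only, not what the PR does; it still cannot help against `kill -9`):

```python
import signal
import sys
from multiprocessing import Manager

m = Manager()

def _shutdown(signum, frame):
    m.shutdown()     # reap the manager's helper process
    sys.exit(0)

signal.signal(signal.SIGTERM, _shutdown)
```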
Or maybe we should implement a poor man's queue on top of mp.sharedctypes. |
mp.sharedctypes has locks, though (the types without 'Raw' in their names). What do you mean by corruption issues? http://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.sharedctypes Although I'm not sure about dynamic allocation... |
The mp docs are littered with warnings about corruption issues. But even without corruption, there are other issues. For instance, what… |
The docs say you're screwed in that case. But I think that's what manager.shutdown is for, though; if it gets called at the right places, we should be OK. |
But that's exactly the point - this simply cannot be guaranteed. This means that Manager can never be an acceptable tool for this. |
Why wouldn't something like this work?

```python
def __del__(self):
    if self.is_masterprocess():
        self._manager.shutdown()
```

Wouldn't __del__ get called when terminating the IPython kernel? I'm not talking about kill -9, but SIGTERM or something proper. |
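`is_masterprocess` is not defined in the thread; one common way to implement such a guard is to record the pid at construction time (an illustrative sketch with hypothetical names):

```python
import os

class StreamProxy(object):
    def __init__(self, manager):
        self._manager = manager
        self._master_pid = os.getpid()   # recorded before any fork

    def is_masterprocess(self):
        # forked children see a different pid, so they skip the shutdown
        return os.getpid() == self._master_pid

    def __del__(self):
        if self.is_masterprocess():
            self._manager.shutdown()
```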
Hmmm, that cuts the Manager out of the picture completely... I'm thinking about a memory-mapped-file solution; that way I delegate the problem to the OS level instead. |
Just thinking out loud here: ZMQ with pull on flush... that would screw up the ordering. |
That brings me back to one of the previous questions: whether a process forked after threads have started will have a copy of the master process's threads attached to it or not. (Sorry, I learned this a long time ago and can't remember the finer details.) Concretely, our master process is… |
I have a prototype working with zmq push/pull that I can show you, if you… |
I stopped writing the ZMQ-with-polling-thread version midway because of the following concern:

```python
def poller():
    lock()
    write_to_buffer()   # <<< suppose the fork happens while this thread is here
    unlock()
```

Is the lock then held forever in the forked process? Or is there a double unlock? What happens? |
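The worry is well founded for plain thread locks: if the fork lands while another thread holds the lock, the child inherits the lock in its locked state, and the holder thread does not exist in the child to release it. A small Unix-only demonstration:

```python
import os
import threading
import time

lock = threading.Lock()

def holder():
    with lock:
        time.sleep(2)          # hold the lock across the fork

threading.Thread(target=holder).start()
time.sleep(0.5)                # make sure the lock is held right now

pid = os.fork()
if pid == 0:
    # child: the holder thread was not duplicated, but the lock's
    # state was copied while locked, so nobody can ever release it
    print 'child could grab the lock:', lock.acquire(False)   # False
    os._exit(0)
os.waitpid(pid, 0)
```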
Hmm, does the thread get automatically killed when the main process is killed? If not, then we have a zombie. |
You may be right. From my vague memory, I recall that a child thread has no idea when the parent gets killed, and needs a signal handler to learn about it. I may be wrong though; let me consult the pthread manual. |
Or maybe Python threads are something different... I give up. I trust you though 😉 |
You don't need any inter-process locks at all, so the background process never grabs any lock. |
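A minimal sketch of that lock-free arrangement, with hypothetical names (`ChildWriter` is not from either branch): every process lazily builds its own PUSH socket after a fork, so no lock and no socket is ever shared across processes.

```python
import os
import zmq

class ChildWriter(object):
    """Each process creates its own Context/PUSH socket on first
    write after a fork, so nothing needs an inter-process lock."""
    def __init__(self, url='tcp://127.0.0.1:5555'):
        self.url = url
        self._pid = None      # pid that owns the current socket
        self._push = None

    def write(self, s):
        pid = os.getpid()
        if self._push is None or pid != self._pid:
            # first write in this process (or we just forked):
            # build a brand-new context and socket
            self._pid = pid
            self._push = zmq.Context().socket(zmq.PUSH)
            self._push.connect(self.url)
        self._push.send(s)
```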
Good point. Will have another PR (hopefully this will fix it for real this time). Thanks for being so patient with me. |
Now that I think about it again, it wouldn't work though. Here is the problem:

```python
def poll(self):
    while True:
        if ismasterprocess():   # doesn't guard sufficiently though
            thread_lock()
            pull_socket.pull()  # <<< suppose the fork on p0 happens while t0 is deep inside this call
            write_to_buffer()
            thread_unlock()
        else:
            break
```

Now, after the fork from p0… this would mean that it might reuse the ZMQ socket and cause the segfault again. |
From the fork man page: exactly one thread (the calling thread) is duplicated into the child. I worried too much. http://pubs.opengroup.org/onlinepubs/009695399/functions/fork.html |
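That behavior is easy to check from Python (CPython cleans up the `threading` module's state in the child after `os.fork`, so only the forking thread remains):

```python
import os
import threading
import time

def worker():
    time.sleep(5)

threading.Thread(target=worker).start()
print 'threads before fork:', threading.active_count()   # 2

pid = os.fork()
if pid == 0:
    # only the calling thread was duplicated into the child
    print 'threads in child:', threading.active_count()   # 1
    os._exit(0)
os.waitpid(pid, 0)
```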
If you want to pull from my forksafe_nomp branch, that should get us most of the way there. |
I also have my ZMQ version. The implementation is a little different, and a bit shorter. The major difference from yours is that a child process's flush gets cascaded to the parent. Mine doesn't have the uuid, though. However, for me, if the fork and join happen quickly, messages drop like crazy. https://github.com/piti118/ipython/tree/mp_fix_iostream_zmq <<< a bunch of debug messages are still in there |
Hmmm, actually I'm not sure whether your solution will have this problem. Run this about 3 times and it will definitely hang:

```python
import zmq
import multiprocessing as mp
import threading as t

def listener(quit):
    context = zmq.Context()
    puller = context.socket(zmq.PULL)
    puller.bind('tcp://127.0.0.1:5555')
    i = 0
    while not quit.is_set():
        i += 1
        print 'poll', i
        s = puller.poll(1000)
        if s:
            m = puller.recv_string()
            print m

def sender(i):
    context = zmq.Context()
    pusher = context.socket(zmq.PUSH)
    pusher.connect('tcp://127.0.0.1:5555')
    pusher.send_string(str(i))

quit = t.Event()
listen_thread = t.Thread(target=listener, args=(quit,))
listen_thread.start()

numsend = 100
pool = [mp.Process(target=sender, args=(i,)) for i in range(numsend)]
for i, p in enumerate(pool):
    print 'start', i
    p.start()
for i, p in enumerate(pool):
    print 'join', i
    p.join()
print 'hey'
quit.set()
listen_thread.join()
```
|
Feel free to close this PR since your solution is clearly superior. |
superseded by #2791 |
Better version of #2712