Python multiprocessing.Queue fails to get items in the correct sequence #76563
Comments
I am trying to implement a "producer consumer"-style design with the multiprocessing module in my project, but I found that multiprocessing.Queue does not behave as I expected: the Queue.get method seems to return the end flag at the end of my queue too early. I am not experienced with multi-process programming, so I am not sure whether this is a bug or not. To reproduce it, I have simplified my code to the following:

```python
import time
import multiprocessing as mp


def worker(task_queue, output_queue):
    while 1:
        i = task_queue.get()
        if i is None:
            print("Process-%d done" % mp.current_process().pid)
            task_queue.task_done()
            break
        output_queue.put(i + 1)
        task_queue.task_done()


def outputer(output_queue):
    c = 0  # count of how many objects have been gotten
    while 1:
        j = output_queue.get()
        if j is None:
            print("Process(output)-%d done" % mp.current_process().pid)
            c += 1
            print("outputer get %d objects from the output_queue" % c)
            assert output_queue.empty(), "output queue should be empty here"
            break
        time.sleep(0.0001)  # do output here
        c += 1


if __name__ == "__main__":
    task_queue = mp.JoinableQueue()
    #output_queue = mp.JoinableQueue()
    output_queue = mp.Queue()
    workers = [mp.Process(target=worker, args=(task_queue, output_queue))
               for i in range(10)]
    outputer = mp.Process(target=outputer, args=(output_queue,))
    for w in workers:
        w.start()
    outputer.start()
    for i in range(10**6):
        task_queue.put(i)
    for w in workers:  # put end flag to task queue
        task_queue.put(None)
    task_queue.join()  # wait all tasks done
    print("all tasks done.")
    print("queue size before put end flag: %d" % output_queue.qsize())
    output_queue.put(None)  # put end flag to output queue
    print("end")
```

I get this output:

```
Process-20923 done
Process-20931 done
Process-20925 done
Process-20930 done
Process-20927 done
Process-20929 done
Process-20928 done
Process-20926 done
Process-20924 done
Process-20932 done
all tasks done.
queue size before put end flag: 914789
end
Process(output)-20933 done
outputer get 90383 objects from the output_queue
Process Process-11:
Traceback (most recent call last):
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "joinablequeue.py", line 27, in outputer
    assert output_queue.empty(), "output queue should be empty here"
AssertionError: output queue should be empty here
```

I waited for all the workers to put their output into the output queue by using task_queue.join(), and only then put the end flag into the output queue, but according to the outputer's printed information it got the end flag before it had received all of the output objects.
Hi Weize, since this seems to be a support question, I suggest you ask it either on https://stackoverflow.com/ or on the Python users' mailing list, https://mail.python.org/mailman/listinfo/python-list.
Closing this issue in the meantime.
First thing: the code uses the global name `outputer` for two different things - first for the function, then for the Process object that runs it. That happens to work here, but it's needlessly confusing.

Then comes the pain ;-)

A multiprocessing queue is a rather complex object under the covers, and the docs don't really spell out all the details. Maybe they should. The docs do vaguely sketch that a "feeder thread" is created in each process using an mp.Queue, which feeds objects you .put() from an internal buffer into an interprocess pipe. The internal buffer is needed in case you .put() so many objects so fast that feeding them into a pipe directly would cause the OS pipe functions to fail. And that happens in your case: you have 10 producers running at full speed overwhelming a single slow consumer. _Most_ of the data enqueued by output_queue.put(i+1) is sitting in those internal buffers most of the time, and the base interprocess pipe doesn't know anything about them for the duration.

The practical consequence: while the queue always reflects the order in which objects were .put() within a single process, there's no guarantee about ordering _across_ processes. Objects are fed from internal buffers into the shared pipe whenever a process's feeder thread happens to wake up and sees that the pipe isn't "too full".

task_queue.task_done() only records that an object has been taken off of task_queue and _given_ to output_queue.put(i+1); most of the time, the latter just sticks i+1 into an internal buffer because output_queue's shared pipe is too full to accept another object.

Given that this is how things actually work, what you can do instead is add:

```python
for w in workers:
    w.join()
```

somewhere before output_queue.put(None). A worker process doesn't end until its feeder thread(s) complete feeding all the internal buffer objects into pipes, so .join()'ing a worker is the one "obvious" way to guarantee that all the worker's .put() actions have wholly completed.

In which case, there's no point to using a JoinableQueue at all - .task_done() no longer serves any real purpose in the code then.
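To make that suggestion concrete, here is a minimal sketch of the reported example with the change applied. It is an illustration rather than code from the thread: it assumes the task_done() calls are dropped from worker() and task_queue becomes a plain mp.Queue (joining the workers already guarantees their puts have completed), and it binds the output Process to a separate name, out_proc, so the function name `outputer` is not reused.

```python
import time
import multiprocessing as mp


def worker(task_queue, output_queue):
    # No task_done() here: nothing joins task_queue any more.
    while True:
        i = task_queue.get()
        if i is None:
            break
        output_queue.put(i + 1)


def outputer(output_queue):
    c = 0  # count of objects received, including the end flag
    while True:
        j = output_queue.get()
        if j is None:
            c += 1
            print("outputer got %d objects from the output_queue" % c)
            assert output_queue.empty(), "output queue should be empty here"
            break
        time.sleep(0.0001)  # stand-in for the real output work
        c += 1


if __name__ == "__main__":
    task_queue = mp.Queue()      # plain Queue; JoinableQueue is no longer needed
    output_queue = mp.Queue()
    workers = [mp.Process(target=worker, args=(task_queue, output_queue))
               for _ in range(10)]
    out_proc = mp.Process(target=outputer, args=(output_queue,))
    for w in workers:
        w.start()
    out_proc.start()
    for i in range(10**6):
        task_queue.put(i)
    for _ in workers:            # one end flag per worker
        task_queue.put(None)
    for w in workers:            # a worker only exits after its feeder thread has
        w.join()                 # flushed every buffered output_queue.put() into the pipe
    output_queue.put(None)       # so this end flag really is the last item
    out_proc.join()
    print("end")
```

Because every worker is joined before the end flag is put, all of the objects the workers buffered have already reached the shared pipe by the time None is enqueued, so the final assertion in outputer should hold.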