Skip to content

bpo-30966: Add multiprocessing.SimpleQueue.close() #2760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,12 @@ For an example of the usage of queues for interprocess communication see

Put *item* into the queue.

.. method:: close()

Close the queue.

.. versionadded:: 3.7


.. class:: JoinableQueue([maxsize])

Expand Down
8 changes: 8 additions & 0 deletions Doc/whatsnew/3.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ math
New :func:`~math.remainder` function, implementing the IEEE 754-style remainder
operation. (Contributed by Mark Dickinson in :issue:`29962`.)


multiprocessing
---------------

The :class:`multiprocessing.SimpleQueue` class has a new
:meth:`~multiprocessing.SimpleQueue.close` method to explicitly close the
queue.

os
--

Expand Down
48 changes: 35 additions & 13 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,35 +313,57 @@ class SimpleQueue(object):
def __init__(self, *, ctx):
self._reader, self._writer = connection.Pipe(duplex=False)
self._rlock = ctx.Lock()
self._poll = self._reader.poll
if sys.platform == 'win32':
self._wlock = None
else:
self._wlock = ctx.Lock()
self._wlock = ctx.Lock()

def _error_closed(self):
return ValueError("operation on closed queue")

def close(self):
with self._rlock, self._wlock:
try:
reader = self._reader
if reader is not None:
self._reader = None
reader.close()
finally:
writer = self._writer
if writer is not None:
self._writer = None
writer.close()

def empty(self):
return not self._poll()
with self._rlock:
if self._reader is None:
raise self._error_closed()
return not self._reader.poll()

def __getstate__(self):
context.assert_spawning(self)
return (self._reader, self._writer, self._rlock, self._wlock)
with self._rlock, self._wlock:
if self._reader is None:
raise self._error_closed()
return (self._reader, self._writer, self._rlock, self._wlock)

def __setstate__(self, state):
# __setstate__() can be called before __init__(), so don't use _rlock
# nor _wlock
(self._reader, self._writer, self._rlock, self._wlock) = state
self._poll = self._reader.poll

def get(self):
with self._rlock:
if self._reader is None:
raise self._error_closed()
res = self._reader.recv_bytes()
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

def put(self, obj):
# serialize the data before acquiring the lock
obj = _ForkingPickler.dumps(obj)
if self._wlock is None:
# writes to a message oriented win32 pipe are atomic

# On Windows, while writing to a message oriented win32 pipe is atomic,
# wlock is still required to prevent race condition with close()
with self._wlock:
if self._writer is None:
raise self._error_closed()
self._writer.send_bytes(obj)
else:
with self._wlock:
self._writer.send_bytes(obj)
44 changes: 32 additions & 12 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@
# Unit tests for the multiprocessing package
#

import unittest
import queue as pyqueue
import time
import array
import errno
import gc
import io
import itertools
import sys
import logging
import operator
import os
import gc
import errno
import pickle
import queue as pyqueue
import random
import signal
import array
import socket
import random
import logging
import struct
import operator
import weakref
import test.support
import sys
import test.support.script_helper
import time
import unittest
from unittest import mock
import weakref


# Skip tests if _multiprocessing wasn't built.
Expand Down Expand Up @@ -4236,6 +4237,25 @@ def test_empty(self):

proc.join()

def test_close(self):
queue = multiprocessing.SimpleQueue()
queue.close()

# closing a queue twice should not fail
queue.close()

# operations on closed queue fail with ValueError
with self.assertRaises(ValueError):
queue.put("data")
with self.assertRaises(ValueError):
queue.get()
with self.assertRaises(ValueError):
queue.empty()
with self.assertRaises(ValueError):
with mock.patch('multiprocessing.queues.context.assert_spawning'):
# Test SimpleQueue.__getstate__()
pickle.dumps(queue)

#
# Mixins
#
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add a new close() method to multiprocessing.SimpleQueue to explicitly close
the queue.