bpo-28053: Allow custom reducer when using multiprocessing #15058

Open
wants to merge 32 commits into
base: main
5d5b7a4
bpo-28053: Complete and fix custom reducers in multiprocessing.
pablogsal Oct 18, 2018
1fcf62e
Simplify the interface and use convenience functions
pablogsal Dec 9, 2018
4dd92e0
Fix some typos and add new test for custom reducers
pablogsal Mar 16, 2019
5312390
Implement per-context reducers
pablogsal Mar 17, 2019
ac9a8d3
Implement tests for custom contexts
pablogsal Mar 23, 2019
95046c2
Make reducer default to None at class level
pablogsal Mar 23, 2019
c9bf4f5
Fix doc reference
pablogsal Mar 23, 2019
94c6a66
Add docstrings to the test classes
pablogsal Mar 23, 2019
9d39a34
Add context to Process classes && revert back to use get_pickler_class
pablogsal Mar 23, 2019
9ccc067
Adress feedback and add custom context for Listener/Clients
pablogsal May 27, 2019
8704e8e
Provide and document a base class (AbstractPickler) for custom picklers
pablogsal May 27, 2019
722f2b9
Try to simplify tests
pablogsal May 27, 2019
5e03ffa
Try to pass down the context manually to Process
pablogsal May 28, 2019
8bb0d0d
ENH backward-compat friendly context injection
pierreglaser Jul 19, 2019
508a992
make process_factory actually return a Process
pierreglaser Jul 29, 2019
984cdd9
multiprocessing mirrors dynamically default_context
pierreglaser Jul 31, 2019
6ce49bf
ENH pass the reducer to Process (not the context)
pierreglaser Jul 31, 2019
e2cedd1
adapt the tests to new changes
pierreglaser Jul 31, 2019
c99a542
fix Popen/Process in windows
pierreglaser Jul 31, 2019
aa9215f
CLN unused import
pierreglaser Jul 31, 2019
bb7d03e
ENH pass separately pickling and unpickling routines
pierreglaser Aug 1, 2019
693e2b6
update the tests to test pickling and unpickling
pierreglaser Aug 1, 2019
ee23671
DOC update the docs to match Unpickler/Pickler changes
pierreglaser Aug 1, 2019
283c691
CLN cosmetics
pierreglaser Aug 1, 2019
7154be7
dont try to find the number of dump/load calls
pierreglaser Aug 1, 2019
6af4e1c
TST test custom reducer for Process
pierreglaser Aug 1, 2019
515bbfd
DOC adapt example
pierreglaser Aug 1, 2019
525239a
CLN cosmetics, namings, comments
pierreglaser Aug 1, 2019
87cffec
DOC fix AbstractPickler signature
pierreglaser Aug 1, 2019
a31bdae
CLN make Context.process private, unused import
pierreglaser Aug 7, 2019
cd50558
unused imports and unnecessary changes
pierreglaser Feb 2, 2020
2f117ec
update to use new timeout constants
pierreglaser Feb 2, 2020
97 changes: 97 additions & 0 deletions Doc/library/multiprocessing.rst
@@ -1193,6 +1193,103 @@ For example:
the data in the pipe is likely to become corrupted, because it may become
impossible to be sure where the message boundaries lie.

Custom Reduction
~~~~~~~~~~~~~~~~

.. currentmodule:: multiprocessing

Several primitives of the :mod:`multiprocessing` module such as
:class:`multiprocessing.Queue`, :class:`multiprocessing.connection.Listener` or
:func:`multiprocessing.connection.Client` need to serialize and deserialize Python
Comment on lines +1199 to +1203
Member

Aren't either the currentmodule directive or the multiprocessing.*s in the roles redundant in this part?

objects to communicate between processes. Sometimes it is useful to control which
serialization is used to transport data, for example to support communication
with different versions of Python, or to use faster third-party tools or custom
strategies.

For this purpose a set of hooks is available to provide alternate implementations of
the reduction mechanism:

.. currentmodule:: multiprocessing.reduction

.. class:: AbstractPickler(file, protocol)

Base class that can be implemented in order to override
serialization methods of the reduction machinery used by multiprocessing.

.. method:: dump(obj)

Write a pickled representation of obj to the open file.
Member

Suggested change
-Write a pickled representation of obj to the open file.
+Write a pickled representation of *obj* to the open file.


Defaults to :meth:`pickle.Pickler.dump`

.. class:: AbstractUnpickler(bytes_object, *, fix_imports=True, encoding="ASCII", errors="strict")

Base class that can be implemented in order to override
multiprocessing's default unserialization mechanism.

.. method:: load()

Read a pickled object hierarchy from the open file and return the
reconstituted object hierarchy specified therein.

Defaults to :meth:`pickle.Unpickler.load`

.. class:: AbstractReducer()

Base class providing access to custom ``Pickler`` and ``Unpickler``
classes to be used by ``multiprocessing`` when serializing objects.

.. method:: get_pickler_class():
Member

Suggested change
-.. method:: get_pickler_class():
+.. method:: get_pickler_class()


This method must return a subclass of :class:`AbstractPickler` to be used
by ``multiprocessing`` when serializing objects.

.. method:: get_unpickler_class():
Member

Suggested change
-.. method:: get_unpickler_class():
+.. method:: get_unpickler_class()


This method must return a subclass of :class:`AbstractUnpickler` to be used
by ``multiprocessing`` when unserializing objects.


Note that both methods of the :class:`AbstractReducer` are optional. If
:meth:`get_pickler_class` (resp. :meth:`get_unpickler_class`) is not
implemented, multiprocessing will fall back to the :class:`pickle.Pickler`
(resp. :class:`pickle.Unpickler`) class to serialize objects.

.. currentmodule:: multiprocessing

.. method:: set_reducer(reduction)

Sets a reduction instance to be used for serialization and deserialization
by the module primitive internals. **reduction** must be an instance of a
Member

Suggested change
-by the module primitive internals. **reduction** must be an instance of a
+by the module primitive internals. *reduction* must be an instance of a

Arguments should be marked in italic: https://devguide.python.org/documenting/#rest-inline-markup

subclass of :class:`AbstractReducer`.

.. method:: get_reducer()

Gets the current reduction class in use by the module's primitive internals.

For example, the reducer class can be substituted to use :mod:`pickle` protocol
version 2 in order to communicate with Python 2.x programs::
Member

Please change documentation and examples not to reference Python 2, as that is long EOL at this point. Just say it in the abstract ("communicate with processes using older Python versions") and offer some explanation as to how this is even possible by linking to the relevant APIs in multiprocessing that enable people to create such unexpected monsters. (see #67592 (comment))

Most multiprocessing users view it as a module with a single parent process controlling identical by definition Python version child processes that it launches and controls.


   import multiprocessing
   from multiprocessing.reduction import AbstractReducer, AbstractPickler

   class PicklerProtocol2(AbstractPickler):
       def __init__(self, file, protocol=2, **kwargs):
           super().__init__(file, protocol, **kwargs)

       def dump(self, obj):
           return super().dump(obj)


   class PickleProtocol2Reducer(AbstractReducer):
       def get_pickler_class(self):
           return PicklerProtocol2

   multiprocessing.set_reducer(PickleProtocol2Reducer())

Notice that using :meth:`multiprocessing.set_reducer` changes the reducer globally. If
changing this setting globally is undesirable you could call :meth:`context.set_reducer`,
where *context* is a context object obtained by calling :func:`get_context`.
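The documented example relies on `AbstractPickler` and `set_reducer`, which are proposed by this PR and are not available in released Python versions. As a rough sketch of what the pinned-protocol pickler does on its own, the following uses `pickle.Pickler` as a stand-in base class; the helper `dumps_protocol2` is a hypothetical name introduced only for illustration.

```python
import io
import pickle


class PicklerProtocol2(pickle.Pickler):
    """Pickler pinned to protocol 2.

    Assumption: pickle.Pickler stands in for the AbstractPickler
    base class proposed in this PR.
    """

    def __init__(self, file, protocol=2, **kwargs):
        super().__init__(file, protocol, **kwargs)


def dumps_protocol2(obj):
    # Hypothetical helper: serialize obj into bytes with the pinned pickler.
    buf = io.BytesIO()
    PicklerProtocol2(buf).dump(obj)
    return buf.getvalue()


data = dumps_protocol2({"answer": 42})
# A protocol 2 stream starts with the PROTO opcode followed by the version byte.
print(data[:2])
print(pickle.loads(data))
```

Any unpickler that understands protocol 2 can read the resulting bytes, which is the property the documentation example is after.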

Synchronization primitives
~~~~~~~~~~~~~~~~~~~~~~~~~~
6 changes: 5 additions & 1 deletion Lib/multiprocessing/__init__.py
@@ -20,7 +20,11 @@
 #

 __all__ = [x for x in dir(context._default_context) if not x.startswith('_')]
-globals().update((name, getattr(context._default_context, name)) for name in __all__)
+
+
+def __getattr__(name):
+    return getattr(context._default_context, name)
+

 #
 # XXX These should not really be documented or public.
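The change above swaps a one-time copy of the default context's attributes for a module-level `__getattr__` (PEP 562, Python 3.7+). A plausible reading is that lookups should reflect later changes to the default context. The sketch below contrasts the two approaches on throwaway modules; `DefaultContext`, `reducer_name` and the module names are hypothetical stand-ins, not part of `multiprocessing`.

```python
import types


class DefaultContext:
    """Hypothetical stand-in for multiprocessing's default context."""

    def __init__(self):
        self.reducer_name = "pickle"


default_context = DefaultContext()

# Snapshot approach: copy the public attributes once, at "import" time.
snapshot = types.ModuleType("snapshot_mod")
snapshot.__dict__.update(
    (name, getattr(default_context, name))
    for name in dir(default_context)
    if not name.startswith("_")
)

# Dynamic approach: a module-level __getattr__ resolves names on each lookup.
dynamic = types.ModuleType("dynamic_mod")


def _module_getattr(name):
    return getattr(default_context, name)


dynamic.__getattr__ = _module_getattr

# Change the context after both modules have been set up.
default_context.reducer_name = "custom"

print(snapshot.reducer_name)  # still the value copied earlier
print(dynamic.reducer_name)   # follows the context's current value
```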
51 changes: 27 additions & 24 deletions Lib/multiprocessing/connection.py
@@ -23,8 +23,8 @@
 from . import util

 from . import AuthenticationError, BufferTooShort
-from .context import reduction
-_ForkingPickler = reduction.ForkingPickler
+from . import context
+from . import get_context

 try:
     import _winapi
@@ -114,7 +114,8 @@ def address_type(address):
 class _ConnectionBase:
     _handle = None

-    def __init__(self, handle, readable=True, writable=True):
+    def __init__(self, handle, readable=True, writable=True, ctx=None):
+        self._ctx = ctx or get_context()
         handle = handle.__index__()
         if handle < 0:
             raise ValueError("invalid handle")
@@ -203,7 +204,7 @@ def send(self, obj):
         """Send a (picklable) object"""
         self._check_closed()
         self._check_writable()
-        self._send_bytes(_ForkingPickler.dumps(obj))
+        self._send_bytes(self._ctx.get_reducer().dumps(obj))

     def recv_bytes(self, maxlength=None):
         """
@@ -248,7 +249,7 @@ def recv(self):
         self._check_closed()
         self._check_readable()
         buf = self._recv_bytes()
-        return _ForkingPickler.loads(buf.getbuffer())
+        return self._ctx.get_reducer().loads(buf.getbuffer())

     def poll(self, timeout=0.0):
         """Whether there is any input available to be read"""
@@ -436,16 +437,16 @@ class Listener(object):
     This is a wrapper for a bound socket which is 'listening' for
     connections, or for a Windows named pipe.
     '''
-    def __init__(self, address=None, family=None, backlog=1, authkey=None):
+    def __init__(self, address=None, family=None, backlog=1, authkey=None, *, ctx=None):
         family = family or (address and address_type(address)) \
                  or default_family
         address = address or arbitrary_address(family)

         _validate_family(family)
         if family == 'AF_PIPE':
-            self._listener = PipeListener(address, backlog)
+            self._listener = PipeListener(address, backlog, ctx=ctx)
         else:
-            self._listener = SocketListener(address, family, backlog)
+            self._listener = SocketListener(address, family, backlog, ctx=ctx)

         if authkey is not None and not isinstance(authkey, bytes):
             raise TypeError('authkey should be a byte string')
@@ -490,16 +491,16 @@ def __exit__(self, exc_type, exc_value, exc_tb):
         self.close()


-def Client(address, family=None, authkey=None):
+def Client(address, family=None, authkey=None, *, ctx=None):
     '''
     Returns a connection to the address of a `Listener`
     '''
     family = family or address_type(address)
     _validate_family(family)
     if family == 'AF_PIPE':
-        c = PipeClient(address)
+        c = PipeClient(address, ctx=ctx)
     else:
-        c = SocketClient(address)
+        c = SocketClient(address, ctx=ctx)

     if authkey is not None and not isinstance(authkey, bytes):
         raise TypeError('authkey should be a byte string')
@@ -580,7 +581,7 @@ class SocketListener(object):
     '''
     Representation of a socket which is bound to an address and listening
     '''
-    def __init__(self, address, family, backlog=1):
+    def __init__(self, address, family, backlog=1, *, ctx=None):
         self._socket = socket.socket(getattr(socket, family))
         try:
             # SO_REUSEADDR has different semantics on Windows (issue #2550).
@@ -603,11 +604,12 @@ def __init__(self, address, family, backlog=1):
                 )
         else:
             self._unlink = None
+        self._ctx = ctx

     def accept(self):
         s, self._last_accepted = self._socket.accept()
         s.setblocking(True)
-        return Connection(s.detach())
+        return Connection(s.detach(), ctx=self._ctx)

     def close(self):
         try:
@@ -619,15 +621,15 @@ def close(self):
                 unlink()


-def SocketClient(address):
+def SocketClient(address, *, ctx=None):
     '''
     Return a connection object connected to the socket given by `address`
     '''
     family = address_type(address)
     with socket.socket( getattr(socket, family) ) as s:
         s.setblocking(True)
         s.connect(address)
-        return Connection(s.detach())
+        return Connection(s.detach(), ctx=ctx)

 #
 # Definitions for connections based on named pipes
@@ -639,7 +641,7 @@ class PipeListener(object):
         '''
         Representation of a named pipe
         '''
-        def __init__(self, address, backlog=None):
+        def __init__(self, address, backlog=None, *, ctx=None):
             self._address = address
             self._handle_queue = [self._new_handle(first=True)]

@@ -649,6 +651,7 @@ def __init__(self, address, backlog=None):
                 self, PipeListener._finalize_pipe_listener,
                 args=(self._handle_queue, self._address), exitpriority=0
                 )
+            self._ctx = ctx

         def _new_handle(self, first=False):
             flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
@@ -683,15 +686,15 @@ def accept(self):
                 finally:
                     _, err = ov.GetOverlappedResult(True)
                     assert err == 0
-            return PipeConnection(handle)
+            return PipeConnection(handle, ctx=self._ctx)

         @staticmethod
         def _finalize_pipe_listener(queue, address):
             util.sub_debug('closing listener with address=%r', address)
             for handle in queue:
                 _winapi.CloseHandle(handle)

-    def PipeClient(address):
+    def PipeClient(address, *, ctx=None):
         '''
         Return a connection object connected to the pipe given by `address`
         '''
@@ -716,7 +719,7 @@ def PipeClient(address):
         _winapi.SetNamedPipeHandleState(
             h, _winapi.PIPE_READMODE_MESSAGE, None, None
             )
-        return PipeConnection(h)
+        return PipeConnection(h, ctx=ctx)

 #
 # Authentication stuff
@@ -950,23 +953,23 @@ def reduce_connection(conn):
     def rebuild_connection(ds, readable, writable):
         sock = ds.detach()
         return Connection(sock.detach(), readable, writable)
-    reduction.register(Connection, reduce_connection)
+    context.reduction.register(Connection, reduce_connection)

     def reduce_pipe_connection(conn):
         access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
                   (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
-        dh = reduction.DupHandle(conn.fileno(), access)
+        dh = context.reduction.DupHandle(conn.fileno(), access)
         return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
     def rebuild_pipe_connection(dh, readable, writable):
         handle = dh.detach()
         return PipeConnection(handle, readable, writable)
-    reduction.register(PipeConnection, reduce_pipe_connection)
+    context.reduction.register(PipeConnection, reduce_pipe_connection)

 else:
     def reduce_connection(conn):
-        df = reduction.DupFd(conn.fileno())
+        df = context.reduction.DupFd(conn.fileno())
         return rebuild_connection, (df, conn.readable, conn.writable)
     def rebuild_connection(df, readable, writable):
         fd = df.detach()
         return Connection(fd, readable, writable)
-    reduction.register(Connection, reduce_connection)
+    context.reduction.register(Connection, reduce_connection)
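The `register` calls in this hunk attach a reduce function to a type so that the multiprocessing pickler knows how to send instances of it. As a small illustration using the released `ForkingPickler.register` API, the sketch below registers a reducer for a hypothetical `FakeHandle` class. To keep the example self-contained, the reducer rebuilds the object as a plain `int` rather than through a dedicated rebuild function like `rebuild_connection`; that simplification is an assumption of this sketch, not what `connection.py` does.

```python
import pickle
from multiprocessing.reduction import ForkingPickler


class FakeHandle:
    """Hypothetical object wrapping an integer descriptor."""

    def __init__(self, fd):
        self.fd = fd


def reduce_fake_handle(handle):
    # A reduce function returns (callable, args); the receiving side
    # calls callable(*args) to rebuild the value. Here that is int(fd).
    return int, (handle.fd,)


# Note: registration is process-wide for ForkingPickler.
ForkingPickler.register(FakeHandle, reduce_fake_handle)

payload = ForkingPickler.dumps(FakeHandle(7))
restored = pickle.loads(payload)
print(restored)
```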