
bpo-28053: Complete and fix custom reducers in multiprocessing. #9959


Closed
wants to merge 13 commits into master from bpo28053

Conversation

pablogsal
Member

@pablogsal pablogsal commented Oct 18, 2018

This PR tries to complete and fix the implementation of the custom reducer classes in multiprocessing.

Important

I have marked the PR as DO-NOT-MERGE because I still have several doubts about the previously implemented API, regarding the AbstractReducer base class, the methods the user needs to implement, and how the rest of the library interacts with multiprocessing.reducer. For example:

  1. I am not sure multiprocessing.reducer.dumps and multiprocessing.reducer.register are needed outside the ForkingPickler class, or how that interacts with the ABC.

  2. I am not sure the AbstractReducer is implemented completely (no methods are marked as abstract).

This PR is a draft implementation of the complete API, tests, and documentation so we can discuss how best to implement this.

https://bugs.python.org/issue28053

Member

@tirkarthi tirkarthi left a comment


Just a few minor typos I found while reading the PR.

Edit: Sorry, I just read at the end that it's a draft implementation. Feel free to ignore these if needed.

@pablogsal pablogsal force-pushed the bpo28053 branch 2 times, most recently from 11763cf to 0745094 Compare October 19, 2018 20:26
@pitrou
Member

pitrou commented Nov 6, 2018

Thanks @pablogsal for attacking this issue :-)

@1st1 1st1 removed their request for review November 6, 2018 20:57
@pablogsal pablogsal force-pushed the bpo28053 branch 3 times, most recently from 6ee544d to e27684c Compare December 9, 2018 19:16
@pablogsal
Member Author

pablogsal commented Dec 9, 2018

@pitrou Thank you very much for the review!

I have simplified the API. Now setting a custom reducer looks like this:

    import multiprocessing
    from multiprocessing.reduction import AbstractReducer, ForkingPickler

    class ForkingPicklerProtocol2(ForkingPickler):
        @classmethod
        def dumps(cls, obj, pickle_protocol=2):
            return super().dumps(obj, protocol=pickle_protocol)

    class PickleProtocol2Reducer(AbstractReducer):
        def get_pickler_class(self):
            return ForkingPicklerProtocol2

    multiprocessing.set_reducer(PickleProtocol2Reducer)

I am making the interface a bit stricter: multiprocessing.set_reducer() must be called with a subclass of AbstractReducer, and get_pickler_class must return a subclass of pickle.Pickler. This way, the constructor and the rest of the methods needed by the multiprocessing reduction machinery are guaranteed to be there. I have added some new tests that check this behaviour.
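
For reference, a minimal sketch of what that stricter validation could look like (a hypothetical helper body; set_reducer and get_pickler_class here follow this PR's draft API, which never shipped in this form):

    import pickle
    from multiprocessing.reduction import AbstractReducer

    def set_reducer(reducer_class):
        # Hypothetical sketch: reject anything that is not an
        # AbstractReducer subclass.
        if not (isinstance(reducer_class, type)
                and issubclass(reducer_class, AbstractReducer)):
            raise TypeError("expected a subclass of AbstractReducer")
        pickler_class = reducer_class().get_pickler_class()
        # The pickler must be a real pickle.Pickler subclass so the
        # reduction machinery can rely on its constructor and methods.
        if not (isinstance(pickler_class, type)
                and issubclass(pickler_class, pickle.Pickler)):
            raise TypeError("get_pickler_class() must return a "
                            "pickle.Pickler subclass")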

@pablogsal pablogsal force-pushed the bpo28053 branch 2 times, most recently from 8a68606 to 5e2fe4f Compare December 10, 2018 23:02
@pitrou
Member

pitrou commented Feb 6, 2019

@pablogsal Do you need another review on this?

@pablogsal pablogsal force-pushed the bpo28053 branch 5 times, most recently from a16e157 to 856716c Compare May 27, 2019 17:44
@pablogsal
Member Author

@pitrou It took me a while, but I have stabilized all tests and fixed some details on Windows. I have also added Listener and Client to the context so they can also benefit from custom reducers. Please check my previous comment regarding some details.

This patch is already very big and very complex, and when errors happen they are extremely obscure or platform-dependent, so I apologize in advance if I miss something obvious; I have too many spinning plates.

Could you take another look?

@pablogsal pablogsal requested a review from pitrou May 27, 2019 18:03
@pablogsal pablogsal force-pushed the bpo28053 branch 3 times, most recently from 715733f to 7ee45e1 Compare May 27, 2019 20:01
Member

@pitrou pitrou left a comment


Thanks for the update. It seems there are test failures on all 3 CI platforms...


Defaults to :meth:`pickle.Pickler.dump`

.. classmethod:: loads(bytes_object, *, fix_imports=True, encoding="ASCII", errors="strict")
Member

Are the optional arguments required? Does multiprocessing ever pass them explicitly?

Member

Also, the method / classmethod asymmetry is weird and doesn't help designing an implementation. Do you think that can be fixed (one way or the other)?

Member Author

@pablogsal pablogsal May 27, 2019


Yes, I find this very weird as well. We can make load() a regular method, but that would require instantiating a Pickler() object for no reason (AbstractPickler must inherit from Pickler to make dump work correctly). It would help with the asymmetry, though.

What do you think?

Member Author

Notice that to instantiate the Pickler class we need to provide a dummy file-like object (probably a BytesIO instance). I find that suboptimal as well.

Member Author

@pablogsal pablogsal May 27, 2019


The other possibility is making dump a regular method. In that case, we would need to create a Pickler instance and copy and update the dispatch table onto it every time it is called.
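
A rough sketch of that per-call cost (hypothetical names; this only illustrates the pattern being discussed, mirroring how ForkingPickler builds its dispatch table):

    import copyreg
    import io
    import pickle

    _extra_reducers = {}  # custom reducers registered at the class level

    def dump_per_call(obj, protocol=None):
        # What an instance-method dump would have to repeat on every call:
        # build a fresh Pickler, then copy and update the dispatch table.
        buf = io.BytesIO()
        p = pickle.Pickler(buf, protocol)
        p.dispatch_table = copyreg.dispatch_table.copy()
        p.dispatch_table.update(_extra_reducers)
        p.dump(obj)
        return buf.getbuffer()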


.. method:: get_pickler_class()

This method must return a subclass of :class:`pickler.Pickler` to be used by
Member

This does not make the relationship between pickler.Pickler and AbstractPickler very clear.

Member Author

That should be AbstractPickler

@@ -1187,6 +1187,81 @@ For example:
the data in the pipe is likely to become corrupted, because it may become
impossible to be sure where the message boundaries lie.

Custom Reduction
~~~~~~~~~~~~~~~~
Member

You'll need some versionadded directive at some point.

Member Author

Should we add this? Technically this PR fixes the previous implementation, although since the old one was broken, one could argue that we are adding the feature.

@@ -51,14 +51,35 @@ def dumps(cls, obj, protocol=None):
cls(buf, protocol).dump(obj)
return buf.getbuffer()

loads = pickle.loads
@classmethod
def loads(cls, bytes_object, *, fix_imports=True,
Member

Uh... I hadn't noticed these were class methods...

Member Author

The problem is that loads does not need to instantiate a Pickler class, so it was designed here as a class method.

Would you prefer it to be a regular method that does the same (defers the call to pickle.loads)?
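
For illustration, a minimal sketch of the regular-method variant being asked about (hypothetical class name; it still only defers to pickle.loads):

    import pickle

    class SketchForkingPickler(pickle.Pickler):
        def loads(self, data, *, fix_imports=True,
                  encoding="ASCII", errors="strict"):
            # Bound to an instance instead of the class, but otherwise
            # identical: just defer to pickle.loads.
            return pickle.loads(data, fix_imports=fix_imports,
                                encoding=encoding, errors=errors)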



def loads(s, *, fix_imports=True, encoding="ASCII", errors="strict"):
return ForkingPickler.loads(s, fix_imports=fix_imports,
Member

By the way, I see that sharedctypes is still using _ForkingPickler directly. Should it be fixed as well?


@classmethod
def _put_and_get_in_queue(cls, queue, parent_can_continue):
parent_can_continue.set()
Member

If you want to re-use the event afterwards, you have to clear it at some point. But I'm not sure all this synchronization is necessary (the queue already synchronizes for you).
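
For reference, the behaviour being pointed out: a multiprocessing.Event stays set until it is explicitly cleared, so it cannot gate a second round on its own:

    import multiprocessing

    ev = multiprocessing.Event()
    ev.set()
    ev.wait()   # returns immediately because the flag is still set
    ev.clear()  # required before wait() can block again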

p = self.custom_ctx.Process(target=self._put_and_get_in_queue,
args=(queue, parent_can_continue))
p.start()
parent_can_continue.wait()
Member

You shouldn't need this wait, AFAICT.

parent_can_continue.set()
queue.put("Something")
queue.get(timeout=TIMEOUT2)
close_queue(queue)
Member

Is it useful to close the queue explicitly?

Member Author

@pablogsal pablogsal May 27, 2019


Yep, there were a bunch of race conditions in the CI related to this process not finishing. It seems to be related to the queue thread doing something. I did not dig deeper, but as soon as I added the extra synchronization the failures went away.

I will increase the timeouts and try to remove this to see what happens.

element = queue.get(timeout=TIMEOUT3)
self.assertEqual(element, "Something")
queue.put("Other_Something")
parent_can_continue.wait()
Member

Not sure you need this, either (and I don't think you have to close the queue, unless it helps test something?).

Member Author

@pablogsal pablogsal May 27, 2019


Check the previous comment.

I will try to increase the timeouts and remove the event to see whether the failures still appear.



@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
def test_queue_custom_reducer_over_default_context(self):
Member

Same comments as above about events and queues.

@pablogsal pablogsal force-pushed the bpo28053 branch 6 times, most recently from a6734a0 to f20e142 Compare May 27, 2019 23:20
@pablogsal
Member Author

pablogsal commented May 28, 2019

@pitrou There are some failures on Windows that I am investigating, but I have found a problem.

In multiprocessing/context.py there is no way of passing the current context down to the Process class. The Process class is an attribute of DefaultContext. If we change that attribute to a function that injects the context, any class inheriting from multiprocessing.Process will break, because Process would no longer be a class (it would be a function).

This is the only exception: every other class is defined in BaseContext as a method that injects the context down, but the Process class is not. Passing the context to the process is necessary because it calls dump (check, for example, multiprocessing/popen_spawn_win32.Popen.__init__).

I don't know how to solve this, but basically tests like test_queue_custom_reducer_over_custom_context are not possible, as there is no way for the call to Process to pass down the context unless you do something ugly like:

self.custom_ctx.Process(..., ctx=self.custom_ctx)

Trying to do something like:

class DefaultContext(BaseContext):
    @property
    def Process(self):
        class _Process(Process):
            _ctx = self.get_context()
        return _Process

fails because _Process will not be picklable downstream. So I am out of ideas.

As you are more used to the architecture of the multiprocessing module, do you see a way of solving this?

If you don't see a way, I'm afraid that custom reducers per context cannot be implemented because of the way multiprocessing is architected.
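
A minimal demonstration of why the snippet above fails (the _Process name here just stands in for the class created inside the property): pickle serializes classes by qualified name, and a class defined in a local scope cannot be looked up that way:

    import pickle

    def make_process_class():
        class _Process:  # defined locally, like the class in the property
            pass
        return _Process

    try:
        pickle.dumps(make_process_class())
    except (pickle.PicklingError, AttributeError) as exc:
        # The exact exception type varies between the C and pure-Python
        # picklers, but the local class is rejected either way.
        print(exc)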

@pierreglaser
Contributor

pierreglaser commented Jul 19, 2019

@pablogsal thanks a lot for putting this PR together, this is great work. This feature is very promising.

Regarding the issue concerning the mp.Process class, it is definitely tricky. I think we should leverage the fact that we only want this context injection into the Process class when the user explicitly asks for custom reduction behavior.
As a result, we can make Process a property of Context objects and have the property return a different object depending on whether the user asked for custom reduction, which is a new feature (so there is no backward-compatibility issue), or not.

See proof of concept below

    @property
    def Process(self):
        if not self._custom_reduction_enabled:
            # Ensure backward compatibility by returning a class when no
            # custom reducer was specified
            return _Process  # ForkProcess for ForkContext, Process for BaseContext etc.
        else:
            return self.process_factory

    def process_factory(self, *args, **kwargs):
        p = Process(*args, **kwargs)
        p._ctx = self.get_context()
        return p

More complete implementation here:
https://github.com/pablogsal/cpython/compare/bpo28053...pierreglaser:inject-custom-context-into-process-cls?expand=1

Gist showing a usage example and its behavior:
https://gist.github.com/pierreglaser/ed4f9f9e784a3571cfdc8b969d32085f

What do you think?

EDIT: Another question is whether or not we consider custom Context classes to be part of the public API. The only place I can think of where Context classes are mentioned in the Python docs is here:

Alternatively, you can use :func:`get_context` to obtain a context
object. Context objects have the same API as the multiprocessing
module, and allow one to use multiple start methods in the same
program.

If we want to enable set_reducer with custom Context classes that have a Process attribute, most probably pointing to a Process class, then my snippet above will break -- how to handle this case is still an open question.

@pierreglaser
Contributor

pierreglaser commented Jul 29, 2019

Another topic that appears many times in this PR is the strange AbstractPickler.loads classmethod and the load/dump asymmetry for the Pickler subclasses returned by get_pickler_class.

A good way, IMO, to re-establish the symmetry here would be to use actual Unpickler classes to load pickled bytes, instead of dummy loads methods on Pickler subclasses.

Thus, we could add an optional get_unpickler_class to the AbstractReducer API. The returned class would have to implement a load instance method. This way, the symmetry is re-established and we do not create unnecessary Pickler instances at load time.
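
A hedged sketch of that idea (every name below follows this thread's draft API; none of it shipped in the stdlib):

    import io
    import pickle

    class SketchUnpickler(pickle.Unpickler):
        """Unpickler counterpart: load() is already a true instance method."""

    class SketchReducer:
        def get_pickler_class(self):
            return pickle.Pickler

        def get_unpickler_class(self):
            # Optional hook: the Unpickler subclass to use for loading.
            return SketchUnpickler

    # Loading becomes symmetric with dumping, with no throwaway Pickler:
    reducer = SketchReducer()
    payload = pickle.dumps({"answer": 42})
    obj = reducer.get_unpickler_class()(io.BytesIO(payload)).load()
    assert obj == {"answer": 42}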
