
distributed.py - definitely needs update from stopevent; having headaches coding #101


Description

@drallensmith
Contributor

@bennr01: Hi. I wanted to make sure that distributed evaluation works with checkpointing (classes that aren't defined at the top level of a file, such as _EvaluatorSyncManager, can be problematic for pickling, which is one reason for my concern), so I put together a test_xor_example_distributed.py test case, which you can see in my fork's config_work branch. However, with no alterations to distributed.py, it usually hit a timeout error while waiting for the stopevent to connect. The one time it didn't, it thought the DistributedEvaluator was still running (since de.started was not reset to False by de.stop()), and so errored out anyway. The fix for the latter error is easy; the former, assuming it's a problem with using an event instead of a Value, is another matter.
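The "easy" fix for the latter error can be sketched as follows. This is a hypothetical, minimal stand-in for the real DistributedEvaluator, showing only the `started` bookkeeping, not the actual distributed.py code:

```python
# Hypothetical sketch: without the reset in stop(), a later start() call
# (e.g. after restoring from a checkpoint) would raise because the
# evaluator still believes it is running.
class EvaluatorSketch:
    def __init__(self):
        self.started = False

    def start(self):
        if self.started:
            raise RuntimeError("DistributedEvaluator already started")
        self.started = True

    def stop(self):
        # The fix: reset the flag so the evaluator can be started again.
        self.started = False
```

With the reset in place, a start/stop/start sequence succeeds instead of erroring out on the second start.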

I've been trying to make the changeover to stopevent but am having some major headaches getting a proxy value distributed between multiple processes that may be on different machines (my initial try, putting a Value from syncmanager into the queue as the first entry, would have worked... except that other secondary processes didn't get the Value! Oops....). Any suggestions? Using a namespace isn't working either; that was my most recent try, replacing Value.

I also changed the "master/slave" terminology to "primary/secondary", which is more modern usage.

Activity

bennr01

bennr01 commented on Jul 23, 2017

@bennr01
Contributor

Hi @drallensmith, sorry for causing you some headaches.
I think most of the issues can be fixed by moving _EvaluatorSyncManager into the module namespace and defining another class that interacts with it. The reason _EvaluatorSyncManager is currently defined inside _start_* is that it needs access to a namespace which is not cleared when the method returns. Using a separate class for interacting with the _EvaluatorSyncManager would let us use that class's namespace/attributes instead of the DistributedEvaluator's. If this class defines __reduce__, it should be possible to instruct pickle to create a new manager on checkpoint loading instead of trying to load the old one.
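A minimal sketch of the __reduce__ idea; the class and attribute names here are illustrative (the real wrapper would hold an actual SyncManager rather than a placeholder object):

```python
import pickle

# Sketch: instead of pickling a live manager connection (which cannot
# survive a checkpoint), record how to rebuild one on load.
class ManagerHandle:
    def __init__(self, address, authkey):
        self.address = address
        self.authkey = authkey
        self.manager = object()  # stand-in for an unpicklable SyncManager

    def __reduce__(self):
        # On unpickling, call ManagerHandle(address, authkey) again,
        # creating a fresh manager rather than restoring the stale one.
        return (ManagerHandle, (self.address, self.authkey))

handle = ManagerHandle(("localhost", 8022), b"abcd")
restored = pickle.loads(pickle.dumps(handle))
```

The unpicklable `manager` attribute never reaches the pickle stream; only the constructor arguments do, so checkpoint loading rebuilds a working handle.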

I've been trying to make the changeover to stopevent but am having some major headaches getting a proxy value distributed between multiple processes that may be on different machines (my initial try, putting a Value from syncmanager into the queue as the first entry, would have worked... except that other secondary processes didn't get the Value! Oops....). Any suggestions? Using namespace isn't working.

We could try to request the value manually every time the _secondary_loop checks it.
I can try to make these changes. Should I fork your config_work branch and open a PR against your fork when/if I succeed? That would reduce the number of pull requests and prevent another merge conflict.

I also changed the "master/slave" terminology to "primary/secondary", which is more modern usage.

primary/secondary is definitely better than master/slave (and even better than server/client, which I used before master/slave).

Regarding #51: I now have a small ARM cluster for testing, but setting it up will take some more time.

drallensmith

drallensmith commented on Jul 23, 2017

@drallensmith
Contributor, Author

Forking it sounds fine, and no problem re the headaches - thanks for your contribution!

At this point I am actually more concerned about the timeouts and similar errors than about pickling; with the exception of the one due to self.started not being reset to False by de.stop(), they occur prior to any attempt to "resurrect" the code. It may be most informative for you to run the test_xor_example_distributed.py file against a mostly unmodified (just the terminology change) distributed.py (oops on my part for not cleanly committing just the terminology change; the commit before the most recent two, starting with 42c878, may be most useful).

I'm glad you like primary/secondary; it isn't quite server/client either, since the secondary nodes aren't making requests of the primary node. The ARM cluster is good news; @D0pa might also be able to help with that.

evolvingfridge

evolvingfridge commented on Jul 23, 2017

@evolvingfridge
Contributor

I want to contribute but fail to understand the following:
What is the advantage and purpose of using Python multiprocessing when an experiment requires large-scale simulation, when MPI solves this much more easily (at least for me) and allows multiple setups such as a microservices environment or a distributed population?

Additionally, I see the biggest future advantage of MPI being Xeon Phi cards, which hopefully will become more affordable.

drallensmith

drallensmith commented on Jul 23, 2017

@drallensmith
Contributor, Author

I understand your question, and it's a good one - see the comment I just made on #51 for some thoughts on the matter.

drallensmith

drallensmith commented on Jul 24, 2017

@drallensmith
Contributor, Author

@bennr01 - I have just uploaded a new commit to my fork's config_work branch; I went back to using Value for a do_stop proxy, this time establishing a separate queue for it to get to the secondaries (and be returned by them), plus inspecting the object's dict to figure out that one has to check the do_stop._value attribute on the secondaries (on the primary, one just sets do_stop to True).
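The general pattern under discussion, a manager-held Value shared between a primary and a secondary process, can be demonstrated in isolation. This is a simplified stand-in (polling a flag via the proxy's get()/set() methods), not the actual distributed.py code:

```python
import multiprocessing
import time

def _secondary(flag, out_q):
    # A secondary polls the shared do_stop-style flag instead of
    # blocking on a stop event.
    while not flag.get():
        time.sleep(0.01)
    out_q.put("stopped")

def run_demo():
    with multiprocessing.Manager() as manager:
        flag = manager.Value("b", False)   # the shared stop flag proxy
        out_q = manager.Queue()
        p = multiprocessing.Process(target=_secondary, args=(flag, out_q))
        p.start()
        flag.set(True)                     # the primary flips the flag
        msg = out_q.get(timeout=10)
        p.join()
        return msg

if __name__ == "__main__":
    print(run_demo())
```

Note that the proxy objects returned by the manager are picklable and can be passed to child processes directly, which sidesteps the "other secondaries didn't get the Value" problem of putting the Value itself into a queue.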

However, test_distributed.py now either hangs on trying to join the single-worker secondary node (if not trying the multiple-worker version), or the child evaluation process (multiple workers) gets an EOFError from the inqueue being closed. (The latter may be fixable by "telling" it this is the equivalent of do_stop being turned on... I am admittedly a bit puzzled by why the code currently treats managers.RemoteError as if it were queue.Empty - this makes sense for the primary, but I'm not sure if it does for the secondaries.)

bennr01

bennr01 commented on Jul 24, 2017

@bennr01
Contributor

@D0pa The main advantage of using multiprocessing instead of mpi4py is that multiprocessing does not require any additional dependencies. Most systems that have Python installed also have the multiprocessing module available. mpi4py can easily be installed using pip, but it seems to need some system dependencies (libmpich-dev). I have heard that some universities make it rather hard to get the root access that may be required to install these dependencies.
The second advantage is that mpi4py seems to require more code. Comparing evolve-feedforward-distributed.py and xor-feedforward-mpi-data-scatter.py (from https://github.com/CodeReclaimers/neat-python/pull/72/files), evolve-feedforward-distributed.py has 85 fewer lines of code than xor-feedforward-mpi-data-scatter.py (however, xor-feedforward-mpi-data-scatter.py has a longer docstring and evolve-feedforward-distributed.py has command-line argument support for testing purposes, so this is not an ideal measurement).
I would like to say that the third advantage is easier integration into existing code, but this varies between users. Adding cluster support using distributed requires 3 additional lines of code and a modification to the line containing the population.run call. However, anyone who writes an evaluation function that actually requires a cluster would have written his/her code with this in mind, so this is not really an advantage. Most people using clusters will have more experience with mpi4py than with distributed/multiprocessing, which makes mpi4py easier for them to use. neat.distributed was written for inexperienced people who need to run some evaluations on a cluster without much technical knowledge.
Obviously, there are also some disadvantages of using multiprocessing instead of mpi4py. One is the Xeon Phi cards (and other technology requiring MPI). Probably the biggest disadvantage is performance: I have not measured it, but MPI is probably faster than multiprocessing.managers. But if the data transfer rate between the nodes really matters in comparison with the evaluation time, then the evaluation would probably be faster using neat.parallel instead of a cluster, wouldn't it?

The reason I chose multiprocessing.managers instead of mpi4py was not related to these advantages and disadvantages. When I looked for a way to run neat-python on a cluster, I stumbled upon your PR #72. The PR was closed and not merged, so I thought there was some reason it was not merged. I do not understand most of the MPI code in your MPI example, so I wanted to either split most of the code into a separate module for easier re-usability or create a completely new module. Like I said, because the PR was closed, I thought there was some reason it was not merged and concluded that using mpi4py might not be the best idea, so I did not even try it.

Thanks for offering your help both in this issue and in #51. My cluster is actually already set up; the only issue is that pip install requires more time than it should.

@drallensmith The real manager and all the objects it holds are stored in a separate child process (according to the multiprocessing documentation). Because of this, both the primary and secondary nodes are clients connected to the process holding the manager. When we try to get an element from the queue, the manager process requests the element. If the queue raises queue.Empty, the exception is caught and serialized. The client that requested the element creates a new exception from the serialized data. This new exception may not be of the same type as the original one; instead it is a RemoteError containing the error message and the traceback.
I just took a look at your newest commit to config_work. I am not sure why the code blocks, but the secondary nodes first receive the stop value and then put it back into the queue. Maybe this leads to some problems, as the stop value put back into the queue may not be the same one the primary node put into the queue.

drallensmith

drallensmith commented on Jul 24, 2017

@drallensmith
Contributor, Author

Good point regarding the RemoteError. Perhaps except managers.RemoteError as e: followed by checking repr(e) (or the appropriate attribute of e) for the string Empty, and also for EOFError; the latter should result in the equivalent of a stop event (and likewise if an actual EOFError is received by a secondary). I will try to work on this later today, but am not sure when I will be able to get to it.
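The proposed triage can be sketched like this. `classify_remote_error` is a hypothetical helper name, not part of neat-python; it relies on the fact (noted above) that a RemoteError carries the remote traceback text in its message:

```python
from multiprocessing import managers

# Sketch: the manager re-raises remote exceptions as RemoteError with
# the original traceback embedded as text, so we match on that text.
def classify_remote_error(exc):
    text = repr(exc)
    if "Empty" in text:
        return "empty"    # treat like queue.Empty: just retry the get
    if "EOFError" in text:
        return "stop"     # connection gone: act as if do_stop were set
    return "unknown"
```

For example, a RemoteError whose message contains a serialized queue.Empty traceback would be classified as "empty", while one containing an EOFError traceback would trigger a stop.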

Regarding putting back the stop value: since there is also code for the primary node to put it onto the queue again if the queue is empty, the put-back by the secondaries could be deleted as a check.

drallensmith

drallensmith commented on Jul 24, 2017

@drallensmith
Contributor, Author

OK, I implemented the first of the above. test_distributed.py is generally working... and I realized a problem with restoring from a checkpoint the way I was trying in test_xor_example_distributed.py: in the current setup, while run_primary has the information needed to restore (such as the generation of the last checkpoint), it doesn't have the information on how to recreate the secondary processes/nodes. Without those, if you try to restart it, it hangs. I'm currently checking whether exit_on_stop=False will help.

drallensmith

drallensmith commented on Jul 24, 2017

@drallensmith
Contributor, Author

No, still hangs. After travis gets through running all the tests except test_xor_example_distributed.py, I'll upload a version with that test so that people can explore the problem more.

BTW, I changed the queue to multiprocessing.Queue; queue.Queue, as it turns out, is not multiprocess-safe. The multiprocessing version also has a close() method, which may be needed.
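A small demonstration of the distinction: unlike queue.Queue (which is only thread-safe within one process), multiprocessing.Queue carries items across process boundaries and exposes close(). The function names here are illustrative:

```python
import multiprocessing

def _producer(q):
    # Child process puts an item onto the shared queue.
    q.put("done")

def roundtrip():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=_producer, args=(q,))
    p.start()
    item = q.get(timeout=10)   # blocks until the child has put the item
    p.join()
    q.close()                  # flush and shut down the queue's feeder thread
    return item

if __name__ == "__main__":
    print(roundtrip())
```

A plain queue.Queue passed the same way would never receive the child's item, since each process would see its own independent copy.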

drallensmith

drallensmith commented on Jul 24, 2017

@drallensmith
Contributor, Author

Huh. Travis is having a problem with anything above Python 3.5: authentication errors. It's also claiming that manager in manager.RemoteError is not defined... ah, a misspelling; it should be managers. Fixing.

drallensmith

drallensmith commented on Jul 24, 2017

@drallensmith
Contributor, Author

Currently, anything above 3.5 (3.6, etc.) is timing out, and when the process is interrupted, it reports an AuthenticationError. If that falls under RemoteError, then the primary node would ignore it, entering a loop... but why didn't the secondary nodes exit? Or did they?

drallensmith

drallensmith commented on Jul 24, 2017

@drallensmith
Contributor, Author

I've gone ahead and uploaded a version with test_xor_example_distributed.py, while letting the other Travis build run out. I also added -v to nosetests so we can see a bit more detail.

bennr01

bennr01 commented on Jul 24, 2017

@bennr01
Contributor

@drallensmith The digest sent was rejected error is raised when connection.recv_bytes(1024) != b"#WELCOME#". While connection.recv_bytes() should normally return the response, default socket behavior is to return an empty bytestring when the connection is closed. If connection.recv_bytes() uses socket.socket().recv(), the AuthenticationError could be explained by a closed connection. I have to check the source of connection.recv_bytes() to verify this.
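The closed-connection hypothesis is easy to demonstrate with plain sockets: recv() returns b"" once the peer has closed the connection, so a handshake expecting b"#WELCOME#" would see an empty reply instead. This sketch is only illustrating the socket behavior, not the multiprocessing handshake itself:

```python
import socket

# Set up a local listener and connect a client to it.
server = socket.socket()
server.bind(("127.0.0.1", 0))       # port 0: let the OS pick a free port
server.listen(1)
client = socket.socket()
client.connect(server.getsockname())
conn, _ = server.accept()

conn.close()                        # peer closes without sending anything
reply = client.recv(1024)           # returns b"" rather than b"#WELCOME#"

client.close()
server.close()
```

An authentication handshake that compares this empty reply against the expected welcome bytes would fail, producing an AuthenticationError even though the real cause is a dropped connection.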

drallensmith

drallensmith commented on Jul 24, 2017

@drallensmith
Contributor, Author

Good thinking. Of course, at least part of the question then is what changed in 3.6+ to produce this error when it didn't happen before. Perhaps recv_bytes() changed?
