@bennr01: Hi. I wanted to make sure that distributed evaluation worked with checkpointing (classes that aren't built at the top level of a file, such as `_EvaluatorSyncManager`, can be problematic for that, which is one reason for my concern), so I put together a test_xor_example_distributed.py test case, which you can see in my fork's config_work branch. However, with no alterations to distributed.py, it usually had a timeout error while waiting for the stopevent to connect. The one time it didn't, it thought the `DistributedEvaluator` was still running (since `de.started` was not reset to False by `de.stop()`), and so errored out anyway. The fix for the latter error is easy; the former, assuming it's a problem with using an event instead of a Value, is another matter.
I've been trying to make the changeover to stopevent but am having some major headaches getting a proxy value distributed between multiple processes that may be on different machines (my initial try, putting a `Value` from syncmanager into the queue as the first entry, would have worked... except that the other secondary processes didn't get the `Value`! Oops....). Any suggestions? Using a `Namespace` isn't working either; that was my most recent try, replacing `Value`.
I also changed the "master/slave" terminology to "primary/secondary", which is more modern usage.
bennr01 commented on Jul 23, 2017
Hi @drallensmith, sorry for causing you some headaches.

I think most of the issues can be fixed by moving `_EvaluatorSyncManager` into the module namespace and also defining another class which interacts with the `_EvaluatorSyncManager`. The reason `_EvaluatorSyncManager` is currently defined in `_start_*` is that it needs access to a namespace which is not cleared when the method returns. Using another class for interacting with the `_EvaluatorSyncManager` should allow us to use the namespace/attributes of that class instead of the namespace/attributes of the `DistributedEvaluator`. If this class defines `__reduce__`, it should be possible to instruct pickle to create a new manager upon checkpoint loading instead of trying to load the old manager (a rough sketch follows at the end of this comment). We could try to request the value manually every time the `_secondary_loop` checks it.

I can try to do the changes. Should I fork your config_work branch and create a PR to your fork when/if I am successful? This would reduce the number of pull requests and prevent another merge conflict.

primary/secondary is definitely better than master/slave (and even better than server/client, which I used before master/slave).

Regarding #51: By now I have got a small ARM cluster for testing, but setting it up will take some more time.
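The sketch of the `__reduce__` idea, with hypothetical names (`_ManagerHolder`, `addr`, and `authkey` stand in for whatever distributed.py actually uses):

```python
import multiprocessing.managers

class _ManagerHolder(object):
    """Owns the sync manager; pickling it stores only the connection
    info, so a checkpoint restore builds a brand-new manager."""

    def __init__(self, addr, authkey):
        self.addr = addr
        self.authkey = authkey
        self.manager = multiprocessing.managers.SyncManager(
            address=addr, authkey=authkey)
        self.manager.start()

    def __reduce__(self):
        # Recreate the holder (and thus a fresh manager) from the
        # connection info alone instead of pickling the live manager.
        return (self.__class__, (self.addr, self.authkey))
```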
drallensmith commented on Jul 23, 2017
Forking it sounds fine, and no problem re the headaches - thanks for your contribution!
At this point I am actually more concerned about the timeouts and similar errors than about pickling; with the exception of the one due to `self.started` not being set to False by `de.stop`, they come prior to any attempt to "resurrect" the code. It may be most informative for you to try to run the test_xor_example_distributed.py file with a mostly-unmodified (just change the terminology) distributed.py version (oops on my part for not cleanly committing just the terminology change; the commit before the most recent two, starting with 42c878, may be the most useful).
I'm glad you like the primary/secondary - it isn't quite server/client either, since the secondary nodes aren't making requests of the primary node. The ARM cluster is good news; @D0pa might also be able to help on that.
evolvingfridge commented on Jul 23, 2017
I want to contribute but fail to understand the following:

What is the advantage and purpose of using Python multiprocessing when an experiment requires large-scale simulation, while MPI solves this issue much more easily (at least for me) and allows multiple setups such as a microservices environment or a distributed population?

Additionally, I see the biggest advantage of using MPI in the future being Xeon Phi cards, which hopefully will become more affordable.
drallensmith commented on Jul 23, 2017
I understand your question, and it's a good one - see the comment I just made on #51 for some thoughts on the matter.
drallensmith commented on Jul 24, 2017
@bennr01 - I have just uploaded a new commit to my fork's config_work branch; I went back to using `Value` for a `do_stop` proxy, this time establishing a separate queue for it to get to the secondaries (and be returned by them), plus using dict to figure out that one has to check the `do_stop._value` attribute for the secondaries (for the primary, just set `do_stop` to True).

However, test_distributed.py now either hangs on trying to join the single-worker secondary node (if not trying the multiple-worker version), or the child evaluation process (multiple workers) gets an EOFError from the inqueue being closed. (The latter may be fixable by "telling" it this is the equivalent of `do_stop` being turned on... I am admittedly a bit puzzled by why the code currently treats `managers.RemoteError` as if it were `queue.Empty` - this makes sense for the primary, but I'm not sure if it does for the secondaries.)
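A minimal reconstruction of that do_stop round trip (hypothetical names; fork-based process start on a single machine assumed; the real config_work code differs):

```python
import time
import multiprocessing
from multiprocessing.managers import SyncManager

def secondary(stop_queue):
    do_stop = stop_queue.get()       # receive the Value proxy
    while not do_stop.get():         # ValueProxy exposes .get()/.set()
        time.sleep(0.1)              # ... evaluate genomes here ...
    stop_queue.put(do_stop)          # hand the proxy back, as described

if __name__ == "__main__":
    manager = SyncManager()
    manager.start()
    stop_queue = manager.Queue()     # the separate queue just for do_stop
    do_stop = manager.Value("b", False)
    stop_queue.put(do_stop)
    worker = multiprocessing.Process(target=secondary, args=(stop_queue,))
    worker.start()
    do_stop.set(True)                # primary signals the shutdown
    worker.join()
```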
bennr01 commented on Jul 24, 2017
@D0pa The main advantage of using `multiprocessing` instead of `mpi4py` is that `multiprocessing` does not need any other dependencies. Most systems which have Python installed also have the `multiprocessing` module available. `mpi4py` can easily be installed using `pip`, but it seems like it needs some dependencies (`libmpich-dev`). I heard that some universities make it rather hard to get the root access which may be required to install these dependencies.

The second advantage is that `mpi4py` seems to require more code. Comparing `evolve-feedforward-distributed.py` and `xor-feedforward-mpi-data-scatter.py` (from https://github.com/CodeReclaimers/neat-python/pull/72/files), `evolve-feedforward-distributed.py` has 85 fewer lines of code than `xor-feedforward-mpi-data-scatter.py` (however, `xor-feedforward-mpi-data-scatter.py` has a longer docstring and `evolve-feedforward-distributed.py` has command-line argument support for testing purposes, so this is not an ideal measurement).

I would like to say that the third advantage is easier integration into existing code, but this varies between users. Adding support for clusters using `distributed` requires 3 additional lines of code and a modification to the line containing the `population.run` call (see the sketch below). However, if someone writes an evaluation function which actually requires being run on a cluster, that person would have written his/her code with this in mind, so this is not really an advantage. Most people using clusters will have more experience with `mpi4py` than with `distributed`/`multiprocessing`, which makes it easier for them to use `mpi4py`. `neat.distributed` was written for inexperienced people who need to run some evaluations on a cluster without much technical knowledge.
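The integration looks roughly like this (the `DistributedEvaluator` signature here is from memory and may not match the branch exactly; config and population setup are elided):

```python
from neat.distributed import DistributedEvaluator, MODE_AUTO

def eval_genome(genome, config):
    return 1.0  # placeholder; a real function evaluates the network

# ... usual neat-python config and population setup elided ...
de = DistributedEvaluator(
    ("localhost", 8022),   # (host, port) of the primary node
    b"authkey",            # shared secret for the manager connection
    eval_genome,           # evaluation function run on the secondaries
    mode=MODE_AUTO,        # primary or secondary, decided from the address
)
de.start(exit_on_stop=True)                 # secondaries block here until stopped
winner = population.run(de.evaluate, 300)   # the one modified line
de.stop()                                   # primary shuts everything down
```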
Obviously, there are also some disadvantages to using `multiprocessing` instead of `mpi4py`. One is the Xeon Phi cards (and other technology requiring `mpi`). Probably the biggest disadvantage is performance. I have not measured it, but `mpi` is probably faster than `multiprocessing.managers`. But if the data transfer rate between the nodes really matters in comparison with the evaluation time, then the evaluation would probably be faster using `neat.parallel` instead of a cluster, wouldn't it?

The reason I chose to use `multiprocessing.managers` instead of `mpi4py` was not related to the advantages and disadvantages. When I looked for a way to run `neat-python` on a cluster, I stumbled upon your PR #72. The PR was closed and not merged, so I thought there was some reason it was not merged. I do not understand most of the MPI code in your MPI example, so I wanted to either try to split most of the code into a separate module for easier re-usability or create a completely new module. Like I said, because the PR was closed, I thought that there was some reason it was not merged and concluded that using `mpi4py` might not be the best idea, so I did not even try it.

Thanks for offering your help in both this issue and in #51. My cluster is actually already set up; the only issue is that `pip install` requires more time than it should.

@drallensmith The real manager and all objects it holds are stored in a separate child process (according to the documentation of `multiprocessing`). Due to this, both the primary and secondary nodes are clients connected to the process holding the manager (a minimal sketch follows this comment). When we try to get an element from the queue, the manager process will request the element. If the queue raises `queue.Empty`, the exception is caught and serialized. The client which requested the element creates a new exception from the serialized data. This new exception may not be of the same type as the original exception, but is instead a `RemoteError` containing the error message and the traceback.

I just took a look at your newest commit to config_work. I am not sure why the code blocks, but the secondary nodes first receive the stop value and then put it back into the queue. Maybe this leads to some problems, as the stop value put back into the queue may not be the same one the primary node put into the queue.
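A minimal, self-contained illustration of that topology (not distributed.py's actual code): the queue lives in the manager's server process, and every node, primary or secondary alike, is just a client holding a proxy.

```python
import queue
from multiprocessing import managers

_inqueue = queue.Queue()

def _get_inqueue():
    return _inqueue

class _QueueManager(managers.BaseManager):
    pass

_QueueManager.register("get_inqueue", callable=_get_inqueue)

if __name__ == "__main__":
    # "Server": a separate process that actually holds the queue.
    server = _QueueManager(address=("127.0.0.1", 5000), authkey=b"neat")
    server.start()

    # "Client": what any node does -- connect and work through proxies.
    client = _QueueManager(address=("127.0.0.1", 5000), authkey=b"neat")
    client.connect()
    proxy = client.get_inqueue()  # method calls run in the server process
    proxy.put("genome")
    print(proxy.get())            # -> genome
    server.shutdown()
```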
drallensmith commented on Jul 24, 2017
Good point regarding the `RemoteError`. Perhaps `except managers.RemoteError as e:` followed by checking `repr(e)` (or the appropriate attribute of `e`) for the string `Empty`, and also for `EOFError`; the latter should result in the equivalent of a stop event (and ditto if an actual EOFError is received by a secondary). I will try to work on this later today, but am not sure when I will be able to get to it.

Regarding putting back the stop value: since there is also code for the primary node to put it onto the queue again if it's empty, the putting-back could be deleted to check.
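Something along these lines (string-matching on the remote traceback is as fragile as it sounds, but `RemoteError` carries little else):

```python
import queue
from multiprocessing import managers

def get_nowait(q):
    """Fetch from a manager queue proxy `q`, mapping remote errors back
    to something meaningful, per the discussion above."""
    try:
        return q.get(block=False)
    except queue.Empty:
        return None                  # locally raised: queue really is empty
    except managers.RemoteError as e:
        if "Empty" in repr(e):
            return None              # remote queue.Empty, serialized
        if "EOFError" in repr(e):
            raise EOFError("connection lost; treat as a stop event")
        raise
```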
drallensmith commented on Jul 24, 2017
OK, I implemented the first of the above. test_distributed.py is generally working... and I realized a problem with restoring from a checkpoint the way I was trying in test_xor_example_distributed.py: in the current setup, while `run_primary` has the information needed to restore (such as the generation of the last checkpoint), it doesn't have the information on how to recreate the secondary processes/nodes. Without those, if you try to restart it, it hangs. I'm currently seeing if `exit_on_stop=False` will help.
drallensmith commented on Jul 24, 2017
No, still hangs. After travis gets through running all the tests except test_xor_example_distributed.py, I'll upload a version with that test so that people can explore the problem more.
BTW, I changed the Queue to use `multiprocessing.Queue` - `queue.Queue`, as it turns out, is not multiprocess-safe. There's also a `close()` method for the multiprocessing version which may be needed.
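For illustration (standard library behavior, not project code):

```python
import multiprocessing

# queue.Queue is only thread-safe; multiprocessing.Queue also works
# across processes and provides close()/join_thread() for shutdown.
q = multiprocessing.Queue()
q.put("genome")
print(q.get())
q.close()          # this process will put no more data
q.join_thread()    # wait for the background feeder thread to flush
```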
drallensmith commented on Jul 24, 2017

Huh. travis is having a problem with anything above Python 3.5 - authentication errors. It's also claiming that `manager` in `manager.RemoteError` is not defined... ah, a misspelling; it should be `managers`. Fixing.
drallensmith commented on Jul 24, 2017
Currently, anything above 3.5 (3.6, etc.) is timing out, and when the process is interrupted, it says it was having an `AuthenticationError`. If that would come under `RemoteError`, then the primary node would ignore it, starting a loop... but why didn't the secondary nodes exit? Or did they?
drallensmith commented on Jul 24, 2017
I've gone ahead and uploaded a version with test_xor_example_distributed.py, while letting the other travis build run out. I also added `-v` to nosetests so we can see a bit more detail.

bennr01 commented on Jul 24, 2017
@drallensmith the `digest sent was rejected` error is raised when `connection.recv_bytes(1024) != b"#WELCOME#"`. While `connection.recv_bytes()` should normally return the response, default socket behavior is to return an empty string when the connection is closed. If `s.recv_bytes()` uses `socket.socket().recv()`, the `AuthenticationError` could be explained by a closed connection. I have to check the source of `connection.recv_bytes()` to verify this.
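For context, the client side of the handshake is roughly the following (paraphrased from CPython's multiprocessing.connection, so details may vary by version); an empty read from a closed socket would indeed fall through to the digest rejection:

```python
import hmac
from multiprocessing import AuthenticationError

WELCOME = b"#WELCOME#"
CHALLENGE = b"#CHALLENGE#"

def answer_challenge(connection, authkey):
    message = connection.recv_bytes(256)       # b"#CHALLENGE#" + nonce
    assert message[:len(CHALLENGE)] == CHALLENGE
    digest = hmac.new(authkey, message[len(CHALLENGE):], "md5").digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)      # empty if the peer closed
    if response != WELCOME:
        raise AuthenticationError("digest sent was rejected")
```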
drallensmith commented on Jul 24, 2017

Good thinking. Of course, at least part of the question then would be "what changed with 3.6+ to give this error when it didn't happen before?" Perhaps `s.recv_bytes()` changed?