Worker Job Class #497
Conversation
Example:

pyiron_base/job/worker.py (Outdated)

```python
class WorkerJob(PythonTemplateJob):
    def __init__(self, project, job_name):
        super(WorkerJob, self).__init__(project, job_name)
        self.input['project'] = None
```
PythonTemplateJob facilitates dot notation for input now.

A mybinder example for this class is available at https://github.com/jan-janssen/pyiron-worker
Waiting for conda-forge/staged-recipes#16822
pyiron_base/job/worker.py (Outdated)

```python
    for pp, p, job_id in path_lst
]
active_job_ids += [j[2] for j in job_lst]
_ = [pool.put(j) for j in job_lst]
```
What's the reason not to use a plain multiprocessing.Pool & map_async here? I'm a bit wary because I see that aproc uses inspect.getsource to transfer the function to the worker instead of pickle or dill.
The list is growing while the worker is already executing it, so a simple map does not work. The current combination in the background uses a multiprocessing pool plus a queue. In addition, I also tried dill and cloudpickle, but both failed; this might be related to the use of decorators.
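As a hedged illustration of why decorators can interfere with serialization (the decorator `logged` below is hypothetical, not from the pyiron code): plain pickle stores functions by qualified name, so a wrapper defined locally inside a decorator cannot be looked up again on the receiving side.

```python
import pickle

def logged(func):
    # hypothetical decorator; the wrapper is a function defined
    # locally inside logged(), so its qualified name is
    # 'logged.<locals>.wrapper'
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@logged
def square(x):
    return x ** 2

# pickle serializes functions by qualified name; the local wrapper
# cannot be imported on the other side, so serialization fails
try:
    pickle.dumps(square)
except Exception as err:
    print("pickling the decorated function failed:", err)
```

Note that dill and cloudpickle can usually serialize such closures, so whether this exact mechanism caused the failures reported above is only an assumption.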
I don't understand why the growing list is a problem. Inside the while loop the worker checks the job table for submitted jobs that are assigned to it via the master_id and puts them in a list. It could then map_async the list and mark the job ids as 'running' or 'queued' or whatever. In the next iteration of the while loop there may be new submitted jobs, which can be handled the same way, or there may be submitted jobs that were already found in the last iteration; since we marked those in the last iteration, we can exclude them from the map in this iteration.
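A minimal sketch of that polling pattern with a plain multiprocessing.Pool (all names here are hypothetical stand-ins, not the actual pyiron worker code): each iteration sees a snapshot of the growing job table, dispatches only unmarked ids via map_async, and marks them so later iterations skip them.

```python
import multiprocessing as mp

def run_job(job_id):
    # hypothetical stand-in for executing one submitted pyiron job
    return job_id ** 2

def worker_loop(job_table_snapshots, processes=2):
    """Dispatch each snapshot of a growing job table via map_async,
    marking job ids so later iterations exclude them."""
    dispatched = set()        # ids already marked as 'queued'/'running'
    async_results = []
    with mp.Pool(processes) as pool:
        for snapshot in job_table_snapshots:   # one while-loop iteration each
            new_ids = [j for j in snapshot if j not in dispatched]
            if new_ids:
                async_results.append(pool.map_async(run_job, new_ids))
                dispatched.update(new_ids)     # mark, so they are skipped next time
        # collect all results before the pool closes
        return [r.get() for r in async_results]

if __name__ == "__main__":
    # the job table grows between iterations, as in the worker's while loop
    snapshots = [[1, 2], [1, 2, 3], [1, 2, 3, 4]]
    print(worker_loop(snapshots))   # [[1, 4], [9], [16]]
```

Because already-dispatched ids are excluded from each new map_async call, the two calls never re-run the same job even though the table keeps growing.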
The limitation of multiple map_async calls is that they cannot be executed concurrently. So the second call to map_async is only executed once the first one is finished. The async only means you get control of the main process back; it does not mean that the workers are asynchronous. That is why I created the aproc interface.
From my testing this is not the case. I can call map_async multiple times just fine as long as there are available workers in the pool. Once the pool is completely used you naturally have to wait, but I assume aproc is the same in this regard.

Here's my test script:
```python
import multiprocessing as mp

def pow2(x):
    import time
    print(x)
    time.sleep(10)
    return x**2

with mp.Pool(4) as p:
    r1 = p.map_async(pow2, range(2))
    r2 = p.map_async(pow2, range(4, 8))
    r1.wait()
    print(r1.get())
    r2.wait()
    print(r2.get())
```
with output

```
0
1
4
5
<nothing happens here for 10s because we're blocked on r1.wait()>
6
7
[0, 1]
[16, 25, 36, 49]
```
That shows that the first two jobs from the second map call are run straight away, and then the last two once the jobs from the first map are finished. This sounds like exactly what we need to me.
I thought I had tried that before, but it is working now, so I reverted to map_async. Thanks again for pointing me in this direction.
@pmrv and @liamhuber Any other feedback? Otherwise I would like to merge this soon so it can be included in the next pyiron_base release.
I would like to have a small example in the class or module docstring on how to use the worker. The snippet above would do it already. Otherwise it looks good to me.
I would like to see the class appear in the unit tests. If I understand correctly, as long as the test inherits from
As we no longer have the example job, I am not exactly sure which job to test it with. The ScriptJob, which is used for many other tests, is not really sufficient.
Why would the script job be insufficient?
However, we have pyiron_base/pyiron_base/_tests.py, lines 92 to 103 (at 1846711). Maybe this is sufficient?
No, because the class defined in the test cannot be reloaded in the subprocess on the worker. But I got it working with the script job instead.
Follow up to #474

Implementing the workers as a separate job class using multiprocessing.