Add support for checking several limits before running task on engine #827
This is already possible with the current dependency mechanism - you can define perfectly arbitrary python functions, to be run on the engine, that will determine whether the job should run there: http://ipython.org/ipython-doc/dev/parallel/parallel_task.html#functional-dependencies |
Thanks for the hint.
It feels like there should be an example there, because of the trailing ':', but instead the next chapter, 'Graph Dependencies', follows. I tried to add a dependency in the following way, but it doesn't seem to work:
|
I will definitely mock up some examples, but the basic idea is that if your task raises an UnmetDependency error, it will halt, and try again somewhere else. The dependent object doesn't modify the original function, it creates a new one:

```python
run_script_on_os = dependent(run_script_with_env, engine_has_os, os_name)
ar = lbview.apply(run_script_on_os, render_script, env_dict)
```

If you want the function itself to always depend on something, you can use the decorator form:

```python
@depend(engine_has_os, os_name)
def run_script_with_env(...):
    ...

lbview.apply(run_script_with_env, ...)
```

Or if you want to consider the dependency unmet somewhere in the middle of your task, you can just raise the error yourself:

```python
def my_task():
    from IPython.parallel.error import UnmetDependency
    do_some_work()
    if condition:
        raise UnmetDependency
    continue_with_work()
```
|
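The snippets above assume a dependency function like `engine_has_os`, which the thread never defines. A minimal sketch of such a function, assuming OS detection via the standard library's `platform` module, might look like this:

```python
import platform

def engine_has_os(os_name):
    """Dependency function, run on the engine itself: returns True only if
    the engine's operating system matches os_name (e.g. 'Linux', 'Darwin',
    'Windows'). The scheduler retries elsewhere when it returns False."""
    return platform.system() == os_name
```

Because the function executes on the engine, not the client, it reports the environment of whichever machine the task landed on.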
Thanks for your examples.
Is it also possible to refer to the current engine_id? |
Now I tried it with only one dependent function which wraps the other functions:

```python
run_script_with_env_and_deps = dependent(DrQueue.run_script_with_env, self.check_deps, os_name, minram, mincores, pool_name)
ar = self.lbview.apply(run_script_with_env_and_deps, render_script, env_dict)
```

check_deps function:

```python
def check_deps(self, os_name, minram, mincores, pool_name):
    if self.engine_has_os(os_name) == False:
        return False
    elif self.engine_has_minram(minram) == False:
        return False
    elif self.engine_has_mincores(mincores) == False:
        return False
    elif self.engine_is_in_pool(pool_name) == False:
        return False
    else:
        return True
```

IPython gives me the following exception:

```
Traceback (most recent call last):
  File "send_job.py", line 100, in <module>
    main()
  File "send_job.py", line 71, in main
    client.job_run(job)
  File "/Users/kaazoo/Documents/Entwicklung/drqueue-entwicklung/drqueue-ipython/DrQueue/client.py", line 217, in job_run
    ar = self.lbview.apply(run_script_with_env_and_deps, render_script, env_dict)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/parallel/client/view.py", line 209, in apply
    return self._really_apply(f, args, kwargs)
  File "<string>", line 2, in _really_apply
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/parallel/client/view.py", line 57, in sync_results
    ret = f(self, *args, **kwargs)
  File "<string>", line 2, in _really_apply
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/parallel/client/view.py", line 46, in save_ids
    ret = f(self, *args, **kwargs)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/parallel/client/view.py", line 980, in _really_apply
    subheader=subheader)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/parallel/client/client.py", line 992, in send_apply_message
    bufs = util.pack_apply_message(f,args,kwargs)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/parallel/util.py", line 267, in pack_apply_message
    msg = [pickle.dumps(can(f),-1)]
cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
```
|
instancemethods aren't picklable - this is happening because you are trying to execute instance methods remotely, which doesn't work, as that would require sending the entire instance, which you probably don't want to do. I don't know enough about your code, but do these methods really need to have references to the instance? It is possible to access the engine_id. The best way is to set the engine ids from the client, and then get it out of globals(), but if you can't rely on that, you can get it from the application:

```python
from IPython.config.application import Application
eid = Application.instance().engine.id
```

I should probably initialize the user_ns with the id, though. |
Thanks a lot. Moving the functions to the main module instead of using instance methods makes it work. |
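A rough sketch of what "moving the functions to the main module" looks like, using names borrowed from the thread (`check_deps`, `engine_has_mincores`); the bodies here are illustrative assumptions, not DrQueue's actual implementation:

```python
import platform
import multiprocessing

# Module-level functions carry no reference to an instance, so pickle can
# serialize them by name and ship them to the engines.

def engine_has_os(os_name):
    return platform.system() == os_name

def engine_has_mincores(mincores):
    return multiprocessing.cpu_count() >= mincores

def check_deps(os_name, mincores):
    # Every individual check must pass for the engine to accept the task.
    return engine_has_os(os_name) and engine_has_mincores(mincores)
```

The wrapped task is then built exactly as before, e.g. `dependent(run_script_with_env, check_deps, os_name, mincores)`, but without `self` anywhere in the picture.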
Getting the current engine_id also works. If it isn't documented already, it would make sense to document this for other users. |
Now, I don't obtain the engine_id when I execute my program. How can I obtain it from this method, Application.instance()? |
It would be nice to be able to define some limits for a task, for example operating system, minimum amount of RAM, or minimum number of CPU cores. If different computers are connected to ipcontroller, tasks would only be queued to engines which are able to fully execute them.
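The limits proposed here could be expressed today as a single dependency function evaluated on each engine. A minimal sketch, assuming a POSIX-only total-RAM query via `os.sysconf` and parameter names of my own invention:

```python
import os
import platform
import multiprocessing

def engine_meets_limits(os_name, minram_mb, mincores):
    """Run on the engine as a dependency; True only if all limits are met.
    minram_mb/mincores are illustrative names, not part of IPython's API."""
    if platform.system() != os_name:
        return False
    if multiprocessing.cpu_count() < mincores:
        return False
    try:
        # POSIX-only total-RAM query; if unavailable, skip the RAM check.
        total_mb = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') // (1024 * 1024)
    except (ValueError, OSError, AttributeError):
        return True
    return total_mb >= minram_mb
```

Wrapped with `dependent(...)`, a task would then only run on engines that satisfy all three limits, and the scheduler would retry elsewhere on the rest.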