Skip to content
Permalink
Browse files

master/language: add methods to set experiment pipeline/priority/flus…

…h defaults
  • Loading branch information...
cjbe authored and jordens committed Feb 2, 2019
1 parent b2177ef commit 8659c769cb2cf9a5c5787b0f266faac9b011e2fa
Showing with 25 additions and 7 deletions.
  1. +17 −0 artiq/language/environment.py
  2. +2 −2 artiq/master/experiments.py
  3. +2 −2 artiq/master/worker.py
  4. +4 −3 artiq/master/worker_impl.py
@@ -207,10 +207,12 @@ def __init__(self, managers_or_parent, *args, **kwargs):
self.__device_mgr = managers_or_parent[0]
self.__dataset_mgr = managers_or_parent[1]
self.__argument_mgr = managers_or_parent[2]
self.__scheduler_defaults = managers_or_parent[3]
else:
self.__device_mgr = managers_or_parent.__device_mgr
self.__dataset_mgr = managers_or_parent.__dataset_mgr
self.__argument_mgr = managers_or_parent.__argument_mgr
self.__scheduler_defaults = {}
managers_or_parent.register_child(self)

self.__in_build = True
@@ -364,6 +366,21 @@ def setattr_dataset(self, key, default=NoDefault, archive=True):
dataset and of the attribute are the same."""
setattr(self, key, self.get_dataset(key, default, archive))

def set_default_scheduling(self, priority=None, pipeline_name=None, flush=None):
"""Sets the default scheduling options.
This function should only be called from ``build``."""
if not self.__in_build:
raise TypeError("set_default_scheduling() should only "
"be called from build()")

if priority is not None:
self.__scheduler_defaults["priority"] = int(priority)
if pipeline_name is not None:
self.__scheduler_defaults["pipeline_name"] = pipeline_name
if flush is not None:
self.__scheduler_defaults["flush"] = flush


class Experiment:
"""Base class for top-level experiments.
@@ -46,7 +46,8 @@ def __init__(self, worker_handlers):
entry = {
"file": filename,
"class_name": class_name,
"arginfo": arginfo
"arginfo": arginfo,
"scheduler_defaults": class_desc["scheduler_defaults"]
}
entry_dict[name] = entry

@@ -115,7 +116,6 @@ def close(self):
t1 = time.monotonic()
new_explist = await _RepoScanner(self.worker_handlers).scan(wd)
logger.info("repository scan took %d seconds", time.monotonic()-t1)

update_from_dict(self.explist, new_explist)
finally:
self._scanning = False
@@ -303,8 +303,8 @@ def _get_log_source(self):
await self._create_process(logging.WARNING)
r = dict()

def register(class_name, name, arginfo):
r[class_name] = {"name": name, "arginfo": arginfo}
def register(class_name, name, arginfo, scheduler_defaults):
r[class_name] = {"name": name, "arginfo": arginfo, "scheduler_defaults": scheduler_defaults}
self.register_experiment = register
await self._worker_action({"action": "examine", "file": file},
timeout)
@@ -174,11 +174,12 @@ def examine(device_mgr, dataset_mgr, file):
if name[-1] == ".":
name = name[:-1]
argument_mgr = TraceArgumentManager()
exp_class((device_mgr, dataset_mgr, argument_mgr))
scheduler_defaults = {}
cls = exp_class((device_mgr, dataset_mgr, argument_mgr, scheduler_defaults))
arginfo = OrderedDict(
(k, (proc.describe(), group, tooltip))
for k, (proc, group, tooltip) in argument_mgr.requested_args.items())
register_experiment(class_name, name, arginfo)
register_experiment(class_name, name, arginfo, scheduler_defaults)
finally:
new_keys = set(sys.modules.keys())
for key in new_keys - previous_keys:
@@ -277,7 +278,7 @@ def main():
os.makedirs(dirname, exist_ok=True)
os.chdir(dirname)
argument_mgr = ProcessArgumentManager(expid["arguments"])
exp_inst = exp((device_mgr, dataset_mgr, argument_mgr))
exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {}))
put_object({"action": "completed"})
elif action == "prepare":
exp_inst.prepare()

0 comments on commit 8659c76

Please sign in to comment.
You can’t perform that action at this time.