Skip to content

Commit

Permalink
Use a common_dict to extract common information from sos_dict of subt…
Browse files Browse the repository at this point in the history
…asks of master task #1185
  • Loading branch information
Bo Peng committed Jan 25, 2019
1 parent ac86d94 commit f481be7
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 3 deletions.
1 change: 0 additions & 1 deletion src/sos/executor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ def create_task(global_def, task_stmt, task_params):
task_tags.append(tag)

# save task to a file
task_vars['__task_vars__'] = copy.copy(task_vars)
taskdef = TaskParams(
name='{} (index={})'.format(
env.sos_dict['step_name'], env.sos_dict['_index']),
Expand Down
4 changes: 2 additions & 2 deletions src/sos/step_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def get_job(self, all_tasks=False):
for idx, (task_id, taskdef, _) in slot:
master.push(task_id, taskdef)
ids.append(master.ID)
TaskFile(master.ID).save(master)
TaskFile(master.ID).save(master.finalize())
env.signature_push_socket.send_pyobj(['workflow', 'task', master.ID,
f"{{'creation_time': {time.time()}}}"])
self._unsubmitted_slots = []
Expand Down Expand Up @@ -146,7 +146,7 @@ def get_job(self, all_tasks=False):
master.push(task_id, taskdef)
# the last piece
if master is not None:
TaskFile(master.ID).save(master)
TaskFile(master.ID).save(master.finalize())
env.signature_push_socket.send_pyobj(['workflow', 'task', master.ID,
f"{{'creation_time': {time.time()}}}"])
ids.append(master.ID)
Expand Down
4 changes: 4 additions & 0 deletions src/sos/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ def copy_out_and_err(result):
p = Pool(params.num_workers)
results = []
for tid, tdef in params.task_stack:
if hasattr(params, 'common_dict'):
tdef.sos_dict.update(params.common_dict)
results.append(p.apply_async(_execute_task,
((tid, tdef, {tid: sig_content.get(tid, {})}), verbosity, runmode,
sigmode, None, None), callback=copy_out_and_err))
Expand All @@ -257,6 +259,8 @@ def copy_out_and_err(result):
else:
results = []
for tid, tdef in params.task_stack:
if hasattr(params, 'common_dict'):
tdef.sos_dict.update(params.common_dict)
# no monitor process for subtasks
res = _execute_task((tid, tdef, {tid: sig_content.get(tid, {})}), verbosity=verbosity, runmode=runmode,
sigmode=sigmode, monitor_interval=None, resource_monitor_interval=None)
Expand Down
17 changes: 17 additions & 0 deletions src/sos/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ def push(self, task_id, params):
self.ID = f'M{len(self.task_stack)}_{self.task_stack[0][0]}'
self.name = self.ID

def finalize(self):
if not self.task_stack:
return
common_dict = None
common_keys = set()
for id, params in self.task_stack:
if common_dict is None:
common_dict = params.sos_dict
common_keys = set(params.sos_dict.keys())
else:
common_keys = {key for key in common_keys if key in params.sos_dict and common_dict[key] == params.sos_dict[key]}
if not common_keys:
break
self.common_dict = {x:common_dict[x] for x in common_keys}
for id, params in self.task_stack:
params.sos_dict = {k:v for k,v in params.sos_dict.items() if k not in common_keys}
return self

class TaskStatus(Enum):
new = 0
Expand Down

0 comments on commit f481be7

Please sign in to comment.