Skip to content

Commit

Permalink
adjust task execution interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Aug 9, 2019
1 parent ca64fe4 commit 655c741
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 36 deletions.
27 changes: 16 additions & 11 deletions src/sos/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ def get_execute_parser(desc_only=False):
],
default='default',
metavar='SIGMODE',
dest='__sig_mode__',
dest='sig_mode',
help='''How runtime signature would be handled, which can be "default"
(save and use signature, default mode in batch mode), "ignore"
(ignore runtime signature, default mode in interactive mode),
Expand Down Expand Up @@ -1195,27 +1195,32 @@ def cmd_execute(args, workflow_args):
if args.queue is None:
# local machine ...
exit_code = []
executor_args = dict(verbosity=args.verbosity,
runmode='dryrun' if args.dryrun else (args.run_mode if args.run_mode else 'run'),
sigmode=args.__sig_mode__,
worker_procs=get_nodelist(args.__worker_procs__),
monitor_interval=monitor_interval,
resource_monitor_interval=resource_monitor_interval)

env.config.update({
'config_file': args.config,
'sig_mode': 'default' if args.sig_mode is None else args.sig_mode,
'run_mode': 'dryrun' if args.dryrun else (args.run_mode if args.run_mode else 'run'),
'sockets': {
},
'exec_mode': None,
'verbosity': args.verbosity,
'worker_procs': get_nodelist(args.__worker_procs__),
})
if not args.executor:
from .task_executor import BaseTaskExecutor
executor = BaseTaskExecutor(**executor_args)
executor = BaseTaskExecutor()
else:
found = False
for entrypoint in pkg_resources.iter_entry_points(group='sos_taskexecutors'):
name = entrypoint.name.strip()
found = False
if name == args.executor:
try:
executor = entrypoint.load()(**executor_args)
executor = entrypoint.load()()
found = True
except Exception as e:
raise RuntimeError(f'Failed to load task executor {name}: {e}')
if not found:
raise RuntimeError(f'Failed to identify task executor {name}.')
raise RuntimeError(f'Failed to identify task executor {args.executor}.')
for task in args.tasks:
#
matched = [
Expand Down
31 changes: 8 additions & 23 deletions src/sos/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
from .targets import (InMemorySignature, file_target, sos_step, dynamic,
sos_targets)
from .utils import StopInputGroup, env, pickleable, ProcessKilled
from .tasks import TaskFile, remove_task_files
from .tasks import TaskFile, remove_task_files, monitor_interval, resource_monitor_interval
from .step_executor import parse_shared_vars
from .executor_utils import __null_func__, get_traceback_msg, prepare_env, clear_output


def signal_handler(*args, **kwargs):
raise ProcessKilled()

Expand All @@ -30,19 +29,8 @@ class BaseTaskExecutor(object):
should derive from this class.
'''

def __init__(self,
verbosity=None,
runmode='run',
sigmode=None,
worker_procs=None,
monitor_interval=5,
resource_monitor_interval=60):
self.verbosity = verbosity
self.runmode = runmode
self.sigmode = sigmode
self.worker_procs = worker_procs
self.monitor_interval = monitor_interval
self.resource_monitor_interval = resource_monitor_interval
def __init__(self):
pass

def execute(self, task_id):
'''Execute single or master task, return a dictionary'''
Expand All @@ -57,14 +45,11 @@ def execute(self, task_id):
params, runtime = tf.get_params_and_runtime()
sig_content = tf.signature

if self.verbosity is not None:
env.verbosity = self.verbosity
if env.config['verbosity'] is not None:
env.verbosity = env.config['verbosity']

env.config['run_mode'] = self.runmode
if self.runmode == 'dryrun':
if env.config['run_mode'] == 'dryrun':
env.config['sig_mode'] = 'ignore'
elif self.sigmode is not None:
env.config['sig_mode'] = self.sigmode

env.logger.info(f'{task_id} ``started``')

Expand All @@ -81,8 +66,8 @@ def execute(self, task_id):

m = ProcessMonitor(
task_id,
monitor_interval=self.monitor_interval,
resource_monitor_interval=self.resource_monitor_interval,
monitor_interval=monitor_interval,
resource_monitor_interval=resource_monitor_interval,
max_walltime=runtime['_runtime'].get('max_walltime', None),
max_mem=runtime['_runtime'].get('max_mem', None),
max_procs=runtime['_runtime'].get('max_procs', None),
Expand Down
12 changes: 10 additions & 2 deletions src/sos/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,16 @@ def run_substep(self, work):

def run_task(self, work):
from .task_executor import BaseTaskExecutor
executor = BaseTaskExecutor(**work['executor_params'])
executor.execute_single_task(**work['task_params'])
executor = BaseTaskExecutor(**work)
if env.result_socket is None:
env.result_socket = create_socket(env.zmq_context, zmq.PUSH)
env.result_socket_port = config["sockets"]["result_push_socket"]
# result_socket_port contains the IP of the worker that requested the substep
env.result_socket.connect(env.result_socket_port)

res = executor.execute_single_task(**work)

env.result_socket.send(encode_msg(res))


class WorkerManager(object):
Expand Down

0 comments on commit 655c741

Please sign in to comment.