Permalink
Browse files

auto-resume session if pending jobs remain

  • Loading branch information...
1 parent eb64db0 commit eddd980a78281ea578b7ef8a3a50052dd95ae4c1 @lirazsiri lirazsiri committed Dec 19, 2012
Showing with 49 additions and 36 deletions.
  1. +46 −36 cloudtask/task.py
  2. +3 −0 cloudtask/watchdog.py
View
@@ -138,7 +138,7 @@ def usage(cls, e=None):
sys.exit(1)
@classmethod
- def confirm(cls, taskconf, split, jobs):
+ def confirm(cls, taskconf, jobs):
def filter(job):
job = re.sub('^\s*', '', job[len(taskconf.command):])
return job
@@ -149,8 +149,8 @@ def filter(job):
job_range = ("%s .. %s" % (job_first, job_last)
if job_first != job_last else "%s" % job_first)
- print >> sys.stderr, "About to launch %d cloud server%s to execute %d jobs (%s):" % (split,
- "s" if split and split > 1 else "",
+ print >> sys.stderr, "About to launch %d cloud server%s to execute %d jobs (%s):" % (taskconf.split,
+ "s" if taskconf.split and taskconf.split > 1 else "",
len(jobs), job_range)
print >> sys.stderr, "\n" + taskconf.fmt()
@@ -350,17 +350,16 @@ def main(cls):
split = taskconf.split if taskconf.split else 1
if split > len(jobs):
split = len(jobs)
+ taskconf.split = split
if len(taskconf.workers) < split and not taskconf.hub_apikey:
error("please provide a HUB APIKEY or more pre-launched workers")
if os.isatty(sys.stderr.fileno()) and not opt_force :
- cls.confirm(taskconf, split, jobs)
+ cls.confirm(taskconf, jobs)
if not session:
session = Session(opt_sessions)
-
- taskconf.split = split
session.taskconf = taskconf
ok = cls.work(jobs, session, taskconf)
@@ -391,56 +390,67 @@ def terminate(sig, f):
raise CaughtSignal("caught %s termination signal" % sigs[sig], sig)
- watchdog = Watchdog(session.logs.manager, session.paths.workers, taskconf)
+ exception = None
+ while True:
+ watchdog = Watchdog(session.logs.manager, session.paths.workers, taskconf)
- signal.signal(signal.SIGINT, terminate)
- signal.signal(signal.SIGTERM, terminate)
+ signal.signal(signal.SIGINT, terminate)
+ signal.signal(signal.SIGTERM, terminate)
- executor = None
+ executor = None
- work_started = time.time()
- sshkey = TempSSHKey()
+ work_started = time.time()
+ sshkey = TempSSHKey()
- try:
- executor = CloudExecutor(session.logs, taskconf, sshkey)
- for job in jobs:
- executor(job)
+ try:
+ executor = CloudExecutor(session.logs, taskconf, sshkey)
+ for job in jobs:
+ executor(job)
- executor.join()
- results = executor.results
+ executor.join()
+ executor_results = executor.results
- except Exception, e:
- if isinstance(e, CaughtSignal):
- print >> session.logs.manager, "# " + str(e[0])
+ except Exception, exception:
+ if isinstance(exception, CaughtSignal):
+ print >> session.logs.manager, "# " + str(exception[0])
- elif not isinstance(e, (CloudWorker.Error, CloudWorker.Terminated)):
- traceback.print_exc(file=session.logs.manager)
+ elif not isinstance(exception, (CloudWorker.Error, CloudWorker.Terminated)):
+ traceback.print_exc(file=session.logs.manager)
- if executor:
- executor.stop()
- results = executor.results
- else:
- results = []
+ if executor:
+ executor.stop()
+ executor_results = executor.results
+ else:
+ executor_results = []
- watchdog.terminate()
- watchdog.join()
+ watchdog.terminate()
+ watchdog.join()
- session.jobs.update(jobs, results)
+ session.jobs.update(jobs, executor_results)
+
+ if len(session.jobs.pending) != 0 and executor_results and exception is None:
+ print >> session.logs.manager
+ status("(auto-resuming) %d pending jobs remaining" % len(session.jobs.pending))
+ print >> session.logs.manager
+
+ jobs = list(session.jobs.pending)
+ if taskconf.split > len(jobs):
+ taskconf.split = len(jobs)
+ else:
+ break
session_results = [ result for job, result in session.jobs.finished ]
- pending = len(session.jobs.pending)
succeeded = session_results.count("EXIT=0")
- pending = len(session.jobs.pending)
timeouts = session_results.count("TIMEOUT")
errors = len(session_results) - succeeded - timeouts
- total = len(session.jobs.finished) + len(session.jobs.pending)
+ pending = len(session.jobs.pending)
+ total = len(session.jobs.finished) + pending
print >> session.logs.manager
-
status("(%d seconds): %d/%d !OK - %d pending, %d timeouts, %d errors, %d OK" % \
- (time.time() - work_started, total - succeeded, total, len(session.jobs.pending), timeouts, errors, succeeded))
+ (time.time() - work_started, total - succeeded, total, pending, timeouts, errors, succeeded))
ok = (total - succeeded == 0)
return ok
View
@@ -154,6 +154,9 @@ def run(self):
class Stopped(Exception):
pass
+ # SIGINT should raise KeyboardInterrupt
+ signal.signal(signal.SIGINT, signal.default_int_handler)
+
# SIGTERM sent to us when parent process has finished
def stop(s, t):
raise Stopped

0 comments on commit eddd980

Please sign in to comment.