Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

multiple refactorings

rationale:

reduce complexity and increase readability by making relationships between
objects more specific. It is better to pass sub-objects of minimal scope than
global "god-objects". That way there are fewer possible interactions.

changes:

- moved SSHKey to Task (it's not really needed inside session for anything.
  Plus, when we recreate Session we don't always need or mean to create a new
  temporary SSH key)

- encapsulated session logs into a narrower sub-class (Session.Logs): that way
  we can pass the narrower class rather than the entire session, which makes it
  clearer what we're doing with it and reduces possible interactions.

- make Watchdog behave as an instance class rather than as a collection of
  classmethods (now that we've narrowed down what we need, passing around lots
  of arguments just makes the code more verbose without really reducing
  complexity)
  • Loading branch information...
commit eb64db0c77644bf68a698aea0ae7da427e758c64 1 parent 2d6bca4
@lirazsiri lirazsiri authored
View
39 cloudtask/executor.py
@@ -68,7 +68,7 @@ def func():
return func
- def __init__(self, session, taskconf, ipaddress=None, destroy=None, event_stop=None, launchq=None):
+ def __init__(self, session_logs, taskconf, sshkey, ipaddress=None, destroy=None, event_stop=None, launchq=None):
self.pid = os.getpid()
@@ -77,9 +77,8 @@ def __init__(self, session, taskconf, ipaddress=None, destroy=None, event_stop=N
self.event_stop = event_stop
- self.wlog = session.wlog
- self.mlog = session.mlog
- self.session_key = session.key
+ self.logs = session_logs
+ self.sshkey = sshkey
self.strikes = taskconf.strikes
self.strike = 0
@@ -121,7 +120,7 @@ def handler(s, f):
def callback():
return not stopped.value
- instance = list(self.hub.launch(1, VerboseLog(session.mlog), callback, **taskconf.ec2_opts))[0]
+ instance = list(self.hub.launch(1, VerboseLog(session_logs.manager), callback, **taskconf.ec2_opts))[0]
if not instance or (event_stop and event_stop.is_set()):
raise self.Terminated
@@ -137,17 +136,17 @@ def callback():
try:
self.ssh = SSH(self.ipaddress,
- identity_file=self.session_key.path,
+ identity_file=self.sshkey.path,
login_name=taskconf.user,
callback=self.handle_stop)
except SSH.Error, e:
self.status("unreachable via ssh: " + str(e))
- traceback.print_exc(file=self.wlog)
+ traceback.print_exc(file=self.logs.worker)
raise self.Error(e)
try:
- self.ssh.copy_id(self.session_key.public)
+ self.ssh.copy_id(self.sshkey.public)
if taskconf.overlay:
self.ssh.apply_overlay(taskconf.overlay)
@@ -157,7 +156,7 @@ def callback():
except Exception, e:
self.status("setup failed")
- traceback.print_exc(file=self.wlog)
+ traceback.print_exc(file=self.logs.worker)
raise self.Error(e)
@@ -168,7 +167,7 @@ def _cleanup(self):
if self.cleanup_command:
self.ssh.command(self.cleanup_command).close()
- self.ssh.remove_id(self.session_key.public)
+ self.ssh.remove_id(self.sshkey.public)
except:
pass
@@ -185,7 +184,7 @@ def _cleanup(self):
raise self.Error("Hub didn't destroy worker instance as requested!")
except:
self.status("failed to destroy worker %s" % self.instanceid)
- traceback.print_exc(file=self.wlog)
+ traceback.print_exc(file=self.logs.worker)
raise
def __getstate__(self):
@@ -198,8 +197,8 @@ def status(self, msg, after_output=False):
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
c = "\n" if after_output else ""
- self.wlog.status.write(c + "# %s [%s] %s\n" % (timestamp, self.ipaddress, msg))
- self.mlog.write("%s (%d): %s\n" % (self.ipaddress, os.getpid(), msg))
+ self.logs.worker.status.write(c + "# %s [%s] %s\n" % (timestamp, self.ipaddress, msg))
+ self.logs.manager.write("%s (%d): %s\n" % (self.ipaddress, os.getpid(), msg))
def __call__(self, job):
command = job.command
@@ -222,7 +221,7 @@ class WorkerDied(Exception):
def handler(ssh_command, buf):
if buf:
read_timeout.reset()
- self.wlog.write(buf)
+ self.logs.worker.write(buf)
if ssh_command.running and timeout.expired():
raise CommandTimeout
@@ -261,7 +260,7 @@ def handler(ssh_command, buf):
else:
if ssh_command.exitcode == 255 and re.match(r'^ssh: connect to host.*:.*$', ssh_command.output):
self.status("worker unreachable # %s" % command)
- self.wlog.write("%s\n" % ssh_command.output)
+ self.logs.worker.write("%s\n" % ssh_command.output)
raise self.Error(SSH.Error(ssh_command.output))
self.status("exit %d # %s" % (ssh_command.exitcode, command), True)
@@ -303,7 +302,7 @@ class CloudExecutor:
class Error(Exception):
pass
- def __init__(self, session, taskconf):
+ def __init__(self, session_logs, taskconf, sshkey):
ipaddresses = taskconf.workers
split = taskconf.split
@@ -315,7 +314,7 @@ def __init__(self, session, taskconf):
ipaddress = ipaddresses[0]
else:
ipaddress = None
- self._execute = CloudWorker(session, taskconf, ipaddress)
+ self._execute = CloudWorker(session_logs, taskconf, sshkey, ipaddress)
self.results = []
else:
@@ -340,7 +339,7 @@ def callback():
hub = Hub(taskconf.hub_apikey)
i = None
try:
- for i, instance in enumerate(hub.launch(new_workers, VerboseLog(session.mlog), callback, **taskconf.ec2_opts)):
+ for i, instance in enumerate(hub.launch(new_workers, VerboseLog(session_logs.manager), callback, **taskconf.ec2_opts)):
launchq.put(instance)
except Exception, e:
unlaunched_workers = new_workers - (i + 1) \
@@ -351,7 +350,7 @@ def callback():
launchq.put(None)
if not isinstance(e, hub.Stopped):
- traceback.print_exc(file=session.mlog)
+ traceback.print_exc(file=session_logs.manager)
threading.Thread(target=thread).start()
@@ -361,7 +360,7 @@ def callback():
else:
ipaddress = None
- worker = Deferred(CloudWorker, session, taskconf, ipaddress,
+ worker = Deferred(CloudWorker, session_logs, taskconf, sshkey, ipaddress,
event_stop=self.event_stop, launchq=launchq)
workers.append(worker)
View
4 cloudtask/reporter.py
@@ -121,8 +121,8 @@ def header(title, c):
print >> sio, header("Fallback session log", "=")
print >> sio, taskconf.fmt()
- mlog = file(session.paths.log).read()
- print >> sio, mlog
+ manager_log = file(session.paths.log).read()
+ print >> sio, manager_log
body = sio.getvalue()
View
150 cloudtask/session.py
@@ -15,12 +15,6 @@
import paths
import errno
-import time
-
-from temp import TempFile
-import uuid
-
-import executil
from taskconf import TaskConf
import pprint
@@ -34,24 +28,6 @@ def makedirs(path, mode=0750):
if e.errno != errno.EEXIST:
raise
-class TempSessionKey(TempFile):
- def __init__(self):
- TempFile.__init__(self, prefix='key_')
- os.remove(self.path)
-
- self.uuid = uuid.uuid4()
- executil.getoutput("ssh-keygen -N '' -f %s -C %s" % (self.path, self.uuid))
-
- @property
- def public(self):
- return self.path + ".pub"
-
- def __del__(self):
- if os.getpid() == self.pid:
- os.remove(self.public)
-
- TempFile.__del__(self)
-
class UNDEFINED:
pass
@@ -114,56 +90,84 @@ def update_retry_failed(self):
self.save()
- class WorkerLog(object):
- def fh(self):
- if not self._fh:
- self._fh = file(join(self.path, str(os.getpid())), "a", 1)
+ class Logs:
+ class Worker(object):
+ def fh(self):
+ if not self._fh:
+ self._fh = file(join(self.path, str(os.getpid())), "a", 1)
- return self._fh
- status = fh = property(fh)
+ return self._fh
+ status = fh = property(fh)
+ def __init__(self, path, tee=False):
+ self._fh = None
+ self.path = path
+ self.tee = tee
- def __init__(self, path, tee=False):
- self._fh = None
- self.path = path
- self.tee = tee
+ @staticmethod
+ def _filter(buf):
+ buf = re.sub(r'Connection to \S+ closed\.\r+\n', '', buf)
+ buf = re.sub(r'\r[^\r\n]+$', '', buf)
+ buf = re.sub(r'.*\r(?![\r\n])','', buf)
+ buf = re.sub(r'\r+\n', '\n', buf)
+
+ return buf
+
+ def write(self, buf):
+ if self.tee:
+ sys.stdout.write(buf)
+ sys.stdout.flush()
+
+ # filter progress bars and other return-carriage crap
+ buf = self._filter(buf)
+
+ if buf:
+ self.fh.write(buf)
+ else:
+ os.utime(self.path, None)
- @staticmethod
- def _filter(buf):
- buf = re.sub(r'Connection to \S+ closed\.\r+\n', '', buf)
- buf = re.sub(r'\r[^\r\n]+$', '', buf)
- buf = re.sub(r'.*\r(?![\r\n])','', buf)
- buf = re.sub(r'\r+\n', '\n', buf)
+ def __getattr__(self, attr):
+ return getattr(self.fh, attr)
- return buf
+ class Manager:
+ def __init__(self, path):
+ self.fh = file(path, "a", 1)
- def write(self, buf):
- if self.tee:
+ def write(self, buf):
+ self.fh.write(buf)
sys.stdout.write(buf)
sys.stdout.flush()
- # filter progress bars and other return-carriage crap
- buf = self._filter(buf)
+ def __getattr__(self, attr):
+ return getattr(self.fh, attr)
- if buf:
- self.fh.write(buf)
- else:
- os.utime(self.path, None)
+ def __init__(self, path_session_log, path_workers):
+ self.pid = os.getpid()
+ self.path_session_log = path_session_log
+ self.path_workers = path_workers
- def __getattr__(self, attr):
- return getattr(self.fh, attr)
+ self._worker = None
+ self._manager = None
- class ManagerLog:
- def __init__(self, path):
- self.fh = file(path, "a", 1)
+ @property
+ def worker(self):
+ if self._worker:
+ return self._worker
- def write(self, buf):
- self.fh.write(buf)
- sys.stdout.write(buf)
- sys.stdout.flush()
+ makedirs(self.path_workers)
- def __getattr__(self, attr):
- return getattr(self.fh, attr)
+ worker = self.Worker(self.path_workers, True if os.getpid() == self.pid else False)
+ self._worker = worker
+ return worker
+
+ @property
+ def manager(self):
+ if self._manager:
+ return self._manager
+
+ manager = self.Manager(self.path_session_log)
+ self._manager = manager
+ return manager
@staticmethod
def new_session_id(sessions_path):
@@ -203,11 +207,7 @@ def __init__(self, sessions_path, id=None):
self.paths = Session.Paths(path)
self.jobs = self.Jobs(self.paths.jobs)
- self._wlog = None
- self._mlog = None
-
- self.key = TempSessionKey()
-
+ self.logs = self.Logs(self.paths.log, self.paths.workers)
self.id = id
def taskconf(self, val=UNDEFINED):
@@ -221,23 +221,3 @@ def taskconf(self, val=UNDEFINED):
print >> file(path, "w"), pprint.pformat(d)
taskconf = property(taskconf, taskconf)
- @property
- def wlog(self):
- if self._wlog:
- return self._wlog
-
- makedirs(self.paths.workers)
- wlog = self.WorkerLog(self.paths.workers, False if self.taskconf.split and self.taskconf.split > 1 else True)
-
- self._wlog = wlog
- return wlog
-
- @property
- def mlog(self):
- if self._mlog:
- return self._mlog
-
- mlog = self.ManagerLog(self.paths.log)
-
- self._mlog = mlog
- return mlog
View
45 cloudtask/task.py
@@ -66,7 +66,7 @@
"""
import os
-from os.path import isdir
+from os.path import *
import sys
import shlex
import getopt
@@ -84,6 +84,28 @@
from reporter import Reporter
from watchdog import Watchdog
+from temp import TempFile
+import executil
+import uuid
+
+class TempSSHKey(TempFile):
+ def __init__(self):
+ TempFile.__init__(self, prefix='key_')
+ os.remove(self.path)
+
+ self.uuid = uuid.uuid4()
+ executil.getoutput("ssh-keygen -N '' -f %s -C %s" % (self.path, self.uuid))
+
+ @property
+ def public(self):
+ return self.path + ".pub"
+
+ def __del__(self):
+ if os.getpid() == self.pid:
+ os.remove(self.public)
+
+ TempFile.__del__(self)
+
class Task:
COMMAND = None
@@ -169,11 +191,11 @@ def main(cls):
if cls.SESSIONS:
opt_sessions = cls.SESSIONS
if not opt_sessions.startswith('/'):
- opt_sessions = os.path.join(dirname(sys.argv[0]), opt_sessions)
+ opt_sessions = join(dirname(sys.argv[0]), opt_sessions)
else:
opt_sessions = os.environ.get('CLOUDTASK_SESSIONS',
- os.path.join(os.environ['HOME'], '.cloudtask'))
+ join(os.environ['HOME'], '.cloudtask'))
for opt, val in opts:
if opt in ('-h', '--help'):
@@ -287,7 +309,7 @@ def main(cls):
print "session %d finished" % session.id
sys.exit(0)
else:
- print >> session.mlog, "session %d: resuming (%d pending, %d finished)" % (session.id, len(session.jobs.pending), len(session.jobs.finished))
+ print >> session.logs.manager, "session %d: resuming (%d pending, %d finished)" % (session.id, len(session.jobs.pending), len(session.jobs.finished))
else:
if cls.COMMAND:
@@ -354,10 +376,10 @@ def work(cls, jobs, session, taskconf):
def status(msg):
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
- session.mlog.write("%s :: session %d %s\n" % (timestamp, session.id, msg))
+ session.logs.manager.write("%s :: session %d %s\n" % (timestamp, session.id, msg))
status("(pid %d)" % os.getpid())
- print >> session.mlog
+ print >> session.logs.manager
class CaughtSignal(CloudWorker.Terminated):
pass
@@ -369,7 +391,7 @@ def terminate(sig, f):
raise CaughtSignal("caught %s termination signal" % sigs[sig], sig)
- watchdog = Watchdog(session, taskconf)
+ watchdog = Watchdog(session.logs.manager, session.paths.workers, taskconf)
signal.signal(signal.SIGINT, terminate)
signal.signal(signal.SIGTERM, terminate)
@@ -377,9 +399,10 @@ def terminate(sig, f):
executor = None
work_started = time.time()
+ sshkey = TempSSHKey()
try:
- executor = CloudExecutor(session, taskconf)
+ executor = CloudExecutor(session.logs, taskconf, sshkey)
for job in jobs:
executor(job)
@@ -388,10 +411,10 @@ def terminate(sig, f):
except Exception, e:
if isinstance(e, CaughtSignal):
- print >> session.mlog, "# " + str(e[0])
+ print >> session.logs.manager, "# " + str(e[0])
elif not isinstance(e, (CloudWorker.Error, CloudWorker.Terminated)):
- traceback.print_exc(file=session.mlog)
+ traceback.print_exc(file=session.logs.manager)
if executor:
executor.stop()
@@ -414,7 +437,7 @@ def terminate(sig, f):
total = len(session.jobs.finished) + len(session.jobs.pending)
- print >> session.mlog
+ print >> session.logs.manager
status("(%d seconds): %d/%d !OK - %d pending, %d timeouts, %d errors, %d OK" % \
(time.time() - work_started, total - succeeded, total, len(session.jobs.pending), timeouts, errors, succeeded))
View
58 cloudtask/watchdog.py
@@ -100,18 +100,14 @@ class Watchdog:
DESTROY_ERROR_TIMEOUT = 3600*3
DESTROY_ERROR_SLEEP = 300
- @staticmethod
- def log(fh, s):
- fh.write("# watchdog: %s\n" % s)
+ def log(self, s):
+ self.logfh.write("# watchdog: %s\n" % s)
- @classmethod
- def watch(cls, workers_path, logfh, timeout):
+ def watch(self):
+ timeout = self.taskconf.timeout * 2
session_pid = os.getppid()
- watcher = SessionWatcher(session_pid, workers_path)
-
- def log(s):
- cls.log(logfh, s)
+ watcher = SessionWatcher(session_pid, self.path_workers)
idletime = None
@@ -128,19 +124,19 @@ def log(s):
if idletime and idletime > timeout:
- log("session idle after %d seconds" % idletime)
+ self.log("session idle after %d seconds" % idletime)
# SIGTERM active workers
for worker in watcher.active_workers:
try:
- log("kill -TERM %d" % worker.pid)
+ self.log("kill -TERM %d" % worker.pid)
os.kill(worker.pid, signal.SIGTERM)
except:
- traceback.print_exc(file=session.mlog)
+ traceback.print_exc(file=self.logfh)
# wait up to SIGTERM_TIMEOUT for them to terminate
started = time.time()
- while time.time() - started < cls.SIGTERM_TIMEOUT:
+ while time.time() - started < self.SIGTERM_TIMEOUT:
time.sleep(1)
active_workers = watcher.active_workers
if not active_workers:
@@ -149,13 +145,12 @@ def log(s):
# no more Mr. Nice Guy: SIGKILL workers that are still alive
for worker in watcher.active_workers:
try:
- log("kill -KILL %d" % worker.pid)
+ self.log("kill -KILL %d" % worker.pid)
os.kill(worker.pid, signal.SIGKILL)
except:
- traceback.print_exc(file=session.mlog)
+ traceback.print_exc(file=self.logfh)
- @classmethod
- def run(cls, session, taskconf):
+ def run(self):
class Stopped(Exception):
pass
@@ -164,10 +159,8 @@ def stop(s, t):
raise Stopped
signal.signal(signal.SIGTERM, stop)
- # we stop watching because the session ended or because it idled
- workers_path = session.paths.workers
try:
- cls.watch(session.paths.workers, session.mlog, taskconf.timeout * 2)
+ self.watch()
except KeyboardInterrupt:
return
@@ -176,16 +169,12 @@ def stop(s, t):
pass
signal.signal(signal.SIGTERM, signal.SIG_IGN)
- cls.cleanup(session, taskconf)
-
- @classmethod
- def cleanup(cls, session, taskconf):
+ self.cleanup()
- def log(s):
- cls.log(session.mlog, s)
+ def cleanup(self):
def get_zombie_instances():
- wl = logalyzer.WorkersLog(session.paths.workers, taskconf.command)
+ wl = logalyzer.WorkersLog(self.path_workers, self.taskconf.command)
for worker in wl.workers:
if worker.instanceid and not worker.instancetime:
yield worker.instanceid
@@ -195,18 +184,19 @@ def get_zombie_instances():
return
zombie_instances.sort()
- log("destroying zombie instances: " + " ".join(sorted(zombie_instances)))
- hub = Hub(taskconf.hub_apikey)
- retrier = Retrier(cls.DESTROY_ERROR_TIMEOUT, cls.DESTROY_ERROR_SLEEP, session.mlog)
+ self.log("destroying zombie instances: " + " ".join(sorted(zombie_instances)))
+ hub = Hub(self.taskconf.hub_apikey)
+ retrier = Retrier(self.DESTROY_ERROR_TIMEOUT, self.DESTROY_ERROR_SLEEP, self.logfh)
destroyed = retrier(hub.destroy, *zombie_instances)
- log("destroyed zombie instances: " + " ".join(sorted([ instanceid for ipaddress, instanceid in destroyed ])))
+ self.log("destroyed zombie instances: " + " ".join(sorted([ instanceid for ipaddress, instanceid in destroyed ])))
- def __init__(self, session, taskconf):
- self.session = session
+ def __init__(self, logfh, path_workers, taskconf):
+ self.logfh = logfh
+ self.path_workers = path_workers
self.taskconf = taskconf
- self.process = Process(target=self.run, args=(self.session, self.taskconf))
+ self.process = Process(target=self.run)
self.process.start()
def terminate(self):
Please sign in to comment.
Something went wrong with that request. Please try again.