Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge remote branch 'liraz/master'

  • Loading branch information...
commit c0198f531076563c890c39bb6393490e85a81f66 2 parents 55a3fdc + b99749d
@alonswartz alonswartz authored
View
16 cloudtask/_hub.py
@@ -11,8 +11,11 @@
from hub import Spawner
+class Error(Exception):
+ pass
+
class Hub(Spawner):
- def launch(self, howmany, callback=None, **kwargs):
+ def launch(self, howmany, logfh=None, callback=None, **kwargs):
"""launch <howmany> workers, wait until booted and return their public IP addresses.
Invoke callback every frequently. If callback returns False, we terminate launching.
@@ -21,8 +24,11 @@ def launch(self, howmany, callback=None, **kwargs):
if 'sec_updates' not in kwargs:
kwargs.update(sec_updates='SKIP')
- name = kwargs.pop('backup_id', None)
- if not name:
- name = 'core'
+ snapshot_id = kwargs.pop('snapshot_id', None)
+ ami_id = kwargs.pop('ami_id', None)
+
+ if snapshot_id and ami_id:
+ raise Error("can't force together unrelated ami and snapshot")
- return Spawner.launch(self, name, howmany, callback, **kwargs)
+ name = snapshot_id or ami_id or 'core'
+ return Spawner.launch(self, name, howmany, logfh, callback, **kwargs)
View
138 cloudtask/ec2cost.py
@@ -0,0 +1,138 @@
+class Error(Exception):
+ pass
+
+class AttrDict(dict):
+ def __getattr__(self, name):
+ if name in self:
+ return self[name]
+ raise AttributeError("no such attribute '%s'" % name)
+
+ def __setattr__(self, name, val):
+ self[name] = val
+
+class Cost(AttrDict):
+ def __init__(self, region, type, size, hourly, upfront=0, reserved=0):
+ self.region = region
+ self.type = type
+ self.size = size
+ self.hourly = hourly
+ self.upfront = upfront
+ self.reserved = reserved
+
+ @property
+ def monthly(self):
+ return (self.hourly * 24) * 30
+
+ @property
+ def year_1(self):
+ if self.reserved == 0:
+ return self.monthly * 12
+
+ return (self.monthly * 12) + (self.upfront / self.reserved)
+
+ @property
+ def year_3(self):
+ return self.year_1 * 3
+
+ @property
+ def human_size(self):
+ return self.size.split(".")[1].capitalize()
+
+ @property
+ def human_hourly(self):
+ #Micro: $0.03/hour (reserved)
+ s = "%s: $%s/hour" % (self.human_size, self.hourly)
+ s += " (reserved)" if self.reserved > 0 else ""
+ return s
+
+ @property
+ def human_upfront(self):
+ #1 year: $220 one-time payment
+ if self.reserved == 1:
+ return "1 year: $%d up-front investment" % self.upfront
+
+ if self.reserved == 3:
+ return "3 years: $%d up-front investment" % self.upfront
+
+ return "$0"
+
+class Costs:
+ def __init__(self):
+ self.costs = []
+
+ def add(self, region, type, size,
+ od_h, y1_u=None, y1_h=None, y3_u=None, y3_h=None):
+ """add ec2 cost
+ region: region codename (e.g., us-east-1)
+ type: instance backed type (ebs | s3)
+ size: instance size (e.g., m1.small)
+ od_h: on-demand hourly cost
+ y1_u: reserved 1 year upfront cost
+ y1_h: reserved 1 year hourly cost
+ y3_u: reserved 3 year upfront cost
+ y3_h: reserved 3 year hourly cost
+ """
+ self.costs.append(Cost(region, type, size, od_h))
+
+ if y1_u and y1_h:
+ self.costs.append(Cost(region, type, size, y1_h, y1_u, 1))
+
+ if y3_u and y3_h:
+ self.costs.append(Cost(region, type, size, y3_h, y3_u, 3))
+
+ def get(self, region, size, type, reserved=0):
+ """get ec2 cost matching region, size, type, optionally reserved years
+
+ from ec2cost import costs
+
+ # regular
+ c = costs.get("us-east-1", "m1.small", "s3")
+ c = costs.get("us-east-1", "m1.small", "ebs")
+
+ # reserved instance for 1 year
+ c1 = costs.get("us-west-1", "t1.micro", "ebs", reserved=1)
+
+ # reserved instance for 3 years
+ c = costs.get("eu-west-1", "c1.medium", "ebs", reserved=3)
+ """
+ for c in self.costs:
+ if c.region == region and \
+ c.size == size and \
+ c.type == type and \
+ c.reserved == reserved:
+
+ return c
+
+ raise Error("No matching cost")
+
+# generate costs (reservation costs are for "medium utilization")
+
+costs = Costs()
+for region in ("us-east-1", "us-west-2"):
+ costs.add(region, "s3", "m1.small", 0.094)
+ costs.add(region, "s3", "c1.medium", 0.187)
+ costs.add(region, "ebs", "t1.micro", 0.020, 54.0, 0.007, 82.0, 0.007)
+ costs.add(region, "ebs", "m1.small", 0.080, 160.0, 0.024, 250.0, 0.019)
+ costs.add(region, "ebs", "c1.medium", 0.165, 415.0, 0.060, 638.0, 0.053)
+
+for region in ("us-west-1", "eu-west-1", "ap-southeast-1"):
+ costs.add(region, "s3", "m1.small", 0.105)
+ costs.add(region, "s3", "c1.medium", 0.209)
+ costs.add(region, "ebs", "t1.micro", 0.025, 54.0, 0.010, 82.0, 0.010)
+ costs.add(region, "ebs", "m1.small", 0.090, 160.0, 0.031, 250.0, 0.025)
+ costs.add(region, "ebs", "c1.medium", 0.186, 415.0, 0.080, 638.0, 0.070)
+
+region = "ap-northeast-1"
+costs.add(region, "s3", "m1.small", 0.105)
+costs.add(region, "s3", "c1.medium", 0.210)
+costs.add(region, "ebs", "t1.micro", 0.027, 57.0, 0.011, 86.0, 0.011)
+costs.add(region, "ebs", "m1.small", 0.092, 168.0, 0.036, 262.5, 0.029)
+costs.add(region, "ebs", "c1.medium", 0.190, 436.0, 0.090, 670.0, 0.080)
+
+region = "sa-east-1"
+costs.add(region, "s3", "m1.small", 0.126)
+costs.add(region, "s3", "c1.medium", 0.250)
+costs.add(region, "ebs", "t1.micro", 0.027, 73.0, 0.009, 111.0, 0.009)
+costs.add(region, "ebs", "m1.small", 0.115, 307.13, 0.040, 473.0, 0.031)
+costs.add(region, "ebs", "c1.medium", 0.230, 614.0, 0.080, 945.0, 0.070)
+
View
245 cloudtask/executor.py
@@ -1,5 +1,5 @@
#
-# Copyright (c) 2010-2011 Liraz Siri <liraz@turnkeylinux.org>
+# Copyright (c) 2010-2012 Liraz Siri <liraz@turnkeylinux.org>
#
# This file is part of CloudTask.
#
@@ -9,8 +9,6 @@
# option) any later version.
#
-from __future__ import with_statement
-
import os
import time
import traceback
@@ -21,7 +19,7 @@
from multiprocessing import Event, Queue
from multiprocessing_utils import Parallelize, Deferred
-from sigignore import sigignore
+import sighandle
from ssh import SSH
from _hub import Hub
@@ -42,10 +40,21 @@ def expired(self):
def reset(self):
self.started = time.time()
+class Job:
+ class Retry(Parallelize.Worker.Retry):
+ pass
+
+ def __init__(self, command, retry_limit):
+ self.command = command
+ self.retry = 0
+ self.retry_limit = retry_limit
+
class CloudWorker:
+ SSH_PING_RETRIES = 3
+
Terminated = Parallelize.Worker.Terminated
- class Error(Exception):
+ class Error(Terminated):
pass
@classmethod
@@ -59,7 +68,7 @@ def func():
return func
- def __init__(self, session, taskconf, address=None, destroy=None, event_stop=None, launchq=None):
+ def __init__(self, session_logs, taskconf, sshkey, ipaddress=None, destroy=None, event_stop=None, launchq=None):
self.pid = os.getpid()
@@ -68,40 +77,57 @@ def __init__(self, session, taskconf, address=None, destroy=None, event_stop=Non
self.event_stop = event_stop
- self.wlog = session.wlog
- self.mlog = session.mlog
- self.session_key = session.key
+ self.logs = session_logs
+ self.sshkey = sshkey
+
+ self.strikes = taskconf.strikes
+ self.strike = 0
self.timeout = taskconf.timeout
self.cleanup_command = taskconf.post
self.user = taskconf.user
- self.address = address
+ self.ipaddress = ipaddress
+ self.instanceid = None
+
self.hub = None
self.ssh = None
if destroy is None:
- if address:
+ if ipaddress:
destroy = False
else:
destroy = True
self.destroy = destroy
- if not address:
+ if not ipaddress:
if not taskconf.hub_apikey:
raise self.Error("can't auto launch a worker without a Hub API KEY")
self.hub = Hub(taskconf.hub_apikey)
- with sigignore(signal.SIGINT, signal.SIGTERM):
- if not launchq:
- self.address = list(self.hub.launch(1, **taskconf.ec2_opts))[0]
- else:
- self.address = launchq.get()
+ if launchq:
+ with sighandle.sigignore(signal.SIGINT, signal.SIGTERM):
+ instance = launchq.get()
+ else:
+ class Bool:
+ value = False
+ stopped = Bool()
+
+ def handler(s, f):
+ stopped.value = True
- if not self.address or (event_stop and event_stop.is_set()):
+ with sighandle.sighandle(handler, signal.SIGINT, signal.SIGTERM):
+ def callback():
+ return not stopped.value
+
+ instance = list(self.hub.launch(1, VerboseLog(session_logs.manager), callback, **taskconf.ec2_opts))[0]
+
+ if not instance or (event_stop and event_stop.is_set()):
raise self.Terminated
- self.status("launched new worker")
+ self.ipaddress, self.instanceid = instance
+
+ self.status("launched worker %s" % self.instanceid)
else:
self.status("using existing worker")
@@ -109,18 +135,18 @@ def __init__(self, session, taskconf, address=None, destroy=None, event_stop=Non
self.handle_stop = self._stop_handler(event_stop)
try:
- self.ssh = SSH(self.address,
- identity_file=self.session_key.path,
+ self.ssh = SSH(self.ipaddress,
+ identity_file=self.sshkey.path,
login_name=taskconf.user,
callback=self.handle_stop)
except SSH.Error, e:
self.status("unreachable via ssh: " + str(e))
- traceback.print_exc(file=self.wlog)
+ traceback.print_exc(file=self.logs.worker)
raise self.Error(e)
try:
- self.ssh.copy_id(self.session_key.public)
+ self.ssh.copy_id(self.sshkey)
if taskconf.overlay:
self.ssh.apply_overlay(taskconf.overlay)
@@ -130,7 +156,7 @@ def __init__(self, session, taskconf, address=None, destroy=None, event_stop=Non
except Exception, e:
self.status("setup failed")
- traceback.print_exc(file=self.wlog)
+ traceback.print_exc(file=self.logs.worker)
raise self.Error(e)
@@ -141,35 +167,39 @@ def _cleanup(self):
if self.cleanup_command:
self.ssh.command(self.cleanup_command).close()
- self.ssh.remove_id(self.session_key.public)
+ self.ssh.remove_id(self.sshkey)
except:
pass
- if self.destroy and self.address and self.hub:
- destroyed = self.hub.destroy(self.address)
- if self.address in destroyed:
- self.status("destroyed worker")
- else:
- self.status("failed to destroy worker")
+ if self.destroy and self.ipaddress and self.hub:
+ try:
+ destroyed = [ (ipaddress, instanceid)
+ for ipaddress, instanceid in self.hub.destroy(self.ipaddress)
+ if ipaddress == self.ipaddress ]
+
+ if destroyed:
+ ipaddress, instanceid = destroyed[0]
+ self.status("destroyed worker %s" % instanceid)
+ except:
+ self.status("failed to destroy worker %s" % self.instanceid)
+ traceback.print_exc(file=self.logs.worker)
+ raise
def __getstate__(self):
- return (self.address, self.pid)
+ return (self.ipaddress, self.pid)
def __setstate__(self, state):
- (self.address, self.pid) = state
+ (self.ipaddress, self.pid) = state
- def status(self, msg):
- wlog = self.wlog
- mlog = self.mlog
+ def status(self, msg, after_output=False):
+ timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
- if wlog:
- timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
- print >> wlog, "# %s [%s] %s" % (timestamp, self.address, msg)
+ c = "\n" if after_output else ""
+ self.logs.worker.status.write(c + "# %s [%s] %s\n" % (timestamp, self.ipaddress, msg))
+ self.logs.manager.write("%s (%d): %s\n" % (self.ipaddress, os.getpid(), msg))
- if mlog and mlog != wlog:
- mlog.write("%s (%d): %s" % (self.address, os.getpid(), msg) + "\n")
-
- def __call__(self, command):
+ def __call__(self, job):
+ command = job.command
timeout = self.timeout
self.handle_stop()
@@ -179,23 +209,30 @@ def __call__(self, command):
timeout = Timeout(timeout)
read_timeout = Timeout(self.ssh.TIMEOUT)
+
+ class CommandTimeout(Exception):
+ pass
+
+ class WorkerDied(Exception):
+ pass
+
def handler(ssh_command, buf):
- if buf and self.wlog:
- self.wlog.write(buf)
+ if buf:
read_timeout.reset()
+ self.logs.worker.write(buf)
if ssh_command.running and timeout.expired():
- ssh_command.terminate()
- self.status("timeout %d # %s" % (timeout.seconds, command))
- return
+ raise CommandTimeout
if read_timeout.expired():
- try:
- self.ssh.ping()
- except self.ssh.Error, e:
- ssh_command.terminate()
- self.status("worker died (%s) # %s" % (e, command))
- raise SSH.TimeoutError
+ for retry in range(self.SSH_PING_RETRIES):
+ try:
+ self.ssh.ping()
+ break
+ except self.ssh.Error, e:
+ pass
+ else:
+ raise WorkerDied(e)
read_timeout.reset()
@@ -207,21 +244,44 @@ def handler(ssh_command, buf):
# SigTerminate raised in serial mode, the other in Parallelized mode
except self.Terminated:
- ssh_command.terminate()
- self.status("terminated # %s" % command)
+ self.status("terminated # %s" % command, True)
raise
- if ssh_command.exitcode is not None:
+ except WorkerDied, e:
+ self.status("worker died (%s) # %s" % (e, command), True)
+ raise self.Error(e)
+
+ except CommandTimeout:
+ self.status("timeout # %s" % command, True)
+ exitcode = None
+
+ else:
if ssh_command.exitcode == 255 and re.match(r'^ssh: connect to host.*:.*$', ssh_command.output):
self.status("worker unreachable # %s" % command)
- raise SSH.Error(ssh_command.output)
+ self.logs.worker.write("%s\n" % ssh_command.output)
+ raise self.Error(SSH.Error(ssh_command.output))
+
+ self.status("exit %d # %s" % (ssh_command.exitcode, command), True)
+ exitcode = ssh_command.exitcode
- self.status("exit %d # %s" % (ssh_command.exitcode, command))
+ finally:
+ ssh_command.terminate()
+
+ if ssh_command.exitcode != 0:
+ self.strike += 1
+ if self.strikes and self.strike >= self.strikes:
+ self.status("terminating worker after %d strikes" % self.strikes)
+ raise self.Error
- if self.wlog:
- print >> self.wlog
+ if job.retry < job.retry_limit:
+ job.retry += 1
+ self.status("will retry (%d of %d)" % (job.retry, job.retry_limit))
- return (str(command), ssh_command.exitcode)
+ raise job.Retry
+ else:
+ self.strike = 0
+
+ return (str(command), exitcode)
def __del__(self):
if os.getpid() != self.pid:
@@ -229,32 +289,41 @@ def __del__(self):
self._cleanup()
+class VerboseLog:
+ def __init__(self, fh):
+ self.fh = fh
+
+ def write(self, s):
+ self.fh.write("# " + s)
+
class CloudExecutor:
class Error(Exception):
pass
- def __init__(self, split, session, taskconf):
- addresses = taskconf.workers
+ def __init__(self, session_logs, taskconf, sshkey):
+ ipaddresses = taskconf.workers
+
+ split = taskconf.split
if split == 1:
split = False
if not split:
- if addresses:
- address = addresses[0]
+ if ipaddresses:
+ ipaddress = ipaddresses[0]
else:
- address = None
- self._execute = CloudWorker(session, taskconf, address)
+ ipaddress = None
+ self._execute = CloudWorker(session_logs, taskconf, sshkey, ipaddress)
self.results = []
else:
- addresses = copy.copy(addresses)
+ ipaddresses = copy.copy(ipaddresses)
workers = []
self.event_stop = Event()
launchq = None
- new_workers = split - len(addresses)
+ new_workers = split - len(ipaddresses)
if new_workers > 0:
if not taskconf.hub_apikey:
raise self.Error("need API KEY to launch %d new workers" % new_workers)
@@ -268,9 +337,9 @@ def callback():
hub = Hub(taskconf.hub_apikey)
i = None
try:
- for i, address in enumerate(hub.launch(new_workers, callback, **taskconf.ec2_opts)):
- launchq.put(address)
- except hub.Error, e:
+ for i, instance in enumerate(hub.launch(new_workers, VerboseLog(session_logs.manager), callback, **taskconf.ec2_opts)):
+ launchq.put(instance)
+ except Exception, e:
unlaunched_workers = new_workers - (i + 1) \
if i is not None \
else new_workers
@@ -279,17 +348,17 @@ def callback():
launchq.put(None)
if not isinstance(e, hub.Stopped):
- traceback.print_exc(file=session.mlog)
+ traceback.print_exc(file=session_logs.manager)
threading.Thread(target=thread).start()
for i in range(split):
- if addresses:
- address = addresses.pop(0)
+ if ipaddresses:
+ ipaddress = ipaddresses.pop(0)
else:
- address = None
+ ipaddress = None
- worker = Deferred(CloudWorker, session, taskconf, address,
+ worker = Deferred(CloudWorker, session_logs, taskconf, sshkey, ipaddress,
event_stop=self.event_stop, launchq=launchq)
workers.append(worker)
@@ -298,14 +367,25 @@ def callback():
self.results = self._execute.results
self.split = split
+ self.job_retry_limit = taskconf.retries
def __call__(self, job):
- result = self._execute(job)
- if not self.split:
- self.results.append(result)
+ if not isinstance(job, Job):
+ job = Job(job, self.job_retry_limit)
+
+ if self.split:
+ return self._execute(job)
+
+ try:
+ result = self._execute(job)
+ except job.Retry:
+ return self(job)
+
+ self.results.append(result)
def stop(self):
if not self.split:
+ self._execute = None
return
self.event_stop.set()
@@ -315,4 +395,5 @@ def stop(self):
def join(self):
if self.split:
self._execute.wait(keepalive=False, keepalive_spares=1)
- self.stop()
+
+ self.stop()
View
341 cloudtask/logalyzer.py
@@ -0,0 +1,341 @@
+#
+# Copyright (c) 2012 Liraz Siri <liraz@turnkeylinux.org>
+#
+# This file is part of CloudTask.
+#
+# CloudTask is open source software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 3 of the License, or (at your
+# option) any later version.
+#
+import os
+from os.path import *
+from cloudtask.session import Session
+
+import re
+
+from StringIO import StringIO
+from datetime import datetime
+
+import ec2cost
+
+def fmt_elapsed(seconds):
+ units = {}
+ for unit, unit_seconds in (('days', 86400), ('hours', 3600), ('minutes', 60), ('seconds', 1)):
+ units[unit] = seconds / unit_seconds
+ seconds = seconds % unit_seconds
+
+
+ if units['days']:
+ formatted = "%dd " % units['days']
+ else:
+ formatted = ""
+
+ formatted += "%02d:%02d:%02d" % (units['hours'], units['minutes'], units['seconds'])
+ return formatted
+
+class Error(Exception):
+ pass
+
+class WorkersLog:
+ class LogEntry:
+ TIMESTAMP_FMT = "%Y-%m-%d %H:%M:%S"
+ def __init__(self, timestamp, title):
+ self.timestamp = datetime.strptime(timestamp, self.TIMESTAMP_FMT)
+ self.title = title
+ self.body = ""
+
+ def __repr__(self):
+ return "LogEntry%s" % `datetime.strftime(self.timestamp, self.TIMESTAMP_FMT), self.title`
+
+ @classmethod
+ def parse_worker_log(cls, fpath):
+ entries = []
+ body = ""
+ for line in file(fpath).readlines():
+ m = re.match(r'^# (\d{4}-\d+-\d+ \d\d:\d\d:\d\d) \[.*?\] (.*)', line)
+ if not m:
+ body += line
+ continue
+ else:
+ body = body.strip()
+ if entries and body:
+ entries[-1].body = body
+ body = ""
+ timestamp, title = m.groups()
+ entries.append(cls.LogEntry(timestamp, title))
+
+ return entries
+
+ class Job:
+ def __init__(self, worker_id, name, result, timestamp, elapsed, output):
+ self.worker_id = worker_id
+ self.name = name
+ self.result = result
+ self.timestamp = timestamp
+ self.elapsed = elapsed
+ self.output = output
+
+ def __repr__(self):
+ return "Job%s" % `self.worker_id, self.name, self.result, self.elapsed`
+
+ class Worker:
+ def __init__(self, worker_id, instanceid, jobs, instancetime, worktime):
+ self.worker_id = worker_id
+ self.instanceid = instanceid
+ self.jobs = jobs
+ self.instancetime = instancetime
+ self.worktime = worktime
+
+ def __repr__(self):
+ return "Worker%s" % `self.worker_id, self.instanceid, self.jobs, self.instancetime, self.worktime`
+
+ @classmethod
+ def get_jobs(cls, log_entries, command):
+ pat = re.compile(r'^(.*?) # %s (.*)' % command)
+
+ jobs = []
+ for i, entry in enumerate(log_entries):
+ m = pat.match(entry.title)
+ if m:
+ result, name = m.groups()
+ started = log_entries[i-1]
+ delta = (entry.timestamp - started.timestamp)
+ elapsed = delta.seconds + delta.days * 86400
+ jobs.append((name, result, started.timestamp, elapsed, started.body))
+
+ return jobs
+
+ @classmethod
+ def get_instance_time(cls, log_entries):
+ launched = None
+ destroyed = None
+
+ for log_entry in log_entries:
+ m = re.match(r'launched worker (.*)', log_entry.title)
+ if m:
+ instanceid = m.group(1)
+ launched = (log_entry.timestamp, instanceid)
+ continue
+
+ m = re.match(r'destroyed worker (.*)', log_entry.title)
+ if m:
+ instanceid = m.group(1)
+ destroyed = (log_entry.timestamp, instanceid)
+
+ if not launched:
+ return None, None
+
+ if launched and destroyed:
+ if destroyed[1] != launched[1]:
+ destroyed = None
+
+ if launched and not destroyed:
+ instanceid = launched[1]
+ return instanceid, None
+
+ delta = destroyed[0] - launched[0]
+ return launched[1], delta.seconds + delta.days * 86400
+
+ def __init__(self, dpath, command):
+ jobs = {}
+ workers = []
+
+ for fname in os.listdir(dpath):
+ worker_id = int(fname)
+ fpath = join(dpath, fname)
+ log_entries = self.parse_worker_log(fpath)
+
+ worker_jobs = [ self.Job(worker_id, *job_args) for
+ job_args in self.get_jobs(log_entries, command) ]
+
+ for job in worker_jobs:
+ name = job.name
+ if name in jobs:
+ conflicting_job = jobs[name]
+ if job.timestamp > conflicting_job.timestamp:
+ jobs[name] = job
+ else:
+ jobs[name] = job
+
+ instanceid, instancetime = self.get_instance_time(log_entries)
+ worktime = sum([ job.elapsed for job in worker_jobs ])
+ workers.append(self.Worker(worker_id, instanceid, len(worker_jobs), instancetime, worktime))
+
+ self.jobs = jobs.values()
+ self.workers = workers
+
+def fmt_table(rows, title=[], groupby=None):
+ col_widths = []
+ for col_index in range(len(rows[0])):
+ col = [ str(row[col_index]) for row in rows + [title] ]
+ col_width = max(map(len, col)) + 5
+ col_widths.append(col_width)
+
+ row_fmt = " ".join([ '%%-%ds' % width for width in col_widths ])
+ sio = StringIO()
+ if title:
+ title = title[:]
+ title[0] = "# " + title[0]
+ print >> sio, row_fmt % tuple(title)
+ print >> sio
+
+ for i, row in enumerate(rows):
+ if groupby and i and groupby(rows[i]) != groupby(rows[i-1]):
+ print >> sio
+
+ print >> sio, row_fmt % tuple(row)
+
+ return sio.getvalue()
+
+def indent(depth, buf):
+ return "\n".join([ " " * depth + line for line in buf.splitlines() ])
+
+def logalyzer(session_path):
+ session_paths = Session.Paths(session_path)
+
+ conf = eval(file(session_paths.conf).read())
+ log = file(session_paths.log).read()
+
+ m = re.search(r'session (\d+).*(\d+) seconds', log, re.MULTILINE)
+ if not m:
+ raise Error("couldn't find session summary")
+
+ id, elapsed = map(int, m.groups())
+
+ jobs = Session.Jobs(session_paths.jobs)
+
+ # calculate stats
+
+ results = [ result for command, result in jobs.finished ]
+
+ class stats:
+ pending = len(jobs.pending)
+ finished = len(jobs.finished)
+ total = pending + finished
+ succeeded = results.count('EXIT=0')
+
+ failures = len(results) - succeeded
+ failures_timeouts = results.count('TIMEOUT')
+ failures_errors = len(results) - succeeded - failures_timeouts
+
+ sio = StringIO()
+
+ def header(level, s):
+ c = "=-"[level]
+ return s + "\n" + c * len(s) + "\n"
+
+ print >> sio, header(0, "session %d: %s elapsed, %d jobs - %d pending, %d failed, %d completed" %
+ (id, fmt_elapsed(elapsed), stats.total, stats.pending, stats.failures, stats.succeeded))
+
+ wl = WorkersLog(session_paths.workers, conf['command'])
+
+ instance_hours = sum([ worker.instancetime/3600 + 1
+ for worker in wl.workers
+ if worker.instanceid and worker.instancetime ])
+
+ work_hours = sum([ worker.worktime
+ for worker in wl.workers
+ if worker.instanceid and worker.instancetime and worker.worktime ])/3600 + 1
+
+ if instance_hours:
+ hourly_cost = ec2cost.costs.get(conf['ec2_region'], conf['ec2_size'], conf['ec2_type'])['hourly']
+
+ print >> sio, "Cost: $%.2f ($%.2f x %d instance hours)" % (instance_hours * hourly_cost, hourly_cost, instance_hours)
+
+ print >> sio, "Efficiency: %d%% (%d work hours vs %d instance hours)" % ((work_hours/float(instance_hours) * 100),
+ work_hours, instance_hours)
+ print >> sio
+
+ print >> sio, "Configuration:"
+ print >> sio
+
+ c = conf
+ workers = "%d x (%s)" % (c['split'] if c['split'] else 1,
+ " : ".join([ c[attr]
+ for attr in ('ec2_region', 'ec2_size', 'ec2_type', 'ami_id', 'snapshot_id')
+ if attr in c and c[attr] ]))
+
+ fields = conf
+ fields['workers'] = workers
+
+ for field in ('command', 'backup_id', 'overlay', 'post', 'pre', 'timeout', 'report', '', 'workers'):
+ if not field:
+ print >> sio
+ elif field in fields and fields[field]:
+ print >> sio, " %-16s %s" % (field.replace('_', '-'), fields[field])
+
+ print >> sio
+
+ workers = wl.workers[:]
+ workers.sort(lambda a,b: cmp(b.jobs, a.jobs))
+
+ rows = []
+ for worker in workers:
+ worker_id = worker.worker_id
+ if worker.instanceid and not worker.instancetime:
+ worker_id = "%d\t# NOT DESTROYED!" % worker_id
+
+ def fN(v):
+ return v if v is not None else '-'
+
+ row = [ worker.jobs,
+ fmt_elapsed(worker.worktime) if worker.worktime else '-',
+ fmt_elapsed(worker.instancetime) if worker.instancetime else '-',
+ worker.instanceid if worker.instanceid else '-',
+ worker_id ]
+ rows.append(row)
+
+ fmted_table = fmt_table(rows, ["JOBS", "WORKTIME", "LIFETIME", "INSTANCE", "WORKER"])
+ print >> sio, indent(8, fmted_table) + "\n"
+
+ if stats.pending:
+ print >> sio, header(0, "%d pending jobs" % stats.pending)
+ print >> sio, " ".join([ job[len(conf['command']):].strip() for job in jobs.pending ])
+ print >> sio
+
+ jobs = wl.jobs[:]
+ jobs.sort(lambda a,b: cmp((a.worker_id, b.elapsed), (b.worker_id, a.elapsed)))
+
+ if stats.failures:
+ single_failure = (stats.failures == 1)
+ print >> sio, header(0, "%d failed - errors: %d, timeout: %d" % (stats.failures,
+ stats.failures_errors,
+ stats.failures_timeouts))
+ if not single_failure:
+ print >> sio, header(1, "Summary")
+
+ failures = [ job for job in jobs if job.result != 'exit 0' ]
+ rows = [ (job.name, fmt_elapsed(job.elapsed), job.result, job.worker_id)
+ for job in failures ]
+
+ fmted_table = fmt_table(rows, ["NAME", "ELAPSED", "RESULT", "WORKER"],
+ groupby=lambda a:a[3])
+ print >> sio, fmted_table
+ fmted_rows = [ line for line in fmted_table.splitlines()[2:] if line ]
+
+ if not single_failure:
+ print >> sio, header(1, "Last output")
+
+ for i, fmted_row in enumerate(fmted_rows):
+ if not single_failure:
+ print >> sio, fmted_row
+ print >> sio
+
+ if failures[i].output:
+ print >> sio, indent(4, "\n".join(failures[i].output.splitlines()[-5:]))
+ print >> sio
+
+ if stats.succeeded:
+ print >> sio, header(0, "%d succeeded" % stats.succeeded)
+
+ completed = [ job for job in jobs if job.result == 'exit 0' ]
+ rows = [ (job.name, fmt_elapsed(job.elapsed), job.worker_id)
+ for job in completed ]
+ fmted_table = fmt_table(rows, ["NAME", "ELAPSED", "WORKER"],
+ groupby=lambda a: a[2])
+ print >> sio, fmted_table
+
+ return sio.getvalue()
+
+
View
57 cloudtask/reporter.py
@@ -1,5 +1,5 @@
#
-# Copyright (c) 2010-2011 Liraz Siri <liraz@turnkeylinux.org>
+# Copyright (c) 2010-2012 Liraz Siri <liraz@turnkeylinux.org>
#
# This file is part of CloudTask.
#
@@ -17,7 +17,10 @@
from email.Message import Message
from command import Command
-import StringIO
+import traceback
+from StringIO import StringIO
+
+import logalyzer
class Error(Exception):
pass
@@ -94,42 +97,38 @@ def __init__(self, expr):
self.recipients = args[1:]
self.sendmail = self.Sendmail()
- @staticmethod
- def fmt_taskconf(taskconf):
- sio = StringIO.StringIO()
+ def __call__(self, session):
+ taskconf = session.taskconf
- table = []
- for attr in ('split', 'command',
- 'ec2-region', 'ec2-size', 'ec2-type',
- 'user', 'backup-id', 'ami-id', 'workers',
- 'overlay', 'post', 'pre', 'timeout', 'report'):
+ jobs_total = len(session.jobs.pending) + len(session.jobs.finished)
+ jobs_completed = len([ job for job, result in session.jobs.finished if result=="EXIT=0" ])
+ jobs_incomplete = jobs_total - jobs_completed
- val = taskconf[attr.replace('-', '_')]
- if isinstance(val, list):
- val = " ".join(val)
- if not val:
- val = "-"
- table.append((attr, val))
+ command = re.sub(r'^\S*/', '', taskconf['command'])
- print >> sio, " Parameter Value"
- print >> sio, " --------- -----"
- print >> sio
- for row in table:
- print >> sio, " %-15s %s" % (row[0], row[1])
+ title = "session %d: %d/%d !OK (%s)" % (session.id, jobs_incomplete, jobs_total, command)
- print >> sio
+ try:
+ body = logalyzer.logalyzer(session.paths.path)
+ except Exception:
+ sio = StringIO()
+ def header(title, c):
+ return title + "\n" + len(title) * c + "\n"
- return sio.getvalue()
+ print >> sio, header("Logalyzer failure", "=")
+ traceback.print_exc(file=sio)
+ print >> sio
+ print >> sio, header("Fallback session log", "=")
+ print >> sio, taskconf.fmt()
- def __call__(self, session):
- mlog = file(session.paths.log).read()
- taskconf = session.taskconf
+ manager_log = file(session.paths.log).read()
+ print >> sio, manager_log
- body = self.fmt_taskconf(taskconf) + mlog
+ body = sio.getvalue()
for recipient in self.recipients:
- subject = "[Cloudtask] " + taskconf.command
+ subject = "[Cloudtask] " + title
self.sendmail(self.sender, recipient,
subject, body)
@@ -158,8 +157,6 @@ def __call__(self, session):
if taskconf.workers:
os.environ['CLOUDTASK_WORKERS'] = " ".join(taskconf.workers)
- os.environ
-
os.system(self.command)
class Reporter:
View
222 cloudtask/session.py
@@ -1,5 +1,5 @@
#
-# Copyright (c) 2010-2011 Liraz Siri <liraz@turnkeylinux.org>
+# Copyright (c) 2010-2012 Liraz Siri <liraz@turnkeylinux.org>
#
# This file is part of CloudTask.
#
@@ -13,18 +13,14 @@
from os.path import *
import sys
-from paths import Paths
+import paths
import errno
-import time
-
-from temp import TempFile
-import uuid
-
-import executil
from taskconf import TaskConf
import pprint
+import re
+
def makedirs(path, mode=0750):
try:
os.makedirs(path, mode)
@@ -32,24 +28,6 @@ def makedirs(path, mode=0750):
if e.errno != errno.EEXIST:
raise
-class TempSessionKey(TempFile):
- def __init__(self):
- TempFile.__init__(self, prefix='key_')
- os.remove(self.path)
-
- self.uuid = uuid.uuid4()
- executil.getoutput("ssh-keygen -N '' -f %s -C %s" % (self.path, self.uuid))
-
- @property
- def public(self):
- return self.path + ".pub"
-
- def __del__(self):
- if os.getpid() == self.pid:
- os.remove(self.public)
-
- TempFile.__del__(self)
-
class UNDEFINED:
pass
@@ -57,7 +35,7 @@ class Session(object):
class Error(Exception):
pass
- class Paths(Paths):
+ class Paths(paths.Paths):
files = ['conf', 'workers', 'log', 'jobs']
class Jobs:
@@ -77,6 +55,17 @@ def __init__(self, path):
else:
self.finished.append((command, state))
+ def save(self):
+ fh = file(self.path, "w")
+
+ for job, result in self.finished:
+ print >> fh, "%s\t%s" % (result, job)
+
+ for job in self.pending:
+ print >> fh, "PENDING\t%s" % job
+
+ fh.close()
+
def update(self, jobs=[], results=[]):
for job, result in results:
if result is None:
@@ -86,53 +75,103 @@ def update(self, jobs=[], results=[]):
self.finished.append((job, state))
- states = self.finished[:]
-
self.pending = list((set(self.pending) | set(jobs)) - \
set([ job for job, result in results ]))
- for job in self.pending:
- states.append((job, "PENDING"))
+ self.save()
- fh = file(self.path, "w")
- for job, state in states:
- print >> fh, "%s\t%s" % (state, job)
- fh.close()
+ def update_retry_failed(self):
+ finished = set(self.finished)
+ failed = set([(job, result) for job, result in finished if result != 'EXIT=0'])
+ ok = finished - failed
- class WorkerLog:
- def __init__(self, path):
- self.path = path
- self.fh = None
+ self.finished = list(ok)
+ self.pending += [ job for job, result in failed ]
- def __getattr__(self, attr):
- if not self.fh:
- self.fh = file(join(self.path, str(os.getpid())), "a", 1)
+ self.save()
- return getattr(self.fh, attr)
+ class Logs:
+ class Worker(object):
+ def fh(self):
+ if not self._fh:
+ self._fh = file(join(self.path, str(os.getpid())), "a", 1)
- class ManagerLog:
- def __init__(self, path):
- self.fh = file(path, "a", 1)
+ return self._fh
+ status = fh = property(fh)
- def write(self, buf):
- self.fh.write(buf)
- sys.stdout.write(buf)
- sys.stdout.flush()
+ def __init__(self, path, tee=False):
+ self._fh = None
+ self.path = path
+ self.tee = tee
- def __getattr__(self, attr):
- return getattr(self.fh, attr)
+ @staticmethod
+ def _filter(buf):
+ buf = re.sub(r'Connection to \S+ closed\.\r+\n', '', buf)
+ buf = re.sub(r'\r[^\r\n]+$', '', buf)
+ buf = re.sub(r'.*\r(?![\r\n])','', buf)
+ buf = re.sub(r'\r+\n', '\n', buf)
- def __init__(self, sessions_path, id=None):
- if not exists(sessions_path):
- makedirs(sessions_path)
+ return buf
- if not isdir(sessions_path):
- raise self.Error("sessions path is not a directory: " + sessions_path)
+ def write(self, buf):
+ if self.tee:
+ sys.stdout.write(buf)
+ sys.stdout.flush()
- new_session = False
- if not id:
- new_session = True
+ # filter progress bars and other return-carriage crap
+ buf = self._filter(buf)
+
+ if buf:
+ self.fh.write(buf)
+ else:
+ os.utime(self.path, None)
+
+ def __getattr__(self, attr):
+ return getattr(self.fh, attr)
+
+ class Manager:
+ def __init__(self, path):
+ self.fh = file(path, "a", 1)
+ def write(self, buf):
+ self.fh.write(buf)
+ sys.stdout.write(buf)
+ sys.stdout.flush()
+
+ def __getattr__(self, attr):
+ return getattr(self.fh, attr)
+
+ def __init__(self, path_session_log, path_workers):
+ self.pid = os.getpid()
+ self.path_session_log = path_session_log
+ self.path_workers = path_workers
+
+ self._worker = None
+ self._manager = None
+
+ @property
+ def worker(self):
+ if self._worker:
+ return self._worker
+
+ makedirs(self.path_workers)
+
+ worker = self.Worker(self.path_workers, True if os.getpid() == self.pid else False)
+ self._worker = worker
+ return worker
+
+ @property
+ def manager(self):
+ if self._manager:
+ return self._manager
+
+ manager = self.Manager(self.path_session_log)
+ self._manager = manager
+ return manager
+
+ @staticmethod
+ def new_session_id(sessions_path):
+ while True:
session_ids = [ int(fname) for fname in os.listdir(sessions_path)
if fname.isdigit() ]
@@ -142,24 +181,33 @@ def __init__(self, sessions_path, id=None):
new_session_id = 1
id = new_session_id
+ try:
+ os.mkdir(join(sessions_path, "%d" % id))
+ except OSError, e:
+ if e.errno != errno.EEXIST:
+ raise
+ continue
- path = join(sessions_path, "%d" % id)
+ return id
- if new_session:
- makedirs(path)
- else:
- if not isdir(path):
- raise Error("no such session '%s'" % id)
+ def __init__(self, sessions_path, id=None):
+ if not exists(sessions_path):
+ makedirs(sessions_path)
- self.paths = Session.Paths(path)
- self.jobs = self.Jobs(self.paths.jobs)
+ if not isdir(sessions_path):
+ raise self.Error("sessions path is not a directory: " + sessions_path)
+
+ if not id:
+ id = self.new_session_id(sessions_path)
- self._wlog = None
- self._mlog = None
+ path = join(sessions_path, "%d" % id)
+ if not isdir(path):
+ raise self.Error("no such session '%s'" % id)
- self.started = time.time()
- self.key = TempSessionKey()
+ self.paths = Session.Paths(path)
+ self.jobs = self.Jobs(self.paths.jobs)
+ self.logs = self.Logs(self.paths.log, self.paths.workers)
self.id = id
def taskconf(self, val=UNDEFINED):
@@ -173,33 +221,3 @@ def taskconf(self, val=UNDEFINED):
print >> file(path, "w"), pprint.pformat(d)
taskconf = property(taskconf, taskconf)
- @property
- def elapsed(self):
- return time.time() - self.started
-
- @property
- def wlog(self):
- if self._wlog:
- return self._wlog
-
- if self.taskconf.split:
- makedirs(self.paths.workers)
- wlog = self.WorkerLog(self.paths.workers)
- else:
- wlog = self.ManagerLog(self.paths.log)
-
- self._wlog = wlog
- return wlog
-
- @property
- def mlog(self):
- if self._mlog:
- return self._mlog
-
- if self.taskconf.split:
- mlog = self.ManagerLog(self.paths.log)
- else:
- mlog = self.wlog
-
- self._mlog = mlog
- return mlog
View
60 cloudtask/ssh.py
@@ -1,5 +1,5 @@
#
-# Copyright (c) 2010-2011 Liraz Siri <liraz@turnkeylinux.org>
+# Copyright (c) 2010-2012 Liraz Siri <liraz@turnkeylinux.org>
#
# This file is part of CloudTask.
#
@@ -9,8 +9,43 @@
# option) any later version.
#
+import os
from os.path import *
-from command import Command, fmt_argv
+from command import Command
+
+from temp import TempFile
+import executil
+import hashlib
+
+class PrivateKey:
+ class Error(Exception):
+ pass
+
+ def __init__(self, path):
+ self.path = abspath(path)
+ if not isfile(self.path):
+ raise self.Error("no such file '%s'" % self.path)
+
+ @property
+ def public(self):
+ try:
+ return executil.getoutput("ssh-keygen -y -P '' -f", self.path)
+ except executil.ExecError, e:
+ raise self.Error("can't get public key for %s: " % self.path + str(e))
+
+ @property
+ def fingerprint(self):
+ return hashlib.sha1(self.public).hexdigest()
+
+class TempPrivateKey(TempFile, PrivateKey):
+ def __init__(self):
+ TempFile.__init__(self, prefix='key_')
+ os.remove(self.path)
+
+ executil.getoutput("ssh-keygen -N '' -f %s" % self.path)
+ os.remove(self.path + ".pub")
+
+ PrivateKey.__init__(self, self.path)
class SSH:
class Error(Exception):
@@ -95,14 +130,14 @@ def command(self, command, pty=False):
callback=self.callback,
pty=pty)
- def copy_id(self, key_path):
- if not key_path.endswith(".pub"):
- key_path += ".pub"
+ def copy_id(self, key):
+ if not isinstance(key, PrivateKey):
+ key = PrivateKey(key)
command = 'mkdir -p $HOME/.ssh; cat >> $HOME/.ssh/authorized_keys'
command = self.command(command)
- command.tochild.write(file(key_path).read())
+ command.tochild.write("%s %s\n" % (key.public, key.fingerprint))
command.tochild.close()
try:
@@ -110,16 +145,11 @@ def copy_id(self, key_path):
except command.Error, e:
raise self.Error("can't add id to authorized keys: " + str(e))
- def remove_id(self, key_path):
- if not key_path.endswith(".pub"):
- key_path += ".pub"
-
- vals = file(key_path).read().split()
- if not vals[0].startswith('ssh'):
- raise self.Error("invalid public key in " + key_path)
- id = vals[-1]
+ def remove_id(self, key):
+ if not isinstance(key, PrivateKey):
+ key = PrivateKey(key)
- command = 'sed -i "/%s/d" $HOME/.ssh/authorized_keys' % id
+ command = 'sed -i "/%s/d" $HOME/.ssh/authorized_keys' % key.fingerprint
command = self.command(command)
try:
View
213 cloudtask/task.py
@@ -1,6 +1,6 @@
#!/usr/bin/python
#
-# Copyright (c) 2010-2011 Liraz Siri <liraz@turnkeylinux.org>
+# Copyright (c) 2010-2012 Liraz Siri <liraz@turnkeylinux.org>
#
# This file is part of CloudTask.
#
@@ -19,19 +19,28 @@
3) CLOUDTASK_{PARAM_NAME} environment variable (lowest precedence)
Options:
+ --resume= Resume pending jobs of a previously run session
+ --retry= Retry failed jobs of a previously run session
+
--force Don't ask for confirmation
+ --ssh-identity= SSH identity keyfile to use (defaults to ~/.ssh/identity)
--hub-apikey= Hub API KEY (required if launching workers)
+
+ --snapshot-id= Launch instance from a snapshot ID
--backup-id= TurnKey Backup ID to restore on launch
--ami-id= Force launch a specific AMI ID (default is the latest Core)
-
+
--ec2-region= Region for instance launch (default: us-east-1)
--ec2-size= Instance launch size (default: m1.small)
--ec2-type= Instance launch type <s3|ebs> (default: s3)
--sessions= Path where sessions are stored (default: $HOME/.cloudtask)
- --timeout= How many seconds to wait before giving up (default: 3600)
+ --timeout= How many seconds to wait for a job before failing (default: 3600)
+ --retries= How many times to retry a failed job (default: 0)
+ --strikes= How many consecutive failures before we retire worker (default: 0 - disabled)
+
--user= Username to execute commands as (default: root)
--pre= Worker setup command
--post= Worker cleanup command
@@ -45,7 +54,7 @@
--report= Task reporting hook, examples:
sh: command || py: file || py: code
- mail: from@foo.com to@bar.com
+ mail: from@foo.com to@bar.com
Usage:
@@ -55,6 +64,9 @@
# resume session 1 while overriding timeout
cloudtask --resume=1 --timeout=6
+ # retry failed jobs in session 2 while overriding the split
+ cloudtask --retry=2 --split=1
+
"""
import os
@@ -65,6 +77,7 @@
import signal
import traceback
import re
+import time
from session import Session
@@ -73,9 +86,13 @@
from taskconf import TaskConf
from reporter import Reporter
+from watchdog import Watchdog
+
+import ssh
class Task:
+ COMMAND = None
DESCRIPTION = None
SESSIONS = None
@@ -95,6 +112,7 @@ def usage(cls, e=None):
print >> sys.stderr, "syntax: %s [ -opts ] [ extra args ]" % sys.argv[0]
print >> sys.stderr, "syntax: %s [ -opts ] --resume=SESSION_ID" % sys.argv[0]
+ print >> sys.stderr, "syntax: %s [ -opts ] --retry=SESSION_ID" % sys.argv[0]
if cls.DESCRIPTION:
print >> sys.stderr, cls.DESCRIPTION.strip()
print >> sys.stderr, "\n".join(__doc__.strip().splitlines()[1:])
@@ -104,9 +122,7 @@ def usage(cls, e=None):
sys.exit(1)
@classmethod
- def confirm(cls, taskconf, split, jobs):
- print >> sys.stderr, "About to launch %d cloud server%s to execute the following task:" % (split, "s" if split and split > 1 else "")
-
+ def confirm(cls, taskconf, jobs):
def filter(job):
job = re.sub('^\s*', '', job[len(taskconf.command):])
return job
@@ -117,35 +133,19 @@ def filter(job):
job_range = ("%s .. %s" % (job_first, job_last)
if job_first != job_last else "%s" % job_first)
- table = [ ('jobs', '%d (%s)' % (len(jobs), job_range)) ]
+ print >> sys.stderr, "About to launch %d cloud server%s to execute %d jobs (%s):" % (taskconf.split,
+ "s" if taskconf.split and taskconf.split > 1 else "",
+ len(jobs), job_range)
- for attr in ('split', 'command', 'hub-apikey',
- 'ec2-region', 'ec2-size', 'ec2-type',
- 'user', 'backup-id', 'ami-id', 'workers',
- 'overlay', 'post', 'pre', 'timeout', 'report'):
-
- val = taskconf[attr.replace('-', '_')]
- if isinstance(val, list):
- val = " ".join(val)
- if not val:
- val = "-"
- table.append((attr, val))
-
- print >> sys.stderr
- print >> sys.stderr, " Parameter Value"
- print >> sys.stderr, " --------- -----"
- print >> sys.stderr
- for row in table:
- print >> sys.stderr, " %-15s %s" % (row[0], row[1])
-
- print >> sys.stderr
+ print >> sys.stderr, "\n" + taskconf.fmt()
orig_stdin = sys.stdin
- sys.stdin = os.fdopen(sys.stderr.fileno(), 'r')
+ sys.stdin = os.fdopen(os.dup(sys.stderr.fileno()), 'r')
while True:
answer = raw_input("Is this really what you want? [yes/no] ")
if answer:
break
+
sys.stdin = orig_stdin
if answer.lower() != "yes":
@@ -162,6 +162,7 @@ def main(cls):
'h', ['help',
'force',
'resume=',
+ 'retry=',
'sessions='] +
[ attr.replace('_', '-') + '='
for attr in TaskConf.__all__ ])
@@ -169,6 +170,7 @@ def main(cls):
usage(e)
opt_resume = None
+ opt_retry = None
opt_force = False
if cls.SESSIONS:
@@ -190,6 +192,12 @@ def main(cls):
except ValueError:
error("--resume session id must be an integer")
+ elif opt == '--retry':
+ try:
+ opt_retry = int(val)
+ except ValueError:
+ error("--retry session id must be an integer")
+
elif opt == '--sessions':
if not isdir(val):
error("--sessions path '%s' is not a directory" % val)
@@ -199,9 +207,18 @@ def main(cls):
elif opt == '--force':
opt_force = True
+ if opt_resume and opt_retry:
+ error("--retry and --resume can't be used together, different modes")
+
if opt_resume:
session = Session(opt_sessions, id=opt_resume)
taskconf = session.taskconf
+ elif opt_retry:
+ session = Session(opt_sessions, id=opt_retry)
+ session.jobs.update_retry_failed()
+ if not session.jobs.pending:
+ error("no failed jobs to retry")
+ taskconf = session.taskconf
else:
session = None
@@ -226,8 +243,11 @@ def main(cls):
taskconf.overlay = abspath(val)
- elif opt == '--timeout':
- taskconf.timeout = int(val)
+ elif opt == '--ssh-identity':
+ if not isfile(val):
+ error("ssh-identity '%s' not a file" % val)
+
+ taskconf.ssh_identity = abspath(val)
elif opt == '--split':
taskconf.split = int(val)
@@ -246,6 +266,9 @@ def main(cls):
if taskconf.backup_id < 1:
error("--backup-id can't be smaller than 1")
+ elif opt[2:] in ('timeout', 'retries', 'strikes'):
+ setattr(taskconf, opt[2:], int(val))
+
else:
opt = opt[2:]
taskconf[opt.replace('-', '_')] = val
@@ -267,7 +290,7 @@ def main(cls):
else:
reporter = None
- if opt_resume:
+ if opt_resume or opt_retry:
if args:
error("--resume incompatible with a command")
@@ -277,7 +300,7 @@ def main(cls):
print "session %d finished" % session.id
sys.exit(0)
else:
- print >> session.mlog, "session %d: resuming (%d pending, %d finished)" % (session.id, len(session.jobs.pending), len(session.jobs.finished))
+ print >> session.logs.manager, "session %d: resuming (%d pending, %d finished)" % (session.id, len(session.jobs.pending), len(session.jobs.finished))
else:
if cls.COMMAND:
@@ -299,6 +322,10 @@ def main(cls):
jobs = []
for line in sys.stdin.readlines():
+ line = re.sub('#.*', '', line)
+ line = line.strip()
+ if not line:
+ continue
args = shlex.split(line)
if isinstance(command, str):
@@ -308,73 +335,119 @@ def main(cls):
jobs.append(job)
+ if not jobs:
+ error("no jobs, nothing to do")
+
split = taskconf.split if taskconf.split else 1
if split > len(jobs):
split = len(jobs)
+ taskconf.split = split
if len(taskconf.workers) < split and not taskconf.hub_apikey:
error("please provide a HUB APIKEY or more pre-launched workers")
if os.isatty(sys.stderr.fileno()) and not opt_force :
- cls.confirm(taskconf, split, jobs)
+ cls.confirm(taskconf, jobs)
if not session:
session = Session(opt_sessions)
-
session.taskconf = taskconf
- print >> session.mlog, "session %d (pid %d)" % (session.id, os.getpid())
+ ok = cls.work(jobs, session, taskconf)
+
+ if reporter:
+ reporter.report(session)
+
+ if not ok:
+ sys.exit(1)
+
+ @classmethod
+ def work(cls, jobs, session, taskconf):
+
+ if taskconf.ssh_identity:
+ sshkey = ssh.PrivateKey(taskconf.ssh_identity)
+ else:
+ sshkey = ssh.TempPrivateKey()
+
+ def status(msg):
+ timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
+ session.logs.manager.write("%s :: session %d %s\n" % (timestamp, session.id, msg))
+
+ status("(pid %d)" % os.getpid())
+ print >> session.logs.manager
+
+ class CaughtSignal(CloudWorker.Terminated):
+ pass
def terminate(sig, f):
signal.signal(sig, signal.SIG_IGN)
- raise CloudWorker.Terminated("caught signal (%d) to terminate" % sig, sig)
+ sigs = dict([ ( getattr(signal, attr), attr)
+ for attr in dir(signal) if attr.startswith("SIG") ])
- signal.signal(signal.SIGINT, terminate)
- signal.signal(signal.SIGTERM, terminate)
+ raise CaughtSignal("caught %s termination signal" % sigs[sig], sig)
- executor = None
+ exception = None
+ while True:
+ watchdog = Watchdog(session.logs.manager, session.paths.workers, taskconf)
- try:
- executor = CloudExecutor(split, session, taskconf)
- for job in jobs:
- executor(job)
+ signal.signal(signal.SIGINT, terminate)
+ signal.signal(signal.SIGTERM, terminate)
+
+ executor = None
- executor.join()
+ work_started = time.time()
+ try:
+ executor = CloudExecutor(session.logs, taskconf, sshkey)
+ for job in jobs:
+ executor(job)
- except Exception, e:
- if not isinstance(e, CloudWorker.Error):
- traceback.print_exc(file=session.mlog)
+ executor.join()
+ executor_results = executor.results
- if executor:
- executor.stop()
- results = executor.results
- else:
- results = []
+ except Exception, exception:
+ if isinstance(exception, CaughtSignal):
+ print >> session.logs.manager, "# " + str(exception[0])
- session.jobs.update(jobs, results)
- print >> session.mlog, "session %d: terminated (%d finished, %d pending)" % \
- (session.id,
- len(session.jobs.finished),
- len(session.jobs.pending))
+ elif not isinstance(exception, (CloudWorker.Error, CloudWorker.Terminated)):
+ traceback.print_exc(file=session.logs.manager)
- sys.exit(1)
+ if executor:
+ executor.stop()
+ executor_results = executor.results
+ else:
+ executor_results = []
+
+ watchdog.terminate()
+ watchdog.join()
+
+ session.jobs.update(jobs, executor_results)
- session.jobs.update(jobs, executor.results)
+ if len(session.jobs.pending) != 0 and executor_results and exception is None:
+ print >> session.logs.manager
+ status("(auto-resuming) %d pending jobs remaining" % len(session.jobs.pending))
+ print >> session.logs.manager
- exitcodes = [ exitcode for command, exitcode in executor.results ]
+ jobs = list(session.jobs.pending)
+ if taskconf.split > len(jobs):
+ taskconf.split = len(jobs)
+ else:
+ break
- succeeded = exitcodes.count(0)
- failed = len(exitcodes) - succeeded
+ session_results = [ result for job, result in session.jobs.finished ]
- print >> session.mlog, "session %d: %d jobs in %d seconds (%d succeeded, %d failed)" % \
- (session.id, len(exitcodes), session.elapsed, succeeded, failed)
+ succeeded = session_results.count("EXIT=0")
+ timeouts = session_results.count("TIMEOUT")
+ errors = len(session_results) - succeeded - timeouts
- if reporter:
- reporter.report(session)
+ pending = len(session.jobs.pending)
+ total = len(session.jobs.finished) + pending
- if session.jobs.pending:
- print >> session.mlog, "session %d: no workers left alive, %d jobs pending" % (session.id, len(session.jobs.pending))
- sys.exit(1)
+ print >> session.logs.manager
+ status("(%d seconds): %d/%d !OK - %d pending, %d timeouts, %d errors, %d OK" % \
+ (time.time() - work_started, total - succeeded, total, pending, timeouts, errors, succeeded))
+
+ ok = (total - succeeded == 0)
+ return ok
# set default class values to TaskConf defaults
for attr in TaskConf.__all__:
View
41 cloudtask/taskconf.py
@@ -1,5 +1,5 @@
#
-# Copyright (c) 2010-2011 Liraz Siri <liraz@turnkeylinux.org>
+# Copyright (c) 2010-2012 Liraz Siri <liraz@turnkeylinux.org>
#
# This file is part of CloudTask.
#
@@ -9,6 +9,8 @@
# option) any later version.
#
+from StringIO import StringIO
+
class TaskConf:
user = 'root'
@@ -19,6 +21,8 @@ class TaskConf:
overlay = None
timeout = 3600
+ retries = 0
+ strikes = 0
split = None
workers = []
@@ -26,6 +30,9 @@ class TaskConf:
hub_apikey = None
backup_id = None
ami_id = None
+ snapshot_id = None
+
+ ssh_identity = None
ec2_region = 'us-east-1'
ec2_size = 'm1.small'
@@ -63,10 +70,34 @@ def ec2_opts(self):
for attr in self.__all__
if attr.startswith('ec2_') ])
opts['label'] = 'Cloudtask: ' + self.command
- if self.backup_id:
- opts['backup_id'] = self.backup_id
- if self.ami_id:
- opts['ami_id'] = self.ami_id
+ for attrname in ('backup_id', 'ami_id', 'snapshot_id'):
+ val = getattr(self, attrname)
+ if val:
+ opts[attrname] = val
return opts
+
+ def fmt(self):
+ sio = StringIO()
+
+ table = []
+ for attr in ('split', 'command', 'ssh-identity', 'hub-apikey',
+ 'ec2-region', 'ec2-size', 'ec2-type',
+ 'user', 'backup-id', 'ami-id', 'snapshot-id', 'workers',
+ 'overlay', 'post', 'pre', 'timeout', 'report'):
+
+ val = self[attr.replace('-', '_')]
+ if isinstance(val, list):
+ val = " ".join(val)
+ if not val:
+ continue
+ table.append((attr, val))
+
+ print >> sio, " Parameter Value"
+ print >> sio, " --------- -----"
+ print >> sio
+ for row in table:
+ print >> sio, " %-15s %s" % (row[0], row[1])
+
+ return sio.getvalue()
View
218 cloudtask/watchdog.py
@@ -0,0 +1,218 @@
+import os
+from os.path import isdir, isfile, join
+
+import time
+import signal
+from multiprocessing import Process
+
+import traceback
+import re
+
+import logalyzer
+from _hub import Hub
+
+class Error(Exception):
+ pass
+
+def pid_exists(pid):
+ return isdir("/proc/%d" % pid)
+
+def get_ppid(pid):
+ if not pid_exists(pid):
+ raise Error("pid %d does not exist" % pid)
+ status = file("/proc/%d/status" % pid).read()
+
+ status_dict = dict([ (key.lower(),val)
+ for key,val in [ re.split(r':\t\s*', line) for line in status.splitlines() ]])
+ return int(status_dict['ppid'])
+
+class SessionWatcher(object):
+ class Worker:
+ def __init__(self, pid, mtime):
+ self.pid = pid
+ self.mtime = mtime
+
+ def __init__(self, session_pid, workers_path):
+ self.session_pid = session_pid
+ self.workers_path = workers_path
+
+ def active_workers(self):
+ """returns list of Worker objects (each with .pid and .mtime) for live workers of this session"""
+
+ if not isdir(self.workers_path):
+ return []
+
+ active_workers = []
+ for fname in os.listdir(self.workers_path):
+ fpath = join(self.workers_path, fname)
+ if not isfile(fpath):
+ continue
+
+ try:
+ worker_pid = int(fname)
+ except ValueError:
+ continue
+
+ if not pid_exists(worker_pid) or (self.session_pid not in (worker_pid, get_ppid(worker_pid))):
+ continue
+
+ mtime = os.stat(fpath).st_mtime
+ active_workers.append(self.Worker(worker_pid, mtime))
+
+ return active_workers
+ active_workers = property(active_workers)
+
+ def idletime(self):
+ mtimes = [ worker.mtime for worker in self.active_workers ]
+ if not mtimes:
+ return None
+
+ return time.time() - max(mtimes)
+ idletime = property(idletime)
+
+class Retrier:
+ def __init__(self, timeout, errorsleep, errorlog=None):
+ self.timeout = timeout
+ self.errorsleep = errorsleep
+ self.errorlog = errorlog
+
+ def __call__(self, callable, *args, **kwargs):
+ started = time.time()
+
+ while (time.time() - started) < self.timeout:
+ try:
+ return callable(*args, **kwargs)
+
+ except KeyboardInterrupt:
+ break
+
+ except:
+ if self.errorlog:
+ traceback.print_exc(file=self.errorlog)
+
+ time.sleep(self.errorsleep)
+
+ raise
+
+class Watchdog:
+ SIGTERM_TIMEOUT = 300
+
+ DESTROY_ERROR_TIMEOUT = 3600*3
+ DESTROY_ERROR_SLEEP = 300
+
+ def log(self, s):
+ self.logfh.write("# watchdog: %s\n" % s)
+
+ def watch(self):
+ timeout = self.taskconf.timeout * 2
+
+ session_pid = os.getppid()
+ watcher = SessionWatcher(session_pid, self.path_workers)
+
+ idletime = None
+
+ # wait while the session exists and hasn't been idle long enough to be considered stuck
+ while pid_exists(session_pid):
+ time.sleep(1)
+
+ idletime = watcher.idletime
+ if idletime is None:
+ continue
+
+ if idletime > timeout:
+ break
+
+
+ if idletime and idletime > timeout:
+ self.log("session idle after %d seconds" % idletime)
+
+ # SIGTERM active workers
+ for worker in watcher.active_workers:
+ try:
+ self.log("kill -TERM %d" % worker.pid)
+ os.kill(worker.pid, signal.SIGTERM)
+ except:
+ traceback.print_exc(file=self.logfh)
+
+ # wait up to SIGTERM_TIMEOUT for them to terminate
+ started = time.time()
+ while time.time() - started < self.SIGTERM_TIMEOUT:
+ time.sleep(1)
+ active_workers = watcher.active_workers
+ if not active_workers:
+ break
+
+ # no more Mr. Nice Guy: SIGKILL workers that are still alive
+ for worker in watcher.active_workers:
+ try:
+ self.log("kill -KILL %d" % worker.pid)
+ os.kill(worker.pid, signal.SIGKILL)
+ except:
+ traceback.print_exc(file=self.logfh)
+
+ def run(self):
+ class Stopped(Exception):
+ pass
+
+ # SIGINT should raise KeyboardInterrupt
+ signal.signal(signal.SIGINT, signal.default_int_handler)
+
+ # SIGTERM sent to us when parent process has finished
+ def stop(s, t):
+ raise Stopped
+ signal.signal(signal.SIGTERM, stop)
+
+ try:
+ self.watch()
+
+ except KeyboardInterrupt:
+ return
+
+ except Stopped:
+ pass
+
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ self.cleanup()
+
+ def cleanup(self):
+
+ def get_zombie_instances():
+ wl = logalyzer.WorkersLog(self.path_workers, self.taskconf.command)
+ for worker in wl.workers:
+ if worker.instanceid and not worker.instancetime:
+ yield worker
+
+ zombie_workers = list(get_zombie_instances())
+ if not zombie_workers:
+ return
+
+ zombie_instances = [ worker.instanceid for worker in zombie_workers ]
+ self.log("destroying zombie instances: " + " ".join(sorted(zombie_instances)))
+ hub = Hub(self.taskconf.hub_apikey)
+ retrier = Retrier(self.DESTROY_ERROR_TIMEOUT, self.DESTROY_ERROR_SLEEP, self.logfh)
+ destroyed_instances = [ instanceid for ipaddress, instanceid in retrier(hub.destroy, *zombie_instances) ]
+ self.log("destroyed zombie instances: " + " ".join(sorted(destroyed_instances)))
+
+ # log destruction to the respective worker logs
+ for zombie_worker in zombie_workers:
+ if zombie_worker.instanceid not in destroyed_instances:
+ continue
+ worker_log = file("%s/%d" % (self.path_workers, zombie_worker.worker_id), "a")
+ timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
+ print >> worker_log, "\n# %s [watchdog] destroyed worker %s" % (timestamp, zombie_worker.instanceid)
+ worker_log.close()
+
+ def __init__(self, logfh, path_workers, taskconf):
+ self.logfh = logfh
+ self.path_workers = path_workers
+ self.taskconf = taskconf
+
+ self.process = Process(target=self.run)
+ self.process.start()
+
+ def terminate(self):
+ self.process.terminate()
+
+ def join(self):
+ self.process.join()
+
View
22 cmd_destroy_workers.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright (c) 2010-2011 Liraz Siri <liraz@turnkeylinux.org>
+# Copyright (c) 2010-2012 Liraz Siri <liraz@turnkeylinux.org>
#
# This file is part of CloudTask.
#
@@ -78,29 +78,29 @@ def main():
else:
fh = file(input)
- addresses = fh.read().splitlines()
- if not addresses:
+ ip_addresses = fh.read().splitlines()
+ if not ip_addresses:
print "no workers to destroy"
return
- destroyed = Hub(hub_apikey).destroy(*addresses)
+ destroyed = Hub(hub_apikey).destroy(*ip_addresses)
if not destroyed:
fatal("couldn't destroy any workers")
- addresses_left = list(set(addresses) - set(destroyed))
- if addresses_left:
- print >> sys.stderr, "warning: can't destroy " + " ".join(addresses_left)
+ ip_addresses_left = list(set(ip_addresses) - set([ ip_address for ip_address, instanceid in destroyed ]))
+ if ip_addresses_left: