Permalink
Browse files

implemented stoppable launch + ctrl-C handling in launch-workers

  • Loading branch information...
1 parent 8d73803 commit a1f230a1e84a041666f22b4e082f8a7f03ebb4f0 @lirazsiri lirazsiri committed Aug 10, 2011
Showing with 52 additions and 11 deletions.
  1. +38 −8 cloudtask/_hub.py
  2. +14 −3 cmd_launch_workers.py
View
@@ -20,7 +20,11 @@ class Hub:
class Error(Exception):
pass
- def __init__(self, apikey, wait_first=30, wait_status=10, wait_retry=5, retries=2):
+ class Stopped(Error):
+ pass
+
+# def __init__(self, apikey, wait_first=30, wait_status=10, wait_retry=5, retries=2):
+ def __init__(self, apikey, wait_first=1, wait_status=0, wait_retry=5, retries=2):
self.apikey = apikey
self.wait_first = wait_first
@@ -43,8 +47,11 @@ def retry(self, callable, *args, **kwargs):
raise self.Error(e)
- def launch(self, howmany, **kwargs):
- """launch <howmany> workers, wait until booted and return their public IP addresses"""
+ def launch(self, howmany, callback=None, **kwargs):
+ """launch <howmany> workers, wait until booted and return their public IP addresses.
+
+ Invoke callback every frequently. If callback returns False, we terminate launching.
+ """
retry = self.retry
hub = _Hub(self.apikey)
@@ -59,19 +66,42 @@ def launch(self, howmany, **kwargs):
if not name:
name = 'core'
+ def get_pending_servers():
+ return [ server
+ for server in retry(hub.servers.get, refresh_cache=True)
+ if server.instanceid in (pending_ids - yielded_ids) ]
+
+ stopped = False
while True:
+
+ if callback and not stopped:
+ stopped = callback() is False
+
+ if stopped:
+ servers = [ server for server in get_pending_servers()
+ if server.status in ('pending', 'running') ]
+
+ if not servers:
+ raise self.Stopped
+
+ for server in servers:
+ if server.status == 'running':
+ retry(server.destroy, auto_unregister=True)
+ pending_ids.remove(server.instanceid)
+
+ time.sleep(self.wait_status)
+ continue
+
if len(pending_ids) < howmany:
server = retry(hub.servers.launch, name, **kwargs)
+ if len(pending_ids) == howmany - 1:
+ server.set_boot_status('tklbam-restore')
pending_ids.add(server.instanceid)
if time.time() - time_started < self.wait_first:
continue
- servers = [ server
- for server in retry(hub.servers.get, refresh_cache=True)
- if server.instanceid in (pending_ids - yielded_ids) ]
-
- for server in servers:
+ for server in get_pending_servers():
if server.status != 'running' or server.boot_status != 'booted':
continue
View
@@ -57,9 +57,8 @@
import sys
import getopt
+import signal
from cloudtask import Hub
-from StringIO import StringIO
-
from lazyclass import lazyclass
def usage(e=None):
@@ -129,7 +128,19 @@ def main():
else:
output = sys.stdout
- for address in Hub(hub_apikey).launch(howmany, **kwargs):
+ class Bool:
+ value = False
+ stopped = Bool()
+
+ def handler(s, f):
+ stopped.value = True
+
+ signal.signal(signal.SIGINT, handler)
+
+ def callback():
+ return not stopped.value
+
+ for address in Hub(hub_apikey).launch(howmany, callback, **kwargs):
print >> output, address
output.flush()

0 comments on commit a1f230a

Please sign in to comment.