Permalink
Browse files

multi-thread the pushing of cassandra nodes

  • Loading branch information...
1 parent 8b03865 commit 28478dc81916f3fe4efee8a3ec039708cbe06450 @pquerna committed Mar 5, 2010
Showing with 28 additions and 12 deletions.
  1. +28 −12 run.py
View
40 run.py
@@ -37,6 +37,8 @@
except ImportError:
import json
+from threading import Thread
+
ROOT_DIR = os.path.split(os.path.abspath(__file__))[0]
def log(str):
@@ -126,7 +128,6 @@ def push_master_files(key, master, servers):
sftp.put(pjoin(ROOT_DIR, "py_and_thrift-ubuntu-bin.tar.bz2"), "py_and_thrift-ubuntu-bin.tar.bz2")
exec_wait(client, "tar -xvjf py_and_thrift-ubuntu-bin.tar.bz2 -C /")
sftp.put(pjoin(ROOT_DIR, "stress.py"), "stress.py")
- sftp.put(pjoin(ROOT_DIR, "runtest.sh"), "stress.py")
conf = master_script(servers)
fp = sftp.open("/root/runtest.sh", 'w')
fp.write(conf)
@@ -139,27 +140,34 @@ def push_master_files(key, master, servers):
finally:
client.close()
-def push_cassandra_files(key, local, servers):
- tarball = local[local.rfind("/")+1:]
- for s in servers:
- conninfo = {'hostname': s.public_ip[0],
+class pusher_thread(Thread):
+ def __init__(self, key, s, local, servers):
+ Thread.__init__(self)
+ self.key = key
+ self.s = s
+ self.servers = servers
+ self.local = local
+
+ def run(self):
+ tarball = self.local[self.local.rfind("/")+1:]
+ conninfo = {'hostname': self.s.public_ip[0],
'port': 22,
'username': 'root',
- 'pkey': key,
+ 'pkey': self.key,
'allow_agent': False,
'look_for_keys': False}
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(**conninfo)
try:
sftp = client.open_sftp()
- sftp.put(local, tarball)
+ sftp.put(self.local, tarball)
exec_wait(client, "tar -xvzf %s" % (tarball))
dname = tarball[:tarball.rfind("-")]
sftp.symlink(dname, "cassandra")
sftp.put(pjoin(ROOT_DIR, "ivy-shit.tar.bz2"), "ivy-shit.tar.bz2")
exec_wait(client, "tar -xvjf ivy-shit.tar.bz2")
- conf = storage_conf(s, [x for x in servers if x != s])
+ conf = storage_conf(self.s, [x for x in self.servers if x != self.s])
fp = sftp.open("cassandra/conf/storage-conf.xml", 'w')
fp.write(conf)
fp.close()
@@ -168,6 +176,14 @@ def push_cassandra_files(key, local, servers):
exec_wait(client, "cd cassandra; bin/cassandra")
finally:
client.close()
+
+def push_cassandra_files(key, local, servers):
+ t = []
+ for s in servers:
+ t.append(pusher_thread(key, s, local, servers))
+ t[-1].start()
+ for thread in t:
+ thread.join()
def main():
tempdir = tempfile.mkdtemp(prefix="cassandra-bench")
@@ -184,15 +200,15 @@ def main():
servers = boot_servers(driver, cassconf.CLUSTER_SIZE, pubkey)
push_cassandra_files(key, local, servers)
master = boot_master(driver, pubkey)
- push_master_files(key, master)
+ push_master_files(key, master, servers)
print servers
print master
finally:
print "Cleaning up "+ tempdir
- shutil.rmtree(tempdir)
+ #shutil.rmtree(tempdir)
log("Cleaning up any booted servers....")
- driver = get_libcloud_driver()
- [n.destroy() for n in driver.list_nodes() if n.name.find('cbench') != -1]
+ #driver = get_libcloud_driver()
+ #[n.destroy() for n in driver.list_nodes() if n.name.find('cbench') != -1]
if __name__ == "__main__":
main()

0 comments on commit 28478dc

Please sign in to comment.