Permalink
Browse files

Allow specifying tokens to populate and other bug fixes

  • Loading branch information...
1 parent 47dd4cf commit dca3d49cc2a4e6ad8855dae8db5ec813d91c8b64 @pcmanus pcmanus committed Sep 29, 2011
Showing with 97 additions and 11 deletions.
  1. +36 −3 ccm_lib/cluster.py
  2. +54 −7 ccm_lib/node.py
  3. +7 −1 ccm_lib/repository.py
View
@@ -1,6 +1,6 @@
# ccm clusters
-import common, yaml, os, subprocess, shutil, repository
+import common, yaml, os, subprocess, shutil, repository, time
from node import Node, NodeError
class Cluster():
@@ -88,7 +88,7 @@ def add(self, node, is_seed):
self.__update_config()
return self
- def populate(self, node_count):
+ def populate(self, node_count, tokens=None):
if node_count < 1 or node_count >= 10:
raise common.ArgumentError('invalid node count %s' % node_count)
@@ -97,17 +97,27 @@ def populate(self, node_count):
raise common.ArgumentError('Cannot create existing node node%s' % i)
for i in xrange(1, node_count + 1):
+ tk = None
+ if tokens is not None:
+ try:
+ tk = tokens[i-1]
+ except IndexError:
+ pass
node = Node('node%s' % i,
self,
False,
('127.0.0.%s' % i, 9160),
('127.0.0.%s' % i, 7000),
str(7000 + i * 100),
- None)
+ tk)
self.add(node, True)
self.__update_config()
return self
+ @staticmethod
+ def balanced_tokens(node_count):
+ return [ (i*(2**127)/node_count) for i in range(0, node_count) ]
+
def remove(self, node=None):
if node is not None:
if not node.name in self.nodes:
@@ -147,9 +157,14 @@ def show(self, verbose):
def start(self, no_wait=False, verbose=False):
started = []
+ marks = []
for node in self.nodes.values():
if not node.is_running():
p = node.start(update_pid=False)
+ # ugly? indeed!
+ while not os.path.exists(node.logfilename()):
+ time.sleep(.01)
+ marks.append((node, node.mark_log()))
started.append((node, p))
if no_wait:
@@ -168,6 +183,12 @@ def start(self, no_wait=False, verbose=False):
if not node.is_running():
raise NodeError("Error starting {0}.".format(node.name), p)
+ if not no_wait:
+ for node, mark in marks:
+ for other_node, _ in marks:
+ if other_node is not node:
+ node.watch_log_for_alive(other_node, from_mark=mark)
+
return started
def stop(self, wait=True):
@@ -217,6 +238,18 @@ def unset_configuration_option(self, name):
node.unset_configuration_option(name)
return self
+ def flush(self):
+ self.nodetool("flush")
+
+ def compact(self):
+ self.nodetool("compact")
+
+ def repair(self):
+ self.nodetool("repair")
+
+ def cleanup(self):
+ self.nodetool("cleanup")
+
def __update_config(self):
node_list = [ node.name for node in self.nodes.values() ]
seed_list = [ node.name for node in self.seeds ]
View
@@ -19,6 +19,9 @@ class TimeoutError(Exception):
def __init__(self, data):
Exception.__init__(self, str(data))
+# Groups: 1 = cf, 2 = tmp or none, 3 = suffix (Compacted or Data.db)
+_sstable_regexp = re.compile('([\S])+-(tmp-)?[\S]+-([a-zA-Z.]+)')
+
class Node():
def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_interface, jmx_port, initial_token, save=True):
self.name = name
@@ -162,7 +165,7 @@ def watch_log_for_death(self, nodes, from_mark=None, timeout=600):
def watch_log_for_alive(self, nodes, from_mark=None, timeout=60):
tofind = nodes if isinstance(nodes, list) else [nodes]
- tofind = [ "%s state jump to normal" % node.address() for node in tofind ]
+ tofind = [ "%s is now UP" % node.address() for node in tofind ]
self.watch_log_for(tofind, from_mark=from_mark, timeout=timeout)
def start(self, join_ring=True, no_wait=False, verbose=False, update_pid=True, wait_other_notice=False):
@@ -296,14 +299,25 @@ def set_log_level(self, new_level):
common.replace_in_file(conf_file, append_pattern, append_pattern + l)
return self
- def clear(self, clear_all = False):
- data_dirs = [ 'data', 'commitlogs']
- if clear_all:
- data_dirs = data_dirs + [ 'saved_caches', 'logs']
+ def clear(self, clear_all = False, only_data = False):
+ data_dirs = [ 'data' ]
+ if not only_data:
+ data_dirs = data_dirs + [ 'commitlogs']
+ if clear_all:
+ data_dirs = data_dirs + [ 'saved_caches', 'logs']
for d in data_dirs:
full_dir = os.path.join(self.get_path(), d)
- shutil.rmtree(full_dir)
- os.mkdir(full_dir)
+ if only_data:
+ for dir in os.listdir(full_dir):
+ full_dir = os.path.join(full_dir, dir)
+ if os.path.isdir(full_dir) and not dir is "system":
+ for f in os.listdir(full_dir):
+ full_path = os.path.join(full_dir, f)
+ if os.path.isfile(full_path):
+ os.remove(full_path)
+ else:
+ shutil.rmtree(full_dir)
+ os.mkdir(full_dir)
def decommission(self):
self.status = Status.DECOMMISIONNED
@@ -358,6 +372,39 @@ def stress(self, stress_options):
except KeyboardInterrupt:
pass
+ def data_size(self, live_data=True):
+ data_dir = os.path.join(self.get_path(), 'data')
+ size = 0
+ for dir in os.listdir(data_dir):
+ full_dir = os.path.join(data_dir, dir)
+ if os.path.isdir(full_dir) and not dir.endswith("system"):
+ for f in os.listdir(full_dir):
+ full_path = os.path.join(full_dir,f)
+ if os.path.isfile(full_path):
+ if live_data:
+ m = _sstable_regexp.match(f)
+ if m is None or m.group(2) is not None or m.group(3) != "Data.db":
+ continue
+ if os.path.exists(full_path.replace("Data.db", "Compacted")):
+ continue
+ size += os.path.getsize(full_path)
+ return size
+
+ def flush(self):
+ self.nodetool("flush")
+
+ def compact(self):
+ self.nodetool("compact")
+
+ def repair(self):
+ self.nodetool("repair")
+
+ def move(self, new_token):
+ self.nodetool("move " + str(new_token))
+
+ def cleanup(self):
+ self.nodetool("cleanup")
+
def import_config_files(self):
self.__update_config()
View
@@ -1,7 +1,7 @@
# downloaded sources handling
from __future__ import with_statement
-import os, shutil, urllib2, tarfile, tempfile, subprocess
+import os, shutil, urllib2, tarfile, tempfile, subprocess, stat
import common
ARCHIVE="http://archive.apache.org/dist/cassandra"
@@ -46,6 +46,12 @@ def download_version(version, url=None, verbose=False):
lf.write("\n\n--- cassandra/stress build ------------\n")
stress_dir = os.path.join(target_dir, "tools", "stress") if version >= "0.8.0" else os.path.join(target_dir, "contrib", "stress")
try:
+ # set permissions correctly, seems to not always be the case
+ stress_bin_dir = os.path.join(stress_dir, 'bin')
+ for f in os.listdir(stress_bin_dir):
+ full_path = os.path.join(stress_bin_dir, f)
+ os.chmod(full_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
+
if subprocess.call(['ant', 'build'], cwd=stress_dir, stdout=lf, stderr=lf) is not 0:
raise common.CCMError("Error compiling Cassandra stress tool. See %s for details (you will still be able to use ccm but not the stress related commands)" % logfile)
except IOError as e:

0 comments on commit dca3d49

Please sign in to comment.