Skip to content
This repository
tree: 62b5cdbd4c
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 379 lines (320 sloc) 13.705 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
# ccm clusters

import common, yaml, os, subprocess, shutil, repository, time, re
from node import Node, NodeError
from bulkloader import BulkLoader

class Cluster(object):
    """A ccm cluster: a named set of Cassandra nodes sharing one install.

    The cluster's state (name, node and seed names, partitioner, Cassandra
    directory, configuration options and log level) is persisted as YAML in
    ``<path>/<name>/cluster.conf`` and can be read back with ``Cluster.load``.
    """

    def __init__(self, path, name, partitioner=None, cassandra_dir=None, create_directory=True, cassandra_version=None, verbose=False):
        """Create a cluster, or re-attach to an existing one.

        :param path: parent directory that holds the cluster directory.
        :param name: cluster name; also the name of its directory under path.
        :param partitioner: partitioner recorded in cluster.conf.
        :param cassandra_dir: existing Cassandra directory; only used when
            cassandra_version is None.
        :param create_directory: when True, create ``path/name``, validate the
            Cassandra directory and write cluster.conf. When False, attach to
            an existing cluster (this is what ``load`` uses).
        :param cassandra_version: version handed to ``repository.setup``.
        :param verbose: forwarded to ``repository.setup``.

        If anything fails after the directory was created, the directory is
        removed again before the error propagates.
        """
        self.name = name
        self.nodes = {}
        self.seeds = []
        self.partitioner = partitioner
        self._config_options = {}
        self.__log_level = "INFO"
        self.__path = path
        self.__version = None
        # Always define the attribute. When neither cassandra_dir nor
        # cassandra_version is given, later reads (e.g. __update_config)
        # would otherwise fail with AttributeError.
        self.__cassandra_dir = None
        if create_directory:
            # we create the dir before potentially downloading to throw an error sooner if need be
            os.mkdir(self.get_path())

        try:
            if cassandra_version is None:
                # at this point, cassandra_dir should always not be None, but
                # we keep this for backward compatibility (in loading old cluster)
                if cassandra_dir is not None:
                    self.__cassandra_dir = os.path.abspath(cassandra_dir)
                    self.__version = self.__get_version_from_build()
            else:
                install_dir, v = repository.setup(cassandra_version, verbose)
                self.__cassandra_dir = install_dir
                self.__version = v if v is not None else self.__get_version_from_build()

            if create_directory:
                common.validate_cassandra_dir(self.__cassandra_dir)
                self.__update_config()
        except BaseException:
            # Cleanup only: undo the mkdir above, then re-raise unchanged.
            if create_directory:
                shutil.rmtree(self.get_path())
            raise

    def set_partitioner(self, partitioner):
        """Set the partitioner, persist cluster.conf and return self."""
        self.partitioner = partitioner
        self.__update_config()
        return self

    def set_cassandra_dir(self, cassandra_dir=None, cassandra_version=None, verbose=False):
        """Point the cluster at another Cassandra install and persist it.

        :param cassandra_dir: directory to use when cassandra_version is None.
            It is stored as an absolute path, matching ``__init__``.
        :param cassandra_version: version handed to ``repository.setup``.
        :param verbose: forwarded to ``repository.setup``.
        :returns: self. Every node re-imports its config files afterwards.
        """
        if cassandra_version is None:
            # Validate before mutating, so a bad directory leaves the
            # cluster's current state untouched.
            common.validate_cassandra_dir(cassandra_dir)
            self.__cassandra_dir = os.path.abspath(cassandra_dir) if cassandra_dir is not None else None
            self.__version = self.__get_version_from_build()
        else:
            install_dir, v = repository.setup(cassandra_version, verbose)
            self.__cassandra_dir = install_dir
            self.__version = v if v is not None else self.__get_version_from_build()
        self.__update_config()
        for node in self.nodes.values():
            node.import_config_files()
        return self

    def get_cassandra_dir(self):
        """Return the Cassandra directory after validating it."""
        common.validate_cassandra_dir(self.__cassandra_dir)
        return self.__cassandra_dir

    def nodelist(self):
        """Return the cluster's nodes, sorted by node name."""
        return [self.nodes[name] for name in sorted(self.nodes)]

    def version(self):
        """Return the Cassandra version string, or None if unknown."""
        return self.__version

    @staticmethod
    def _version_tuple(version):
        """Turn a dotted version string such as '1.2.0' into (1, 2, 0).

        Only the leading run of digits and dots is used, so a suffix like
        '-beta1' is ignored. A string with no leading digits gives ().
        """
        match = re.match(r'[0-9]+(?:\.[0-9]+)*', version)
        if match is None:
            return ()
        return tuple(int(part) for part in match.group(0).split('.'))

    def _version_at_least(self, minimum):
        """Return True if the cluster's version is >= ``minimum``.

        Versions are compared numerically, so '1.10' counts as newer than
        '1.2'. An unknown (None) version always yields False.
        """
        current = self.version()
        if current is None:
            return False
        return Cluster._version_tuple(current) >= Cluster._version_tuple(minimum)

    @staticmethod
    def load(path, name):
        """Re-create a Cluster and its nodes from ``path/name/cluster.conf``.

        :raises common.LoadError: if a required key is missing from the file.
        """
        cluster_path = os.path.join(path, name)
        filename = os.path.join(cluster_path, 'cluster.conf')
        with open(filename, 'r') as f:
            # cluster.conf is written with yaml.safe_dump (__update_config).
            # safe_load reads it back without allowing arbitrary object
            # construction.
            data = yaml.safe_load(f)
        try:
            cassandra_dir = None
            if 'cassandra_dir' in data:
                cassandra_dir = data['cassandra_dir']
                repository.validate(cassandra_dir)

            cluster = Cluster(path, data['name'], cassandra_dir=cassandra_dir, create_directory=False)
            node_list = data['nodes']
            seed_list = data['seeds']
            if 'partitioner' in data:
                cluster.partitioner = data['partitioner']
            if 'config_options' in data:
                cluster._config_options = data['config_options']
            if 'log_level' in data:
                cluster.__log_level = data['log_level']
        except KeyError as k:
            # str() is needed: concatenating the KeyError itself raised a
            # TypeError that hid this LoadError.
            raise common.LoadError("Error Loading " + filename + ", missing property:" + str(k))

        for node_name in node_list:
            cluster.nodes[node_name] = Node.load(cluster_path, node_name, cluster)
        for seed_name in seed_list:
            cluster.seeds.append(cluster.nodes[seed_name])

        return cluster

    def add(self, node, is_seed, data_center=None):
        """Register ``node`` with the cluster and persist the change.

        :param node: the Node to add; its name must not already be in use.
        :param is_seed: also add the node to the seed list.
        :param data_center: optional data center name. When given, every
            node's topology file is rewritten.
        :raises common.ArgumentError: if a node with that name already exists.
        :returns: self.
        """
        if node.name in self.nodes:
            raise common.ArgumentError('Cannot create existing node %s' % node.name)
        self.nodes[node.name] = node
        if is_seed:
            self.seeds.append(node)
        self.__update_config()
        node.data_center = data_center
        node.set_log_level(self.__log_level)
        node._save()
        if data_center is not None:
            self.__update_topology_files()
        return self

    def populate(self, nodes, debug=False, tokens=None):
        """Create local nodes node1..nodeN on 127.0.0.i, all added as seeds.

        :param nodes: either a node count, or a list of per-data-center counts
            (e.g. [2, 3] puts node1-2 in dc1 and node3-5 in dc2, and switches
            endpoint_snitch to PropertyFileSnitch).
        :param debug: when truthy, node i gets str(2000 + 100*i) as its fifth
            Node argument instead of "0" (presumably the remote debug port).
        :param tokens: initial tokens, one per node. Missing entries become
            None. Defaults to ``Cluster.balanced_tokens``.
        :raises common.ArgumentError: if the count is < 1 or any generated
            node name already exists.
        :returns: self.
        """
        node_count = nodes
        dcs = []
        if isinstance(nodes, list):
            self.set_configuration_options(values={'endpoint_snitch' : 'org.apache.cassandra.locator.PropertyFileSnitch'})
            node_count = sum(nodes)
            for dc_index, dc_size in enumerate(nodes, 1):
                dcs.extend(['dc%d' % dc_index] * dc_size)

        if node_count < 1:
            raise common.ArgumentError('invalid node count %s' % nodes)

        # Check all names up front, so no node is created if any would
        # collide. self.nodes is keyed by node name.
        for i in range(1, node_count + 1):
            if 'node%s' % i in self.nodes:
                raise common.ArgumentError('Cannot create existing node node%s' % i)

        is_1_2_or_later = self._version_at_least('1.2')
        if tokens is None:
            tokens = Cluster.balanced_tokens(node_count, domain_size=64 if is_1_2_or_later else 128)

        for i in range(1, node_count + 1):
            tk = tokens[i-1] if i-1 < len(tokens) else None
            dc = dcs[i-1] if i-1 < len(dcs) else None
            address = '127.0.0.%s' % i
            binary = (address, 8000) if is_1_2_or_later else None
            debug_port = str(2000 + i * 100) if debug else str(0)
            node = Node('node%s' % i,
                        self,
                        False,
                        (address, 9160),
                        (address, 7000),
                        str(7000 + i * 100),
                        debug_port,
                        tk,
                        binary_interface=binary)
            # add() already persists cluster.conf.
            self.add(node, True, dc)
        return self

    @staticmethod
    def balanced_tokens(node_count, domain_size=128):
        """Return node_count tokens spaced evenly from 0 by 2**(domain_size-1) // node_count.

        Example: balanced_tokens(4) == [0, 2**125, 2**126, 3 * 2**125].

        NOTE(review): with domain_size=64 the tokens only cover the
        non-negative half of a signed 64-bit range; confirm this is intended
        for the partitioner in use.
        """
        step = 2 ** (domain_size - 1) // node_count
        return [i * step for i in range(node_count)]

    def remove(self, node=None):
        """Remove one node (stop it and delete its directory), or, when node
        is None, stop the whole cluster and delete its directory.

        A node that is not part of the cluster is silently ignored.
        """
        if node is not None:
            if node.name not in self.nodes:
                return

            del self.nodes[node.name]
            if node in self.seeds:
                self.seeds.remove(node)
            self.__update_config()
            node.stop()
            shutil.rmtree(node.get_path())
        else:
            self.stop()
            shutil.rmtree(self.get_path())

    def clear(self):
        """Stop the cluster, then call ``clear()`` on every node."""
        self.stop()
        for node in self.nodes.values():
            node.clear()

    def get_path(self):
        """Return the cluster directory, ``<path>/<name>``."""
        return os.path.join(self.__path, self.name)

    def get_seeds(self):
        """Return the storage-interface address of each seed node."""
        return [s.network_interfaces['storage'][0] for s in self.seeds]

    def show(self, verbose):
        """Print the nodes: full details when verbose, otherwise status only."""
        if not self.nodes:
            print("No node in this cluster yet")
            return
        for node in self.nodes.values():
            if verbose:
                node.show(show_cluster=False)
                print("")
            else:
                node.show(only_status=True)

    def start(self, no_wait=False, verbose=False):
        """Start every node that is not already running.

        :param no_wait: skip waiting for the nodes to see each other as
            alive. Unless verbose is also set, the launch processes' output is
            not read either; the method just sleeps 2 seconds instead.
        :param verbose: echo each launch process's stdout/stderr, prefixed
            with the node name.
        :returns: list of (node, process) pairs for the nodes started here.
        :raises NodeError: if a started node is not running after launch.
        """
        started = []
        marks = []
        for node in self.nodes.values():
            if not node.is_running():
                # p exposes .stdout/.stderr (presumably a subprocess.Popen).
                p = node.start(update_pid=False)
                started.append((node, p))

        if no_wait and not verbose:
            time.sleep(2) # waiting 2 seconds to check for early errors and for the pid to be set
        else:
            for node, p in started:
                for line in p.stdout:
                    if verbose:
                        print("[%s] %s" % (node.name, line.rstrip('\n')))
                for line in p.stderr:
                    if verbose:
                        print("[%s ERROR] %s" % (node.name, line.rstrip('\n')))
                if verbose:
                    print("----")

        for node, p in started:
            # Spin until the log file exists so mark_log() has a file to mark.
            # NOTE(review): this loops forever if the log never appears.
            while not os.path.exists(node.logfilename()):
                time.sleep(.01)
            marks.append((node, node.mark_log()))

        self.__update_pids(started)

        for node, p in started:
            if not node.is_running():
                raise NodeError("Error starting {0}.".format(node.name), p)

        if not no_wait and self._version_at_least("0.8"):
            # 0.7 gossip messages seem less predictable than from 0.8 onwards,
            # so the alive-check is only done for 0.8+.
            for node, mark in marks:
                for other_node, _ in marks:
                    if other_node is not node:
                        node.watch_log_for_alive(other_node, from_mark=mark)

        return started

    def stop(self, wait=True, gently=False):
        """Stop every node.

        Returns the nodes whose ``stop()`` returned a falsy value (presumably
        the nodes that were not running).
        """
        not_running = []
        for node in self.nodes.values():
            if not node.stop(wait, gently=gently):
                not_running.append(node)
        return not_running

    def set_log_level(self, new_level):
        """Set the log level on the cluster (persisted) and on every node.

        :raises common.ArgumentError: if new_level is not one of TRACE,
            DEBUG, INFO, WARN, ERROR.
        """
        known_level = [ 'TRACE', 'DEBUG', 'INFO', 'WARN', 'ERROR' ]
        if new_level not in known_level:
            raise common.ArgumentError("Unknown log level %s (use one of %s)" % (new_level, " ".join(known_level)))

        self.__log_level = new_level
        self.__update_config()

        for node in self.nodelist():
            node.set_log_level(new_level)

    def nodetool(self, nodetool_cmd):
        """Run ``nodetool_cmd`` on every running node and return self."""
        for node in self.nodes.values():
            if node.is_running():
                node.nodetool(nodetool_cmd)
        return self

    def stress(self, stress_options):
        """Run the stress tool against all live nodes.

        :param stress_options: extra command-line arguments (a list).
        :returns: self, or None if no node is live. Ctrl-C while stress runs
            is swallowed.
        """
        stress = common.get_stress_bin(self.get_cassandra_dir())
        livenodes = [node.network_interfaces['storage'][0] for node in self.nodes.values() if node.is_live()]
        if not livenodes:
            print("No live node")
            return
        args = [ stress, '-d', ",".join(livenodes) ] + stress_options
        try:
            subprocess.call(args)
        except KeyboardInterrupt:
            pass
        return self

    def run_cli(self, cmds=None, show_output=False, cli_options=None):
        """Run the CLI on the first live node found.

        :param cli_options: extra CLI options. Defaults to a fresh empty
            list, so no list is shared between calls.
        :raises common.ArgumentError: if no node is live.
        """
        if cli_options is None:
            cli_options = []
        livenodes = [node for node in self.nodes.values() if node.is_live()]
        if not livenodes:
            raise common.ArgumentError("No live node")
        livenodes[0].run_cli(cmds, show_output, cli_options)

    def set_configuration_options(self, values=None, batch_commitlog=None):
        """Merge configuration options, persist them and push them to nodes.

        :param values: dict of option name -> value to set.
        :param batch_commitlog: True selects batch commitlog sync (5 ms
            window); False selects periodic sync (10000 ms). The option that
            does not apply is set to None. When None, the commitlog options
            are left alone.
        :returns: self.
        """
        if values is not None:
            for k, v in values.items():
                self._config_options[k] = v
        if batch_commitlog is not None:
            if batch_commitlog:
                self._config_options["commitlog_sync"] = "batch"
                self._config_options["commitlog_sync_batch_window_in_ms"] = 5
                self._config_options["commitlog_sync_period_in_ms"] = None
            else:
                self._config_options["commitlog_sync"] = "periodic"
                self._config_options["commitlog_sync_period_in_ms"] = 10000
                self._config_options["commitlog_sync_batch_window_in_ms"] = None

        # config_options is stored in cluster.conf. Without saving here, the
        # new options would be lost on the next Cluster.load.
        self.__update_config()
        for node in self.nodes.values():
            node.import_config_files()
        return self

    def flush(self):
        """Run ``nodetool flush`` on every running node."""
        self.nodetool("flush")

    def compact(self):
        """Run ``nodetool compact`` on every running node."""
        self.nodetool("compact")

    def drain(self):
        """Run ``nodetool drain`` on every running node."""
        self.nodetool("drain")

    def repair(self):
        """Run ``nodetool repair`` on every running node."""
        self.nodetool("repair")

    def cleanup(self):
        """Run ``nodetool cleanup`` on every running node."""
        self.nodetool("cleanup")

    def decommission(self):
        """Call ``decommission()`` on every running node."""
        for node in self.nodes.values():
            if node.is_running():
                node.decommission()

    def removeToken(self, token):
        """Run ``nodetool removeToken <token>`` on every running node."""
        self.nodetool("removeToken " + str(token))

    def bulkload(self, options):
        """Run a BulkLoader for this cluster with ``options``."""
        loader = BulkLoader(self)
        loader.load(options)

    def scrub(self, options):
        """Call ``scrub(options)`` on every node."""
        for node in self.nodes.values():
            node.scrub(options)

    def __get_version_from_build(self):
        """Read the numeric base.version from the install's build.xml.

        :raises common.CCMError: if no base.version property is found.
        """
        cassandra_dir = self.get_cassandra_dir()
        build = os.path.join(cassandra_dir, 'build.xml')
        version_re = re.compile(r'name="base\.version" value="([0-9.]+)[^"]*"')
        with open(build) as f:
            for line in f:
                match = version_re.search(line)
                if match:
                    return match.group(1)
        raise common.CCMError("Cannot find version")

    def __update_config(self):
        """Write the cluster state to cluster.conf as YAML (safe_dump)."""
        node_list = [node.name for node in self.nodes.values()]
        seed_list = [node.name for node in self.seeds]
        filename = os.path.join(self.get_path(), 'cluster.conf')
        with open(filename, 'w') as f:
            yaml.safe_dump({
                'name' : self.name,
                'nodes' : node_list,
                'seeds' : seed_list,
                'partitioner' : self.partitioner,
                'cassandra_dir' : self.__cassandra_dir,
                'config_options' : self._config_options,
                'log_level' : self.__log_level
            }, f)

    def __update_pids(self, started):
        """Record the launch process of each (node, process) pair on its node."""
        for node, p in started:
            node._update_pid(p)

    def __update_topology_files(self):
        """Write the same cassandra-topology.properties into every node's conf dir.

        It maps 'default' to dc1 and each node address with a data center to
        that data center, all on rack r1.
        """
        dcs = [('default', 'dc1')]
        for node in self.nodelist():
            if node.data_center is not None:
                dcs.append((node.address(), node.data_center))

        content = "".join("%s=%s:r1\n" % (k, v) for k, v in dcs)

        for node in self.nodelist():
            topology_file = os.path.join(node.get_conf_dir(), 'cassandra-topology.properties')
            with open(topology_file, 'w') as f:
                f.write(content)
Something went wrong with that request. Please try again.