Skip to content

Commit

Permalink
Add command to enable DSE's AOSS
Browse files Browse the repository at this point in the history
  • Loading branch information
ptnapoleon committed Apr 25, 2018
1 parent 23392d6 commit 1db7384
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 6 deletions.
12 changes: 9 additions & 3 deletions ccmlib/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, path, name, partitioner=None, install_dir=None, create_direct
self.partitioner = partitioner
self._config_options = {}
self._dse_config_options = {}
self._misc_config_options = {}
self._environment_variables = {}
self.__log_level = "INFO"
self.__path = path
Expand Down Expand Up @@ -316,7 +317,7 @@ def balanced_tokens_across_dcs(self, dcs):
tokens.extend(new_tokens)
return tokens

def remove(self, node=None):
def remove(self, node=None, gently=False):
if node is not None:
if node.name not in self.nodes:
return
Expand All @@ -325,10 +326,10 @@ def remove(self, node=None):
if node in self.seeds:
self.seeds.remove(node)
self._update_config()
node.stop(gently=False)
node.stop(gently=gently)
self.remove_dir_with_retry(node.get_path())
else:
self.stop(gently=False)
self.stop(gently=gently)
self.remove_dir_with_retry(self.get_path())

# We can race w/shutdown on Windows and get Access is denied attempting to delete node logs.
Expand Down Expand Up @@ -572,6 +573,10 @@ def verify(self, options):
for node in list(self.nodes.values()):
node.verify(options)

def enable_aoss(self):
common.error("Cannot enable AOSS in C* clusters")
exit(1)

def update_log4j(self, new_log4j_config):
# iterate over all nodes
for node in self.nodelist():
Expand All @@ -597,6 +602,7 @@ def _update_config(self):
'install_dir': self.__install_dir,
'config_options': self._config_options,
'dse_config_options': self._dse_config_options,
'misc_config_options' : self._misc_config_options,
'log_level': self.__log_level,
'use_vnodes': self.use_vnodes,
'datadirs': self.data_dir_count,
Expand Down
2 changes: 2 additions & 0 deletions ccmlib/cluster_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def load(path, name):
cluster._config_options = data['config_options']
if 'dse_config_options' in data:
cluster._dse_config_options = data['dse_config_options']
if 'misc_config_options' in data:
cluster._misc_config_options = data['misc_config_options']
if 'log_level' in data:
cluster.__log_level = data['log_level']
if 'use_vnodes' in data:
Expand Down
19 changes: 18 additions & 1 deletion ccmlib/cmds/cluster_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
"checklogerror",
"showlastlog",
"jconsole",
"setworkload"
"setworkload",
"enableaoss"
]


Expand Down Expand Up @@ -833,3 +834,19 @@ def run(self):
except common.ArgumentError as e:
print_(str(e), file=sys.stderr)
exit(1)

class ClusterEnableaossCmd(Cmd):

descr_text = "Enable DSE's Always On SparkSQL Server"
usage = "usage: ccm enableaoss"

def validate(self, parser, options, args):
Cmd.validate(self, parser, options, args, load_cluster=True)

def run(self):
try:
self.cluster.enable_aoss()
except Exception as e:
print_(str(e), file=sys.stderr)
exit(1)

2 changes: 2 additions & 0 deletions ccmlib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ def make_dse_env(install_dir, node_path, node_ip):
if version < '6.0':
env['SPARK_WORKER_MEMORY'] = os.environ.get('SPARK_WORKER_MEMORY', '1024M')
env['SPARK_WORKER_CORES'] = os.environ.get('SPARK_WORKER_CORES', '2')
else:
env['ALWAYSON_SQL_LOG_DIR'] = os.path.join(node_path, 'logs')
env['DSE_HOME'] = os.path.join(install_dir)
env['DSE_CONF'] = os.path.join(node_path, 'resources', 'dse', 'conf')
env['CASSANDRA_HOME'] = os.path.join(install_dir, 'resources', 'cassandra')
Expand Down
17 changes: 17 additions & 0 deletions ccmlib/dse_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,35 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=False, wait_
jvm_args = []
started = super(DseCluster, self).start(no_wait, verbose, wait_for_binary_proto, wait_other_notice, jvm_args, profile_options, quiet_start=quiet_start, allow_root=allow_root, timeout=180)
self.start_opscenter()
if self._misc_config_options.get('enable_aoss', False):
self.wait_for_any_log('AlwaysOn SQL started', 600)
return started

def stop(self, wait=True, signal_event=signal.SIGTERM, **kwargs):
not_running = super(DseCluster, self).stop(wait=wait, signal_event=signal.SIGTERM, **kwargs)
self.stop_opscenter()
return not_running


def remove(self, node=None):
# We _must_ gracefully stop if aoss is enabled, otherwise we will leak the spark workers
super(DseCluster, self).remove(node=node, gently=self._misc_config_options.get('enable_aoss', False))

def cassandra_version(self):
if self._cassandra_version is None:
self._cassandra_version = common.get_dse_cassandra_version(self.get_install_dir())
return self._cassandra_version

def enable_aoss(self):
if self.version() < '6.0':
common.error("Cannot enable AOSS in DSE clusters before 6.0")
exit(1)
self._misc_config_options['enable_aoss'] = True
for node in self.nodelist():
port_offset = int(node.name[4:])
node.enable_aoss(thrift_port=10000 + port_offset, web_ui_port=9077 + port_offset)
self._update_config()

def set_dse_configuration_options(self, values=None):
if values is not None:
self._dse_config_options = common.merge_configuration(self._dse_config_options, values)
Expand Down
19 changes: 18 additions & 1 deletion ccmlib/dse_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,27 @@ def set_workloads(self, workloads):
'work_dir': os.path.join(self.get_path(), 'dsefs'),
'data_directories': [{'dir': os.path.join(self.get_path(), 'dsefs', 'data')}]}}
if self.cluster.version() >= '6.0':
dse_options['resource_manager_options'] = {'worker_options' : {'memory_total': '1g', 'cores_total': 2}}
# Don't overwrite aoss options
if not self._dse_config_options.has_key('resource_manager_options'):
dse_options['resource_manager_options'] = {'worker_options': {'memory_total': '1g', 'cores_total': 2}}

self.set_dse_configuration_options(dse_options)
self._update_spark_env()

def enable_aoss(self, thrift_port=10000, web_ui_port=9077):
self._dse_config_options['alwayson_sql_options'] = {'enabled': True, 'thrift_port': thrift_port, 'web_ui_port': web_ui_port}
self._dse_config_options['resource_manager_options'] = {'worker_options':
{'cores_total': 2,
'memory_total': '2g',
'workpools': [{
'name': 'alwayson_sql',
'cores': 0.5,
'memory': 0.5
}]}}
self._update_config()
self._update_spark_env()
self._update_yaml()

def set_dse_configuration_options(self, values=None):
if values is not None:
self._dse_config_options = common.merge_configuration(self._dse_config_options, values)
Expand Down Expand Up @@ -128,6 +144,7 @@ def start(self,
quiet_start=False,
allow_root=False,
set_migration_task=True):
mark = self.mark_log()
process = super(DseNode, self).start(join_ring, no_wait, verbose, update_pid, wait_other_notice, replace_token,
replace_address, jvm_args, wait_for_binary_proto, profile_options, use_jna,
quiet_start, allow_root, set_migration_task)
Expand Down
19 changes: 18 additions & 1 deletion ccmlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,21 @@ class Node(object):
Provides interactions to a Cassandra node.
"""

def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None, byteman_port='0', environment_variables=None, byteman_startup_script=None, derived_cassandra_version=None):
def __init__(self,
name,
cluster,
auto_bootstrap,
thrift_interface,
storage_interface,
jmx_port,
remote_debug_port,
initial_token,
save=True,
binary_interface=None,
byteman_port='0',
environment_variables=None,
byteman_startup_script=None,
derived_cassandra_version=None):
"""
Create a new Node.
- name: the name for that node
Expand Down Expand Up @@ -860,6 +874,9 @@ def verify(self, options):
p = self.verify_process(options=options)
return handle_external_tool_process(p, ['sstableverify'] + options)

def enable_aoss(self, thrift_port=10000, web_ui_port=9077):
pass

def run_cqlsh_process(self, cmds=None, cqlsh_options=None):
if cqlsh_options is None:
cqlsh_options = []
Expand Down

0 comments on commit 1db7384

Please sign in to comment.