Merge pull request #450 from pcmanus/dse-enhancements
Support for graph and dsefs for DSE 5.0 EAP1
ptnapoleon committed Feb 9, 2016
2 parents 40bdedf + 30f4810 commit b5bb9f9
Showing 5 changed files with 121 additions and 37 deletions.
33 changes: 32 additions & 1 deletion ccmlib/cmds/cluster_cmds.py
@@ -43,7 +43,8 @@ def cluster_cmds():
"invalidatecache",
"checklogerror",
"showlastlog",
"jconsole"
"jconsole",
"setworkload"
]


@@ -969,3 +970,33 @@ def run(self):
except OSError:
print_("Could not start jconsole. Please make sure jconsole can be found in your $PATH.")
exit(1)

class ClusterSetworkloadCmd(Cmd):

def description(self):
return "Sets the workloads for a DSE cluster"

def get_parser(self):
usage = "usage: ccm setworkload [cassandra|solr|hadoop|spark|dsefs|cfs|graph],..."
parser = self._get_default_parser(usage, self.description())
return parser

def validate(self, parser, options, args):
Cmd.validate(self, parser, options, args, load_cluster=True)
self.workloads = args[0].split(',')
valid_workloads = ['cassandra', 'solr', 'hadoop', 'spark', 'dsefs', 'cfs', 'graph']
for workload in self.workloads:
if workload not in valid_workloads:
print_(workload, ' is not a valid workload')
exit(1)

def run(self):
try:
if len(self.cluster.nodes) == 0:
print_("No node in this cluster yet. Use the populate command before starting.")
exit(1)
for node in list(self.cluster.nodes.values()):
node.set_workloads(workloads=self.workloads)
except common.ArgumentError as e:
print_(str(e), file=sys.stderr)
exit(1)
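
The new cluster-level command mirrors the per-node one: it splits the comma-separated argument, validates each workload, and calls set_workloads() on every node, so "ccm setworkload graph,dsefs" applies both workloads cluster-wide. A minimal sketch of driving the same API from Python rather than the CLI; the cluster name, the ~/.ccm repository path, and the ClusterFactory.load() call are assumptions for illustration, not part of this diff:

import os
from ccmlib.cluster_factory import ClusterFactory

# Load an already-populated DSE cluster from the default ccm directory (assumed).
cluster = ClusterFactory.load(os.path.expanduser('~/.ccm'), 'test_dse')

# Fan the workloads out to every node, as ClusterSetworkloadCmd.run() does above.
for node in list(cluster.nodes.values()):
    node.set_workloads(workloads=['graph', 'dsefs'])
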
17 changes: 9 additions & 8 deletions ccmlib/cmds/node_cmds.py
@@ -712,24 +712,25 @@ def run(self):
class NodeSetworkloadCmd(Cmd):

def description(self):
return "Sets the workload for a DSE node"
return "Sets the workloads for a DSE node"

def get_parser(self):
usage = "usage: ccm node_name setworkload [cassandra|solr|hadoop|spark|cfs]"
usage = "usage: ccm node_name setworkload [cassandra|solr|hadoop|spark|dsefs|cfs|graph],..."
parser = self._get_default_parser(usage, self.description())
return parser

def validate(self, parser, options, args):
Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True)
self.workload = args[1]
workloads = ['cassandra', 'solr', 'hadoop', 'spark', 'cfs']
if self.workload not in workloads:
print_(self.workload, ' is not a valid workload')
exit(1)
self.workloads = args[1].split(',')
valid_workloads = ['cassandra', 'solr', 'hadoop', 'spark', 'dsefs', 'cfs', 'graph']
for workload in self.workloads:
if workload not in valid_workloads:
print_(workload, ' is not a valid workload')
exit(1)

def run(self):
try:
self.node.set_workload(workload=self.workload)
self.node.set_workloads(workloads=self.workloads)
except common.ArgumentError as e:
print_(str(e), file=sys.stderr)
exit(1)
10 changes: 9 additions & 1 deletion ccmlib/common.py
@@ -220,7 +220,7 @@ def make_cassandra_env(install_dir, node_path, update_conf=True):
return env


def make_dse_env(install_dir, node_path):
def make_dse_env(install_dir, node_path, node_ip):
env = os.environ.copy()
env['MAX_HEAP_SIZE'] = os.environ.get('CCM_MAX_HEAP_SIZE', '500M')
env['HEAP_NEWSIZE'] = os.environ.get('CCM_HEAP_NEWSIZE', '50M')
@@ -237,6 +237,14 @@ def make_dse_env(install_dir, node_path):
env['MAHOUT_CONF_DIR'] = os.path.join(node_path, 'resources', 'mahout', 'conf')
env['SPARK_CONF_DIR'] = os.path.join(node_path, 'resources', 'spark', 'conf')
env['SHARK_CONF_DIR'] = os.path.join(node_path, 'resources', 'shark', 'conf')
env['GREMLIN_CONSOLE_CONF_DIR'] = os.path.join(node_path, 'resources', 'graph', 'gremlin-console', 'conf')
env['SPARK_WORKER_DIR'] = os.path.join(node_path, 'spark', 'worker')
env['SPARK_LOCAL_DIRS'] = os.path.join(node_path, 'spark', 'rdd')
env['SPARK_WORKER_LOG_DIR'] = os.path.join(node_path, 'logs', 'spark', 'worker')
env['SPARK_MASTER_LOG_DIR'] = os.path.join(node_path, 'logs', 'spark', 'master')
env['DSE_LOG_ROOT'] = os.path.join(node_path, 'logs', 'dse')
env['CASSANDRA_LOG_DIR'] = os.path.join(node_path, 'logs')
env['SPARK_LOCAL_IP'] = ''+node_ip
return env


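make_dse_env() now also takes the node's IP and derives a set of Spark and DSE log/work directories from the node path, so Spark worker state and the DSE log root land inside the node's own directory tree. A small sketch of inspecting the result; the install directory, node path, and IP below are placeholders, and in the lines shown the function only composes environment strings from them:

from ccmlib import common

# Placeholder paths and IP purely for illustration; a real DseNode passes its own
# install dir, node path, and binary interface address via get_env().
env = common.make_dse_env('/opt/dse-5.0.0', '/home/user/.ccm/test/node1', '127.0.0.1')

print(env['SPARK_WORKER_LOG_DIR'])  # .../node1/logs/spark/worker
print(env['DSE_LOG_ROOT'])          # .../node1/logs/dse
print(env['SPARK_LOCAL_IP'])        # 127.0.0.1
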
80 changes: 60 additions & 20 deletions ccmlib/dse_node.py
@@ -10,7 +10,7 @@
import time

import yaml
from six import print_
from six import print_,iteritems

from ccmlib import common
from ccmlib.node import Node, NodeError
@@ -25,6 +25,7 @@ class DseNode(Node):
def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None, byteman_port='0'):
super(DseNode, self).__init__(name, cluster, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save, binary_interface, byteman_port)
self.get_cassandra_version()
self._dse_config_options = {}
if self.cluster.hasOpscenter():
self._copy_agent()

@@ -47,16 +48,34 @@ def get_tool_args(self, toolname):
return [common.join_bin(os.path.join(self.get_install_dir(), 'resources', 'cassandra'), 'bin', 'dse'), toolname]

def get_env(self):
return common.make_dse_env(self.get_install_dir(), self.get_path())
(node_ip, _) = self.network_interfaces['binary']
return common.make_dse_env(self.get_install_dir(), self.get_path(), node_ip)

def get_cassandra_version(self):
return common.get_dse_cassandra_version(self.get_install_dir())

def set_workload(self, workload):
self.workload = workload
def set_workloads(self, workloads):
self.workloads = workloads
self._update_config()
if workload == 'solr':
if 'solr' in self.workloads:
self.__generate_server_xml()
if 'graph' in self.workloads:
(node_ip, _) = self.network_interfaces['binary']
self.set_dse_configuration_options({'graph' : {'gremlin_server': {'host': node_ip}}})
self.__update_gremlin_config_yaml()
if 'dsefs' in self.workloads:
dsefs_options = {'dsefs_options' : {'enabled': 'true',
'keyspace_name': 'dsefs',
'public_port': '5598',
'private_port': '5599',
'data_directories': [os.path.join(self.get_path(), 'dsefs')]}}
self.set_dse_configuration_options(dsefs_options)

def set_dse_configuration_options(self, values=None):
if values is not None:
for k, v in iteritems(values):
self._dse_config_options[k] = v
self.import_dse_config_files()

def watch_log_for_alive(self, nodes, from_mark=None, timeout=720, filename='system.log'):
"""
@@ -130,23 +149,26 @@ def start(self,

os.chmod(launch_bin, os.stat(launch_bin).st_mode | stat.S_IEXEC)

env = common.make_dse_env(self.get_install_dir(), self.get_path())
env = self.get_env()

if common.is_win():
self._clean_win_jmx()

pidfile = os.path.join(self.get_path(), 'cassandra.pid')
args = [launch_bin, 'cassandra']

if self.workload is not None:
if 'hadoop' in self.workload:
for workload in self.workloads:
if 'hadoop' in workload:
args.append('-t')
if 'solr' in self.workload:
if 'solr' in workload:
args.append('-s')
if 'spark' in self.workload:
if 'spark' in workload:
args.append('-k')
if 'cfs' in self.workload:
if 'cfs' in workload:
args.append('-c')
if 'graph' in workload:
args.append('-g')

args += ['-p', pidfile, '-Dcassandra.join_ring=%s' % str(join_ring)]
args += ['-Dcassandra.logdir=%s' % os.path.join(self.get_path(), 'logs')]
if replace_token is not None:
@@ -231,7 +253,7 @@ def nodetool(self, cmd, username=None, password=None, capture_output=True, wait=
return stdout, stderr

def dsetool(self, cmd):
env = common.make_dse_env(self.get_install_dir(), self.get_path())
env = self.get_env()
dsetool = common.join_bin(self.get_install_dir(), 'bin', 'dsetool')
args = [dsetool, '-h', 'localhost', '-j', str(self.jmx_port)]
args += cmd.split()
@@ -241,7 +263,7 @@ def dsetool(self, cmd):
def dse(self, dse_options=None):
if dse_options is None:
dse_options = []
env = common.make_dse_env(self.get_install_dir(), self.get_path())
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse]
@@ -252,7 +274,7 @@ def dse(self, dse_options=None):
def hadoop(self, hadoop_options=None):
if hadoop_options is None:
hadoop_options = []
env = common.make_dse_env(self.get_install_dir(), self.get_path())
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse, 'hadoop']
@@ -263,7 +285,7 @@ def hadoop(self, hadoop_options=None):
def hive(self, hive_options=None):
if hive_options is None:
hive_options = []
env = common.make_dse_env(self.get_install_dir(), self.get_path())
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse, 'hive']
@@ -274,7 +296,7 @@ def hive(self, hive_options=None):
def pig(self, pig_options=None):
if pig_options is None:
pig_options = []
env = common.make_dse_env(self.get_install_dir(), self.get_path())
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse, 'pig']
@@ -285,7 +307,7 @@ def pig(self, pig_options=None):
def sqoop(self, sqoop_options=None):
if sqoop_options is None:
sqoop_options = []
env = common.make_dse_env(self.get_install_dir(), self.get_path())
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse, 'sqoop']
@@ -296,7 +318,7 @@ def sqoop(self, sqoop_options=None):
def spark(self, spark_options=None):
if spark_options is None:
spark_options = []
env = common.make_dse_env(self.get_install_dir(), self.get_path())
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse, 'spark']
@@ -312,7 +334,7 @@ def import_dse_config_files(self):
self.__update_yaml()

def copy_config_files(self):
for product in ['dse', 'cassandra', 'hadoop', 'sqoop', 'hive', 'tomcat', 'spark', 'shark', 'mahout', 'pig', 'solr']:
for product in ['dse', 'cassandra', 'hadoop', 'sqoop', 'hive', 'tomcat', 'spark', 'shark', 'mahout', 'pig', 'solr', 'graph']:
src_conf = os.path.join(self.get_install_dir(), 'resources', product, 'conf')
dst_conf = os.path.join(self.get_path(), 'resources', product, 'conf')
if not os.path.isdir(src_conf):
@@ -337,6 +359,12 @@ def copy_config_files(self):
if os.path.isdir(dst_webapps):
common.rmdirs(dst_webapps)
shutil.copytree(src_webapps, dst_webapps)
src_lib = os.path.join(self.get_install_dir(), 'resources', product, 'gremlin-console', 'conf')
dst_lib = os.path.join(self.get_path(), 'resources', product, 'gremlin-console', 'conf')
if os.path.isdir(dst_lib):
common.rmdirs(dst_lib)
if os.path.exists(src_lib):
shutil.copytree(src_lib, dst_lib)

def import_bin_files(self):
os.makedirs(os.path.join(self.get_path(), 'resources', 'cassandra', 'bin'))
@@ -372,7 +400,7 @@ def __update_yaml(self):

data['system_key_directory'] = os.path.join(self.get_path(), 'keys')

full_options = dict(list(self.cluster._dse_config_options.items()))
full_options = dict(list(self.cluster._dse_config_options.items()) + list(self._dse_config_options.items()))
for name in full_options:
value = full_options[name]
if isinstance(value, str) and (value is None or len(value) == 0):
@@ -427,6 +455,18 @@ def __generate_server_xml(self):
f.write('</Server>\n')
f.close()

def __update_gremlin_config_yaml(self):
(node_ip, _) = self.network_interfaces['binary']

conf_file = os.path.join(self.get_path(), 'resources', 'graph', 'gremlin-console', 'conf', 'remote-objects.yaml')
with open(conf_file, 'r') as f:
data = yaml.load(f)

data['hosts'] = [node_ip]

with open(conf_file, 'w') as f:
yaml.safe_dump(data, f, default_flow_style=False)

def _get_directories(self):
dirs = []
for i in ['data', 'commitlogs', 'saved_caches', 'logs', 'bin', 'keys', 'resources', os.path.join('data', 'hints')]:
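Two details of the new per-node DSE configuration are worth calling out. set_workloads() now installs workload-specific options into the node's own _dse_config_options (the gremlin_server host for graph, dsefs_options for dsefs), and __update_yaml() layers those on top of the cluster-wide options when rewriting dse.yaml, so per-node values win. A minimal sketch of that merge order; the option names are illustrative, and only dsefs_options comes from this diff:

# Cluster-wide overrides first, per-node overrides second -- the same ordering as
# dict(list(cluster._dse_config_options.items()) + list(node._dse_config_options.items())).
cluster_opts = {'example_option': 'cluster-wide'}
node_opts = {'example_option': 'node-specific',
             'dsefs_options': {'enabled': 'true',
                               'data_directories': ['/path/to/node1/dsefs']}}

full_options = dict(list(cluster_opts.items()) + list(node_opts.items()))
assert full_options['example_option'] == 'node-specific'  # the node's value wins

# Note the merge is shallow: a nested dict set on the node replaces the cluster's
# dict for that key wholesale rather than being merged key by key.
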
18 changes: 11 additions & 7 deletions ccmlib/node.py
@@ -99,7 +99,8 @@ def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_inte
self.initial_token = initial_token
self.pid = None
self.data_center = None
self.workload = None
self.workloads = []
self._dse_config_options = {}
self.__config_options = {}
self.__install_dir = None
self.__global_log_level = None
@@ -142,8 +143,8 @@ def load(path, name, cluster):
node.__config_options = data['config_options']
if 'data_center' in data:
node.data_center = data['data_center']
if 'workload' in data:
node.workload = data['workload']
if 'workloads' in data:
node.workloads = data['workloads']
return node
except KeyError as k:
raise common.LoadError("Error Loading " + filename + ", missing property: " + str(k))
@@ -216,8 +217,8 @@ def set_install_dir(self, install_dir=None, version=None, verbose=False):
self.__conf_updated = False
return self

def set_workload(self, workload):
raise common.ArgumentError("Cannot set workload on cassandra node")
def set_workloads(self, workloads):
raise common.ArgumentError("Cannot set workloads on a cassandra node")

def get_cassandra_version(self):
try:
@@ -255,6 +256,9 @@ def set_configuration_options(self, values=None, batch_commitlog=None):

self.import_config_files()

def set_dse_configuration_options(self, values=None):
pass

def show(self, only_status=False, show_cluster=True):
"""
Print infos on this node configuration.
@@ -1266,8 +1270,8 @@ def _update_config(self):
values['byteman_port'] = self.byteman_port
if self.data_center:
values['data_center'] = self.data_center
if self.workload is not None:
values['workload'] = self.workload
if self.workloads is not None:
values['workloads'] = self.workloads
with open(filename, 'w') as f:
yaml.safe_dump(values, f)

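On the persistence side, _update_config() now saves a 'workloads' list in the node's configuration file instead of the old single 'workload' string, and load() only picks the value up when the new key is present, so configs written before this change simply fall back to the empty list set in __init__. A tiny sketch of the stored fragment; the node name and workload values are illustrative:

import yaml

values = {'name': 'node1', 'workloads': ['graph', 'dsefs']}
print(yaml.safe_dump(values, default_flow_style=False))
# name: node1
# workloads:
# - graph
# - dsefs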
