Skip to content

Commit

Permalink
Merge pull request #467 from kevinadi/issue-461
Browse files Browse the repository at this point in the history
Fix mlaunch with MongoDB 3.4.0 (fixes #461)
  • Loading branch information
stennie committed Dec 8, 2016
2 parents 331ece6 + fa5e2bd commit db77ce9
Showing 1 changed file with 60 additions and 65 deletions.
125 changes: 60 additions & 65 deletions mtools/mlaunch/mlaunch.py
Expand Up @@ -278,13 +278,14 @@ def init(self):

# Check if config replicaset is applicable to this version
current_version = self.getMongoDVersion()
if self.args['csrs']:
if LooseVersion(current_version) < LooseVersion("3.2.0"):
errmsg = " \n * The '--csrs' option requires MongoDB version 3.2.0 or greater, the current version is %s.\n" % current_version
raise SystemExit(errmsg)

# Exit with error if --csrs is set and MongoDB < 3.1.0
if self.args['csrs'] and LooseVersion(current_version) < LooseVersion("3.1.0"):
errmsg = " \n * The '--csrs' option requires MongoDB version 3.2.0 or greater, the current version is %s.\n" % current_version
raise SystemExit(errmsg)

# add the 'csrs' parameter as default for MongoDB >= 3.3.0
if LooseVersion(current_version) >= LooseVersion("3.3.0"):
# add the 'csrs' parameter as default
self.args['csrs'] = True

# check if authentication is enabled, make key file
Expand Down Expand Up @@ -489,33 +490,10 @@ def stop(self):
""" sub-command stop. This method will parse the list of tags and stop the matching nodes.
Each tag has a set of nodes associated with it, and only the nodes matching all tags (intersection)
will be shut down.
Currently this is an alias for kill()
"""
self.discover()

matches = self._get_ports_from_args(self.args, 'running')
if len(matches) == 0:
raise SystemExit('no nodes stopped.')

for port in matches:
if self.args['verbose']:
print "shutting down localhost:%s" % port

username = self.loaded_args['username'] if self.loaded_args['auth'] else None
password = self.loaded_args['password'] if self.loaded_args['auth'] else None
authdb = self.loaded_args['auth_db'] if self.loaded_args['auth'] else None
shutdown_host(port, username, password, authdb)

# wait until nodes are all shut down
self.wait_for(matches, to_start=False)
print "%i node%s stopped." % (len(matches), '' if len(matches) == 1 else 's')

# there is a very brief period in which nodes are not reachable anymore, but the
# port is not torn down fully yet and an immediate start command would fail. This
# very short sleep prevents that case, and it is practically not noticable by users
time.sleep(0.1)

# refresh discover
self.discover()
self.kill()


def start(self):
Expand Down Expand Up @@ -552,9 +530,12 @@ def start(self):
if len(matches) == 0:
raise SystemExit('no nodes started.')

# start mongod and config servers first
mongod_matches = self.get_tagged(['mongod'])
mongod_matches = mongod_matches.union(self.get_tagged(['config']))
# start config servers first
config_matches = self.get_tagged(['config']).intersection(matches)
self._start_on_ports(config_matches, wait=True)

# start shards next
mongod_matches = self.get_tagged(['mongod']) - self.get_tagged(['config'])
mongod_matches = mongod_matches.intersection(matches)
self._start_on_ports(mongod_matches, wait=True)

Expand Down Expand Up @@ -675,8 +656,8 @@ def kill(self):
matches = self._get_ports_from_args(self.args, 'running')
processes = self._get_processes()

# convert signal to int
sig = self.args['signal']
# convert signal to int, default is SIGTERM for graceful shutdown
sig = self.args.get('signal') or 'SIGTERM'
if type(sig) == int:
pass
elif isinstance(sig, str):
Expand Down Expand Up @@ -708,16 +689,16 @@ def kill(self):


def restart(self):

# get all running processes
processes = self._get_processes()
procs = [processes[k] for k in processes.keys()]

# stop nodes via stop command
self.stop()

# there is a very brief period in which nodes are not reachable anymore, but the
# port is not torn down fully yet and an immediate start command would fail. This
# very short sleep prevents that case, and it is practically not noticable by users
time.sleep(0.1)

# refresh discover
self.discover()
# wait until all processes terminate
psutil.wait_procs(procs)

# start nodes again via start command
self.start()
Expand Down Expand Up @@ -847,6 +828,14 @@ def discover(self):
# add config server to cluster tree
self.cluster_tree.setdefault( 'config', [] ).append( port )

# If not CSRS, set the number of config servers to be 1 or 3
# This is needed, otherwise `mlaunch init --sharded 2 --replicaset --config 2` on <3.3.0 will crash
if not self.args.get('csrs') and self.args['command'] == 'init':
if num_config >= 3:
num_config = 3
else:
num_config = 1

for i in range(num_config):
port = i+current_port

Expand Down Expand Up @@ -1078,8 +1067,8 @@ def _filter_valid_arguments(self, arguments, binary="mongod", config=False):
line = line.lstrip()
if line.startswith('-'):
argument = line.split()[0]
# exception: don't allow --oplogSize for config servers
if config and argument == '--oplogSize':
# exception: don't allow unsupported config server arguments
if config and argument in ['--oplogSize', '--storageEngine', '--smallfiles', '--nojournal']:
continue
accepted_arguments.append(argument)

Expand Down Expand Up @@ -1138,19 +1127,21 @@ def _start_on_ports(self, ports, wait=False, overrideAuth=False):
# this is to set up sharded clusters without auth first, then relaunch with auth
command_str = re.sub(r'--keyFile \S+', '', command_str)

ret = subprocess.call([command_str], stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
try:
ret = subprocess.check_output([command_str], stderr=subprocess.STDOUT, shell=True)

binary = command_str.split()[0]
if '--configsvr' in command_str:
binary = 'config server'
binary = command_str.split()[0]
if '--configsvr' in command_str:
binary = 'config server'

if self.args['verbose']:
print "launching: %s" % command_str
else:
print "launching: %s on port %s" % (binary, port)
if self.args['verbose']:
print "launching: %s" % command_str
else:
print "launching: %s on port %s" % (binary, port)

if ret > 0:
raise SystemExit("can't start process, return code %i. tried to launch: %s"% (ret, command_str))
except subprocess.CalledProcessError, e:
print e.output
raise SystemExit("can't start process, return code %i. tried to launch: %s"% (e.returncode, command_str))

if wait:
self.wait_for(ports)
Expand Down Expand Up @@ -1275,7 +1266,7 @@ def _construct_cmdlines(self):

elif self.args['replicaset']:
# construct startup strings for a non-sharded replica set
self._construct_replset(self.dir, self.args['port'], self.args['name'])
self._construct_replset(self.dir, self.args['port'], self.args['name'], range(self.args['nodes']), self.args['arbiter'])

# discover current setup
self.discover()
Expand All @@ -1295,14 +1286,21 @@ def _construct_sharded(self):
self.shard_connection_str.append( self._construct_single(self.dir, nextport, name=shard, extra='--shardsvr') )
nextport += 1
elif self.args['replicaset']:
self.shard_connection_str.append( self._construct_replset(self.dir, nextport, shard, extra='--shardsvr') )
self.shard_connection_str.append( self._construct_replset(self.dir, nextport, shard, num_nodes=range(self.args['nodes']), arbiter=self.args['arbiter'], extra='--shardsvr') )
nextport += self.args['nodes']
if self.args['arbiter']:
nextport += 1

# start up config server(s)
config_string = []
config_names = ['config1', 'config2', 'config3'] if self.args['config'] == 3 else ['config']

# SCCC config servers (MongoDB <3.3.0)
if not self.args['csrs'] and self.args['config'] >= 3:
config_names = ['config1', 'config2', 'config3']
else:
config_names = ['config']

# CSRS config servers (MongoDB >=3.1.0)
if self.args['csrs']:
config_string.append(self._construct_config(self.dir, nextport, "configRepl", True))
else:
Expand Down Expand Up @@ -1331,15 +1329,12 @@ def _construct_sharded(self):
nextport += 1


def _construct_replset(self, basedir, portstart, name, extra=''):
def _construct_replset(self, basedir, portstart, name, num_nodes, arbiter, extra=''):
""" construct command line strings for a replicaset, either for sharded cluster or by itself. """

self.config_docs[name] = {'_id':name, 'members':[]}
# Corner case for csrs to calculate the number of nodes by number of configservers
if '--configsvr' in extra:
num_nodes = range(self.args['config'])
else:
num_nodes = range(self.args['nodes'])

# Construct individual replica set nodes
for i in num_nodes:
datapath = self._create_paths(basedir, '%s/rs%i'%(name, i+1))
self._construct_mongod(os.path.join(datapath, 'db'), os.path.join(datapath, 'mongod.log'), portstart+i, replset=name, extra=extra)
Expand All @@ -1357,7 +1352,7 @@ def _construct_replset(self, basedir, portstart, name, extra=''):
self.config_docs[name]['members'].append(member_config)

# launch arbiter if True
if self.args['arbiter'] and '--configsvr' not in extra:
if arbiter:
datapath = self._create_paths(basedir, '%s/arb'%(name))
self._construct_mongod(os.path.join(datapath, 'db'), os.path.join(datapath, 'mongod.log'), portstart+self.args['nodes'], replset=name)

Expand All @@ -1372,7 +1367,7 @@ def _construct_config(self, basedir, port, name=None, isReplSet=False):
""" construct command line strings for a config server """

if isReplSet:
return self._construct_replset(basedir, port, name, extra='--configsvr')
return self._construct_replset(basedir=basedir, portstart=port, name=name, num_nodes=range(self.args['config']), arbiter=False, extra='--configsvr')
else:
datapath = self._create_paths(basedir, name)
self._construct_mongod(os.path.join(datapath, 'db'), os.path.join(datapath, 'mongod.log'), port, replset=None, extra='--configsvr')
Expand Down

0 comments on commit db77ce9

Please sign in to comment.