Skip to content

Commit

Permalink
Fix handling of RPC servers
Browse files Browse the repository at this point in the history
  • Loading branch information
kmaehashi committed Aug 30, 2016
1 parent e9e2d72 commit ce7b677
Showing 1 changed file with 125 additions and 93 deletions.
218 changes: 125 additions & 93 deletions jubakit/base.py
Expand Up @@ -417,12 +417,8 @@ def __init__(self, host='127.0.0.1', port=9199, cluster='', timeout=0):
self._backend = None

def __del__(self):
"""
Destroys the backend process if exists.
"""
backend = self._backend
if backend is not None:
backend.stop()
# Invoke the backend destructor as fast as possible.
self._backend = None

@classmethod
def name(cls):
Expand Down Expand Up @@ -524,93 +520,149 @@ class _ServiceBackend(object):
# Disable verbose tornado WARN logs on connect failure.
logging.getLogger('tornado').setLevel(logging.ERROR)

# Global Port-to-PID mapping.
port2pid = {}

def __init__(self, name, config, port=None):
self.name = name
self.config = config
self.port = port
self.log = None

self._logbuf = None
self._proc = None

self._check_installed(name)

if port is None:
self.port = self._get_free_port()
(started, log) = self._start()
if not started:
raise RuntimeError('failed to start server: {0}'.format(log))

if port in self.port2pid:
raise RuntimeError('port {0} currently in use by another service (PID {1})'.format(port, self.port2pid[port]))
def __del__(self):
# Destruct the process.
try:
proc = self._proc
if proc is not None and proc.poll() is None: # still running
proc.kill()
proc.communicate() # avoid process to become zombie (defunct)
except Exception as e:
print('Exception raised while destructing backend process: {0}'.format(e))

# Destruct the log buffer.
try:
logbuf = self._logbuf
if logbuf is not None and not logbuf.closed: # log buffer is still open
logbuf.close()
except Exception as e:
print('Exception raised while destructing backend log buffer: {0}'.format(e))

def _start(self):
"""
Starts the server instance and returns (is_started, error_log) tuple.
"""
# Number of retries; if user does not specify port, we can retry.
retry = 10 if self.port is None else 1

with tempfile.NamedTemporaryFile(prefix='jubakit-config-') as config_file:
config_file.write(json.dumps(config).encode('utf-8'))
config_file.write(json.dumps(self.config).encode('utf-8'))
config_file.flush()

args = [
'juba{0}'.format(name),
'--listen_addr', '127.0.0.1',
'--rpc-port', str(self.port),
'--timeout', '0',
'--configpath', config_file.name
]
_logger.info('starting service: %s', args)
self._logbuf = tempfile.NamedTemporaryFile(prefix='jubakit-log-')
self._proc = self._get_process(args, stdout=self._logbuf, stderr=subprocess.STDOUT)
self._assign_port(self.port, self._proc.pid)

# Wait until the RPC server start.
started = self._wait_until_rpc_ready(self.port)
if started:
status = self.get_status()
pid = int(status['pid'])
if pid != self._proc.pid:
self._proc.kill()
raise RuntimeError('server cannot be started as port {0} conflicts with external Jubatus process (PID: {1})'.format(self.port, pid))

if not started:
_logger.error('failed to start service')
log = self.stop()
stdout = None
for count in range(retry):
if self.port is None:
# Randomly pick a port number from TCP ports *currently* free.
# Note that the port may be in use when jubatus server tries to bind to it.
self.port = self._get_free_port()

args = [
'juba{0}'.format(self.name),
'--listen_addr', '127.0.0.1',
'--rpc-port', str(self.port),
'--timeout', '0',
'--configpath', config_file.name
]

_logger.debug('trying to start service on port %d: %s', self.port, args)
self._logbuf = tempfile.NamedTemporaryFile(prefix='jubakit-log-')
self._proc = subprocess.Popen(args, stdout=self._logbuf, stderr=subprocess.STDOUT)
pid = self._proc.pid

# Wait until the RPC server start.
started = self._wait_until_rpc_ready(self.port)

if started: # i.e. RPC server is working on the port
# Get status from the remote server.
status = None
try:
status = self.get_status()
except Exception as e:
# Other MessagePack-RPC server that does not respond to get_status RPC (e.g., Jubatus proxy) is running on the port.
_logger.debug('failed to get status from server on port %d: %s', self.port, str(e))
if status:
# Get PID from the server status and compare it with the PID of the process we just started.
remote_pid = int(status['pid'])
if remote_pid == pid:
# Service started successfully.
_logger.debug('service started on port %d with PID %d', self.port, pid)
return (True, None)
# The free port we found was taken by others.
_logger.debug('service PID mismatch (expected %d, got %d); other Jubatus server is already running on port %d', pid, remote_pid, self.port)
# Stop the process.
self._stop()
else: # i.e. RPC server is NOT working on the port
# Stop the process.
(retval, stdout) = self._stop()

# Check the log message from the server.
if 'server failed to start: any process using port' in stdout:
_logger.debug('service failed to start on port %d; the port is in use', self.port)
else:
_logger.debug('service failed to start on port %d: PID %d exit with status %d: %s', self.port, pid, retval, stdout)
break # do not retry; seems like it is not a port issue
self.port = None
else: # for count
_logger.debug('all attempts to start the service failed') # retry limit reached
return (False, stdout)

def _stop(self):
"""
Stops the server instance and returns (retval, stdout) tuple.
"""
(proc, logbuf) = (self._proc, self._logbuf)
(self._proc, self._logbuf) = (None, None)

if proc is None:
return (None, None)

(retval, stdout) = (-1, '(not available)')
if proc.poll() is None: # still running
_logger.debug('process is still running; will be terminated')
proc.terminate()
else:
_logger.debug('process already terminated')
_logger.debug('waiting for process to exit')
proc.communicate()
retval = proc.returncode

def __del__(self):
proc = self._proc
if proc is not None and proc.poll() is None: # still running
proc.kill()
self._unassign_port(self.port, proc.pid)
logbuf = self._logbuf
_logger.debug('gathering stdout from server')
if logbuf is not None and not logbuf.closed: # log buffer is still open
logbuf.seek(0)
stdout = logbuf.read().decode('utf-8')
logbuf.close()

return (retval, stdout)

def stop(self):
"""
Stops the server instance and return the server log.
"""
proc = self._proc
self._proc = None
if proc is not None:
if proc.poll() is None: # still running
_logger.debug('process is still running; will be terminated')
proc.terminate()
else:
_logger.debug('process already terminated')

_logger.debug('waiting for process to exit')
proc.communicate()
self._logbuf.seek(0)
stdout = self._logbuf.read()
self._logbuf.close()

retval = proc.returncode
_logger.debug('process exit with status %d', retval)
self._unassign_port(self.port, proc.pid)
if retval != 0:
raise RuntimeError('server exit with status {0}; confirm that the config is valid: {1}'.format(retval, stdout))
return stdout
(retval, stdout) = self._stop()
_logger.debug('process exit with status %d: %s', retval, stdout)
if retval != 0:
raise RuntimeError('server exit with status {0}; confirm that the config is valid: {1}'.format(retval, stdout))
return stdout

def get_status(self):
cli = msgpackrpc.Client(msgpackrpc.Address('127.0.0.1', self.port), unpack_encoding='utf-8')
try:
return cli.call('get_status', '')['127.0.0.1_{0}'.format(self.port)]
return cli.call('get_status', '').popitem()[1]
finally:
cli.close()
cli._loop._ioloop.close()
Expand All @@ -624,43 +676,23 @@ def _get_free_port(cls, start=10000, end=30000):
ephemeral port range on most platforms.
"""
used_ports = cls._get_ports_in_use()
port = start
while True:
if port not in used_ports: return port
port += 1
if end < port:
raise RuntimeError('no free port available in range [{0},{1}]'.format(start, end))
candidates = [x for x in range(start, end + 1) if x not in used_ports]
if len(candidates) == 0:
raise RuntimeError('no free port available in range [{0},{1}]'.format(start, end))
return random.choice(candidates)

@classmethod
def _get_ports_in_use(cls):
"""
Returns sorted list of ports currently used on localhost.
Returns a set of ports currently used on localhost.
"""
try:
return sorted(set([x.laddr[1] for x in psutil.net_connections(kind='inet4')]))
return set([x.laddr[1] for x in psutil.net_connections(kind='inet4')])
except psutil.AccessDenied:
# On some platforms (such as OS X), root privilege is required to get used ports.
# In that case we avoid port confliction to the best of our knowledge.
_logger.info('ports in use cannot be obtained on this platform; ports will be assigned sequentially')
return sorted(cls.port2pid.keys())

@classmethod
def _assign_port(cls, port, pid):
m = cls.port2pid
if port in m:
raise RuntimeError('port {0} currently in use by PID {1}'.format(port, m[port]))
m[port] = pid

@classmethod
def _unassign_port(cls, port, pid):
m = cls.port2pid
if m is None:
pass # maybe destruction is running.
if port not in m:
raise RuntimeError('port {0} is not in use'.format(port))
if m[port] != pid:
raise RuntimeError('port {0} is used by PID {1}, not PID {2}'.format(port, m[port], pid))
del m[port]
return set()

@classmethod
def _wait_until_rpc_ready(cls, port):
Expand Down

0 comments on commit ce7b677

Please sign in to comment.