Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better detection of remote host going away #17

Merged
merged 1 commit into from
Oct 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions radssh/keepalive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#
# Copyright (c) 2014 LexisNexis Risk Data Management Inc.
#
# This file is part of the RadSSH software package.
#
# RadSSH is free software, released under the Revised BSD License.
# You are permitted to use, modify, and redsitribute this software
# according to the Revised BSD License, a copy of which should be
# included with the distribution as file LICENSE.txt
#

'''
RadSSH Keepalive Module
Extension to Paramiko to perform keepalive global-requests with
response, so we can tell if the remote server is responding, instead
of just padding out the local Send-Q buffer with unsendable data.
'''

import threading
import struct

import paramiko

class ServerNotResponding(Exception):
'''
Raised when (threshold) keepalive messages in a row fail to get
any response, allowing better detection of severed connection.
'''
pass

class KeepAlive(object):
'''
Transport global_request() is not able to handle the scenario that
KeepAlive test requires. The "wait" parameter, if True, will include
in the request "want reply", but blocks the calling thread trying
to read the reply; if False, the calling thread is not blocked, but
the global-request is sent without "want reply" set, so there is
nothing on the client side that indicates that the remote server
ever got the keepalive message. Sent messages wind up in the socket
Send-Q buffer, and no errors get detected by this process.

This class sets up a way to send the global-request message with the
"want reply" set, but does not block waiting for responses. After a
short wait, if the ACK is not picked up, a counter for the number of
pending requests is incremented, and only when it exceeds a specified
threshold will a possible Exception be raised.

The ping() call will fabricate a suitable keepalive global-request
message, and use the transport completion_event to track the confirming
response from the remote server. Oddly, RFC 4254 does not specify a
keepalive global-request message; the operation relies on the requirement
that the remote end MUST reply to global-request, even if the reply
is a SSH_MSG_REQUEST_FAILURE. There is nothing special about the
string "keepalive@openssh.com". All that is needed is that the server
sends some response, even if it is a failure, to set the Event.
'''
def __init__(self, transport, threshold=5):
self.transport = transport
self.threshold = threshold
self.transport.completion_event = threading.Event()
self.pending_count = 0

def ping(self):
m = paramiko.Message()
m.add_byte(struct.pack('b',paramiko.common.MSG_GLOBAL_REQUEST))
m.add_string('keepalive@openssh.com')
m.add_boolean(True)
self.transport._send_user_message(m)
self.transport.completion_event.wait(0.1)
if self.transport.completion_event.is_set():
self.transport.completion_event.clear()
self.pending_count = 0
return True
self.pending_count += 1
if self.pending_count > self.threshold:
raise ServerNotResponding(self.transport.getName())
return False
9 changes: 9 additions & 0 deletions radssh/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from .console import RadSSHConsole
from .hostkey import HostKeyVerifier
from . import config
from .keepalive import KeepAlive, ServerNotResponding

# If main thread gets KeyboardInterrupt, use this to signal
# running background threads to terminate prior to command completion
Expand Down Expand Up @@ -166,6 +167,7 @@ def exec_command(host, t, cmd, quota, streamQ, encoding='UTF-8'):
if isinstance(t, paramiko.Transport) and t.is_authenticated():
stdout = StreamBuffer(streamQ, (str(host), False), blocksize=2048, encoding=encoding)
stderr = StreamBuffer(streamQ, (str(host), True), blocksize=2048, encoding=encoding)
keepalive = KeepAlive(t)
# If transport has a persistent session (identified by being named same as the transport.remote_version)
# then use the persistent session via send/recv to the shell quasi-interactively, rather than
# creating a single-use session with exec_command, which gives true process termination (exit_status_ready)
Expand Down Expand Up @@ -225,6 +227,13 @@ def exec_command(host, t, cmd, quota, streamQ, encoding='UTF-8'):
# Push out a (nothing) in case the queue needs to do a time-based dump
stdout.push('')
quiet_time += quiet_increment
try:
if quiet_time > 5.0:
keepalive.ping()
except ServerNotResponding:
t.close()
process_completion = '*** Server Not Responding ***'
break
# Read from stderr socket, altered timeout
try:
s.settimeout(0.1)
Expand Down