Skip to content

Commit

Permalink
selector changes (#260)
Browse files Browse the repository at this point in the history
Changes to use selectors instead of select, allowing greater scalability via Epoll or Kqueue implementations.
  • Loading branch information
einarnn authored Aug 17, 2018
1 parent d8126f8 commit 8ef20f2
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 51 deletions.
4 changes: 4 additions & 0 deletions ncclient/transport/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self, capabilities):
self._notification_q = Queue()
self._client_capabilities = capabilities
self._server_capabilities = None # yet
self._base = "1.0"
self._id = None # session-id
self._connected = False # to be set/cleared by subclass implementation
logger.debug('%r created: client_capabilities=%r' %
Expand Down Expand Up @@ -107,6 +108,9 @@ def err_cb(err):
raise error[0]
#if ':base:1.0' not in self.server_capabilities:
# raise MissingCapabilityError(':base:1.0')
if 'urn:ietf:params:netconf:base:1.1' in self._server_capabilities and 'urn:ietf:params:netconf:base:1.1' in self._client_capabilities:
logger.debug("After 'hello' message selecting netconf:base:1.1 for encoding")
self._base = "1.1"
logger.info('initialized: session-id=%s | server_capabilities=%s' %
(self._id, self._server_capabilities))

Expand Down
78 changes: 32 additions & 46 deletions ncclient/transport/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import threading
from binascii import hexlify
from lxml import etree
from select import select

try:
import selectors
except ImportError:
import selectors2 as selectors

from ncclient.capabilities import Capabilities

Expand Down Expand Up @@ -169,7 +173,6 @@ def _parse11(self):
chunk = b''.join(chunk_list)
message_list.append(textify(chunk))
break # done reading
logger.debug('x: %s', x)
if state == idle:
if x == b'\n':
state = instart
Expand Down Expand Up @@ -545,60 +548,43 @@ def run(self):
def start_delim(data_len): return '\n#%s\n'%(data_len)

try:
s = selectors.DefaultSelector()
s.register(chan, selectors.EVENT_READ)
logger.debug('selector type = %s', s.__class__.__name__)
while True:
# select on a paramiko ssh channel object does not ever return it in the writable list, so channels don't exactly emulate the socket api
r, w, e = select([chan], [], [], TICK)
# will wakeup evey TICK seconds to check if something to send, more if something to read (due to select returning chan in readable list)
if r:

# Log what netconf:base version we are using this time
# round the loop; _base is updated when we receive the
# server's capabilities.
logger.debug('Currently selected netconf:base:%s', self._base)

# Will wakeup evey TICK seconds to check if something
# to send, more quickly if something to read (due to
# select returning chan in readable list).
events = s.select(timeout=TICK)
if events:
data = chan.recv(BUF_SIZE)
if data:
self._buffer.write(data)
if self._server_capabilities:
if 'urn:ietf:params:netconf:base:1.1' in self._server_capabilities and 'urn:ietf:params:netconf:base:1.1' in self._client_capabilities:
logger.debug("Selecting netconf:base:1.1 for encoding")
self._parse11()
elif 'urn:ietf:params:netconf:base:1.0' in self._server_capabilities or 'urn:ietf:params:xml:ns:netconf:base:1.0' in self._server_capabilities or 'urn:ietf:params:netconf:base:1.0' in self._client_capabilities:
logger.debug("Selecting netconf:base:1.0 for encoding")
self._parse10()
else: raise Exception
if self._base == "1.1":
self._parse11()
else:
self._parse10() # HELLO msg uses EOM markers.
self._parse10()
else:
raise SessionCloseError(self._buffer.getvalue())
if not q.empty() and chan.send_ready():
logger.debug("Sending message")
data = q.get()
try:
# send a HELLO msg using v1.0 EOM markers.
validated_element(data, tags='{urn:ietf:params:xml:ns:netconf:base:1.0}hello')
data = "%s%s"%(data, MSG_DELIM)
except XMLError:
# this is not a HELLO msg
# we publish v1.1 support
if 'urn:ietf:params:netconf:base:1.1' in self._client_capabilities:
if self._server_capabilities:
if 'urn:ietf:params:netconf:base:1.1' in self._server_capabilities:
# send using v1.1 chunked framing
data = "%s%s%s"%(start_delim(len(data)), data, END_DELIM)
elif 'urn:ietf:params:netconf:base:1.0' in self._server_capabilities or 'urn:ietf:params:xml:ns:netconf:base:1.0' in self._server_capabilities:
# send using v1.0 EOM markers
data = "%s%s"%(data, MSG_DELIM)
else: raise Exception
else:
logger.debug('HELLO msg was sent, but server capabilities are still not known')
raise Exception
# we publish only v1.0 support
else:
# send using v1.0 EOM markers
data = "%s%s"%(data, MSG_DELIM)
finally:
logger.info("Sending to %s session %s:\n%s",
self.host, self.id, data)
while data:
n = chan.send(data)
if n <= 0:
raise SessionCloseError(self._buffer.getvalue(), data)
data = data[n:]
if self._base == "1.1":
data = "%s%s%s" % (start_delim(len(data)), data, END_DELIM)
else:
data = "%s%s" % (data, MSG_DELIM)
logger.debug("Sending: %s", data)
while data:
n = chan.send(data)
if n <= 0:
raise SessionCloseError(self._buffer.getvalue(), data)
data = data[n:]
except Exception as e:
logger.debug("Broke out of main loop, error=%r", e)
self._dispatch_error(e)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
setuptools>0.6
paramiko>=1.15.0
lxml>=3.3.0
selectors2>=2.0.1
six
58 changes: 53 additions & 5 deletions test/unit/transport/test_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
from ncclient.devices.junos import JunosDeviceHandler
import sys

try:
import selectors
except ImportError:
import selectors2 as selectors


reply_data = """<rpc-reply xmlns:junos="http://xml.juniper.net/junos/12.1X46/junos" attrib1 = "test">
<software-information>
<host-name>R1</host-name>
Expand Down Expand Up @@ -244,12 +250,51 @@ def test_load_host_key_2(self, mock_load, mock_os):
obj.load_known_hosts()
mock_load.assert_called_once_with("file_name")

@unittest.skipIf(sys.version_info.major == 2, "test not supported < Python3")
@patch('ncclient.transport.ssh.SSHSession.close')
@patch('paramiko.channel.Channel.recv')
@patch('selectors.DefaultSelector.select')
@patch('ncclient.transport.ssh.Session._dispatch_error')
def test_run_receive_py3(self, mock_error, mock_selector, mock_recv, mock_close):
mock_selector.return_value = True
mock_recv.return_value = 0
device_handler = JunosDeviceHandler({'name': 'junos'})
obj = SSHSession(device_handler)
obj._channel = paramiko.Channel("c100")
obj.run()
self.assertTrue(
isinstance(
mock_error.call_args_list[0][0][0],
SessionCloseError))

@unittest.skipIf(sys.version_info.major == 2, "test not supported < Python3")
@patch('ncclient.transport.ssh.SSHSession.close')
@patch('paramiko.channel.Channel.send_ready')
@patch('paramiko.channel.Channel.send')
@patch('selectors.DefaultSelector.select')
@patch('ncclient.transport.ssh.Session._dispatch_error')
def test_run_send_py3(self, mock_error, mock_selector, mock_send, mock_ready, mock_close):
mock_selector.return_value = False
mock_ready.return_value = True
mock_send.return_value = -1
device_handler = JunosDeviceHandler({'name': 'junos'})
obj = SSHSession(device_handler)
obj._channel = paramiko.Channel("c100")
obj._q.put("rpc")
obj.run()
self.assertEqual(mock_send.call_args_list[0][0][0], "rpc]]>]]>")
self.assertTrue(
isinstance(
mock_error.call_args_list[0][0][0],
SessionCloseError))

@unittest.skipIf(sys.version_info.major >= 3, "test not supported >= Python3")
@patch('ncclient.transport.ssh.SSHSession.close')
@patch('paramiko.channel.Channel.recv')
@patch('ncclient.transport.ssh.select')
@patch('selectors2.DefaultSelector')
@patch('ncclient.transport.ssh.Session._dispatch_error')
def test_run_recieve(self, mock_error, mock_select, mock_recv, mock_close):
mock_select.return_value = True, None, None
def test_run_receive_py2(self, mock_error, mock_selector, mock_recv, mock_close):
mock_selector.select.return_value = True
mock_recv.return_value = 0
device_handler = JunosDeviceHandler({'name': 'junos'})
obj = SSHSession(device_handler)
Expand All @@ -260,19 +305,22 @@ def test_run_recieve(self, mock_error, mock_select, mock_recv, mock_close):
mock_error.call_args_list[0][0][0],
SessionCloseError))

@unittest.skip("test currently non-functional")
@patch('ncclient.transport.ssh.SSHSession.close')
@patch('paramiko.channel.Channel.send_ready')
@patch('paramiko.channel.Channel.send')
@patch('selectors2.DefaultSelector')
@patch('ncclient.transport.ssh.Session._dispatch_error')
def test_run_send(self, mock_error, mock_send, mock_ready, mock_close):
def test_run_send_py2(self, mock_error, mock_selector, mock_send, mock_ready, mock_close):
mock_selector.select.return_value = False
mock_ready.return_value = True
mock_send.return_value = -1
device_handler = JunosDeviceHandler({'name': 'junos'})
obj = SSHSession(device_handler)
obj._channel = paramiko.Channel("c100")
obj._q.put("rpc")
obj.run()
self.assertEqual(mock_send.call_args_list[0][0][0], "rpc")
self.assertEqual(mock_send.call_args_list[0][0][0], "rpc]]>]]>")
self.assertTrue(
isinstance(
mock_error.call_args_list[0][0][0],
Expand Down

0 comments on commit 8ef20f2

Please sign in to comment.