diff --git a/README.md b/README.md
index 48cfe016..6bb44bde 100644
--- a/README.md
+++ b/README.md
@@ -102,6 +102,7 @@ When instantiating a connection to a known type of NETCONF server:
- `device_params={'name':'huawei'}`
- `device_params={'name':'huaweiyang'}`
* Alcatel Lucent: `device_params={'name':'alu'}`
+* Nokia SR OS: `device_params={'name':'sros'}`
* H3C: `device_params={'name':'h3c'}`
* HP Comware: `device_params={'name':'hpcomware'}`
* Server or anything not in above: `device_params={'name':'default'}`
diff --git a/README.rst b/README.rst
index 29af7703..80b09db1 100644
--- a/README.rst
+++ b/README.rst
@@ -97,6 +97,7 @@ Supported device handlers
- `device_params={'name':'huawei'}`
- `device_params={'name':'huaweiyang'}`
* Alcatel Lucent: `device_params={'name':'alu'}`
+* Nokia SR OS: `device_params={'name':'sros'}`
* H3C: `device_params={'name':'h3c'}`
* HP Comware: `device_params={'name':'hpcomware'}`
* Server or anything not in above: `device_params={'name':'default'}`
diff --git a/examples/nokia/sros/get_config.py b/examples/nokia/sros/get_config.py
new file mode 100644
index 00000000..c8ee3e05
--- /dev/null
+++ b/examples/nokia/sros/get_config.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+#
+
+import logging
+import sys
+
+from ncclient import manager
+from ncclient.xml_ import to_xml
+from ncclient.operations.rpc import RPCError
+
+_NS_MAP = {
+ 'nc': 'urn:ietf:params:xml:ns:netconf:base:1.0',
+ 'nokia-conf': 'urn:nokia.com:sros:ns:yang:sr:conf'
+}
+
+_NOKIA_MGMT_INT_FILTER = '''
+
+
+
+
+
+''' % (_NS_MAP['nokia-conf'])
+
+def connect(host, port, user, password):
+ m = manager.connect(host=host, port=port,
+ username=user, password=password,
+ device_params={'name': 'sros'},
+ hostkey_verify=False)
+
+ ## Retrieve full configuration from the running datastore
+ running_xml = m.get_config(source='running')
+ logging.info(running_xml)
+
+ ## Retrieve full configuration from the running datastore and strip
+ ## the rpc-reply + data elements
+ running_xml = m.get_config(source='running').xpath(
+ '/nc:rpc-reply/nc:data/nokia-conf:configure',
+ namespaces=_NS_MAP)[0]
+ logging.info(to_xml(running_xml, pretty_print=True))
+
+ ## Retrieve full configuration from the running datastore and strip
+ ## out elements except for /configure/system/management-interface
+ running_xml = m.get_config(source='running').xpath(
+ '/nc:rpc-reply/nc:data/nokia-conf:configure' \
+ '/nokia-conf:system/nokia-conf:management-interface',
+ namespaces=_NS_MAP)[0]
+ logging.info(to_xml(running_xml, pretty_print=True))
+
+ ## Retrieve a specific filtered subtree from the running datastore
+ ## and handle any rpc-error should a portion of the filter criteria
+ ## be invalid
+ try:
+ running_xml = m.get_config(source='running',
+ filter=('subtree', _NOKIA_MGMT_INT_FILTER),
+ with_defaults='report-all').xpath(
+ '/nc:rpc-reply/nc:data/nokia-conf:configure',
+ namespaces=_NS_MAP)[0]
+ logging.info(to_xml(running_xml, pretty_print=True))
+
+ except RPCError as err:
+ logging.info('Error: %s' % err.message.strip())
+
+ m.close_session()
+
+if __name__ == '__main__':
+ LOG_FORMAT = '%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s'
+ logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=LOG_FORMAT)
+
+ try:
+ connect(sys.argv[1], '830', 'admin', 'admin')
+
+ except IndexError:
+ logging.error('Must supply a valid hostname or IP address')
+
diff --git a/examples/nokia/sros/md_cli_raw_command.py b/examples/nokia/sros/md_cli_raw_command.py
new file mode 100644
index 00000000..d40215a4
--- /dev/null
+++ b/examples/nokia/sros/md_cli_raw_command.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+#
+# For Nokia SR OS > 20.10, support for issuing a subset of CLI commands
+# (e.g. 'show', 'admin', 'clear', 'ping', etc..) over NETCONF RPCs is
+# achieved by the use of the YANG 1.1 action
+
+import logging
+import sys
+
+from ncclient import manager
+from ncclient.operations.rpc import RPCError
+
+_NS_MAP = {
+ 'nc': 'urn:ietf:params:xml:ns:netconf:base:1.0',
+ 'nokia-oper': 'urn:nokia.com:sros:ns:yang:sr:oper-global'
+}
+
+def connect(host, port, user, password):
+ m = manager.connect(host=host, port=port,
+ username=user, password=password,
+ device_params={'name': 'sros'},
+ hostkey_verify=False)
+
+ ## Issue 'show card' and display the raw XML output
+ result = m.md_cli_raw_command('show card')
+ logging.info(result)
+
+ ## Issue 'show port' from MD-CLI context and emit only
+ ## the returned command contents
+ result = m.md_cli_raw_command('show port')
+ output = result.xpath(
+ '/nc:rpc-reply/nokia-oper:results' \
+ '/nokia-oper:md-cli-output-block',
+ namespaces=_NS_MAP)[0].text
+ logging.info(output)
+
+ ## Issue 'show version' from Classic CLI context and emit
+ ## only the returned command contents
+ result = m.md_cli_raw_command('//show version')
+ output = result.xpath(
+ '/nc:rpc-reply/nokia-oper:results' \
+ '/nokia-oper:md-cli-output-block',
+ namespaces=_NS_MAP)[0].text
+ logging.info(output)
+
+ ## Issue 'ping' from MD-CLI context and emit only the
+ ## returned command contents
+ result = m.md_cli_raw_command(
+ 'ping 127.0.0.1 router-instance "Base" count 3')
+ output = result.xpath(
+ '/nc:rpc-reply/nokia-oper:results' \
+ '/nokia-oper:md-cli-output-block',
+ namespaces=_NS_MAP)[0].text
+ logging.info(output)
+
+ ## Issue an admin command that returns only NETCONF or
+ ##
+ result = m.md_cli_raw_command('admin save')
+ try:
+ if len(result.xpath('/nc:rpc-reply/nc:ok', namespaces=_NS_MAP)) > 0:
+ logging.info('Admin save successful')
+ else:
+ logging.info('Admin save unsuccessful')
+
+ except RPCError as err:
+ logging.info('Error: %s' % err.message.strip())
+
+
+ ## Issue an unsupported command and handle the RPC error gracefully
+ try:
+ result = m.md_cli_raw_command('configure')
+ if len(result.xpath('/nc:rpc-reply/nc:ok', namespaces=_NS_MAP)) > 0:
+ logging.info('Command successful')
+ else:
+ logging.info('Command unsuccessful')
+
+ except RPCError as err:
+ logging.info('Error: %s' % err.message.strip())
+
+ m.close_session()
+
+if __name__ == '__main__':
+ LOG_FORMAT = '%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s'
+ logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=LOG_FORMAT)
+
+ try:
+ connect(sys.argv[1], '830', 'admin', 'admin')
+
+ except IndexError:
+ logging.error('Must supply a valid hostname or IP address')
+
+
diff --git a/ncclient/devices/__init__.py b/ncclient/devices/__init__.py
index 6b0d6aa3..6e01c8b0 100644
--- a/ncclient/devices/__init__.py
+++ b/ncclient/devices/__init__.py
@@ -1,4 +1,4 @@
-# supported devices config, add new device (eg: 'device name':'device lable').
+# supported devices config, add new device (eg: 'device name':'device label').
supported_devices_cfg = {'junos':'Juniper',
'csr':'Cisco CSR1000v',
'nexus':'Cisco Nexus',
@@ -9,6 +9,7 @@
'alu':'Alcatel Lucent',
'h3c':'H3C',
'hpcomware':'HP Comware',
+ 'sros':'Nokia SR OS',
'default':'Server or anything not in above'}
def get_supported_devices():
diff --git a/ncclient/devices/sros.py b/ncclient/devices/sros.py
new file mode 100644
index 00000000..275b1ec0
--- /dev/null
+++ b/ncclient/devices/sros.py
@@ -0,0 +1,54 @@
+from lxml import etree
+
+from .default import DefaultDeviceHandler
+from ncclient.operations.third_party.sros.rpc import MdCliRawCommand
+from ncclient.xml_ import BASE_NS_1_0
+
+
+def passthrough(xml):
+ return xml
+
+class SrosDeviceHandler(DefaultDeviceHandler):
+ """
+ Nokia SR OS handler for device specific information.
+ """
+
+ def __init__(self, device_params):
+ super(SrosDeviceHandler, self).__init__(device_params)
+
+ def get_capabilities(self):
+ """Set SR OS device handler client capabilities
+
+ Set additional capabilities beyond the default device handler.
+
+ Returns:
+ A list of strings representing NETCONF capabilities to be
+ sent to the server.
+ """
+ base = super(SrosDeviceHandler, self).get_capabilities()
+ additional = [
+ 'urn:ietf:params:xml:ns:netconf:base:1.0',
+ 'urn:ietf:params:xml:ns:yang:1',
+ 'urn:ietf:params:netconf:capability:confirmed-commit:1.1',
+ 'urn:ietf:params:netconf:capability:validate:1.1']
+ return base + additional
+
+ def get_xml_base_namespace_dict(self):
+ return {None: BASE_NS_1_0}
+
+ def get_xml_extra_prefix_kwargs(self):
+ d = {}
+ d.update(self.get_xml_base_namespace_dict())
+ return {"nsmap": d}
+
+ def add_additional_operations(self):
+ operations = {
+ 'md_cli_raw_command': MdCliRawCommand
+ }
+ return operations
+
+ def perform_qualify_check(self):
+ return False
+
+ def transform_reply(self):
+ return passthrough
diff --git a/ncclient/operations/third_party/sros/__init__.py b/ncclient/operations/third_party/sros/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/ncclient/operations/third_party/sros/rpc.py b/ncclient/operations/third_party/sros/rpc.py
new file mode 100644
index 00000000..c321b581
--- /dev/null
+++ b/ncclient/operations/third_party/sros/rpc.py
@@ -0,0 +1,26 @@
+from ncclient.xml_ import *
+
+from ncclient.operations.rpc import RPC
+
+def global_operations(node):
+ """Instantiate an SR OS global operation action element
+
+ Args:
+ node: A string representing the top-level action for a
+ given global operation.
+ Returns:
+ A tuple of 'lxml.etree._Element' values. The first value
+ represents the top-level YANG action element and the second
+ represents the caller supplied initial node.
+ """
+ parent, child = yang_action('global-operations',
+ attrs={'xmlns': SROS_GLOBAL_OPS_NS})
+ ele = sub_ele(child, node)
+ return (parent, ele)
+
+class MdCliRawCommand(RPC):
+ def request(self, command=None):
+ node, raw_cmd_node = global_operations('md-cli-raw-command')
+ sub_ele(raw_cmd_node, 'md-cli-input-line').text = command
+ self._huge_tree = True
+ return self._request(node)
diff --git a/ncclient/xml_.py b/ncclient/xml_.py
index ef17980d..e47463d6 100644
--- a/ncclient/xml_.py
+++ b/ncclient/xml_.py
@@ -45,9 +45,11 @@ class XMLError(NCClientError):
#: Base NETCONF namespace
BASE_NS_1_0 = "urn:ietf:params:xml:ns:netconf:base:1.0"
-# NXOS_1_0
+#: YANG (RFC 6020/RFC 7950) namespace
+YANG_NS_1_0 = "urn:ietf:params:xml:ns:yang:1"
+#: NXOS_1_0
NXOS_1_0 = "http://www.cisco.com/nxos:1.0"
-# NXOS_IF
+#: NXOS_IF
NXOS_IF = "http://www.cisco.com/nxos:1.0:if_manager"
#: Namespace for Tail-f core data model
TAILF_AAA_1_1 = "http://tail-f.com/ns/aaa/1.1"
@@ -75,9 +77,12 @@ class XMLError(NCClientError):
NETCONF_NOTIFICATION_NS = "urn:ietf:params:xml:ns:netconf:notification:1.0"
#: Namespace for netconf with-defaults (RFC 6243)
NETCONF_WITH_DEFAULTS_NS = "urn:ietf:params:xml:ns:yang:ietf-netconf-with-defaults"
-#: Namespace for Alcatel-Lucent
+#: Namespace for Alcatel-Lucent SR OS Base r13 YANG models
ALU_CONFIG = "urn:alcatel-lucent.com:sros:ns:yang:conf-r13"
-#
+#: Namespace for Nokia SR OS global operations
+SROS_GLOBAL_OPS_NS = "urn:nokia.com:sros:ns:yang:sr:oper-global"
+
+
try:
register_namespace = etree.register_namespace
except AttributeError:
@@ -173,13 +178,22 @@ def __init__(self, result, transform_reply, huge_tree=False):
else:
self.__doc = self.remove_namespaces(self.__result)
- def xpath(self, expression):
- """
- return result for a call to lxml xpath()
- output will be a list
+ def xpath(self, expression, namespaces={}):
+ """Perform XPath navigation on an object
+
+ Args:
+ expression: A string representing a compliant XPath
+ expression.
+ namespaces: A dict of caller supplied prefix/xmlns to
+ append to the static dict of XPath namespaces.
+ Returns:
+ A list of 'lxml.etree._Element' should a match on the
+ expression be successful. Otherwise, an empty list will
+ be returned to the caller.
"""
self.__expression = expression
self.__namespaces = XPATH_NAMESPACES
+ self.__namespaces.update(namespaces)
return self.__doc.xpath(self.__expression, namespaces=self.__namespaces)
def find(self, expression):
@@ -199,7 +213,7 @@ def findall(self, expression):
def __str__(self):
"""syntactic sugar for str() - alias to tostring"""
- if sys.version<'3':
+ if sys.version < '3':
return self.tostring
else:
return self.tostring.decode('UTF-8')
@@ -232,6 +246,22 @@ def parent_ns(node):
return node.nsmap[node.prefix]
return None
+def yang_action(name, attrs):
+ """Instantiate a YANG action element
+
+ Args:
+ name: A string representing the first descendant name of the
+ XML element for the YANG action.
+ attrs: A dict of attributes to apply to the XML element
+ (e.g. namespaces).
+ Returns:
+ A tuple of 'lxml.etree._Element' values. The first value
+ represents the top-level YANG action element and the second
+ represents the caller supplied initial node.
+ """
+ node = new_ele('action', attrs={'xmlns': YANG_NS_1_0})
+ return (node, sub_ele(node, name, attrs))
+
new_ele_nsmap = lambda tag, nsmap, attrs={}, **extra: etree.Element(qualify(tag), attrs, nsmap, **extra)
new_ele = lambda tag, attrs={}, **extra: etree.Element(qualify(tag), attrs, **extra)
diff --git a/setup.py b/setup.py
index d166bed5..68e00412 100644
--- a/setup.py
+++ b/setup.py
@@ -23,7 +23,7 @@
import versioneer
__author__ = "Shikhar Bhushan, Leonidas Poulopoulos, Ebben Aries, Einar Nilsen-Nygaard"
-__author_email__ = "shikhar@schmizz.net, lpoulopoulos@verisign.com, earies@juniper.net, einarnn@gmail.com"
+__author_email__ = "shikhar@schmizz.net, lpoulopoulos@verisign.com, exa@dscp.org, einarnn@gmail.com"
__licence__ = "Apache 2.0"
if sys.version_info.major == 2 and sys.version_info.minor < 7:
diff --git a/test/unit/devices/test_get_supported_devices.py b/test/unit/devices/test_get_supported_devices.py
index 3394dda3..59d9d019 100644
--- a/test/unit/devices/test_get_supported_devices.py
+++ b/test/unit/devices/test_get_supported_devices.py
@@ -15,6 +15,7 @@ def test_get_supported_devices(self):
'alu',
'h3c',
'hpcomware',
+ 'sros',
'default')))
def test_get_supported_device_labels(self):
@@ -29,5 +30,6 @@ def test_get_supported_device_labels(self):
'alu':'Alcatel Lucent',
'h3c':'H3C',
'hpcomware':'HP Comware',
+ 'sros':'Nokia SR OS',
'default':'Server or anything not in above'})
diff --git a/test/unit/devices/test_sros.py b/test/unit/devices/test_sros.py
new file mode 100644
index 00000000..a4684f3a
--- /dev/null
+++ b/test/unit/devices/test_sros.py
@@ -0,0 +1,66 @@
+import unittest
+
+from ncclient.devices.sros import *
+from ncclient.xml_ import *
+
+capabilities = ['urn:ietf:params:netconf:base:1.0',
+ 'urn:ietf:params:netconf:base:1.1',
+ 'urn:ietf:params:netconf:capability:writable-running:1.0',
+ 'urn:ietf:params:netconf:capability:candidate:1.0',
+ 'urn:ietf:params:netconf:capability:confirmed-commit:1.0',
+ 'urn:ietf:params:netconf:capability:rollback-on-error:1.0',
+ 'urn:ietf:params:netconf:capability:startup:1.0',
+ 'urn:ietf:params:netconf:capability:url:1.0?scheme=http,ftp,file,https,sftp',
+ 'urn:ietf:params:netconf:capability:validate:1.0',
+ 'urn:ietf:params:netconf:capability:xpath:1.0',
+ 'urn:ietf:params:netconf:capability:notification:1.0',
+ 'urn:liberouter:params:netconf:capability:power-control:1.0',
+ 'urn:ietf:params:netconf:capability:interleave:1.0',
+ 'urn:ietf:params:netconf:capability:with-defaults:1.0',
+ 'urn:ietf:params:xml:ns:netconf:base:1.0',
+ 'urn:ietf:params:xml:ns:yang:1',
+ 'urn:ietf:params:netconf:capability:confirmed-commit:1.1',
+ 'urn:ietf:params:netconf:capability:validate:1.1']
+
+xml = """
+
+TiMOS-B-20.10.B1-5 both/x86_64 Nokia 7750 SR Copyright (c) 2000-2020 Nokia.
+
+All rights reserved. All use subject to applicable license agreements.
+
+Built on Fri Oct 2 18:11:20 PDT 2020 by builder in /builds/c/2010B/B1-5/panos/main/sros
+
+
+
+"""
+
+class TestSrosDevice(unittest.TestCase):
+
+ def setUp(self):
+ self.obj = SrosDeviceHandler({'name': 'sros'})
+
+ def test_add_additional_operations(self):
+ expected = dict()
+ expected['md_cli_raw_command'] = MdCliRawCommand
+ self.assertDictEqual(expected, self.obj.add_additional_operations())
+
+ def test_transform_reply(self):
+ expected = xml
+ actual = self.obj.transform_reply()
+ ele = to_ele(xml)
+ self.assertEqual(expected, to_xml(actual(ele)))
+
+ def test_get_capabilities(self):
+ self.assertListEqual(capabilities, self.obj.get_capabilities())
+
+ def test_get_xml_base_namespace_dict(self):
+ expected = {None: BASE_NS_1_0}
+ self.assertDictEqual(expected, self.obj.get_xml_base_namespace_dict())
+
+ def test_get_xml_extra_prefix_kwargs(self):
+ expected = dict()
+ expected['nsmap'] = self.obj.get_xml_base_namespace_dict()
+ self.assertDictEqual(expected, self.obj.get_xml_extra_prefix_kwargs())
+
+ def test_perform_qualify_check(self):
+ self.assertFalse(self.obj.perform_qualify_check())
diff --git a/test/unit/operations/third_party/sros/__init__.py b/test/unit/operations/third_party/sros/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/test/unit/operations/third_party/sros/test_rpc.py b/test/unit/operations/third_party/sros/test_rpc.py
new file mode 100644
index 00000000..ba9e73d3
--- /dev/null
+++ b/test/unit/operations/third_party/sros/test_rpc.py
@@ -0,0 +1,20 @@
+import unittest
+from mock import patch
+from ncclient import manager
+import ncclient.transport
+from ncclient.operations.third_party.sros.rpc import *
+
+
+class TestRPC(unittest.TestCase):
+ def setUp(self):
+ self.device_handler = manager.make_device_handler({'name': 'sros'})
+
+ @patch('ncclient.operations.third_party.sros.rpc.RPC._request')
+ def test_MdCliRawCommand(self, mock_request):
+ mock_request.return_value = 'sros'
+ expected = 'sros'
+ session = ncclient.transport.SSHSession(self.device_handler)
+ obj = MdCliRawCommand(session, self.device_handler)
+ command = 'show version'
+ actual = obj.request(command=command)
+ self.assertEqual(expected, actual)