forked from Juniper/py-junos-eznc
/
console.py
341 lines (295 loc) · 12.7 KB
/
console.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
"""
This file defines the 'netconifyCmdo' class.
Used by the 'netconify' shell utility.
"""
import traceback
import sys
import logging
import warnings
import socket
# 3rd-party packages
from ncclient.devices.junos import JunosDeviceHandler
from lxml import etree
from jnpr.junos.transport.tty_telnet import Telnet
from jnpr.junos.transport.tty_serial import Serial
from jnpr.junos.transport.tty_ssh import SSH
from ncclient.xml_ import NCElement
from jnpr.junos.device import _Connection
# local modules
from jnpr.junos.rpcmeta import _RpcMetaExec
from jnpr.junos.factcache import _FactCache
from jnpr.junos import jxml as JXML
from jnpr.junos.ofacts import *
from jnpr.junos.decorators import ignoreWarnDecorator
from jnpr.junos.device import _Jinja2ldr
# Module-level logger used by the Console class for its diagnostics.
logger = logging.getLogger("jnpr.junos.console")
class Console(_Connection):
    """
    Connection to a Junos device made over a console.

    Depending on the constructor arguments the underlying transport is
    telnet, a serial port, or SSH to a console server (selected in
    :meth:`_tty_login`).
    """

    def __init__(self, **kvargs):
        """
        Console object constructor.

        :param str host:
            **REQUIRED** host-name or ipaddress of target device
            (not required in serial mode)

        :param str user:
            *OPTIONAL* login user-name, uses root if not provided

        :param str passwd:
            *OPTIONAL* in console connection for device at zeroized state
            password is not required. ``password`` is accepted as well and
            takes precedence when it is non-empty.

        :param str cs_user:
            *OPTIONAL* console server user-name. When provided, the
            connection is made over SSH unless ``mode`` is 'telnet'.

        :param str cs_passwd:
            *OPTIONAL* console server password. Required by :meth:`open`
            whenever ``cs_user`` is provided.

        :param port:
            *OPTIONAL* port, defaults to '22' when ``cs_user`` is provided
            and to '23' otherwise. The same value is handed to the serial
            transport, so pass the device path explicitly in serial mode.

        :param int baud:
            *OPTIONAL* baud, default baud rate is 9600

        :param str mode:
            *OPTIONAL* mode, mode of connection (telnet/serial)
            default is telnet (``None`` when ``cs_user`` is provided)

        :param float timeout:
            *OPTIONAL* timeout, default is 0.5

        :param int attempts:
            *OPTIONAL* attempts, default is 10

        :param str ssh_config:
            *OPTIONAL* The path to the SSH configuration file.
            This can be used to load SSH information from a configuration file.
            By default ~/.ssh/config is queried it will be used by SCP class.
            So its assumed ssh is enabled by the time we use SCP functionality.
            Only consulted when ``host`` is provided.

        :param str ssh_private_key_file:
            *OPTIONAL* private key file passed to the SSH transport. Falls
            back to ``self._conf_ssh_private_key_file``.

        :param bool normalize:
            *OPTIONAL* default is ``False``. If ``True`` the normalizing
            XSLT transform is installed for RPC replies on :meth:`open`.

        :param bool huge_tree:
            *OPTIONAL* default is ``False``. Passed through to the tty
            transport and to ``NCElement`` when parsing RPC replies.

        :param bool gather_facts:
            *OPTIONAL* Defaults to ``False``. If ``False`` and old-style fact
            gathering is in use then facts are not gathered on call to
            :meth:`open`. This argument is a no-op when new-style fact
            gathering is in use (the default.)

        :param str fact_style:
            *OPTIONAL* The style of fact gathering to use. Valid values are:
            'new', 'old', or 'both'. The default is 'new'. The value 'both' is
            only present for debugging purposes. It will be removed in a future
            release. The value 'old' is only present to workaround bugs in
            new-style fact gathering. It will be removed in a future release.

        :param bool console_has_banner:
            *OPTIONAL* default is ``False``. If ``False`` then in case of a
            hung state, <close-session/> rpc is sent to the console.
            If ``True``, after sleep(5), a new-line is sent
        """

        # ----------------------------------------
        # setup instance connection/open variables
        # ----------------------------------------

        self._tty = None
        self._ofacts = {}
        self.connected = False
        self._skip_logout = False
        self.results = dict(changed=False, failed=False, errmsg=None)

        # hostname is not required in serial mode connection
        self._hostname = kvargs.get('host')
        self._auth_user = kvargs.get('user', 'root')
        self._conf_auth_user = None
        self._conf_ssh_private_key_file = None
        # 'password' wins over 'passwd' when both are given and non-empty
        self._auth_password = (kvargs.get('password', '') or
                               kvargs.get('passwd', ''))
        self.cs_user = kvargs.get('cs_user')
        self.cs_passwd = kvargs.get('cs_passwd')
        # a console-server user implies SSH: default port 22 and no mode
        self._port = kvargs.get('port', '22' if self.cs_user else '23')
        self._mode = kvargs.get('mode', None if self.cs_user else 'telnet')
        self._baud = kvargs.get('baud', '9600')
        if self._hostname:
            self._ssh_config = kvargs.get('ssh_config')
            # NOTE(review): presumably fills the self._conf_* attributes
            # from the SSH config file; defined outside this file - confirm
            self._sshconf_lkup()
        self._ssh_private_key_file = (kvargs.get('ssh_private_key_file') or
                                      self._conf_ssh_private_key_file)
        self._timeout = kvargs.get('timeout', '0.5')
        self._normalize = kvargs.get('normalize', False)
        self._attempts = kvargs.get('attempts', 10)
        self._gather_facts = kvargs.get('gather_facts', False)
        self._fact_style = kvargs.get('fact_style', 'new')
        self._huge_tree = kvargs.get('huge_tree', False)
        if self._fact_style != 'new':
            warnings.warn('fact-style %s will be removed in '
                          'a future release.' %
                          (self._fact_style), RuntimeWarning)
        self.console_has_banner = kvargs.get('console_has_banner', False)
        self.rpc = _RpcMetaExec(self)
        self._manages = []
        self.junos_dev_handler = JunosDeviceHandler(
            device_params={'name': 'junos',
                           'local': False})
        self._j2ldr = _Jinja2ldr
        if self._fact_style == 'old':
            self.facts = self.ofacts
        else:
            self.facts = _FactCache(self)

    @property
    def timeout(self):
        """
        :returns: current console connection timeout value in seconds, as
            it was supplied (the built-in default is the string '0.5';
            it is converted to ``float`` when the connection is opened).
        """
        return self._timeout

    @timeout.setter
    def timeout(self, value):
        """
        Used to change the console connection timeout value (default=0.5 sec).

        :param value:
            New timeout value in seconds; must be convertible by ``float()``
        """
        self._timeout = value

    @property
    def transform(self):
        """
        :returns: the current RPC XML Transformation.
        """
        return self.junos_dev_handler.transform_reply

    @transform.setter
    def transform(self, func):
        """
        Used to change the RPC XML Transformation.

        :param lambda value:
            New transform lambda
        """
        self.junos_dev_handler.transform_reply = func

    def open(self, *vargs, **kvargs):
        """
        Opens a connection to the device using existing login/auth
        information.

        :param bool gather_facts:
            If set to ``True``/``False`` will override the device
            instance value for only this open process

        :param bool normalize:
            If set, overrides the ``normalize`` value given to the
            constructor and is saved on the instance

        :returns: ``self`` once logged in. When argument validation fails
            nothing is raised; the ``results`` dict is returned instead,
            with ``failed`` set to ``True`` and ``errmsg`` filled in.

        :raises: any exception raised while logging in, after logging it
        """

        # ---------------------------------------------------------------
        # validate device hostname or IP address
        # ---------------------------------------------------------------
        needs_host = ((self._mode and self._mode.upper() == 'TELNET') or
                      self.cs_user is not None)
        if needs_host and self._hostname is None:
            self.results['failed'] = True
            self.results[
                'errmsg'] = 'ERROR: Device hostname/IP not specified !!!'
            # logged for consistency with the password validation below
            logger.error(self.results['errmsg'])
            return self.results

        # ---------------------------------------------------------------
        # validate console server and password. Password-less connection
        # is not supported
        # ---------------------------------------------------------------
        if self.cs_user is not None and self.cs_passwd is None:
            self.results['failed'] = True
            self.results[
                'errmsg'] = 'ERROR: Console SSH, Password-less connection is ' \
                            'not supported !!!'
            logger.error(self.results['errmsg'])
            return self.results

        # --------------------
        # login to the CONSOLE
        # --------------------
        try:
            self._tty_login()
        except RuntimeError as err:
            logger.error("ERROR: {}:{}\n".format('login', str(err)))
            logger.error(
                "\nComplete traceback message: {}".format(
                    traceback.format_exc()))
            # bare raise keeps the original traceback on every Python version
            raise
        except Exception as ex:
            logger.error("Exception occurred: {}:{}\n".format('login',
                                                              str(ex)))
            raise

        self.connected = True
        # remember the handler's transform before normalize may replace it
        self._nc_transform = self.transform
        self._norm_transform = lambda: JXML.normalize_xslt.encode('UTF-8')

        # normalize argument to open() overrides normalize argument value
        # to __init__(). Save value to self._normalize where it is used by
        # normalizeDecorator()
        self._normalize = kvargs.get('normalize', self._normalize)
        if self._normalize is True:
            self.transform = self._norm_transform

        gather_facts = kvargs.get('gather_facts', self._gather_facts)
        if gather_facts is True:
            logger.info('facts: retrieving device facts...')
            self.facts_refresh()
            self.results['facts'] = self.facts
        return self

    def close(self, skip_logout=False):
        """
        Closes the connection to the device.

        :param bool skip_logout:
            When ``False`` (default) a logout is performed through the tty
            transport; otherwise the tty is closed without logging out.
            Nothing is done when the instance is not connected.

        :raises: any error from the logout/close, except a socket error
            containing "Connection reset by peer" or an ``EOFError``
            containing "telnet connection closed", which are ignored.
        """
        if skip_logout is False and self.connected is True:
            try:
                self._tty_logout()
            except socket.error as err:
                # if err contains "Connection reset by peer" connection to the
                # device got closed
                if "Connection reset by peer" not in str(err):
                    raise
            except EOFError as err:
                if "telnet connection closed" not in str(err):
                    raise
            except Exception as err:
                logger.error("ERROR {}:{}\n".format('logout', str(err)))
                raise
            self.connected = False
        elif self.connected is True:
            try:
                self._tty._tty_close()
            except Exception as err:
                logger.error("ERROR {}:{}\n".format('close', str(err)))
                logger.error(
                    "\nComplete traceback message: {}".format(
                        traceback.format_exc()))
                raise
            self.connected = False

    @ignoreWarnDecorator
    def _rpc_reply(self, rpc_cmd_e, *args, **kwargs):
        """
        Send an RPC over the console NETCONF session and return the reply.

        :param rpc_cmd_e:
            the RPC, either an lxml element (serialized before sending) or
            an already serialized command, which is sent unchanged

        :returns: the document held by the ``NCElement`` built from the
            reply using the device handler's current reply transform
        """
        # compare the numeric major version: comparing sys.version as a
        # string misorders versions such as '10.x'
        encode = None if sys.version_info[0] < 3 else 'unicode'
        if isinstance(rpc_cmd_e, etree._Element):
            rpc_cmd = etree.tostring(rpc_cmd_e, encoding=encode)
        else:
            rpc_cmd = rpc_cmd_e
        reply = self._tty.nc.rpc(rpc_cmd)
        rpc_rsp_e = NCElement(reply,
                              self.junos_dev_handler.transform_reply(),
                              self._huge_tree
                              )._NCElement__doc
        return rpc_rsp_e

    # -------------------------------------------------------------------------
    # LOGIN/LOGOUT
    # -------------------------------------------------------------------------

    def _tty_login(self):
        """
        Create the tty transport selected by mode/cs_user and log in.

        Telnet mode is checked first, then a console-server user (SSH),
        then serial mode.

        :raises AttributeError: when no transport matches the settings
        """
        tty_args = dict()
        tty_args['user'] = self._auth_user
        tty_args['passwd'] = self._auth_password
        tty_args['timeout'] = float(self._timeout)
        tty_args['attempts'] = int(self._attempts)
        tty_args['baud'] = self._baud
        tty_args['huge_tree'] = self._huge_tree

        # a missing mode must fall through to the error branch below
        # instead of failing on None.upper()
        mode = self._mode.upper() if self._mode else None

        if mode == 'TELNET':
            tty_args['host'] = self._hostname
            tty_args['port'] = self._port
            tty_args['console_has_banner'] = self.console_has_banner
            self.console = ('telnet', self._hostname, self.port)
            self._tty = Telnet(**tty_args)
        elif self.cs_user is not None:
            tty_args['cs_user'] = self.cs_user
            tty_args['cs_passwd'] = self.cs_passwd
            tty_args['host'] = self._hostname
            tty_args['port'] = self._port
            tty_args['console_has_banner'] = self.console_has_banner
            tty_args['ssh_private_key_file'] = self._ssh_private_key_file
            self.console = ('ssh', self._hostname, self.port)
            self._tty = SSH(**tty_args)
        elif mode == 'SERIAL':
            tty_args['port'] = self._port
            self.console = ('serial', self._port)
            self._tty = Serial(**tty_args)
        else:
            logger.error('Mode should be either telnet or serial')
            raise AttributeError('Mode to be telnet/serial')

        self._tty.login()

    def _tty_logout(self):
        """ log out through the tty transport """
        self._tty.logout()

    def zeroize(self):
        """
        perform device ZEROIZE actions

        Marks ``results['changed']`` and sets ``_skip_logout``.
        NOTE(review): :meth:`close` does not read ``_skip_logout``; pass
        ``skip_logout=True`` to it explicitly if a logout must be avoided
        after zeroize - confirm the intended behavior.
        """
        logger.info("zeroize : ZEROIZE device, rebooting")
        self._tty.nc.zeroize()
        self._skip_logout = True
        self.results['changed'] = True

    # -----------------------------------------------------------------------
    # Context Manager
    # -----------------------------------------------------------------------

    def __enter__(self):
        """ open the connection and return this instance """
        self._conn = self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ close the connection if it is open; exceptions propagate """
        if self.connected:
            self.close()