/
nornir_os_upgrade.py
98 lines (78 loc) · 2.43 KB
/
nornir_os_upgrade.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import re
import sys
from nornir import InitNornir
from nornir.plugins.tasks.networking import netmiko_file_transfer
from nornir.plugins.tasks.networking import netmiko_send_command
from nornir.plugins.tasks.networking import netmiko_send_config
from nornir_utilities import nornir_set_creds, std_print
def os_upgrade(task):
    """
    Transfer the host's OS image to the device.

    The image file name is read from the host's 'img' inventory value and
    is used unchanged as both the local source and the remote destination
    name.

    Always returns an empty string.
    """
    image = task.host.get('img')
    transfer_kwargs = {
        "source_file": image,
        "dest_file": image,
        "direction": 'put',
    }
    task.run(task=netmiko_file_transfer, **transfer_kwargs)
    return ""
def set_boot_var(task):
    """
    Set the boot variable for Cisco IOS.

    Both the primary image (host value 'img') and the backup image (host
    value 'backup_img') must show up in the output of
    "dir flash:/<image>" before the boot statements are replaced.

    return True if boot variable set
    return False if staging verification steps failed (an image name is
    missing from the inventory, the dir output contains no
    "Directory of" header, or the image is not in the listing)
    """
    primary_img = task.host.get('img')
    backup_img = task.host.get('backup_img')

    # Without both image names there is nothing to verify or to boot from;
    # treat that as a failed staging check rather than querying the device
    # for "flash:/None".
    if not primary_img or not backup_img:
        return False

    # Check images are on the device
    for img in (primary_img, backup_img):
        result = task.run(
            task=netmiko_send_command,
            command_string=f"dir flash:/{img}"
        )
        output = result[0].result
        # Drop the "Directory of ..." line as that line always contains the
        # filename; only the text after it is searched.
        sections = re.split(r"Directory of.*", output, flags=re.M)
        # No header means there is no listing to search. Indexing [1]
        # unconditionally would raise IndexError here instead of reporting
        # the failed check.
        if len(sections) < 2 or img not in sections[1]:
            return False

    command_list = [
        "default boot system",
        f"boot system flash {primary_img}",
        f"boot system flash {backup_img}",
    ]
    task.run(
        task=netmiko_send_config,
        config_commands=command_list
    )
    return True
def continue_func(msg="Do you want to continue (y/n)? "):
    """
    Prompt the operator and continue only on an affirmative answer.

    return True if the answer starts with 'y' (case-insensitive,
    surrounding whitespace ignored)
    otherwise terminate the script via sys.exit()
    """
    response = input(msg).strip().lower()
    # Match on the leading character: a substring test would treat answers
    # such as "nay" or "no way" as a yes.
    if response.startswith('y'):
        return True
    sys.exit()
def main():
    """
    Run the OS upgrade workflow.

    Steps:
      1. Run os_upgrade on every host in the inventory and print the
         results.
      2. Run set_boot_var on the single host "cisco1.domain.com".
      3. Exit with an error message if that task failed or returned False.
      4. Print "show run | section boot" for verification and ask the
         operator whether to continue.
    """
    # Initialize Nornir object using default "SimpleInventory" plugin
    nr = InitNornir()
    nornir_set_creds(nr)
    result = nr.run(
        task=os_upgrade,
        num_workers=20,
    )
    std_print(result)

    # Filter to only a single device
    nr_ios = nr.filter(hostname="cisco1.domain.com")
    aggr_result = nr_ios.run(task=set_boot_var)

    # If setting the boot variable failed (assumes single device at this point).
    # A task that raised is marked failed and its result is not the literal
    # False, so the failed flag is checked as well as the return value.
    for host_result in aggr_result.values():
        if host_result.failed or host_result[0].result is False:
            sys.exit("Setting the boot variable failed")

    # Verify the boot variable
    result = nr_ios.run(
        task=netmiko_send_command,
        command_string="show run | section boot",
        num_workers=20,
    )
    std_print(result)
    continue_func()
# Run the upgrade workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()