-
Notifications
You must be signed in to change notification settings - Fork 56
/
config_compliance.py
222 lines (180 loc) · 8.56 KB
/
config_compliance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
"""Nornir job for generating the compliance data."""
# pylint: disable=relative-beyond-top-level
import difflib
import logging
import os
from collections import defaultdict
from datetime import datetime
from django.utils.timezone import make_aware
from nautobot_plugin_nornir.constants import NORNIR_SETTINGS
from nautobot_plugin_nornir.plugins.inventory.nautobot_orm import NautobotORMInventory
from netutils.config.compliance import _open_file_config, parser_map, section_config
from nornir import InitNornir
from nornir.core.plugins.inventory import InventoryPluginRegister
from nornir.core.task import Result, Task
from nornir_nautobot.exceptions import NornirNautobotException
from nautobot_golden_config.choices import ComplianceRuleConfigTypeChoice
from nautobot_golden_config.utilities.logger import NornirLogger
from nautobot_golden_config.models import ComplianceRule, ConfigCompliance, GoldenConfig
from nautobot_golden_config.nornir_plays.processor import ProcessGoldenConfig
from nautobot_golden_config.utilities.db_management import close_threaded_db_connections
from nautobot_golden_config.utilities.helper import (
get_device_to_settings_map,
get_job_filter,
get_json_config,
render_jinja_template,
verify_settings,
)
InventoryPluginRegister.register("nautobot-inventory", NautobotORMInventory)
LOGGER = logging.getLogger(__name__)
def get_rules():
"""A serializer of sorts to return rule mappings as a dictionary."""
# TODO: Future: Review if creating a proper serializer is the way to go.
rules = defaultdict(list)
for compliance_rule in ComplianceRule.objects.all():
platform = str(compliance_rule.platform.network_driver)
rules[platform].append(
{
"ordered": compliance_rule.config_ordered,
"obj": compliance_rule,
"section": compliance_rule.match_config.splitlines(),
}
)
return rules
def get_config_element(rule, config, obj, logger):
"""
Helper function to yield elements of the configuration as defined in the `config_match` under ComplianceRule.
Returns:
- a configuration section for `CLI` based config types
- top level JSON key for `JSON` based config types
"""
if rule["obj"].config_type == ComplianceRuleConfigTypeChoice.TYPE_JSON:
config_json = get_json_config(config)
if not config_json:
error_msg = "`E3002:` Unable to interpret configuration as JSON."
logger.error(error_msg, extra={"object": obj})
raise NornirNautobotException(error_msg)
if rule["obj"].match_config:
config_element = {k: config_json.get(k) for k in rule["obj"].match_config.splitlines() if k in config_json}
else:
config_element = config_json
elif rule["obj"].config_type == ComplianceRuleConfigTypeChoice.TYPE_CLI:
if obj.platform.network_driver_mappings["netmiko"] not in parser_map:
error_msg = f"`E3003:` There is currently no CLI-config parser support for platform network_driver `{obj.platform.network_driver}`, preemptively failed."
logger.error(error_msg, extra={"object": obj})
raise NornirNautobotException(error_msg)
config_element = section_config(rule, config, obj.platform.network_driver_mappings["netmiko"])
else:
error_msg = f"`E3004:` There rule type ({rule['obj'].config_type}) is not recognized."
logger.error(error_msg, extra={"object": obj})
raise NornirNautobotException(error_msg)
return config_element
def diff_files(backup_file, intended_file):
"""Utility function to provide `Unix Diff` between two files."""
with open(backup_file, encoding="utf-8") as file:
backup = file.read()
with open(intended_file, encoding="utf-8") as file:
intended = file.read()
for line in difflib.unified_diff(backup, intended, lineterm=""):
yield line
@close_threaded_db_connections
def run_compliance( # pylint: disable=too-many-arguments,too-many-locals
task: Task,
logger: logging.Logger,
device_to_settings_map,
rules,
) -> Result:
"""Prepare data for compliance task.
Args:
task (Task): Nornir task individual object
Returns:
result (Result): Result from Nornir task
"""
obj = task.host.data["obj"]
settings = device_to_settings_map[obj.id]
compliance_obj = GoldenConfig.objects.filter(device=obj).first()
if not compliance_obj:
compliance_obj = GoldenConfig.objects.create(device=obj)
compliance_obj.compliance_last_attempt_date = task.host.defaults.data["now"]
compliance_obj.save()
intended_directory = settings.intended_repository.filesystem_path
intended_path_template_obj = render_jinja_template(obj, logger, settings.intended_path_template)
intended_file = os.path.join(intended_directory, intended_path_template_obj)
if not os.path.exists(intended_file):
error_msg = f"`E3005:` Unable to locate intended file for device at {intended_file}, preemptively failed."
logger.error(error_msg, extra={"object": obj})
raise NornirNautobotException(error_msg)
backup_directory = settings.backup_repository.filesystem_path
backup_template = render_jinja_template(obj, logger, settings.backup_path_template)
backup_file = os.path.join(backup_directory, backup_template)
if not os.path.exists(backup_file):
error_msg = f"`E3006:` Unable to locate backup file for device at {backup_file}, preemptively failed."
logger.error(error_msg, extra={"object": obj})
raise NornirNautobotException(error_msg)
platform = obj.platform.network_driver
if not rules.get(platform):
error_msg = (
f"`E3007:` There is no defined `Configuration Rule` for platform network_driver `{platform}`, "
"preemptively failed."
)
logger.error(error_msg, extra={"object": obj})
raise NornirNautobotException(error_msg)
backup_cfg = _open_file_config(backup_file)
intended_cfg = _open_file_config(intended_file)
for rule in rules[obj.platform.network_driver]:
_actual = get_config_element(rule, backup_cfg, obj, logger)
_intended = get_config_element(rule, intended_cfg, obj, logger)
# using update_or_create() method to conveniently update actual obj or create new one.
ConfigCompliance.objects.update_or_create(
device=obj,
rule=rule["obj"],
defaults={
"actual": _actual,
"intended": _intended,
"missing": "",
"extra": "",
},
)
compliance_obj.compliance_last_success_date = task.host.defaults.data["now"]
compliance_obj.compliance_config = "\n".join(diff_files(backup_file, intended_file))
compliance_obj.save()
logger.info("Successfully tested compliance job.", extra={"object": obj})
return Result(host=task.host)
def config_compliance(job_result, log_level, data):
"""Nornir play to generate configurations."""
now = make_aware(datetime.now())
logger = NornirLogger(job_result, log_level)
rules = get_rules()
qs = get_job_filter(data)
logger.debug("Compiling device data for compliance job.")
device_to_settings_map = get_device_to_settings_map(queryset=qs)
for settings in set(device_to_settings_map.values()):
verify_settings(logger, settings, ["backup_path_template", "intended_path_template"])
try:
with InitNornir(
runner=NORNIR_SETTINGS.get("runner"),
logging={"enabled": False},
inventory={
"plugin": "nautobot-inventory",
"options": {
"credentials_class": NORNIR_SETTINGS.get("credentials"),
"params": NORNIR_SETTINGS.get("inventory_params"),
"queryset": qs,
"defaults": {"now": now},
},
},
) as nornir_obj:
nr_with_processors = nornir_obj.with_processors([ProcessGoldenConfig(logger)])
logger.debug("Run nornir compliance tasks.")
nr_with_processors.run(
task=run_compliance,
name="RENDER COMPLIANCE TASK GROUP",
logger=logger,
device_to_settings_map=device_to_settings_map,
rules=rules,
)
except Exception as error:
error_msg = f"`E3001:` General Exception handler, original error message ```{error}```"
logger.error(error_msg)
raise NornirNautobotException(error_msg) from error
logger.debug("Completed compliance job for devices.")