Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions netcompare/check_type.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""CheckType Implementation."""
from typing import Mapping, Tuple, Union, List
from typing import Mapping, Tuple, List, Dict, Any
from .evaluator import diff_generator, parameter_evaluator
from .runner import extract_values_from_output

Expand All @@ -12,7 +12,11 @@ def __init__(self, *args):

@staticmethod
def init(*args):
"""Factory pattern to get the appropiate CheckType implementation."""
"""Factory pattern to get the appropriate CheckType implementation.

Args:
*args: Variable length argument list.
"""
check_type = args[0]
if check_type == "exact_match":
return ExactMatchType(*args)
Expand All @@ -24,23 +28,29 @@ def init(*args):
raise NotImplementedError

@staticmethod
def get_value(value: Mapping, path: Mapping, exclude: List = None) -> Union[Mapping, List, int, str, bool]:
def get_value(output: Mapping, path: str, exclude: List = None) -> Any:
"""Return the value contained into a Mapping for a defined path."""
return extract_values_from_output(value, path, exclude)
return extract_values_from_output(output, path, exclude)

# TODO: Refine this typing
def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Mapping, bool]:
def evaluate(self, reference_value: Any, value_to_compare: Any) -> Tuple[Dict, bool]:
"""Return the result of the evaluation and a boolean True if it passes it or False otherwise.

This method is the one that each CheckType has to implement.

Args:
reference_value: Can be any structured data or just a simple value.
value_to_compare: Similar value as above to perform comparison.

Returns:
tuple: Dictionary representing check result, bool indicating if differences are found.
"""
raise NotImplementedError


class ExactMatchType(CheckType):
"""Exact Match class docstring."""

def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Mapping, bool]:
def evaluate(self, reference_value: Any, value_to_compare: Any) -> Tuple[Dict, bool]:
"""Returns the difference between values and the boolean."""
diff = diff_generator(reference_value, value_to_compare)
return diff, not diff
Expand All @@ -50,7 +60,7 @@ class ToleranceType(CheckType):
"""Tolerance class docstring."""

def __init__(self, *args):
"""Tollerance init method."""
"""Tolerance init method."""
try:
tolerance = args[1]
except IndexError as error:
Expand All @@ -59,13 +69,13 @@ def __init__(self, *args):
self.tolerance_factor = float(tolerance) / 100
super().__init__()

def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Mapping, bool]:
"""Returns the difference between values and the boolean."""
def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Dict, bool]:
"""Returns the difference between values and the boolean. Overwrites method in base class."""
diff = diff_generator(reference_value, value_to_compare)
diff = self._get_outliers(diff)
return diff, not diff

def _get_outliers(self, diff: Mapping) -> Mapping:
def _get_outliers(self, diff: Mapping) -> Dict:
"""Return a mapping of values outside the tolerance threshold."""
result = {
key: {sub_key: values for sub_key, values in obj.items() if not self._within_tolerance(**values)}
Expand All @@ -82,7 +92,7 @@ def _within_tolerance(self, *, old_value: float, new_value: float) -> bool:
class ParameterMatchType(CheckType):
"""Parameter Match class implementation."""

def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Mapping, bool]:
def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Dict, bool]:
"""Parameter Match evaluator implementation."""
try:
parameter = value_to_compare[1]
Expand Down
55 changes: 39 additions & 16 deletions netcompare/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,23 @@
import sys
from collections import defaultdict
from functools import partial
from typing import Mapping, List, Dict
from typing import Any, Mapping, Dict, List
from deepdiff import DeepDiff


sys.path.append(".")


def diff_generator(pre_result: Mapping, post_result: Mapping) -> Dict:
"""Generates diff between pre and post data based on check definition."""
def diff_generator(pre_result: Any, post_result: Any) -> Dict:
"""Generates diff between pre and post data based on check definition.

Args:
pre_result: dataset to compare
post_result: dataset to compare

Returns:
differences between two datasets
"""
diff_result = DeepDiff(pre_result, post_result)

result = diff_result.get("values_changed", {})
Expand All @@ -27,13 +35,20 @@ def diff_generator(pre_result: Mapping, post_result: Mapping) -> Dict:
return result


def get_diff_iterables_items(diff_result: Mapping) -> Mapping:
"""Return a dict with new and missing values where the values are in a list."""
# DeepDiff iterable_items are returned when the source data is a list
# and provided in the format: "root['Ethernet3'][1]"
# or more generically: root['KEY']['KEY']['KEY']...[numeric_index]
# where the KEYs are dict keys within the original object
# and the "[index]" is appended to indicate the position within the list.
def get_diff_iterables_items(diff_result: Mapping) -> Dict:
"""Helper function for diff_generator to postprocess changes reported by DeepDiff for iterables.

DeepDiff iterable_items are returned when the source data is a list
and provided in the format: "root['Ethernet3'][1]"
or more generically: root['KEY']['KEY']['KEY']...[numeric_index]
where the KEYs are dict keys within the original object
and the "[index]" is appended to indicate the position within the list.

Args:
diff_result: iterable comparison result from DeepDiff
Returns:
Return a dict with new and missing values where the values are in a list.
"""
get_dict_keys = re.compile(r"^root((\['\w.*'\])+)\[\d+\]$")

defaultdict_list = partial(defaultdict, list)
Expand All @@ -54,27 +69,35 @@ def get_diff_iterables_items(diff_result: Mapping) -> Mapping:
return result


def fix_deepdiff_key_names(obj: Mapping) -> Mapping:
"""Return a dict based on the provided dict object where the brackets and quotes are removed from the string."""
def fix_deepdiff_key_names(obj: Mapping) -> Dict:
"""Return a dict based on the provided dict object where the brackets and quotes are removed from the string.

Args:
obj: Example: {"root[3]['7.7.7.7']['is_enabled']": {'new_value': False, 'old_value': True},
"root[3]['7.7.7.7']['is_up']": {'new_value': False, 'old_value': True}}

Returns:
aggregated output Example: {'7.7.7.7': {'is_enabled': {'new_value': False, 'old_value': True},
'is_up': {'new_value': False, 'old_value': True}}}
"""
pattern = r"'([A-Za-z0-9_\./\\-]*)'"

result = {}
# root[2]['10.64.207.255']['accepted_prefixes']
for key, value in obj.items():
key_parts = re.findall(pattern, key)
partial_res = group_value(key_parts, value)
dict_merger(result, partial_res)
return result


def group_value(tree_list: List, value: Mapping) -> Mapping:
def group_value(tree_list: List, value: Dict) -> Dict:
"""Build dictionary based on value's key and reference key."""
if tree_list:
return {tree_list[0]: group_value(tree_list[1:], value)}
return value


def dict_merger(original_dict: Mapping, merged_dict: Mapping):
def dict_merger(original_dict: Dict, merged_dict: Dict):
"""Merge dictionaries to build final result."""
for key in merged_dict.keys():
if key in original_dict and isinstance(original_dict[key], dict) and isinstance(merged_dict[key], dict):
Expand All @@ -83,7 +106,7 @@ def dict_merger(original_dict: Mapping, merged_dict: Mapping):
original_dict[key] = merged_dict[key]


def parameter_evaluator(values: Mapping, parameter: Mapping) -> Mapping:
def parameter_evaluator(values: Mapping, parameter: Mapping) -> Dict:
"""Parameter Match evaluator engine."""
# value: [{'7.7.7.7': {'peerAddress': '7.7.7.7', 'localAsn': '65130.1100', 'linkType': 'external'}}]
# parameter: {'localAsn': '65130.1100', 'linkType': 'external'}
Expand Down
81 changes: 44 additions & 37 deletions netcompare/runner.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,56 @@
"""Library wrapper for output parsing."""
import re
from typing import Mapping, List, Union
from typing import Mapping, List, Union, Any
import jmespath
from .utils.jmspath_parsers import jmspath_value_parser, jmspath_refkey_parser
from .utils.filter_parsers import exclude_filter
from .utils.refkey import keys_cleaner, keys_values_zipper, associate_key_of_my_value
from .utils.flatten import flatten_list


def extract_values_from_output(value: Mapping, path: Mapping, exclude: List) -> Union[Mapping, List, int, str, bool]:
"""Return data from output depending on the check path. See unit text for complete example."""
# Get the wanted values to be evaluated if jmspath expression is defined, otherwise
# use the entire output if jmespath is not defined in check. This cover the "raw" diff type.
if path and not exclude:
wanted_value = jmespath.search(jmspath_value_parser(path), value)

elif path and exclude:
wanted_value = jmespath.search(jmspath_value_parser(path), value)
exclude_filter(wanted_value, exclude)
elif not path and exclude:
exclude_filter(value, exclude)
return value

# data type check
if path:
if not any(isinstance(i, list) for i in wanted_value):
return wanted_value

for element in wanted_value:
for item in element:
if isinstance(item, dict):
raise TypeError(
f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]]. You have {wanted_value}\'.'
)
if isinstance(item, list):
wanted_value = flatten_list(wanted_value)
break

paired_key_value = associate_key_of_my_value(jmspath_value_parser(path), wanted_value)
else:
paired_key_value = value

if path and re.search(r"\$.*\$", path):
wanted_reference_keys = jmespath.search(jmspath_refkey_parser(path), value)
def extract_values_from_output(output: Union[Mapping, List], path: str, exclude: List = None) -> Any:
"""Return data from output depending on the check path. See unit test for complete example.
Get the wanted values to be evaluated if JMESPath expression is defined,
otherwise use the entire output if jmespath is not defined in check. This covers the "raw" diff type.
Exclude data not desired to compare.
Notes:
https://jmespath.org/ shows how JMESPath works.
Args:
output: json data structure
path: JMESPath to extract specific values
exclude: list of keys to exclude
Returns:
Evaluated data, may be anything depending on JMESPath used.
"""
if exclude:
exclude_filter(output, exclude) # exclude unwanted elements

if not path:
return output # return if path is not specified

values = jmespath.search(jmspath_value_parser(path), output)

if not any(isinstance(i, list) for i in values): # check for multi-nested lists if not found return here
return values

for element in values: # process elements to check is lists should be flatten
for item in element:
if isinstance(item, dict): # raise if there is a dict, path must be more specific to extract data
raise TypeError(
f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]].' f"You have {values}'."
)
if isinstance(item, list):
values = flatten_list(values) # flatten list and rewrite values
break # items are the same, need to check only first to see if this is a nested list

paired_key_value = associate_key_of_my_value(jmspath_value_parser(path), values)

if re.search(r"\$.*\$", path): # normalize
wanted_reference_keys = jmespath.search(jmspath_refkey_parser(path), output)
list_of_reference_keys = keys_cleaner(wanted_reference_keys)
return keys_values_zipper(list_of_reference_keys, paired_key_value)

return paired_key_value
return values
2 changes: 1 addition & 1 deletion netcompare/utils/jmspath_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

def jmspath_value_parser(path: str):
"""
Get the JMSPath value path from 'path'.
Get the JMSPath value path from 'path'.

Args:
path: "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]"
Expand Down
10 changes: 4 additions & 6 deletions netcompare/utils/refkey.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Reference key utilities."""
from typing import Mapping, List
from typing import Mapping, List, Optional


def keys_cleaner(wanted_reference_keys: Mapping) -> List[Mapping]:
def keys_cleaner(wanted_reference_keys: Mapping) -> Optional[List[Mapping]]:
"""Get every required reference key from output."""
if isinstance(wanted_reference_keys, list):
return wanted_reference_keys
Expand Down Expand Up @@ -52,13 +52,11 @@ def associate_key_of_my_value(paths: str, wanted_value: List) -> List:
final_list = []

for items in wanted_value:
temp_dict = {}

if len(items) != len(my_key_value_list):
raise ValueError("Key's value len != from value len")

for my_index, my_value in enumerate(items):
temp_dict.update({my_key_value_list[my_index]: my_value})
temp_dict = {my_key_value_list[my_index]: my_value for my_index, my_value in enumerate(items)}

final_list.append(temp_dict)

return final_list