diff --git a/netcompare/check_type.py b/netcompare/check_type.py index 6e101d1..dee3b6b 100644 --- a/netcompare/check_type.py +++ b/netcompare/check_type.py @@ -1,5 +1,5 @@ """CheckType Implementation.""" -from typing import Mapping, Tuple, Union, List +from typing import Mapping, Tuple, List, Dict, Any from .evaluator import diff_generator, parameter_evaluator from .runner import extract_values_from_output @@ -12,7 +12,11 @@ def __init__(self, *args): @staticmethod def init(*args): - """Factory pattern to get the appropiate CheckType implementation.""" + """Factory pattern to get the appropriate CheckType implementation. + + Args: + *args: Variable length argument list. + """ check_type = args[0] if check_type == "exact_match": return ExactMatchType(*args) @@ -24,15 +28,21 @@ def init(*args): raise NotImplementedError @staticmethod - def get_value(value: Mapping, path: Mapping, exclude: List = None) -> Union[Mapping, List, int, str, bool]: + def get_value(output: Mapping, path: str, exclude: List = None) -> Any: """Return the value contained into a Mapping for a defined path.""" - return extract_values_from_output(value, path, exclude) + return extract_values_from_output(output, path, exclude) - # TODO: Refine this typing - def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Mapping, bool]: + def evaluate(self, reference_value: Any, value_to_compare: Any) -> Tuple[Dict, bool]: """Return the result of the evaluation and a boolean True if it passes it or False otherwise. This method is the one that each CheckType has to implement. + + Args: + reference_value: Can be any structured data or just a simple value. + value_to_compare: Similar value as above to perform comparison. + + Returns: + tuple: Dictionary representing check result, bool indicating if differences are found. """ raise NotImplementedError @@ -40,7 +50,7 @@ def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple class ExactMatchType(CheckType): """Exact Match class docstring.""" - def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Mapping, bool]: + def evaluate(self, reference_value: Any, value_to_compare: Any) -> Tuple[Dict, bool]: """Returns the difference between values and the boolean.""" diff = diff_generator(reference_value, value_to_compare) return diff, not diff @@ -50,7 +60,7 @@ class ToleranceType(CheckType): """Tolerance class docstring.""" def __init__(self, *args): - """Tollerance init method.""" + """Tolerance init method.""" try: tolerance = args[1] except IndexError as error: @@ -59,13 +69,13 @@ def __init__(self, *args): self.tolerance_factor = float(tolerance) / 100 super().__init__() - def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Mapping, bool]: - """Returns the difference between values and the boolean.""" + def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Dict, bool]: + """Returns the difference between values and the boolean. Overwrites method in base class.""" diff = diff_generator(reference_value, value_to_compare) diff = self._get_outliers(diff) return diff, not diff - def _get_outliers(self, diff: Mapping) -> Mapping: + def _get_outliers(self, diff: Mapping) -> Dict: """Return a mapping of values outside the tolerance threshold.""" result = { key: {sub_key: values for sub_key, values in obj.items() if not self._within_tolerance(**values)} @@ -82,7 +92,7 @@ def _within_tolerance(self, *, old_value: float, new_value: float) -> bool: class ParameterMatchType(CheckType): """Parameter Match class implementation.""" - def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Mapping, bool]: + def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Dict, bool]: """Parameter Match evaluator implementation.""" try: parameter = value_to_compare[1] diff --git a/netcompare/evaluator.py b/netcompare/evaluator.py index af7ce6b..a81ec95 100644 --- a/netcompare/evaluator.py +++ b/netcompare/evaluator.py @@ -3,15 +3,23 @@ import sys from collections import defaultdict from functools import partial -from typing import Mapping, List, Dict +from typing import Any, Mapping, Dict, List from deepdiff import DeepDiff sys.path.append(".") -def diff_generator(pre_result: Mapping, post_result: Mapping) -> Dict: - """Generates diff between pre and post data based on check definition.""" +def diff_generator(pre_result: Any, post_result: Any) -> Dict: + """Generates diff between pre and post data based on check definition. + + Args: + pre_result: dataset to compare + post_result: dataset to compare + + Returns: + differences between two datasets + """ diff_result = DeepDiff(pre_result, post_result) result = diff_result.get("values_changed", {}) @@ -27,13 +35,20 @@ def diff_generator(pre_result: Mapping, post_result: Mapping) -> Dict: return result -def get_diff_iterables_items(diff_result: Mapping) -> Mapping: - """Return a dict with new and missing values where the values are in a list.""" - # DeepDiff iterable_items are returned when the source data is a list - # and provided in the format: "root['Ethernet3'][1]" - # or more generically: root['KEY']['KEY']['KEY']...[numeric_index] - # where the KEYs are dict keys within the original object - # and the "[index]" is appended to indicate the position within the list. +def get_diff_iterables_items(diff_result: Mapping) -> Dict: + """Helper function for diff_generator to postprocess changes reported by DeepDiff for iterables. + + DeepDiff iterable_items are returned when the source data is a list + and provided in the format: "root['Ethernet3'][1]" + or more generically: root['KEY']['KEY']['KEY']...[numeric_index] + where the KEYs are dict keys within the original object + and the "[index]" is appended to indicate the position within the list. + + Args: + diff_result: iterable comparison result from DeepDiff + Returns: + Return a dict with new and missing values where the values are in a list. + """ get_dict_keys = re.compile(r"^root((\['\w.*'\])+)\[\d+\]$") defaultdict_list = partial(defaultdict, list) @@ -54,12 +69,20 @@ def get_diff_iterables_items(diff_result: Mapping) -> Mapping: return result -def fix_deepdiff_key_names(obj: Mapping) -> Mapping: - """Return a dict based on the provided dict object where the brackets and quotes are removed from the string.""" +def fix_deepdiff_key_names(obj: Mapping) -> Dict: + """Return a dict based on the provided dict object where the brackets and quotes are removed from the string. + + Args: + obj: Example: {"root[3]['7.7.7.7']['is_enabled']": {'new_value': False, 'old_value': True}, + "root[3]['7.7.7.7']['is_up']": {'new_value': False, 'old_value': True}} + + Returns: + aggregated output Example: {'7.7.7.7': {'is_enabled': {'new_value': False, 'old_value': True}, + 'is_up': {'new_value': False, 'old_value': True}}} + """ pattern = r"'([A-Za-z0-9_\./\\-]*)'" result = {} - # root[2]['10.64.207.255']['accepted_prefixes'] for key, value in obj.items(): key_parts = re.findall(pattern, key) partial_res = group_value(key_parts, value) @@ -67,14 +90,14 @@ def fix_deepdiff_key_names(obj: Mapping) -> Mapping: return result -def group_value(tree_list: List, value: Mapping) -> Mapping: +def group_value(tree_list: List, value: Dict) -> Dict: """Build dictionary based on value's key and reference key.""" if tree_list: return {tree_list[0]: group_value(tree_list[1:], value)} return value -def dict_merger(original_dict: Mapping, merged_dict: Mapping): +def dict_merger(original_dict: Dict, merged_dict: Dict): """Merge dictionaries to build final result.""" for key in merged_dict.keys(): if key in original_dict and isinstance(original_dict[key], dict) and isinstance(merged_dict[key], dict): @@ -83,7 +106,7 @@ def dict_merger(original_dict: Mapping, merged_dict: Mapping): original_dict[key] = merged_dict[key] -def parameter_evaluator(values: Mapping, parameter: Mapping) -> Mapping: +def parameter_evaluator(values: Mapping, parameter: Mapping) -> Dict: """Parameter Match evaluator engine.""" # value: [{'7.7.7.7': {'peerAddress': '7.7.7.7', 'localAsn': '65130.1100', 'linkType': 'external'}}] # parameter: {'localAsn': '65130.1100', 'linkType': 'external'} diff --git a/netcompare/runner.py b/netcompare/runner.py index 8bdec1d..d93fa7f 100644 --- a/netcompare/runner.py +++ b/netcompare/runner.py @@ -1,6 +1,6 @@ """Library wrapper for output parsing.""" import re -from typing import Mapping, List, Union +from typing import Mapping, List, Union, Any import jmespath from .utils.jmspath_parsers import jmspath_value_parser, jmspath_refkey_parser from .utils.filter_parsers import exclude_filter @@ -8,42 +8,49 @@ from .utils.flatten import flatten_list -def extract_values_from_output(value: Mapping, path: Mapping, exclude: List) -> Union[Mapping, List, int, str, bool]: - """Return data from output depending on the check path. See unit text for complete example.""" - # Get the wanted values to be evaluated if jmspath expression is defined, otherwise - # use the entire output if jmespath is not defined in check. This cover the "raw" diff type. - if path and not exclude: - wanted_value = jmespath.search(jmspath_value_parser(path), value) - - elif path and exclude: - wanted_value = jmespath.search(jmspath_value_parser(path), value) - exclude_filter(wanted_value, exclude) - elif not path and exclude: - exclude_filter(value, exclude) - return value - - # data type check - if path: - if not any(isinstance(i, list) for i in wanted_value): - return wanted_value - - for element in wanted_value: - for item in element: - if isinstance(item, dict): - raise TypeError( - f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]]. You have {wanted_value}\'.' - ) - if isinstance(item, list): - wanted_value = flatten_list(wanted_value) - break - - paired_key_value = associate_key_of_my_value(jmspath_value_parser(path), wanted_value) - else: - paired_key_value = value - - if path and re.search(r"\$.*\$", path): - wanted_reference_keys = jmespath.search(jmspath_refkey_parser(path), value) +def extract_values_from_output(output: Union[Mapping, List], path: str, exclude: List = None) -> Any: + """Return data from output depending on the check path. See unit test for complete example. + + Get the wanted values to be evaluated if JMESPath expression is defined, + otherwise use the entire output if jmespath is not defined in check. This covers the "raw" diff type. + Exclude data not desired to compare. + + Notes: + https://jmespath.org/ shows how JMESPath works. + + Args: + output: json data structure + path: JMESPath to extract specific values + exclude: list of keys to exclude + Returns: + Evaluated data, may be anything depending on JMESPath used. + """ + if exclude: + exclude_filter(output, exclude) # exclude unwanted elements + + if not path: + return output # return if path is not specified + + values = jmespath.search(jmspath_value_parser(path), output) + + if not any(isinstance(i, list) for i in values): # check for multi-nested lists if not found return here + return values + + for element in values: # process elements to check is lists should be flatten + for item in element: + if isinstance(item, dict): # raise if there is a dict, path must be more specific to extract data + raise TypeError( + f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]].' f"You have {values}'." + ) + if isinstance(item, list): + values = flatten_list(values) # flatten list and rewrite values + break # items are the same, need to check only first to see if this is a nested list + + paired_key_value = associate_key_of_my_value(jmspath_value_parser(path), values) + + if re.search(r"\$.*\$", path): # normalize + wanted_reference_keys = jmespath.search(jmspath_refkey_parser(path), output) list_of_reference_keys = keys_cleaner(wanted_reference_keys) return keys_values_zipper(list_of_reference_keys, paired_key_value) - return paired_key_value + return values diff --git a/netcompare/utils/jmspath_parsers.py b/netcompare/utils/jmspath_parsers.py index 4204e2c..31863f1 100644 --- a/netcompare/utils/jmspath_parsers.py +++ b/netcompare/utils/jmspath_parsers.py @@ -4,7 +4,7 @@ def jmspath_value_parser(path: str): """ - Get the JMSPath value path from 'path'. + Get the JMSPath value path from 'path'. Args: path: "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]" diff --git a/netcompare/utils/refkey.py b/netcompare/utils/refkey.py index 4ce3ad4..923af96 100644 --- a/netcompare/utils/refkey.py +++ b/netcompare/utils/refkey.py @@ -1,8 +1,8 @@ """Reference key utilities.""" -from typing import Mapping, List +from typing import Mapping, List, Optional -def keys_cleaner(wanted_reference_keys: Mapping) -> List[Mapping]: +def keys_cleaner(wanted_reference_keys: Mapping) -> Optional[List[Mapping]]: """Get every required reference key from output.""" if isinstance(wanted_reference_keys, list): return wanted_reference_keys @@ -52,13 +52,11 @@ def associate_key_of_my_value(paths: str, wanted_value: List) -> List: final_list = [] for items in wanted_value: - temp_dict = {} - if len(items) != len(my_key_value_list): raise ValueError("Key's value len != from value len") - for my_index, my_value in enumerate(items): - temp_dict.update({my_key_value_list[my_index]: my_value}) + temp_dict = {my_key_value_list[my_index]: my_value for my_index, my_value in enumerate(items)} + final_list.append(temp_dict) return final_list