diff --git a/netcompare/__init__.py b/netcompare/__init__.py index b64dae0..bf0038b 100644 --- a/netcompare/__init__.py +++ b/netcompare/__init__.py @@ -1,5 +1,5 @@ """Pre/Post Check library.""" -# from .check_type import compare +from .check_types import CheckType -# __all__ = ["compare"] +__all__ = ["CheckType"] diff --git a/netcompare/check_type.py b/netcompare/check_types.py similarity index 55% rename from netcompare/check_type.py rename to netcompare/check_types.py index fdd9fdb..3b11c8d 100644 --- a/netcompare/check_type.py +++ b/netcompare/check_types.py @@ -1,7 +1,17 @@ """CheckType Implementation.""" -from typing import Mapping, Tuple, List, Dict, Any -from .evaluator import diff_generator, parameter_evaluator, regex_evaluator -from .runner import extract_values_from_output +import re +from typing import Mapping, Tuple, List, Dict, Any, Union +import jmespath + +from .utils.jmespath_parsers import ( + jmespath_value_parser, + jmespath_refkey_parser, + associate_key_of_my_value, + keys_cleaner, + keys_values_zipper, +) +from .utils.data_normalization import exclude_filter, flatten_list +from .evaluators import diff_generator, parameter_evaluator, regex_evaluator class CheckType: @@ -26,12 +36,58 @@ def init(*args): return ParameterMatchType(*args) if check_type == "regex": return RegexType(*args) + raise NotImplementedError @staticmethod - def get_value(output: Mapping, path: str, exclude: List = None) -> Any: - """Return the value contained into a Mapping for a defined path.""" - return extract_values_from_output(output, path, exclude) + def get_value(output: Union[Mapping, List], path: str, exclude: List = None) -> Any: + """Return data from output depending on the check path. See unit test for complete example. + + Get the wanted values to be evaluated if JMESPath expression is defined, + otherwise use the entire output if jmespath is not defined in check. This covers the "raw" diff type. + Exclude data not desired to compare. 
+ + Notes: + https://jmespath.org/ shows how JMESPath works. + + Args: + output: json data structure + path: JMESPath to extract specific values + exclude: list of keys to exclude + Returns: + Evaluated data, may be anything depending on JMESPath used. + """ + if exclude and isinstance(output, Dict): + exclude_filter(output, exclude) # exclude unwanted elements + + if not path: + return output # return if path is not specified + + values = jmespath.search(jmespath_value_parser(path), output) + + if not any(isinstance(i, list) for i in values): # check for multi-nested lists if not found return here + return values + + for element in values: # process elements to check is lists should be flatten + # TODO: Not sure how this is working becasyse from `jmespath.search` it's supposed to get a flat list + # of str or Decimals, not another list... + for item in element: + if isinstance(item, dict): # raise if there is a dict, path must be more specific to extract data + raise TypeError( + f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]].' f"You have {values}'." + ) + if isinstance(item, list): + values = flatten_list(values) # flatten list and rewrite values + break # items are the same, need to check only first to see if this is a nested list + + paired_key_value = associate_key_of_my_value(jmespath_value_parser(path), values) + + if re.search(r"\$.*\$", path): # normalize + wanted_reference_keys = jmespath.search(jmespath_refkey_parser(path), output) + list_of_reference_keys = keys_cleaner(wanted_reference_keys) + return keys_values_zipper(list_of_reference_keys, paired_key_value) + + return values def evaluate(self, reference_value: Any, value_to_compare: Any) -> Tuple[Dict, bool]: """Return the result of the evaluation and a boolean True if it passes it or False otherwise. 
    def _remove_within_tolerance(self, diff: Dict) -> None:
        """Recursively look into diff and apply tolerance check, remove reported difference when within tolerance.

        Args:
            diff: aggregated diff (shape produced by diff_generator); pruned IN PLACE.

        Returns:
            None; `diff` is mutated in place.
        """

        def _within_tolerance(*, old_value: float, new_value: float) -> bool:
            """Return True if new value is within the tolerance range of the previous value."""
            # self.tolerance_factor is set in ToleranceType.__init__ as tolerance / 100.
            max_diff = old_value * self.tolerance_factor
            return (old_value - max_diff) < new_value < (old_value + max_diff)

        for key, value in list(diff.items()):  # casting list makes copy, so we don't modify object being iterated.
            if isinstance(value, dict):
                if "new_value" in value.keys() and "old_value" in value.keys() and _within_tolerance(**value):
                    diff.pop(key)  # leaf change is within tolerance: drop it from the reported diff
                else:
                    self._remove_within_tolerance(diff[key])  # recurse into nested diff levels
                    if not value:
                        diff.pop(key)  # nested dict emptied by pruning: drop the now-empty key
You have: {value_to_compare}" + ) from error if not isinstance(parameter, dict): raise TypeError("check_option must be of type dict()") - diff = parameter_evaluator(reference_value, parameter) - return diff, not diff + evaluation_result = parameter_evaluator(reference_value, parameter) + return evaluation_result, not evaluation_result class RegexType(CheckType): @@ -140,28 +199,3 @@ def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple diff = regex_evaluator(reference_value, parameter) return diff, not diff - - -# TODO: compare is no longer the entry point, we should use the libary as: -# netcompare_check = CheckType.init(check_type_info, options) -# pre_result = netcompare_check.get_value(pre_obj, path) -# post_result = netcompare_check.get_value(post_obj, path) -# netcompare_check.evaluate(pre_result, post_result) -# -# def compare( -# pre_obj: Mapping, post_obj: Mapping, path: Mapping, type_info: Iterable, options: Mapping -# ) -> Tuple[Mapping, bool]: -# """Entry point function. - -# Returns a diff object and the boolean of the comparison. -# """ - -# type_info = type_info.lower() - -# try: -# type_obj = CheckType.init(type_info, options) -# except Exception: -# # We will be here if we can't infer the type_obj -# raise - -# return type_obj.evaluate(pre_obj, post_obj, path) diff --git a/netcompare/evaluator.py b/netcompare/evaluator.py deleted file mode 100644 index f6a944c..0000000 --- a/netcompare/evaluator.py +++ /dev/null @@ -1,158 +0,0 @@ -"""Diff evaluator.""" -import re -import sys -from collections import defaultdict -from functools import partial -from typing import Any, Mapping, Dict, List -from deepdiff import DeepDiff - - -sys.path.append(".") - - -def diff_generator(pre_result: Any, post_result: Any) -> Dict: - """Generates diff between pre and post data based on check definition. 
- - Args: - pre_result: dataset to compare - post_result: dataset to compare - - Returns: - differences between two datasets - """ - diff_result = DeepDiff(pre_result, post_result) - - result = diff_result.get("values_changed", {}) - if diff_result.get("dictionary_item_removed"): - result.update({k: "missing" for k in diff_result["dictionary_item_removed"]}) - if diff_result.get("dictionary_item_added"): - result.update({k: "new" for k in diff_result["dictionary_item_added"]}) - iterables_items = get_diff_iterables_items(diff_result) - if iterables_items: - result.update(iterables_items) - - result = fix_deepdiff_key_names(result) - return result - - -def get_diff_iterables_items(diff_result: Mapping) -> Dict: - """Helper function for diff_generator to postprocess changes reported by DeepDiff for iterables. - - DeepDiff iterable_items are returned when the source data is a list - and provided in the format: "root['Ethernet3'][1]" - or more generically: root['KEY']['KEY']['KEY']...[numeric_index] - where the KEYs are dict keys within the original object - and the "[index]" is appended to indicate the position within the list. - - Args: - diff_result: iterable comparison result from DeepDiff - Returns: - Return a dict with new and missing values where the values are in a list. 
- """ - get_dict_keys = re.compile(r"^root((\['\w.*'\])+)\[\d+\]$") - - defaultdict_list = partial(defaultdict, list) - result = defaultdict(defaultdict_list) - - items_removed = diff_result.get("iterable_item_removed") - if items_removed: - for key, value in items_removed.items(): - key, *_ = get_dict_keys.match(key).groups() - result[key]["missing"].append(value) - - items_added = diff_result.get("iterable_item_added") - if items_added: - for key, value in items_added.items(): - key, *_ = get_dict_keys.match(key).groups() - result[key]["new"].append(value) - - return result - - -def fix_deepdiff_key_names(obj: Mapping) -> Dict: - """Return a dict based on the provided dict object where the brackets and quotes are removed from the string. - - Args: - obj: Example: {"root[3]['7.7.7.7']['is_enabled']": {'new_value': False, 'old_value': True}, - "root[3]['7.7.7.7']['is_up']": {'new_value': False, 'old_value': True}} - - Returns: - aggregated output Example: {'7.7.7.7': {'is_enabled': {'new_value': False, 'old_value': True}, - 'is_up': {'new_value': False, 'old_value': True}}} - """ - pattern = r"'([A-Za-z0-9_\./\\-]*)'" - - result = {} - for key, value in obj.items(): - key_parts = re.findall(pattern, key) - partial_res = group_value(key_parts, value) - dict_merger(result, partial_res) - return result - - -def group_value(tree_list: List, value: Dict) -> Dict: - """Build dictionary based on value's key and reference key.""" - if tree_list: - return {tree_list[0]: group_value(tree_list[1:], value)} - return value - - -def dict_merger(original_dict: Dict, merged_dict: Dict): - """Merge dictionaries to build final result.""" - for key in merged_dict.keys(): - if key in original_dict and isinstance(original_dict[key], dict) and isinstance(merged_dict[key], dict): - dict_merger(original_dict[key], merged_dict[key]) - else: - original_dict[key] = merged_dict[key] - - -def parameter_evaluator(values: Mapping, parameter: Mapping) -> Dict: - """Parameter Match evaluator 
engine.""" - # value: [{'7.7.7.7': {'peerAddress': '7.7.7.7', 'localAsn': '65130.1100', 'linkType': 'external'}}] - # parameter: {'localAsn': '65130.1100', 'linkType': 'external'} - result = {} - if not isinstance(values, list): - raise TypeError("Something went wrong during JMSPath parsing. values must be of type list.") - - for value in values: - # item: {'7.7.7.7': {'peerAddress': '7.7.7.7', 'localAsn': '65130.1101', 'linkType': 'externals - temp_dict = {} - - inner_key = list(value.keys())[0] - # inner_key: '7.7.7.7' - inner_value = list(value.values())[0] - # inner_value: [{'peerAddress': '7.7.7.7', 'localAsn': '65130.1101', 'linkType': 'externals'}] - - for p_key, p_value in parameter.items(): - if inner_value[p_key] != p_value: - temp_dict[p_key] = inner_value[p_key] - - if temp_dict: - result[inner_key] = temp_dict - - return result - - -def regex_evaluator(values: Mapping, parameter: Mapping) -> Dict: - """Regex Match evaluator engine.""" - # values: [{'7.7.7.7': {'peerGroup': 'EVPN-OVERLAY-SPINE'}}] - # parameter: {'regex': '.*UNDERLAY.*', 'mode': 'include'} - result = {} - if not isinstance(values, list): - raise TypeError("Something went wrong during JMSPath parsing. 
def diff_generator(pre_result: Any, post_result: Any) -> Dict:
    """Generates diff between pre and post data based on check definition.

    Args:
        pre_result: dataset to compare
        post_result: dataset to compare

    Returns:
        dict: differences between two datasets with the following keys:
            - "values_changed": Item values that have changed
            - "missing": Item keys that have been removed
            - "new": Item keys that have been added
    """
    deep_diff = DeepDiff(pre_result, post_result)

    result = deep_diff.get("values_changed", {})

    # Removed/added dictionary keys are reported with "missing"/"new" markers.
    for report_key, marker in (("dictionary_item_removed", "missing"), ("dictionary_item_added", "new")):
        for changed_key in deep_diff.get(report_key) or ():
            result[changed_key] = marker

    iterables_changes = get_diff_iterables_items(deep_diff)
    if iterables_changes:
        result.update(iterables_changes)

    return fix_deepdiff_key_names(result)
def parameter_evaluator(values: Mapping, parameters: Mapping) -> Dict:
    """Parameter Match evaluator engine.

    Args:
        values: List of items what we will check the parameters against
        parameters: Dict with the keys and reference values to check

    Example:
        values: [{'7.7.7.7': {'peerAddress': '7.7.7.7', 'localAsn': '65130.1100', 'linkType': 'external'}}]
        parameters: {'localAsn': '65130.1100', 'linkType': 'external'}

    Returns:
        Dictionary with all the items that have some value not matching the expectations from parameters
    """
    if not isinstance(values, list):
        raise TypeError("Something went wrong during jmespath parsing. 'values' must be of type List.")

    result = {}
    for item in values:
        if not isinstance(item, dict):
            raise TypeError(
                "Something went wrong during jmespath parsing. ",
                f"'value' ({item}) must be of type Dict, and it's {type(item)}",
            )

        # Each item is expected to carry exactly one entry: {reference_key: {attribute: value, ...}}.
        reference_key, attributes = list(item.items())[0]

        # Keep only the attributes whose actual value differs from the expected one.
        mismatches = {
            attribute: attributes[attribute]
            for attribute, expected in parameters.items()
            if attributes[attribute] != expected
        }

        if mismatches:
            result[reference_key] = mismatches

    return result
def regex_evaluator(values: Mapping, parameter: Mapping) -> Dict:
    """Regex Match evaluator engine.

    Collects the items whose leaf values FAIL the regex check; an empty result means the check passed.
    Supported modes are "match" (fail when the regex does not match) and
    "no-match" (fail when the regex does match); any other mode collects nothing.
    """
    # values: [{'7.7.7.7': {'peerGroup': 'EVPN-OVERLAY-SPINE'}}]
    # parameter: {'regex': '.*UNDERLAY.*', 'mode': 'match'}
    if not isinstance(values, list):
        raise TypeError("Something went wrong during JMSPath parsing. values must be of type list.")

    pattern = parameter["regex"]
    mode = parameter["mode"]

    failed = {}
    for item in values:
        for inner_value in item.values():
            for leaf in inner_value.values():
                matched = re.search(pattern, leaf) is not None
                if (mode == "match" and not matched) or (mode == "no-match" and matched):
                    failed.update(item)

    return failed
- """ - if exclude: - exclude_filter(output, exclude) # exclude unwanted elements - - if not path: - return output # return if path is not specified - - values = jmespath.search(jmspath_value_parser(path), output) - - if not any(isinstance(i, list) for i in values): # check for multi-nested lists if not found return here - return values - - for element in values: # process elements to check is lists should be flatten - for item in element: - if isinstance(item, dict): # raise if there is a dict, path must be more specific to extract data - raise TypeError( - f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]].' f"You have {values}'." - ) - if isinstance(item, list): - values = flatten_list(values) # flatten list and rewrite values - break # items are the same, need to check only first to see if this is a nested list - - paired_key_value = associate_key_of_my_value(jmspath_value_parser(path), values) - - if re.search(r"\$.*\$", path): # normalize - wanted_reference_keys = jmespath.search(jmspath_refkey_parser(path), output) - list_of_reference_keys = keys_cleaner(wanted_reference_keys) - return keys_values_zipper(list_of_reference_keys, paired_key_value) - - return values diff --git a/netcompare/utils/filter_parsers.py b/netcompare/utils/data_normalization.py similarity index 52% rename from netcompare/utils/filter_parsers.py rename to netcompare/utils/data_normalization.py index d774dc7..1fd2fea 100644 --- a/netcompare/utils/filter_parsers.py +++ b/netcompare/utils/data_normalization.py @@ -1,5 +1,40 @@ -"""Filtering parsing.""" -from typing import Mapping, List +"""Data Normalization utilities.""" +from typing import List, Generator, Mapping + + +def flatten_list(my_list: List) -> List: + """ + Flatten a multi level nested list and returns a list of lists. + + Args: + my_list: nested list to be flattened. + + Return: + [[-1, 0], [-1, 0], [-1, 0], ...] 
+ + Example: + >>> my_list = [[[[-1, 0], [-1, 0]]]] + >>> flatten_list(my_list) + [[-1, 0], [-1, 0]] + """ + + def iter_flatten_list(my_list: List) -> Generator[List, None, None]: + """Recursively yield all flat lists within a given list.""" + if is_flat_list(my_list): + yield my_list + else: + for item in my_list: + yield from iter_flatten_list(item) + + def is_flat_list(obj: List) -> bool: + """Return True is obj is a list that does not contain any lists as its first order elements.""" + return isinstance(obj, list) and not any(isinstance(i, list) for i in obj) + + if not isinstance(my_list, list): + raise ValueError(f"Argument provided must be a list. You passed a {type(my_list)}") + if is_flat_list(my_list): + return my_list + return list(iter_flatten_list(my_list)) def exclude_filter(data: Mapping, exclude: List): diff --git a/netcompare/utils/diff_helpers.py b/netcompare/utils/diff_helpers.py new file mode 100644 index 0000000..12216ff --- /dev/null +++ b/netcompare/utils/diff_helpers.py @@ -0,0 +1,81 @@ +"""Diff helpers.""" +import re +from collections import defaultdict +from functools import partial +from typing import Mapping, Dict, List + +REGEX_PATTERN_RELEVANT_KEYS = r"'([A-Za-z0-9_\./\\-]*)'" + + +def get_diff_iterables_items(diff_result: Mapping) -> Dict: + """Helper function for diff_generator to postprocess changes reported by DeepDiff for iterables. + + DeepDiff iterable_items are returned when the source data is a list + and provided in the format: "root['Ethernet3'][1]" + or more generically: root['KEY']['KEY']['KEY']...[numeric_index] + where the KEYs are dict keys within the original object + and the "[index]" is appended to indicate the position within the list. + + Args: + diff_result: iterable comparison result from DeepDiff + Returns: + Return a dict with new and missing values where the values are in a list. 
+ """ + get_dict_keys = re.compile(r"^root((\['\w.*'\])+)\[\d+\]$") + + defaultdict_list = partial(defaultdict, list) + result = defaultdict(defaultdict_list) + + items_removed = diff_result.get("iterable_item_removed") + if items_removed: + for key, value in items_removed.items(): + key, *_ = get_dict_keys.match(key).groups() + result[key]["missing"].append(value) + + items_added = diff_result.get("iterable_item_added") + if items_added: + for key, value in items_added.items(): + key, *_ = get_dict_keys.match(key).groups() + result[key]["new"].append(value) + + return result + + +def fix_deepdiff_key_names(obj: Mapping) -> Dict: + """Return a dict based on the provided dict object where the brackets and quotes are removed from the string. + + Args: + obj (Mapping): Mapping to be fixed. For example: + { + "root[3]['7.7.7.7']['is_enabled']": {'new_value': False, 'old_value': True}, + "root[3]['7.7.7.7']['is_up']": {'new_value': False, 'old_value': True} + } + + Returns: + Dict: aggregated output, for example: {'7.7.7.7': {'is_enabled': {'new_value': False, 'old_value': True}, + 'is_up': {'new_value': False, 'old_value': True}}} + """ + result = {} + for key, value in obj.items(): + key_parts = re.findall(REGEX_PATTERN_RELEVANT_KEYS, key) + partial_res = group_value(key_parts, value) + dict_merger(result, partial_res) + return result + + +# TODO: Add testing +def group_value(tree_list: List, value: Dict) -> Dict: + """Function to create a nested Dict by recursively use the tree_list as nested keys.""" + if tree_list: + return {tree_list[0]: group_value(tree_list[1:], value)} + return value + + +# TODO: Add testing +def dict_merger(original_dict: Dict, dict_to_merge: Dict): + """Function to merge a dictionary (dict_to_merge) recursively into the original_dict.""" + for key in dict_to_merge.keys(): + if key in original_dict and isinstance(original_dict[key], dict) and isinstance(dict_to_merge[key], dict): + dict_merger(original_dict[key], dict_to_merge[key]) + else: + 
def jmespath_value_parser(path: str):
    """
    Get the jmespath value path from 'path'.

    Two combinations are possible based on where reference key is defined. See example below.

    Args:
        path: "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]"
        path: "result[0].$vrfs$.default.peerList[*].[peerAddress, prefixesReceived]"

    Return:
        "result[0].vrfs.default.peerList[*].[prefixesReceived]"
        "result[0].vrfs.default.peerList[*].[peerAddress, prefixesReceived]"
    """
    # A reference key looks like "$key$" followed by "." or "," (or preceded by ",").
    ref_key_regex = re.compile(r"\$.*\$\.|\$.*\$,|,\$.*\$")
    ref_key_match = ref_key_regex.search(path)
    if ref_key_match is None:
        return path  # no reference key anywhere: path is already a plain value path

    last_segment = path.split(".")[-1]
    if ref_key_regex.search(last_segment):
        # Reference key sits in the trailing list: [$peerAddress$,prefixesReceived] --> [prefixesReceived]
        return path.replace(ref_key_match.group(), "")

    # Reference key sits mid-path: result[0].$vrfs$.default... --> result[0].vrfs.default....
    anchored = re.search(r"\$.*\$", ref_key_match.group())
    if anchored:
        plain_key = ref_key_match.group().split("$")[1]
        return path.replace(anchored.group(), plain_key)
    return path
def associate_key_of_my_value(paths: str, wanted_value: List) -> List:
    """Associate each key defined in path to every value found in output.

    Example: path "global.peers.*.[is_enabled,is_up]" with values [[True, False]]
    yields [{"is_enabled": True, "is_up": False}].
    """
    # The last dotted segment names the key(s): either "[k1,k2]" or a single bare key like "state".
    last_segment = paths.split(".")[-1]

    if last_segment.startswith("[") and last_segment.endswith("]"):
        # "[is_enabled,is_up]" -> ["is_enabled", "is_up"]
        keys = last_segment.strip("[]").split(",")
    else:
        keys = [last_segment]

    paired = []
    for row in wanted_value:
        if len(row) != len(keys):
            raise ValueError("Key's value len != from value len")
        paired.append(dict(zip(keys, row)))

    return paired
def keys_values_zipper(list_of_reference_keys: List, wanted_value_with_key: List) -> List:
    """Build dictionary zipping keys with relative values.

    Pairs reference key i with value i, producing one single-entry dict per pair.
    """
    if len(list_of_reference_keys) != len(wanted_value_with_key):
        raise ValueError("Keys len != from Values len")

    return [{reference_key: value} for reference_key, value in zip(list_of_reference_keys, wanted_value_with_key)]
- regex_normalized_value = re.search(r"\$.*\$", regex_match_ref_key.group()) - if regex_normalized_value: - normalized_value = regex_match_ref_key.group().split("$")[1] - return path.replace(regex_normalized_value.group(), normalized_value) - return path - - -def jmspath_refkey_parser(path: str): - """ - Get the JMSPath reference key path from 'path'. - - Args: - path: "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]" - Return: - "result[0].vrfs.default.peerList[*].[$peerAddress$]" - """ - splitted_jmspath = path.split(".") - - for number, element in enumerate(splitted_jmspath): - regex_match_anchor = re.search(r"\$.*\$", element) - - if regex_match_anchor: - splitted_jmspath[number] = regex_match_anchor.group().replace("$", "") - - if regex_match_anchor and not element.startswith("[") and not element.endswith("]"): - splitted_jmspath = splitted_jmspath[: number + 1] - - return ".".join(splitted_jmspath) diff --git a/netcompare/utils/refkey.py b/netcompare/utils/refkey.py deleted file mode 100644 index 923af96..0000000 --- a/netcompare/utils/refkey.py +++ /dev/null @@ -1,62 +0,0 @@ -"""Reference key utilities.""" -from typing import Mapping, List, Optional - - -def keys_cleaner(wanted_reference_keys: Mapping) -> Optional[List[Mapping]]: - """Get every required reference key from output.""" - if isinstance(wanted_reference_keys, list): - return wanted_reference_keys - - if isinstance(wanted_reference_keys, dict): - my_keys_list = [] - - if isinstance(wanted_reference_keys, dict): - for key in wanted_reference_keys.keys(): - my_keys_list.append(key) - else: - raise TypeError( - f"Must be a dictionary. You have type:{type(wanted_reference_keys)} output:{wanted_reference_keys}'." 
- ) - - return my_keys_list - - return None - - -def keys_values_zipper(list_of_reference_keys: List, wanted_value_with_key: List) -> List: - """Build dictionary zipping keys with relative values.""" - final_result = [] - - if len(list_of_reference_keys) != len(wanted_value_with_key): - raise ValueError("Keys len != from Values len") - - for my_index, my_key in enumerate(list_of_reference_keys): - final_result.append({my_key: wanted_value_with_key[my_index]}) - - return final_result - - -def associate_key_of_my_value(paths: str, wanted_value: List) -> List: - """Associate each key defined in path to every value found in output.""" - # global.peers.*.[is_enabled,is_up] / result.[*].state - find_the_key_of_my_values = paths.split(".")[-1] - - # [is_enabled,is_up] - if find_the_key_of_my_values.startswith("[") and find_the_key_of_my_values.endswith("]"): - # ['is_enabled', 'is_up'] - my_key_value_list = find_the_key_of_my_values.strip("[]").split(",") - # state - else: - my_key_value_list = [find_the_key_of_my_values] - - final_list = [] - - for items in wanted_value: - if len(items) != len(my_key_value_list): - raise ValueError("Key's value len != from value len") - - temp_dict = {my_key_value_list[my_index]: my_value for my_index, my_value in enumerate(items)} - - final_list.append(temp_dict) - - return final_list diff --git a/tests/test_flatten.py b/tests/test_data_normalization.py similarity index 88% rename from tests/test_flatten.py rename to tests/test_data_normalization.py index c3fbeb2..6d5760e 100644 --- a/tests/test_flatten.py +++ b/tests/test_data_normalization.py @@ -1,6 +1,6 @@ "Flatten list unit test" import pytest -from netcompare.utils.flatten import flatten_list +from netcompare.utils.data_normalization import flatten_list from .utility import ASSERT_FAIL_MESSAGE diff --git a/tests/test_diff_generator.py b/tests/test_diff_generator.py index 8ad8ab5..511a9af 100644 --- a/tests/test_diff_generator.py +++ b/tests/test_diff_generator.py @@ -1,7 +1,7 @@ 
"""Diff generator tests.""" import pytest -from netcompare.evaluator import diff_generator -from netcompare.runner import extract_values_from_output +from netcompare.evaluators import diff_generator +from netcompare.check_types import CheckType from .utility import load_mocks, ASSERT_FAIL_MESSAGE @@ -149,8 +149,8 @@ def test_eval(folder_name, path, exclude, expected_output): """Run tests.""" pre_data, post_data = load_mocks(folder_name) - pre_value = extract_values_from_output(pre_data, path, exclude) - post_value = extract_values_from_output(post_data, path, exclude) + pre_value = CheckType.get_value(pre_data, path, exclude) + post_value = CheckType.get_value(post_data, path, exclude) output = diff_generator(pre_value, post_value) assert expected_output == output, ASSERT_FAIL_MESSAGE.format(output=output, expected_output=expected_output) diff --git a/tests/test_filter_parsers.py b/tests/test_filter_parsers.py index bbf67fe..d98137f 100644 --- a/tests/test_filter_parsers.py +++ b/tests/test_filter_parsers.py @@ -1,6 +1,6 @@ "Filter parser unit tests." 
import pytest -from netcompare.utils.filter_parsers import exclude_filter +from netcompare.utils.data_normalization import exclude_filter from .utility import ASSERT_FAIL_MESSAGE diff --git a/tests/test_jmspath_parsers.py b/tests/test_jmespath_parsers.py similarity index 52% rename from tests/test_jmspath_parsers.py rename to tests/test_jmespath_parsers.py index a54c683..0f6e372 100644 --- a/tests/test_jmspath_parsers.py +++ b/tests/test_jmespath_parsers.py @@ -1,6 +1,12 @@ -"""JMSPath parser unit tests.""" +"""jmespath parser unit tests.""" import pytest -from netcompare.utils.jmspath_parsers import jmspath_value_parser, jmspath_refkey_parser +from netcompare.utils.jmespath_parsers import ( + jmespath_value_parser, + jmespath_refkey_parser, + keys_cleaner, + keys_values_zipper, + associate_key_of_my_value, +) from .utility import ASSERT_FAIL_MESSAGE @@ -56,11 +62,59 @@ @pytest.mark.parametrize("path, expected_output", value_parser_tests) def test_value_parser(path, expected_output): - output = jmspath_value_parser(path) + output = jmespath_value_parser(path) assert expected_output == output, ASSERT_FAIL_MESSAGE.format(output=output, expected_output=expected_output) @pytest.mark.parametrize("path, expected_output", keyref_parser_tests) def test_keyref_parser(path, expected_output): - output = jmspath_refkey_parser(path) + output = jmespath_refkey_parser(path) + assert expected_output == output, ASSERT_FAIL_MESSAGE.format(output=output, expected_output=expected_output) + + +keys_cleaner_case_1 = ( + {"10.1.0.0": {"address_family": "ipv4"}}, + ["10.1.0.0"], +) + +keys_zipper_case_1 = ( + ["10.1.0.0", "10.2.0.0"], + [{"is_enabled": False, "is_up": False}, {"is_enabled": True, "is_up": True}], + [{"10.1.0.0": {"is_enabled": False, "is_up": False}}, {"10.2.0.0": {"is_enabled": True, "is_up": True}}], +) + +keys_association_case_1 = ( + "global.peers.*.[is_enabled,is_up]", + [[True, False], [True, False]], + [{"is_enabled": True, "is_up": False}, {"is_enabled": True, 
"is_up": False}], +) + +keys_cleaner_tests = [ + keys_cleaner_case_1, +] + +keys_zipper_tests = [ + keys_zipper_case_1, +] + +keys_association_test = [ + keys_association_case_1, +] + + +@pytest.mark.parametrize("wanted_key, expected_output", keys_cleaner_tests) +def test_keys_cleaner(wanted_key, expected_output): + output = keys_cleaner(wanted_key) + assert expected_output == output, ASSERT_FAIL_MESSAGE.format(output=output, expected_output=expected_output) + + +@pytest.mark.parametrize("ref_keys, wanted_values, expected_output", keys_zipper_tests) +def test_keys_zipper(ref_keys, wanted_values, expected_output): + output = keys_values_zipper(ref_keys, wanted_values) + assert expected_output == output, ASSERT_FAIL_MESSAGE.format(output=output, expected_output=expected_output) + + +@pytest.mark.parametrize("path, wanted_values, expected_output", keys_association_test) +def test_keys_association(path, wanted_values, expected_output): + output = associate_key_of_my_value(path, wanted_values) assert expected_output == output, ASSERT_FAIL_MESSAGE.format(output=output, expected_output=expected_output) diff --git a/tests/test_refkey.py b/tests/test_refkey.py deleted file mode 100644 index 26fbe6e..0000000 --- a/tests/test_refkey.py +++ /dev/null @@ -1,51 +0,0 @@ -"""Reference key unit tests.""" -import pytest -from netcompare.utils.refkey import keys_cleaner, keys_values_zipper, associate_key_of_my_value -from .utility import ASSERT_FAIL_MESSAGE - -keys_cleaner_case_1 = ( - {"10.1.0.0": {"address_family": "ipv4"}}, - ["10.1.0.0"], -) - -keys_zipper_case_1 = ( - ["10.1.0.0", "10.2.0.0"], - [{"is_enabled": False, "is_up": False}, {"is_enabled": True, "is_up": True}], - [{"10.1.0.0": {"is_enabled": False, "is_up": False}}, {"10.2.0.0": {"is_enabled": True, "is_up": True}}], -) - -keys_association_case_1 = ( - "global.peers.*.[is_enabled,is_up]", - [[True, False], [True, False]], - [{"is_enabled": True, "is_up": False}, {"is_enabled": True, "is_up": False}], -) - 
-keys_cleaner_tests = [ - keys_cleaner_case_1, -] - -keys_zipper_tests = [ - keys_zipper_case_1, -] - -keys_association_test = [ - keys_association_case_1, -] - - -@pytest.mark.parametrize("wanted_key, expected_output", keys_cleaner_tests) -def test_keys_cleaner(wanted_key, expected_output): - output = keys_cleaner(wanted_key) - assert expected_output == output, ASSERT_FAIL_MESSAGE.format(output=output, expected_output=expected_output) - - -@pytest.mark.parametrize("ref_keys, wanted_values, expected_output", keys_zipper_tests) -def test_keys_zipper(ref_keys, wanted_values, expected_output): - output = keys_values_zipper(ref_keys, wanted_values) - assert expected_output == output, ASSERT_FAIL_MESSAGE.format(output=output, expected_output=expected_output) - - -@pytest.mark.parametrize("path, wanted_values, expected_output", keys_association_test) -def test_keys_association(path, wanted_values, expected_output): - output = associate_key_of_my_value(path, wanted_values) - assert expected_output == output, ASSERT_FAIL_MESSAGE.format(output=output, expected_output=expected_output) diff --git a/tests/test_type_check.py b/tests/test_type_checks.py similarity index 99% rename from tests/test_type_check.py rename to tests/test_type_checks.py index 27ccc4e..6625804 100644 --- a/tests/test_type_check.py +++ b/tests/test_type_checks.py @@ -1,6 +1,6 @@ "Check Type unit tests." import pytest -from netcompare.check_type import CheckType, ExactMatchType, ToleranceType +from netcompare.check_types import CheckType, ExactMatchType, ToleranceType from .utility import load_json_file, load_mocks, ASSERT_FAIL_MESSAGE