diff --git a/.travis.yml b/.travis.yml
index 962f67a..38e874c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,7 +4,7 @@
 stages:
   - "lint"
   - "test"
-if: "type IN (pull_request)" # Add in "branch" as an option if desired for branch testing as well
+if: "type IN (pull_request)"  # Add in "branch" as an option if desired for branch testing as well
 language: "python"
 services:
   - "docker"
@@ -30,7 +30,7 @@ jobs:
     - "pip install invoke toml"
   script:
     - "invoke black"
-    - "invoke bandit" # Bandit fails to function on > Py3.8 https://github.com/PyCQA/bandit/issues/639
+    - "invoke bandit"  # Bandit fails to function on > Py3.8 https://github.com/PyCQA/bandit/issues/639
     # - "invoke pydocstyle"
     - "invoke flake8"
     - "invoke yamllint"
diff --git a/netcompare/__init__.py b/netcompare/__init__.py
index 9358ec3..b64dae0 100644
--- a/netcompare/__init__.py
+++ b/netcompare/__init__.py
@@ -1,5 +1,5 @@
 """Pre/Post Check library."""
-from .check_type import compare
+# from .check_type import compare

-__all__ = ["compare"]
+# __all__ = ["compare"]
diff --git a/netcompare/check_type.py b/netcompare/check_type.py
index 1ba3f24..baf6779 100644
--- a/netcompare/check_type.py
+++ b/netcompare/check_type.py
@@ -1,5 +1,5 @@
 """CheckType Implementation."""
-from typing import Mapping, Iterable, Tuple, Union, List
+from typing import Mapping, Tuple, Union, List

 from .evaluator import diff_generator
 from .runner import extract_values_from_output
@@ -9,7 +9,6 @@ class CheckType:

     def __init__(self, *args):
         """Check Type init method."""
-        pass

     @staticmethod
     def init(*args):
@@ -29,6 +28,7 @@ def extract_value_from_json_path(
         """Return the value contained into a Mapping for a defined path."""
         return extract_values_from_output(value, path, exclude)

+    # TODO: Refine this typing
     def evaluate(self, reference_value: Mapping, value_to_compare: Mapping) -> Tuple[Mapping, bool]:
         """Return the result of the evaluation and a boolean True if it passes it or False otherwise.
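With extraction decoupled from evaluation, a check now runs in two explicit steps: pull the values out of each payload, then evaluate the two value sets. A minimal sketch of the new flow (the payloads here are invented; the call pattern mirrors the updated tests in tests/test_type_check.py):

```python
from netcompare.check_type import CheckType

# Invented pre/post device output; real data would come from the JSON fixtures.
pre_data = {"result": [{"vrfs": {"default": {"peerList": [{"peerAddress": "10.1.0.0", "establishedTransitions": 95}]}}}]}
post_data = {"result": [{"vrfs": {"default": {"peerList": [{"peerAddress": "10.1.0.0", "establishedTransitions": 95}]}}}]}
path = "result[0].vrfs.default.peerList[*].[$peerAddress$,establishedTransitions]"

check = CheckType.init("exact_match")
pre_value = check.extract_value_from_json_path(pre_data, path)
post_value = check.extract_value_from_json_path(post_data, path)
diff, passed = check.evaluate(pre_value, post_value)  # ({}, True) when nothing changed
```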
diff --git a/netcompare/evaluator.py b/netcompare/evaluator.py
index 5f459f5..df2052a 100644
--- a/netcompare/evaluator.py
+++ b/netcompare/evaluator.py
@@ -1,25 +1,22 @@
 """Diff evaluator."""
 import re
 import sys
-from deepdiff import DeepDiff
 from collections import defaultdict
-from collections.abc import Mapping as DictMapping
 from functools import partial
-from typing import Mapping, List
+from typing import Mapping, List, Dict
+from deepdiff import DeepDiff

-from .runner import extract_values_from_output

 sys.path.append(".")


-def diff_generator(pre_data: Mapping, post_data: Mapping, check_definition: Mapping) -> Mapping:
+def diff_generator(pre_result: Mapping, post_result: Mapping) -> Dict:
     """
-    Generates diff between pre and post data based on check definition.
+    Generate a diff between pre and post results.

     Args:
-        pre_data: pre data result.
-        post_data: post data result.
-        check_definition: check definitions.
+        pre_result: pre data result.
+        post_result: post data result.

     Return:
         output: diff between pre and post data.
@@ -32,9 +29,6 @@ def diff_generator(pre_data: Mapping, post_data: Mapping, check_definition: Mapp
-    >>> print(diff_generator(check_definition, post_data, check_definition))
+    >>> print(diff_generator(pre_result, post_result))
     {'10.17.254.2': {'state': {'new_value': 'Up', 'old_value': 'Idle'}}}
     """
-    pre_result = extract_values_from_output(check_definition, pre_data)
-    post_result = extract_values_from_output(check_definition, post_data)
-
     diff_result = DeepDiff(pre_result, post_result)

     result = diff_result.get("values_changed", {})
@@ -144,7 +138,7 @@ def group_value(tree_list: List, value: Mapping) -> Mapping:
     return value


-def dict_merger(original_dict: List, merged_dict: Mapping):
+def dict_merger(original_dict: Mapping, merged_dict: Mapping):
     """
     Merge dictionaries to build final result.

@@ -158,7 +152,7 @@ def dict_merger(original_dict: List, merged_dict: Mapping):
         {'10.17.254.2': {'state': {'new_value': 'Up', 'old_value': 'Idle'}}}
     """
     for key in merged_dict.keys():
-        if key in original_dict and isinstance(original_dict[key], dict) and isinstance(merged_dict[key], DictMapping):
+        if key in original_dict and isinstance(original_dict[key], dict) and isinstance(merged_dict[key], dict):
             dict_merger(original_dict[key], merged_dict[key])
         else:
             original_dict[key] = merged_dict[key]
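diff_generator now consumes already-extracted values instead of raw output plus a check definition, matching how the updated tests drive it. A sketch with invented NAPALM-style data (the exact diff shape follows the docstring example above):

```python
from netcompare.runner import extract_values_from_output
from netcompare.evaluator import diff_generator

# Invented fixtures; real ones come from the pre/post JSON files.
pre_data = {"global": {"peers": {"10.1.0.0": {"is_enabled": True, "is_up": False}}}}
post_data = {"global": {"peers": {"10.1.0.0": {"is_enabled": True, "is_up": True}}}}
path = "global.$peers$.*.[is_enabled,is_up]"

pre_value = extract_values_from_output(pre_data, path, [])
post_value = extract_values_from_output(post_data, path, [])
diff = diff_generator(pre_value, post_value)
# roughly {'10.1.0.0': {'is_up': {'new_value': True, 'old_value': False}}}
```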
diff --git a/netcompare/runner.py b/netcompare/runner.py
index 089f2c6..4d6b27a 100644
--- a/netcompare/runner.py
+++ b/netcompare/runner.py
@@ -1,7 +1,11 @@
-#!/ur/bin/env python3
+#!/usr/bin/env python3
 import re
 import jmespath
-from typing import Mapping, List, Generator, Union
+from typing import Mapping, List, Union
+from .utils.jmspath_parsers import jmspath_value_parser, jmspath_refkey_parser
+from .utils.filter_parsers import exclude_filter
+from .utils.refkey import keys_cleaner, keys_values_zipper, associate_key_of_my_value
+from .utils.flatten import flatten_list


 def extract_values_from_output(value: Mapping, path: Mapping, exclude: List) -> Union[Mapping, List, int, str, bool]:
@@ -18,276 +22,47 @@ def extract_values_from_output(value: Mapping, path: Mapping, exclude: List) ->
                 "default": {
                     "peerList": [
                         {
     ...
-        exclude: [...]
+        exclude: ["interfaceStatistics", "interfaceCounters"]

-    TODO: This function should be able to return a list, or a Dict, or a Integer, or a Boolean or a String
     Return:
         [{'7.7.7.7': {'prefixesReceived': 101}}, {'10.1.0.0': {'prefixesReceived': 120}}, ...
     """
-
+    # Get the wanted values to be evaluated if a JMESPath expression is defined; otherwise
+    # use the entire output. This covers the "raw" diff type.
+    if path and not exclude:
+        wanted_value = jmespath.search(jmspath_value_parser(path), value)
+
+    elif path and exclude:
+        wanted_value = jmespath.search(jmspath_value_parser(path), value)
+        exclude_filter(wanted_value, exclude)
+    elif not path and exclude:
+        exclude_filter(value, exclude)
+        return value
+
+    # Data type check: the JMESPath result must be a list of lists.
     if path:
-        found_values = jmespath.search(jmspath_value_parser(path), value)
-    else:
-        found_values = value
-
-    if exclude:
-        my_value_exclude_cleaner(found_values, exclude)
-        my_meaningful_values = found_values
-    else:
-        my_meaningful_values = get_meaningful_values(path, found_values)
-
-    if path and re.search(r"\$.*\$", path):
-        wanted_reference_keys = jmespath.search(jmspath_refkey_parser(path), value)
-        list_of_reference_keys = keys_cleaner(wanted_reference_keys)
-        return keys_values_zipper(list_of_reference_keys, my_meaningful_values)
-    else:
-        return my_meaningful_values
-
-
-def jmspath_value_parser(path):
-    """
-    Get the JMSPath value path from 'path'.
-
-    Args:
-        path: "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]"
-    Return:
-        "result[0].vrfs.default.peerList[*].[prefixesReceived]"
-    """
-    regex_match_value = re.search(r"\$.*\$\.|\$.*\$,|,\$.*\$", path)
-
-    if regex_match_value:
-        # $peers$. --> peers
-        regex_normalized_value = re.search(r"\$.*\$", regex_match_value.group())
-        if regex_normalized_value:
-            normalized_value = regex_match_value.group().split("$")[1]
-            value_path = path.replace(regex_normalized_value.group(), normalized_value)
-    else:
-        value_path = path
-
-    return value_path
-
-
-def jmspath_refkey_parser(path):
-    """
-    Get the JMSPath reference key path from 'path'.
-
-    Args:
-        path: "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]"
-    Return:
-        "result[0].vrfs.default.peerList[*].[$peerAddress$]"
-    """
-    splitted_jmspath = path.split(".")
-
-    for n, i in enumerate(splitted_jmspath):
-        regex_match_anchor = re.search(r"\$.*\$", i)
-
-        if regex_match_anchor:
-            splitted_jmspath[n] = regex_match_anchor.group().replace("$", "")
-
-        if regex_match_anchor and not i.startswith("[") and not i.endswith("]"):
-            splitted_jmspath = splitted_jmspath[: n + 1]
-
-    return ".".join(splitted_jmspath)
-
+        if not any(isinstance(i, list) for i in wanted_value):
+            return wanted_value

-def get_meaningful_values(path, found_values):
-    if path:
-        # check if list of lists
-        if not any(isinstance(i, list) for i in found_values):
-            raise TypeError(
-                "Catching value must be defined as list in jmespath expression i.e. result[*].state -> result[*].[state]. You have {}'.".format(
-                    path
-                )
-            )
-        for element in found_values:
+        for element in wanted_value:
             for item in element:
                 if isinstance(item, dict):
                     raise TypeError(
                         'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]]. You have {}\'.'.format(
-                            found_values
+                            wanted_value
                         )
                     )
                 elif isinstance(item, list):
-                    found_values = flatten_list(found_values)
+                    wanted_value = flatten_list(wanted_value)
                     break

-        my_meaningful_values = associate_key_of_my_value(jmspath_value_parser(path), found_values)
+        paired_key_value = associate_key_of_my_value(jmspath_value_parser(path), wanted_value)
     else:
-        my_meaningful_values = found_values
-    return my_meaningful_values
-
-
-def my_value_exclude_cleaner(data: Mapping, exclude: List):
-    """
-    Recusively look through all dict keys and pop out the one defined in "exclude".
-
-    Update in place existing dictionary. Look into unit test for example.
-
-    Args:
-        data: {
-            "interfaces": {
-                "Management1": {
-                    "name": "Management1",
-                    "interfaceStatus": "connected",
-                    "autoNegotiate": "success",
-                    "interfaceStatistics": {
-                        "inBitsRate": 3403.4362520883615,
-                        "inPktsRate": 3.7424095978179257,
-                        "outBitsRate": 16249.69114419833,
-                        "updateInterval": 300,
-                        "outPktsRate": 2.1111866059750692
-                    },...
-        exclude: ["interfaceStatistics", "interfaceCounters"]
-    """
-    if isinstance(data, dict):
-        for exclude_element in exclude:
-            try:
-                data.pop(exclude_element)
-            except KeyError:
-                pass
-
-        for key in data:
-            if isinstance(data[key], dict) or isinstance(data[key], list):
-                my_value_exclude_cleaner(data[key], exclude)
-
-    elif isinstance(data, list):
-        for element in data:
-            if isinstance(element, dict) or isinstance(element, list):
-                my_value_exclude_cleaner(element, exclude)
-
-
-def flatten_list(my_list: List) -> List:
-    """
-    Flatten a multi level nested list and returns a list of lists.
-
-    Args:
-        my_list: nested list to be flattened.
-
-    Return:
-        [[-1, 0], [-1, 0], [-1, 0], ...]
+        paired_key_value = value

-    Example:
-        >>> my_list = [[[[-1, 0], [-1, 0]]]]
-        >>> flatten_list(my_list)
-        [[-1, 0], [-1, 0]]
-    """
-    if not isinstance(my_list, list):
-        raise ValueError(f"Argument provided must be a list. You passed a {type(my_list)}")
-    if is_flat_list(my_list):
-        return my_list
-    return list(iter_flatten_list(my_list))
-
-
-def iter_flatten_list(my_list: List) -> Generator[List, None, None]:
-    """Recursively yield all flat lists within a given list."""
-    if is_flat_list(my_list):
-        yield my_list
-    else:
-        for item in my_list:
-            yield from iter_flatten_list(item)
-
-
-def is_flat_list(obj: List) -> bool:
-    """Return True is obj is a list that does not contain any lists as its first order elements."""
-    return isinstance(obj, list) and not any(isinstance(i, list) for i in obj)
-
-
-def associate_key_of_my_value(paths: Mapping, found_values: List) -> List:
-    """
-    Associate each key defined in path to every value found in output.
-
-    Args:
-        paths: {"path": "global.peers.*.[is_enabled,is_up]"}
-        found_values: [[True, False], [True, False], [True, False], [True, False]]
-
-    Return:
-        [{'is_enabled': True, 'is_up': False}, ...
-
-    Example:
-        >>> from runner import associate_key_of_my_value
-        >>> path = {"path": "global.peers.*.[is_enabled,is_up]"}
-        >>> found_values = [[True, False], [True, False], [True, False], [True, False]]
-        {'is_enabled': True, 'is_up': False}, {'is_enabled': True, 'is_up': False}, ...
-    """
-
-    # global.peers.*.[is_enabled,is_up] / result.[*].state
-    find_the_key_of_my_values = paths.split(".")[-1]
-
-    # [is_enabled,is_up]
-    if find_the_key_of_my_values.startswith("[") and find_the_key_of_my_values.endswith("]"):
-        # ['is_enabled', 'is_up']
-        my_key_value_list = find_the_key_of_my_values.strip("[]").split(",")
-    # state
+    if path and re.search(r"\$.*\$", path):
+        wanted_reference_keys = jmespath.search(jmspath_refkey_parser(path), value)
+        list_of_reference_keys = keys_cleaner(wanted_reference_keys)
+        return keys_values_zipper(list_of_reference_keys, paired_key_value)
     else:
-        my_key_value_list = [find_the_key_of_my_values]
-
-    final_list = list()
-
-    for items in found_values:
-        temp_dict = dict()
-
-        if len(items) != len(my_key_value_list):
-            raise ValueError("Key's value len != from value len")
-
-        for my_index, my_value in enumerate(items):
-            temp_dict.update({my_key_value_list[my_index]: my_value})
-        final_list.append(temp_dict)
-
-    return final_list
-
-
-def keys_cleaner(wanted_reference_keys: Mapping) -> list:
-    """
-    Get every required reference key from output.
-
-    Args:
-        wanted_reference_keys: {'10.1.0.0': {'address_family': {'ipv4': ...
-
-    Return:
-        ['10.1.0.0', '10.2.0.0', '10.64.207.255', '7.7.7.7']
-
-    Example:
-        >>> from runner import keys_cleaner
-        >>> wanted_reference_keys = {'10.1.0.0': {'address_family': 'ipv4'}}
-        >>> keys_cleaner(wanted_reference_keys)
-        ['10.1.0.0', '10.2.0.0', '10.64.207.255', '7.7.7.7']
-    """
-    if isinstance(wanted_reference_keys, list):
-        return wanted_reference_keys
-
-    elif isinstance(wanted_reference_keys, dict):
-        my_keys_list = list()
-
-        for key in wanted_reference_keys.keys():
-            my_keys_list.append(key)
-
-        return my_keys_list
-
-
-def keys_values_zipper(list_of_reference_keys: List, found_values_with_key: List) -> List:
-    """
-    Build dictionary zipping keys with relative values.
-
-    Args:
-        list_of_reference_keys: ['10.1.0.0', '10.2.0.0', '10.64.207.255', '7.7.7.7']
-        found_values_with_key: [{'is_enabled': True, 'is_up': False}, ...
-
-    Return:
-        [{'10.1.0.0': {'is_enabled': True, 'is_up': False}}, , ...
-
-    Example:
-        >>> from runner import keys_values_zipper
-        >>> list_of_reference_keys = ['10.1.0.0']
-        >>> found_values_with_key = [{'is_enabled': True, 'is_up': False}]
-        >>> keys_values_zipper(list_of_reference_keys, found_values_with_key)
-        [{'10.1.0.0': {'is_enabled': True, 'is_up': False}}]
-    """
-    final_result = list()
-
-    if len(list_of_reference_keys) != len(found_values_with_key):
-        raise ValueError("Keys len != from Values len")
-
-    for my_index, my_key in enumerate(list_of_reference_keys):
-        final_result.append({my_key: found_values_with_key[my_index]})
-
-    return final_result
+        return paired_key_value
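When no path is given and only an exclude list is set, extract_values_from_output reduces to the in-place exclude filter and returns the pruned output — the "raw" case. A small sketch with an invented payload:

```python
from netcompare.runner import extract_values_from_output

data = {
    "interfaces": {
        "Management1": {
            "interfaceStatus": "connected",
            "interfaceStatistics": {"inBitsRate": 3403.43},
        }
    }
}
# No JMESPath expression: the whole payload is kept, minus the excluded keys.
result = extract_values_from_output(data, None, ["interfaceStatistics"])
# result == {"interfaces": {"Management1": {"interfaceStatus": "connected"}}}
```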
diff --git a/netcompare/utils/__init__.py b/netcompare/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/netcompare/utils/filter_parsers.py b/netcompare/utils/filter_parsers.py
new file mode 100644
index 0000000..abdb189
--- /dev/null
+++ b/netcompare/utils/filter_parsers.py
@@ -0,0 +1,40 @@
+from typing import Mapping, List
+
+
+def exclude_filter(data: Mapping, exclude: List):
+    """
+    Recursively look through all dict keys and pop out the ones defined in "exclude".
+
+    Update in place existing dictionary. Look into unit test for example.
+
+    Args:
+        data: {
+            "interfaces": {
+                "Management1": {
+                    "name": "Management1",
+                    "interfaceStatus": "connected",
+                    "autoNegotiate": "success",
+                    "interfaceStatistics": {
+                        "inBitsRate": 3403.4362520883615,
+                        "inPktsRate": 3.7424095978179257,
+                        "outBitsRate": 16249.69114419833,
+                        "updateInterval": 300,
+                        "outPktsRate": 2.1111866059750692
+                    },...
+        exclude: ["interfaceStatistics", "interfaceCounters"]
+    """
+    if isinstance(data, dict):
+        for exclude_element in exclude:
+            try:
+                data.pop(exclude_element)
+            except KeyError:
+                pass
+
+        for key in data:
+            if isinstance(data[key], (dict, list)):
+                exclude_filter(data[key], exclude)
+
+    elif isinstance(data, list):
+        for element in data:
+            if isinstance(element, (dict, list)):
+                exclude_filter(element, exclude)
diff --git a/netcompare/utils/flatten.py b/netcompare/utils/flatten.py
new file mode 100644
index 0000000..73f70c4
--- /dev/null
+++ b/netcompare/utils/flatten.py
@@ -0,0 +1,36 @@
+from typing import List, Generator
+
+
+def flatten_list(my_list: List) -> List:
+    """
+    Flatten a multi-level nested list and return a list of lists.
+
+    Args:
+        my_list: nested list to be flattened.
+
+    Return:
+        [[-1, 0], [-1, 0], [-1, 0], ...]
+
+    Example:
+        >>> my_list = [[[[-1, 0], [-1, 0]]]]
+        >>> flatten_list(my_list)
+        [[-1, 0], [-1, 0]]
+    """
+
+    def iter_flatten_list(my_list: List) -> Generator[List, None, None]:
+        """Recursively yield all flat lists within a given list."""
+        if is_flat_list(my_list):
+            yield my_list
+        else:
+            for item in my_list:
+                yield from iter_flatten_list(item)
+
+    def is_flat_list(obj: List) -> bool:
+        """Return True if obj is a list that does not contain any lists as its first-order elements."""
+        return isinstance(obj, list) and not any(isinstance(i, list) for i in obj)
+
+    if not isinstance(my_list, list):
+        raise ValueError(f"Argument provided must be a list. You passed a {type(my_list)}")
+    if is_flat_list(my_list):
+        return my_list
+    return list(iter_flatten_list(my_list))
diff --git a/netcompare/utils/jmspath_parsers.py b/netcompare/utils/jmspath_parsers.py
new file mode 100644
index 0000000..80dd48d
--- /dev/null
+++ b/netcompare/utils/jmspath_parsers.py
@@ -0,0 +1,45 @@
+import re
+
+
+def jmspath_value_parser(path: str):
+    """
+    Get the JMSPath value path from 'path'.
+
+    Args:
+        path: "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]"
+    Return:
+        "result[0].vrfs.default.peerList[*].[peerAddress,prefixesReceived]"
+    """
+    regex_match_value = re.search(r"\$.*\$\.|\$.*\$,|,\$.*\$", path)
+
+    if not regex_match_value:
+        return path
+    # $peers$. --> peers
+    regex_normalized_value = re.search(r"\$.*\$", regex_match_value.group())
+    if regex_normalized_value:
+        normalized_value = regex_match_value.group().split("$")[1]
+        return path.replace(regex_normalized_value.group(), normalized_value)
+    return path
+
+
+def jmspath_refkey_parser(path: str):
+    """
+    Get the JMSPath reference key path from 'path'.
+
+    Args:
+        path: "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]"
+    Return:
+        "result[0].vrfs.default.peerList[*].peerAddress"
+    """
+    splitted_jmspath = path.split(".")
+
+    for n, i in enumerate(splitted_jmspath):
+        regex_match_anchor = re.search(r"\$.*\$", i)
+
+        if regex_match_anchor:
+            splitted_jmspath[n] = regex_match_anchor.group().replace("$", "")
+
+        if regex_match_anchor and not i.startswith("[") and not i.endswith("]"):
+            splitted_jmspath = splitted_jmspath[: n + 1]
+
+    return ".".join(splitted_jmspath)
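The two parsers split one anchored expression into its value path and its reference-key path, mirroring the unit tests added later in this change:

```python
from netcompare.utils.jmspath_parsers import jmspath_value_parser, jmspath_refkey_parser

path = "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]"

jmspath_value_parser(path)
# -> "result[0].vrfs.default.peerList[*].[peerAddress,prefixesReceived]"
jmspath_refkey_parser(path)
# -> "result[0].vrfs.default.peerList[*].peerAddress"
```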
diff --git a/netcompare/utils/refkey.py b/netcompare/utils/refkey.py
new file mode 100644
index 0000000..972a39c
--- /dev/null
+++ b/netcompare/utils/refkey.py
@@ -0,0 +1,105 @@
+from typing import Mapping, List
+
+
+def keys_cleaner(wanted_reference_keys: Mapping) -> List[Mapping]:
+    """
+    Get every required reference key from output.
+
+    Args:
+        wanted_reference_keys: {'10.1.0.0': {'address_family': {'ipv4': ...
+
+    Return:
+        ['10.1.0.0', '10.2.0.0', '10.64.207.255', '7.7.7.7']
+
+    Example:
+        >>> from netcompare.utils.refkey import keys_cleaner
+        >>> wanted_reference_keys = {'10.1.0.0': {'address_family': 'ipv4'}}
+        >>> keys_cleaner(wanted_reference_keys)
+        ['10.1.0.0']
+    """
+    if isinstance(wanted_reference_keys, list):
+        return wanted_reference_keys
+
+    if isinstance(wanted_reference_keys, dict):
+        my_keys_list = list()
+
+        for key in wanted_reference_keys.keys():
+            my_keys_list.append(key)
+
+        return my_keys_list
+
+    raise TypeError(f"Must be a list or a dictionary. You have type:{type(wanted_reference_keys)} output:{wanted_reference_keys}'.")
+
+
+def keys_values_zipper(list_of_reference_keys: List, wanted_value_with_key: List) -> List:
+    """
+    Build dictionary zipping keys with relative values.
+
+    Args:
+        list_of_reference_keys: ['10.1.0.0', '10.2.0.0', '10.64.207.255', '7.7.7.7']
+        wanted_value_with_key: [{'is_enabled': True, 'is_up': False}, ...
+
+    Return:
+        [{'10.1.0.0': {'is_enabled': True, 'is_up': False}}, ...
+
+    Example:
+        >>> from netcompare.utils.refkey import keys_values_zipper
+        >>> list_of_reference_keys = ['10.1.0.0']
+        >>> wanted_value_with_key = [{'is_enabled': True, 'is_up': False}]
+        >>> keys_values_zipper(list_of_reference_keys, wanted_value_with_key)
+        [{'10.1.0.0': {'is_enabled': True, 'is_up': False}}]
+    """
+    final_result = list()
+
+    if len(list_of_reference_keys) != len(wanted_value_with_key):
+        raise ValueError("Keys length is different from values length.")
+
+    for my_index, my_key in enumerate(list_of_reference_keys):
+        final_result.append({my_key: wanted_value_with_key[my_index]})
+
+    return final_result
+
+
+def associate_key_of_my_value(paths: str, wanted_value: List) -> List:
+    """
+    Associate each key defined in path to every value found in output.
+
+    Args:
+        paths: "global.peers.*.[is_enabled,is_up]"
+        wanted_value: [[True, False], [True, False], [True, False], [True, False]]
+
+    Return:
+        [{'is_enabled': True, 'is_up': False}, ...
+
+    Example:
+        >>> from netcompare.utils.refkey import associate_key_of_my_value
+        >>> path = "global.peers.*.[is_enabled,is_up]"
+        >>> associate_key_of_my_value(path, [[True, False], [True, False]])
+        [{'is_enabled': True, 'is_up': False}, {'is_enabled': True, 'is_up': False}]
+    """
+
+    # global.peers.*.[is_enabled,is_up] / result.[*].state
+    find_the_key_of_my_values = paths.split(".")[-1]
+
+    # [is_enabled,is_up]
+    if find_the_key_of_my_values.startswith("[") and find_the_key_of_my_values.endswith("]"):
+        # ['is_enabled', 'is_up']
+        my_key_value_list = find_the_key_of_my_values.strip("[]").split(",")
+    # state
+    else:
+        my_key_value_list = [find_the_key_of_my_values]
+
+    final_list = list()
+
+    for items in wanted_value:
+        temp_dict = dict()
+
+        if len(items) != len(my_key_value_list):
+            raise ValueError("Value items length is different from keys length.")
+
+        for my_index, my_value in enumerate(items):
+            temp_dict.update({my_key_value_list[my_index]: my_value})
+        final_list.append(temp_dict)
+
+    return final_list
diff --git a/tasks.py b/tasks.py
index e77216f..dcc089d 100644
--- a/tasks.py
+++ b/tasks.py
@@ -107,44 +107,44 @@ def pytest(context, local=INVOKE_LOCAL):


 @task(help={"local": "Run locally or within the Docker container"})
-def black(context, local=INVOKE_LOCAL):
+def black(context, path=".", local=INVOKE_LOCAL):
-    """Run black to check that Python files adherence to black standards."""
+    """Run black to ensure Python files adhere to black standards."""
-    exec_cmd = "black --check --diff ."
+    exec_cmd = "black {path}".format(path=path)
     run_cmd(context, exec_cmd, local)


 @task(help={"local": "Run locally or within the Docker container"})
-def flake8(context, local=INVOKE_LOCAL):
+def flake8(context, path=".", local=INVOKE_LOCAL):
     """Run flake8 code analysis."""
-    exec_cmd = "flake8 ."
+    exec_cmd = "flake8 {path}".format(path=path)
     run_cmd(context, exec_cmd, local)


 @task(help={"local": "Run locally or within the Docker container"})
-def pylint(context, local=INVOKE_LOCAL):
+def pylint(context, path=".", local=INVOKE_LOCAL):
     """Run pylint code analysis."""
-    exec_cmd = 'find . -name "*.py" | xargs pylint'
+    exec_cmd = 'find {path} -name "*.py" | xargs pylint'.format(path=path)
     run_cmd(context, exec_cmd, local)


 @task(help={"local": "Run locally or within the Docker container"})
-def yamllint(context, local=INVOKE_LOCAL):
+def yamllint(context, path=".", local=INVOKE_LOCAL):
     """Run yamllint to validate formatting adheres to NTC defined YAML standards."""
-    exec_cmd = "yamllint ."
+    exec_cmd = "yamllint {path}".format(path=path)
     run_cmd(context, exec_cmd, local)


 @task(help={"local": "Run locally or within the Docker container"})
-def pydocstyle(context, local=INVOKE_LOCAL):
+def pydocstyle(context, path=".", local=INVOKE_LOCAL):
     """Run pydocstyle to validate docstring formatting adheres to NTC defined standards."""
-    exec_cmd = "pydocstyle ."
+    exec_cmd = "pydocstyle {path}".format(path=path)
     run_cmd(context, exec_cmd, local)


 @task(help={"local": "Run locally or within the Docker container"})
-def bandit(context, local=INVOKE_LOCAL):
+def bandit(context, path=".", local=INVOKE_LOCAL):
     """Run bandit to validate basic static code security analysis."""
-    exec_cmd = "bandit --recursive ./ --configfile .bandit.yml"
+    exec_cmd = "bandit --recursive {path} --configfile .bandit.yml".format(path=path)
     run_cmd(context, exec_cmd, local)

@@ -156,14 +156,14 @@ def cli(context):


 @task(help={"local": "Run locally or within the Docker container"})
-def tests(context, local=INVOKE_LOCAL):
+def tests(context, path=".", local=INVOKE_LOCAL):
     """Run all tests for this repository."""
-    black(context, local)
-    flake8(context, local)
-    # pylint(context, local)
-    yamllint(context, local)
-    # pydocstyle(context, local)
-    bandit(context, local)
-    pytest(context, local)
+    black(context, path, local)
+    flake8(context, path, local)
+    pylint(context, path, local)
+    yamllint(context, path, local)
+    pydocstyle(context, path, local)
+    bandit(context, path, local)
+    pytest(context, local)

     print("All tests have passed!")
diff --git a/tests/test_diff_generator.py b/tests/test_diff_generator.py
index c13bb56..6fc272a 100644
--- a/tests/test_diff_generator.py
+++ b/tests/test_diff_generator.py
@@ -1,11 +1,7 @@
-#!/usr/bin/env python3
-
 import pytest
-import sys
 from .utility import load_json_file
 from netcompare.evaluator import diff_generator
-
-sys.path.append("..")
+from netcompare.runner import extract_values_from_output

 assertion_failed_message = """Test output is different from expected output.
@@ -13,67 +9,40 @@
 expected output: {expected_output}
 """

-exact_match_of_global_peers_via_napalm_getter = (
-    "napalm_getter.json",
-    {
-        "check_type": "exact_match",
-        "path": "global.$peers$.*.[is_enabled,is_up]",
-        # "reference_key_path": "global.peers",
-    },
-)
+exact_match_of_global_peers_via_napalm_getter = ("napalm_getter.json", "global.$peers$.*.[is_enabled,is_up]", [])

 exact_match_of_bgpPeerCaps_via_api = (
     "api.json",
-    {
-        "check_type": "exact_match",
-        "path": "result[0].vrfs.default.peerList[*].[$peerAddress$,state,bgpPeerCaps]",
-        # "reference_key_path": "result[0].vrfs.default.peerList[*].peerAddress",
-    },
+    "result[0].vrfs.default.peerList[*].[$peerAddress$,state,bgpPeerCaps]",
+    [],
 )

-exact_match_of_bgp_neigh_via_textfsm = (
-    "textfsm.json",
-    {
-        "check_type": "exact_match",
-        "path": "result[*].[$bgp_neigh$,state]",
-        # "reference_key_path": "result[*].bgp_neigh"
-    },
-)
+exact_match_of_bgp_neigh_via_textfsm = ("textfsm.json", "result[*].[$bgp_neigh$,state]", [])

 raw_diff_of_interface_ma1_via_api_value_exclude = (
     "raw_value_exclude.json",
-    {"check_type": "exact_match", "path": "result[*]", "exclude": ["interfaceStatistics", "interfaceCounters"]},
+    "result[*]",
+    ["interfaceStatistics", "interfaceCounters"],
 )

 raw_diff_of_interface_ma1_via_api_novalue_exclude = (
     "raw_novalue_exclude.json",
-    {"check_type": "exact_match", "exclude": ["interfaceStatistics", "interfaceCounters"]},
+    None,
+    ["interfaceStatistics", "interfaceCounters"],
 )

-raw_diff_of_interface_ma1_via_api_novalue_noexclude = (
-    "raw_novalue_noexclude.json",
-    {"check_type": "exact_match"},
-)
+raw_diff_of_interface_ma1_via_api_novalue_noexclude = ("raw_novalue_noexclude.json", None, [])

-exact_match_missing_item = (
-    "napalm_getter_missing_peer.json",
-    {"check_type": "exact_match"},
-)
+exact_match_missing_item = ("napalm_getter_missing_peer.json", None, [])

-exact_match_additional_item = ("napalm_getter_additional_peer.json", {"check_type": "exact_match"})
+exact_match_additional_item = ("napalm_getter_additional_peer.json", None, [])

-exact_match_changed_item = (
-    "napalm_getter_changed_peer.json",
-    {"check_type": "exact_match"},
-)
+exact_match_changed_item = ("napalm_getter_changed_peer.json", None, [])

 exact_match_multi_nested_list = (
     "exact_match_nested.json",
-    {
-        "check_type": "exact_match",
-        "path": "global.$peers$.*.*.ipv4.[accepted_prefixes,received_prefixes]",
-        # "reference_key_path": "global.peers",
-    },
+    "global.$peers$.*.*.ipv4.[accepted_prefixes,received_prefixes]",
+    [],
 )

 eval_tests = [
@@ -90,13 +59,14 @@
 ]


-@pytest.mark.parametrize("filename, path", eval_tests)
-def test_eval(filename, path):
+@pytest.mark.parametrize("filename, path, exclude", eval_tests)
+def test_eval(filename, path, exclude):
     pre_data = load_json_file("pre", filename)
     post_data = load_json_file("post", filename)
     expected_output = load_json_file("results", filename)
-
-    output = diff_generator(pre_data, post_data, path)
+    pre_value = extract_values_from_output(pre_data, path, exclude)
+    post_value = extract_values_from_output(post_data, path, exclude)
+    output = diff_generator(pre_value, post_value)

     assert expected_output == output, assertion_failed_message.format(output=output, expected_output=expected_output)
diff --git a/tests/test_filter_parsers.py b/tests/test_filter_parsers.py
new file mode 100644
index 0000000..c38c9a8
--- /dev/null
+++ b/tests/test_filter_parsers.py
@@ -0,0 +1,44 @@
+"""Filter parser unit tests."""
+import pytest
+from netcompare.utils.filter_parsers import exclude_filter
+
+
+assertion_failed_message = """Test output is different from expected output.
+output: {output}
+expected output: {expected_output}
+"""
+
+exclude_filter_case_1 = (
+    ["interfaceStatistics"],
+    {
+        "interfaces": {
+            "Management1": {
+                "name": "Management1",
+                "interfaceStatus": "connected",
+                "autoNegotiate": "success",
+                "interfaceStatistics": {
+                    "inBitsRate": 3403.4362520883615,
+                    "inPktsRate": 3.7424095978179257,
+                    "outBitsRate": 16249.69114419833,
+                    "updateInterval": 300,
+                    "outPktsRate": 2.1111866059750692,
+                },
+            }
+        }
+    },
+    {
+        "interfaces": {
+            "Management1": {"name": "Management1", "interfaceStatus": "connected", "autoNegotiate": "success"}
+        }
+    },
+)
+
+exclude_filter_tests = [
+    exclude_filter_case_1,
+]
+
+
+@pytest.mark.parametrize("exclude, data, expected_output", exclude_filter_tests)
+def test_exclude_filter(exclude, data, expected_output):
+    exclude_filter(data, exclude)
+    assert expected_output == data, assertion_failed_message.format(output=data, expected_output=expected_output)
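The fixture above exercises the in-place contract: exclude_filter returns None and mutates its argument. Condensed to a minimal sketch:

```python
from netcompare.utils.filter_parsers import exclude_filter

data = {"interfaces": {"Management1": {"name": "Management1", "interfaceStatistics": {"inBitsRate": 3403.43}}}}
exclude_filter(data, ["interfaceStatistics"])  # mutates in place, returns None
assert data == {"interfaces": {"Management1": {"name": "Management1"}}}
```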
diff --git a/tests/test_flatten.py b/tests/test_flatten.py
new file mode 100644
index 0000000..ab32404
--- /dev/null
+++ b/tests/test_flatten.py
@@ -0,0 +1,24 @@
+"""Flatten list unit tests."""
+import pytest
+from netcompare.utils.flatten import flatten_list
+
+
+assertion_failed_message = """Test output is different from expected output.
+output: {output}
+expected output: {expected_output}
+"""
+
+flatten_list_case_1 = (
+    [[[[-1, 0], [-1, 0]]]],
+    [[-1, 0], [-1, 0]],
+)
+
+flatten_list_tests = [
+    flatten_list_case_1,
+]
+
+
+@pytest.mark.parametrize("data, expected_output", flatten_list_tests)
+def test_flatten_list(data, expected_output):
+    output = flatten_list(data)
+    assert expected_output == output, assertion_failed_message.format(output=output, expected_output=expected_output)
diff --git a/tests/test_jmspath_parsers.py b/tests/test_jmspath_parsers.py
new file mode 100644
index 0000000..cd069bc
--- /dev/null
+++ b/tests/test_jmspath_parsers.py
@@ -0,0 +1,70 @@
+"""JMSPath parser unit tests."""
+import pytest
+from netcompare.utils.jmspath_parsers import jmspath_value_parser, jmspath_refkey_parser
+
+
+assertion_failed_message = """Test output is different from expected output.
+output: {output}
+expected output: {expected_output}
+"""
+
+value_parser_case_1 = (
+    "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]",
+    "result[0].vrfs.default.peerList[*].[peerAddress,prefixesReceived]",
+)
+value_parser_case_2 = (
+    "result[0].vrfs.default.peerList[*].[peerAddress,$prefixesReceived$]",
+    "result[0].vrfs.default.peerList[*].[peerAddress,prefixesReceived]",
+)
+value_parser_case_3 = (
+    "result[0].vrfs.default.peerList[*].[interfaceCounters,$peerAddress$,prefixesReceived]",
+    "result[0].vrfs.default.peerList[*].[interfaceCounters,peerAddress,prefixesReceived]",
+)
+value_parser_case_4 = (
+    "result[0].$vrfs$.default.peerList[*].[peerAddress,prefixesReceived]",
+    "result[0].vrfs.default.peerList[*].[peerAddress,prefixesReceived]",
+)
+
+keyref_parser_case_1 = (
+    "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]",
+    "result[0].vrfs.default.peerList[*].peerAddress",
+)
+keyref_parser_case_2 = (
+    "result[0].vrfs.default.peerList[*].[peerAddress,$prefixesReceived$]",
+    "result[0].vrfs.default.peerList[*].prefixesReceived",
+)
+keyref_parser_case_3 = (
+    "result[0].vrfs.default.peerList[*].[interfaceCounters,$peerAddress$,prefixesReceived]",
+    "result[0].vrfs.default.peerList[*].peerAddress",
+)
+keyref_parser_case_4 = (
+    "result[0].$vrfs$.default.peerList[*].[peerAddress,prefixesReceived]",
+    "result[0].vrfs",
+)
+
+
+value_parser_tests = [
+    value_parser_case_1,
+    value_parser_case_2,
+    value_parser_case_3,
+    value_parser_case_4,
+]
+
+keyref_parser_tests = [
+    keyref_parser_case_1,
+    keyref_parser_case_2,
+    keyref_parser_case_3,
+    keyref_parser_case_4,
+]
+
+
+@pytest.mark.parametrize("path, expected_output", value_parser_tests)
+def test_value_parser(path, expected_output):
+    output = jmspath_value_parser(path)
+    assert expected_output == output, assertion_failed_message.format(output=output, expected_output=expected_output)
+
+
+@pytest.mark.parametrize("path, expected_output", keyref_parser_tests)
+def test_keyref_parser(path, expected_output):
+    output = jmspath_refkey_parser(path)
+    assert expected_output == output, assertion_failed_message.format(output=output, expected_output=expected_output)
diff --git a/tests/test_refkey.py b/tests/test_refkey.py
new file mode 100644
index 0000000..7ab4ceb
--- /dev/null
+++ b/tests/test_refkey.py
@@ -0,0 +1,56 @@
+"""Reference key unit tests."""
+import pytest
+from netcompare.utils.refkey import keys_cleaner, keys_values_zipper, associate_key_of_my_value
+
+
+assertion_failed_message = """Test output is different from expected output.
+output: {output}
+expected output: {expected_output}
+"""
+
+keys_cleaner_case_1 = (
+    {"10.1.0.0": {"address_family": "ipv4"}},
+    ["10.1.0.0"],
+)
+
+keys_zipper_case_1 = (
+    ["10.1.0.0", "10.2.0.0"],
+    [{"is_enabled": False, "is_up": False}, {"is_enabled": True, "is_up": True}],
+    [{"10.1.0.0": {"is_enabled": False, "is_up": False}}, {"10.2.0.0": {"is_enabled": True, "is_up": True}}],
+)
+
+keys_association_case_1 = (
+    "global.peers.*.[is_enabled,is_up]",
+    [[True, False], [True, False]],
+    [{"is_enabled": True, "is_up": False}, {"is_enabled": True, "is_up": False}],
+)
+
+keys_cleaner_tests = [
+    keys_cleaner_case_1,
+]
+
+keys_zipper_tests = [
+    keys_zipper_case_1,
+]
+
+keys_association_test = [
+    keys_association_case_1,
+]
+
+
+@pytest.mark.parametrize("wanted_key, expected_output", keys_cleaner_tests)
+def test_keys_cleaner(wanted_key, expected_output):
+    output = keys_cleaner(wanted_key)
+    assert expected_output == output, assertion_failed_message.format(output=output, expected_output=expected_output)
+
+
+@pytest.mark.parametrize("ref_keys, wanted_values, expected_output", keys_zipper_tests)
+def test_keys_zipper(ref_keys, wanted_values, expected_output):
+    output = keys_values_zipper(ref_keys, wanted_values)
+    assert expected_output == output, assertion_failed_message.format(output=output, expected_output=expected_output)
+
+
+@pytest.mark.parametrize("path, wanted_values, expected_output", keys_association_test)
+def test_keys_association(path, wanted_values, expected_output):
+    output = associate_key_of_my_value(path, wanted_values)
+    assert expected_output == output, assertion_failed_message.format(output=output, expected_output=expected_output)
diff --git a/tests/test_type_check.py b/tests/test_type_check.py
index 325b811..492050d 100644
--- a/tests/test_type_check.py
+++ b/tests/test_type_check.py
@@ -1,9 +1,7 @@
-import sys
+"""Check Type unit tests."""
 import pytest
-from .utility import load_json_file
 from netcompare.check_type import CheckType, ExactMatchType, ToleranceType
-
-sys.path.append("..")
+from .utility import load_json_file


 @pytest.mark.parametrize(
@@ -21,29 +19,17 @@ def test_CheckType_raises_NotImplementedError_for_invalid_check_type():
     CheckType.init("does_not_exist")


-def test_CheckType_raises_NotImplementedError_when_calling_check_logic_method():
-    """Validate that CheckType raises a NotImplementedError when passed a non-existant check_type."""
-    with pytest.raises(NotImplementedError):
-        CheckType().check_logic()
-
-
 exact_match_test_values_no_change = (
     ("exact_match",),
     "api.json",
-    {
-        "path": "result[0].vrfs.default.peerList[*].[$peerAddress$,establishedTransitions]",
-        # "reference_key_path": "result[0].vrfs.default.peerList[*].peerAddress",
-    },
+    "result[0].vrfs.default.peerList[*].[$peerAddress$,establishedTransitions]",
     ({}, True),
 )

 exact_match_test_values_changed = (
     ("exact_match",),
     "api.json",
-    {
-        "path": "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesSent]",
-        # "reference_key_path": "result[0].vrfs.default.peerList[*].peerAddress",
-    },
+    "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesSent]",
     (
         {
             "10.1.0.0": {"prefixesSent": {"new_value": 52, "old_value": 50}},
@@ -56,30 +42,21 @@ def test_CheckType_raises_NotImplementedError_when_calling_check_logic_method():
 tolerance_test_values_no_change = (
     ("tolerance", 10),
     "api.json",
-    {
-        "path": "result[0].vrfs.default.peerList[*].[$peerAddress$,establishedTransitions]",
-        # "reference_key_path": "result[0].vrfs.default.peerList[*].peerAddress",
-    },
+    "result[0].vrfs.default.peerList[*].[$peerAddress$,establishedTransitions]",
     ({}, True),
 )

 tolerance_test_values_within_threshold = (
     ("tolerance", 10),
     "api.json",
-    {
-        "path": "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesSent]",
-        # "reference_key_path": "result[0].vrfs.default.peerList[*].peerAddress",
-    },
+    "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesSent]",
     ({}, True),
 )

 tolerance_test_values_beyond_threshold = (
     ("tolerance", 10),
     "api.json",
-    {
-        "path": "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]",
-        # "reference_key_path": "result[0].vrfs.default.peerList[*].peerAddress",
-    },
+    "result[0].vrfs.default.peerList[*].[$peerAddress$,prefixesReceived]",
     (
         {
             "10.1.0.0": {"prefixesReceived": {"new_value": 120, "old_value": 100}},
@@ -104,41 +81,34 @@ def test_check_type_results(check_type_args, filename, path, expected_results):
     check = CheckType.init(*check_type_args)
     pre_data = load_json_file("pre", filename)
     post_data = load_json_file("post", filename)
-    actual_results = check.evaluate(pre_data, post_data, path)
+    pre_value = check.extract_value_from_json_path(pre_data, path)
+    post_value = check.extract_value_from_json_path(post_data, path)
+    actual_results = check.evaluate(pre_value, post_value)
     assert actual_results == expected_results


 napalm_bgp_neighbor_status = (
     "napalm_get_bgp_neighbors.json",
     ("exact_match",),
-    {
-        "path": "global.$peers$.*.[is_enabled,is_up]",
-        # "reference_key_path": "global.peers"
-    },
+    "global.$peers$.*.[is_enabled,is_up]",
     0,
 )

 napalm_bgp_neighbor_prefixes_ipv4 = (
     "napalm_get_bgp_neighbors.json",
     ("tolerance", 10),
-    {
-        "path": "global.$peers$.*.*.ipv4.[accepted_prefixes,received_prefixes,sent_prefixes]",
-        # "reference_key_path": "global.peers",
-    },
+    "global.$peers$.*.*.ipv4.[accepted_prefixes,received_prefixes,sent_prefixes]",
     1,
 )

 napalm_bgp_neighbor_prefixes_ipv6 = (
     "napalm_get_bgp_neighbors.json",
     ("tolerance", 10),
-    {
-        "path": "global.$peers$.*.*.ipv6.[accepted_prefixes,received_prefixes,sent_prefixes]",
-        # "reference_key_path": "global.peers",
-    },
+    "global.$peers$.*.*.ipv6.[accepted_prefixes,received_prefixes,sent_prefixes]",
     2,
 )

-napalm_get_lldp_neighbors_exact_raw = ("napalm_get_lldp_neighbors.json", ("exact_match",), {}, 0)
+napalm_get_lldp_neighbors_exact_raw = ("napalm_get_lldp_neighbors.json", ("exact_match",), None, 0)

 check_tests = [
     napalm_bgp_neighbor_status,
@@ -151,10 +121,13 @@ def test_check_type_results(check_type_args, filename, path, expected_results):
 @pytest.mark.parametrize("filename, check_args, path, result_index", check_tests)
 def test_checks(filename, check_args, path, result_index):
     """Validate multiple checks on the same data to catch corner cases."""
+    check = CheckType.init(*check_args)
     pre_data = load_json_file("pre", filename)
     post_data = load_json_file("post", filename)
     result = load_json_file("results", filename)
-    check = CheckType.init(*check_args)
-    check_output = check.evaluate(pre_data, post_data, path)
-    assert list(check_output) == result[result_index]
+    pre_value = check.extract_value_from_json_path(pre_data, path)
+    post_value = check.extract_value_from_json_path(post_data, path)
+    actual_results = check.evaluate(pre_value, post_value)
+
+    assert list(actual_results) == result[result_index]