Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 82 additions & 112 deletions pytest_splunk_addon/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ def pytest_configure(config):
"""
Setup configuration after command-line options are parsed
"""
config.addinivalue_line(
"markers", "splunk_addon_internal_errors: Check Errors"
)
config.addinivalue_line(
"markers", "splunk_addon_searchtime: Test search time only"
)
config.addinivalue_line("markers", "splunk_addon_internal_errors: Check Errors")
config.addinivalue_line("markers", "splunk_addon_searchtime: Test search time only")


def dedup_tests(test_list):
Expand All @@ -48,13 +44,9 @@ def pytest_generate_tests(metafunc):
"""
for fixture in metafunc.fixturenames:
if fixture.startswith("splunk_app"):
LOGGER.info(
"generating testcases for splunk_app. fixture=%s", fixture
)
LOGGER.info("generating testcases for splunk_app. fixture=%s", fixture)
# Load associated test data
tests = load_splunk_tests(
metafunc.config.getoption("splunk_app"), fixture
)
tests = load_splunk_tests(metafunc.config.getoption("splunk_app"), fixture)
metafunc.parametrize(fixture, dedup_tests(tests))


Expand Down Expand Up @@ -129,15 +121,8 @@ def return_tags(tags_property, stanza_name):
List of pytest parameters
"""
return pytest.param(
{
"tag_query": stanza_name,
tags_property.value + "_tag": tags_property.name,
},
id=stanza_name
+ " | "
+ tags_property.name
+ "="
+ tags_property.value,
{"tag_query": stanza_name, tags_property.value + "_tag": tags_property.name,},
id=stanza_name + " | " + tags_property.name + "=" + tags_property.value,
)


Expand All @@ -158,14 +143,10 @@ def load_splunk_props(props):
elif props_section.startswith("source::"):
LOGGER.info("Parsing source stanza=%s", props_section)
for props_source in list(get_list_of_sources(props_section)):
yield return_props_stanza_param(
props_section, props_source, "source"
)
yield return_props_stanza_param(props_section, props_source, "source")
else:
LOGGER.info("parsing sourcetype stanza=%s", props_section)
yield return_props_stanza_param(
props_section, props_section, "sourcetype"
)
yield return_props_stanza_param(props_section, props_section, "sourcetype")


def return_props_stanza_param(stanza_id, stanza_value, stanza_type):
Expand All @@ -192,9 +173,7 @@ def return_props_stanza_param(stanza_id, stanza_value, stanza_type):
stanza_value,
str(test_name),
)
return pytest.param(
{"field": stanza_type, "value": stanza_value}, id=test_name
)
return pytest.param({"field": stanza_type, "value": stanza_value}, id=test_name)


def load_splunk_fields(app, props, transforms):
Expand Down Expand Up @@ -247,20 +226,15 @@ def load_splunk_fields(app, props, transforms):
)
elif current.startswith("REPORT-"):
yield from return_transforms_report(
transforms,
stanza_type,
each_stanza_name,
props_property,
transforms, stanza_type, each_stanza_name, props_property,
)
elif re.match('LOOKUP', current, re.IGNORECASE):
elif re.match("LOOKUP", current, re.IGNORECASE):
yield from return_props_lookup(
stanza_type, each_stanza_name, props_property, app

)

def get_params_from_regex(
regex, property_value, stanza_type, stanza_name, fields
):

def get_params_from_regex(regex, property_value, stanza_type, stanza_name, fields):
"""
Returns the fields captured using regex as pytest parameters

Expand All @@ -283,9 +257,7 @@ def get_params_from_regex(
for matchNum, match in enumerate(matches, start=1):
for groupNum in range(0, len(match.groups())):
groupNum = groupNum + 1
field_test_name = "{}_field::{}".format(
stanza_name, match.group(groupNum)
)
field_test_name = "{}_field::{}".format(stanza_name, match.group(groupNum))
yield pytest.param(
{
"stanza_type": stanza_type,
Expand All @@ -297,9 +269,7 @@ def get_params_from_regex(
fields.append(match.group(groupNum))


def return_transforms_report(
transforms, stanza_type, stanza_name, report_property
):
def return_transforms_report(transforms, stanza_type, stanza_name, report_property):
"""
Returns the fields parsed from transforms.conf as pytest parameters

Expand All @@ -316,8 +286,7 @@ def return_transforms_report(
"""
try:
for transforms_section in [
each_stanza.strip()
for each_stanza in report_property.value.split(",")
each_stanza.strip() for each_stanza in report_property.value.split(",")
]:
report_test_name = (
f"{stanza_name}::{report_property.name}::{transforms_section}"
Expand All @@ -339,27 +308,19 @@ def return_transforms_report(
),
)
fields.append(section.options["SOURCE_KEY"].value)
if (
"REGEX" in section.options
and section.options["REGEX"].value != ""
):
regex = r"\(\?<([^\>]+)\>"
if "REGEX" in section.options and section.options["REGEX"].value != "":
regex = r"\(\?P?(?:[<'])([^\>'\s]+)[\>']"
yield from get_params_from_regex(
regex,
section.options["REGEX"].value,
stanza_type,
stanza_name,
fields,
)
if (
"FIELDS" in section.options
and section.options["FIELDS"].value != ""
):
if "FIELDS" in section.options and section.options["FIELDS"].value != "":
fields_list = [
each_field.strip()
for each_field in section.options["FIELDS"].value.split(
","
)
for each_field in section.options["FIELDS"].value.split(",")
]
for each_field in fields_list:
yield pytest.param(
Expand All @@ -371,10 +332,7 @@ def return_transforms_report(
id="{}_field::{}".format(stanza_name, each_field),
)
fields.append(each_field)
if (
"FORMAT" in section.options
and section.options["FORMAT"].value != ""
):
if "FORMAT" in section.options and section.options["FORMAT"].value != "":
regex = r"(\S*)::"
yield from get_params_from_regex(
regex,
Expand Down Expand Up @@ -415,27 +373,27 @@ def return_props_extract(stanza_type, stanza_name, props_property):
generator of fields as pytest parameters
"""
test_name = f"{stanza_name}::{props_property.name}"
regex = r"\(\?<([^\>]+)\>(?:.*(?i)in\s+(.*))?"
matches = re.finditer(regex, props_property.value, re.MULTILINE)
regex = r"\(\?P?(?:[<'])([^\>'\s]+)[\>']"
fields = []
for match_num, match in enumerate(matches, start=1):
for group_num in range(0, len(match.groups())):
group_num = group_num + 1
if match.group(group_num):
field_test_name = "{}_field::{}".format(
stanza_name, match.group(group_num)
)
yield pytest.param(
{
"stanza_type": stanza_type,
"stanza_name": stanza_name,
"fields": [match.group(group_num)],
},
id=field_test_name,
)
fields.append(match.group(group_num))
yield from get_params_from_regex(
regex, props_property.value, stanza_type, stanza_name, fields,
)
# If SOURCE_KEY is used in EXTRACT, generate the test for the same.
regex_for_source_key = r"(?:(?i)in\s+(\w+))\s*$"
extract_source_key = re.search(
regex_for_source_key, props_property.value, re.MULTILINE
)
if extract_source_key:
yield pytest.param(
{
"stanza_type": stanza_type,
"stanza_name": stanza_name,
"fields": extract_source_key.group(1),
},
id="{}_field::{}".format(stanza_name, extract_source_key.group(1)),
)
fields.insert(0, extract_source_key.group(1))
if fields:
fields.reverse()
LOGGER.info(
"(Generated pytest.param for extract."
"stanza_type=%s stanza_name=%s, fields=%s)",
Expand All @@ -444,11 +402,7 @@ def return_props_extract(stanza_type, stanza_name, props_property):
str(fields),
)
yield pytest.param(
{
"stanza_type": stanza_type,
"stanza_name": stanza_name,
"fields": fields,
},
{"stanza_type": stanza_type, "stanza_name": stanza_name, "fields": fields,},
id=test_name,
)

Expand Down Expand Up @@ -477,9 +431,10 @@ def get_lookup_fields(lookup_str):
lookup_str += " OUTPUT "

# Take input fields or output fields depending on the input_output_index
input_output_str = lookup_str.split(" OUTPUT ")[input_output_index].split(" OUTPUTNEW ")[input_output_index]
input_output_str = lookup_str.split(" OUTPUT ")[input_output_index].split(
" OUTPUTNEW "
)[input_output_index]


field_parser = r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)\s*(?:[aA][sS]\s*(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+))?"
# field_groups: Group of max 2 fields - (source, destination) for "source as destination"
field_groups = re.findall(field_parser, input_output_str)
Expand All @@ -489,11 +444,17 @@ def get_lookup_fields(lookup_str):
# Taking last non-empty field ensures that the aliased value will have
# higher priority
for each_group in field_groups:
field_list.append([each_field for each_field in reversed(each_group) if each_field][0])
field_list.append(
[each_field for each_field in reversed(each_group) if each_field][0]
)

input_output_field_list.append(field_list)

return {"input_fields": input_output_field_list[0], "output_fields": input_output_field_list[1], "lookup_stanza": lookup_stanza}
return {
"input_fields": input_output_field_list[0],
"output_fields": input_output_field_list[1],
"lookup_stanza": lookup_stanza,
}


def return_props_lookup(stanza_type, stanza_name, props_property, app):
Expand Down Expand Up @@ -530,10 +491,12 @@ def return_props_lookup(stanza_type, stanza_name, props_property, app):

if parsed_fields["lookup_stanza"] in transforms.sects:
stanza = transforms.sects[parsed_fields["lookup_stanza"]]
if 'filename' in stanza.options:
lookup_file = stanza.options['filename'].value
if "filename" in stanza.options:
lookup_file = stanza.options["filename"].value
try:
location = os.path.join(app.package.working_app_path, "lookups", lookup_file)
location = os.path.join(
app.package.working_app_path, "lookups", lookup_file
)
with open(location, "r") as csv_file:
reader = csv.DictReader(csv_file)
fieldnames = reader.fieldnames
Expand All @@ -544,16 +507,33 @@ def return_props_lookup(stanza_type, stanza_name, props_property, app):
# If there is an error. the test should fail with the current fields
# This makes sure the test doesn't exit prematurely
except (OSError, IOError, UnboundLocalError, TypeError) as e:
LOGGER.info("Could not read the lookup file, skipping test. error=%s", str(e))
LOGGER.info(
"Could not read the lookup file, skipping test. error=%s",
str(e),
)

# Test individual fields
for each_field in lookup_field_list:
field_test_name = f"{stanza_name}_field::{each_field}"
yield pytest.param({'stanza_type': stanza_type, 'stanza_name': stanza_name, 'fields': [each_field]}, id=field_test_name)
yield pytest.param(
{
"stanza_type": stanza_type,
"stanza_name": stanza_name,
"fields": [each_field],
},
id=field_test_name,
)

# Test Lookup as a whole
yield pytest.param({'stanza_type': stanza_type, 'stanza_name': stanza_name, 'fields': lookup_field_list}, id=test_name)

yield pytest.param(
{
"stanza_type": stanza_type,
"stanza_name": stanza_name,
"fields": lookup_field_list,
},
id=test_name,
)


def return_props_eval(stanza_type, stanza_name, props_property):
"""
Expand Down Expand Up @@ -586,11 +566,7 @@ def return_props_eval(stanza_type, stanza_name, props_property):
)
test_name = f"{stanza_name}_field::{fields[0]}"
return pytest.param(
{
"stanza_type": stanza_type,
"stanza_name": stanza_name,
"fields": fields,
},
{"stanza_type": stanza_type, "stanza_name": stanza_name, "fields": fields,},
id=test_name,
)

Expand Down Expand Up @@ -619,11 +595,7 @@ def return_props_sourcetype(stanza_type, stanza_name, props_property):
str(fields),
)
return pytest.param(
{
"stanza_type": stanza_type,
"stanza_name": stanza_name,
"fields": fields,
},
{"stanza_type": stanza_type, "stanza_name": stanza_name, "fields": fields,},
id=test_name,
)

Expand Down Expand Up @@ -730,10 +702,8 @@ def return_eventtypes_param(stanza_id):
"""

LOGGER.info(
"Generated pytest.param of eventtype with id=%s",
f"eventtype::{stanza_id}",
"Generated pytest.param of eventtype with id=%s", f"eventtype::{stanza_id}",
)
return pytest.param(
{"field": "eventtype", "value": stanza_id},
id=f"eventtype::{stanza_id}",
{"field": "eventtype", "value": stanza_id}, id=f"eventtype::{stanza_id}",
)
Loading