Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,6 @@ venv
# Vs Code Settings
.vscode
test_report.md

# Pycharm
.idea/
19 changes: 19 additions & 0 deletions pytest_splunk_addon/helmut_lib/SearchUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,25 @@ def checkQueryCountIsZero(self, query, max_time=120):
self.logger.debug("Count of results is > 0, it is:%d", result_count)
return False, job.get_results()

def get_search_results(self, query, max_time=120):
"""
Execute a search query
Args:
query (str): query string for Splunk Search
max_time: Amount of time job can wait to finish.
Returns:
events that match the query
"""

self.logger.debug("query is %s", query)
try:
job = self.jobs.create(query, auto_finalize_ec=120, max_time=max_time)
job.wait(max_time)
return job.get_results()
except Exception as e:
self.logger.debug("Errors when executing search!!!")
self.logger.debug(e)

def checkQueryFields(
self,
query,
Expand Down
27 changes: 19 additions & 8 deletions pytest_splunk_addon/standard_lib/fields_tests/test_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
from ..addon_parser import Field
import json

TOP_FIVE_STRUCTURALLY_UNIQUE_EVENTS_QUERY_PART = " | dedup punct | head 5"
COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART = " | stats count by sourcetype"


class FieldTestTemplates(object):
"""
Test templates to test the knowledge objects of an App
"""

logger = logging.getLogger("pytest-splunk-addon-tests")
logger = logging.getLogger("pytest-splunk-addon")

@pytest.mark.splunk_searchtime_fields
@pytest.mark.splunk_searchtime_internal_errors
Expand Down Expand Up @@ -80,7 +84,7 @@ def test_props_fields(
search + f" AND ({field} IN ({expected_values})"
f" AND NOT {field} IN ({negative_values}))"
)
search += " | stats count by sourcetype"
search += COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART

self.logger.info(f"Executing the search query: {search}")

Expand Down Expand Up @@ -125,7 +129,7 @@ def test_props_fields_no_dash_not_empty(
record_property("fields", splunk_searchtime_fields_negative["fields"])

index_list = "(index=" + " OR index=".join(splunk_search_util.search_index.split(',')) + ")"
search = (
base_search = (
f"search {index_list}"
f" {splunk_searchtime_fields_negative['stanza_type']}=\""
f"{splunk_searchtime_fields_negative['stanza']}\""
Expand All @@ -137,8 +141,8 @@ def test_props_fields_no_dash_not_empty(
negative_values = ", ".join([f'"{each}"' for each in field.negative_values])

fields_search.append(f"({field} IN ({negative_values}))")
search += " AND ({})".format(" OR ".join(fields_search))
search += " | stats count by sourcetype"
base_search += " AND ({})".format(" OR ".join(fields_search))
search = base_search + COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART

self.logger.info(f"Executing the search query: {search}")

Expand All @@ -149,9 +153,16 @@ def test_props_fields_no_dash_not_empty(
record_property("results", results.as_list)
pp = pprint.PrettyPrinter(indent=4)
result_str = pp.pformat(results.as_list[:10])

query_for_unique_events = base_search + TOP_FIVE_STRUCTURALLY_UNIQUE_EVENTS_QUERY_PART
query_results = splunk_search_util.get_search_results(query_for_unique_events)
results_formatted_str = pp.pformat(query_results.as_list)
assert result, (
f"Query result greater than 0.\nsearch={search}\n"
f"found result={result_str}"
f"found result={result_str}\n"
" === STRUCTURALLY UNIQUE EVENTS:\n"
f"query={query_for_unique_events}\n"
f"events= {results_formatted_str}"
)

@pytest.mark.splunk_searchtime_fields
Expand Down Expand Up @@ -190,7 +201,7 @@ def test_tags(

index_list = "(index=" + " OR index=".join(splunk_search_util.search_index.split(',')) + ")"
search = f"search {index_list} {tag_query} AND tag={tag}"
search += " | stats count by sourcetype"
search += COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART

self.logger.info(f"Search: {search}")

Expand Down Expand Up @@ -247,7 +258,7 @@ def test_eventtype(
search = (f"search {index_list} AND "
f"eventtype="
f"\"{splunk_searchtime_fields_eventtypes['stanza']}\"")
search += " | stats count by sourcetype"
search += COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART

self.logger.info(
"Testing eventtype =%s", splunk_searchtime_fields_eventtypes["stanza"]
Expand Down