@@ -414,15 +414,12 @@ def test_detection(self, detection: Detection) -> None:
data)
:param detection: the Detection to test
"""
# TODO: do we want to return a failure here if no test exists for a production detection?
# Log and return if no tests exist
if detection.tests is None:
self.pbar.write(f"No test(s) found for {detection.name}")
return

# iterate TestGroups
for test_group in detection.test_groups:
# If all tests in the group have been skipped, report and continue
# If all tests in the group have been skipped, report and continue.
# Note that the logic for skipping tests for detections tagged manual_test exists in
# the detection builder.
if test_group.all_tests_skipped():
self.pbar.write(
self.format_pbar_string(
26 changes: 20 additions & 6 deletions contentctl/actions/detection_testing/views/DetectionTestingView.py
@@ -84,6 +84,7 @@ def getSummaryObject(
tested_detections = []
total_pass = 0
total_fail = 0
total_skipped = 0

# Iterate the detections tested (anything in the output queue was tested)
for detection in self.sync_obj.outputQueue:
@@ -93,16 +94,27 @@ def getSummaryObject(
)

# Aggregate detection pass/fail metrics
if summary["success"] is True:
total_pass += 1
else:
if summary["success"] is False:
total_fail += 1
else:
# Test is marked as a success, but we need to determine if there were skipped unit tests.
# SKIPPED tests still show a success in this field, but we want to count them differently.
pass_increment = 1
for test in summary.get("tests", []):
if test.get("test_type") == "unit" and test.get("status") == "skip":
Contributor:
This doesn't need to be blocking, but I'd like to make detection state resolution (e.g. whether it is considered a skip/fail/error/pass) a property of the detection itself

E.g. define a property function status which iterates over tests

  • Pass: If at least one test passes and the rest pass/skip
  • Skip: If all tests skip
  • Fail: If at least one test fails and the rest pass/skip/fail
  • Error: If at least one test errors (regardless of other test states)

Contributor Author:

💯
I think we should make this a pydantic computed_property. On top of that, I would add an additional option to this set of values (which is the default value) along the lines of NOT_YET_TESTED.
This way, when we are rendering the current state of all tests, like in a user interface, we can easily differentiate/sort all sorts of different tests.
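
For reference, a minimal standalone sketch of the resolution rules proposed in this thread (the enum, the status strings, and the function name are assumptions for illustration, not contentctl's actual API):

from enum import Enum
from typing import Optional


class ResolvedStatus(str, Enum):
    NOT_YET_TESTED = "not_yet_tested"
    PASS = "pass"
    SKIP = "skip"
    FAIL = "fail"
    ERROR = "error"


def resolve_detection_status(test_statuses: list[Optional[str]]) -> ResolvedStatus:
    """Collapse individual test statuses into one detection-level status."""
    # Default state: no tests yet, or some tests have not produced a result
    if not test_statuses or any(status is None for status in test_statuses):
        return ResolvedStatus.NOT_YET_TESTED
    # Error: at least one test errored, regardless of the other test states
    if "error" in test_statuses:
        return ResolvedStatus.ERROR
    # Fail: at least one test failed and the rest passed/skipped/failed
    if "fail" in test_statuses:
        return ResolvedStatus.FAIL
    # Skip: every test was skipped
    if all(status == "skip" for status in test_statuses):
        return ResolvedStatus.SKIP
    # Pass: at least one test passed and the rest passed or skipped
    return ResolvedStatus.PASS

On the pydantic model this could be exposed as a computed property over self.tests, with NOT_YET_TESTED as the default before any results exist.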

total_skipped += 1
# Test should not count as a pass, so do not increment the count
pass_increment = 0
break
total_pass += pass_increment


# Append to our list
tested_detections.append(summary)

# Sort s.t. all failures appear first (then by name)
tested_detections.sort(key=lambda x: (x["success"], x["name"]))
# Second sort condition is a hack to get detections with skipped unit tests to appear above passing tests
Contributor:
Can I ask why you want skip above pass?

Contributor Author:
The results file can be, and often is, REALLY large. I think it makes sense to put errors/failures at the top, as these are the ones that deserve the most attention (and what people are most likely to be looking for when they open this file).

I personally think "pass" is the least interesting since it means detections are working as expected.
I think SKIP falls somewhere in between a failure and a pass. For example, someone might want to manually evaluate these OR note how many tests have been skipped.

Definitely a personal opinion.

tested_detections.sort(key=lambda x: (x["success"], 0 if x.get("tests",[{}])[0].get("status","status_missing")=="skip" else 1, x["name"]))
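
A possibly clearer spelling of the same ordering (failures first, then skipped, then passing, then by name), sketched under the assumption that each summary dict carries the success, tests, and name keys seen in this diff:

def sort_rank(summary: dict) -> tuple:
    # 0 = failed/errored, 1 = skipped, 2 = passed; lower ranks sort first
    if summary["success"] is False:
        state_rank = 0
    elif any(
        test.get("test_type") == "unit" and test.get("status") == "skip"
        for test in summary.get("tests", [])
    ):
        state_rank = 1
    else:
        state_rank = 2
    return (state_rank, summary["name"])

# tested_detections.sort(key=sort_rank)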

# Aggregate summaries for the untested detections (anything still in the input queue was untested)
total_untested = len(self.sync_obj.inputQueue)
Expand All @@ -128,14 +140,15 @@ def getSummaryObject(
overall_success = False

# Compute total detections
total_detections = total_fail + total_pass + total_untested
total_detections = total_fail + total_pass + total_untested + total_skipped


# Compute the percentage of completion for testing, as well as the success rate
percent_complete = Utils.getPercent(
len(tested_detections), len(untested_detections), 1
)
success_rate = Utils.getPercent(
total_pass, total_detections, 1
total_pass, total_detections-total_skipped, 1
Contributor:
This is a non-blocking comment and a reiteration of a conversation in slack; just including it for good book-keeping

I think future work could break down another metric as tested_detections, which is the value you are calculating in-line here. It would provide clarity to someone reading the YAML as to how success_rate is derived. Likewise, total_detections should become a count of all detections w/in the scope of this test run (e.g. all, selected, changes) and, if all, should include experimental and deprecated in its count, with separate counts for total_production, total_experimental, etc.

I think this is just good data to have, but this also serves a request from Javier around having a metric for test coverage of all detections delivered in ESCU (which would include experimental and deprecated)

Contributor Author:
Yes, I think this type of additional breakdown is a good idea.
I also think it should be abstracted from this particular view to a Model so that it can be used in different UI components, such as:

  1. The results file, as we use it today
  2. Command line output
  3. Other User Interfaces we may support in the future.
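
As a rough illustration of that idea, a summary-metrics model might look something like the sketch below; the class and field names are assumptions, not an existing contentctl type, and the rate mirrors how success_rate is derived in this diff (skipped detections are excluded from the denominator):

from dataclasses import dataclass


@dataclass
class TestSummaryMetrics:
    total_pass: int
    total_fail: int
    total_skipped: int
    total_untested: int

    @property
    def total_detections(self) -> int:
        return self.total_pass + self.total_fail + self.total_skipped + self.total_untested

    @property
    def success_rate(self) -> float:
        # Skipped detections neither help nor hurt the rate
        denominator = self.total_detections - self.total_skipped
        if denominator == 0:
            return 0.0
        return round(100 * self.total_pass / denominator, 1)


# e.g. 10 pass, 2 fail, 3 skipped, 1 untested -> 10 / (16 - 3) ~= 76.9%
metrics = TestSummaryMetrics(total_pass=10, total_fail=2, total_skipped=3, total_untested=1)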

)

# TODO (cmcginley): add stats around total test cases and unit/integration test
@@ -149,6 +162,7 @@ def getSummaryObject(
"total_detections": total_detections,
"total_pass": total_pass,
"total_fail": total_fail,
"total_skipped": total_skipped,
"total_untested": total_untested,
"total_experimental_or_deprecated": len(deprecated_detections+experimental_detections),
"success_rate": success_rate,
17 changes: 17 additions & 0 deletions contentctl/input/detection_builder.py
@@ -344,6 +344,23 @@ def skipIntegrationTests(self) -> None:
"security_content_obj must be an instance of Detection to skip integration tests, "
f"not {type(self.security_content_obj)}"
)

def skipAllTests(self, manual_test_explanation: str) -> None:
"""
Skip all unit and integration tests if the manual_test flag is defined in the yml
"""
# Sanity check for typing and in case setObject wasn't called yet
if self.security_content_obj is not None and isinstance(self.security_content_obj, Detection):
for test in self.security_content_obj.tests:
# This should skip both unit and integration tests as appropriate
test.skip(f"TEST SKIPPED: Detection marked as 'manual_test' with explanation: {manual_test_explanation}")

else:
raise ValueError(
"security_content_obj must be an instance of Detection to skip unit and integration tests due "
f"to the presence of the manual_test field, not {type(self.security_content_obj)}"
)


def reset(self) -> None:
self.security_content_obj = None
7 changes: 7 additions & 0 deletions contentctl/input/director.py
@@ -221,6 +221,13 @@ def constructDetection(self, builder: DetectionBuilder, file_path: str) -> None:
# TODO: is there a better way to handle this? The `test` portion of the config is not defined for validate
if (self.input_dto.config.test is not None) and (not self.input_dto.config.test.enable_integration_testing):
builder.skipIntegrationTests()

if builder.security_content_obj is not None and \
builder.security_content_obj.tags is not None and \
isinstance(builder.security_content_obj.tags.manual_test, str):
# Set all tests, both Unit AND Integration, to skipped due to manual_test. Note that the skip message
# set here will intentionally overwrite the justification set by the skipIntegrationTests call above.
builder.skipAllTests(builder.security_content_obj.tags.manual_test)


def constructSSADetection(self, builder: DetectionBuilder, file_path: str) -> None:
@@ -19,7 +19,7 @@
from contentctl.objects.playbook import Playbook
from contentctl.helper.link_validator import LinkValidator
from contentctl.objects.enums import SecurityContentType

from contentctl.objects.test_group import TestGroup

class Detection_Abstract(SecurityContentObject):
# contentType: SecurityContentType = SecurityContentType.detections
@@ -60,6 +60,34 @@ class Detection_Abstract(SecurityContentObject):
class Config:
use_enum_values = True


# A list of groups of tests, relying on the same data
test_groups: Union[list[TestGroup], None] = None
Contributor:
I see you migrated this code from Detection

Can you explain the intention behind having separate classes here (detection and the abstract)? I'm fine w/ this code living in either class, but I chose Detection because I thought the spirit of the abstract class was to be a more faithful representation of the model as it exists in YAML, and since the test_groups attribute is an inferred field, I thought maybe it made sense for it to live elsewhere.

So tl;dr I'm totally fine with this, and no change required, just curious, and wondering if we don't have a need for two separate classes, perhaps we could collapse them into one in the future

Contributor Author:
Great question! Many contentctl users want to change content somehow (and detections are the content they are most often interested in changing). These changes could include, but are not limited to:

  1. Adding new fields (and possibly building validators for those fields)
  2. Setting more or less strict validations on fields
  3. Setting values for fields already defined in that object (but which may be missing from their YMLs)
  4. Disabling some of the validations we do.

However, if they change Detection_Abstract and WE change Detection_Abstract, it has a high probability of creating painful merge conflict issues. As such, the plan was to retain control of Detection_Abstract and ship all our out-of-the-box functionality in that file. Then if customers want their own functionality, they can add it to Detection.py.
This way, pulling the latest contentctl updates should be painless and without merge conflicts. We are hoping to do this with all types, but have started with Detection since it is the one that receives user changes most often.
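
As a simplified illustration of that split (names invented for this sketch; these are not the real contentctl classes): out-of-the-box fields and validators live in the upstream-maintained base, while user-specific customizations go in the thin subclass, so pulling contentctl updates does not collide with local changes.

from typing import Optional

from pydantic import BaseModel, validator


class ExampleDetectionAbstract(BaseModel):
    # Maintained upstream: out-of-the-box fields and validations ship here
    name: str
    search: str

    @validator("name")
    def name_not_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError("name must not be empty")
        return v


class ExampleDetection(ExampleDetectionAbstract):
    # User-owned file: add org-specific fields/validators here without
    # touching (and later merge-conflicting with) the upstream base class
    internal_ticket: Optional[str] = None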


@validator("test_groups", always=True)
def validate_test_groups(cls, value, values) -> Union[list[TestGroup], None]:
"""
Validates the `test_groups` field and constructs the model from the list of unit tests
if no explicit construct was provided
:param value: the value of the field `test_groups`
:param values: a dict of the other fields in the Detection model
"""
# if the value was not the None default, do nothing
if value is not None:
return value

# iterate over the unit tests and create a TestGroup (and as a result, an IntegrationTest) for each
test_groups: list[TestGroup] = []
for unit_test in values["tests"]:
test_group = TestGroup.derive_from_unit_test(unit_test, values["name"])
test_groups.append(test_group)

# now add each integration test to the list of tests
for test_group in test_groups:
values["tests"].append(test_group.integration_test)
return test_groups
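
To make the pairing above concrete, here is a simplified stand-in (these mini classes are illustrative only and do not mirror the real UnitTest/IntegrationTest/TestGroup models): each unit test yields one TestGroup whose derived integration test relies on the same attack data, and the integration test is appended back onto the detection's test list.

from dataclasses import dataclass, field


@dataclass
class MiniUnitTest:
    name: str
    attack_data: list = field(default_factory=list)


@dataclass
class MiniIntegrationTest:
    name: str


@dataclass
class MiniTestGroup:
    unit_test: MiniUnitTest
    integration_test: MiniIntegrationTest

    @classmethod
    def derive_from_unit_test(cls, unit_test: MiniUnitTest, detection_name: str) -> "MiniTestGroup":
        # The derived integration test is tied to the same data as the unit test
        integration = MiniIntegrationTest(name=f"{detection_name} - {unit_test.name} (integration)")
        return cls(unit_test=unit_test, integration_test=integration)


tests = [MiniUnitTest(name="True Positive Test")]
groups = [MiniTestGroup.derive_from_unit_test(t, "Example Detection") for t in tests]
tests.extend(group.integration_test for group in groups)  # now holds both unit and integration tests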


def get_content_dependencies(self) -> list[SecurityContentObject]:
return self.playbooks + self.baselines + self.macros + self.lookups

@@ -194,16 +222,39 @@ def search_obsersables_exist_validate(cls, v, values):
# Found everything
return v

# TODO (cmcginley): Fix detection_abstract.tests_validate so that it surfaces validation errors
# (e.g. a lack of tests) to the final results, instead of just showing a failed detection w/
# no tests (maybe have a message propagated at the detection level? do a separate coverage
# check as part of validation?):
@validator("tests")
@validator("tests", always=True)
def tests_validate(cls, v, values):
if values.get("status", "") == DetectionStatus.production.value and not v:
raise ValueError(
"At least one test is REQUIRED for production detection: " + values["name"]
)
# TODO (cmcginley): Fix detection_abstract.tests_validate so that it surfaces validation errors
# (e.g. a lack of tests) to the final results, instead of just showing a failed detection w/
# no tests (maybe have a message propagated at the detection level? do a separate coverage
# check as part of validation?):


# Only production analytics require tests
if values.get("status","") != DetectionStatus.production.value:
return v

# All detection types EXCEPT Correlation MUST have test(s); any other type, including newly defined types, requires them.
# Accordingly, we do not need to do additional checks if the type is Correlation.
if values.get("type", "") in set([AnalyticsType.Correlation.value]):
return v


# Ensure that there is at least 1 test
if len(v) == 0:
if values.get("tags",None) and values.get("tags").manual_test is not None:
# Detections that are manual_test MAY have detections, but it is not required. If they
# do not have one, then create one which will be a placeholder.
# Note that this fake UnitTest (and by extension, Integration Test) will NOT be generated
# if there ARE test(s) defined for a Detection.
placeholder_test = UnitTest(name="PLACEHOLDER FOR DETECTION TAGGED MANUAL_TEST WITH NO TESTS SPECIFIED IN YML FILE", attack_data=[])
return [placeholder_test]

else:
raise ValueError("At least one test is REQUIRED for production detection: " + values.get("name", "NO NAME FOUND"))


# No issues - at least one test was provided for a production detection type that requires testing
return v

@validator("datamodel")
29 changes: 2 additions & 27 deletions contentctl/objects/detection.py
@@ -2,7 +2,7 @@
from pydantic import validator

from contentctl.objects.abstract_security_content_objects.detection_abstract import Detection_Abstract
from contentctl.objects.test_group import TestGroup



class Detection(Detection_Abstract):
@@ -16,29 +16,4 @@ class Detection(Detection_Abstract):
# them or modifying their behavior may cause
# undefined issues with the contentctl tooling
# or output of the tooling.

# A list of groups of tests, relying on the same data
test_groups: Union[list[TestGroup], None] = None

@validator("test_groups", always=True)
def validate_test_groups(cls, value, values) -> Union[list[TestGroup], None]:
"""
Validates the `test_groups` field and constructs the model from the list of unit tests
if no explicit construct was provided
:param value: the value of the field `test_groups`
:param values: a dict of the other fields in the Detection model
"""
# if the value was not the None default, do nothing
if value is not None:
return value

# iterate over the unit tests and create a TestGroup (and as a result, an IntegrationTest) for each
test_groups: list[TestGroup] = []
for unit_test in values["tests"]:
test_group = TestGroup.derive_from_unit_test(unit_test, values["name"])
test_groups.append(test_group)

# now add each integration test to the list of tests
for test_group in test_groups:
values["tests"].append(test_group.integration_test)
return test_groups
pass
17 changes: 11 additions & 6 deletions contentctl/objects/unit_test_result.py
@@ -13,7 +13,7 @@

class UnitTestResult(BaseTestResult):
missing_observables: list[str] = []

def set_job_content(
self,
content: Union[Record, None],
@@ -37,18 +37,18 @@ def set_job_content(
self.duration = round(duration, 2)
self.exception = exception
self.status = status

self.job_content = content

# Set the job content, if given
if content is not None:
self.job_content = content

if self.status == TestResultStatus.PASS:
self.message = "TEST PASSED"
elif self.status == TestResultStatus.FAIL:
self.message = "TEST FAILED"
elif self.status == TestResultStatus.ERROR:
self.message == "TEST FAILED (ERROR)"
self.message = "TEST ERROR"
elif self.status == TestResultStatus.SKIP:
# A test that was SKIPPED should not have job content since it should not have been run.
self.message = "TEST SKIPPED"

if not config.instance_address.startswith("http://"):
@@ -61,11 +61,16 @@
sid=content.get("sid", None),
)

elif self.status == TestResultStatus.SKIP:
self.message = "TEST SKIPPED"
pass

elif content is None:
self.job_content = None
self.status = TestResultStatus.ERROR
if self.exception is not None:
self.message = f"EXCEPTION: {str(self.exception)}"
else:
self.message = f"ERROR with no more specific message available."
self.sid_link = NO_SID

return self.success