diff --git a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
index 7afa34f3..3a6c5063 100644
--- a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
+++ b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
@@ -414,15 +414,12 @@ def test_detection(self, detection: Detection) -> None:
         data)
         :param detection: the Detection to test
         """
-        # TODO: do we want to return a failure here if no test exists for a production detection?
-        # Log and return if no tests exist
-        if detection.tests is None:
-            self.pbar.write(f"No test(s) found for {detection.name}")
-            return
 
         # iterate TestGroups
         for test_group in detection.test_groups:
-            # If all tests in the group have been skipped, report and continue
+            # If all tests in the group have been skipped, report and continue.
+            # Note that the logic for skipping tests for detections tagged manual_test exists in
+            # the detection builder.
             if test_group.all_tests_skipped():
                 self.pbar.write(
                     self.format_pbar_string(
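The hunk above drops the old early return for detections without tests and leans on TestGroup.all_tests_skipped() to decide whether a whole group is reported as skipped; the skip bookkeeping itself now happens in the detection builder. A minimal, self-contained sketch of that group-level check, using toy classes rather than contentctl's real TestGroup and test models:

from dataclasses import dataclass, field


@dataclass
class ToyTest:
    status: str = "skip"  # one of "skip", "pass", "fail", "error"


@dataclass
class ToyTestGroup:
    tests: list[ToyTest] = field(default_factory=list)

    def all_tests_skipped(self) -> bool:
        # a group is only reported (and bypassed) when every test in it was skipped
        return all(test.status == "skip" for test in self.tests)


group = ToyTestGroup(tests=[ToyTest(), ToyTest()])
assert group.all_tests_skipped()
group.tests.append(ToyTest(status="pass"))
assert not group.all_tests_skipped()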
diff --git a/contentctl/actions/detection_testing/views/DetectionTestingView.py b/contentctl/actions/detection_testing/views/DetectionTestingView.py
index 16f0a71b..c6011d8d 100644
--- a/contentctl/actions/detection_testing/views/DetectionTestingView.py
+++ b/contentctl/actions/detection_testing/views/DetectionTestingView.py
@@ -84,6 +84,7 @@ def getSummaryObject(
         tested_detections = []
         total_pass = 0
         total_fail = 0
+        total_skipped = 0
 
         # Iterate the detections tested (anything in the output queue was tested)
         for detection in self.sync_obj.outputQueue:
@@ -93,16 +94,27 @@
             )
 
             # Aggregate detection pass/fail metrics
-            if summary["success"] is True:
-                total_pass += 1
-            else:
+            if summary["success"] is False:
                 total_fail += 1
+            else:
+                # Test is marked as a success, but we need to determine if there were skipped unit tests;
+                # SKIPPED tests still show a success in this field, but we want to count them differently
+                pass_increment = 1
+                for test in summary.get("tests"):
+                    if test.get("test_type") == "unit" and test.get("status") == "skip":
+                        total_skipped += 1
+                        # Test should not count as a pass, so do not increment the count
+                        pass_increment = 0
+                        break
+                total_pass += pass_increment
+
             # Append to our list
             tested_detections.append(summary)
 
         # Sort s.t. all failures appear first (then by name)
-        tested_detections.sort(key=lambda x: (x["success"], x["name"]))
+        # The second sort key is a hack to get detections with skipped unit tests to appear above passing detections
+        tested_detections.sort(key=lambda x: (x["success"], 0 if x.get("tests",[{}])[0].get("status","status_missing")=="skip" else 1, x["name"]))
 
         # Aggregate summaries for the untested detections (anything still in the input queue was untested)
         total_untested = len(self.sync_obj.inputQueue)
@@ -128,14 +140,15 @@
         overall_success = False
 
         # Compute total detections
-        total_detections = total_fail + total_pass + total_untested
+        total_detections = total_fail + total_pass + total_untested + total_skipped
 
+        # Compute the percentage of completion for testing, as well as the success rate
        percent_complete = Utils.getPercent(
             len(tested_detections), len(untested_detections), 1
         )
         success_rate = Utils.getPercent(
-            total_pass, total_detections, 1
+            total_pass, total_detections-total_skipped, 1
         )
 
         # TODO (cmcginley): add stats around total test cases and unit/integration test
@@ -149,6 +162,7 @@
             "total_detections": total_detections,
             "total_pass": total_pass,
             "total_fail": total_fail,
+            "total_skipped": total_skipped,
             "total_untested": total_untested,
             "total_experimental_or_deprecated": len(deprecated_detections+experimental_detections),
             "success_rate": success_rate,
diff --git a/contentctl/input/detection_builder.py b/contentctl/input/detection_builder.py
index 294ae565..e0d4e9c2 100644
--- a/contentctl/input/detection_builder.py
+++ b/contentctl/input/detection_builder.py
@@ -344,6 +344,23 @@ def skipIntegrationTests(self) -> None:
                 "security_content_obj must be an instance of Detection to skip integration tests, "
                 f"not {type(self.security_content_obj)}"
             )
+
+    def skipAllTests(self, manual_test_explanation: str) -> None:
+        """
+        Skip all unit and integration tests if the manual_test flag is defined in the yml
+        """
+        # Sanity check for typing, and in case setObject was not called yet
+        if self.security_content_obj is not None and isinstance(self.security_content_obj, Detection):
+            for test in self.security_content_obj.tests:
+                # This should skip both unit and integration tests as appropriate
+                test.skip(f"TEST SKIPPED: Detection marked as 'manual_test' with explanation: {manual_test_explanation}")
+
+        else:
+            raise ValueError(
+                "security_content_obj must be an instance of Detection to skip unit and integration tests due "
+                f"to the presence of the manual_test field, not {type(self.security_content_obj)}"
+            )
+
     def reset(self) -> None:
         self.security_content_obj = None
diff --git a/contentctl/input/director.py b/contentctl/input/director.py
index 075fddb5..95655011 100644
--- a/contentctl/input/director.py
+++ b/contentctl/input/director.py
@@ -221,6 +221,13 @@ def constructDetection(self, builder: DetectionBuilder, file_path: str) -> None:
         # TODO: is there a better way to handle this? The `test` portion of the config is not defined for validate
         if (self.input_dto.config.test is not None) and (not self.input_dto.config.test.enable_integration_testing):
             builder.skipIntegrationTests()
+
+        if builder.security_content_obj is not None and \
+           builder.security_content_obj.tags is not None and \
+           isinstance(builder.security_content_obj.tags.manual_test, str):
+            # Mark all tests, both Unit AND Integration, as skipped due to manual_test. Note that integration
+            # test messages will intentionally overwrite the justification from the skipIntegrationTests call above.
+            builder.skipAllTests(builder.security_content_obj.tags.manual_test)
 
     def constructSSADetection(self, builder: DetectionBuilder, file_path: str) -> None:
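Taken together, the view, builder, and director changes above treat a skipped detection as neither a pass nor a fail: "success" remains True for a skip, so the unit test statuses inside each summary are what separate the two, and the success rate divides by total_detections minus total_skipped. A standalone sketch of that tally, using hypothetical summary dicts shaped like the ones getSummaryObject builds:

from typing import Any


def tally(summaries: list[dict[str, Any]]) -> dict[str, int]:
    # Mirrors the aggregation above: a failed summary counts as a fail, a successful summary
    # containing a skipped unit test counts as a skip, and everything else counts as a pass.
    totals = {"pass": 0, "fail": 0, "skipped": 0}
    for summary in summaries:
        if summary["success"] is False:
            totals["fail"] += 1
        elif any(
            test.get("test_type") == "unit" and test.get("status") == "skip"
            for test in summary.get("tests", [])
        ):
            totals["skipped"] += 1
        else:
            totals["pass"] += 1
    return totals


print(tally([
    {"success": True, "tests": [{"test_type": "unit", "status": "pass"}]},
    {"success": True, "tests": [{"test_type": "unit", "status": "skip"}]},
    {"success": False, "tests": [{"test_type": "unit", "status": "fail"}]},
]))  # -> {'pass': 1, 'fail': 1, 'skipped': 1}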
diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
index 79372fe1..81e20329 100644
--- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py
+++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
@@ -19,7 +19,7 @@
 from contentctl.objects.playbook import Playbook
 from contentctl.helper.link_validator import LinkValidator
 from contentctl.objects.enums import SecurityContentType
-
+from contentctl.objects.test_group import TestGroup
 
 class Detection_Abstract(SecurityContentObject):
     # contentType: SecurityContentType = SecurityContentType.detections
@@ -60,6 +60,34 @@ class Detection_Abstract(SecurityContentObject):
     class Config:
         use_enum_values = True
+
+    # A list of groups of tests, relying on the same data
+    test_groups: Union[list[TestGroup], None] = None
+
+    @validator("test_groups", always=True)
+    def validate_test_groups(cls, value, values) -> Union[list[TestGroup], None]:
+        """
+        Validates the `test_groups` field and constructs the model from the list of unit tests
+        if no explicit construct was provided
+        :param value: the value of the field `test_groups`
+        :param values: a dict of the other fields in the Detection model
+        """
+        # if the value was not the None default, do nothing
+        if value is not None:
+            return value
+
+        # iterate over the unit tests and create a TestGroup (and as a result, an IntegrationTest) for each
+        test_groups: list[TestGroup] = []
+        for unit_test in values["tests"]:
+            test_group = TestGroup.derive_from_unit_test(unit_test, values["name"])
+            test_groups.append(test_group)
+
+        # now add each integration test to the list of tests
+        for test_group in test_groups:
+            values["tests"].append(test_group.integration_test)
+        return test_groups
+
+
     def get_content_dependencies(self) -> list[SecurityContentObject]:
         return self.playbooks + self.baselines + self.macros + self.lookups
@@ -194,16 +222,39 @@ def search_obsersables_exist_validate(cls, v, values):
         # Found everything
         return v
 
-    # TODO (cmcginley): Fix detection_abstract.tests_validate so that it surfaces validation errors
-    # (e.g. a lack of tests) to the final results, instead of just showing a failed detection w/
-    # no tests (maybe have a message propagated at the detection level? do a separate coverage
-    # check as part of validation?):
-    @validator("tests")
+    @validator("tests", always=True)
     def tests_validate(cls, v, values):
-        if values.get("status", "") == DetectionStatus.production.value and not v:
-            raise ValueError(
-                "At least one test is REQUIRED for production detection: " + values["name"]
-            )
+        # TODO (cmcginley): Fix detection_abstract.tests_validate so that it surfaces validation errors
+        # (e.g. a lack of tests) to the final results, instead of just showing a failed detection w/
+        # no tests (maybe have a message propagated at the detection level? do a separate coverage
+        # check as part of validation?):
+
+
+        # Only production analytics require tests
+        if values.get("status","") != DetectionStatus.production.value:
+            return v
+
+        # All types EXCEPT Correlation MUST have test(s); any other type, including newly defined types, requires them.
+        # Accordingly, we do not need to do additional checks if the type is Correlation
+        if values.get("type","") in set([AnalyticsType.Correlation.value]):
+            return v
+
+
+        # Ensure that there is at least 1 test
+        if len(v) == 0:
+            if values.get("tags",None) and values.get("tags").manual_test is not None:
+                # Detections tagged manual_test MAY have tests, but they are not required. If no
+                # tests are defined, create a placeholder.
+                # Note that this fake UnitTest (and by extension, IntegrationTest) will NOT be generated
+                # if there ARE test(s) defined for a Detection.
+                placeholder_test = UnitTest(name="PLACEHOLDER FOR DETECTION TAGGED MANUAL_TEST WITH NO TESTS SPECIFIED IN YML FILE", attack_data=[])
+                return [placeholder_test]
+
+            else:
+                raise ValueError("At least one test is REQUIRED for production detection: " + values.get("name", "NO NAME FOUND"))
+
+
+        # No issues - at least one test was provided for a production detection that requires testing
         return v
 
     @validator("datamodel")
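The two validators above do related work: derive one TestGroup (and one IntegrationTest) per unit test, and fall back to a placeholder unit test when a manual_test detection ships with no tests at all. A toy version of the derivation step, with stand-in classes and a guessed naming scheme rather than the real pydantic models:

from dataclasses import dataclass


@dataclass
class FakeUnitTest:
    name: str


@dataclass
class FakeIntegrationTest:
    name: str


@dataclass
class FakeTestGroup:
    name: str
    unit_test: FakeUnitTest
    integration_test: FakeIntegrationTest

    @classmethod
    def derive_from_unit_test(cls, unit_test: FakeUnitTest, detection_name: str) -> "FakeTestGroup":
        # pair the unit test with a generated integration test; the real
        # TestGroup.derive_from_unit_test builds the actual IntegrationTest model
        return cls(
            name=f"{detection_name}: {unit_test.name}",
            unit_test=unit_test,
            integration_test=FakeIntegrationTest(name=f"{unit_test.name} (integration)"),
        )


tests = [FakeUnitTest("dataset 1"), FakeUnitTest("dataset 2")]
groups = [FakeTestGroup.derive_from_unit_test(t, "Example Detection") for t in tests]
tests.extend(group.integration_test for group in groups)  # mirrors values["tests"].append(...)
assert len(groups) == 2 and len(tests) == 4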
diff --git a/contentctl/objects/detection.py b/contentctl/objects/detection.py
index e8dab83c..84321631 100644
--- a/contentctl/objects/detection.py
+++ b/contentctl/objects/detection.py
@@ -2,7 +2,7 @@
 from pydantic import validator
 
 from contentctl.objects.abstract_security_content_objects.detection_abstract import Detection_Abstract
-from contentctl.objects.test_group import TestGroup
+
 
 
 class Detection(Detection_Abstract):
@@ -16,29 +16,4 @@ class Detection(Detection_Abstract):
     # them or modifying their behavior may cause
     # undefined issues with the contentctl tooling
     # or output of the tooling.
-
-    # A list of groups of tests, relying on the same data
-    test_groups: Union[list[TestGroup], None] = None
-
-    @validator("test_groups", always=True)
-    def validate_test_groups(cls, value, values) -> Union[list[TestGroup], None]:
-        """
-        Validates the `test_groups` field and constructs the model from the list of unit tests
-        if no explicit construct was provided
-        :param value: the value of the field `test_groups`
-        :param values: a dict of the other fields in the Detection model
-        """
-        # if the value was not the None default, do nothing
-        if value is not None:
-            return value
-
-        # iterate over the unit tests and create a TestGroup (and as a result, an IntegrationTest) for each
-        test_groups: list[TestGroup] = []
-        for unit_test in values["tests"]:
-            test_group = TestGroup.derive_from_unit_test(unit_test, values["name"])
-            test_groups.append(test_group)
-
-        # now add each integration test to the list of tests
-        for test_group in test_groups:
-            values["tests"].append(test_group.integration_test)
-        return test_groups
+    pass
\ No newline at end of file
diff --git a/contentctl/objects/unit_test_result.py b/contentctl/objects/unit_test_result.py
index 9467dd78..40924790 100644
--- a/contentctl/objects/unit_test_result.py
+++ b/contentctl/objects/unit_test_result.py
@@ -13,7 +13,7 @@ class UnitTestResult(BaseTestResult):
     missing_observables: list[str] = []
-    
+
     def set_job_content(
         self,
         content: Union[Record, None],
@@ -37,18 +37,18 @@ def set_job_content(
         self.duration = round(duration, 2)
         self.exception = exception
         self.status = status
-
+        self.job_content = content
+
         # Set the job content, if given
         if content is not None:
-            self.job_content = content
-
             if self.status == TestResultStatus.PASS:
                 self.message = "TEST PASSED"
             elif self.status == TestResultStatus.FAIL:
                 self.message = "TEST FAILED"
             elif self.status == TestResultStatus.ERROR:
-                self.message == "TEST FAILED (ERROR)"
+                self.message = "TEST ERROR"
             elif self.status == TestResultStatus.SKIP:
+                # A test that was SKIPPED should not have job content, since it should not have been run.
                 self.message = "TEST SKIPPED"
 
             if not config.instance_address.startswith("http://"):
@@ -61,11 +61,16 @@ def set_job_content(
                 sid=content.get("sid", None),
             )
 
+        elif self.status == TestResultStatus.SKIP:
+            self.message = "TEST SKIPPED"
+            pass
+
         elif content is None:
-            self.job_content = None
             self.status = TestResultStatus.ERROR
             if self.exception is not None:
                 self.message = f"EXCEPTION: {str(self.exception)}"
+            else:
+                self.message = "ERROR with no more specific message available."
             self.sid_link = NO_SID
 
         return self.success
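For reference, the message selection in set_job_content, restated as a small pure function over plain status strings; the real method works on TestResultStatus members and the Splunk job Record, also fills in duration and sid_link, and the "UNKNOWN STATUS" default here is an addition for the sketch:

from typing import Union


def message_for(status: str, has_content: bool, exception: Union[Exception, None]) -> str:
    # mirrors the branch order above: job content first, then the skip case, then the error fallback
    if has_content:
        return {
            "pass": "TEST PASSED",
            "fail": "TEST FAILED",
            "error": "TEST ERROR",
            "skip": "TEST SKIPPED",
        }.get(status, "UNKNOWN STATUS")
    if status == "skip":
        # a skipped test never ran, so missing job content is expected
        return "TEST SKIPPED"
    if exception is not None:
        return f"EXCEPTION: {exception}"
    return "ERROR with no more specific message available."


print(message_for("skip", False, None))   # TEST SKIPPED
print(message_for("error", False, None))  # ERROR with no more specific message available.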