From 072e0f340b1ffcaf304c57dbed71ef388ca19f96 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Mon, 17 Jun 2024 13:13:28 -0700 Subject: [PATCH 1/6] Create branch with better filter macro-checking. --- contentctl/input/director.py | 144 +++++++++--------- .../detection_abstract.py | 52 ++++++- contentctl/objects/macro.py | 3 +- 3 files changed, 125 insertions(+), 74 deletions(-) diff --git a/contentctl/input/director.py b/contentctl/input/director.py index eef9879a..099b3930 100644 --- a/contentctl/input/director.py +++ b/contentctl/input/director.py @@ -5,9 +5,8 @@ from pydantic import ValidationError from uuid import UUID from contentctl.input.yml_reader import YmlReader - - - + + from contentctl.objects.detection import Detection from contentctl.objects.story import Story @@ -28,29 +27,69 @@ from contentctl.objects.config import validate - -@dataclass() +@dataclass class DirectorOutputDto: - # Atomic Tests are first because parsing them - # is far quicker than attack_enrichment - atomic_tests: Union[list[AtomicTest],None] - attack_enrichment: AttackEnrichment - detections: list[Detection] - stories: list[Story] - baselines: list[Baseline] - investigations: list[Investigation] - playbooks: list[Playbook] - macros: list[Macro] - lookups: list[Lookup] - deployments: list[Deployment] - ssa_detections: list[SSADetection] - #cve_enrichment: CveEnrichment - - name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict) - uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict) - + # Atomic Tests are first because parsing them + # is far quicker than attack_enrichment + atomic_tests: Union[list[AtomicTest], None] + attack_enrichment: AttackEnrichment + detections: list[Detection] + stories: list[Story] + baselines: list[Baseline] + investigations: list[Investigation] + playbooks: list[Playbook] + macros: list[Macro] + lookups: list[Lookup] + deployments: list[Deployment] + ssa_detections: list[SSADetection] + # cve_enrichment: CveEnrichment + + name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict) + uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict) + + def addContentToDictMappings(self, content: SecurityContentObject): + content_name = content.name + if isinstance(content, SSADetection): + # Since SSA detections may have the same name as ESCU detection, + # for this function we prepend 'SSA ' to the name. + content_name = f"SSA {content_name}" + if content_name in self.name_to_content_map: + raise ValueError( + f"Duplicate name '{content_name}' with paths:\n" + f" - {content.file_path}\n" + f" - {self.name_to_content_map[content_name].file_path}" + ) + elif content.id in self.uuid_to_content_map: + raise ValueError( + f"Duplicate id '{content.id}' with paths:\n" + f" - {content.file_path}\n" + f" - {self.name_to_content_map[content_name].file_path}" + ) + + if isinstance(content, Lookup): + self.lookups.append(content) + elif isinstance(content, Macro): + self.macros.append(content) + elif isinstance(content, Deployment): + self.deployments.append(content) + elif isinstance(content, Playbook): + self.playbooks.append(content) + elif isinstance(content, Baseline): + self.baselines.append(content) + elif isinstance(content, Investigation): + self.investigations.append(content) + elif isinstance(content, Story): + self.stories.append(content) + elif isinstance(content, Detection): + self.detections.append(content) + elif isinstance(content, SSADetection): + self.ssa_detections.append(content) + else: + raise Exception(f"Unknown security content type: {type(content)}") + self.name_to_content_map[content_name] = content + self.uuid_to_content_map[content.id] = content from contentctl.input.ssa_detection_builder import SSADetectionBuilder @@ -60,13 +99,6 @@ class DirectorOutputDto: from contentctl.helper.utils import Utils - - - - - - - class Director(): input_dto: validate output_dto: DirectorOutputDto @@ -77,27 +109,7 @@ class Director(): def __init__(self, output_dto: DirectorOutputDto) -> None: self.output_dto = output_dto self.ssa_detection_builder = SSADetectionBuilder() - - def addContentToDictMappings(self, content:SecurityContentObject): - content_name = content.name - if isinstance(content,SSADetection): - # Since SSA detections may have the same name as ESCU detection, - # for this function we prepend 'SSA ' to the name. - content_name = f"SSA {content_name}" - if content_name in self.output_dto.name_to_content_map: - raise ValueError(f"Duplicate name '{content_name}' with paths:\n" - f" - {content.file_path}\n" - f" - {self.output_dto.name_to_content_map[content_name].file_path}") - elif content.id in self.output_dto.uuid_to_content_map: - raise ValueError(f"Duplicate id '{content.id}' with paths:\n" - f" - {content.file_path}\n" - f" - {self.output_dto.name_to_content_map[content_name].file_path}") - - self.output_dto.name_to_content_map[content_name] = content - self.output_dto.uuid_to_content_map[content.id] = content - - def execute(self, input_dto: validate) -> None: self.input_dto = input_dto @@ -146,50 +158,41 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None: if contentType == SecurityContentType.lookups: lookup = Lookup.model_validate(modelDict,context={"output_dto":self.output_dto, "config":self.input_dto}) - self.output_dto.lookups.append(lookup) - self.addContentToDictMappings(lookup) + self.output_dto.addContentToDictMappings(lookup) elif contentType == SecurityContentType.macros: macro = Macro.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.macros.append(macro) - self.addContentToDictMappings(macro) + self.output_dto.addContentToDictMappings(macro) elif contentType == SecurityContentType.deployments: deployment = Deployment.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.deployments.append(deployment) - self.addContentToDictMappings(deployment) + self.output_dto.addContentToDictMappings(deployment) elif contentType == SecurityContentType.playbooks: playbook = Playbook.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.playbooks.append(playbook) - self.addContentToDictMappings(playbook) + self.output_dto.addContentToDictMappings(playbook) elif contentType == SecurityContentType.baselines: baseline = Baseline.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.baselines.append(baseline) - self.addContentToDictMappings(baseline) + self.output_dto.addContentToDictMappings(baseline) elif contentType == SecurityContentType.investigations: investigation = Investigation.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.investigations.append(investigation) - self.addContentToDictMappings(investigation) + self.output_dto.addContentToDictMappings(investigation) elif contentType == SecurityContentType.stories: story = Story.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.stories.append(story) - self.addContentToDictMappings(story) + self.output_dto.addContentToDictMappings(story) elif contentType == SecurityContentType.detections: - detection = Detection.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.detections.append(detection) - self.addContentToDictMappings(detection) + detection = Detection.model_validate(modelDict,context={"output_dto":self.output_dto, "app":self.input_dto.app}) + self.output_dto.addContentToDictMappings(detection) elif contentType == SecurityContentType.ssa_detections: self.constructSSADetection(self.ssa_detection_builder, self.output_dto,str(file)) ssa_detection = self.ssa_detection_builder.getObject() if ssa_detection.status in [DetectionStatus.production.value, DetectionStatus.validation.value]: - self.output_dto.ssa_detections.append(ssa_detection) - self.addContentToDictMappings(ssa_detection) + self.output_dto.addContentToDictMappings(ssa_detection) else: raise Exception(f"Unsupported type: [{contentType}]") @@ -228,6 +231,3 @@ def constructSSADetection(self, builder: SSADetectionBuilder, directorOutput:Dir builder.addMappings() builder.addUnitTest() builder.addRBA() - - - \ No newline at end of file diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index a51eea07..c7df35d4 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -54,6 +54,56 @@ class Detection_Abstract(SecurityContentObject): # A list of groups of tests, relying on the same data test_groups: Union[list[TestGroup], None] = Field(None,validate_default=True) + + @field_validator("search", mode="before") + @classmethod + def validate_presence_of_filter_macro(cls, value:Union[str, dict[str,Any]], info:ValidationInfo)->Union[str, dict[str,Any]]: + """ + Validates that, if required to be present, the filter macro is present with the proper name. + The filter macro MUST be derived from the name of the detection + + + Args: + value (Union[str, dict[str,Any]]): The search. It can either be a string (and should be SPL) + or a dict, in which case it is Sigma-formatted. + info (ValidationInfo): The validation info can contain a number of different objects. Today it only contains the director. + + Returns: + Union[str, dict[str,Any]]: The search, either in sigma or SPL format. + """ + + if isinstance(value,dict): + #If the search is a dict, then it is in Sigma format so return it + return value + + # Otherwise, the search is SPL. + FORCE_FILTER_MACRO = True + + # In the future, we will may add support that makes the inclusion of the + # filter macro optional or automatically generates it for searches that + # do not have it. For now, continue to require that all searches have a filter macro. + if not FORCE_FILTER_MACRO: + return value + + # Get the required macro name, which is derived from the search name. + # Note that a separate validation ensures that the file name matches the content name + name:Union[str,None] = info.data.get("name",None) + if name is None: + #The search was sigma formatted (or failed other validation and was None), so we will not validate macros in it + raise ValueError("Cannot validate filter macro, field 'name' (which is required to validate the macro) was missing from the detection YML.") + + #Get the file name without the extension. Note this is not a full path! + file_name = pathlib.Path(cls.contentNameToFileName(name)).stem + file_name_with_filter = f"`{file_name}_filter`" + + if not value.endswith(file_name_with_filter): + + raise ValueError(f"Detection does not end with the exact filter macro {file_name_with_filter}.") + + return value + + + @field_validator("test_groups") @classmethod def validate_test_groups(cls, value:Union[None, List[TestGroup]], info:ValidationInfo) -> Union[List[TestGroup], None]: @@ -382,7 +432,7 @@ def getDetectionMacros(cls, v:list[str], info:ValidationInfo)->list[Macro]: filter_macro = Macro.model_validate({"name":filter_macro_name, "definition":'search *', "description":'Update this macro to limit the output results to filter out false positives.'}) - director.macros.append(filter_macro) + director.addContentToDictMappings(filter_macro) macros_from_search = Macro.get_macros(search, director) diff --git a/contentctl/objects/macro.py b/contentctl/objects/macro.py index 478e5e13..4ef32b3d 100644 --- a/contentctl/objects/macro.py +++ b/contentctl/objects/macro.py @@ -10,7 +10,8 @@ -MACROS_TO_IGNORE = set(["_filter", "drop_dm_object_name"]) +#MACROS_TO_IGNORE = set(["_filter", "drop_dm_object_name"]) +MACROS_TO_IGNORE = set(["drop_dm_object_name"]) #Should all of the following be included as well? MACROS_TO_IGNORE.add("get_asset" ) MACROS_TO_IGNORE.add("get_risk_severity") From 039cf012d50bcc58027610c448e12026521402c1 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Mon, 17 Jun 2024 13:28:33 -0700 Subject: [PATCH 2/6] Relax need for filter macro to occur at VERY END of a search. It still must occur somewhere in the search. --- .../detection_abstract.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index c7df35d4..f68fa4de 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -77,11 +77,12 @@ def validate_presence_of_filter_macro(cls, value:Union[str, dict[str,Any]], info return value # Otherwise, the search is SPL. - FORCE_FILTER_MACRO = True + # In the future, we will may add support that makes the inclusion of the # filter macro optional or automatically generates it for searches that # do not have it. For now, continue to require that all searches have a filter macro. + FORCE_FILTER_MACRO = True if not FORCE_FILTER_MACRO: return value @@ -95,10 +96,11 @@ def validate_presence_of_filter_macro(cls, value:Union[str, dict[str,Any]], info #Get the file name without the extension. Note this is not a full path! file_name = pathlib.Path(cls.contentNameToFileName(name)).stem file_name_with_filter = f"`{file_name}_filter`" - - if not value.endswith(file_name_with_filter): - - raise ValueError(f"Detection does not end with the exact filter macro {file_name_with_filter}.") + + if file_name_with_filter not in value: + raise ValueError(f"Detection does not contain the EXACT filter macro {file_name_with_filter}. " + "This filter macro MUST be present in the search. It usually placed at the end " + "of the search and is useful for environment-specific filtering of False Positive or noisy results.") return value From 5cf7f5b1330ef50bb1c9abf4e475492005776093 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Mon, 17 Jun 2024 14:02:22 -0700 Subject: [PATCH 3/6] Downgrade to python3.9 as a requirement. Add testing against 3.9 and 3.10 to the github actions test workflow. --- .github/workflows/testEndToEnd.yml | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/testEndToEnd.yml b/.github/workflows/testEndToEnd.yml index fff5c5dc..d4e0ba6e 100644 --- a/.github/workflows/testEndToEnd.yml +++ b/.github/workflows/testEndToEnd.yml @@ -11,7 +11,7 @@ jobs: strategy: fail-fast: false matrix: - python_version: ["3.11", "3.12"] + python_version: ["3.9", "3.10", "3.11", "3.12"] operating_system: ["ubuntu-20.04", "ubuntu-22.04", "macos-latest", "macos-14"] #operating_system: ["ubuntu-20.04", "ubuntu-22.04", "macos-latest"] diff --git a/pyproject.toml b/pyproject.toml index ba7a0abd..c484ba26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ readme = "README.md" contentctl = 'contentctl.contentctl:main' [tool.poetry.dependencies] -python = "^3.11" +python = "^3.9" pydantic = "^2.5.1" PyYAML = "^6.0.1" requests = "~2.32.2" From cbaa222e2f111d71355ac48885e5378239a4ba50 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Tue, 18 Jun 2024 14:01:47 -0700 Subject: [PATCH 4/6] Add verbose comments as to WHY certain macros are excluded when we look for them. --- .../detection_abstract.py | 2 +- contentctl/objects/macro.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index f68fa4de..d7cee8a0 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -438,7 +438,7 @@ def getDetectionMacros(cls, v:list[str], info:ValidationInfo)->list[Macro]: macros_from_search = Macro.get_macros(search, director) - return macros_from_search + [filter_macro] + return macros_from_search def get_content_dependencies(self)->list[SecurityContentObject]: #Do this separately to satisfy type checker diff --git a/contentctl/objects/macro.py b/contentctl/objects/macro.py index 4ef32b3d..5f01f2d1 100644 --- a/contentctl/objects/macro.py +++ b/contentctl/objects/macro.py @@ -9,14 +9,14 @@ from contentctl.objects.security_content_object import SecurityContentObject - -#MACROS_TO_IGNORE = set(["_filter", "drop_dm_object_name"]) -MACROS_TO_IGNORE = set(["drop_dm_object_name"]) -#Should all of the following be included as well? -MACROS_TO_IGNORE.add("get_asset" ) -MACROS_TO_IGNORE.add("get_risk_severity") -MACROS_TO_IGNORE.add("cim_corporate_web_domain_search") -MACROS_TO_IGNORE.add("prohibited_processes") +#The following macros are included in commonly-installed apps. +#As such, we will ignore if they are missing from our app. +#Included in +MACROS_TO_IGNORE = set(["drop_dm_object_name"]) # Part of CIM/Splunk_SA_CIM +MACROS_TO_IGNORE.add("get_asset") #SA-IdentityManagement, part of Enterprise Security +MACROS_TO_IGNORE.add("get_risk_severity") #SA-ThreatIntelligence, part of Enterprise Security +MACROS_TO_IGNORE.add("cim_corporate_web_domain_search") #Part of CIM/Splunk_SA_CIM +#MACROS_TO_IGNORE.add("prohibited_processes") class Macro(SecurityContentObject): From 0a059584d425bf767769f5a601cbdc808f274bc5 Mon Sep 17 00:00:00 2001 From: pyth0n1c <87383215+pyth0n1c@users.noreply.github.com> Date: Tue, 18 Jun 2024 17:26:43 -0700 Subject: [PATCH 5/6] Force Python3.11+ as a system requirement --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c484ba26..ba7a0abd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ readme = "README.md" contentctl = 'contentctl.contentctl:main' [tool.poetry.dependencies] -python = "^3.9" +python = "^3.11" pydantic = "^2.5.1" PyYAML = "^6.0.1" requests = "~2.32.2" From 9bb457510803dd86fc4fd77b707f629c2daad4a7 Mon Sep 17 00:00:00 2001 From: pyth0n1c <87383215+pyth0n1c@users.noreply.github.com> Date: Tue, 18 Jun 2024 17:28:53 -0700 Subject: [PATCH 6/6] remove 3.9 and 3.10 from test workflow --- .github/workflows/testEndToEnd.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/testEndToEnd.yml b/.github/workflows/testEndToEnd.yml index d4e0ba6e..fff5c5dc 100644 --- a/.github/workflows/testEndToEnd.yml +++ b/.github/workflows/testEndToEnd.yml @@ -11,7 +11,7 @@ jobs: strategy: fail-fast: false matrix: - python_version: ["3.9", "3.10", "3.11", "3.12"] + python_version: ["3.11", "3.12"] operating_system: ["ubuntu-20.04", "ubuntu-22.04", "macos-latest", "macos-14"] #operating_system: ["ubuntu-20.04", "ubuntu-22.04", "macos-latest"]