diff --git a/contentctl/input/director.py b/contentctl/input/director.py index ea5a8bfa..72d78996 100644 --- a/contentctl/input/director.py +++ b/contentctl/input/director.py @@ -5,9 +5,8 @@ from pydantic import ValidationError from uuid import UUID from contentctl.input.yml_reader import YmlReader - - - + + from contentctl.objects.detection import Detection from contentctl.objects.story import Story @@ -28,30 +27,69 @@ from contentctl.objects.config import validate - -@dataclass() +@dataclass class DirectorOutputDto: - # Atomic Tests are first because parsing them - # is far quicker than attack_enrichment - atomic_tests: Union[list[AtomicTest],None] - attack_enrichment: AttackEnrichment - cve_enrichment: CveEnrichment - detections: list[Detection] - stories: list[Story] - baselines: list[Baseline] - investigations: list[Investigation] - playbooks: list[Playbook] - macros: list[Macro] - lookups: list[Lookup] - deployments: list[Deployment] - ssa_detections: list[SSADetection] - - - name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict) - uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict) - + # Atomic Tests are first because parsing them + # is far quicker than attack_enrichment + atomic_tests: Union[list[AtomicTest],None] + attack_enrichment: AttackEnrichment + cve_enrichment: CveEnrichment + detections: list[Detection] + stories: list[Story] + baselines: list[Baseline] + investigations: list[Investigation] + playbooks: list[Playbook] + macros: list[Macro] + lookups: list[Lookup] + deployments: list[Deployment] + ssa_detections: list[SSADetection] + + name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict) + uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict) + + def addContentToDictMappings(self, content: SecurityContentObject): + content_name = content.name + if isinstance(content, SSADetection): + # Since SSA detections may have the same name as ESCU detection, + # for this function we prepend 'SSA ' to the name. + content_name = f"SSA {content_name}" + if content_name in self.name_to_content_map: + raise ValueError( + f"Duplicate name '{content_name}' with paths:\n" + f" - {content.file_path}\n" + f" - {self.name_to_content_map[content_name].file_path}" + ) + elif content.id in self.uuid_to_content_map: + raise ValueError( + f"Duplicate id '{content.id}' with paths:\n" + f" - {content.file_path}\n" + f" - {self.name_to_content_map[content_name].file_path}" + ) + + if isinstance(content, Lookup): + self.lookups.append(content) + elif isinstance(content, Macro): + self.macros.append(content) + elif isinstance(content, Deployment): + self.deployments.append(content) + elif isinstance(content, Playbook): + self.playbooks.append(content) + elif isinstance(content, Baseline): + self.baselines.append(content) + elif isinstance(content, Investigation): + self.investigations.append(content) + elif isinstance(content, Story): + self.stories.append(content) + elif isinstance(content, Detection): + self.detections.append(content) + elif isinstance(content, SSADetection): + self.ssa_detections.append(content) + else: + raise Exception(f"Unknown security content type: {type(content)}") + self.name_to_content_map[content_name] = content + self.uuid_to_content_map[content.id] = content from contentctl.input.ssa_detection_builder import SSADetectionBuilder @@ -61,13 +99,6 @@ class DirectorOutputDto: from contentctl.helper.utils import Utils - - - - - - - class Director(): input_dto: validate output_dto: DirectorOutputDto @@ -78,27 +109,7 @@ class Director(): def __init__(self, output_dto: DirectorOutputDto) -> None: self.output_dto = output_dto self.ssa_detection_builder = SSADetectionBuilder() - - def addContentToDictMappings(self, content:SecurityContentObject): - content_name = content.name - if isinstance(content,SSADetection): - # Since SSA detections may have the same name as ESCU detection, - # for this function we prepend 'SSA ' to the name. - content_name = f"SSA {content_name}" - if content_name in self.output_dto.name_to_content_map: - raise ValueError(f"Duplicate name '{content_name}' with paths:\n" - f" - {content.file_path}\n" - f" - {self.output_dto.name_to_content_map[content_name].file_path}") - elif content.id in self.output_dto.uuid_to_content_map: - raise ValueError(f"Duplicate id '{content.id}' with paths:\n" - f" - {content.file_path}\n" - f" - {self.output_dto.name_to_content_map[content_name].file_path}") - - self.output_dto.name_to_content_map[content_name] = content - self.output_dto.uuid_to_content_map[content.id] = content - - def execute(self, input_dto: validate) -> None: self.input_dto = input_dto @@ -147,50 +158,41 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None: if contentType == SecurityContentType.lookups: lookup = Lookup.model_validate(modelDict,context={"output_dto":self.output_dto, "config":self.input_dto}) - self.output_dto.lookups.append(lookup) - self.addContentToDictMappings(lookup) + self.output_dto.addContentToDictMappings(lookup) elif contentType == SecurityContentType.macros: macro = Macro.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.macros.append(macro) - self.addContentToDictMappings(macro) + self.output_dto.addContentToDictMappings(macro) elif contentType == SecurityContentType.deployments: deployment = Deployment.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.deployments.append(deployment) - self.addContentToDictMappings(deployment) + self.output_dto.addContentToDictMappings(deployment) elif contentType == SecurityContentType.playbooks: playbook = Playbook.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.playbooks.append(playbook) - self.addContentToDictMappings(playbook) + self.output_dto.addContentToDictMappings(playbook) elif contentType == SecurityContentType.baselines: baseline = Baseline.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.baselines.append(baseline) - self.addContentToDictMappings(baseline) + self.output_dto.addContentToDictMappings(baseline) elif contentType == SecurityContentType.investigations: investigation = Investigation.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.investigations.append(investigation) - self.addContentToDictMappings(investigation) + self.output_dto.addContentToDictMappings(investigation) elif contentType == SecurityContentType.stories: story = Story.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.stories.append(story) - self.addContentToDictMappings(story) + self.output_dto.addContentToDictMappings(story) elif contentType == SecurityContentType.detections: - detection = Detection.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.detections.append(detection) - self.addContentToDictMappings(detection) + detection = Detection.model_validate(modelDict,context={"output_dto":self.output_dto, "app":self.input_dto.app}) + self.output_dto.addContentToDictMappings(detection) elif contentType == SecurityContentType.ssa_detections: self.constructSSADetection(self.ssa_detection_builder, self.output_dto,str(file)) ssa_detection = self.ssa_detection_builder.getObject() if ssa_detection.status in [DetectionStatus.production.value, DetectionStatus.validation.value]: - self.output_dto.ssa_detections.append(ssa_detection) - self.addContentToDictMappings(ssa_detection) + self.output_dto.addContentToDictMappings(ssa_detection) else: raise Exception(f"Unsupported type: [{contentType}]") @@ -229,6 +231,3 @@ def constructSSADetection(self, builder: SSADetectionBuilder, directorOutput:Dir builder.addMappings() builder.addUnitTest() builder.addRBA() - - - \ No newline at end of file diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index cb608046..e4ecf82f 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -53,6 +53,58 @@ class Detection_Abstract(SecurityContentObject): # A list of groups of tests, relying on the same data test_groups: Union[list[TestGroup], None] = Field(None,validate_default=True) + + @field_validator("search", mode="before") + @classmethod + def validate_presence_of_filter_macro(cls, value:Union[str, dict[str,Any]], info:ValidationInfo)->Union[str, dict[str,Any]]: + """ + Validates that, if required to be present, the filter macro is present with the proper name. + The filter macro MUST be derived from the name of the detection + + + Args: + value (Union[str, dict[str,Any]]): The search. It can either be a string (and should be SPL) + or a dict, in which case it is Sigma-formatted. + info (ValidationInfo): The validation info can contain a number of different objects. Today it only contains the director. + + Returns: + Union[str, dict[str,Any]]: The search, either in sigma or SPL format. + """ + + if isinstance(value,dict): + #If the search is a dict, then it is in Sigma format so return it + return value + + # Otherwise, the search is SPL. + + + # In the future, we will may add support that makes the inclusion of the + # filter macro optional or automatically generates it for searches that + # do not have it. For now, continue to require that all searches have a filter macro. + FORCE_FILTER_MACRO = True + if not FORCE_FILTER_MACRO: + return value + + # Get the required macro name, which is derived from the search name. + # Note that a separate validation ensures that the file name matches the content name + name:Union[str,None] = info.data.get("name",None) + if name is None: + #The search was sigma formatted (or failed other validation and was None), so we will not validate macros in it + raise ValueError("Cannot validate filter macro, field 'name' (which is required to validate the macro) was missing from the detection YML.") + + #Get the file name without the extension. Note this is not a full path! + file_name = pathlib.Path(cls.contentNameToFileName(name)).stem + file_name_with_filter = f"`{file_name}_filter`" + + if file_name_with_filter not in value: + raise ValueError(f"Detection does not contain the EXACT filter macro {file_name_with_filter}. " + "This filter macro MUST be present in the search. It usually placed at the end " + "of the search and is useful for environment-specific filtering of False Positive or noisy results.") + + return value + + + @field_validator("test_groups") @classmethod def validate_test_groups(cls, value:Union[None, List[TestGroup]], info:ValidationInfo) -> Union[List[TestGroup], None]: @@ -394,11 +446,11 @@ def getDetectionMacros(cls, v:list[str], info:ValidationInfo)->list[Macro]: filter_macro = Macro.model_validate({"name":filter_macro_name, "definition":'search *', "description":'Update this macro to limit the output results to filter out false positives.'}) - director.macros.append(filter_macro) + director.addContentToDictMappings(filter_macro) macros_from_search = Macro.get_macros(search, director) - return macros_from_search + [filter_macro] + return macros_from_search def get_content_dependencies(self)->list[SecurityContentObject]: #Do this separately to satisfy type checker diff --git a/contentctl/objects/macro.py b/contentctl/objects/macro.py index 478e5e13..5f01f2d1 100644 --- a/contentctl/objects/macro.py +++ b/contentctl/objects/macro.py @@ -9,13 +9,14 @@ from contentctl.objects.security_content_object import SecurityContentObject - -MACROS_TO_IGNORE = set(["_filter", "drop_dm_object_name"]) -#Should all of the following be included as well? -MACROS_TO_IGNORE.add("get_asset" ) -MACROS_TO_IGNORE.add("get_risk_severity") -MACROS_TO_IGNORE.add("cim_corporate_web_domain_search") -MACROS_TO_IGNORE.add("prohibited_processes") +#The following macros are included in commonly-installed apps. +#As such, we will ignore if they are missing from our app. +#Included in +MACROS_TO_IGNORE = set(["drop_dm_object_name"]) # Part of CIM/Splunk_SA_CIM +MACROS_TO_IGNORE.add("get_asset") #SA-IdentityManagement, part of Enterprise Security +MACROS_TO_IGNORE.add("get_risk_severity") #SA-ThreatIntelligence, part of Enterprise Security +MACROS_TO_IGNORE.add("cim_corporate_web_domain_search") #Part of CIM/Splunk_SA_CIM +#MACROS_TO_IGNORE.add("prohibited_processes") class Macro(SecurityContentObject):