145 changes: 72 additions & 73 deletions contentctl/input/director.py
@@ -5,9 +5,8 @@
from pydantic import ValidationError
from uuid import UUID
from contentctl.input.yml_reader import YmlReader





from contentctl.objects.detection import Detection
from contentctl.objects.story import Story

@@ -28,30 +27,69 @@
from contentctl.objects.config import validate



@dataclass()
@dataclass
class DirectorOutputDto:
# Atomic Tests are first because parsing them
# is far quicker than attack_enrichment
atomic_tests: Union[list[AtomicTest],None]
attack_enrichment: AttackEnrichment
cve_enrichment: CveEnrichment
detections: list[Detection]
stories: list[Story]
baselines: list[Baseline]
investigations: list[Investigation]
playbooks: list[Playbook]
macros: list[Macro]
lookups: list[Lookup]
deployments: list[Deployment]
ssa_detections: list[SSADetection]


name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict)
uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict)

# Atomic Tests are first because parsing them
# is far quicker than attack_enrichment
atomic_tests: Union[list[AtomicTest],None]
attack_enrichment: AttackEnrichment
cve_enrichment: CveEnrichment
detections: list[Detection]
stories: list[Story]
baselines: list[Baseline]
investigations: list[Investigation]
playbooks: list[Playbook]
macros: list[Macro]
lookups: list[Lookup]
deployments: list[Deployment]
ssa_detections: list[SSADetection]

name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict)
uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict)

def addContentToDictMappings(self, content: SecurityContentObject):
content_name = content.name
if isinstance(content, SSADetection):
# Since SSA detections may have the same name as ESCU detections,
# for this function we prepend 'SSA ' to the name.
content_name = f"SSA {content_name}"
if content_name in self.name_to_content_map:
raise ValueError(
f"Duplicate name '{content_name}' with paths:\n"
f" - {content.file_path}\n"
f" - {self.name_to_content_map[content_name].file_path}"
)
elif content.id in self.uuid_to_content_map:
raise ValueError(
f"Duplicate id '{content.id}' with paths:\n"
f" - {content.file_path}\n"
f" - {self.name_to_content_map[content_name].file_path}"
)

if isinstance(content, Lookup):
self.lookups.append(content)
elif isinstance(content, Macro):
self.macros.append(content)
elif isinstance(content, Deployment):
self.deployments.append(content)
elif isinstance(content, Playbook):
self.playbooks.append(content)
elif isinstance(content, Baseline):
self.baselines.append(content)
elif isinstance(content, Investigation):
self.investigations.append(content)
elif isinstance(content, Story):
self.stories.append(content)
elif isinstance(content, Detection):
self.detections.append(content)
elif isinstance(content, SSADetection):
self.ssa_detections.append(content)
else:
raise Exception(f"Unknown security content type: {type(content)}")


self.name_to_content_map[content_name] = content
self.uuid_to_content_map[content.id] = content


from contentctl.input.ssa_detection_builder import SSADetectionBuilder
@@ -61,13 +99,6 @@ class DirectorOutputDto:
from contentctl.helper.utils import Utils









class Director():
input_dto: validate
output_dto: DirectorOutputDto
@@ -78,27 +109,7 @@ class Director():
def __init__(self, output_dto: DirectorOutputDto) -> None:
self.output_dto = output_dto
self.ssa_detection_builder = SSADetectionBuilder()

def addContentToDictMappings(self, content:SecurityContentObject):
content_name = content.name
if isinstance(content,SSADetection):
# Since SSA detections may have the same name as ESCU detection,
# for this function we prepend 'SSA ' to the name.
content_name = f"SSA {content_name}"
if content_name in self.output_dto.name_to_content_map:
raise ValueError(f"Duplicate name '{content_name}' with paths:\n"
f" - {content.file_path}\n"
f" - {self.output_dto.name_to_content_map[content_name].file_path}")
elif content.id in self.output_dto.uuid_to_content_map:
raise ValueError(f"Duplicate id '{content.id}' with paths:\n"
f" - {content.file_path}\n"
f" - {self.output_dto.name_to_content_map[content_name].file_path}")

self.output_dto.name_to_content_map[content_name] = content
self.output_dto.uuid_to_content_map[content.id] = content



def execute(self, input_dto: validate) -> None:
self.input_dto = input_dto

@@ -147,50 +158,41 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:

if contentType == SecurityContentType.lookups:
lookup = Lookup.model_validate(modelDict,context={"output_dto":self.output_dto, "config":self.input_dto})
self.output_dto.lookups.append(lookup)
self.addContentToDictMappings(lookup)
self.output_dto.addContentToDictMappings(lookup)

elif contentType == SecurityContentType.macros:
macro = Macro.model_validate(modelDict,context={"output_dto":self.output_dto})
self.output_dto.macros.append(macro)
self.addContentToDictMappings(macro)
self.output_dto.addContentToDictMappings(macro)

elif contentType == SecurityContentType.deployments:
deployment = Deployment.model_validate(modelDict,context={"output_dto":self.output_dto})
self.output_dto.deployments.append(deployment)
self.addContentToDictMappings(deployment)
self.output_dto.addContentToDictMappings(deployment)

elif contentType == SecurityContentType.playbooks:
playbook = Playbook.model_validate(modelDict,context={"output_dto":self.output_dto})
self.output_dto.playbooks.append(playbook)
self.addContentToDictMappings(playbook)
self.output_dto.addContentToDictMappings(playbook)

elif contentType == SecurityContentType.baselines:
baseline = Baseline.model_validate(modelDict,context={"output_dto":self.output_dto})
self.output_dto.baselines.append(baseline)
self.addContentToDictMappings(baseline)
self.output_dto.addContentToDictMappings(baseline)

elif contentType == SecurityContentType.investigations:
investigation = Investigation.model_validate(modelDict,context={"output_dto":self.output_dto})
self.output_dto.investigations.append(investigation)
self.addContentToDictMappings(investigation)
self.output_dto.addContentToDictMappings(investigation)

elif contentType == SecurityContentType.stories:
story = Story.model_validate(modelDict,context={"output_dto":self.output_dto})
self.output_dto.stories.append(story)
self.addContentToDictMappings(story)
self.output_dto.addContentToDictMappings(story)

elif contentType == SecurityContentType.detections:
detection = Detection.model_validate(modelDict,context={"output_dto":self.output_dto})
self.output_dto.detections.append(detection)
self.addContentToDictMappings(detection)
detection = Detection.model_validate(modelDict,context={"output_dto":self.output_dto, "app":self.input_dto.app})
self.output_dto.addContentToDictMappings(detection)

elif contentType == SecurityContentType.ssa_detections:
self.constructSSADetection(self.ssa_detection_builder, self.output_dto,str(file))
ssa_detection = self.ssa_detection_builder.getObject()
if ssa_detection.status in [DetectionStatus.production.value, DetectionStatus.validation.value]:
self.output_dto.ssa_detections.append(ssa_detection)
self.addContentToDictMappings(ssa_detection)
self.output_dto.addContentToDictMappings(ssa_detection)

else:
raise Exception(f"Unsupported type: [{contentType}]")
@@ -229,6 +231,3 @@ def constructSSADetection(self, builder: SSADetectionBuilder, directorOutput:Dir
builder.addMappings()
builder.addUnitTest()
builder.addRBA()
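For readers skimming the diff: the consolidated addContentToDictMappings now both appends the object to its typed list and records it in the name/UUID maps, rejecting duplicates in either index, instead of each createSecurityContent branch appending separately. A minimal, standalone sketch of that pattern (illustrative names only, not the contentctl classes themselves):

```python
# Simplified sketch of the registration pattern introduced above. This is NOT
# the contentctl implementation; it only mirrors the idea that one method
# appends to the typed list and records name/UUID mappings, rejecting
# duplicates in either index.
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass
class Content:
    name: str
    id: UUID


@dataclass
class OutputDto:
    detections: list[Content] = field(default_factory=list)
    name_to_content_map: dict[str, Content] = field(default_factory=dict)
    uuid_to_content_map: dict[UUID, Content] = field(default_factory=dict)

    def add_content(self, content: Content) -> None:
        if content.name in self.name_to_content_map:
            raise ValueError(f"Duplicate name '{content.name}'")
        if content.id in self.uuid_to_content_map:
            raise ValueError(f"Duplicate id '{content.id}'")
        self.detections.append(content)
        self.name_to_content_map[content.name] = content
        self.uuid_to_content_map[content.id] = content


dto = OutputDto()
dto.add_content(Content("Suspicious Process Execution", uuid4()))
# A second object with the same name or id would raise ValueError.
```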



@@ -53,6 +53,58 @@ class Detection_Abstract(SecurityContentObject):
# A list of groups of tests, relying on the same data
test_groups: Union[list[TestGroup], None] = Field(None,validate_default=True)


@field_validator("search", mode="before")
@classmethod
def validate_presence_of_filter_macro(cls, value:Union[str, dict[str,Any]], info:ValidationInfo)->Union[str, dict[str,Any]]:
"""
Validates that, when required, the filter macro is present and has the proper name.
The filter macro name MUST be derived from the name of the detection.


Args:
value (Union[str, dict[str,Any]]): The search. It can either be a string (and should be SPL)
or a dict, in which case it is Sigma-formatted.
info (ValidationInfo): The validation info can contain a number of different objects. Today it only contains the director.

Returns:
Union[str, dict[str,Any]]: The search, either in sigma or SPL format.
"""

if isinstance(value,dict):
#If the search is a dict, then it is in Sigma format so return it
return value

# Otherwise, the search is SPL.


# In the future, we may add support that makes the inclusion of the
# filter macro optional or automatically generates it for searches that
# do not have it. For now, continue to require that all searches have a filter macro.
FORCE_FILTER_MACRO = True
if not FORCE_FILTER_MACRO:
return value

# Get the required macro name, which is derived from the search name.
# Note that a separate validation ensures that the file name matches the content name
name:Union[str,None] = info.data.get("name",None)
if name is None:
# The name is missing (or failed earlier validation), so we cannot derive the expected filter macro name.
raise ValueError("Cannot validate filter macro; the field 'name' (which is required to derive the macro name) was missing from the detection YML.")

#Get the file name without the extension. Note this is not a full path!
file_name = pathlib.Path(cls.contentNameToFileName(name)).stem
file_name_with_filter = f"`{file_name}_filter`"

if file_name_with_filter not in value:
raise ValueError(f"Detection does not contain the EXACT filter macro {file_name_with_filter}. "
"This filter macro MUST be present in the search. It usually placed at the end "
"of the search and is useful for environment-specific filtering of False Positive or noisy results.")

return value



@field_validator("test_groups")
@classmethod
def validate_test_groups(cls, value:Union[None, List[TestGroup]], info:ValidationInfo) -> Union[List[TestGroup], None]:
@@ -394,11 +446,11 @@ def getDetectionMacros(cls, v:list[str], info:ValidationInfo)->list[Macro]:
filter_macro = Macro.model_validate({"name":filter_macro_name,
"definition":'search *',
"description":'Update this macro to limit the output results to filter out false positives.'})
director.macros.append(filter_macro)
director.addContentToDictMappings(filter_macro)

macros_from_search = Macro.get_macros(search, director)

return macros_from_search + [filter_macro]
return macros_from_search

def get_content_dependencies(self)->list[SecurityContentObject]:
#Do this separately to satisfy type checker
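The core check performed by the new validate_presence_of_filter_macro validator can be approximated outside of pydantic as follows. This is a sketch only: content_name_to_file_name stands in for the real contentNameToFileName helper, whose normalization rules may differ.

```python
import pathlib


def content_name_to_file_name(name: str) -> str:
    # Stand-in for the real contentNameToFileName helper; the actual
    # normalization rules in contentctl may differ.
    return name.lower().replace(" ", "_") + ".yml"


def check_filter_macro(name: str, search: str) -> None:
    # The expected macro is derived from the detection's file name (no extension).
    file_name = pathlib.Path(content_name_to_file_name(name)).stem
    expected = f"`{file_name}_filter`"
    if expected not in search:
        raise ValueError(f"Detection does not contain the EXACT filter macro {expected}.")


# Passes: the search ends with the exact `<file_name>_filter` macro.
check_filter_macro(
    "Suspicious Process Execution",
    "| tstats count from datamodel=Endpoint.Processes "
    "| `suspicious_process_execution_filter`",
)
```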
15 changes: 8 additions & 7 deletions contentctl/objects/macro.py
@@ -9,13 +9,14 @@
from contentctl.objects.security_content_object import SecurityContentObject



MACROS_TO_IGNORE = set(["_filter", "drop_dm_object_name"])
#Should all of the following be included as well?
MACROS_TO_IGNORE.add("get_asset" )
MACROS_TO_IGNORE.add("get_risk_severity")
MACROS_TO_IGNORE.add("cim_corporate_web_domain_search")
MACROS_TO_IGNORE.add("prohibited_processes")
#The following macros are included in commonly-installed apps,
#so we will ignore them if they are missing from our app.
MACROS_TO_IGNORE = set(["drop_dm_object_name"]) # Part of CIM/Splunk_SA_CIM
MACROS_TO_IGNORE.add("get_asset") #SA-IdentityManagement, part of Enterprise Security
MACROS_TO_IGNORE.add("get_risk_severity") #SA-ThreatIntelligence, part of Enterprise Security
MACROS_TO_IGNORE.add("cim_corporate_web_domain_search") #Part of CIM/Splunk_SA_CIM
#MACROS_TO_IGNORE.add("prohibited_processes")


class Macro(SecurityContentObject):
Expand Down