Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
if TYPE_CHECKING:
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.baseline import Baseline

from contentctl.objects.config import CustomApp

from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.enums import AnalyticsType
from contentctl.objects.enums import DataModel
Expand All @@ -36,7 +37,6 @@
from contentctl.objects.data_source import DataSource
from contentctl.objects.base_test_result import TestResultStatus

# from contentctl.objects.playbook import Playbook
from contentctl.objects.enums import ProvidingTechnology
from contentctl.enrichments.cve_enrichment import CveEnrichmentObj
import datetime
Expand All @@ -51,8 +51,8 @@
# TODO (#266): disable the use_enum_values configuration
class Detection_Abstract(SecurityContentObject):
model_config = ConfigDict(use_enum_values=True)

# contentType: SecurityContentType = SecurityContentType.detections
name:str = Field(...,max_length=67)
#contentType: SecurityContentType = SecurityContentType.detections
type: AnalyticsType = Field(...)
status: DetectionStatus = Field(...)
data_source: list[str] = []
Expand All @@ -70,10 +70,31 @@ class Detection_Abstract(SecurityContentObject):
# https://github.com/pydantic/pydantic/issues/9101#issuecomment-2019032541
tests: List[Annotated[Union[UnitTest, IntegrationTest, ManualTest], Field(union_mode='left_to_right')]] = []
# A list of groups of tests, relying on the same data
test_groups: Union[list[TestGroup], None] = Field(None, validate_default=True)
test_groups: list[TestGroup] = []

data_source_objects: list[DataSource] = []

def get_action_dot_correlationsearch_dot_label(self, app:CustomApp, max_stanza_length:int=99)->str:
label = self.get_conf_stanza_name(app)
label_after_saving_in_product = f"{self.tags.security_domain.value} - {label} - Rule"

if len(label_after_saving_in_product) > max_stanza_length:
raise ValueError(f"label may only be {max_stanza_length} characters to allow updating in-product, "
f"but stanza was actually {len(label_after_saving_in_product)} characters: '{label_after_saving_in_product}' ")

return label

def get_conf_stanza_name(self, app:CustomApp, max_stanza_length:int=81)->str:
stanza_name = f"{app.label} - {self.name} - Rule"
if len(stanza_name) > max_stanza_length:
raise ValueError(f"conf stanza may only be {max_stanza_length} characters, "
f"but stanza was actually {len(stanza_name)} characters: '{stanza_name}' ")
#print(f"Stanza Length[{len(stanza_name)}]")
return stanza_name




@field_validator("search", mode="before")
@classmethod
def validate_presence_of_filter_macro(cls, value:str, info:ValidationInfo)->str:
Expand Down Expand Up @@ -515,7 +536,7 @@ def model_post_init(self, __context: Any) -> None:
self.data_source_objects = matched_data_sources

for story in self.tags.analytic_story:
story.detections.append(self)
story.detections.append(self)

self.cve_enrichment_func(__context)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@

# TODO (#266): disable the use_enum_values configuration
class SecurityContentObject_Abstract(BaseModel, abc.ABC):
model_config = ConfigDict(use_enum_values=True, validate_default=True)

name: str = Field(...)
author: str = Field("Content Author", max_length=255)
date: datetime.date = Field(datetime.date.today())
version: NonNegativeInt = 1
id: uuid.UUID = Field(default_factory=uuid.uuid4) # we set a default here until all content has a uuid
description: str = Field("Enter Description Here", max_length=10000)
model_config = ConfigDict(use_enum_values=True,validate_default=True)
name: str = Field(...,max_length=99)
author: str = Field(...,max_length=255)
date: datetime.date = Field(...)
version: NonNegativeInt = Field(...)
id: uuid.UUID = Field(...) #we set a default here until all content has a uuid
description: str = Field(...,max_length=10000)
file_path: Optional[FilePath] = None
references: Optional[List[HttpUrl]] = None

Expand Down
11 changes: 11 additions & 0 deletions contentctl/objects/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.enums import DataModel
from contentctl.objects.baseline_tags import BaselineTags
from contentctl.objects.config import CustomApp
#from contentctl.objects.deployment import Deployment

# from typing import TYPE_CHECKING
Expand All @@ -14,6 +15,7 @@


class Baseline(SecurityContentObject):
name:str = Field(...,max_length=67)
type: Annotated[str,Field(pattern="^Baseline$")] = Field(...)
datamodel: Optional[List[DataModel]] = None
search: str = Field(..., min_length=4)
Expand All @@ -23,6 +25,15 @@ class Baseline(SecurityContentObject):

# enrichment
deployment: Deployment = Field({})

def get_conf_stanza_name(self, app:CustomApp, max_stanza_length:int=81)->str:
stanza_name = f"{app.label} - {self.name}"
if len(stanza_name) > max_stanza_length:
raise ValueError(f"conf stanza may only be {max_stanza_length} characters, "
f"but stanza was actually {len(stanza_name)} characters: '{stanza_name}' ")
#print(f"Stanza Length[{len(stanza_name)}]")
return stanza_name


@field_validator("deployment", mode="before")
def getDeployment(cls, v:Any, info:ValidationInfo)->Deployment:
Expand Down
25 changes: 20 additions & 5 deletions contentctl/objects/deployment.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations
from pydantic import Field, computed_field, model_validator,ValidationInfo, model_serializer
from typing import Optional,Any

from pydantic import Field, computed_field,ValidationInfo, model_serializer, NonNegativeInt
from typing import Any
import uuid
import datetime
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.deployment_scheduling import DeploymentScheduling
from contentctl.objects.alert_action import AlertAction
Expand All @@ -15,17 +16,22 @@ class Deployment(SecurityContentObject):
#author: str = None
#description: str = None
#contentType: SecurityContentType = SecurityContentType.deployments


scheduling: DeploymentScheduling = Field(...)
alert_action: AlertAction = AlertAction()
type: DeploymentType = Field(...)
author: str = Field(...,max_length=255)
version: NonNegativeInt = 1

#Type was the only tag exposed and should likely be removed/refactored.
#For transitional reasons, provide this as a computed_field in prep for removal
@computed_field
@property
def tags(self)->dict[str,DeploymentType]:
return {"type": self.type}



@staticmethod
def getDeployment(v:dict[str,Any], info:ValidationInfo)->Deployment:
if v != {}:
Expand All @@ -36,8 +42,17 @@ def getDeployment(v:dict[str,Any], info:ValidationInfo)->Deployment:
detection_name = info.data.get("name", None)
if detection_name is None:
raise ValueError("Could not create inline deployment - Baseline or Detection lacking 'name' field,")

# Add a number of static values
v.update({
'name': f"{detection_name} - Inline Deployment",
'id':uuid.uuid4(),
'date': datetime.date.today(),
'description': "Inline deployment created at runtime.",
'author': "contentctl tool"
})


v['name'] = f"{detection_name} - Inline Deployment"
# This constructs a temporary in-memory deployment,
# allowing the deployment to be easily defined in the
# detection on a per detection basis.
Expand Down
12 changes: 10 additions & 2 deletions contentctl/objects/investigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.enums import DataModel
from contentctl.objects.investigation_tags import InvestigationTags

from contentctl.objects.config import CustomApp

# TODO (#266): disable the use_enum_values configuration
class Investigation(SecurityContentObject):
model_config = ConfigDict(use_enum_values=True,validate_default=False)
type: str = Field(...,pattern="^Investigation$")
datamodel: list[DataModel] = Field(...)

name:str = Field(...,max_length=67)
search: str = Field(...)
how_to_implement: str = Field(...)
known_false_positives: str = Field(...)
Expand Down Expand Up @@ -69,5 +69,13 @@ def model_post_init(self, ctx:dict[str,Any]):
for story in self.tags.analytic_story:
story.investigations.append(self)

def get_conf_stanza_name(self, app:CustomApp, max_stanza_length:int=81)->str:
stanza_name = f"{app.label} - {self.name} - Response Task"
if len(stanza_name) > max_stanza_length:
raise ValueError(f"conf stanza may only be {max_stanza_length} characters, "
f"but stanza was actually {len(stanza_name)} characters: '{stanza_name}' ")
#print(f"Stanza Length[{len(stanza_name)}]")
return stanza_name



9 changes: 8 additions & 1 deletion contentctl/objects/lookup.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations
from pydantic import field_validator, ValidationInfo, model_validator, FilePath, model_serializer
from pydantic import field_validator, ValidationInfo, model_validator, FilePath, model_serializer, Field, NonNegativeInt
from typing import TYPE_CHECKING, Optional, Any, Union
import re
import csv
import uuid
import datetime
if TYPE_CHECKING:
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.config import validate
Expand Down Expand Up @@ -32,6 +34,11 @@ class Lookup(SecurityContentObject):
match_type: Optional[str] = None
min_matches: Optional[int] = None
case_sensitive_match: Optional[bool] = None
# TODO: Add id field to all lookup ymls
id: uuid.UUID = Field(default_factory=uuid.uuid4)
date: datetime.date = Field(datetime.date.today())
author: str = Field("NO AUTHOR DEFINED",max_length=255)
version: NonNegativeInt = 1


@model_serializer
Expand Down
10 changes: 8 additions & 2 deletions contentctl/objects/macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from __future__ import annotations
from typing import TYPE_CHECKING, List
import re
from pydantic import Field, model_serializer
from pydantic import Field, model_serializer, NonNegativeInt
import uuid
import datetime
if TYPE_CHECKING:
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.security_content_object import SecurityContentObject
Expand All @@ -22,7 +24,11 @@
class Macro(SecurityContentObject):
definition: str = Field(..., min_length=1)
arguments: List[str] = Field([])

# TODO: Add id field to all macro ymls
id: uuid.UUID = Field(default_factory=uuid.uuid4)
date: datetime.date = Field(datetime.date.today())
author: str = Field("NO AUTHOR DEFINED",max_length=255)
version: NonNegativeInt = 1



Expand Down
2 changes: 1 addition & 1 deletion contentctl/output/templates/analyticstories_detections.j2
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

{% for detection in objects %}
{% if (detection.type == 'TTP' or detection.type == 'Anomaly' or detection.type == 'Hunting' or detection.type == 'Correlation') %}
[savedsearch://{{app.label}} - {{ detection.name }} - Rule]
[savedsearch://{{ detection.get_conf_stanza_name(app) }}]
type = detection
asset_type = {{ detection.tags.asset_type.value }}
confidence = medium
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

{% for detection in objects %}
{% if (detection.type == 'Investigation') %}
[savedsearch://{{app.label}} - {{ detection.name }} - Response Task]
[savedsearch://{{ detection.get_conf_stanza_name(app) }}]
type = investigation
explanation = none
{% if detection.how_to_implement is defined %}
Expand Down
3 changes: 1 addition & 2 deletions contentctl/output/templates/savedsearches_baselines.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

{% for detection in objects %}
{% if (detection.type == 'Baseline') %}
[{{app.label}} - {{ detection.name }}]
[{{ detection.get_conf_stanza_name(app) }}]
action.escu = 0
action.escu.enabled = 1
action.escu.search_type = support
action.escu.full_search_name = {{app.label}} - {{ detection.name }}
description = {{ detection.description | escapeNewlines() }}
action.escu.creation_date = {{ detection.date }}
action.escu.modification_date = {{ detection.date }}
Expand Down
5 changes: 2 additions & 3 deletions contentctl/output/templates/savedsearches_detections.j2
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{% for detection in objects %}
{% if (detection.type == 'TTP' or detection.type == 'Anomaly' or detection.type == 'Hunting' or detection.type == 'Correlation') %}
[{{app.label}} - {{ detection.name }} - Rule]
[{{ detection.get_conf_stanza_name(app) }}]
action.escu = 0
action.escu.enabled = 1
{% if detection.status == "deprecated" %}
Expand All @@ -28,7 +28,6 @@ action.escu.known_false_positives = None
action.escu.creation_date = {{ detection.date }}
action.escu.modification_date = {{ detection.date }}
action.escu.confidence = high
action.escu.full_search_name = {{app.label}} - {{ detection.name }} - Rule
action.escu.search_type = detection
{% if detection.tags.product is defined %}
action.escu.product = {{ detection.tags.product | tojson }}
Expand Down Expand Up @@ -57,7 +56,7 @@ cron_schedule = {{ detection.deployment.scheduling.cron_schedule }}
dispatch.earliest_time = {{ detection.deployment.scheduling.earliest_time }}
dispatch.latest_time = {{ detection.deployment.scheduling.latest_time }}
action.correlationsearch.enabled = 1
action.correlationsearch.label = {{app.label}} - {{ detection.name }} - Rule
action.correlationsearch.label = {{ detection.get_action_dot_correlationsearch_dot_label(app) }}
action.correlationsearch.annotations = {{ detection.annotations | tojson }}
action.correlationsearch.metadata = {{ detection.metadata | tojson }}
{% if detection.deployment.scheduling.schedule_window is defined %}
Expand Down
3 changes: 1 addition & 2 deletions contentctl/output/templates/savedsearches_investigations.j2
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
{% for detection in objects %}
{% if (detection.type == 'Investigation') %}
{% if detection.search is defined %}
[{{app.label}} - {{ detection.name }} - Response Task]
[{{ detection.get_conf_stanza_name(app) }}]
action.escu = 0
action.escu.enabled = 1
action.escu.search_type = investigative
action.escu.full_search_name = {{app.label}} - {{ detection.name }} - Response Task
description = {{ detection.description | escapeNewlines() }}
action.escu.creation_date = {{ detection.date }}
action.escu.modification_date = {{ detection.date }}
Expand Down
Loading