diff --git a/contentctl/actions/build.py b/contentctl/actions/build.py
index 8818c8e2..97c0296c 100644
--- a/contentctl/actions/build.py
+++ b/contentctl/actions/build.py
@@ -50,6 +50,7 @@ def execute(self, input_dto: BuildInputDto) -> DirectorOutputDto:
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.investigations, SecurityContentType.investigations))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.lookups, SecurityContentType.lookups))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.macros, SecurityContentType.macros))
+ updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.dashboards, SecurityContentType.dashboards))
updated_conf_files.update(conf_output.writeAppConf())
#Ensure that the conf file we just generated/update is syntactically valid
diff --git a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
index ca28dce8..95ebc464 100644
--- a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
+++ b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
@@ -269,17 +269,25 @@ def configure_imported_roles(
):
indexes.append(self.sync_obj.replay_index)
indexes_encoded = ";".join(indexes)
+
try:
+ # Set which roles should be configured. For Enterprise Security/Integration Testing,
+ # we must add some extra foles.
+ if self.global_config.enable_integration_testing:
+ roles = imported_roles + enterprise_security_roles
+ else:
+ roles = imported_roles
+
self.get_conn().roles.post(
self.infrastructure.splunk_app_username,
- imported_roles=imported_roles + enterprise_security_roles,
+ imported_roles=roles,
srchIndexesAllowed=indexes_encoded,
srchIndexesDefault=self.sync_obj.replay_index,
)
return
except Exception as e:
self.pbar.write(
- f"Enterprise Security Roles do not exist:'{enterprise_security_roles}: {str(e)}"
+ f"The following role(s) do not exist:'{enterprise_security_roles}: {str(e)}"
)
self.get_conn().roles.post(
diff --git a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructureContainer.py b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructureContainer.py
index 83c61c63..f5887033 100644
--- a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructureContainer.py
+++ b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructureContainer.py
@@ -49,13 +49,17 @@ def get_docker_client(self):
def check_for_teardown(self):
try:
- self.get_docker_client().containers.get(self.get_name())
+ container: docker.models.containers.Container = self.get_docker_client().containers.get(self.get_name())
except Exception as e:
if self.sync_obj.terminate is not True:
self.pbar.write(
f"Error: could not get container [{self.get_name()}]: {str(e)}"
)
self.sync_obj.terminate = True
+ else:
+ if container.status != 'running':
+ self.sync_obj.terminate = True
+ self.container = None
if self.sync_obj.terminate:
self.finish()
diff --git a/contentctl/actions/new_content.py b/contentctl/actions/new_content.py
index 8813c996..bca81521 100644
--- a/contentctl/actions/new_content.py
+++ b/contentctl/actions/new_content.py
@@ -16,7 +16,11 @@ class NewContent:
def buildDetection(self)->dict[str,Any]:
questions = NewContentQuestions.get_questions_detection()
- answers = questionary.prompt(questions)
+ answers: dict[str,str] = questionary.prompt(
+ questions,
+ kbi_msg="User did not answer all of the prompt questions. Exiting...")
+ if not answers:
+ raise ValueError("User didn't answer one or more questions!")
answers.update(answers)
answers['name'] = answers['detection_name']
del answers['detection_name']
@@ -70,7 +74,11 @@ def buildDetection(self)->dict[str,Any]:
def buildStory(self)->dict[str,Any]:
questions = NewContentQuestions.get_questions_story()
- answers = questionary.prompt(questions)
+ answers = questionary.prompt(
+ questions,
+ kbi_msg="User did not answer all of the prompt questions. Exiting...")
+ if not answers:
+ raise ValueError("User didn't answer one or more questions!")
answers['name'] = answers['story_name']
del answers['story_name']
answers['id'] = str(uuid.uuid4())
diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py
index a6fce6dc..9d394d07 100644
--- a/contentctl/actions/validate.py
+++ b/contentctl/actions/validate.py
@@ -12,7 +12,7 @@
class Validate:
- def execute(self, input_dto: validate) -> DirectorOutputDto:
+ def execute(self, input_dto: validate) -> DirectorOutputDto:
director_output_dto = DirectorOutputDto(
AtomicEnrichment.getAtomicEnrichment(input_dto),
AttackEnrichment.getAttackEnrichment(input_dto),
@@ -26,6 +26,7 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
[],
[],
[],
+ []
)
director = Director(director_output_dto)
diff --git a/contentctl/input/director.py b/contentctl/input/director.py
index c6d3b9c3..e18dc596 100644
--- a/contentctl/input/director.py
+++ b/contentctl/input/director.py
@@ -1,17 +1,14 @@
import os
import sys
-import pathlib
-from typing import Union
+from pathlib import Path
from dataclasses import dataclass, field
from pydantic import ValidationError
from uuid import UUID
from contentctl.input.yml_reader import YmlReader
-
from contentctl.objects.detection import Detection
from contentctl.objects.story import Story
-from contentctl.objects.enums import SecurityContentProduct
from contentctl.objects.baseline import Baseline
from contentctl.objects.investigation import Investigation
from contentctl.objects.playbook import Playbook
@@ -21,20 +18,15 @@
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.data_source import DataSource
-from contentctl.objects.event_source import EventSource
-
+from contentctl.objects.dashboard import Dashboard
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.objects.config import validate
from contentctl.objects.enums import SecurityContentType
-
-from contentctl.objects.enums import DetectionStatus
from contentctl.helper.utils import Utils
-
-
@dataclass
class DirectorOutputDto:
# Atomic Tests are first because parsing them
@@ -50,6 +42,8 @@ class DirectorOutputDto:
macros: list[Macro]
lookups: list[Lookup]
deployments: list[Deployment]
+ dashboards: list[Dashboard]
+
data_sources: list[DataSource]
name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict)
uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict)
@@ -88,6 +82,9 @@ def addContentToDictMappings(self, content: SecurityContentObject):
self.stories.append(content)
elif isinstance(content, Detection):
self.detections.append(content)
+ elif isinstance(content, Dashboard):
+ self.dashboards.append(content)
+
elif isinstance(content, DataSource):
self.data_sources.append(content)
else:
@@ -115,7 +112,7 @@ def execute(self, input_dto: validate) -> None:
self.createSecurityContent(SecurityContentType.data_sources)
self.createSecurityContent(SecurityContentType.playbooks)
self.createSecurityContent(SecurityContentType.detections)
-
+ self.createSecurityContent(SecurityContentType.dashboards)
from contentctl.objects.abstract_security_content_objects.detection_abstract import MISSING_SOURCES
if len(MISSING_SOURCES) > 0:
@@ -137,6 +134,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
SecurityContentType.playbooks,
SecurityContentType.detections,
SecurityContentType.data_sources,
+ SecurityContentType.dashboards
]:
files = Utils.get_all_yml_files_from_directory(
os.path.join(self.input_dto.path, str(contentType.name))
@@ -147,7 +145,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
else:
raise (Exception(f"Cannot createSecurityContent for unknown product {contentType}."))
- validation_errors = []
+ validation_errors:list[tuple[Path,ValueError]] = []
already_ran = False
progress_percent = 0
@@ -189,6 +187,10 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
elif contentType == SecurityContentType.detections:
detection = Detection.model_validate(modelDict, context={"output_dto":self.output_dto, "app":self.input_dto.app})
self.output_dto.addContentToDictMappings(detection)
+
+ elif contentType == SecurityContentType.dashboards:
+ dashboard = Dashboard.model_validate(modelDict,context={"output_dto":self.output_dto})
+ self.output_dto.addContentToDictMappings(dashboard)
elif contentType == SecurityContentType.data_sources:
data_source = DataSource.model_validate(
diff --git a/contentctl/input/new_content_questions.py b/contentctl/input/new_content_questions.py
index dc450936..0bd227d4 100644
--- a/contentctl/input/new_content_questions.py
+++ b/contentctl/input/new_content_questions.py
@@ -1,7 +1,10 @@
+from typing import Any
+
+
class NewContentQuestions:
@classmethod
- def get_questions_detection(self) -> list:
+ def get_questions_detection(cls) -> list[dict[str,Any]]:
questions = [
{
"type": "text",
@@ -116,7 +119,7 @@ def get_questions_detection(self) -> list:
return questions
@classmethod
- def get_questions_story(self) -> list:
+ def get_questions_story(cls)-> list[dict[str,Any]]:
questions = [
{
"type": "text",
diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
index bd4f83df..26014886 100644
--- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py
+++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
@@ -20,7 +20,8 @@
if TYPE_CHECKING:
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.baseline import Baseline
-
+ from contentctl.objects.config import CustomApp
+
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.enums import AnalyticsType
from contentctl.objects.enums import DataModel
@@ -36,10 +37,16 @@
from contentctl.objects.data_source import DataSource
from contentctl.objects.base_test_result import TestResultStatus
-# from contentctl.objects.playbook import Playbook
from contentctl.objects.enums import ProvidingTechnology
from contentctl.enrichments.cve_enrichment import CveEnrichmentObj
import datetime
+from contentctl.objects.constants import (
+ ES_MAX_STANZA_LENGTH,
+ ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE,
+ CONTENTCTL_MAX_SEARCH_NAME_LENGTH,
+ CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE
+)
+
MISSING_SOURCES: set[str] = set()
# Those AnalyticsTypes that we do not test via contentctl
@@ -51,8 +58,8 @@
# TODO (#266): disable the use_enum_values configuration
class Detection_Abstract(SecurityContentObject):
model_config = ConfigDict(use_enum_values=True)
-
- # contentType: SecurityContentType = SecurityContentType.detections
+ name:str = Field(...,max_length=CONTENTCTL_MAX_SEARCH_NAME_LENGTH)
+ #contentType: SecurityContentType = SecurityContentType.detections
type: AnalyticsType = Field(...)
status: DetectionStatus = Field(...)
data_source: list[str] = []
@@ -70,10 +77,30 @@ class Detection_Abstract(SecurityContentObject):
# https://github.com/pydantic/pydantic/issues/9101#issuecomment-2019032541
tests: List[Annotated[Union[UnitTest, IntegrationTest, ManualTest], Field(union_mode='left_to_right')]] = []
# A list of groups of tests, relying on the same data
- test_groups: Union[list[TestGroup], None] = Field(None, validate_default=True)
+ test_groups: list[TestGroup] = []
data_source_objects: list[DataSource] = []
+ def get_conf_stanza_name(self, app:CustomApp)->str:
+ stanza_name = CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE.format(app_label=app.label, detection_name=self.name)
+ self.check_conf_stanza_max_length(stanza_name)
+ return stanza_name
+
+
+ def get_action_dot_correlationsearch_dot_label(self, app:CustomApp, max_stanza_length:int=ES_MAX_STANZA_LENGTH)->str:
+ stanza_name = self.get_conf_stanza_name(app)
+ stanza_name_after_saving_in_es = ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE.format(
+ security_domain_value = self.tags.security_domain.value,
+ search_name = stanza_name
+ )
+
+
+ if len(stanza_name_after_saving_in_es) > max_stanza_length:
+ raise ValueError(f"label may only be {max_stanza_length} characters to allow updating in-product, "
+ f"but stanza was actually {len(stanza_name_after_saving_in_es)} characters: '{stanza_name_after_saving_in_es}' ")
+
+ return stanza_name
+
@field_validator("search", mode="before")
@classmethod
def validate_presence_of_filter_macro(cls, value:str, info:ValidationInfo)->str:
@@ -518,7 +545,7 @@ def model_post_init(self, __context: Any) -> None:
self.data_source_objects = matched_data_sources
for story in self.tags.analytic_story:
- story.detections.append(self)
+ story.detections.append(self)
self.cve_enrichment_func(__context)
@@ -653,6 +680,27 @@ def addTags_nist(self):
else:
self.tags.nist = [NistCategory.DE_AE]
return self
+
+
+ @model_validator(mode="after")
+ def ensureThrottlingFieldsExist(self):
+ '''
+ For throttling to work properly, the fields to throttle on MUST
+ exist in the search itself. If not, then we cannot apply the throttling
+ '''
+ if self.tags.throttling is None:
+ # No throttling configured for this detection
+ return self
+
+ missing_fields:list[str] = [field for field in self.tags.throttling.fields if field not in self.search]
+ if len(missing_fields) > 0:
+ raise ValueError(f"The following throttle fields were missing from the search: {missing_fields}")
+
+ else:
+ # All throttling fields present in search
+ return self
+
+
@model_validator(mode="after")
def ensureProperObservablesExist(self):
diff --git a/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py b/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py
index 40b3aedc..f93602f1 100644
--- a/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py
+++ b/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py
@@ -5,8 +5,10 @@
from contentctl.objects.deployment import Deployment
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.input.director import DirectorOutputDto
+ from contentctl.objects.config import CustomApp
from contentctl.objects.enums import AnalyticsType
+from contentctl.objects.constants import CONTENTCTL_MAX_STANZA_LENGTH
import abc
import uuid
import datetime
@@ -31,14 +33,14 @@
# TODO (#266): disable the use_enum_values configuration
class SecurityContentObject_Abstract(BaseModel, abc.ABC):
- model_config = ConfigDict(use_enum_values=True, validate_default=True)
-
- name: str = Field(...)
- author: str = Field("Content Author", max_length=255)
- date: datetime.date = Field(datetime.date.today())
- version: NonNegativeInt = 1
- id: uuid.UUID = Field(default_factory=uuid.uuid4) # we set a default here until all content has a uuid
- description: str = Field("Enter Description Here", max_length=10000)
+ model_config = ConfigDict(use_enum_values=True,validate_default=True)
+
+ name: str = Field(...,max_length=99)
+ author: str = Field(...,max_length=255)
+ date: datetime.date = Field(...)
+ version: NonNegativeInt = Field(...)
+ id: uuid.UUID = Field(...) #we set a default here until all content has a uuid
+ description: str = Field(...,max_length=10000)
file_path: Optional[FilePath] = None
references: Optional[List[HttpUrl]] = None
@@ -56,7 +58,13 @@ def serialize_model(self):
"description": self.description,
"references": [str(url) for url in self.references or []]
}
-
+
+
+ def check_conf_stanza_max_length(self, stanza_name:str, max_stanza_length:int=CONTENTCTL_MAX_STANZA_LENGTH) -> None:
+ if len(stanza_name) > max_stanza_length:
+ raise ValueError(f"conf stanza may only be {max_stanza_length} characters, "
+ f"but stanza was actually {len(stanza_name)} characters: '{stanza_name}' ")
+
@staticmethod
def objectListToNameList(objects: list[SecurityContentObject]) -> list[str]:
return [object.getName() for object in objects]
diff --git a/contentctl/objects/baseline.py b/contentctl/objects/baseline.py
index ee9e66bf..5dc59d8f 100644
--- a/contentctl/objects/baseline.py
+++ b/contentctl/objects/baseline.py
@@ -1,33 +1,21 @@
from __future__ import annotations
-from typing import TYPE_CHECKING, Annotated, Optional, List,Any
+from typing import Annotated, Optional, List,Any
from pydantic import field_validator, ValidationInfo, Field, model_serializer
-if TYPE_CHECKING:
- from contentctl.input.director import DirectorOutputDto
-
from contentctl.objects.deployment import Deployment
from contentctl.objects.security_content_object import SecurityContentObject
-from contentctl.objects.enums import DataModel, AnalyticsType
+from contentctl.objects.enums import DataModel
from contentctl.objects.baseline_tags import BaselineTags
-from contentctl.objects.enums import DeploymentType
-#from contentctl.objects.deployment import Deployment
-# from typing import TYPE_CHECKING
-# if TYPE_CHECKING:
-# from contentctl.input.director import DirectorOutputDto
+from contentctl.objects.config import CustomApp
+
+from contentctl.objects.constants import CONTENTCTL_MAX_SEARCH_NAME_LENGTH,CONTENTCTL_BASELINE_STANZA_NAME_FORMAT_TEMPLATE
class Baseline(SecurityContentObject):
- # baseline spec
- #name: str
- #id: str
- #version: int
- #date: str
- #author: str
- #contentType: SecurityContentType = SecurityContentType.baselines
+ name:str = Field(...,max_length=CONTENTCTL_MAX_SEARCH_NAME_LENGTH)
type: Annotated[str,Field(pattern="^Baseline$")] = Field(...)
datamodel: Optional[List[DataModel]] = None
- #description: str
search: str = Field(..., min_length=4)
how_to_implement: str = Field(..., min_length=4)
known_false_positives: str = Field(..., min_length=4)
@@ -35,6 +23,12 @@ class Baseline(SecurityContentObject):
# enrichment
deployment: Deployment = Field({})
+
+
+ def get_conf_stanza_name(self, app:CustomApp)->str:
+ stanza_name = CONTENTCTL_BASELINE_STANZA_NAME_FORMAT_TEMPLATE.format(app_label=app.label, detection_name=self.name)
+ self.check_conf_stanza_max_length(stanza_name)
+ return stanza_name
@field_validator("deployment", mode="before")
def getDeployment(cls, v:Any, info:ValidationInfo)->Deployment:
diff --git a/contentctl/objects/baseline_tags.py b/contentctl/objects/baseline_tags.py
index bf1ea88b..ea979664 100644
--- a/contentctl/objects/baseline_tags.py
+++ b/contentctl/objects/baseline_tags.py
@@ -1,15 +1,12 @@
from __future__ import annotations
-from typing import TYPE_CHECKING
from pydantic import BaseModel, Field, field_validator, ValidationInfo, model_serializer
from typing import List, Any, Union
from contentctl.objects.story import Story
-from contentctl.objects.deployment import Deployment
from contentctl.objects.detection import Detection
from contentctl.objects.enums import SecurityContentProductName
from contentctl.objects.enums import SecurityDomain
-if TYPE_CHECKING:
- from contentctl.input.director import DirectorOutputDto
+
@@ -19,7 +16,7 @@ class BaselineTags(BaseModel):
#deployment: Deployment = Field('SET_IN_GET_DEPLOYMENT_FUNCTION')
# TODO (#223): can we remove str from the possible types here?
detections: List[Union[Detection,str]] = Field(...)
- product: list[SecurityContentProductName] = Field(...,min_length=1)
+ product: List[SecurityContentProductName] = Field(...,min_length=1)
required_fields: List[str] = Field(...,min_length=1)
security_domain: SecurityDomain = Field(...)
diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py
index 5a60b700..0b262c55 100644
--- a/contentctl/objects/config.py
+++ b/contentctl/objects/config.py
@@ -158,6 +158,7 @@ def getApp(self, config:test, stage_file=True)->str:
str(destination),
verbose_print=True)
return str(destination)
+
# TODO (#266): disable the use_enum_values configuration
diff --git a/contentctl/objects/constants.py b/contentctl/objects/constants.py
index a65e317c..c295ec86 100644
--- a/contentctl/objects/constants.py
+++ b/contentctl/objects/constants.py
@@ -1,3 +1,5 @@
+# Use for calculation of maximum length of name field
+from contentctl.objects.enums import SecurityDomain
ATTACK_TACTICS_KILLCHAIN_MAPPING = {
"Reconnaissance": "Reconnaissance",
@@ -140,3 +142,31 @@
# The relative path to the directory where any apps/packages will be downloaded
DOWNLOADS_DIRECTORY = "downloads"
+
+# Maximum length of the name field for a search.
+# This number is derived from a limitation that exists in
+# ESCU where a search cannot be edited, due to validation
+# errors, if its name is longer than 99 characters.
+# When an saved search is cloned in Enterprise Security User Interface,
+# it is wrapped in the following:
+# {Detection.tags.security_domain.value} - {SEARCH_STANZA_NAME} - Rule
+# Similarly, when we generate the search stanza name in contentctl, it
+# is app.label - detection.name - Rule
+# However, in product the search name is:
+# {CustomApp.label} - {detection.name} - Rule,
+# or in ESCU:
+# ESCU - {detection.name} - Rule,
+# this gives us a maximum length below.
+# When an ESCU search is cloned, it will
+# have a full name like (the following is NOT a typo):
+# Endpoint - ESCU - Name of Search From YML File - Rule - Rule
+# The math below accounts for all these caveats
+ES_MAX_STANZA_LENGTH = 99
+CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE = "{app_label} - {detection_name} - Rule"
+CONTENTCTL_BASELINE_STANZA_NAME_FORMAT_TEMPLATE = "{app_label} - {detection_name}"
+CONTENTCTL_RESPONSE_TASK_NAME_FORMAT_TEMPLATE = "{app_label} - {detection_name} - Response Task"
+
+ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE = "{security_domain_value} - {search_name} - Rule"
+SECURITY_DOMAIN_MAX_LENGTH = max([len(SecurityDomain[value]) for value in SecurityDomain._member_map_])
+CONTENTCTL_MAX_STANZA_LENGTH = ES_MAX_STANZA_LENGTH - len(ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE.format(security_domain_value="X"*SECURITY_DOMAIN_MAX_LENGTH,search_name=""))
+CONTENTCTL_MAX_SEARCH_NAME_LENGTH = CONTENTCTL_MAX_STANZA_LENGTH - len(CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE.format(app_label="ESCU", detection_name=""))
\ No newline at end of file
diff --git a/contentctl/objects/dashboard.py b/contentctl/objects/dashboard.py
new file mode 100644
index 00000000..90bfc93f
--- /dev/null
+++ b/contentctl/objects/dashboard.py
@@ -0,0 +1,100 @@
+from typing import Any
+from pydantic import Field, Json, model_validator
+
+import pathlib
+from jinja2 import Environment
+import json
+from contentctl.objects.security_content_object import SecurityContentObject
+from contentctl.objects.config import build
+from enum import StrEnum
+
+DEFAULT_DASHBAORD_JINJA2_TEMPLATE = '''
+
+
+
+
+'''
+
+class DashboardTheme(StrEnum):
+ light = "light"
+ dark = "dark"
+
+class Dashboard(SecurityContentObject):
+ j2_template: str = Field(default=DEFAULT_DASHBAORD_JINJA2_TEMPLATE, description="Jinja2 Template used to construct the dashboard")
+ description: str = Field(...,description="A description of the dashboard. This does not have to match "
+ "the description of the dashboard in the JSON file.", max_length=10000)
+ theme: DashboardTheme = Field(default=DashboardTheme.light, description="The theme of the dashboard. Choose between 'light' and 'dark'.")
+ json_obj: Json[dict[str,Any]] = Field(..., description="Valid JSON object that describes the dashboard")
+
+
+
+ def label(self, config:build)->str:
+ return f"{config.app.label} - {self.name}"
+
+ @model_validator(mode="before")
+ @classmethod
+ def validate_fields_from_json(cls, data:Any)->Any:
+ yml_file_name:str|None = data.get("file_path", None)
+ if yml_file_name is None:
+ raise ValueError("File name not passed to dashboard constructor")
+ yml_file_path = pathlib.Path(yml_file_name)
+ json_file_path = yml_file_path.with_suffix(".json")
+
+ if not json_file_path.is_file():
+ raise ValueError(f"Required file {json_file_path} does not exist.")
+
+ with open(json_file_path,'r') as jsonFilePointer:
+ try:
+ json_obj:dict[str,Any] = json.load(jsonFilePointer)
+ except Exception as e:
+ raise ValueError(f"Unable to load data from {json_file_path}: {str(e)}")
+
+ name_from_file = data.get("name",None)
+ name_from_json = json_obj.get("title",None)
+
+ errors:list[str] = []
+ if name_from_json is None:
+ errors.append(f"'title' field is missing from {json_file_path}")
+ elif name_from_json != name_from_file:
+ errors.append(f"The 'title' field in the JSON file [{json_file_path}] does not match the 'name' field in the YML object [{yml_file_path}]. These two MUST match:\n "
+ f"title in JSON : {name_from_json}\n "
+ f"title in YML : {name_from_file}\n ")
+
+ description_from_json = json_obj.get("description",None)
+ if description_from_json is None:
+ errors.append("'description' field is missing from field 'json_object'")
+
+ if len(errors) > 0 :
+ err_string = "\n - ".join(errors)
+ raise ValueError(f"Error(s) validating dashboard:\n - {err_string}")
+
+ data['name'] = name_from_file
+ data['json_obj'] = json.dumps(json_obj)
+ return data
+
+
+ def pretty_print_json_obj(self):
+ return json.dumps(self.json_obj, indent=4)
+
+ def getOutputFilepathRelativeToAppRoot(self, config:build)->pathlib.Path:
+ filename = f"{self.file_path.stem}.xml".lower()
+ return pathlib.Path("default/data/ui/views")/filename
+
+
+ def writeDashboardFile(self, j2_env:Environment, config:build):
+ template = j2_env.from_string(self.j2_template)
+ dashboard_text = template.render(config=config, dashboard=self)
+
+ with open(config.getPackageDirectoryPath()/self.getOutputFilepathRelativeToAppRoot(config), 'a') as f:
+ output_xml = dashboard_text.encode('utf-8', 'ignore').decode('utf-8')
+ f.write(output_xml)
+
+
diff --git a/contentctl/objects/deployment.py b/contentctl/objects/deployment.py
index f2b2f391..832c048d 100644
--- a/contentctl/objects/deployment.py
+++ b/contentctl/objects/deployment.py
@@ -1,7 +1,8 @@
from __future__ import annotations
-from pydantic import Field, computed_field, model_validator,ValidationInfo, model_serializer
-from typing import Optional,Any
-
+from pydantic import Field, computed_field,ValidationInfo, model_serializer, NonNegativeInt
+from typing import Any
+import uuid
+import datetime
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.deployment_scheduling import DeploymentScheduling
from contentctl.objects.alert_action import AlertAction
@@ -15,9 +16,13 @@ class Deployment(SecurityContentObject):
#author: str = None
#description: str = None
#contentType: SecurityContentType = SecurityContentType.deployments
+
+
scheduling: DeploymentScheduling = Field(...)
alert_action: AlertAction = AlertAction()
type: DeploymentType = Field(...)
+ author: str = Field(...,max_length=255)
+ version: NonNegativeInt = 1
#Type was the only tag exposed and should likely be removed/refactored.
#For transitional reasons, provide this as a computed_field in prep for removal
@@ -25,7 +30,8 @@ class Deployment(SecurityContentObject):
@property
def tags(self)->dict[str,DeploymentType]:
return {"type": self.type}
-
+
+
@staticmethod
def getDeployment(v:dict[str,Any], info:ValidationInfo)->Deployment:
if v != {}:
@@ -36,8 +42,17 @@ def getDeployment(v:dict[str,Any], info:ValidationInfo)->Deployment:
detection_name = info.data.get("name", None)
if detection_name is None:
raise ValueError("Could not create inline deployment - Baseline or Detection lacking 'name' field,")
+
+ # Add a number of static values
+ v.update({
+ 'name': f"{detection_name} - Inline Deployment",
+ 'id':uuid.uuid4(),
+ 'date': datetime.date.today(),
+ 'description': "Inline deployment created at runtime.",
+ 'author': "contentctl tool"
+ })
+
- v['name'] = f"{detection_name} - Inline Deployment"
# This constructs a temporary in-memory deployment,
# allowing the deployment to be easily defined in the
# detection on a per detection basis.
diff --git a/contentctl/objects/detection_tags.py b/contentctl/objects/detection_tags.py
index 2998006f..c8dce678 100644
--- a/contentctl/objects/detection_tags.py
+++ b/contentctl/objects/detection_tags.py
@@ -16,6 +16,7 @@
model_validator
)
from contentctl.objects.story import Story
+from contentctl.objects.throttling import Throttling
if TYPE_CHECKING:
from contentctl.input.director import DirectorOutputDto
@@ -29,7 +30,6 @@
RiskSeverity,
KillChainPhase,
NistCategory,
- RiskLevel,
SecurityContentProductName
)
from contentctl.objects.atomic import AtomicEnrichment, AtomicTest
@@ -49,6 +49,23 @@ class DetectionTags(BaseModel):
@property
def risk_score(self) -> int:
return round((self.confidence * self.impact)/100)
+
+ @computed_field
+ @property
+ def severity(self)->RiskSeverity:
+ if 0 <= self.risk_score <= 20:
+ return RiskSeverity.INFORMATIONAL
+ elif 20 < self.risk_score <= 40:
+ return RiskSeverity.LOW
+ elif 40 < self.risk_score <= 60:
+ return RiskSeverity.MEDIUM
+ elif 60 < self.risk_score <= 80:
+ return RiskSeverity.HIGH
+ elif 80 < self.risk_score <= 100:
+ return RiskSeverity.CRITICAL
+ else:
+ raise Exception(f"Error getting severity - risk_score must be between 0-100, but was actually {self.risk_score}")
+
mitre_attack_id: List[MITRE_ATTACK_ID_TYPE] = []
nist: list[NistCategory] = []
@@ -58,19 +75,8 @@ def risk_score(self) -> int:
message: str = Field(...)
product: list[SecurityContentProductName] = Field(..., min_length=1)
required_fields: list[str] = Field(min_length=1)
-
+ throttling: Optional[Throttling] = None
security_domain: SecurityDomain = Field(...)
-
- @computed_field
- @property
- def risk_severity(self) -> RiskSeverity:
- if self.risk_score >= 80:
- return RiskSeverity('high')
- elif (self.risk_score >= 50 and self.risk_score <= 79):
- return RiskSeverity('medium')
- else:
- return RiskSeverity('low')
-
cve: List[CVE_TYPE] = []
atomic_guid: List[AtomicTest] = []
drilldown_search: Optional[str] = None
@@ -79,10 +85,6 @@ def risk_severity(self) -> RiskSeverity:
mitre_attack_enrichments: List[MitreAttackEnrichment] = Field([], validate_default=True)
confidence_id: Optional[PositiveInt] = Field(None, ge=1, le=3)
impact_id: Optional[PositiveInt] = Field(None, ge=1, le=5)
- # context_ids: list = None
- risk_level_id: Optional[NonNegativeInt] = Field(None, le=4)
- risk_level: Optional[RiskLevel] = None
- # observable_str: str = None
evidence_str: Optional[str] = None
@computed_field
@@ -158,7 +160,7 @@ def serialize_model(self):
"message": self.message,
"risk_score": self.risk_score,
"security_domain": self.security_domain,
- "risk_severity": self.risk_severity,
+ "risk_severity": self.severity,
"mitre_attack_id": self.mitre_attack_id,
"mitre_attack_enrichments": self.mitre_attack_enrichments
}
diff --git a/contentctl/objects/enums.py b/contentctl/objects/enums.py
index 240ba905..d7072ecc 100644
--- a/contentctl/objects/enums.py
+++ b/contentctl/objects/enums.py
@@ -55,6 +55,8 @@ class SecurityContentType(enum.Enum):
investigations = 8
unit_tests = 9
data_sources = 11
+ dashboards = 12
+
# Bringing these changes back in line will take some time after
# the initial merge is complete
@@ -405,14 +407,16 @@ class NistCategory(str, enum.Enum):
RC_IM = "RC.IM"
RC_CO = "RC.CO"
-class RiskLevel(str,enum.Enum):
- INFO = "Info"
- LOW = "Low"
- MEDIUM = "Medium"
- HIGH = "High"
- CRITICAL = "Critical"
-
class RiskSeverity(str,enum.Enum):
+ # Levels taken from the following documentation link
+ # https://docs.splunk.com/Documentation/ES/7.3.2/User/RiskScoring
+ # 20 - info (0-20 for us)
+ # 40 - low (21-40 for us)
+ # 60 - medium (41-60 for us)
+ # 80 - high (61-80 for us)
+ # 100 - critical (81 - 100 for us)
+ INFORMATIONAL = "informational"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
+ CRITICAL = "critical"
diff --git a/contentctl/objects/investigation.py b/contentctl/objects/investigation.py
index 81eb5460..293e3331 100644
--- a/contentctl/objects/investigation.py
+++ b/contentctl/objects/investigation.py
@@ -1,20 +1,23 @@
from __future__ import annotations
import re
-from typing import TYPE_CHECKING, Optional, List, Any
-from pydantic import field_validator, computed_field, Field, ValidationInfo, ConfigDict,model_serializer
-if TYPE_CHECKING:
- from contentctl.input.director import DirectorOutputDto
+from typing import List, Any
+from pydantic import computed_field, Field, ConfigDict,model_serializer
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.enums import DataModel
from contentctl.objects.investigation_tags import InvestigationTags
-
+from contentctl.objects.constants import (
+ CONTENTCTL_MAX_SEARCH_NAME_LENGTH,
+ CONTENTCTL_RESPONSE_TASK_NAME_FORMAT_TEMPLATE,
+ CONTENTCTL_MAX_STANZA_LENGTH
+)
+from contentctl.objects.config import CustomApp
# TODO (#266): disable the use_enum_values configuration
class Investigation(SecurityContentObject):
model_config = ConfigDict(use_enum_values=True,validate_default=False)
type: str = Field(...,pattern="^Investigation$")
datamodel: list[DataModel] = Field(...)
-
+ name:str = Field(...,max_length=CONTENTCTL_MAX_SEARCH_NAME_LENGTH)
search: str = Field(...)
how_to_implement: str = Field(...)
known_false_positives: str = Field(...)
@@ -27,11 +30,11 @@ class Investigation(SecurityContentObject):
@property
def inputs(self)->List[str]:
#Parse out and return all inputs from the searchj
- inputs = []
+ inputs:List[str] = []
pattern = r"\$([^\s.]*)\$"
for input in re.findall(pattern, self.search):
- inputs.append(input)
+ inputs.append(str(input))
return inputs
@@ -40,6 +43,16 @@ def inputs(self)->List[str]:
def lowercase_name(self)->str:
return self.name.replace(' ', '_').replace('-','_').replace('.','_').replace('/','_').lower().replace(' ', '_').replace('-','_').replace('.','_').replace('/','_').lower()
+
+ # This is a slightly modified version of the get_conf_stanza_name function from
+ # SecurityContentObject_Abstract
+ def get_response_task_name(self, app:CustomApp, max_stanza_length:int=CONTENTCTL_MAX_STANZA_LENGTH)->str:
+ stanza_name = CONTENTCTL_RESPONSE_TASK_NAME_FORMAT_TEMPLATE.format(app_label=app.label, detection_name=self.name)
+ if len(stanza_name) > max_stanza_length:
+ raise ValueError(f"conf stanza may only be {max_stanza_length} characters, "
+ f"but stanza was actually {len(stanza_name)} characters: '{stanza_name}' ")
+ return stanza_name
+
@model_serializer
def serialize_model(self):
@@ -66,12 +79,7 @@ def serialize_model(self):
def model_post_init(self, ctx:dict[str,Any]):
- # director: Optional[DirectorOutputDto] = ctx.get("output_dto",None)
- # if not isinstance(director,DirectorOutputDto):
- # raise ValueError("DirectorOutputDto was not passed in context of Detection model_post_init")
- director: Optional[DirectorOutputDto] = ctx.get("output_dto",None)
+ # Ensure we link all stories this investigation references
+ # back to itself
for story in self.tags.analytic_story:
story.investigations.append(self)
-
-
-
\ No newline at end of file
diff --git a/contentctl/objects/investigation_tags.py b/contentctl/objects/investigation_tags.py
index c01ac2cc..6db99eff 100644
--- a/contentctl/objects/investigation_tags.py
+++ b/contentctl/objects/investigation_tags.py
@@ -1,12 +1,13 @@
from __future__ import annotations
+from typing import List
from pydantic import BaseModel, Field, field_validator, ValidationInfo, model_serializer
from contentctl.objects.story import Story
from contentctl.objects.enums import SecurityContentInvestigationProductName, SecurityDomain
class InvestigationTags(BaseModel):
- analytic_story: list[Story] = Field([],min_length=1)
- product: list[SecurityContentInvestigationProductName] = Field(...,min_length=1)
- required_fields: list[str] = Field(min_length=1)
+ analytic_story: List[Story] = Field([],min_length=1)
+ product: List[SecurityContentInvestigationProductName] = Field(...,min_length=1)
+ required_fields: List[str] = Field(min_length=1)
security_domain: SecurityDomain = Field(...)
diff --git a/contentctl/objects/lookup.py b/contentctl/objects/lookup.py
index 9cc36007..e37e60e9 100644
--- a/contentctl/objects/lookup.py
+++ b/contentctl/objects/lookup.py
@@ -1,8 +1,10 @@
from __future__ import annotations
-from pydantic import field_validator, ValidationInfo, model_validator, FilePath, model_serializer
+from pydantic import field_validator, ValidationInfo, model_validator, FilePath, model_serializer, Field, NonNegativeInt
from typing import TYPE_CHECKING, Optional, Any, Union
import re
import csv
+import uuid
+import datetime
if TYPE_CHECKING:
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.config import validate
@@ -32,6 +34,11 @@ class Lookup(SecurityContentObject):
match_type: Optional[str] = None
min_matches: Optional[int] = None
case_sensitive_match: Optional[bool] = None
+ # TODO: Add id field to all lookup ymls
+ id: uuid.UUID = Field(default_factory=uuid.uuid4)
+ date: datetime.date = Field(datetime.date.today())
+ author: str = Field("NO AUTHOR DEFINED",max_length=255)
+ version: NonNegativeInt = 1
@model_serializer
diff --git a/contentctl/objects/macro.py b/contentctl/objects/macro.py
index 5f01f2d1..48daf602 100644
--- a/contentctl/objects/macro.py
+++ b/contentctl/objects/macro.py
@@ -3,7 +3,9 @@
from __future__ import annotations
from typing import TYPE_CHECKING, List
import re
-from pydantic import Field, model_serializer
+from pydantic import Field, model_serializer, NonNegativeInt
+import uuid
+import datetime
if TYPE_CHECKING:
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.security_content_object import SecurityContentObject
@@ -22,7 +24,11 @@
class Macro(SecurityContentObject):
definition: str = Field(..., min_length=1)
arguments: List[str] = Field([])
-
+ # TODO: Add id field to all macro ymls
+ id: uuid.UUID = Field(default_factory=uuid.uuid4)
+ date: datetime.date = Field(datetime.date.today())
+ author: str = Field("NO AUTHOR DEFINED",max_length=255)
+ version: NonNegativeInt = 1
diff --git a/contentctl/objects/story.py b/contentctl/objects/story.py
index 36558388..09d65287 100644
--- a/contentctl/objects/story.py
+++ b/contentctl/objects/story.py
@@ -8,26 +8,14 @@
from contentctl.objects.investigation import Investigation
from contentctl.objects.baseline import Baseline
from contentctl.objects.data_source import DataSource
+ from contentctl.objects.config import CustomApp
from contentctl.objects.security_content_object import SecurityContentObject
-
-
-
-
-#from contentctl.objects.investigation import Investigation
-
-
-
class Story(SecurityContentObject):
narrative: str = Field(...)
tags: StoryTags = Field(...)
- # enrichments
- #detection_names: List[str] = []
- #investigation_names: List[str] = []
- #baseline_names: List[str] = []
-
# These are updated when detection and investigation objects are created.
# Specifically in the model_post_init functions
detections:List[Detection] = []
@@ -46,9 +34,9 @@ def data_sources(self)-> list[DataSource]:
return sorted(list(data_source_objects))
- def storyAndInvestigationNamesWithApp(self, app_name:str)->List[str]:
- return [f"{app_name} - {name} - Rule" for name in self.detection_names] + \
- [f"{app_name} - {name} - Response Task" for name in self.investigation_names]
+ def storyAndInvestigationNamesWithApp(self, app:CustomApp)->List[str]:
+ return [detection.get_conf_stanza_name(app) for detection in self.detections] + \
+ [investigation.get_response_task_name(app) for investigation in self.investigations]
@model_serializer
def serialize_model(self):
diff --git a/contentctl/objects/throttling.py b/contentctl/objects/throttling.py
new file mode 100644
index 00000000..04998ac6
--- /dev/null
+++ b/contentctl/objects/throttling.py
@@ -0,0 +1,46 @@
+from pydantic import BaseModel, Field, field_validator
+from typing import Annotated
+
+
+# Alert Suppression/Throttling settings have been taken from
+# https://docs.splunk.com/Documentation/Splunk/9.2.2/Admin/Savedsearchesconf
+class Throttling(BaseModel):
+ fields: list[str] = Field(..., description="The list of fields to throttle on. These fields MUST occur in the search.", min_length=1)
+ period: Annotated[str,Field(pattern="^[0-9]+[smh]$")] = Field(..., description="How often the alert should be triggered. "
+ "This may be specified in seconds, minutes, or hours. "
+ "For example, if an alert should be triggered once a day,"
+ " it may be specified in seconds (86400s), minutes (1440m), or hours import (24h).")
+
+ @field_validator("fields")
+ def no_spaces_in_fields(cls, v:list[str])->list[str]:
+ for field in v:
+ if ' ' in field:
+ raise ValueError("Spaces are not presently supported in 'alert.suppress.fields' / throttling fields in conf files. "
+ "The field '{field}' has a space in it. If this is a blocker, please raise this as an issue on the Project.")
+ return v
+
+ def conf_formatted_fields(self)->str:
+ '''
+ TODO:
+ The field alert.suppress.fields is defined as follows:
+ alert.suppress.fields =
+ * List of fields to use when suppressing per-result alerts. This field *must*
+ be specified if the digest mode is disabled and suppression is enabled.
+
+ In order to support fields with spaces in them, we may need to wrap each
+ field in "".
+ This function returns a properly formatted value, where each field
+ is wrapped in "" and separated with a comma. For example, the fields
+ ["field1", "field 2", "field3"] would be returned as the string
+
+ "field1","field 2","field3
+
+ However, for now, we will error on fields with spaces and simply
+ separate with commas
+ '''
+
+ return ",".join(self.fields)
+
+ # The following may be used once we determine proper support
+ # for fields with spaces
+ #return ",".join([f'"{field}"' for field in self.fields])
\ No newline at end of file
diff --git a/contentctl/output/conf_output.py b/contentctl/output/conf_output.py
index cf4574ae..0d00cf64 100644
--- a/contentctl/output/conf_output.py
+++ b/contentctl/output/conf_output.py
@@ -152,6 +152,10 @@ def writeObjects(self, objects: list, type: SecurityContentType = None) -> set[p
'macros.j2',
self.config, objects))
+ elif type == SecurityContentType.dashboards:
+ written_files.update(ConfWriter.writeDashboardFiles(self.config, objects))
+
+
return written_files
diff --git a/contentctl/output/conf_writer.py b/contentctl/output/conf_writer.py
index b103a291..4d2e0490 100644
--- a/contentctl/output/conf_writer.py
+++ b/contentctl/output/conf_writer.py
@@ -8,6 +8,7 @@
from jinja2 import Environment, FileSystemLoader, StrictUndefined
import pathlib
from contentctl.objects.security_content_object import SecurityContentObject
+from contentctl.objects.dashboard import Dashboard
from contentctl.objects.config import build
import xml.etree.ElementTree as ET
@@ -61,7 +62,7 @@ def writeManifestFile(app_output_path:pathlib.Path, template_name : str, config:
j2_env = ConfWriter.getJ2Environment()
template = j2_env.get_template(template_name)
- output = template.render(objects=objects, APP_NAME=config.app.label, currentDate=datetime.datetime.now(datetime.UTC).date().isoformat())
+ output = template.render(objects=objects, app=config.app, currentDate=datetime.datetime.now(datetime.UTC).date().isoformat())
output_path = config.getPackageDirectoryPath()/app_output_path
output_path.parent.mkdir(parents=True, exist_ok=True)
@@ -94,7 +95,7 @@ def writeXmlFile(app_output_path:pathlib.Path, template_name : str, config: buil
j2_env = ConfWriter.getJ2Environment()
template = j2_env.get_template(template_name)
- output = template.render(objects=objects, APP_NAME=config.app.label)
+ output = template.render(objects=objects, app=config.app)
output_path = config.getPackageDirectoryPath()/app_output_path
output_path.parent.mkdir(parents=True, exist_ok=True)
@@ -107,6 +108,22 @@ def writeXmlFile(app_output_path:pathlib.Path, template_name : str, config: buil
+ @staticmethod
+ def writeDashboardFiles(config:build, dashboards:list[Dashboard])->set[pathlib.Path]:
+ written_files:set[pathlib.Path] = set()
+ for dashboard in dashboards:
+ output_file_path = dashboard.getOutputFilepathRelativeToAppRoot(config)
+ # Check that the full output path does not exist so that we are not having an
+ # name collision with a file in app_template
+ if (config.getPackageDirectoryPath()/output_file_path).exists():
+ raise FileExistsError(f"ERROR: Overwriting Dashboard File {output_file_path}. Does this file exist in {config.getAppTemplatePath()} AND {config.path/'dashboards'}?")
+
+ ConfWriter.writeXmlFileHeader(output_file_path, config)
+ dashboard.writeDashboardFile(ConfWriter.getJ2Environment(), config)
+ ConfWriter.validateXmlFile(config.getPackageDirectoryPath()/output_file_path)
+ written_files.add(output_file_path)
+ return written_files
+
@staticmethod
def writeXmlFileHeader(app_output_path:pathlib.Path, config: build) -> None:
@@ -142,7 +159,7 @@ def writeConfFile(app_output_path:pathlib.Path, template_name : str, config: bui
j2_env = ConfWriter.getJ2Environment()
template = j2_env.get_template(template_name)
- output = template.render(objects=objects, APP_NAME=config.app.label)
+ output = template.render(objects=objects, app=config.app)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'a') as f:
diff --git a/contentctl/output/templates/analyticstories_detections.j2 b/contentctl/output/templates/analyticstories_detections.j2
index 290c4c85..2f9a1318 100644
--- a/contentctl/output/templates/analyticstories_detections.j2
+++ b/contentctl/output/templates/analyticstories_detections.j2
@@ -3,7 +3,7 @@
{% for detection in objects %}
{% if (detection.type == 'TTP' or detection.type == 'Anomaly' or detection.type == 'Hunting' or detection.type == 'Correlation') %}
-[savedsearch://{{APP_NAME}} - {{ detection.name }} - Rule]
+[savedsearch://{{ detection.get_conf_stanza_name(app) }}]
type = detection
asset_type = {{ detection.tags.asset_type.value }}
confidence = medium
diff --git a/contentctl/output/templates/analyticstories_investigations.j2 b/contentctl/output/templates/analyticstories_investigations.j2
index e742c909..3dac82c0 100644
--- a/contentctl/output/templates/analyticstories_investigations.j2
+++ b/contentctl/output/templates/analyticstories_investigations.j2
@@ -1,13 +1,13 @@
### RESPONSE TASKS ###
-{% for detection in objects %}
-{% if (detection.type == 'Investigation') %}
-[savedsearch://{{APP_NAME}} - {{ detection.name }} - Response Task]
+{% for investigation in objects %}
+{% if (investigation.type == 'Investigation') %}
+[savedsearch://{{ investigation.get_response_task_name(app) }}]
type = investigation
explanation = none
-{% if detection.how_to_implement is defined %}
-how_to_implement = {{ detection.how_to_implement | escapeNewlines() }}
+{% if investigation.how_to_implement is defined %}
+how_to_implement = {{ investigation.how_to_implement | escapeNewlines() }}
{% else %}
how_to_implement = none
{% endif %}
diff --git a/contentctl/output/templates/analyticstories_stories.j2 b/contentctl/output/templates/analyticstories_stories.j2
index 9723a6dd..829bd8d0 100644
--- a/contentctl/output/templates/analyticstories_stories.j2
+++ b/contentctl/output/templates/analyticstories_stories.j2
@@ -10,7 +10,7 @@ version = {{ story.version }}
references = {{ story.getReferencesListForJson() | tojson }}
maintainers = [{"company": "{{ story.author_company }}", "email": "{{ story.author_email }}", "name": "{{ story.author_name }}"}]
spec_version = 3
-searches = {{ story.storyAndInvestigationNamesWithApp(APP_NAME) | tojson }}
+searches = {{ story.storyAndInvestigationNamesWithApp(app) | tojson }}
description = {{ story.description | escapeNewlines() }}
{% if story.narrative is defined %}
narrative = {{ story.narrative | escapeNewlines() }}
diff --git a/contentctl/output/templates/finding_report.j2 b/contentctl/output/templates/finding_report.j2
deleted file mode 100644
index e965946a..00000000
--- a/contentctl/output/templates/finding_report.j2
+++ /dev/null
@@ -1,30 +0,0 @@
-
- | eval devices = [{"hostname": device_hostname, "type_id": 0, "uuid": device.uuid}],
- time = timestamp,
- evidence = {{ detection.tags.evidence_str }},
- message = "{{ detection.name }} has been triggered on " + device_hostname + " by " + {{ actor_user_name }} + ".",
- users = [{"name": {{ actor_user_name }}, "uuid": actor_user.uuid, "uid": actor_user.uid}],
- activity_id = 1,
- cis_csc = [{"control": "CIS 10", "version": 8}],
- analytic_stories = {{ detection.tags.analytics_story_str }},
- class_name = "Detection Report",
- confidence = {{ detection.tags.confidence }},
- confidence_id = {{ detection.tags.confidence_id }},
- duration = 0,
- impact = {{ detection.tags.impact }},
- impact_id = {{ detection.tags.impact_id }},
- kill_chain = {{ detection.tags.kill_chain_phases_str }},
- nist = ["DE.AE"],
- risk_level = "{{ detection.tags.risk_level }}",
- category_uid = 2,
- class_uid = 102001,
- risk_level_id = {{ detection.tags.risk_level_id }},
- risk_score = {{ detection.tags.risk_score }},
- severity_id = 0,
- rule = {"name": "{{ detection.name }}", "uid": "{{ detection.id }}", "type": "Streaming"},
- metadata = {"customer_uid": metadata.customer_uid, "product": {"name": "Behavior Analytics", "vendor_name": "Splunk"}, "version": "1.0.0-rc.2", "logged_time": time()},
- type_uid = 10200101,
- start_time = timestamp,
- end_time = timestamp
- | fields metadata, rule, activity_id, analytic_stories, cis_csc, category_uid, class_name, class_uid, confidence, confidence_id, devices, duration, time, evidence, impact, impact_id, kill_chain, message, nist, observables, risk_level, risk_level_id, risk_score, severity_id, type_uid, users, start_time, end_time
- | into sink;
diff --git a/contentctl/output/templates/savedsearches_baselines.j2 b/contentctl/output/templates/savedsearches_baselines.j2
index caf00fc0..fffe116c 100644
--- a/contentctl/output/templates/savedsearches_baselines.j2
+++ b/contentctl/output/templates/savedsearches_baselines.j2
@@ -1,14 +1,13 @@
-### {{APP_NAME}} BASELINES ###
+### {{app.label}} BASELINES ###
{% for detection in objects %}
{% if (detection.type == 'Baseline') %}
-[{{APP_NAME}} - {{ detection.name }}]
+[{{ detection.get_conf_stanza_name(app) }}]
action.escu = 0
action.escu.enabled = 1
action.escu.search_type = support
-action.escu.full_search_name = {{APP_NAME}} - {{ detection.name }}
description = {{ detection.description | escapeNewlines() }}
action.escu.creation_date = {{ detection.date }}
action.escu.modification_date = {{ detection.date }}
diff --git a/contentctl/output/templates/savedsearches_detections.j2 b/contentctl/output/templates/savedsearches_detections.j2
index 1b66d452..f2f345aa 100644
--- a/contentctl/output/templates/savedsearches_detections.j2
+++ b/contentctl/output/templates/savedsearches_detections.j2
@@ -1,8 +1,8 @@
-### {{APP_NAME}} DETECTIONS ###
+### {{app.label}} DETECTIONS ###
{% for detection in objects %}
{% if (detection.type == 'TTP' or detection.type == 'Anomaly' or detection.type == 'Hunting' or detection.type == 'Correlation') %}
-[{{APP_NAME}} - {{ detection.name }} - Rule]
+[{{ detection.get_conf_stanza_name(app) }}]
action.escu = 0
action.escu.enabled = 1
{% if detection.status == "deprecated" %}
@@ -28,7 +28,6 @@ action.escu.known_false_positives = None
action.escu.creation_date = {{ detection.date }}
action.escu.modification_date = {{ detection.date }}
action.escu.confidence = high
-action.escu.full_search_name = {{APP_NAME}} - {{ detection.name }} - Rule
action.escu.search_type = detection
{% if detection.tags.product is defined %}
action.escu.product = {{ detection.tags.product | tojson }}
@@ -57,7 +56,7 @@ cron_schedule = {{ detection.deployment.scheduling.cron_schedule }}
dispatch.earliest_time = {{ detection.deployment.scheduling.earliest_time }}
dispatch.latest_time = {{ detection.deployment.scheduling.latest_time }}
action.correlationsearch.enabled = 1
-action.correlationsearch.label = {{APP_NAME}} - {{ detection.name }} - Rule
+action.correlationsearch.label = {{ detection.get_action_dot_correlationsearch_dot_label(app) }}
action.correlationsearch.annotations = {{ detection.annotations | tojson }}
action.correlationsearch.metadata = {{ detection.metadata | tojson }}
{% if detection.deployment.scheduling.schedule_window is defined %}
@@ -72,7 +71,7 @@ action.notable.param.nes_fields = {{ detection.nes_fields }}
action.notable.param.rule_description = {{ detection.deployment.alert_action.notable.rule_description | custom_jinja2_enrichment_filter(detection) | escapeNewlines()}}
action.notable.param.rule_title = {% if detection.type | lower == "correlation" %}RBA: {{ detection.deployment.alert_action.notable.rule_title | custom_jinja2_enrichment_filter(detection) }}{% else %}{{ detection.deployment.alert_action.notable.rule_title | custom_jinja2_enrichment_filter(detection) }}{% endif +%}
action.notable.param.security_domain = {{ detection.tags.security_domain.value }}
-action.notable.param.severity = high
+action.notable.param.severity = {{ detection.tags.severity.value }}
{% endif %}
{% if detection.deployment.alert_action.email %}
action.email.subject.alert = {{ detection.deployment.alert_action.email.subject | custom_jinja2_enrichment_filter(detection) | escapeNewlines() }}
@@ -107,8 +106,13 @@ relation = greater than
quantity = 0
realtime_schedule = 0
is_visible = false
+{% if detection.tags.throttling %}
+alert.suppress = true
+alert.suppress.fields = {{ detection.tags.throttling.conf_formatted_fields() }}
+alert.suppress.period = {{ detection.tags.throttling.period }}
+{% endif %}
search = {{ detection.search | escapeNewlines() }}
{% endif %}
{% endfor %}
-### END {{ APP_NAME }} DETECTIONS ###
+### END {{ app.label }} DETECTIONS ###
diff --git a/contentctl/output/templates/savedsearches_investigations.j2 b/contentctl/output/templates/savedsearches_investigations.j2
index d80a2420..42458dea 100644
--- a/contentctl/output/templates/savedsearches_investigations.j2
+++ b/contentctl/output/templates/savedsearches_investigations.j2
@@ -1,15 +1,14 @@
-### {{APP_NAME}} RESPONSE TASKS ###
+### {{app.label}} RESPONSE TASKS ###
{% for detection in objects %}
{% if (detection.type == 'Investigation') %}
{% if detection.search is defined %}
-[{{APP_NAME}} - {{ detection.name }} - Response Task]
+[{{ detection.get_response_task_name(app) }}]
action.escu = 0
action.escu.enabled = 1
action.escu.search_type = investigative
-action.escu.full_search_name = {{APP_NAME}} - {{ detection.name }} - Response Task
description = {{ detection.description | escapeNewlines() }}
action.escu.creation_date = {{ detection.date }}
action.escu.modification_date = {{ detection.date }}
@@ -35,4 +34,4 @@ search = {{ detection.search | escapeNewlines() }}
{% endfor %}
-### END {{ APP_NAME }} RESPONSE TASKS ###
\ No newline at end of file
+### END {{ app.label }} RESPONSE TASKS ###
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index f368b66e..1db114c9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "contentctl"
-version = "4.3.5"
+version = "4.4.0"
description = "Splunk Content Control Tool"
authors = ["STRT "]
license = "Apache 2.0"