diff --git a/contentctl/actions/build.py b/contentctl/actions/build.py
index 051223bc..8818c8e2 100644
--- a/contentctl/actions/build.py
+++ b/contentctl/actions/build.py
@@ -8,7 +8,6 @@ from contentctl.input.director import Director, DirectorOutputDto
 from contentctl.output.conf_output import ConfOutput
 from contentctl.output.conf_writer import ConfWriter
-from contentctl.output.ba_yml_output import BAYmlOutput
 from contentctl.output.api_json_output import ApiJsonOutput
 from contentctl.output.data_source_writer import DataSourceWriter
 from contentctl.objects.lookup import Lookup
@@ -86,17 +85,4 @@ def execute(self, input_dto: BuildInputDto) -> DirectorOutputDto:
 
         print(f"Build of '{input_dto.config.app.title}' API successful to {input_dto.config.getAPIPath()}")
 
-        if input_dto.config.build_ssa:
-
-            srs_path = input_dto.config.getSSAPath() / 'srs'
-            complex_path = input_dto.config.getSSAPath() / 'complex'
-            shutil.rmtree(srs_path, ignore_errors=True)
-            shutil.rmtree(complex_path, ignore_errors=True)
-            srs_path.mkdir(parents=True)
-            complex_path.mkdir(parents=True)
-            ba_yml_output = BAYmlOutput()
-            ba_yml_output.writeObjects(input_dto.director_output_dto.ssa_detections, str(input_dto.config.getSSAPath()))
-
-            print(f"Build of 'SSA' successful to {input_dto.config.getSSAPath()}")
-
         return input_dto.director_output_dto
\ No newline at end of file
diff --git a/contentctl/actions/convert.py b/contentctl/actions/convert.py
deleted file mode 100644
index 06ad1167..00000000
--- a/contentctl/actions/convert.py
+++ /dev/null
@@ -1,25 +0,0 @@
-
-import sys
-import shutil
-import os
-
-from dataclasses import dataclass
-
-from contentctl.input.sigma_converter import *
-from contentctl.output.yml_output import YmlOutput
-
-@dataclass(frozen=True)
-class ConvertInputDto:
-    sigma_converter_input_dto: SigmaConverterInputDto
-    output_path : str
-
-
-class Convert:
-
-    def execute(self, input_dto: ConvertInputDto) -> None:
-        sigma_converter_output_dto = SigmaConverterOutputDto([])
-        sigma_converter = SigmaConverter(sigma_converter_output_dto)
-        sigma_converter.execute(input_dto.sigma_converter_input_dto)
-
-        yml_output = YmlOutput()
-        yml_output.writeDetections(sigma_converter_output_dto.detections, input_dto.output_path)
\ No newline at end of file
diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py
index 2d7203a4..cbfa7615 100644
--- a/contentctl/actions/validate.py
+++ b/contentctl/actions/validate.py
@@ -30,7 +30,6 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
             [],
             [],
             [],
-            [],
         )
         director = Director(director_output_dto)
 
diff --git a/contentctl/input/backend_splunk_ba.py b/contentctl/input/backend_splunk_ba.py
deleted file mode 100644
index b49f28e8..00000000
--- a/contentctl/input/backend_splunk_ba.py
+++ /dev/null
@@ -1,144 +0,0 @@
-import re
-from sigma.conversion.state import ConversionState
-from sigma.rule import SigmaRule
-from sigma.conversion.base import TextQueryBackend
-from sigma.conversion.deferred import DeferredTextQueryExpression
-from sigma.conditions import ConditionFieldEqualsValueExpression, ConditionOR, ConditionAND, ConditionNOT, ConditionItem
-from sigma.types import SigmaCompareExpression
-from sigma.exceptions import SigmaFeatureNotSupportedByBackendError
-from sigma.pipelines.splunk.splunk import splunk_sysmon_process_creation_cim_mapping, splunk_windows_registry_cim_mapping, splunk_windows_file_event_cim_mapping
-
-from contentctl.objects.ssa_detection import SSADetection
-
-from typing import ClassVar, Dict, List, Optional, Pattern, Tuple
-
-
-class SplunkBABackend(TextQueryBackend):
-    """Splunk SPL backend."""
-    precedence: ClassVar[Tuple[ConditionItem, ConditionItem, ConditionItem]] = (ConditionNOT, ConditionOR, ConditionAND)
-    group_expression : ClassVar[str] = "({expr})"
-    parenthesize : bool = True
-
-    or_token : ClassVar[str] = "OR"
-    and_token : ClassVar[str] = "AND"
-    not_token : ClassVar[str] = "NOT"
-    eq_token : ClassVar[str] = "="
-
-    field_quote: ClassVar[str] = '"'
-    field_quote_pattern: ClassVar[Pattern] = re.compile("^[\w.]+$")
-
-    str_quote : ClassVar[str] = '"'
-    escape_char : ClassVar[str] = "\\"
-    wildcard_multi : ClassVar[str] = "%"
-    wildcard_single : ClassVar[str] = "%"
-    add_escaped : ClassVar[str] = "\\"
-
-    re_expression : ClassVar[str] = "match({field}, /(?i){regex}/)=true"
-    re_escape_char : ClassVar[str] = ""
-    re_escape : ClassVar[Tuple[str]] = ('"',)
-
-    cidr_expression : ClassVar[str] = "{value}"
-
-    compare_op_expression : ClassVar[str] = "{field}{operator}{value}"
-    compare_operators : ClassVar[Dict[SigmaCompareExpression.CompareOperators, str]] = {
-        SigmaCompareExpression.CompareOperators.LT : "<",
-        SigmaCompareExpression.CompareOperators.LTE : "<=",
-        SigmaCompareExpression.CompareOperators.GT : ">",
-        SigmaCompareExpression.CompareOperators.GTE : ">=",
-    }
-
-    field_null_expression : ClassVar[str] = "{field} IS NOT NULL"
-
-    convert_or_as_in : ClassVar[bool] = True
-    convert_and_as_in : ClassVar[bool] = False
-    in_expressions_allow_wildcards : ClassVar[bool] = False
-    field_in_list_expression : ClassVar[str] = "{field} {op} ({list})"
-    or_in_operator : ClassVar[Optional[str]] = "IN"
-    list_separator : ClassVar[str] = ", "
-
-    unbound_value_str_expression : ClassVar[str] = '{value}'
-    unbound_value_num_expression : ClassVar[str] = '{value}'
-    unbound_value_re_expression : ClassVar[str] = '{value}'
-
-    deferred_start : ClassVar[str] = " "
-    deferred_separator : ClassVar[str] = " OR "
-    deferred_only_query : ClassVar[str] = "*"
-
-    wildcard_match_expression : ClassVar[Optional[str]] = "{field} LIKE {value}"
-
-
-    def __init__(self, processing_pipeline: Optional["sigma.processing.pipeline.ProcessingPipeline"] = None, collect_errors: bool = False, min_time : str = "-30d", max_time : str = "now", detection : SSADetection = None, field_mapping: dict = None, **kwargs):
-        super().__init__(processing_pipeline, collect_errors, **kwargs)
-        self.min_time = min_time or "-30d"
-        self.max_time = max_time or "now"
-        self.detection = detection
-        self.field_mapping = field_mapping
-
-    def finalize_query_data_model(self, rule: SigmaRule, query: str, index: int, state: ConversionState) -> str:
-
-        try:
-            fields = state.processing_state["fields"]
-        except KeyError:
-            raise SigmaFeatureNotSupportedByBackendError("No fields specified by processing pipeline")
-
-        # fields_input_parsing = ''
-        # for count, value in enumerate(fields):
-        #     fields_input_parsing = fields_input_parsing + value + '=ucast(map_get(input_event, "' + value + '"), "string", null)'
-        #     if not count == len(fields) - 1:
-        #         fields_input_parsing = fields_input_parsing + ', '
-
-        detection_str = """
-$main = from source
-| eval timestamp = time
-| eval metadata_uid = metadata.uid
-""".replace("\n", " ")
-
-        parsed_fields = []
-
-        for field in self.field_mapping["mapping"].keys():
-            mapped_field = self.field_mapping["mapping"][field]
-            parent = 'parent'
-            i = 1
-            values = mapped_field.split('.')
-            for val in values:
-                if parent == "parent":
-                    parent = val
-                    continue
-                else:
-                    new_val = parent + '_' + val
-                    if new_val in parsed_fields:
-                        parent = new_val
-                        i = i + 1
-                        continue
-
-
-                    new_val_equals = new_val + "="
-                    new_val_IN = new_val + " IN"
-                    if new_val_equals in query or new_val_IN in query:
-                        parser_str = '| eval ' + new_val + ' = ' + 'lower(' + parent + '.' + val + ') '
-                    else:
-                        parser_str = '| eval ' + new_val + ' = ' + parent + '.' + val + ' '
-                    detection_str = detection_str + parser_str
-                    parsed_fields.append(new_val)
-                    parent = new_val
-                    i = i + 1
-
-
-        ### Convert sigma values into lower case
-        lower_query = ""
-        in_quotes = False
-        for char in query:
-            if char == '"':
-                in_quotes = not in_quotes
-            if in_quotes:
-                lower_query += char.lower()
-            else:
-                lower_query += char
-
-        detection_str = detection_str + "| where " + lower_query
-
-        detection_str = detection_str.replace("\\\\\\\\", "\\\\")
-        return detection_str
-
-    def finalize_output_data_model(self, queries: List[str]) -> List[str]:
-        return queries
\ No newline at end of file
diff --git a/contentctl/input/director.py b/contentctl/input/director.py
index 4f8716c4..0e27add6 100644
--- a/contentctl/input/director.py
+++ b/contentctl/input/director.py
@@ -28,13 +28,11 @@
 from contentctl.enrichments.cve_enrichment import CveEnrichment
 from contentctl.objects.config import validate
 
-from contentctl.input.ssa_detection_builder import SSADetectionBuilder
 from contentctl.objects.enums import SecurityContentType
 from contentctl.objects.enums import DetectionStatus
 
 from contentctl.helper.utils import Utils
 
-from contentctl.input.ssa_detection_builder import SSADetectionBuilder
 from contentctl.objects.enums import SecurityContentType
 from contentctl.objects.enums import DetectionStatus
 
@@ -56,7 +54,6 @@ class DirectorOutputDto:
     macros: list[Macro]
     lookups: list[Lookup]
     deployments: list[Deployment]
-    ssa_detections: list[SSADetection]
     data_sources: list[DataSource]
     name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict)
     uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict)
@@ -98,8 +95,6 @@ def addContentToDictMappings(self, content: SecurityContentObject):
             self.stories.append(content)
         elif isinstance(content, Detection):
             self.detections.append(content)
-        elif isinstance(content, SSADetection):
-            self.ssa_detections.append(content)
         elif isinstance(content, DataSource):
             self.data_sources.append(content)
         else:
@@ -112,11 +107,9 @@ def addContentToDictMappings(self, content: SecurityContentObject):
 class Director():
     input_dto: validate
     output_dto: DirectorOutputDto
-    ssa_detection_builder: SSADetectionBuilder
 
     def __init__(self, output_dto: DirectorOutputDto) -> None:
         self.output_dto = output_dto
-        self.ssa_detection_builder = SSADetectionBuilder()
 
     def execute(self, input_dto: validate) -> None:
         self.input_dto = input_dto
@@ -129,7 +122,6 @@ def execute(self, input_dto: validate) -> None:
         self.createSecurityContent(SecurityContentType.data_sources)
         self.createSecurityContent(SecurityContentType.playbooks)
         self.createSecurityContent(SecurityContentType.detections)
-        self.createSecurityContent(SecurityContentType.ssa_detections)
 
         from contentctl.objects.abstract_security_content_objects.detection_abstract import MISSING_SOURCES
 
@@ -142,12 +134,7 @@
             print("No missing data_sources!")
 
     def createSecurityContent(self, contentType: SecurityContentType) -> None:
-        if contentType == SecurityContentType.ssa_detections:
-            files = Utils.get_all_yml_files_from_directory(
-                os.path.join(self.input_dto.path, "ssa_detections")
-            )
-            security_content_files = [f for f in files if f.name.startswith("ssa___")]
-        elif contentType in [
+        if contentType in [
             SecurityContentType.deployments,
             SecurityContentType.lookups,
             SecurityContentType.macros,
@@ -179,43 +166,37 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
                 modelDict = YmlReader.load_file(file)
 
                 if contentType == SecurityContentType.lookups:
-                    lookup = Lookup.model_validate(modelDict,context={"output_dto":self.output_dto, "config":self.input_dto})
+                    lookup = Lookup.model_validate(modelDict, context={"output_dto":self.output_dto, "config":self.input_dto})
                     self.output_dto.addContentToDictMappings(lookup)
 
                 elif contentType == SecurityContentType.macros:
-                    macro = Macro.model_validate(modelDict,context={"output_dto":self.output_dto})
+                    macro = Macro.model_validate(modelDict, context={"output_dto":self.output_dto})
                     self.output_dto.addContentToDictMappings(macro)
 
                 elif contentType == SecurityContentType.deployments:
-                    deployment = Deployment.model_validate(modelDict,context={"output_dto":self.output_dto})
+                    deployment = Deployment.model_validate(modelDict, context={"output_dto":self.output_dto})
                     self.output_dto.addContentToDictMappings(deployment)
 
                 elif contentType == SecurityContentType.playbooks:
-                    playbook = Playbook.model_validate(modelDict,context={"output_dto":self.output_dto})
+                    playbook = Playbook.model_validate(modelDict, context={"output_dto":self.output_dto})
                     self.output_dto.addContentToDictMappings(playbook)
 
                 elif contentType == SecurityContentType.baselines:
-                    baseline = Baseline.model_validate(modelDict,context={"output_dto":self.output_dto})
+                    baseline = Baseline.model_validate(modelDict, context={"output_dto":self.output_dto})
                     self.output_dto.addContentToDictMappings(baseline)
 
                 elif contentType == SecurityContentType.investigations:
-                    investigation = Investigation.model_validate(modelDict,context={"output_dto":self.output_dto})
+                    investigation = Investigation.model_validate(modelDict, context={"output_dto":self.output_dto})
                     self.output_dto.addContentToDictMappings(investigation)
 
                 elif contentType == SecurityContentType.stories:
-                    story = Story.model_validate(modelDict,context={"output_dto":self.output_dto})
+                    story = Story.model_validate(modelDict, context={"output_dto":self.output_dto})
                     self.output_dto.addContentToDictMappings(story)
 
                 elif contentType == SecurityContentType.detections:
-                    detection = Detection.model_validate(modelDict,context={"output_dto":self.output_dto, "app":self.input_dto.app})
+                    detection = Detection.model_validate(modelDict, context={"output_dto":self.output_dto, "app":self.input_dto.app})
                     self.output_dto.addContentToDictMappings(detection)
 
-                elif contentType == SecurityContentType.ssa_detections:
-                    self.constructSSADetection(self.ssa_detection_builder, self.output_dto,str(file))
-                    ssa_detection = self.ssa_detection_builder.getObject()
-                    if ssa_detection.status in [DetectionStatus.production.value, DetectionStatus.validation.value]:
-                        self.output_dto.addContentToDictMappings(ssa_detection)
-
                 elif contentType == SecurityContentType.data_sources:
                     data_source = DataSource.model_validate(
                         modelDict, context={"output_dto": self.output_dto}
@@ -262,19 +243,3 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
                 f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED"
             )
 
-    def constructSSADetection(
-        self,
-        builder: SSADetectionBuilder,
-        directorOutput: DirectorOutputDto,
-        file_path: str,
-    ) -> None:
-        builder.reset()
-        builder.setObject(file_path)
-        builder.addMitreAttackEnrichmentNew(directorOutput.attack_enrichment)
-        builder.addKillChainPhase()
-        builder.addCIS()
-        builder.addNist()
-        builder.addAnnotations()
-        builder.addMappings()
-        builder.addUnitTest()
-        builder.addRBA()
diff --git a/contentctl/input/sigma_converter.py b/contentctl/input/sigma_converter.py
deleted file mode 100644
index 3e168be6..00000000
--- a/contentctl/input/sigma_converter.py
+++ /dev/null
@@ -1,436 +0,0 @@
-import os
-import sys
-import copy
-import pathlib
-
-from dataclasses import dataclass
-from jinja2 import Environment, FileSystemLoader
-
-from sigma.processing.conditions import LogsourceCondition
-from sigma.processing.transformations import AddConditionTransformation, FieldMappingTransformation, DetectionItemFailureTransformation, RuleFailureTransformation, SetStateTransformation
-from sigma.processing.conditions import LogsourceCondition, IncludeFieldCondition, ExcludeFieldCondition, RuleProcessingItemAppliedCondition
-from sigma.collection import SigmaCollection
-from sigma.backends.splunk import SplunkBackend
-from sigma.processing.pipeline import ProcessingItem, ProcessingPipeline
-
-from contentctl.input.yml_reader import YmlReader
-from contentctl.objects.detection import Detection
-from contentctl.objects.data_source import DataSource
-from contentctl.objects.unit_test import UnitTest
-from contentctl.objects.enums import *
-from contentctl.helper.utils import Utils
-from contentctl.input.backend_splunk_ba import SplunkBABackend
-
-
-@dataclass(frozen=True)
-class SigmaConverterInputDto:
-    data_model: SigmaConverterTarget
-    detection_path: str
-    detection_folder : str
-    input_path: str
-    log_source: str
-
-
-@dataclass(frozen=True)
-class SigmaConverterOutputDto:
-    detections: list
-
-
-class SigmaConverter():
-    output_dto : SigmaConverterOutputDto
-
-    def __init__(self, output_dto: SigmaConverterOutputDto) -> None:
-        self.output_dto = output_dto
-
-
-    def execute(self, input_dto: SigmaConverterInputDto) -> None:
-
-        detection_files = []
-        errors = []
-
-        if input_dto.detection_path:
-            detection_files.append(input_dto.detection_path)
-        elif input_dto.detection_folder:
-            detection_files = Utils.get_all_yml_files_from_directory(input_dto.detection_folder)
-        else:
-            print("ERROR: --detection_path or --detection_folder needed.")
-            sys.exit(1)
-
-        for detection_file in detection_files:
-            try:
-                detection = self.read_detection(str(detection_file))
-                print("Converting detection: " + detection.name)
-                data_source = self.load_data_source(input_dto.input_path, detection.data_source[0])
-                if not data_source:
-                    print("ERROR: Didn't find data source with name: " + detection.data_source[0] + " for detection " + detection.name)
-                    sys.exit(1)
-
-                file_name = detection.name.replace(' ', '_').replace('-','_').replace('.','_').replace('/','_').lower()
-
-
-                if input_dto.data_model == SigmaConverterTarget.RAW:
-                    if input_dto.log_source and input_dto.log_source != detection.data_source[0][0]:
-                        try:
-                            field_mapping = self.find_mapping(data_source.convert_to_log_source, 'data_source', input_dto.log_source)
-                        except Exception as e:
-                            print(e)
-                            print("ERROR: Couldn't find data source mapping for log source " + input_dto.log_source + " for detection: " + detection.name)
-                            sys.exit(1)
-
-                        detection = self.convert_detection_fields(detection, field_mapping)
-
-                        logsource_condition = self.get_logsource_condition(data_source)
-                        processing_item = self.get_field_transformation_processing_item(
-                            field_mapping['mapping'],
-                            logsource_condition
-                        )
-                        sigma_processing_pipeline = self.get_pipeline_from_processing_items([processing_item])
-                        splunk_backend = SplunkBackend(processing_pipeline=sigma_processing_pipeline)
-                        data_source = self.load_data_source(input_dto.input_path, input_dto.log_source)
-
-                    else:
-                        splunk_backend = SplunkBackend()
-
-                    sigma_rule = self.get_sigma_rule(detection, data_source)
-                    search = splunk_backend.convert(sigma_rule)[0]
-                    search = self.add_source_macro(search, data_source.type)
-                    search = self.add_stats_count(search, data_source.raw_fields)
-                    search = self.add_timeformat_conversion(search)
-                    search = self.add_filter_macro(search, file_name)
-
-                    detection.file_path = file_name + '.yml'
-
-                elif input_dto.data_model == SigmaConverterTarget.CIM:
-                    logsource_condition = self.get_logsource_condition(data_source)
-                    try:
-                        field_mapping = self.find_mapping(data_source.field_mappings, 'data_model', 'cim')
-                    except Exception as e:
-                        print(e)
-                        print("ERROR: Couldn't find data source mapping to cim for log source " + detection.data_source[0] + " and detection " + detection.name)
-                        sys.exit(1)
-
-                    detection = self.convert_detection_fields(detection, field_mapping)
-                    sigma_rule = self.get_sigma_rule(detection, data_source)
-
-                    sigma_transformation_processing_item = self.get_field_transformation_processing_item(
-                        field_mapping['mapping'],
-                        logsource_condition
-                    )
-
-                    sigma_state_fields_processing_item = self.get_state_fields_processing_item(
-                        field_mapping['mapping'].values(),
-                        logsource_condition
-                    )
-                    sigma_state_data_model_processing_item = self.get_state_data_model_processing_item(
-                        field_mapping['data_set'],
-                        logsource_condition
-                    )
-                    sigma_processing_pipeline = self.get_pipeline_from_processing_items([
-                        sigma_transformation_processing_item,
-                        sigma_state_fields_processing_item,
-                        sigma_state_data_model_processing_item
-                    ])
-                    splunk_backend = SplunkBackend(processing_pipeline=sigma_processing_pipeline)
-                    search = splunk_backend.convert(sigma_rule, "data_model")[0]
-                    search = self.add_filter_macro(search, file_name)
-
-                    detection.file_path = file_name + '.yml'
-
-                elif input_dto.data_model == SigmaConverterTarget.OCSF:
-
-                    processing_items = list()
-                    logsource_condition = self.get_logsource_condition(data_source)
-                    if input_dto.log_source and input_dto.log_source != detection.data_source[0]:
-                        data_source_new = self.load_data_source(input_dto.input_path, input_dto.log_source)
-
-                        try:
-                            field_mapping = self.get_mapping_converted_data_source(
-                                data_source,
-                                "data_source",
-                                input_dto.log_source,
-                                data_source_new,
-                                "data_model",
-                                "ocsf"
-                            )
-                        except Exception as e:
-                            print(e)
-                            print("ERROR: Couldn't find data source mapping for log source " + input_dto.log_source + " and detection " + detection.name)
-                            sys.exit(1)
-
-                        cim_to_ocsf_mapping = self.get_cim_to_ocsf_mapping(data_source_new)
-
-                    # elif input_dto.cim_to_ocsf:
-                    #     field_mapping = self.get_cim_to_ocsf_mapping(data_source)
-                    #     cim_to_ocsf_mapping = field_mapping
-
-                    else:
-                        field_mapping = self.find_mapping(data_source.field_mappings, 'data_model', 'ocsf')
-                        cim_to_ocsf_mapping = self.get_cim_to_ocsf_mapping(data_source)
-
-                    field_mapping_underline = copy.deepcopy(field_mapping)
-                    for field in field_mapping_underline["mapping"].keys():
-                        field_mapping_underline["mapping"][field] = field_mapping_underline["mapping"][field].replace(".", "_")
-
-                    self.add_required_fields(cim_to_ocsf_mapping, detection)
-                    self.add_mappings(cim_to_ocsf_mapping, detection)
-
-                    self.update_observables(detection)
-
-                    processing_items.append(
-                        self.get_field_transformation_processing_item(
-                            field_mapping_underline['mapping'],
-                            logsource_condition
-                        )
-                    )
-                    processing_items.append(
-                        self.get_state_fields_processing_item(
-                            field_mapping_underline['mapping'].values(),
-                            logsource_condition
-                        )
-                    )
-
-                    detection = self.convert_detection_fields(detection)
-                    sigma_rule = self.get_sigma_rule(detection, data_source)
-                    sigma_processing_pipeline = self.get_pipeline_from_processing_items(processing_items)
-
-                    splunk_backend = SplunkBABackend(processing_pipeline=sigma_processing_pipeline, detection=detection, field_mapping=field_mapping)
-
-                    search = splunk_backend.convert(sigma_rule, "data_model")[0]
-
-                    search = search + ' --finding_report--'
-                    detection.file_path = 'ssa___' + file_name + '.yml'
-
-                detection.search = search
-
-                self.output_dto.detections.append(detection)
-
-            except Exception as e:
-                errors.append(f"ERROR: Converting detection file '{detection_file}': {str(e)}")
-
-        if len(errors) > 0:
-            errors_string = '\n\t'.join(errors)
-            raise Exception(f"The following errors were encountered during conversion:\n\t{errors_string}")
-
-    def read_detection(self, detection_path : str) -> Detection:
-        yml_dict = YmlReader.load_file(detection_path)
-
-        #SSA Detections are ALLOWED to have names longer than 67 characters,
-        #unlike Splunk App Detections.  Because we still want to use the
-        #Detection Object (and its validations), we will arbitrarily
-        #truncate the name of a detection if it is too long so that
-        #it passes validation, then updated the name after the object
-        #is constructed.  Because we do not have Pydantic configured
-        #to validate each new field assignment, this will not throw
-        #an error
-        name = yml_dict.get("name","")
-        yml_dict["name"] = name[:67]
-        detection = Detection.parse_obj(yml_dict)
-        # Remove any Integration Tests.  IntegrationTests are only relevant
-        # for ESCU Content and NOT for BA Content.  Instead of filtering OUT
-        # IntegrationTest, we will ONLY include UnitTest.  This supports the introduction
-        # of additional ESCU Test Types in the future.
-        detection.tests = list(filter(lambda t: isinstance(t, UnitTest), detection.tests))
-
-        detection.name = name
-
-
-        return detection
-
-
-    def load_data_source(self, input_path: str, data_source_name: str) -> DataSource:
-        data_sources = list()
-        files = Utils.get_all_yml_files_from_directory(os.path.join(input_path, 'data_sources'))
-        for file in files:
-            data_sources.append(DataSource.parse_obj(YmlReader.load_file(str(file))))
-
-        data_source = None
-
-        for obj in data_sources:
-            if obj.name == data_source_name:
-                return obj
-
-        return None
-
-
-    def get_sigma_rule(self, detection: Detection, data_source: DataSource) -> SigmaCollection:
-        return SigmaCollection.from_dicts([{
-            "title": detection.name,
-            "status": "experimental",
-            "logsource": {
-                "category": data_source.category,
-                "product": data_source.product
-            },
-            "detection": detection.search
-        }])
-
-
-    # def convert_detection_fields(self, detection: Detection, mappings: dict) -> Detection:
-    #     for selection in detection.search.keys():
-    #         if selection != "condition":
-    #             new_selection = copy.deepcopy(detection.search[selection])
-    #             for field in detection.search[selection].keys():
-    #                 for mapping in mappings["mapping"].keys():
-    #                     if mapping == field:
-    #                         new_selection[mappings["mapping"][mapping]] = detection.search[selection][field]
-    #                         new_selection.pop(field)
-    #             detection.search[selection] = new_selection
-
-    #     return detection
-
-    def convert_detection_fields(self, detection: Detection) -> Detection:
-        for selection in detection.search.keys():
-            if selection != "condition":
-                new_selection = copy.deepcopy(detection.search[selection])
-                for field in detection.search[selection].keys():
-                    new_field_name = field.replace(".", "_")
-                    new_selection[new_field_name] = detection.search[selection][field]
-                    new_selection.pop(field)
-                detection.search[selection] = new_selection
-
-        return detection
-
-
-    def get_logsource_condition(self, data_source: DataSource) -> LogsourceCondition:
-        return LogsourceCondition(
-            category=data_source.category,
-            product=data_source.product,
-        )
-
-
-    def get_field_transformation_processing_item(self, data_source_mapping: dict, logsource_condition: LogsourceCondition) -> ProcessingItem:
-        return ProcessingItem(
-            identifier="field_mapping_transformation",
-            transformation=FieldMappingTransformation(data_source_mapping),
-            rule_conditions=[
-                logsource_condition
-            ]
-        )
-
-
-    def get_state_fields_processing_item(self, fields: list, logsource_condition: LogsourceCondition) -> ProcessingItem:
-        return ProcessingItem(
-            identifier="fields",
-            transformation=SetStateTransformation("fields", fields),
-            rule_conditions=[
-                logsource_condition
-            ]
-        )
-
-
-    def get_state_data_model_processing_item(self, data_model: str, logsource_condition: LogsourceCondition) -> ProcessingItem:
-        return ProcessingItem(
-            identifier="data_model",
-            transformation=SetStateTransformation("data_model_set", data_model),
-            rule_conditions=[
-                logsource_condition
-            ]
-        )
-
-
-    def get_pipeline_from_processing_items(self, processing_items: list) -> ProcessingPipeline:
-        return ProcessingPipeline(
-            name="Splunk Sigma",
-            priority=10,
-            items=processing_items
-        )
-
-    def add_source_macro(self, search: str, data_source_type: str) -> str:
-        return "`" + data_source_type + "` " + search
-
-    def add_stats_count(self, search: str, fields: list) -> str:
-        search = search + " | fillnull | stats count min(_time) as firstTime max(_time) as lastTime by "
-        for key in fields:
-            search = search + key + " "
-        return search
-
-    def add_timeformat_conversion(self, search: str) -> str:
-        return search + '| convert timeformat="%Y-%m-%dT%H:%M:%S" ctime(firstTime) | convert timeformat="%Y-%m-%dT%H:%M:%S" ctime(lastTime) '
-
-    def add_filter_macro(self, search: str, file_name: str) -> str:
-        return search + '| `' + file_name + '_filter`'
-
-    def find(self, name: str, path: str) -> str:
-        for root, dirs, files in os.walk(path):
-            if name in files:
-                return os.path.join(root, name)
-        return None
-
-    def find_mapping(self, field_mappings: list, object: str, data_model: str) -> dict:
-        for mapping in field_mappings:
-            if mapping[object] == data_model:
-                return mapping
-
-        raise AttributeError("ERROR: Couldn't find mapping.")
-
-
-    def add_required_fields(self, field_mapping: dict, detection: Detection) -> None:
-        required_fields = list()
-# required_fields = ["process.user.name", "device.hostname"]
-        for mapping in field_mapping["mapping"].keys():
-            required_fields.append(field_mapping["mapping"][mapping])
-
-        detection.tags.required_fields = required_fields
-
-
-    def add_mappings(self, field_mapping: dict, detection: Detection) -> None:
-        mappings = list()
-        for mapping in field_mapping["mapping"].keys():
-            mappings.append({
-                "ocsf": field_mapping["mapping"][mapping],
-                "cim": mapping
-            })
-        detection.tags.mappings = mappings
-
-    def update_observables(self, detection : Detection) -> None:
-        mapping_field_to_type = {
-            "process.user.name": "User Name",
-            "actor.user.name": "User Name",
-            "device.hostname": "Hostname",
-            "process.file.name": "File Name",
-            "actor.process.file.name": "File Name",
-            "actor.process.file.path": "File Name",
-            "actor.process.cmd_line": "Process",
-            "actor.user.uid": "Other",
-            "process.cmd_line": "Other",
-            "process.file.path": "File",
-            "process.file.name": "File",
-            "process.uid": "Other",
-            "process.pid": "Other",
-            "actor.process.pid": "Other"
-        }
-
-        observables = list()
-
-        for field in detection.tags.required_fields:
-            observables.append({
-                "name": field,
-                "type": mapping_field_to_type[field]
-            })
-
-        detection.tags.observable = observables
-
-
-    def get_cim_to_ocsf_mapping(self, data_source : DataSource) -> dict:
-        cim_to_ocsf_mapping = dict()
-        cim_to_ocsf_mapping["mapping"] = dict()
-        cim_mapping = self.find_mapping(data_source.field_mappings, "data_model", "cim")
-        ocsf_mapping = self.find_mapping(data_source.field_mappings, "data_model", "ocsf")
-
-        for key in cim_mapping["mapping"].keys():
-            cim_field = cim_mapping["mapping"][key].split(".")[1]
-            cim_to_ocsf_mapping["mapping"][cim_field] = ocsf_mapping["mapping"][key]
-
-        return cim_to_ocsf_mapping
-
-
-    def get_mapping_converted_data_source(self, det_ds: DataSource, det_ds_obj: str, det_ds_dm: str, con_ds: DataSource, con_ds_obj: str, con_ds_dm: str) -> dict:
-        mapping = dict()
-        mapping["mapping"] = dict()
-        det_ds_mapping = self.find_mapping(det_ds.convert_to_log_source, det_ds_obj, det_ds_dm)
-        con_ds_mapping = self.find_mapping(con_ds.field_mappings, con_ds_obj, con_ds_dm)
-
-        for key in det_ds_mapping["mapping"].keys():
-            mapped_field = con_ds_mapping["mapping"][det_ds_mapping["mapping"][key]]
-            mapping["mapping"][key] = mapped_field
-
-        return mapping
\ No newline at end of file
diff --git a/contentctl/input/ssa_detection_builder.py b/contentctl/input/ssa_detection_builder.py
deleted file mode 100644
index e89b3b61..00000000
--- a/contentctl/input/ssa_detection_builder.py
+++ /dev/null
@@ -1,169 +0,0 @@
-import sys
-import re
-import os
-
-from pydantic import ValidationError
-from typing import List
-from contentctl.input.yml_reader import YmlReader
-from contentctl.objects.detection import Detection
-from contentctl.objects.security_content_object import SecurityContentObject
-from contentctl.objects.macro import Macro
-from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment
-from contentctl.enrichments.cve_enrichment import CveEnrichment
-from contentctl.enrichments.splunk_app_enrichment import SplunkAppEnrichment
-from contentctl.objects.ssa_detection import SSADetection
-from contentctl.objects.constants import *
-from contentctl.enrichments.attack_enrichment import AttackEnrichment
-
-class SSADetectionBuilder():
-    security_content_obj : SSADetection
-
-
-    def setObject(self, path: str) -> None:
-        yml_dict = YmlReader.load_file(path)
-        self.security_content_obj = SSADetection.parse_obj(yml_dict)
-        self.security_content_obj.source = os.path.split(os.path.dirname(self.security_content_obj.file_path))[-1]
-
-    def addProvidingTechnologies(self) -> None:
-        if self.security_content_obj:
-            if 'Endpoint' in str(self.security_content_obj.search):
-                self.security_content_obj.providing_technologies = ["Sysmon", "Microsoft Windows","Carbon Black Response","CrowdStrike Falcon", "Symantec Endpoint Protection"]
-            if "`cloudtrail`" in str(self.security_content_obj.search):
-                self.security_content_obj.providing_technologies = ["Amazon Web Services - Cloudtrail"]
-            if '`wineventlog_security`' in self.security_content_obj.search or '`powershell`' in self.security_content_obj.search:
-                self.security_content_obj.providing_technologies = ["Microsoft Windows"]
-
-
-    def addMappings(self) -> None:
-        if self.security_content_obj:
-            keys = ['mitre_attack', 'kill_chain_phases', 'cis20', 'nist']
-            mappings = {}
-            for key in keys:
-                if key == 'mitre_attack':
-                    if getattr(self.security_content_obj.tags, 'mitre_attack_id'):
-                        mappings[key] = getattr(self.security_content_obj.tags, 'mitre_attack_id')
-                elif getattr(self.security_content_obj.tags, key):
-                    mappings[key] = getattr(self.security_content_obj.tags, key)
-            self.security_content_obj.mappings = mappings
-
-
-    def addAnnotations(self) -> None:
-        if self.security_content_obj:
-            annotations = {}
-            annotation_keys = ['mitre_attack', 'kill_chain_phases', 'cis20', 'nist',
-                'analytic_story', 'context', 'impact', 'confidence', 'cve']
-            for key in annotation_keys:
-                if key == 'mitre_attack':
-                    if getattr(self.security_content_obj.tags, 'mitre_attack_id'):
-                        annotations[key] = getattr(self.security_content_obj.tags, 'mitre_attack_id')
-                try:
-                    if getattr(self.security_content_obj.tags, key):
-                        annotations[key] = getattr(self.security_content_obj.tags, key)
-                except AttributeError as e:
-                    continue
-            self.security_content_obj.annotations = annotations
-
-
-    def addUnitTest(self) -> None:
-        if self.security_content_obj:
-            if self.security_content_obj.tests:
-                self.security_content_obj.test = self.security_content_obj.tests[0]
-
-
-    def addMitreAttackEnrichment(self, attack_enrichment: dict) -> None:
-        if self.security_content_obj:
-            if attack_enrichment:
-                if self.security_content_obj.tags.mitre_attack_id:
-                    self.security_content_obj.tags.mitre_attack_enrichments = []
-
-                    for mitre_attack_id in self.security_content_obj.tags.mitre_attack_id:
-                        if mitre_attack_id in attack_enrichment:
-                            mitre_attack_enrichment = MitreAttackEnrichment(
-                                mitre_attack_id = mitre_attack_id,
-                                mitre_attack_technique = attack_enrichment[mitre_attack_id]["technique"],
-                                mitre_attack_tactics = sorted(attack_enrichment[mitre_attack_id]["tactics"]),
-                                mitre_attack_groups = sorted(attack_enrichment[mitre_attack_id]["groups"])
-                            )
-                            self.security_content_obj.tags.mitre_attack_enrichments.append(mitre_attack_enrichment)
-                        else:
-                            #print("mitre_attack_id " + mitre_attack_id + " doesn't exist for detecction " + self.security_content_obj.name)
-                            raise ValueError("mitre_attack_id " + mitre_attack_id + " doesn't exist for detection " + self.security_content_obj.name)
-
-    def addMitreAttackEnrichmentNew(self, attack_enrichment: AttackEnrichment) -> None:
-        # We skip enriching if configured to do so
-        if attack_enrichment.use_enrichment:
-            if self.security_content_obj and self.security_content_obj.tags.mitre_attack_id:
-                self.security_content_obj.tags.mitre_attack_enrichments = []
-                for mitre_attack_id in self.security_content_obj.tags.mitre_attack_id:
-                    enrichment_obj = attack_enrichment.getEnrichmentByMitreID(mitre_attack_id)
-                    if enrichment_obj is not None:
-                        self.security_content_obj.tags.mitre_attack_enrichments.append(enrichment_obj)
-
-
-
-    def addCIS(self) -> None:
-        if self.security_content_obj:
-            if self.security_content_obj.tags.security_domain == "network":
-                self.security_content_obj.tags.cis20 = ["CIS 13"]
-            else:
-                self.security_content_obj.tags.cis20 = ["CIS 10"]
-
-
-    def addKillChainPhase(self) -> None:
-        if self.security_content_obj:
-            if not self.security_content_obj.tags.kill_chain_phases:
-                kill_chain_phases = list()
-                if self.security_content_obj.tags.mitre_attack_enrichments:
-                    for mitre_attack_enrichment in self.security_content_obj.tags.mitre_attack_enrichments:
-                        for mitre_attack_tactic in mitre_attack_enrichment.mitre_attack_tactics:
-                            kill_chain_phases.append(ATTACK_TACTICS_KILLCHAIN_MAPPING[mitre_attack_tactic])
-                self.security_content_obj.tags.kill_chain_phases = list(dict.fromkeys(kill_chain_phases))
-
-
-    def addNist(self) -> None:
-        if self.security_content_obj:
-            if self.security_content_obj.type == "TTP":
-                self.security_content_obj.tags.nist = ["DE.CM"]
-            else:
-                self.security_content_obj.tags.nist = ["DE.AE"]
-
-
-    def addDatamodel(self) -> None:
-        if self.security_content_obj:
-            self.security_content_obj.datamodel = []
-            data_models = [
-                "Authentication",
-                "Change",
-                "Change_Analysis",
-                "Email",
-                "Endpoint",
-                "Network_Resolution",
-                "Network_Sessions",
-                "Network_Traffic",
-                "Risk",
-                "Splunk_Audit",
-                "UEBA",
-                "Updates",
-                "Vulnerabilities",
-                "Web"
-            ]
-            for data_model in data_models:
-                if data_model in self.security_content_obj.search:
-                    self.security_content_obj.datamodel.append(data_model)
-
-
-    def addRBA(self) -> None:
-        if self.security_content_obj:
-            if self.security_content_obj.tags.risk_score >= 80:
-                self.security_content_obj.tags.risk_severity = 'high'
-            elif (self.security_content_obj.tags.risk_score >= 50 and self.security_content_obj.tags.risk_score <= 79):
-                self.security_content_obj.tags.risk_severity = 'medium'
-            else:
-                self.security_content_obj.tags.risk_severity = 'low'
-
-
-    def reset(self) -> None:
-        self.security_content_obj = None
-
-
-    def getObject(self) -> SSADetection:
-        return self.security_content_obj
diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
index 99e33b12..27eab687 100644
--- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py
+++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
@@ -46,7 +46,7 @@
 class Detection_Abstract(SecurityContentObject):
     status: DetectionStatus = Field(...)
     data_source: list[str] = []
     tags: DetectionTags = Field(...)
-    search: Union[str, dict[str, Any]] = Field(...)
+    search: str = Field(...)
     how_to_implement: str = Field(..., min_length=4)
     known_false_positives: str = Field(..., min_length=4)
@@ -65,11 +65,7 @@
 
     @field_validator("search", mode="before")
     @classmethod
-    def validate_presence_of_filter_macro(
-        cls,
-        value: Union[str, dict[str, Any]],
-        info: ValidationInfo
-    ) -> Union[str, dict[str, Any]]:
+    def validate_presence_of_filter_macro(cls, value:str, info:ValidationInfo)->str:
         """
         Validates that, if required to be present, the filter macro is present with the proper name.
         The filter macro MUST be derived from the name of the detection
@@ -83,12 +79,9 @@ def validate_presence_of_filter_macro(
 
         Returns:
             Union[str, dict[str,Any]]: The search, either in sigma or SPL format.
-        """
-
-        if isinstance(value, dict):
-            # If the search is a dict, then it is in Sigma format so return it
-            return value
-
+        """
+
+
         # Otherwise, the search is SPL.
 
         # In the future, we will may add support that makes the inclusion of the
@@ -155,15 +148,16 @@ def validate_test_groups(
 
     @computed_field
     @property
     def datamodel(self) -> List[DataModel]:
-        if isinstance(self.search, str):
-            return [dm for dm in DataModel if dm.value in self.search]
-        else:
-            return []
+        return [dm for dm in DataModel if dm.value in self.search]
+
+
+
     @computed_field
     @property
     def source(self) -> str:
         return self.file_path.absolute().parent.name
 
+
     deployment: Deployment = Field({})
 
@@ -249,12 +243,9 @@ def nes_fields(self) -> Optional[str]:
     @computed_field
     @property
     def providing_technologies(self) -> List[ProvidingTechnology]:
-        if isinstance(self.search, str):
-            return ProvidingTechnology.getProvidingTechFromSearch(self.search)
-        else:
-            # Dict-formatted searches (sigma) will not have providing technologies
-            return []
-
+        return ProvidingTechnology.getProvidingTechFromSearch(self.search)
+
+
     @computed_field
     @property
     def risk(self) -> list[dict[str, Any]]:
@@ -445,18 +436,13 @@ def model_post_init(self, __context: Any) -> None:
 
     @field_validator('lookups', mode="before")
     @classmethod
-    def getDetectionLookups(cls, v: list[str], info: ValidationInfo) -> list[Lookup]:
-        if info.context is None:
-            raise ValueError("ValidationInfo.context unexpectedly null")
-
-        director: DirectorOutputDto = info.context.get("output_dto", None)
-
-        search: Union[str, dict[str, Any], None] = info.data.get("search", None)
-        if not isinstance(search, str):
-            # The search was sigma formatted (or failed other validation and was None), so we will
-            # not validate macros in it
-            return []
-
+    def getDetectionLookups(cls, v:list[str], info:ValidationInfo) -> list[Lookup]:
+        director:DirectorOutputDto = info.context.get("output_dto",None)
+
+        search:Union[str,None] = info.data.get("search",None)
+        if search is None:
+            raise ValueError("Search was None - is this file missing the search field?")
+
         lookups = Lookup.get_lookups(search, director)
         return lookups
 
@@ -496,11 +482,9 @@ def getDetectionMacros(cls, v: list[str], info: ValidationInfo) -> list[Macro]:
 
         director: DirectorOutputDto = info.context.get("output_dto", None)
 
-        search: str | dict[str, Any] | None = info.data.get("search", None)
-        if not isinstance(search, str):
-            # The search was sigma formatted (or failed other validation and was None), so we will
-            # not validate macros in it
-            return []
+        search: str | None = info.data.get("search", None)
+        if search is None:
+            raise ValueError("Search was None - is this file missing the search field?")
 
         search_name: Union[str, Any] = info.data.get("name", None)
         message = f"Expected 'search_name' to be a string, instead it was [{type(search_name)}]"
@@ -614,45 +598,43 @@ def ensureProperObservablesExist(self):
 
     @model_validator(mode="after")
     def search_observables_exist_validate(self):
-        if isinstance(self.search, str):
-
-            observable_fields = [ob.name.lower() for ob in self.tags.observable]
+        observable_fields = [ob.name.lower() for ob in self.tags.observable]
 
-            # All $field$ fields from the message must appear in the search
-            field_match_regex = r"\$([^\s.]*)\$"
+        # All $field$ fields from the message must appear in the search
+        field_match_regex = r"\$([^\s.]*)\$"
 
-            missing_fields: set[str]
-            if self.tags.message:
-                matches = re.findall(field_match_regex, self.tags.message.lower())
-                message_fields = [match.replace("$", "").lower() for match in matches]
-                missing_fields = set([field for field in observable_fields if field not in self.search.lower()])
-            else:
-                message_fields = []
-                missing_fields = set()
-
-            error_messages: list[str] = []
-            if len(missing_fields) > 0:
-                error_messages.append(
-                    "The following fields are declared as observables, but do not exist in the "
-                    f"search: {missing_fields}"
-                )
+        missing_fields: set[str]
+        if self.tags.message:
+            matches = re.findall(field_match_regex, self.tags.message.lower())
+            message_fields = [match.replace("$", "").lower() for match in matches]
+            missing_fields = set([field for field in observable_fields if field not in self.search.lower()])
+        else:
+            message_fields = []
+            missing_fields = set()
+
+        error_messages: list[str] = []
+        if len(missing_fields) > 0:
+            error_messages.append(
+                "The following fields are declared as observables, but do not exist in the "
+                f"search: {missing_fields}"
+            )
 
-            missing_fields = set([field for field in message_fields if field not in self.search.lower()])
-            if len(missing_fields) > 0:
-                error_messages.append(
-                    "The following fields are used as fields in the message, but do not exist in "
-                    f"the search: {missing_fields}"
-                )
+        missing_fields = set([field for field in message_fields if field not in self.search.lower()])
+        if len(missing_fields) > 0:
+            error_messages.append(
+                "The following fields are used as fields in the message, but do not exist in "
+                f"the search: {missing_fields}"
+            )
 
-            # NOTE: we ignore the type error around self.status because we are using Pydantic's
-            # use_enum_values configuration
-            # https://docs.pydantic.dev/latest/api/config/#pydantic.config.ConfigDict.populate_by_name
-            if len(error_messages) > 0 and self.status == DetectionStatus.production.value:  # type: ignore
-                msg = (
-                    "Use of fields in observables/messages that do not appear in search:\n\t- "
-                    "\n\t- ".join(error_messages)
-                )
-                raise ValueError(msg)
+        # NOTE: we ignore the type error around self.status because we are using Pydantic's
+        # use_enum_values configuration
+        # https://docs.pydantic.dev/latest/api/config/#pydantic.config.ConfigDict.populate_by_name
+        if len(error_messages) > 0 and self.status == DetectionStatus.production.value:  # type: ignore
+            msg = (
+                "Use of fields in observables/messages that do not appear in search:\n\t- "
+                "\n\t- ".join(error_messages)
+            )
+            raise ValueError(msg)
 
         # Found everything
         return self
diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py
index edf4ea64..2937923c 100644
--- a/contentctl/objects/config.py
+++ b/contentctl/objects/config.py
@@ -175,7 +175,6 @@ class validate(Config_Base):
                                              "be avoided for performance reasons.")
     build_app: bool = Field(default=True, description="Should an app be built and output in the build_path?")
     build_api: bool = Field(default=False, description="Should api objects be built and output in the build_path?")
-    build_ssa: bool = Field(default=False, description="Should ssa objects be built and output in the build_path?")
     data_source_TA_validation: bool = Field(default=False, description="Validate latest TA information from Splunkbase")
 
     def getAtomicRedTeamRepoPath(self, atomic_red_team_repo_name:str = "atomic-red-team"):
@@ -577,7 +576,6 @@ def dumpCICDPlanAndQuit(self, githash: str, detections:List[Detection]):
         # output to dist. We have already built it!
         self.build_app = False
         self.build_api = False
-        self.build_ssa = False
         self.enrichments = False
 
         self.enable_integration_testing = True
diff --git a/contentctl/output/ba_yml_output.py b/contentctl/output/ba_yml_output.py
deleted file mode 100644
index 5b93743d..00000000
--- a/contentctl/output/ba_yml_output.py
+++ /dev/null
@@ -1,153 +0,0 @@
-import os
-import re
-
-from urllib.parse import urlparse
-
-from contentctl.output.yml_writer import YmlWriter
-from contentctl.objects.enums import SecurityContentType
-from contentctl.output.finding_report_writer import FindingReportObject
-from contentctl.objects.unit_test_old import UnitTestOld
-
-
-class BAYmlOutput():
-
-
-    def writeObjectsInPlace(self, objects: list) -> None:
-        for object in objects:
-            file_path = object['file_path']
-            object.pop('file_path')
-            object.pop('deprecated')
-            object.pop('experimental')
-            YmlWriter.writeYmlFile(file_path, object)
-
-
-    def writeObjects(self, objects: list, output_path: str, contentType: SecurityContentType = None) -> None:
-        for obj in objects:
-            file_name = "ssa___" + self.convertNameToFileName(obj.name, obj.tags)
-            if self.isComplexBARule(obj.search):
-                file_path = os.path.join(output_path, 'complex', file_name)
-            else:
-                file_path = os.path.join(output_path, 'srs', file_name)
-
-            # add research object
-            RESEARCH_SITE_BASE = 'https://research.splunk.com/'
-            research_site_url = RESEARCH_SITE_BASE + obj.source + "/" + obj.id + "/"
-            obj.tags.research_site_url = research_site_url
-
-            # add ocsf schema tag
-            obj.tags.event_schema = 'ocsf'
-
-            body = FindingReportObject.writeFindingReport(obj)
-
-            if obj.test:
-                test_dict = {
-                    "name": obj.name + " Unit Test",
-                    "tests": [obj.test.dict()]
-                }
-                test_dict["tests"][0]["name"] = obj.name
-                for count in range(len(test_dict["tests"][0]["attack_data"])):
-                    a = urlparse(str(test_dict["tests"][0]["attack_data"][count]["data"]))
-                    test_dict["tests"][0]["attack_data"][count]["file_name"] = os.path.basename(a.path)
-
-                test = UnitTestOld.parse_obj(test_dict)
-
-                obj.test = test
-
-            # create annotations object
-            obj.tags.annotations = {
-                "analytic_story": obj.tags.analytic_story,
-                "cis20": obj.tags.cis20,
-                "kill_chain_phases": obj.tags.kill_chain_phases,
-                "mitre_attack_id": obj.tags.mitre_attack_id,
-                "nist": obj.tags.nist
-            }
-
-            obj.runtime = "SPL2"
-            obj.internalVersion = 2
-
-            ### Adding detection_type as top level key for SRS detections
-            obj.detection_type = "STREAMING"
-
-            # remove unncessary fields
-            YmlWriter.writeYmlFile(file_path, obj.dict(
-                exclude_none=True,
-                include =
-                {
-                    "name": True,
-                    "id": True,
-                    "version": True,
-                    "status": True,
-                    "detection_type": True,
-                    "description": True,
-                    "search": True,
-                    "how_to_implement": True,
-                    "known_false_positives": True,
-                    "references": True,
-                    "runtime": True,
-                    "internalVersion": True,
-                    "tags":
-                    {
-                        #"analytic_story": True,
-                        #"cis20" : True,
-                        #"nist": True,
-                        #"kill_chain_phases": True,
-                        "annotations": True,
-                        "mappings": True,
-                        #"mitre_attack_id": True,
-                        "risk_severity": True,
-                        "risk_score": True,
-                        "security_domain": True,
-                        "required_fields": True,
-                        "research_site_url": True,
-                        "event_schema": True
-                    },
-                    "test":
-                    {
-                        "name": True,
-                        "tests": {
-                            '__all__':
-                            {
-                                "name": True,
-                                "file": True,
-                                "pass_condition": True,
-                                "attack_data": {
-                                    '__all__':
-                                    {
-                                        "file_name": True,
-                                        "data": True,
-                                        "source": True
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            ))
-
-            # Add Finding Report Object
-            with open(file_path, 'r') as file:
-                data = file.read().replace('--finding_report--', body)
-
-            f = open(file_path, "w")
-            f.write(data)
-            f.close()
-
-
-    def convertNameToFileName(self, name: str, product: list):
-        file_name = name \
-            .replace(' ', '_') \
-            .replace('-','_') \
-            .replace('.','_') \
-            .replace('/','_') \
-            .lower()
-        if 'Splunk Behavioral Analytics' in product:
-
-            file_name = 'ssa___' + file_name + '.yml'
-        else:
-            file_name = file_name + '.yml'
-        return file_name
-
-
-    def isComplexBARule(self, search):
-        return re.findall("stats|first_time_event|adaptive_threshold", search)
-
diff --git a/contentctl/output/finding_report_writer.py b/contentctl/output/finding_report_writer.py
deleted file mode 100644
index 06192b93..00000000
--- a/contentctl/output/finding_report_writer.py
+++ /dev/null
@@ -1,91 +0,0 @@
-import os
-import re
-from jinja2 import Environment, FileSystemLoader
-
-from contentctl.objects.ssa_detection import SSADetection
-from contentctl.objects.constants import *
-
-class FindingReportObject():
-
-    @staticmethod
-    def writeFindingReport(detection : SSADetection) -> None:
-
-        if detection.tags.confidence < 33:
-            detection.tags.confidence_id = 1
-        elif detection.tags.confidence < 66:
-            detection.tags.confidence_id = 2
-        else:
-            detection.tags.confidence_id = 3
-
-        if detection.tags.impact < 20:
-            detection.tags.impact_id = 1
-        elif detection.tags.impact < 40:
-            detection.tags.impact_id = 2
-        elif detection.tags.impact < 60:
-            detection.tags.impact_id = 3
-        elif detection.tags.impact < 80:
-            detection.tags.impact_id = 4
-        else:
-            detection.tags.impact_id = 5
-
-        detection.tags.kill_chain_phases_id = dict()
-        for kill_chain_phase in detection.tags.kill_chain_phases:
-            detection.tags.kill_chain_phases_id[kill_chain_phase] = SES_KILL_CHAIN_MAPPINGS[kill_chain_phase]
-
-        kill_chain_phase_str = "["
-        i = 0
-        for kill_chain_phase in detection.tags.kill_chain_phases_id.keys():
-            kill_chain_phase_str = kill_chain_phase_str + '{"phase": "' + kill_chain_phase + '", "phase_id": ' + str(detection.tags.kill_chain_phases_id[kill_chain_phase]) + "}"
-            if not i == (len(detection.tags.kill_chain_phases_id.keys()) - 1):
-                kill_chain_phase_str = kill_chain_phase_str + ', '
-            i = i + 1
-        kill_chain_phase_str = kill_chain_phase_str + ']'
-        detection.tags.kill_chain_phases_str = kill_chain_phase_str
-
-        if detection.tags.risk_score < 20:
-            detection.tags.risk_level_id = 0
-            detection.tags.risk_level = "Info"
-        elif detection.tags.risk_score < 40:
-            detection.tags.risk_level_id = 1
-            detection.tags.risk_level = "Low"
-        elif detection.tags.risk_score < 60:
-            detection.tags.risk_level_id = 2
-            detection.tags.risk_level = "Medium"
-        elif detection.tags.risk_score < 80:
-            detection.tags.risk_level_id = 3
-            detection.tags.risk_level = "High"
-        else:
-            detection.tags.risk_level_id = 4
-            detection.tags.risk_level = "Critical"
-
-        evidence_str = "{"
-        for i in range(len(detection.tags.required_fields)):
-            evidence_str = evidence_str + '"' + detection.tags.required_fields[i] + '": ' + detection.tags.required_fields[i].replace(".", "_")
-            if not i == (len(detection.tags.required_fields) - 1):
-                evidence_str = evidence_str + ', '
-
-        evidence_str = evidence_str + ', "sourceType": metadata.source_type, "source": metadata.source}'
-
-
-        detection.tags.evidence_str = evidence_str
-
-        analytics_story_str = "["
-        for i in range(len(detection.tags.analytic_story)):
-            analytics_story_str = analytics_story_str + '"' + detection.tags.analytic_story[i] + '"'
-            if not i == (len(detection.tags.analytic_story) - 1):
-                analytics_story_str = analytics_story_str + ', '
-        analytics_story_str = analytics_story_str + ']'
-        detection.tags.analytics_story_str = analytics_story_str
-
-        if "actor.user.name" in detection.tags.required_fields:
-            actor_user_name = "actor_user_name"
-        else:
-            actor_user_name = "\"Unknown\""
-
-        j2_env = Environment(
-            loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), 'templates')),
-            trim_blocks=True)
-        template = j2_env.get_template('finding_report.j2')
-        body = template.render(detection=detection, attack_tactics_id_mapping=SES_ATTACK_TACTICS_ID_MAPPING, actor_user_name=actor_user_name)
-
-        return body
diff --git a/pyproject.toml b/pyproject.toml
index aad75f3a..2df898b2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "contentctl"
-version = "4.2.5"
+version = "4.3.0"
 description = "Splunk Content Control Tool"
 authors = ["STRT "]
 license = "Apache 2.0"
@@ -24,9 +24,6 @@ splunk-sdk = "^2.0.1"
 semantic-version = "^2.10.0"
 bottle = "^0.12.25"
 tqdm = "^4.66.4"
-#splunk-appinspect = "^2.36.0"
-pysigma = "^0.11.5"
-pysigma-backend-splunk = "^1.1.0"
 pygit2 = "^1.14.1"
 tyro = "^0.8.3"
 gitpython = "^3.1.43"