From fb4cb45403417e5c182bb226443002d88ad49218 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Mon, 5 Aug 2024 15:30:56 -0700 Subject: [PATCH 1/2] Add some validator to supported_ta field --- contentctl/actions/validate.py | 16 +++++++++------- contentctl/objects/data_source.py | 9 +++++++-- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py index 0b7496e7..fba273fa 100644 --- a/contentctl/actions/validate.py +++ b/contentctl/actions/validate.py @@ -1,5 +1,7 @@ import pathlib + +import urllib3.util from contentctl.input.director import Director, DirectorOutputDto from contentctl.objects.config import validate from contentctl.enrichments.attack_enrichment import AttackEnrichment @@ -89,19 +91,19 @@ def validate_latest_TA_information(self, data_sources: list[DataSource]) -> int: print("----------------------") for data_source in data_sources: for supported_TA in data_source.supported_TA: - ta_identifier = (supported_TA["name"], supported_TA["version"]) + ta_identifier = (supported_TA.name, supported_TA.version) if ta_identifier in validated_TAs: continue - if "url" in supported_TA: + if supported_TA.url is not None: validated_TAs.append(ta_identifier) - uid = int(supported_TA["url"].rstrip('/').split("/")[-1]) + uid = int(str(supported_TA.url).rstrip('/').split("/")[-1]) try: splunk_app = SplunkApp(app_uid=uid) - if splunk_app.latest_version != supported_TA["version"]: - raise Exception(f"Version mismatch for TA {supported_TA['name']}: " + if splunk_app.latest_version != supported_TA.version: + raise Exception(f"Version mismatch for TA {supported_TA.name}: " f"Latest version on Splunkbase is {splunk_app.latest_version}, " - f"but version {supported_TA['version']} is specified in the data source {data_source.name}.") + f"but version {supported_TA.version} is specified in the data source {data_source.name}.") except Exception as e: - print(f"Error processing TA {supported_TA['name']}: {str(e)}") + print(f"Error processing TA {supported_TA.version}: {str(e)}") error_occurred = True return 1 if error_occurred else 0 diff --git a/contentctl/objects/data_source.py b/contentctl/objects/data_source.py index 7e31a9a4..868bdd51 100644 --- a/contentctl/objects/data_source.py +++ b/contentctl/objects/data_source.py @@ -1,15 +1,20 @@ from __future__ import annotations from typing import Optional, Any -from pydantic import Field, FilePath, model_serializer +from pydantic import Field, HttpUrl, model_serializer, BaseModel from contentctl.objects.security_content_object import SecurityContentObject from contentctl.objects.event_source import EventSource + +class TA(BaseModel): + name: str + url: HttpUrl | None = None + version: str class DataSource(SecurityContentObject): source: str = Field(...) sourcetype: str = Field(...) separator: Optional[str] = None configuration: Optional[str] = None - supported_TA: Optional[list] = None + supported_TA: list[TA] = [] fields: Optional[list] = None field_mappings: Optional[list] = None convert_to_log_source: Optional[list] = None From e51b688e733c034be0514e843f123d151d6cc880 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Mon, 5 Aug 2024 15:47:25 -0700 Subject: [PATCH 2/2] Slightly more changes to support small ta object --- contentctl/actions/validate.py | 30 +++++++++++++++---------- contentctl/output/data_source_writer.py | 8 +++---- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py index fba273fa..1592e039 100644 --- a/contentctl/actions/validate.py +++ b/contentctl/actions/validate.py @@ -38,10 +38,8 @@ def execute(self, input_dto: validate) -> DirectorOutputDto: director.execute(input_dto) self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto) if input_dto.data_source_TA_validation: - if self.validate_latest_TA_information(director_output_dto.data_sources) != 1: - print("All TA versions are up to date.") - else: - raise Exception("One or more TA versions are out of date. Please update the data source with the latest version.") + self.validate_latest_TA_information(director_output_dto.data_sources) + return director_output_dto @@ -83,9 +81,9 @@ def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_o return - def validate_latest_TA_information(self, data_sources: list[DataSource]) -> int: + def validate_latest_TA_information(self, data_sources: list[DataSource]) -> None: validated_TAs: list[tuple[str, str]] = [] - error_occurred = False + errors:list[str] = [] print("----------------------") print("Validating latest TA:") print("----------------------") @@ -100,10 +98,18 @@ def validate_latest_TA_information(self, data_sources: list[DataSource]) -> int: try: splunk_app = SplunkApp(app_uid=uid) if splunk_app.latest_version != supported_TA.version: - raise Exception(f"Version mismatch for TA {supported_TA.name}: " - f"Latest version on Splunkbase is {splunk_app.latest_version}, " - f"but version {supported_TA.version} is specified in the data source {data_source.name}.") + errors.append(f"Version mismatch in '{data_source.file_path}' supported TA '{supported_TA.name}'" + f"\n Latest version on Splunkbase : {splunk_app.latest_version}" + f"\n Version specified in data source: {supported_TA.version}") except Exception as e: - print(f"Error processing TA {supported_TA.version}: {str(e)}") - error_occurred = True - return 1 if error_occurred else 0 + errors.append(f"Error processing checking version of TA {supported_TA.name}: {str(e)}") + + if len(errors) > 0: + errorString = '\n\n'.join(errors) + raise Exception(f"[{len(errors)}] or more TA versions are out of date or have other errors." + f"Please update the following data sources with the latest versions of " + f"their supported tas:\n\n{errorString}") + print("All TA versions are up to date.") + + + diff --git a/contentctl/output/data_source_writer.py b/contentctl/output/data_source_writer.py index ba505905..97967a72 100644 --- a/contentctl/output/data_source_writer.py +++ b/contentctl/output/data_source_writer.py @@ -18,10 +18,10 @@ def writeDataSourceCsv(data_source_objects: List[DataSource], file_path: pathlib ]) # Write the data for data_source in data_source_objects: - if data_source.supported_TA and isinstance(data_source.supported_TA, list) and len(data_source.supported_TA) > 0: - supported_TA_name = data_source.supported_TA[0].get('name', '') - supported_TA_version = data_source.supported_TA[0].get('version', '') - supported_TA_url = data_source.supported_TA[0].get('url', '') + if len(data_source.supported_TA) > 0: + supported_TA_name = data_source.supported_TA[0].name + supported_TA_version = data_source.supported_TA[0].version + supported_TA_url = data_source.supported_TA[0].url or '' else: supported_TA_name = '' supported_TA_version = ''