40 changes: 24 additions & 16 deletions contentctl/actions/validate.py
@@ -1,5 +1,7 @@
 
 import pathlib
+
+import urllib3.util
 from contentctl.input.director import Director, DirectorOutputDto
 from contentctl.objects.config import validate
 from contentctl.enrichments.attack_enrichment import AttackEnrichment
@@ -36,10 +38,8 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
         director.execute(input_dto)
         self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto)
         if input_dto.data_source_TA_validation:
-            if self.validate_latest_TA_information(director_output_dto.data_sources) != 1:
-                print("All TA versions are up to date.")
-            else:
-                raise Exception("One or more TA versions are out of date. Please update the data source with the latest version.")
+            self.validate_latest_TA_information(director_output_dto.data_sources)
 
         return director_output_dto


@@ -81,27 +81,35 @@ def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_o
         return
 
 
-    def validate_latest_TA_information(self, data_sources: list[DataSource]) -> int:
+    def validate_latest_TA_information(self, data_sources: list[DataSource]) -> None:
         validated_TAs: list[tuple[str, str]] = []
-        error_occurred = False
+        errors: list[str] = []
         print("----------------------")
         print("Validating latest TA:")
         print("----------------------")
         for data_source in data_sources:
             for supported_TA in data_source.supported_TA:
-                ta_identifier = (supported_TA["name"], supported_TA["version"])
+                ta_identifier = (supported_TA.name, supported_TA.version)
                 if ta_identifier in validated_TAs:
                     continue
-                if "url" in supported_TA:
+                if supported_TA.url is not None:
                     validated_TAs.append(ta_identifier)
-                    uid = int(supported_TA["url"].rstrip('/').split("/")[-1])
+                    uid = int(str(supported_TA.url).rstrip('/').split("/")[-1])
                     try:
                         splunk_app = SplunkApp(app_uid=uid)
-                        if splunk_app.latest_version != supported_TA["version"]:
-                            raise Exception(f"Version mismatch for TA {supported_TA['name']}: "
-                                            f"Latest version on Splunkbase is {splunk_app.latest_version}, "
-                                            f"but version {supported_TA['version']} is specified in the data source {data_source.name}.")
+                        if splunk_app.latest_version != supported_TA.version:
+                            errors.append(f"Version mismatch in '{data_source.file_path}' for supported TA '{supported_TA.name}'"
+                                          f"\n  Latest version on Splunkbase    : {splunk_app.latest_version}"
+                                          f"\n  Version specified in data source: {supported_TA.version}")
                     except Exception as e:
-                        print(f"Error processing TA {supported_TA['name']}: {str(e)}")
-                        error_occurred = True
-        return 1 if error_occurred else 0
+                        errors.append(f"Error checking version of TA {supported_TA.name}: {str(e)}")
+
+        if len(errors) > 0:
+            errorString = '\n\n'.join(errors)
+            raise Exception(f"{len(errors)} TA version(s) are out of date or have other errors. "
+                            f"Please update the following data sources with the latest versions of "
+                            f"their supported TAs:\n\n{errorString}")
+        print("All TA versions are up to date.")



9 changes: 7 additions & 2 deletions contentctl/objects/data_source.py
@@ -1,15 +1,20 @@
from __future__ import annotations
from typing import Optional, Any
from pydantic import Field, FilePath, model_serializer
from pydantic import Field, HttpUrl, model_serializer, BaseModel
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.event_source import EventSource


class TA(BaseModel):
name: str
url: HttpUrl | None = None
version: str
class DataSource(SecurityContentObject):
source: str = Field(...)
sourcetype: str = Field(...)
separator: Optional[str] = None
configuration: Optional[str] = None
supported_TA: Optional[list] = None
supported_TA: list[TA] = []
fields: Optional[list] = None
field_mappings: Optional[list] = None
convert_to_log_source: Optional[list] = None
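As a quick illustration of the new model (the add-on name, version, and Splunkbase id below are example values, not from the repo), supported_TA entries are now validated by pydantic, so a malformed url or a missing version fails when the data source YAML is loaded rather than later during TA validation:

    # Minimal sketch of the TA model in isolation; values are illustrative.
    from pydantic import BaseModel, HttpUrl

    class TA(BaseModel):
        name: str
        url: HttpUrl | None = None
        version: str

    ta = TA(name="Splunk Add-on for Sysmon",
            version="4.0.0",
            url="https://splunkbase.splunk.com/app/5709")
    print(str(ta.url))  # pydantic coerces the string into an HttpUrl instance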
8 changes: 4 additions & 4 deletions contentctl/output/data_source_writer.py
@@ -18,10 +18,10 @@ def writeDataSourceCsv(data_source_objects: List[DataSource], file_path: pathlib
         ])
         # Write the data
         for data_source in data_source_objects:
-            if data_source.supported_TA and isinstance(data_source.supported_TA, list) and len(data_source.supported_TA) > 0:
-                supported_TA_name = data_source.supported_TA[0].get('name', '')
-                supported_TA_version = data_source.supported_TA[0].get('version', '')
-                supported_TA_url = data_source.supported_TA[0].get('url', '')
+            if len(data_source.supported_TA) > 0:
+                supported_TA_name = data_source.supported_TA[0].name
+                supported_TA_version = data_source.supported_TA[0].version
+                supported_TA_url = data_source.supported_TA[0].url or ''
             else:
                 supported_TA_name = ''
                 supported_TA_version = ''
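For the CSV writer, a short sketch (reusing the TA model shown earlier; example values only) of what the first-TA extraction yields; csv.writer stringifies the HttpUrl when the row is written:

    # Illustrative values; mirrors the attribute access used in writeDataSourceCsv.
    tas = [TA(name="Splunk Add-on for Unix and Linux",
              version="10.1.0",
              url="https://splunkbase.splunk.com/app/833")]
    name, version, url = (tas[0].name, tas[0].version, tas[0].url or '') if tas else ('', '', '')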