diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py
index 6271eb9c..2d7203a4 100644
--- a/contentctl/actions/validate.py
+++ b/contentctl/actions/validate.py
@@ -1,11 +1,14 @@
 import pathlib
+
 from contentctl.input.director import Director, DirectorOutputDto
 from contentctl.objects.config import validate
 from contentctl.enrichments.attack_enrichment import AttackEnrichment
 from contentctl.enrichments.cve_enrichment import CveEnrichment
 from contentctl.objects.atomic import AtomicTest
 from contentctl.helper.utils import Utils
+from contentctl.objects.data_source import DataSource
+from contentctl.helper.splunk_app import SplunkApp
 
 
 class Validate:
@@ -33,6 +36,9 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
         director = Director(director_output_dto)
         director.execute(input_dto)
         self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto)
+        if input_dto.data_source_TA_validation:
+            self.validate_latest_TA_information(director_output_dto.data_sources)
+
         return director_output_dto
@@ -72,4 +78,37 @@ def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_o
         if len(unusedLookupFiles) > 0:
             raise Exception(f"The following .csv or .mlmodel files exist in '{lookupsDirectory}', but are not referenced by a lookup file: {[str(path) for path in unusedLookupFiles]}")
         return
-    
\ No newline at end of file
+
+
+    def validate_latest_TA_information(self, data_sources: list[DataSource]) -> None:
+        validated_TAs: list[tuple[str, str]] = []
+        errors: list[str] = []
+        print("----------------------")
+        print("Validating latest TA:")
+        print("----------------------")
+        for data_source in data_sources:
+            for supported_TA in data_source.supported_TA:
+                ta_identifier = (supported_TA.name, supported_TA.version)
+                if ta_identifier in validated_TAs:
+                    continue
+                if supported_TA.url is not None:
+                    validated_TAs.append(ta_identifier)
+                    uid = int(str(supported_TA.url).rstrip('/').split("/")[-1])
+                    try:
+                        splunk_app = SplunkApp(app_uid=uid)
+                        if splunk_app.latest_version != supported_TA.version:
+                            errors.append(f"Version mismatch in '{data_source.file_path}' for supported TA '{supported_TA.name}'"
+                                          f"\n  Latest version on Splunkbase    : {splunk_app.latest_version}"
+                                          f"\n  Version specified in data source: {supported_TA.version}")
+                    except Exception as e:
+                        errors.append(f"Error checking the version of TA {supported_TA.name}: {str(e)}")
+
+        if len(errors) > 0:
+            errorString = '\n\n'.join(errors)
+            raise Exception(f"{len(errors)} TA version(s) are out of date or have other errors. "
+                            f"Please update the following data sources with the latest versions of "
+                            f"their supported TAs:\n\n{errorString}")
+        print("All TA versions are up to date.")
+
+
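For reference on the new check wired into validate.py above: validate_latest_TA_information parses the Splunkbase app UID out of each supported_TA url and compares the version pinned in the data source against the latest release reported by Splunkbase. A minimal standalone sketch of that core comparison, assuming a Splunkbase URL such as https://splunkbase.splunk.com/app/742 and illustrative name/version values (SplunkApp is the helper added in the new file below):

from contentctl.helper.splunk_app import SplunkApp

# Illustrative supported_TA values; real entries live in each data source YAML
ta_name = "Splunk Add-on for Microsoft Windows"
ta_url = "https://splunkbase.splunk.com/app/742"
ta_version = "8.8.0"

# The app UID is the trailing path segment of the Splunkbase URL (742 here)
uid = int(str(ta_url).rstrip('/').split("/")[-1])

# SplunkApp resolves the latest published version from Splunkbase
splunk_app = SplunkApp(app_uid=uid)
if splunk_app.latest_version != ta_version:
    print(f"'{ta_name}' is pinned to {ta_version}; latest on Splunkbase is {splunk_app.latest_version}")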
+ f"Please update the following data sources with the latest versions of " + f"their supported tas:\n\n{errorString}") + print("All TA versions are up to date.") + + + diff --git a/contentctl/helper/splunk_app.py b/contentctl/helper/splunk_app.py new file mode 100644 index 00000000..715e3072 --- /dev/null +++ b/contentctl/helper/splunk_app.py @@ -0,0 +1,263 @@ +import os +import time +import json +import xml.etree.ElementTree as ET +from typing import List, Tuple, Optional +from urllib.parse import urlencode + +import requests +import urllib3 +import xmltodict +from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +MAX_RETRY = 3 + +class APIEndPoint: + """ + Class which contains Static Endpoint + """ + + SPLUNK_BASE_AUTH_URL = "https://splunkbase.splunk.com/api/account:login/" + SPLUNK_BASE_FETCH_APP_BY_ENTRY_ID = ( + "https://apps.splunk.com/api/apps/entriesbyid/{app_name_id}" + ) + SPLUNK_BASE_GET_UID_REDIRECT = "https://apps.splunk.com/apps/id/{app_name_id}" + SPLUNK_BASE_APP_INFO = "https://splunkbase.splunk.com/api/v1/app/{app_uid}" + +class RetryConstant: + """ + Class which contains Retry Constant + """ + + RETRY_COUNT = 3 + RETRY_INTERVAL = 15 + + +class SplunkBaseError(requests.HTTPError): + """An error raise in communicating with Splunkbase""" + pass + + +# TODO (PEX-306): validate w/ Splunkbase team if there are better APIs we can rely on being supported +class SplunkApp: + """ + A Splunk app available for download on Splunkbase + """ + + class InitializationError(Exception): + """An initialization error during SplunkApp setup""" + pass + + @staticmethod + def requests_retry_session( + retries=RetryConstant.RETRY_COUNT, + backoff_factor=1, + status_forcelist=(500, 502, 503, 504), + session=None, + ): + session = session or requests.Session() + retry = Retry( + total=retries, + read=retries, + connect=retries, + backoff_factor=backoff_factor, + status_forcelist=status_forcelist, + ) + adapter = HTTPAdapter(max_retries=retry) + session.mount('http://', adapter) + session.mount('https://', adapter) + return session + + def __init__( + self, + app_uid: Optional[int] = None, + app_name_id: Optional[str] = None, + manual_setup: bool = False, + ) -> None: + if app_uid is None and app_name_id is None: + raise SplunkApp.InitializationError( + "Either app_uid (the numeric app UID e.g. 742) or app_name_id (the app name " + "idenitifier e.g. Splunk_TA_windows) must be provided" + ) + + # init or declare instance vars + self.app_uid: Optional[int] = app_uid + self.app_name_id: Optional[str] = app_name_id + self.manual_setup = manual_setup + self.app_title: str + self.latest_version: str + self.latest_version_download_url: str + self._app_info_cache: Optional[dict] = None + + # set instance vars as needed; skip if manual setup was indicated + if not self.manual_setup: + self.set_app_name_id() + self.set_app_uid() + self.set_app_title() + self.set_latest_version_info() + + def __eq__(self, __value: object) -> bool: + if isinstance(__value, SplunkApp): + return self.app_uid == __value.app_uid + return False + + def __repr__(self) -> str: + return ( + f"SplunkApp(app_name_id='{self.app_name_id}', app_uid={self.app_uid}, " + f"latest_version_download_url='{self.latest_version_download_url}')" + ) + + def __str__(self) -> str: + return f"<'{self.app_name_id}' ({self.app_uid})" + + def get_app_info_by_uid(self) -> dict: + """ + Retrieve app info via app_uid (e.g. 
+    def get_app_info_by_uid(self) -> dict:
+        """
+        Retrieve app info via app_uid (e.g. 742)
+        :return: dictionary of app info
+        """
+        # return the cache if already set; raise if app_uid is not set
+        if self._app_info_cache is not None:
+            return self._app_info_cache
+        elif self.app_uid is None:
+            raise SplunkApp.InitializationError("app_uid must be set in order to fetch app info")
+
+        # NOTE: auth not required
+        # Get app info by uid
+        try:
+            response = self.requests_retry_session().get(
+                APIEndPoint.SPLUNK_BASE_APP_INFO.format(app_uid=self.app_uid),
+                timeout=RetryConstant.RETRY_INTERVAL
+            )
+            response.raise_for_status()
+        except requests.exceptions.RequestException as e:
+            raise SplunkBaseError(f"Error fetching app info for app_uid {self.app_uid}: {str(e)}")
+
+        # parse JSON and set cache
+        self._app_info_cache = json.loads(response.content)
+
+        return self._app_info_cache
+
+    def set_app_name_id(self) -> None:
+        """
+        Set app_name_id
+        """
+        # return if app_name_id is already set
+        if self.app_name_id is not None:
+            return
+
+        # get app info by app_uid
+        app_info = self.get_app_info_by_uid()
+
+        # set app_name_id if found
+        if "appid" in app_info:
+            self.app_name_id = app_info["appid"]
+        else:
+            raise SplunkBaseError(f"Invalid response from Splunkbase; missing key 'appid': {app_info}")
+
+    def set_app_uid(self) -> None:
+        """
+        Set app_uid
+        """
+        # return if app_uid is already set; raise if app_name_id was not set
+        if self.app_uid is not None:
+            return
+        elif self.app_name_id is None:
+            raise SplunkApp.InitializationError("app_name_id must be set in order to fetch app_uid")
+
+        # NOTE: auth not required
+        # Get app_uid by app_name_id via a redirect
+        try:
+            response = self.requests_retry_session().get(
+                APIEndPoint.SPLUNK_BASE_GET_UID_REDIRECT.format(app_name_id=self.app_name_id),
+                allow_redirects=False,
+                timeout=RetryConstant.RETRY_INTERVAL
+            )
+            response.raise_for_status()
+        except requests.exceptions.RequestException as e:
+            raise SplunkBaseError(f"Error fetching app_uid for app_name_id '{self.app_name_id}': {str(e)}")
+
+        # Extract the app_uid from the redirect path
+        if "Location" in response.headers:
+            self.app_uid = int(response.headers["Location"].rstrip("/").split("/")[-1])
+        else:
+            raise SplunkBaseError(
+                "Invalid response from Splunkbase; missing 'Location' in redirect header"
+            )
+
+    def set_app_title(self) -> None:
+        """
+        Set app_title
+        """
+        # get app info by app_uid
+        app_info = self.get_app_info_by_uid()
+
+        # set app_title if found
+        if "title" in app_info:
+            self.app_title = app_info["title"]
+        else:
+            raise SplunkBaseError(f"Invalid response from Splunkbase; missing key 'title': {app_info}")
+
+    def __fetch_url_latest_version_info(self) -> str:
+        """
+        Identify the latest version of the app and return a URL pointing to download info for that build
+        :return: url for download info on the latest build
+        """
+        # retrieve app entries using the app_name_id
+        try:
+            response = self.requests_retry_session().get(
+                APIEndPoint.SPLUNK_BASE_FETCH_APP_BY_ENTRY_ID.format(app_name_id=self.app_name_id),
+                timeout=RetryConstant.RETRY_INTERVAL
+            )
+            response.raise_for_status()
+        except requests.exceptions.RequestException as e:
+            raise SplunkBaseError(f"Error fetching app entries for app_name_id '{self.app_name_id}': {str(e)}")
+
+        # parse xml
+        app_xml = xmltodict.parse(response.content)
+
+        # convert to list if only one entry exists
+        app_entries = app_xml.get("feed").get("entry")
+        if not isinstance(app_entries, list):
+            app_entries = [app_entries]
+
+        # iterate over multiple entries if present
+        for entry in app_entries:
entry.get("content").get("s:dict").get("s:key"): + if key.get("@name") == "islatest" and key.get("#text") == "True": + return entry.get("link").get("@href") + + # raise if no entry was found + raise SplunkBaseError(f"No app entry found with 'islatest' tag set to True: {self.app_name_id}") + + def __fetch_url_latest_version_download(self, info_url: str) -> str: + """ + Fetch the download URL via the provided URL to build info + :param info_url: URL for download info for the latest build + :return: URL for downloading the latest build + """ + # fetch download info + try: + response = self.requests_retry_session().get(info_url, timeout=RetryConstant.RETRY_INTERVAL) + response.raise_for_status() + except requests.exceptions.RequestException as e: + raise SplunkBaseError(f"Error fetching download info for app_name_id '{self.app_name_id}': {str(e)}") + + # parse XML and extract download URL + build_xml = xmltodict.parse(response.content) + download_url = build_xml.get("feed").get("entry").get("link").get("@href") + return download_url + + def set_latest_version_info(self) -> None: + # raise if app_name_id not set + if self.app_name_id is None: + raise SplunkApp.InitializationError("app_name_id must be set in order to fetch latest version info") + + # fetch the info URL + info_url = self.__fetch_url_latest_version_info() + + # parse out the version number and fetch the download URL + self.latest_version = info_url.split("/")[-1] + self.latest_version_download_url = self.__fetch_url_latest_version_download(info_url) \ No newline at end of file diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py index 20f77ed2..1261fddc 100644 --- a/contentctl/objects/config.py +++ b/contentctl/objects/config.py @@ -176,6 +176,7 @@ class validate(Config_Base): build_app: bool = Field(default=True, description="Should an app be built and output in the build_path?") build_api: bool = Field(default=False, description="Should api objects be built and output in the build_path?") build_ssa: bool = Field(default=False, description="Should ssa objects be built and output in the build_path?") + data_source_TA_validation: bool = Field(default=False, description="Validate latest TA information from Splunkbase") def getAtomicRedTeamRepoPath(self, atomic_red_team_repo_name:str = "atomic-red-team"): return self.path/atomic_red_team_repo_name diff --git a/contentctl/objects/data_source.py b/contentctl/objects/data_source.py index 7e31a9a4..868bdd51 100644 --- a/contentctl/objects/data_source.py +++ b/contentctl/objects/data_source.py @@ -1,15 +1,20 @@ from __future__ import annotations from typing import Optional, Any -from pydantic import Field, FilePath, model_serializer +from pydantic import Field, HttpUrl, model_serializer, BaseModel from contentctl.objects.security_content_object import SecurityContentObject from contentctl.objects.event_source import EventSource + +class TA(BaseModel): + name: str + url: HttpUrl | None = None + version: str class DataSource(SecurityContentObject): source: str = Field(...) sourcetype: str = Field(...) 
diff --git a/contentctl/objects/data_source.py b/contentctl/objects/data_source.py
index 7e31a9a4..868bdd51 100644
--- a/contentctl/objects/data_source.py
+++ b/contentctl/objects/data_source.py
@@ -1,15 +1,20 @@
 from __future__ import annotations
 from typing import Optional, Any
-from pydantic import Field, FilePath, model_serializer
+from pydantic import Field, HttpUrl, model_serializer, BaseModel
 from contentctl.objects.security_content_object import SecurityContentObject
 from contentctl.objects.event_source import EventSource
+
+
+class TA(BaseModel):
+    name: str
+    url: HttpUrl | None = None
+    version: str
 
 
 class DataSource(SecurityContentObject):
     source: str = Field(...)
     sourcetype: str = Field(...)
     separator: Optional[str] = None
     configuration: Optional[str] = None
-    supported_TA: Optional[list] = None
+    supported_TA: list[TA] = []
     fields: Optional[list] = None
     field_mappings: Optional[list] = None
     convert_to_log_source: Optional[list] = None
diff --git a/contentctl/output/data_source_writer.py b/contentctl/output/data_source_writer.py
index ba505905..97967a72 100644
--- a/contentctl/output/data_source_writer.py
+++ b/contentctl/output/data_source_writer.py
@@ -18,10 +18,10 @@ def writeDataSourceCsv(data_source_objects: List[DataSource], file_path: pathlib
         ])
         # Write the data
         for data_source in data_source_objects:
-            if data_source.supported_TA and isinstance(data_source.supported_TA, list) and len(data_source.supported_TA) > 0:
-                supported_TA_name = data_source.supported_TA[0].get('name', '')
-                supported_TA_version = data_source.supported_TA[0].get('version', '')
-                supported_TA_url = data_source.supported_TA[0].get('url', '')
+            if len(data_source.supported_TA) > 0:
+                supported_TA_name = data_source.supported_TA[0].name
+                supported_TA_version = data_source.supported_TA[0].version
+                supported_TA_url = data_source.supported_TA[0].url or ''
             else:
                 supported_TA_name = ''
                 supported_TA_version = ''
diff --git a/pyproject.toml b/pyproject.toml
index 5afaddec..b195dc5c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "contentctl"
-version = "4.2.2"
+version = "4.2.3"
 description = "Splunk Content Control Tool"
 authors = ["STRT <research@splunk.com>"]
 license = "Apache 2.0"
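A quick way to exercise the new helper end to end (a sketch only: it assumes network access to Splunkbase and uses app UID 742, the example UID cited in the SplunkApp docstrings above):

from contentctl.helper.splunk_app import SplunkApp

# Resolve the latest published build of a Splunkbase app by its numeric UID
app = SplunkApp(app_uid=742)
print(app.app_name_id)                  # app name identifier, e.g. Splunk_TA_windows
print(app.app_title)                    # human-readable title from Splunkbase
print(app.latest_version)               # latest version string on Splunkbase
print(app.latest_version_download_url)  # direct download URL for that build

# The same comparison the validate action performs per supported TA
# when data_source_TA_validation is enabled in the validate config
pinned_version = "8.8.0"  # illustrative value
if app.latest_version != pinned_version:
    print(f"{app.app_name_id} is out of date: {pinned_version} -> {app.latest_version}")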