From d156eaa25f77901bf757ef1b1bc703e3b7489468 Mon Sep 17 00:00:00 2001 From: ljstella Date: Tue, 21 Jan 2025 12:44:33 -0600 Subject: [PATCH 1/6] Just CSV lookups --- .../actions/detection_testing/GitService.py | 206 ++++++++++++------ 1 file changed, 134 insertions(+), 72 deletions(-) diff --git a/contentctl/actions/detection_testing/GitService.py b/contentctl/actions/detection_testing/GitService.py index ee9e4693..e598562d 100644 --- a/contentctl/actions/detection_testing/GitService.py +++ b/contentctl/actions/detection_testing/GitService.py @@ -1,44 +1,42 @@ import logging import os import pathlib +from typing import TYPE_CHECKING, List, Optional + import pygit2 -from pygit2.enums import DeltaStatus -from typing import List, Optional from pydantic import BaseModel, FilePath -from typing import TYPE_CHECKING +from pygit2.enums import DeltaStatus + if TYPE_CHECKING: from contentctl.input.director import DirectorOutputDto - -from contentctl.objects.macro import Macro -from contentctl.objects.lookup import Lookup -from contentctl.objects.detection import Detection + +from contentctl.objects.config import All, Changes, Selected, test_common from contentctl.objects.data_source import DataSource +from contentctl.objects.detection import Detection +from contentctl.objects.lookup import Lookup, Lookup_Type +from contentctl.objects.macro import Macro from contentctl.objects.security_content_object import SecurityContentObject -from contentctl.objects.config import test_common, All, Changes, Selected # Logger logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) LOGGER = logging.getLogger(__name__) - from contentctl.input.director import DirectorOutputDto - class GitService(BaseModel): director: DirectorOutputDto config: test_common gitHash: Optional[str] = None - - def getHash(self)->str: + + def getHash(self) -> str: if self.gitHash is None: raise Exception("Cannot get hash of repo, it was not set") return self.gitHash - - def getContent(self)->List[Detection]: + def getContent(self) -> List[Detection]: if isinstance(self.config.mode, Selected): return self.getSelected(self.config.mode.files) elif isinstance(self.config.mode, Changes): @@ -46,142 +44,206 @@ def getContent(self)->List[Detection]: if isinstance(self.config.mode, All): return self.getAll() else: - raise Exception(f"Could not get content to test. Unsupported test mode '{self.config.mode}'") - def getAll(self)->List[Detection]: + raise Exception( + f"Could not get content to test. Unsupported test mode '{self.config.mode}'" + ) + + def getAll(self) -> List[Detection]: return self.director.detections - - def getChanges(self, target_branch:str)->List[Detection]: + + def getChanges(self, target_branch: str) -> List[Detection]: repo = pygit2.Repository(path=str(self.config.path)) try: target_tree = repo.revparse_single(target_branch).tree self.gitHash = target_tree.id diffs = repo.index.diff_to_tree(target_tree) - except Exception as e: - raise Exception(f"Error parsing diff target_branch '{target_branch}'. Are you certain that it exists?") - - #Get the uncommitted changes in the current directory + except Exception: + raise Exception( + f"Error parsing diff target_branch '{target_branch}'. Are you certain that it exists?" 
+ ) + + # Get the uncommitted changes in the current directory diffs2 = repo.index.diff_to_workdir() - - #Combine the uncommitted changes with the committed changes + + # Combine the uncommitted changes with the committed changes all_diffs = list(diffs) + list(diffs2) - #Make a filename to content map - filepath_to_content_map = { obj.file_path:obj for (_,obj) in self.director.name_to_content_map.items()} + # Make a filename to content map + filepath_to_content_map = { + obj.file_path: obj for (_, obj) in self.director.name_to_content_map.items() + } updated_detections: set[Detection] = set() updated_macros: set[Macro] = set() updated_lookups: set[Lookup] = set() updated_datasources: set[DataSource] = set() - for diff in all_diffs: if type(diff) == pygit2.Patch: - if diff.delta.status in (DeltaStatus.ADDED, DeltaStatus.MODIFIED, DeltaStatus.RENAMED): - #print(f"{DeltaStatus(diff.delta.status).name:<8}:{diff.delta.new_file.raw_path}") - decoded_path = pathlib.Path(diff.delta.new_file.raw_path.decode('utf-8')) + if diff.delta.status in ( + DeltaStatus.ADDED, + DeltaStatus.MODIFIED, + DeltaStatus.RENAMED, + ): + # print(f"{DeltaStatus(diff.delta.status).name:<8}:{diff.delta.new_file.raw_path}") + decoded_path = pathlib.Path( + diff.delta.new_file.raw_path.decode("utf-8") + ) # Note that we only handle updates to detections, lookups, and macros at this time. All other changes are ignored. - if decoded_path.is_relative_to(self.config.path/"detections") and decoded_path.suffix == ".yml": - detectionObject = filepath_to_content_map.get(decoded_path, None) + if ( + decoded_path.is_relative_to(self.config.path / "detections") + and decoded_path.suffix == ".yml" + ): + detectionObject = filepath_to_content_map.get( + decoded_path, None + ) if isinstance(detectionObject, Detection): updated_detections.add(detectionObject) else: - raise Exception(f"Error getting detection object for file {str(decoded_path)}") - - elif decoded_path.is_relative_to(self.config.path/"macros") and decoded_path.suffix == ".yml": + raise Exception( + f"Error getting detection object for file {str(decoded_path)}" + ) + + elif ( + decoded_path.is_relative_to(self.config.path / "macros") + and decoded_path.suffix == ".yml" + ): macroObject = filepath_to_content_map.get(decoded_path, None) if isinstance(macroObject, Macro): updated_macros.add(macroObject) else: - raise Exception(f"Error getting macro object for file {str(decoded_path)}") - - elif decoded_path.is_relative_to(self.config.path/"data_sources") and decoded_path.suffix == ".yml": - datasourceObject = filepath_to_content_map.get(decoded_path, None) + raise Exception( + f"Error getting macro object for file {str(decoded_path)}" + ) + + elif ( + decoded_path.is_relative_to(self.config.path / "data_sources") + and decoded_path.suffix == ".yml" + ): + datasourceObject = filepath_to_content_map.get( + decoded_path, None + ) if isinstance(datasourceObject, DataSource): updated_datasources.add(datasourceObject) else: - raise Exception(f"Error getting data source object for file {str(decoded_path)}") + raise Exception( + f"Error getting data source object for file {str(decoded_path)}" + ) - elif decoded_path.is_relative_to(self.config.path/"lookups"): + elif decoded_path.is_relative_to(self.config.path / "lookups"): # We need to convert this to a yml. 
This means we will catch
                         # both changes to a csv AND changes to the YML that uses it
                         if decoded_path.suffix == ".yml":
-                            updatedLookup = filepath_to_content_map.get(decoded_path, None)
-                            if not isinstance(updatedLookup,Lookup):
-                                raise Exception(f"Expected {decoded_path} to be type {type(Lookup)}, but instead if was {(type(updatedLookup))}")
+                            updatedLookup = filepath_to_content_map.get(
+                                decoded_path, None
+                            )
+                            if not isinstance(updatedLookup, Lookup):
+                                raise Exception(
+                                    f"Expected {decoded_path} to be type {type(Lookup)}, but instead it was {(type(updatedLookup))}"
+                                )
                             updated_lookups.add(updatedLookup)
 
                         elif decoded_path.suffix == ".csv":
-                            # If the CSV was updated, we want to make sure that we 
+                            # If the CSV was updated, we want to make sure that we
                             # add the correct corresponding Lookup object.
-                            #Filter to find the Lookup Object the references this CSV
-                            matched = list(filter(lambda x: x.filename is not None and x.filename == decoded_path, self.director.lookups))
+                            # Filter to find the Lookup Object that references this CSV
+                            matched = list(
+                                filter(
+                                    lambda x: x.lookup_type == Lookup_Type.csv
+                                    and x.filename is not None
+                                    and x.filename == decoded_path,
+                                    self.director.lookups,
+                                )
+                            )
                             if len(matched) == 0:
-                                raise Exception(f"Failed to find any lookups that reference the modified CSV file '{decoded_path}'")
+                                raise Exception(
+                                    f"Failed to find any lookups that reference the modified CSV file '{decoded_path}'"
+                                )
                             elif len(matched) > 1:
-                                raise Exception(f"More than 1 Lookup reference the modified CSV file '{decoded_path}': {[l.file_path for l in matched ]}")
+                                raise Exception(
+                                    f"More than 1 Lookup references the modified CSV file '{decoded_path}': {[l.file_path for l in matched]}"
+                                )
                             else:
                                 updatedLookup = matched[0]
 
                         elif decoded_path.suffix == ".mlmodel":
-                            # Detected a changed .mlmodel file. However, since we do not have testing for these detections at 
+                            # Detected a changed .mlmodel file. However, since we do not have testing for these detections at
                             # this time, we will ignore this change.
                             updatedLookup = None
 
                         else:
-                            raise Exception(f"Detected a changed file in the lookups/ directory '{str(decoded_path)}'.\n"
-                                            "Only files ending in .csv, .yml, or .mlmodel are supported in this "
-                                            "directory. This file must be removed from the lookups/ directory.")
-
-                        if updatedLookup is not None and updatedLookup not in updated_lookups:
+                            raise Exception(
+                                f"Detected a changed file in the lookups/ directory '{str(decoded_path)}'.\n"
+                                "Only files ending in .csv, .yml, or .mlmodel are supported in this "
+                                "directory. This file must be removed from the lookups/ directory."
+                            )
+
+                        if (
+                            updatedLookup is not None
+                            and updatedLookup not in updated_lookups
+                        ):
                             # It is possible that both the CSV and YML have been modified for the same lookup,
-                            # and we do not want to add it twice. 
updated_lookups.add(updatedLookup)
 
                 else:
                     pass
-                    #print(f"Ignore changes to file {decoded_path} since it is not a detection, macro, or lookup.")
+                    # print(f"Ignore changes to file {decoded_path} since it is not a detection, macro, or lookup.")
             else:
                 raise Exception(f"Unrecognized diff type {type(diff)}")
 
-
         # If a detection has at least one dependency on changed content,
         # then we must test it again
-        changed_macros_and_lookups_and_datasources:set[SecurityContentObject] = updated_macros.union(updated_lookups, updated_datasources)
-
+        changed_macros_and_lookups_and_datasources: set[SecurityContentObject] = (
+            updated_macros.union(updated_lookups, updated_datasources)
+        )
+
         for detection in self.director.detections:
             if detection in updated_detections:
-                # we are already planning to test it, don't need 
+                # we are already planning to test it, don't need
                 # to add it again
                 continue
 
             for obj in changed_macros_and_lookups_and_datasources:
                 if obj in detection.get_content_dependencies():
-                   updated_detections.add(detection)
-                   break
+                    updated_detections.add(detection)
+                    break
 
-        #Print out the names of all modified/new content
-        modifiedAndNewContentString = "\n - ".join(sorted([d.name for d in updated_detections]))
+        # Print out the names of all modified/new content
+        modifiedAndNewContentString = "\n - ".join(
+            sorted([d.name for d in updated_detections])
+        )
 
-        print(f"[{len(updated_detections)}] Pieces of modifed and new content (this may include experimental/deprecated/manual_test content):\n - {modifiedAndNewContentString}")
+        print(
+            f"[{len(updated_detections)}] Pieces of modified and new content (this may include experimental/deprecated/manual_test content):\n - {modifiedAndNewContentString}"
+        )
         return sorted(list(updated_detections))
 
     def getSelected(self, detectionFilenames: List[FilePath]) -> List[Detection]:
         filepath_to_content_map: dict[FilePath, SecurityContentObject] = {
-            obj.file_path: obj for (_, obj) in self.director.name_to_content_map.items() if obj.file_path is not None
-        }
+            obj.file_path: obj
+            for (_, obj) in self.director.name_to_content_map.items()
+            if obj.file_path is not None
+        }
         errors = []
         detections: List[Detection] = []
         for name in detectionFilenames:
             obj = filepath_to_content_map.get(name, None)
             if obj is None:
-                errors.append(f"There is no detection file or security_content_object at '{name}'")
+                errors.append(
+                    f"There is no detection file or security_content_object at '{name}'"
+                )
             elif not isinstance(obj, Detection):
-                errors.append(f"The security_content_object at '{name}' is of type '{type(obj).__name__}', NOT '{Detection.__name__}'")
+                errors.append(
+                    f"The security_content_object at '{name}' is of type '{type(obj).__name__}', NOT '{Detection.__name__}'"
+                )
             else:
                 detections.append(obj)
 
         if errors:
             errorsString = "\n - ".join(errors)
-            raise Exception(f"The following errors were encountered while getting selected detections to test:\n - {errorsString}")
-        return detections
\ No newline at end of file
+            raise Exception(
+                f"The following errors were encountered while getting selected detections to test:\n - {errorsString}"
+            )
+        return detections

From 3a470364792f9828e628419d14bb6e95618a519d Mon Sep 17 00:00:00 2001
From: ljstella 
Date: Tue, 21 Jan 2025 12:45:42 -0600
Subject: [PATCH 2/6] Remove extra precommit job

---
 .pre-commit-config.yaml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 30df3046..8ac4fea4 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -5,7 +5,6 @@ repos:
       - id: 
check-json
       - id: check-symlinks
       - id: check-yaml
-      - id: detect-aws-credentials
       - id: detect-private-key
       - id: forbid-submodules
   - repo: https://github.com/astral-sh/ruff-pre-commit

From d1e015b0a45379c20e212716206eaba963e9484a Mon Sep 17 00:00:00 2001
From: pyth0n1c 
Date: Tue, 21 Jan 2025 12:46:55 -0800
Subject: [PATCH 3/6] small cleanup to use CsvLookup class. Also move warning
 message to end of error printout.

---
 contentctl/actions/detection_testing/GitService.py | 10 ++++------
 contentctl/contentctl.py                           |  5 ++---
 2 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/contentctl/actions/detection_testing/GitService.py b/contentctl/actions/detection_testing/GitService.py
index e598562d..be8d3d47 100644
--- a/contentctl/actions/detection_testing/GitService.py
+++ b/contentctl/actions/detection_testing/GitService.py
@@ -10,11 +10,10 @@
 if TYPE_CHECKING:
     from contentctl.input.director import DirectorOutputDto
-
 from contentctl.objects.config import All, Changes, Selected, test_common
 from contentctl.objects.data_source import DataSource
 from contentctl.objects.detection import Detection
-from contentctl.objects.lookup import Lookup, Lookup_Type
+from contentctl.objects.lookup import CSVLookup, Lookup
 from contentctl.objects.macro import Macro
 from contentctl.objects.security_content_object import SecurityContentObject
 
@@ -150,8 +149,7 @@ def getChanges(self, target_branch: str) -> List[Detection]:
                             # Filter to find the Lookup Object that references this CSV
                             matched = list(
                                 filter(
-                                    lambda x: x.lookup_type == Lookup_Type.csv
-                                    and x.filename is not None
+                                    lambda x: isinstance(x, CSVLookup)
                                     and x.filename == decoded_path,
                                     self.director.lookups,
                                 )
@@ -162,7 +160,7 @@ def getChanges(self, target_branch: str) -> List[Detection]:
                                 )
                             elif len(matched) > 1:
                                 raise Exception(
-                                    f"More than 1 Lookup references the modified CSV file '{decoded_path}': {[l.file_path for l in matched]}"
+                                    f"More than 1 Lookup references the modified CSV file '{decoded_path}': {[match.file_path for match in matched]}"
                                 )
                             else:
                                 updatedLookup = matched[0]
diff --git a/contentctl/contentctl.py b/contentctl/contentctl.py
index 05d9b952..753c11f8 100644
--- a/contentctl/contentctl.py
+++ b/contentctl/contentctl.py
@@ -246,8 +246,6 @@ def main():
         print(e)
         sys.exit(1)
     except Exception as e:
-        print(CONTENTCTL_5_WARNING)
-
         if config is None:
             print(
                 "There was a serious issue where the config file could not be created.\n"
@@ -265,8 +263,9 @@ def main():
                 "Verbose error logging is DISABLED.\n"
                 "Please use the --verbose command line argument if you need more context for your error or file a bug report."
             )
-        print(e)
 
+        print(e)
+        print(CONTENTCTL_5_WARNING)
         sys.exit(1)

From c9458b8f3cfce9c0b86c2d93cfe2b23eea68cadd Mon Sep 17 00:00:00 2001
From: pyth0n1c 
Date: Tue, 21 Jan 2025 13:20:03 -0800
Subject: [PATCH 4/6] fix template so that an extra newline is not written to
 savedsearches.conf where it should not be. 
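
The only change below is removing a trailing space after the closing `#}`
of a Jinja comment line. Assuming the template is rendered through a
jinja2 Environment with trim_blocks enabled (an assumption based on the
symptom; the environment setup is not part of this diff), trim_blocks only
strips a newline that immediately follows a block tag, so the trailing
space kept the newline alive and an empty line leaked into
savedsearches.conf. A minimal sketch of that behavior:

    from jinja2 import Environment

    env = Environment(trim_blocks=True)
    # The newline immediately after '#}' is trimmed away:
    print(repr(env.from_string("a\n{# note #}\nb").render()))   # prints 'a\nb'
    # A trailing space after '#}' defeats the trim, leaving a blank line:
    print(repr(env.from_string("a\n{# note #} \nb").render()))  # prints 'a\n \nb'
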
--- contentctl/output/templates/savedsearches_detections.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contentctl/output/templates/savedsearches_detections.j2 b/contentctl/output/templates/savedsearches_detections.j2 index 0a5c634f..f3a14be1 100644 --- a/contentctl/output/templates/savedsearches_detections.j2 +++ b/contentctl/output/templates/savedsearches_detections.j2 @@ -74,7 +74,7 @@ action.notable.param.security_domain = {{ detection.tags.security_domain }} {% if detection.rba %} action.notable.param.severity = {{ detection.rba.severity }} {% else %} -{# Correlations do not have detection.rba defined, but should get a default severity #} +{# Correlations do not have detection.rba defined, but should get a default severity #} action.notable.param.severity = high {% endif %} {% endif %} From 0f2c000713d4b5f34a3934b107b2b3855790bff8 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Tue, 21 Jan 2025 13:21:13 -0800 Subject: [PATCH 5/6] update alpha name in prep for release --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 7d15db05..b1e53887 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "contentctl" -version = "5.0.0-alpha" +version = "5.0.0-alpha.2" description = "Splunk Content Control Tool" authors = ["STRT "] From a86ba5e74830a783a76ff3f34dd20e1f32f9fe33 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Tue, 21 Jan 2025 13:36:44 -0800 Subject: [PATCH 6/6] pin to 9.3 so that tests pass --- contentctl/objects/config.py | 1330 ++++++++++++++++++++-------------- 1 file changed, 771 insertions(+), 559 deletions(-) diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py index 35801bd3..02fd4bd9 100644 --- a/contentctl/objects/config.py +++ b/contentctl/objects/config.py @@ -1,31 +1,39 @@ from __future__ import annotations -from os import environ -from datetime import datetime, UTC -from typing import Optional, Any, List, Union, Self -import random -from enum import StrEnum, auto import pathlib -from urllib.parse import urlparse +import random from abc import ABC, abstractmethod +from datetime import UTC, datetime +from enum import StrEnum, auto from functools import partialmethod +from os import environ +from typing import Any, List, Optional, Self, Union +from urllib.parse import urlparse -import tqdm import semantic_version +import tqdm from pydantic import ( - BaseModel, Field, field_validator, - field_serializer, ConfigDict, DirectoryPath, - PositiveInt, FilePath, HttpUrl, AnyUrl, model_validator, - ValidationInfo + AnyUrl, + BaseModel, + ConfigDict, + DirectoryPath, + Field, + FilePath, + HttpUrl, + PositiveInt, + ValidationInfo, + field_serializer, + field_validator, + model_validator, ) -from contentctl.objects.constants import DOWNLOADS_DIRECTORY -from contentctl.output.yml_writer import YmlWriter +from contentctl.helper.splunk_app import SplunkApp from contentctl.helper.utils import Utils -from contentctl.objects.enums import PostTestBehavior, DetectionTestingMode -from contentctl.objects.detection import Detection from contentctl.objects.annotated_types import APPID_TYPE -from contentctl.helper.splunk_app import SplunkApp +from contentctl.objects.constants import DOWNLOADS_DIRECTORY +from contentctl.objects.detection import Detection +from contentctl.objects.enums import PostTestBehavior +from contentctl.output.yml_writer import YmlWriter ENTERPRISE_SECURITY_UID = 263 COMMON_INFORMATION_MODEL_UID = 1621 @@ -33,27 +41,34 @@ SPLUNKBASE_URL = 
"https://splunkbase.splunk.com/app/{uid}/release/{version}/download" -class App_Base(BaseModel,ABC): - - model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True, extra='forbid') +class App_Base(BaseModel, ABC): + model_config = ConfigDict( + validate_default=True, arbitrary_types_allowed=True, extra="forbid" + ) uid: Optional[int] = Field(default=None) - title: str = Field(description="Human-readable name used by the app. This can have special characters.") - appid: Optional[APPID_TYPE]= Field(default=None,description="Internal name used by your app. " - "It may ONLY have characters, numbers, and underscores. No other characters are allowed.") - version: str = Field(description="The version of your Content Pack. This must follow semantic versioning guidelines.") - description: Optional[str] = Field(default="description of app",description="Free text description of the Content Pack.") - - - + title: str = Field( + description="Human-readable name used by the app. This can have special characters." + ) + appid: Optional[APPID_TYPE] = Field( + default=None, + description="Internal name used by your app. " + "It may ONLY have characters, numbers, and underscores. No other characters are allowed.", + ) + version: str = Field( + description="The version of your Content Pack. This must follow semantic versioning guidelines." + ) + description: Optional[str] = Field( + default="description of app", + description="Free text description of the Content Pack.", + ) - def getSplunkbasePath(self)->HttpUrl: + def getSplunkbasePath(self) -> HttpUrl: return HttpUrl(SPLUNKBASE_URL.format(uid=self.uid, release=self.version)) @abstractmethod - def getApp(self, config:test, stage_file:bool=False)->str: - ... + def getApp(self, config: test, stage_file: bool = False) -> str: ... 
- def ensureAppPathExists(self, config:test, stage_file:bool=False): + def ensureAppPathExists(self, config: test, stage_file: bool = False): if stage_file: if not config.getLocalAppDir().exists(): config.getLocalAppDir().mkdir(parents=True) @@ -61,30 +76,36 @@ def ensureAppPathExists(self, config:test, stage_file:bool=False): class TestApp(App_Base): model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True) - hardcoded_path: Optional[Union[FilePath,HttpUrl]] = Field(default=None, description="This may be a relative or absolute link to a file OR an HTTP URL linking to your app.") - + hardcoded_path: Optional[Union[FilePath, HttpUrl]] = Field( + default=None, + description="This may be a relative or absolute link to a file OR an HTTP URL linking to your app.", + ) - @field_serializer('hardcoded_path',when_used='always') - def serialize_path(path: Union[AnyUrl, pathlib.Path])->str: + @field_serializer("hardcoded_path", when_used="always") + def serialize_path(path: Union[AnyUrl, pathlib.Path]) -> str: return str(path) - def getApp(self, config:test,stage_file:bool=False)->str: - #If the apps directory does not exist, then create it - self.ensureAppPathExists(config,stage_file) + def getApp(self, config: test, stage_file: bool = False) -> str: + # If the apps directory does not exist, then create it + self.ensureAppPathExists(config, stage_file) - if config.splunk_api_password is not None and config.splunk_api_username is not None: + if ( + config.splunk_api_password is not None + and config.splunk_api_username is not None + ): if self.version is not None and self.uid is not None: - return str(self.getSplunkbasePath()) + return str(self.getSplunkbasePath()) if self.version is None or self.uid is None: - print(f"Not downloading {self.title} from Splunkbase since uid[{self.uid}] AND version[{self.version}] MUST be defined") - - + print( + f"Not downloading {self.title} from Splunkbase since uid[{self.uid}] AND version[{self.version}] MUST be defined" + ) + elif isinstance(self.hardcoded_path, pathlib.Path): destination = config.getLocalAppDir() / self.hardcoded_path.name if stage_file: - Utils.copy_local_file(str(self.hardcoded_path), - str(destination), - verbose_print=True) + Utils.copy_local_file( + str(self.hardcoded_path), str(destination), verbose_print=True + ) elif isinstance(self.hardcoded_path, AnyUrl): file_url_string = str(self.hardcoded_path) @@ -94,7 +115,7 @@ def getApp(self, config:test,stage_file:bool=False)->str: Utils.download_file_from_http(file_url_string, str(destination)) else: raise Exception(f"Unknown path for app '{self.title}'") - + return str(destination) @@ -102,15 +123,34 @@ class CustomApp(App_Base): model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True) # Fields required for app.conf based on # https://docs.splunk.com/Documentation/Splunk/9.0.4/Admin/Appconf - uid: int = Field(ge=2, lt=100000, default_factory=lambda:random.randint(20000,100000)) - title: str = Field(default="Content Pack",description="Human-readable name used by the app. This can have special characters.") - appid: APPID_TYPE = Field(default="ContentPack",description="Internal name used by your app. " - "It may ONLY have characters, numbers, and underscores. No other characters are allowed.") - version: str = Field(default="0.0.1",description="The version of your Content Pack. 
This must follow semantic versioning guidelines.", validate_default=True) - - prefix: str = Field(default="ContentPack",description="A short prefix to easily identify all your content.") - build: int = Field(exclude=True, default=int(datetime.now(UTC).strftime("%Y%m%d%H%M%S")), validate_default=True, - description="Build number for your app. This will always be a number that corresponds to the time of the build in the format YYYYMMDDHHMMSS") + uid: int = Field( + ge=2, lt=100000, default_factory=lambda: random.randint(20000, 100000) + ) + title: str = Field( + default="Content Pack", + description="Human-readable name used by the app. This can have special characters.", + ) + appid: APPID_TYPE = Field( + default="ContentPack", + description="Internal name used by your app. " + "It may ONLY have characters, numbers, and underscores. No other characters are allowed.", + ) + version: str = Field( + default="0.0.1", + description="The version of your Content Pack. This must follow semantic versioning guidelines.", + validate_default=True, + ) + + prefix: str = Field( + default="ContentPack", + description="A short prefix to easily identify all your content.", + ) + build: int = Field( + exclude=True, + default=int(datetime.now(UTC).strftime("%Y%m%d%H%M%S")), + validate_default=True, + description="Build number for your app. This will always be a number that corresponds to the time of the build in the format YYYYMMDDHHMMSS", + ) # id has many restrictions: # * Omit this setting for apps that are for internal use only and not intended # for upload to Splunkbase. @@ -126,161 +166,217 @@ class CustomApp(App_Base): # * must not be any of the following names: CON, PRN, AUX, NUL, # COM1, COM2, COM3, COM4, COM5, COM6, COM7, COM8, COM9, # LPT1, LPT2, LPT3, LPT4, LPT5, LPT6, LPT7, LPT8, LPT9 - - label: str = Field(default="Custom Splunk Content Pack",description="This is the app name that shows in the launcher.") - author_name: str = Field(default="author name",description="Name of the Content Pack Author.") - author_email: str = Field(default="author@contactemailaddress.com",description="Contact email for the Content Pack Author") - author_company: str = Field(default="author company",description="Name of the company who has developed the Content Pack") - description: str = Field(default="description of app",description="Free text description of the Content Pack.") + label: str = Field( + default="Custom Splunk Content Pack", + description="This is the app name that shows in the launcher.", + ) + author_name: str = Field( + default="author name", description="Name of the Content Pack Author." + ) + author_email: str = Field( + default="author@contactemailaddress.com", + description="Contact email for the Content Pack Author", + ) + author_company: str = Field( + default="author company", + description="Name of the company who has developed the Content Pack", + ) + description: str = Field( + default="description of app", + description="Free text description of the Content Pack.", + ) - @field_validator('version') + @field_validator("version") def validate_version(cls, v, values): try: _ = semantic_version.Version(v) except Exception as e: - raise(ValueError(f"The specified version does not follow the semantic versioning spec (https://semver.org/). {str(e)}")) + raise ( + ValueError( + f"The specified version does not follow the semantic versioning spec (https://semver.org/). 
{str(e)}" + ) + ) return v - - #Build will ALWAYS be the current utc timestamp - @field_validator('build') + + # Build will ALWAYS be the current utc timestamp + @field_validator("build") def validate_build(cls, v, values): return int(datetime.utcnow().strftime("%Y%m%d%H%M%S")) - - def getApp(self, config:test, stage_file=True)->str: - self.ensureAppPathExists(config,stage_file) - - destination = config.getLocalAppDir() / (config.getPackageFilePath(include_version=True).name) + + def getApp(self, config: test, stage_file=True) -> str: + self.ensureAppPathExists(config, stage_file) + + destination = config.getLocalAppDir() / ( + config.getPackageFilePath(include_version=True).name + ) if stage_file: - Utils.copy_local_file(str(config.getPackageFilePath(include_version=True)), - str(destination), - verbose_print=True) + Utils.copy_local_file( + str(config.getPackageFilePath(include_version=True)), + str(destination), + verbose_print=True, + ) return str(destination) - + + class Config_Base(BaseModel): model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True) - path: DirectoryPath = Field(default=DirectoryPath("."), description="The root of your app.") - app:CustomApp = Field(default_factory=CustomApp) - verbose:bool = Field(default=False, description="Enable verbose error logging, including a stacktrace. " - "This option makes debugging contentctl errors much easier, but produces way more " - "output than is useful under most uses cases. " - "Please use this flag if you are submitting a bug report or issue on GitHub.") - - @field_serializer('path',when_used='always') - def serialize_path(path: DirectoryPath)->str: + path: DirectoryPath = Field( + default=DirectoryPath("."), description="The root of your app." + ) + app: CustomApp = Field(default_factory=CustomApp) + verbose: bool = Field( + default=False, + description="Enable verbose error logging, including a stacktrace. " + "This option makes debugging contentctl errors much easier, but produces way more " + "output than is useful under most uses cases. " + "Please use this flag if you are submitting a bug report or issue on GitHub.", + ) + + @field_serializer("path", when_used="always") + def serialize_path(path: DirectoryPath) -> str: return str(path) + class init(Config_Base): model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True) - bare: bool = Field(default=False, description="contentctl normally provides some some example content " - "(macros, stories, data_sources, and/or analytic stories). This option disables " - "initialization with that additional contnet. Note that even if --bare is used, it " - "init will still create the directory structure of the app, " - "include the app_template directory with default content, and content in " - "the deployment/ directory (since it is not yet easily customizable).") + bare: bool = Field( + default=False, + description="contentctl normally provides some some example content " + "(macros, stories, data_sources, and/or analytic stories). This option disables " + "initialization with that additional contnet. 
Note that even if --bare is used, "
        "init will still create the directory structure of the app, "
        "include the app_template directory with default content, and content in "
        "the deployment/ directory (since it is not yet easily customizable).",
    )


class validate(Config_Base):
    model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True)
    enrichments: bool = Field(
        default=False,
        description="Enable MITRE, APP, and CVE Enrichments. "
        "This is useful when outputting a release build "
        "and validating these values, but should otherwise "
        "be avoided for performance reasons.",
    )
    build_app: bool = Field(
        default=True, description="Should an app be built and output in the build_path?"
    )
    build_api: bool = Field(
        default=False,
        description="Should api objects be built and output in the build_path?",
    )
    data_source_TA_validation: bool = Field(
        default=False, description="Validate latest TA information from Splunkbase"
    )

    @property
    def external_repos_path(self) -> pathlib.Path:
        return self.path / "external_repos"

    @property
    def mitre_cti_repo_path(self) -> pathlib.Path:
        return self.external_repos_path / "cti"

    @property
    def atomic_red_team_repo_path(self):
        return self.external_repos_path / "atomic-red-team"

    @model_validator(mode="after")
    def ensureEnrichmentReposPresent(self) -> Self:
        """
        Ensures that the enrichment repos, the atomic red team repo and the
        mitre attack enrichment repo, are present at the intended path.
        Raises a detailed exception if either of these is not present when
        enrichments are enabled.
        """
        if not self.enrichments:
            return self

        # If enrichments are enabled, ensure that all of the
        # enrichment directories exist
        missing_repos: list[str] = []
        if not self.atomic_red_team_repo_path.is_dir():
            missing_repos.append(
                f"https://github.com/redcanaryco/atomic-red-team {self.atomic_red_team_repo_path}"
            )

        if not self.mitre_cti_repo_path.is_dir():
            missing_repos.append(
                f"https://github.com/mitre/cti {self.mitre_cti_repo_path}"
            )

        if len(missing_repos) > 0:
            msg_list = [
                "The following repositories, which are required for enrichment, have not "
                f"been checked out to the {self.external_repos_path} directory. 
" - "Please check them out using the following commands:"] - msg_list.extend([f"git clone --single-branch {repo_string}" for repo_string in missing_repos]) - msg = '\n\t'.join(msg_list) + missing_repos.append( + f"https://github.com/mitre/cti {self.mitre_cti_repo_path}" + ) + + if len(missing_repos) > 0: + msg_list = [ + "The following repositories, which are required for enrichment, have not " + f"been checked out to the {self.external_repos_path} directory. " + "Please check them out using the following commands:" + ] + msg_list.extend( + [ + f"git clone --single-branch {repo_string}" + for repo_string in missing_repos + ] + ) + msg = "\n\t".join(msg_list) raise FileNotFoundError(msg) return self + class report(validate): - #reporting takes no extra args, but we define it here so that it can be a mode on the command line - def getReportingPath(self)->pathlib.Path: - return self.path/"reporting/" + # reporting takes no extra args, but we define it here so that it can be a mode on the command line + def getReportingPath(self) -> pathlib.Path: + return self.path / "reporting/" class build(validate): model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True) - build_path: DirectoryPath = Field(default=DirectoryPath("dist/"), title="Target path for all build outputs") + build_path: DirectoryPath = Field( + default=DirectoryPath("dist/"), title="Target path for all build outputs" + ) - @field_serializer('build_path',when_used='always') - def serialize_build_path(path: DirectoryPath)->str: + @field_serializer("build_path", when_used="always") + def serialize_build_path(path: DirectoryPath) -> str: return str(path) - @field_validator('build_path',mode='before') + @field_validator("build_path", mode="before") @classmethod - def ensure_build_path(cls, v:Union[str,DirectoryPath]): - ''' + def ensure_build_path(cls, v: Union[str, DirectoryPath]): + """ If the build path does not exist, then create it. If the build path is actually a file, then raise a descriptive exception. - ''' - if isinstance(v,str): + """ + if isinstance(v, str): v = pathlib.Path(v) if v.is_dir(): return v elif v.is_file(): - raise ValueError(f"Build path {v} must be a directory, but instead it is a file") + raise ValueError( + f"Build path {v} must be a directory, but instead it is a file" + ) elif not v.exists(): v.mkdir(parents=True) return v - - def getBuildDir(self)->pathlib.Path: + + def getBuildDir(self) -> pathlib.Path: return self.path / self.build_path - def getPackageDirectoryPath(self)->pathlib.Path: - return self.getBuildDir() / f"{self.app.appid}" - + def getPackageDirectoryPath(self) -> pathlib.Path: + return self.getBuildDir() / f"{self.app.appid}" - def getPackageFilePath(self, include_version:bool=False)->pathlib.Path: + def getPackageFilePath(self, include_version: bool = False) -> pathlib.Path: if include_version: return self.getBuildDir() / f"{self.app.appid}-{self.app.version}.tar.gz" else: return self.getBuildDir() / f"{self.app.appid}-latest.tar.gz" - def getAPIPath(self)->pathlib.Path: + def getAPIPath(self) -> pathlib.Path: return self.getBuildDir() / "api" - def getAppTemplatePath(self)->pathlib.Path: - return self.path/"app_template" + def getAppTemplatePath(self) -> pathlib.Path: + return self.path / "app_template" class StackType(StrEnum): @@ -289,20 +385,19 @@ class StackType(StrEnum): class inspect(build): - splunk_api_username: str = Field( description="Splunk API username used for appinspect and Splunkbase downloads." 
) splunk_api_password: str = Field( exclude=True, - description="Splunk API password used for appinspect and Splunkbase downloads." + description="Splunk API password used for appinspect and Splunkbase downloads.", ) enable_metadata_validation: bool = Field( default=False, description=( "Flag indicating whether detection metadata validation and versioning enforcement " "should be enabled." - ) + ), ) suppress_missing_content_exceptions: bool = Field( default=False, @@ -312,15 +407,15 @@ class inspect(build): "is not accidentally removed. In order to support testing both public and private " "content, this warning can be suppressed. If it is suppressed, it will still be " "printed out as a warning." - ) + ), ) enrichments: bool = Field( default=True, description=( "[NOTE: enrichments must be ENABLED for inspect to run. Please adjust your config " f"or CLI invocation appropriately] {validate.model_fields['enrichments'].description}" - ) - ) + ), + ) # TODO (cmcginley): wording should change here if we want to be able to download any app from # Splunkbase previous_build: str | None = Field( @@ -328,13 +423,15 @@ class inspect(build): description=( "Local path to the previous app build for metatdata validation and versioning " "enforcement (defaults to the latest release of the app published on Splunkbase)." - ) + ), ) stack_type: StackType = Field(description="The type of your Splunk Cloud Stack") @field_validator("enrichments", mode="after") @classmethod - def validate_needed_flags_metadata_validation(cls, v: bool, info: ValidationInfo) -> bool: + def validate_needed_flags_metadata_validation( + cls, v: bool, info: ValidationInfo + ) -> bool: """ Validates that `enrichments` is True for the inspect action @@ -348,7 +445,9 @@ def validate_needed_flags_metadata_validation(cls, v: bool, info: ValidationInfo """ # Enforce that `enrichments` is True for the inspect action if v is False: - raise ValueError("Field `enrichments` must be True for the `inspect` action") + raise ValueError( + "Field `enrichments` must be True for the `inspect` action" + ) return v @@ -374,9 +473,11 @@ def get_previous_package_file_path(self) -> pathlib.Path: username=self.splunk_api_username, password=self.splunk_api_password, is_dir=True, - overwrite=True + overwrite=True, + ) + print( + f"Latest release downloaded from Splunkbase to: {previous_build_path}" ) - print(f"Latest release downloaded from Splunkbase to: {previous_build_path}") self.previous_build = str(previous_build_path) return pathlib.Path(previous_build_path) @@ -387,22 +488,36 @@ class NewContentType(StrEnum): class new(Config_Base): - type: NewContentType = Field(default=NewContentType.detection, description="Specify the type of content you would like to create.") + type: NewContentType = Field( + default=NewContentType.detection, + description="Specify the type of content you would like to create.", + ) class deploy_acs(inspect): model_config = ConfigDict(validate_default=False, arbitrary_types_allowed=True) - #ignore linter error - splunk_cloud_jwt_token: str = Field(exclude=True, description="Splunk JWT used for performing ACS operations on a Splunk Cloud Instance") + # ignore linter error + splunk_cloud_jwt_token: str = Field( + exclude=True, + description="Splunk JWT used for performing ACS operations on a Splunk Cloud Instance", + ) splunk_cloud_stack: str = Field(description="The name of your Splunk Cloud Stack") class Infrastructure(BaseModel): model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True) - 
splunk_app_username:str = Field(default="admin", description="Username for logging in to your Splunk Server") - splunk_app_password:str = Field(exclude=True, default="password", description="Password for logging in to your Splunk Server.") - instance_address:str = Field(..., description="Address of your splunk server.") - hec_port: int = Field(default=8088, gt=1, lt=65536, title="HTTP Event Collector Port") + splunk_app_username: str = Field( + default="admin", description="Username for logging in to your Splunk Server" + ) + splunk_app_password: str = Field( + exclude=True, + default="password", + description="Password for logging in to your Splunk Server.", + ) + instance_address: str = Field(..., description="Address of your splunk server.") + hec_port: int = Field( + default=8088, gt=1, lt=65536, title="HTTP Event Collector Port" + ) web_ui_port: int = Field(default=8000, gt=1, lt=65536, title="Web UI Port") api_port: int = Field(default=8089, gt=1, lt=65536, title="REST API Port") instance_name: str = Field(...) @@ -410,431 +525,480 @@ class Infrastructure(BaseModel): class Container(Infrastructure): model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True) - instance_address:str = Field(default="localhost", description="Address of your splunk server.") + instance_address: str = Field( + default="localhost", description="Address of your splunk server." + ) class ContainerSettings(BaseModel): model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True) - leave_running: bool = Field(default=True, description="Leave container running after it is first " - "set up to speed up subsequent test runs.") - num_containers: PositiveInt = Field(default=1, description="Number of containers to start in parallel. " - "Please note that each container is quite expensive to run. It is not " - "recommended to run more than 4 containers unless you have a very " - "well-resourced environment.") - full_image_path:str = Field(default="registry.hub.docker.com/splunk/splunk:latest", - title="Full path to the container image to be used") - - def getContainers(self)->List[Container]: + leave_running: bool = Field( + default=True, + description="Leave container running after it is first " + "set up to speed up subsequent test runs.", + ) + num_containers: PositiveInt = Field( + default=1, + description="Number of containers to start in parallel. " + "Please note that each container is quite expensive to run. It is not " + "recommended to run more than 4 containers unless you have a very " + "well-resourced environment.", + ) + full_image_path: str = Field( + default="registry.hub.docker.com/splunk/splunk:9.3", + title="Full path to the container image to be used. 
We are currently pinned to 9.3 as we resolve an issue with waiting to run until app installation completes.", + ) + + def getContainers(self) -> List[Container]: containers = [] for i in range(self.num_containers): - containers.append(Container(instance_name="contentctl_{}".format(i), - web_ui_port=8000+i, hec_port=8088+(i*2), api_port=8089+(i*2))) + containers.append( + Container( + instance_name="contentctl_{}".format(i), + web_ui_port=8000 + i, + hec_port=8088 + (i * 2), + api_port=8089 + (i * 2), + ) + ) return containers class All(BaseModel): - #Doesn't need any extra logic - mode_name:str = "All" + # Doesn't need any extra logic + mode_name: str = "All" pass class Changes(BaseModel): model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True) mode_name: str = "Changes" - target_branch:str = Field(...,description="The target branch to diff against. Note that this includes uncommitted changes in the working directory as well.") + target_branch: str = Field( + ..., + description="The target branch to diff against. Note that this includes uncommitted changes in the working directory as well.", + ) class Selected(BaseModel): model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True) - mode_name:str = "Selected" - files:List[FilePath] = Field(...,description="List of detection files to test, separated by spaces.") + mode_name: str = "Selected" + files: List[FilePath] = Field( + ..., description="List of detection files to test, separated by spaces." + ) - @field_serializer('files',when_used='always') - def serialize_path(paths: List[FilePath])->List[str]: + @field_serializer("files", when_used="always") + def serialize_path(paths: List[FilePath]) -> List[str]: return [str(path) for path in paths] -DEFAULT_APPS:List[TestApp] = [ - TestApp( - uid=1621, - appid="Splunk_SA_CIM", - title="Splunk Common Information Model (CIM)", - version="5.2.0", - hardcoded_path=HttpUrl( - "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-common-information-model-cim_520.tgz" - ), - ), - TestApp( - uid=6553, - appid="Splunk_TA_okta_identity_cloud", - title="Splunk Add-on for Okta Identity Cloud", - version="2.1.0", - hardcoded_path=HttpUrl( - "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-okta-identity-cloud_210.tgz" - ), + +DEFAULT_APPS: List[TestApp] = [ + TestApp( + uid=1621, + appid="Splunk_SA_CIM", + title="Splunk Common Information Model (CIM)", + version="5.2.0", + hardcoded_path=HttpUrl( + "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-common-information-model-cim_520.tgz" + ), + ), + TestApp( + uid=6553, + appid="Splunk_TA_okta_identity_cloud", + title="Splunk Add-on for Okta Identity Cloud", + version="2.1.0", + hardcoded_path=HttpUrl( + "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-okta-identity-cloud_210.tgz" ), - TestApp( - uid=6176, - appid="Splunk_TA_linux_sysmon", - title="Add-on for Linux Sysmon", - version="1.0.4", - hardcoded_path=HttpUrl( - "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/add-on-for-linux-sysmon_104.tgz" - ), + ), + TestApp( + uid=6176, + appid="Splunk_TA_linux_sysmon", + title="Add-on for Linux Sysmon", + version="1.0.4", + hardcoded_path=HttpUrl( + "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/add-on-for-linux-sysmon_104.tgz" ), - TestApp( - appid="Splunk_FIX_XMLWINEVENTLOG_HEC_PARSING", - title="Splunk Fix XmlWinEventLog HEC Parsing", - version="0.1", - 
description="This TA is required for replaying Windows Data into the Test Environment. The Default TA does not include logic for properly splitting multiple log events in a single file. In production environments, this logic is applied by the Universal Forwarder.", - hardcoded_path=HttpUrl( - "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/Splunk_TA_fix_windows.tgz" - ), + ), + TestApp( + appid="Splunk_FIX_XMLWINEVENTLOG_HEC_PARSING", + title="Splunk Fix XmlWinEventLog HEC Parsing", + version="0.1", + description="This TA is required for replaying Windows Data into the Test Environment. The Default TA does not include logic for properly splitting multiple log events in a single file. In production environments, this logic is applied by the Universal Forwarder.", + hardcoded_path=HttpUrl( + "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/Splunk_TA_fix_windows.tgz" ), - TestApp( - uid=742, - appid="SPLUNK_ADD_ON_FOR_MICROSOFT_WINDOWS", - title="Splunk Add-on for Microsoft Windows", - version="8.8.0", - hardcoded_path=HttpUrl( - "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-microsoft-windows_880.tgz" - ), + ), + TestApp( + uid=742, + appid="SPLUNK_ADD_ON_FOR_MICROSOFT_WINDOWS", + title="Splunk Add-on for Microsoft Windows", + version="8.8.0", + hardcoded_path=HttpUrl( + "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-microsoft-windows_880.tgz" ), - TestApp( - uid=5709, - appid="Splunk_TA_microsoft_sysmon", - title="Splunk Add-on for Sysmon", - version="4.0.0", - hardcoded_path=HttpUrl( - "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-sysmon_400.tgz" - ), + ), + TestApp( + uid=5709, + appid="Splunk_TA_microsoft_sysmon", + title="Splunk Add-on for Sysmon", + version="4.0.0", + hardcoded_path=HttpUrl( + "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-sysmon_400.tgz" ), - TestApp( - uid=833, - appid="Splunk_TA_nix", - title="Splunk Add-on for Unix and Linux", - version="9.0.0", - hardcoded_path=HttpUrl( - "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-unix-and-linux_900.tgz" - ), + ), + TestApp( + uid=833, + appid="Splunk_TA_nix", + title="Splunk Add-on for Unix and Linux", + version="9.0.0", + hardcoded_path=HttpUrl( + "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-unix-and-linux_900.tgz" ), - TestApp( - uid=5579, - appid="Splunk_TA_CrowdStrike_FDR", - title="Splunk Add-on for CrowdStrike FDR", - version="1.5.0", - hardcoded_path=HttpUrl( - "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-crowdstrike-fdr_150.tgz" - ), + ), + TestApp( + uid=5579, + appid="Splunk_TA_CrowdStrike_FDR", + title="Splunk Add-on for CrowdStrike FDR", + version="1.5.0", + hardcoded_path=HttpUrl( + "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-crowdstrike-fdr_150.tgz" ), - TestApp( - uid=3185, - appid="SPLUNK_TA_FOR_IIS", - title="Splunk Add-on for Microsoft IIS", - version="1.3.0", - hardcoded_path=HttpUrl( - "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-microsoft-iis_130.tgz" - ), + ), + TestApp( + uid=3185, + appid="SPLUNK_TA_FOR_IIS", + title="Splunk Add-on for Microsoft IIS", + version="1.3.0", + hardcoded_path=HttpUrl( + "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-microsoft-iis_130.tgz" ), - 
     TestApp(
-        uid=4242,
-        appid="SPLUNK_TA_FOR_SURICATA",
-        title="TA for Suricata",
-        version="2.3.4",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/ta-for-suricata_234.tgz"
-        ),
+    ),
+    TestApp(
+        uid=4242,
+        appid="SPLUNK_TA_FOR_SURICATA",
+        title="TA for Suricata",
+        version="2.3.4",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/ta-for-suricata_234.tgz"
         ),
-    TestApp(
-        uid=5466,
-        appid="SPLUNK_TA_FOR_ZEEK",
-        title="TA for Zeek",
-        version="1.0.6",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/ta-for-zeek_106.tgz"
-        ),
+    ),
+    TestApp(
+        uid=5466,
+        appid="SPLUNK_TA_FOR_ZEEK",
+        title="TA for Zeek",
+        version="1.0.6",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/ta-for-zeek_106.tgz"
         ),
-    TestApp(
-        uid=3258,
-        appid="SPLUNK_ADD_ON_FOR_NGINX",
-        title="Splunk Add-on for NGINX",
-        version="3.2.2",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-nginx_322.tgz"
-        ),
+    ),
+    TestApp(
+        uid=3258,
+        appid="SPLUNK_ADD_ON_FOR_NGINX",
+        title="Splunk Add-on for NGINX",
+        version="3.2.2",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-nginx_322.tgz"
         ),
-    TestApp(
-        uid=5238,
-        appid="SPLUNK_ADD_ON_FOR_STREAM_FORWARDERS",
-        title="Splunk Add-on for Stream Forwarders",
-        version="8.1.1",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-stream-forwarders_811.tgz"
-        ),
+    ),
+    TestApp(
+        uid=5238,
+        appid="SPLUNK_ADD_ON_FOR_STREAM_FORWARDERS",
+        title="Splunk Add-on for Stream Forwarders",
+        version="8.1.1",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-stream-forwarders_811.tgz"
         ),
-    TestApp(
-        uid=5234,
-        appid="SPLUNK_ADD_ON_FOR_STREAM_WIRE_DATA",
-        title="Splunk Add-on for Stream Wire Data",
-        version="8.1.1",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-stream-wire-data_811.tgz"
-        ),
+    ),
+    TestApp(
+        uid=5234,
+        appid="SPLUNK_ADD_ON_FOR_STREAM_WIRE_DATA",
+        title="Splunk Add-on for Stream Wire Data",
+        version="8.1.1",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-stream-wire-data_811.tgz"
         ),
-    TestApp(
-        uid=2757,
-        appid="PALO_ALTO_NETWORKS_ADD_ON_FOR_SPLUNK",
-        title="Palo Alto Networks Add-on for Splunk",
-        version="8.1.1",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/palo-alto-networks-add-on-for-splunk_811.tgz"
-        ),
+    ),
+    TestApp(
+        uid=2757,
+        appid="PALO_ALTO_NETWORKS_ADD_ON_FOR_SPLUNK",
+        title="Palo Alto Networks Add-on for Splunk",
+        version="8.1.1",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/palo-alto-networks-add-on-for-splunk_811.tgz"
         ),
-    TestApp(
-        uid=3865,
-        appid="Zscaler_CIM",
-        title="Zscaler Technical Add-On for Splunk",
-        version="4.0.3",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/zscaler-technical-add-on-for-splunk_403.tgz"
-        ),
+    ),
+    TestApp(
+        uid=3865,
+        appid="Zscaler_CIM",
+        title="Zscaler Technical Add-On for Splunk",
+        version="4.0.3",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/zscaler-technical-add-on-for-splunk_403.tgz"
         ),
-    TestApp(
-        uid=3719,
-        appid="SPLUNK_ADD_ON_FOR_AMAZON_KINESIS_FIREHOSE",
-        title="Splunk Add-on for Amazon Kinesis Firehose",
-        version="1.3.2",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-amazon-kinesis-firehose_132.tgz"
-        ),
+    ),
+    TestApp(
+        uid=3719,
+        appid="SPLUNK_ADD_ON_FOR_AMAZON_KINESIS_FIREHOSE",
+        title="Splunk Add-on for Amazon Kinesis Firehose",
+        version="1.3.2",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-amazon-kinesis-firehose_132.tgz"
         ),
-    TestApp(
-        uid=1876,
-        appid="Splunk_TA_aws",
-        title="Splunk Add-on for AWS",
-        version="7.5.0",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-amazon-web-services-aws_750.tgz"
-        ),
+    ),
+    TestApp(
+        uid=1876,
+        appid="Splunk_TA_aws",
+        title="Splunk Add-on for AWS",
+        version="7.5.0",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-amazon-web-services-aws_750.tgz"
         ),
-    TestApp(
-        uid=3088,
-        appid="SPLUNK_ADD_ON_FOR_GOOGLE_CLOUD_PLATFORM",
-        title="Splunk Add-on for Google Cloud Platform",
-        version="4.4.0",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-google-cloud-platform_440.tgz"
-        ),
+    ),
+    TestApp(
+        uid=3088,
+        appid="SPLUNK_ADD_ON_FOR_GOOGLE_CLOUD_PLATFORM",
+        title="Splunk Add-on for Google Cloud Platform",
+        version="4.4.0",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-google-cloud-platform_440.tgz"
         ),
-    TestApp(
-        uid=5556,
-        appid="SPLUNK_ADD_ON_FOR_GOOGLE_WORKSPACE",
-        title="Splunk Add-on for Google Workspace",
-        version="2.6.3",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-google-workspace_263.tgz"
-        ),
+    ),
+    TestApp(
+        uid=5556,
+        appid="SPLUNK_ADD_ON_FOR_GOOGLE_WORKSPACE",
+        title="Splunk Add-on for Google Workspace",
+        version="2.6.3",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-google-workspace_263.tgz"
         ),
-    TestApp(
-        uid=3110,
-        appid="SPLUNK_TA_MICROSOFT_CLOUD_SERVICES",
-        title="Splunk Add-on for Microsoft Cloud Services",
-        version="5.2.2",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-microsoft-cloud-services_522.tgz"
-        ),
+    ),
+    TestApp(
+        uid=3110,
+        appid="SPLUNK_TA_MICROSOFT_CLOUD_SERVICES",
+        title="Splunk Add-on for Microsoft Cloud Services",
+        version="5.2.2",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-microsoft-cloud-services_522.tgz"
         ),
-    TestApp(
-        uid=4055,
-        appid="SPLUNK_ADD_ON_FOR_MICROSOFT_OFFICE_365",
-        title="Splunk Add-on for Microsoft Office 365",
-        version="4.5.1",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-microsoft-office-365_451.tgz"
-        ),
+    ),
+    TestApp(
+        uid=4055,
+        appid="SPLUNK_ADD_ON_FOR_MICROSOFT_OFFICE_365",
+        title="Splunk Add-on for Microsoft Office 365",
+        version="4.5.1",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-add-on-for-microsoft-office-365_451.tgz"
         ),
-    TestApp(
-        uid=2890,
-        appid="SPLUNK_MACHINE_LEARNING_TOOLKIT",
-        title="Splunk Machine Learning Toolkit",
-        version="5.4.1",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-machine-learning-toolkit_541.tgz"
-        ),
+    ),
+    TestApp(
+        uid=2890,
+        appid="SPLUNK_MACHINE_LEARNING_TOOLKIT",
+        title="Splunk Machine Learning Toolkit",
+        version="5.4.1",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/splunk-machine-learning-toolkit_541.tgz"
         ),
-    TestApp(
-        uid=2734,
-        appid="URL_TOOLBOX",
-        title="URL Toolbox",
-        version="1.9.2",
-        hardcoded_path=HttpUrl(
-            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/url-toolbox_192.tgz"
-        ),
+    ),
+    TestApp(
+        uid=2734,
+        appid="URL_TOOLBOX",
+        title="URL Toolbox",
+        version="1.9.2",
+        hardcoded_path=HttpUrl(
+            "https://attack-range-appbinaries.s3.us-west-2.amazonaws.com/Latest/url-toolbox_192.tgz"
         ),
-]
+    ),
+]
+
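The entries above make up DEFAULT_APPS, a list of TestApp models pinned to prebuilt archives in the attack-range-appbinaries bucket; the apps field defined further down defaults to it. A minimal sketch of extending the list, assuming the TestApp model as used in this patch; the uid, appid, and URL are hypothetical placeholders, not a real Splunkbase entry:

# A minimal sketch, not part of the patch itself. The uid, appid, and URL
# below are hypothetical placeholders.
from pydantic import HttpUrl

my_apps = DEFAULT_APPS + [
    TestApp(
        uid=9999,  # hypothetical Splunkbase uid
        appid="MY_CUSTOM_TA",  # hypothetical app id
        title="My Custom TA",
        version="1.0.0",
        hardcoded_path=HttpUrl("https://example.com/my-custom-ta_100.tgz"),
    )
]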
 
 class test_common(build):
-    mode:Union[Changes, Selected, All] = Field(All(), union_mode='left_to_right')
-    post_test_behavior: PostTestBehavior = Field(default=PostTestBehavior.pause_on_failure, description="Controls what to do when a test completes.\n\n"
-                                                 f"'{PostTestBehavior.always_pause}' - the state of "
-                                                 "the test will always pause after a test, allowing the user to log into the "
-                                                 "server and experiment with the search and data before it is removed.\n\n"
-                                                 f"'{PostTestBehavior.pause_on_failure}' - pause execution ONLY when a test fails. The user may press ENTER in the terminal "
-                                                 "running the test to move on to the next test.\n\n"
-                                                 f"'{PostTestBehavior.never_pause}' - never stop testing, even if a test fails.\n\n"
-                                                 "***SPECIAL NOTE FOR CI/CD*** 'never_pause' MUST be used for a test to "
-                                                 "run in an unattended manner or in a CI/CD system - otherwise a single failed test "
-                                                 "will result in the testing never finishing as the tool waits for input.")
-    test_instances:List[Infrastructure] = Field(...)
-    enable_integration_testing: bool = Field(default=False, description="Enable integration testing, which REQUIRES Splunk Enterprise Security "
-                                             "to be installed on the server. This checks for a number of different things including generation "
-                                             "of appropriate notables and messages. Please note that this will increase testing time "
-                                             "considerably (by approximately 2-3 minutes per detection).")
-    plan_only:bool = Field(default=False, exclude=True, description="WARNING - This is an advanced feature and currently intended for widespread use. "
-                           "This flag is useful for building your app and generating a test plan to run on different infrastructure. "
-                           "This flag does not actually perform the test. Instead, it builds validates all content and builds the app(s). "
-                           "It MUST be used with mode.changes and must run in the context of a git repo.")
-    disable_tqdm:bool = Field(default=False, exclude=True, description="The tdqm library (https://github.com/tqdm/tqdm) is used to facilitate a richer,"
-                              " interactive command line workflow that can display progress bars and status information frequently. "
-                              "Unfortunately it is incompatible with, or may cause poorly formatted logs, in many CI/CD systems or other unattended environments. "
-                              "If you are running contentctl in CI/CD, then please set this argument to True. Note that if you are running in a CI/CD context, "
-                              f"you also MUST set post_test_behavior to {PostTestBehavior.never_pause}. Otherwiser, a failed detection will cause"
-                              "the CI/CD running to pause indefinitely.")
-
-    apps: List[TestApp] = Field(default=DEFAULT_APPS, exclude=False, description="List of apps to install in test environment")
-
-
-    def dumpCICDPlanAndQuit(self, githash: str, detections:List[Detection]):
+    mode: Union[Changes, Selected, All] = Field(All(), union_mode="left_to_right")
+    post_test_behavior: PostTestBehavior = Field(
+        default=PostTestBehavior.pause_on_failure,
+        description="Controls what to do when a test completes.\n\n"
+        f"'{PostTestBehavior.always_pause}' - the state of "
+        "the test will always pause after a test, allowing the user to log into the "
+        "server and experiment with the search and data before it is removed.\n\n"
+        f"'{PostTestBehavior.pause_on_failure}' - pause execution ONLY when a test fails. The user may press ENTER in the terminal "
+        "running the test to move on to the next test.\n\n"
+        f"'{PostTestBehavior.never_pause}' - never stop testing, even if a test fails.\n\n"
+        "***SPECIAL NOTE FOR CI/CD*** 'never_pause' MUST be used for a test to "
+        "run in an unattended manner or in a CI/CD system - otherwise a single failed test "
+        "will result in the testing never finishing as the tool waits for input.",
+    )
+    test_instances: List[Infrastructure] = Field(...)
+    enable_integration_testing: bool = Field(
+        default=False,
+        description="Enable integration testing, which REQUIRES Splunk Enterprise Security "
+        "to be installed on the server. This checks for a number of different things including generation "
+        "of appropriate notables and messages. Please note that this will increase testing time "
+        "considerably (by approximately 2-3 minutes per detection).",
+    )
+    plan_only: bool = Field(
+        default=False,
+        exclude=True,
+        description="WARNING - This is an advanced feature and not currently intended for widespread use. "
+        "This flag is useful for building your app and generating a test plan to run on different infrastructure. "
+        "This flag does not actually perform the test. Instead, it validates all content and builds the app(s). "
+        "It MUST be used with mode.changes and must run in the context of a git repo.",
+    )
+    disable_tqdm: bool = Field(
+        default=False,
+        exclude=True,
+        description="The tqdm library (https://github.com/tqdm/tqdm) is used to facilitate a richer,"
+        " interactive command line workflow that can display progress bars and status information frequently. "
+        "Unfortunately it is incompatible with, or may cause poorly formatted logs in, many CI/CD systems or other unattended environments. "
+        "If you are running contentctl in CI/CD, then please set this argument to True. Note that if you are running in a CI/CD context, "
+        f"you also MUST set post_test_behavior to {PostTestBehavior.never_pause}. Otherwise, a failed detection will cause "
+        "the CI/CD runner to pause indefinitely.",
+    )
+
+    apps: List[TestApp] = Field(
+        default=DEFAULT_APPS,
+        exclude=False,
+        description="List of apps to install in test environment",
+    )
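The mode field above resolves left to right across Changes, Selected, and All. A short sketch of the three shapes, assuming Selected and Changes accept the arguments they are given elsewhere in this patch (files in dumpCICDPlanAndQuit, target_branch in GitService.getChanges); the detection path is illustrative and would need to exist on disk:

# A sketch of the three test modes; the detection path is illustrative.
import pathlib

run_all = All()
run_selected = Selected(files=[pathlib.Path("detections/endpoint/example.yml")])
run_changed = Changes(target_branch="develop")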
+
+    def dumpCICDPlanAndQuit(self, githash: str, detections: List[Detection]):
         output_file = self.path / "test_plan.yml"
-        self.mode = Selected(files=sorted([detection.file_path for detection in detections], key=lambda path: str(path)))
+        self.mode = Selected(
+            files=sorted(
+                [detection.file_path for detection in detections],
+                key=lambda path: str(path),
+            )
+        )
         self.post_test_behavior = PostTestBehavior.never_pause
-        #required so that CI/CD does not get too much output or hang
+        # required so that CI/CD does not get too much output or hang
         self.disable_tqdm = True
 
-        # We will still parse the app, but no need to do enrichments or 
+        # We will still parse the app, but no need to do enrichments or
         # output to dist. We have already built it!
         self.build_app = False
         self.build_api = False
         self.enrichments = False
-
+
         self.enable_integration_testing = True
 
         data = self.model_dump()
-
-        #Add the hash of the current commit
-        data['githash'] = str(githash)
-
-        #Remove some fields that are not relevant
-        for k in ['container_settings', 'test_instances']:
+
+        # Add the hash of the current commit
+        data["githash"] = str(githash)
+
+        # Remove some fields that are not relevant
+        for k in ["container_settings", "test_instances"]:
             if k in data:
-                del(data[k])
-
-
+                del data[k]
 
         try:
             YmlWriter.writeYmlFile(str(output_file), data)
-            print(f"Successfully wrote a test plan for [{len(self.mode.files)} detections] using [{len(self.apps)} apps] to [{output_file}]")
+            print(
+                f"Successfully wrote a test plan for [{len(self.mode.files)} detections] using [{len(self.apps)} apps] to [{output_file}]"
+            )
         except Exception as e:
             raise Exception(f"Error writing test plan file [{output_file}]: {str(e)}")
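dumpCICDPlanAndQuit freezes the resolved selection into test_plan.yml so that a later, unattended CI stage can re-run exactly the same detections against the same commit. A sketch of a consumer, assuming the default pydantic serialization of the fields dumped above:

# A sketch of a CI consumer of test_plan.yml; the key names assume the
# default pydantic serialization of the config fields dumped above.
import yaml

with open("test_plan.yml") as f:
    plan = yaml.safe_load(f)

print(plan["githash"])             # commit the plan was generated from
print(len(plan["mode"]["files"]))  # number of detections selected for testing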
 
-    def getLocalAppDir(self)->pathlib.Path:
+    def getLocalAppDir(self) -> pathlib.Path:
         # docker really wants absolute paths
         path = self.path / "apps"
         return path.absolute()
-
-    def getContainerAppDir(self)->pathlib.Path:
+
+    def getContainerAppDir(self) -> pathlib.Path:
         # docker really wants absolute paths
         return pathlib.Path("/tmp/apps")
 
-    def enterpriseSecurityInApps(self)->bool:
-
+    def enterpriseSecurityInApps(self) -> bool:
         for app in self.apps:
             if app.uid == ENTERPRISE_SECURITY_UID:
                 return True
         return False
-
-    def commonInformationModelInApps(self)->bool:
+
+    def commonInformationModelInApps(self) -> bool:
         for app in self.apps:
             if app.uid == COMMON_INFORMATION_MODEL_UID:
                 return True
-        return False
+        return False
 
-    @model_validator(mode='after')
-    def ensureCommonInformationModel(self)->Self:
+    @model_validator(mode="after")
+    def ensureCommonInformationModel(self) -> Self:
         if self.commonInformationModelInApps():
             return self
-        print(f"INFO: Common Information Model/CIM "
-              f"(uid: [{COMMON_INFORMATION_MODEL_UID}]) is not listed in apps.\n"
-              f"contentctl test MUST include Common Information Model.\n"
-              f"Please note this message is only informational.")
+        print(
+            f"INFO: Common Information Model/CIM "
+            f"(uid: [{COMMON_INFORMATION_MODEL_UID}]) is not listed in apps.\n"
+            f"contentctl test MUST include Common Information Model.\n"
+            f"Please note this message is only informational."
+        )
         return self
 
-    @model_validator(mode='after')
-    def suppressTQDM(self)->Self:
+    @model_validator(mode="after")
+    def suppressTQDM(self) -> Self:
         if self.disable_tqdm:
             tqdm.tqdm.__init__ = partialmethod(tqdm.tqdm.__init__, disable=True)
             if self.post_test_behavior != PostTestBehavior.never_pause:
-                raise ValueError(f"You have disabled tqdm, presumably because you are "
-                                 f"running in CI/CD or another unattended context.\n"
-                                 f"However, post_test_behavior is set to [{self.post_test_behavior}].\n"
-                                 f"If that is the case, then you MUST set post_test_behavior "
-                                 f"to [{PostTestBehavior.never_pause}].\n"
-                                 "Otherwise, if a detection fails in CI/CD, your CI/CD runner will hang forever.")
+                raise ValueError(
+                    f"You have disabled tqdm, presumably because you are "
+                    f"running in CI/CD or another unattended context.\n"
+                    f"However, post_test_behavior is set to [{self.post_test_behavior}].\n"
+                    f"If that is the case, then you MUST set post_test_behavior "
+                    f"to [{PostTestBehavior.never_pause}].\n"
+                    "Otherwise, if a detection fails in CI/CD, your CI/CD runner will hang forever."
+                )
         return self
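The partialmethod assignment in suppressTQDM is a process-wide monkeypatch: every tqdm progress bar constructed afterwards is created with disable=True. A standalone sketch of the same trick:

# A standalone sketch of the monkeypatch used in suppressTQDM above: after
# the assignment, every tqdm bar is constructed disabled.
from functools import partialmethod

import tqdm

tqdm.tqdm.__init__ = partialmethod(tqdm.tqdm.__init__, disable=True)

for _ in tqdm.tqdm(range(3)):  # iterates normally but renders no bar
    pass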
 
-    @model_validator(mode='after')
-    def ensureEnterpriseSecurityForIntegrationTesting(self)->Self:
+    @model_validator(mode="after")
+    def ensureEnterpriseSecurityForIntegrationTesting(self) -> Self:
         if not self.enable_integration_testing:
             return self
         if self.enterpriseSecurityInApps():
             return self
-
-        print(f"INFO: enable_integration_testing is [{self.enable_integration_testing}], "
-              f"but the Splunk Enterprise Security "
-              f"App (uid: [{ENTERPRISE_SECURITY_UID}]) is not listed in apps.\n"
-              f"Integration Testing MUST include Enterprise Security.\n"
-              f"Please note this message is only informational.")
-        return self
-
+        print(
+            f"INFO: enable_integration_testing is [{self.enable_integration_testing}], "
+            f"but the Splunk Enterprise Security "
+            f"App (uid: [{ENTERPRISE_SECURITY_UID}]) is not listed in apps.\n"
+            f"Integration Testing MUST include Enterprise Security.\n"
+            f"Please note this message is only informational."
+        )
+        return self
 
-    @model_validator(mode='after')
-    def checkPlanOnlyUse(self)->Self:
-        #Ensure that mode is CHANGES
+    @model_validator(mode="after")
+    def checkPlanOnlyUse(self) -> Self:
+        # Ensure that mode is CHANGES
         if self.plan_only and not isinstance(self.mode, Changes):
-            raise ValueError("plan_only MUST be used with --mode:changes")
+            raise ValueError("plan_only MUST be used with --mode:changes")
         return self
 
 
 class test(test_common):
     model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True)
-    container_settings:ContainerSettings = ContainerSettings()
-    test_instances: List[Container] = Field([], exclude = True, validate_default=True)
-    splunk_api_username: Optional[str] = Field(default=None, exclude = True,description="Splunk API username used for running appinspect or installating apps from Splunkbase")
-    splunk_api_password: Optional[str] = Field(default=None, exclude = True, description="Splunk API password used for running appinspect or installaing apps from Splunkbase")
-
-
-    def getContainerInfrastructureObjects(self)->Self:
+    container_settings: ContainerSettings = ContainerSettings()
+    test_instances: List[Container] = Field([], exclude=True, validate_default=True)
+    splunk_api_username: Optional[str] = Field(
+        default=None,
+        exclude=True,
+        description="Splunk API username used for running appinspect or installing apps from Splunkbase",
+    )
+    splunk_api_password: Optional[str] = Field(
+        default=None,
+        exclude=True,
+        description="Splunk API password used for running appinspect or installing apps from Splunkbase",
+    )
+
+    def getContainerInfrastructureObjects(self) -> Self:
         try:
             self.test_instances = self.container_settings.getContainers()
             return self
-
+
         except Exception as e:
             raise ValueError(f"Error constructing container test_instances: {str(e)}")
-
-
-
-
-    @model_validator(mode='after')
-    def ensureAppsAreGood(self)->Self:
+
+    @model_validator(mode="after")
+    def ensureAppsAreGood(self) -> Self:
         """
-        This function ensures that, after the rest of the configuration
+        This function ensures that, after the rest of the configuration
         has been validated, all of the apps are able to be correctly resolved.
         This includes apps that may be sourced from local files, HTTP files,
-        and/or Splunkbase.
+        and/or Splunkbase.
 
         This is NOT a model_post_init function because it does perform some
         validation, even though it does not change the object
 
@@ -844,29 +1008,34 @@ def ensureAppsAreGood(self)->Self:
         Returns:
             Self: The test object. No modifications are made during this call.
-        """
+        """
         try:
-            _ = self.getContainerEnvironmentString(stage_file=False, include_custom_app=False)
+            _ = self.getContainerEnvironmentString(
+                stage_file=False, include_custom_app=False
+            )
         except Exception as e:
             raise Exception(f"Error validating test apps: {str(e)}")
         return self
 
-    def getContainerEnvironmentString(self,stage_file:bool=False, include_custom_app:bool=True)->str:
-        apps:List[App_Base] = self.apps
+    def getContainerEnvironmentString(
+        self, stage_file: bool = False, include_custom_app: bool = True
+    ) -> str:
+        apps: List[App_Base] = self.apps
         if include_custom_app:
             apps.append(self.app)
 
-        paths = [app.getApp(self,stage_file=stage_file) for app in apps]
+        paths = [app.getApp(self, stage_file=stage_file) for app in apps]
 
         container_paths = []
         for path in paths:
             if path.startswith(SPLUNKBASE_URL):
                 container_paths.append(path)
             else:
-                container_paths.append((self.getContainerAppDir()/pathlib.Path(path).name).as_posix())
-
-        return ','.join(container_paths)
+                container_paths.append(
+                    (self.getContainerAppDir() / pathlib.Path(path).name).as_posix()
+                )
+
+        return ",".join(container_paths)
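getContainerEnvironmentString joins every app location into one comma-separated string: Splunkbase URLs pass through untouched, while anything else is re-rooted under the container app directory. An illustrative sketch of the resulting shape; both entries are made-up values, not output of the real method:

# Illustrative only: the shape of the joined string, with made-up entries.
example = ",".join([
    "https://splunkbase.splunk.com/app/263/release/4.4.0/download",  # URL passed through
    "/tmp/apps/my_custom_app.tgz",  # local archive re-rooted under the container app dir
])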
 
     def getAppFilePath(self):
         return self.path / "apps.yml"
 
@@ -877,98 +1046,141 @@ def getAppFilePath(self):
 
 class test_servers(test_common):
     model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True)
-    test_instances:List[Infrastructure] = Field([],description="Test against one or more preconfigured servers.", validate_default=True)
-    server_info:Optional[str] = Field(None, validate_default=True, description='String of pre-configured servers to use for testing. The list MUST be in the format:\n'
-                                      'address,username,web_ui_port,hec_port,api_port;address_2,username_2,web_ui_port_2,hec_port_2,api_port_2'
-                                      '\nFor example, the following string will use 2 preconfigured test instances:\n'
-                                      '127.0.0.1,firstUser,firstUserPassword,8000,8088,8089;1.2.3.4,secondUser,secondUserPassword,8000,8088,8089\n'
-                                      'Note that these test_instances may be hosted on the same system, such as localhost/127.0.0.1 or a docker server, or different hosts.\n'
-                                      f'This value may also be passed by setting the environment variable [{TEST_ARGS_ENV}] with the value above.')
-
-    @model_validator(mode='before')
+    test_instances: List[Infrastructure] = Field(
+        [],
+        description="Test against one or more preconfigured servers.",
+        validate_default=True,
+    )
+    server_info: Optional[str] = Field(
+        None,
+        validate_default=True,
+        description="String of pre-configured servers to use for testing. The list MUST be in the format:\n"
+        "address,username,password,web_ui_port,hec_port,api_port;address_2,username_2,password_2,web_ui_port_2,hec_port_2,api_port_2"
+        "\nFor example, the following string will use 2 preconfigured test instances:\n"
+        "127.0.0.1,firstUser,firstUserPassword,8000,8088,8089;1.2.3.4,secondUser,secondUserPassword,8000,8088,8089\n"
+        "Note that these test_instances may be hosted on the same system, such as localhost/127.0.0.1 or a docker server, or different hosts.\n"
+        f"This value may also be passed by setting the environment variable [{TEST_ARGS_ENV}] with the value above.",
+    )
+
+    @model_validator(mode="before")
     @classmethod
-    def parse_config(cls, data:Any, info: ValidationInfo)->Any:
-        #Ignore whatever is in the file or defaults, these must be supplied on command line
-        #if len(v) != 0:
+    def parse_config(cls, data: Any, info: ValidationInfo) -> Any:
+        # Ignore whatever is in the file or defaults, these must be supplied on command line
+        # if len(v) != 0:
         #    return v
-
-        if isinstance(data.get("server_info"),str) :
+
+        if isinstance(data.get("server_info"), str):
             server_info = data.get("server_info")
-        elif isinstance(environ.get(TEST_ARGS_ENV),str):
+        elif isinstance(environ.get(TEST_ARGS_ENV), str):
             server_info = environ.get(TEST_ARGS_ENV)
         else:
-            raise ValueError(f"server_info not passed on command line or in environment variable {TEST_ARGS_ENV}")
+            raise ValueError(
+                f"server_info not passed on command line or in environment variable {TEST_ARGS_ENV}"
+            )
 
-        infrastructures:List[Infrastructure] = []
-
+        infrastructures: List[Infrastructure] = []
+
         index = 0
-        for server in server_info.split(';'):
-            address, username, password, web_ui_port, hec_port, api_port = server.split(",")
-            infrastructures.append(Infrastructure(splunk_app_username = username, splunk_app_password=password,
-                                                  instance_address=address, hec_port = int(hec_port),
-                                                  web_ui_port= int(web_ui_port),api_port=int(api_port), instance_name=f"test_server_{index}")
-                )
-            index+=1
-        data['test_instances'] = infrastructures
+        for server in server_info.split(";"):
+            address, username, password, web_ui_port, hec_port, api_port = server.split(
+                ","
+            )
+            infrastructures.append(
+                Infrastructure(
+                    splunk_app_username=username,
+                    splunk_app_password=password,
+                    instance_address=address,
+                    hec_port=int(hec_port),
+                    web_ui_port=int(web_ui_port),
+                    api_port=int(api_port),
+                    instance_name=f"test_server_{index}",
+                )
+            )
+            index += 1
+        data["test_instances"] = infrastructures
         return data
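Each entry in server_info is a comma-separated six-tuple (address, username, password, web UI port, HEC port, API port), and entries are separated by semicolons. A standalone sketch of the split logic used in parse_config above, with illustrative credentials:

# A standalone sketch of the server_info parsing above; the addresses and
# credentials are illustrative placeholders.
server_info = (
    "127.0.0.1,firstUser,firstUserPassword,8000,8088,8089;"
    "1.2.3.4,secondUser,secondUserPassword,8000,8088,8089"
)
for index, server in enumerate(server_info.split(";")):
    address, username, password, web_ui_port, hec_port, api_port = server.split(",")
    print(f"test_server_{index}: {address} (web={int(web_ui_port)}, hec={int(hec_port)}, api={int(api_port)})")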
 
-    @field_validator('test_instances',mode='before')
+    @field_validator("test_instances", mode="before")
     @classmethod
-    def check_environment_variable_for_config(cls, v:List[Infrastructure]):
+    def check_environment_variable_for_config(cls, v: List[Infrastructure]):
         return v
-        #Ignore whatever is in the file or defaults, these must be supplied on command line
-        #if len(v) != 0:
+        # Ignore whatever is in the file or defaults, these must be supplied on command line
+        # if len(v) != 0:
         #    return v
 
         TEST_ARGS_ENV = "CONTENTCTL_TEST_INFRASTRUCTURES"
-
-
-        #environment variable is present. try to parse it
-        infrastructures:List[Infrastructure] = []
-        server_info:str|None = environ.get(TEST_ARGS_ENV)
+
+        # environment variable is present. try to parse it
+        infrastructures: List[Infrastructure] = []
+        server_info: str | None = environ.get(TEST_ARGS_ENV)
         if server_info is None:
-            raise ValueError(f"test_instances not passed on command line or in environment variable {TEST_ARGS_ENV}")
-
-
+            raise ValueError(
+                f"test_instances not passed on command line or in environment variable {TEST_ARGS_ENV}"
+            )
+
         index = 0
-        for server in server_info.split(';'):
-            address, username, password, web_ui_port, hec_port, api_port = server.split(",")
-            infrastructures.append(Infrastructure(splunk_app_username = username, splunk_app_password=password,
-                                                  instance_address=address, hec_port = int(hec_port),
-                                                  web_ui_port= int(web_ui_port),api_port=int(api_port), instance_name=f"test_server_{index}")
-                )
-            index+=1
+        for server in server_info.split(";"):
+            address, username, password, web_ui_port, hec_port, api_port = server.split(
+                ","
+            )
+            infrastructures.append(
+                Infrastructure(
+                    splunk_app_username=username,
+                    splunk_app_password=password,
+                    instance_address=address,
+                    hec_port=int(hec_port),
+                    web_ui_port=int(web_ui_port),
+                    api_port=int(api_port),
+                    instance_name=f"test_server_{index}",
+                )
+            )
+            index += 1
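As the server_info description notes, the same list can be supplied through the CONTENTCTL_TEST_INFRASTRUCTURES environment variable instead of the command line. A sketch, with placeholder credentials:

# A sketch of supplying test servers via the environment variable read above;
# the address and credentials are placeholders.
import os

os.environ["CONTENTCTL_TEST_INFRASTRUCTURES"] = "127.0.0.1,admin,changeme,8000,8088,8089"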
 
 
 class release_notes(Config_Base):
-    old_tag:Optional[str] = Field(None, description="Name of the tag to diff against to find new content. "
-                                  "If it is not supplied, then it will be inferred as the "
-                                  "second newest tag at runtime.")
-    new_tag:Optional[str] = Field(None, description="Name of the tag containing new content. If it is not supplied,"
-                                  " then it will be inferred as the newest tag at runtime.")
-    latest_branch:Optional[str] = Field(None, description="Branch name for which we are generating release notes for")
-    compare_against:Optional[str] = Field(default="develop", description="Branch name for which we are comparing the files changes against")
-
-    def releaseNotesFilename(self, filename:str)->pathlib.Path:
-        #Assume that notes are written to dist/. This does not respect build_dir since that is
-        #only a member of build
-        p = self.path / "dist"
+    old_tag: Optional[str] = Field(
+        None,
+        description="Name of the tag to diff against to find new content. "
+        "If it is not supplied, then it will be inferred as the "
+        "second newest tag at runtime.",
+    )
+    new_tag: Optional[str] = Field(
+        None,
+        description="Name of the tag containing new content. If it is not supplied,"
+        " then it will be inferred as the newest tag at runtime.",
+    )
+    latest_branch: Optional[str] = Field(
+        None, description="Branch name for which we are generating release notes"
+    )
+    compare_against: Optional[str] = Field(
+        default="develop",
+        description="Branch name against which we are comparing the file changes",
+    )
+
+    def releaseNotesFilename(self, filename: str) -> pathlib.Path:
+        # Assume that notes are written to dist/. This does not respect build_dir since that is
+        # only a member of build
+        p = self.path / "dist"
         try:
-            p.mkdir(exist_ok=True,parents=True)
-        except Exception:
-            raise Exception(f"Error making the directory '{p}' to hold release_notes: {str(e)}")
-        return p/filename
+            p.mkdir(exist_ok=True, parents=True)
+        except Exception as e:
+            raise Exception(
+                f"Error making the directory '{p}' to hold release_notes: {str(e)}"
+            )
+        return p / filename
 
-    @model_validator(mode='after')
+    @model_validator(mode="after")
     def ensureNewTagOrLatestBranch(self):
-        '''
+        """
         Exactly one of latest_branch or new_tag must be defined. otherwise, throw an error
-        '''
+        """
         if self.new_tag is not None and self.latest_branch is not None:
-            raise ValueError("Both new_tag and latest_branch are defined. EXACTLY one of these MUST be defiend.")
+            raise ValueError(
+                "Both new_tag and latest_branch are defined. EXACTLY one of these MUST be defined."
+            )
         elif self.new_tag is None and self.latest_branch is None:
-            raise ValueError("Neither new_tag nor latest_branch are defined. EXACTLY one of these MUST be defined.")
+            raise ValueError(
+                "Neither new_tag nor latest_branch are defined. EXACTLY one of these MUST be defined."
+            )
         return self
 
 # @model_validator(mode='after')
@@ -978,10 +1190,9 @@ def ensureNewTagOrLatestBranch(self):
 #         from pygit2 import Commit
 #         repo = pygit2.Repository(path=str(self.path))
 #         tags = list(repo.references.iterator(references_return_type=pygit2.enums.ReferenceFilter.TAGS))
-
+
 #         #Sort all tags by commit time from newest to oldest
 #         sorted_tags = sorted(tags, key=lambda tag: repo.lookup_reference(tag.name).peel(Commit).commit_time, reverse=True)
-
 #         tags_names:List[str] = [t.shorthand for t in sorted_tags]
 #         print(tags_names)
@@ -996,9 +1207,7 @@ def ensureNewTagOrLatestBranch(self):
 #             pass
 #         else:
 #             raise ValueError(f"Unknown error getting new_tag {self.new_tag}")
-
-
-
+
 #         if self.old_tag is not None and self.old_tag not in tags_names:
 #             raise ValueError(f"The old_tag '{self.new_tag}' was not found in the set name tags for this repo: {tags_names}")
 #         elif self.new_tag == self.old_tag:
@@ -1012,15 +1221,18 @@ def ensureNewTagOrLatestBranch(self):
 #             pass
 #         else:
 #             raise ValueError(f"Unknown error getting old_tag {self.old_tag}")
-
-
-
+
 #         if not tags_names.index(self.new_tag) < tags_names.index(self.old_tag):
 #             raise ValueError(f"The new_tag '{self.new_tag}' is not newer than the old_tag '{self.old_tag}'")
-
+
 #         if self.latest_branch is not None:
 #             if repo.lookup_branch(self.latest_branch) is None:
 #                 raise ValueError("The latest_branch '{self.latest_branch}' was not found in the repository")
-
-
-    # return self
\ No newline at end of file
+
+    # return self
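ensureNewTagOrLatestBranch enforces an exactly-one-of rule between new_tag and latest_branch. A sketch of the behavior, assuming release_notes validates on construction as a pydantic model and that the inherited Config_Base fields have workable defaults; the tag and branch names are illustrative:

# A sketch of the exactly-one-of rule above; tag and branch names are
# illustrative, and inherited Config_Base defaults are assumed to apply.
release_notes(new_tag="v4.5.0", old_tag="v4.4.0")           # valid: exactly one of the pair is set
# release_notes(new_tag="v4.5.0", latest_branch="develop")  # ValueError: both set
# release_notes()                                           # ValueError: neither set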