diff --git a/.github/workflows/testEndToEnd.yml b/.github/workflows/testEndToEnd.yml
index b96ac2b1..fff5c5dc 100644
--- a/.github/workflows/testEndToEnd.yml
+++ b/.github/workflows/testEndToEnd.yml
@@ -51,7 +51,7 @@ jobs:
       - name: Run contentctl init
         run: |
          cd my_splunk_content_pack
-          poetry run contentctl init
+          poetry run contentctl init

       - name: Clone the AtomicRedTeam Repo
         run: |
@@ -73,7 +73,7 @@ jobs:
        if: startsWith(matrix.operating_system, 'ubuntu')
        run: |
          cd my_splunk_content_pack
-          poetry run contentctl test
+          poetry run contentctl test --disable-tqdm --post-test-behavior never_pause

      - uses: actions/upload-artifact@v4
        with:
diff --git a/contentctl/actions/detection_testing/DetectionTestingManager.py b/contentctl/actions/detection_testing/DetectionTestingManager.py
index 4dccc237..5ad5e117 100644
--- a/contentctl/actions/detection_testing/DetectionTestingManager.py
+++ b/contentctl/actions/detection_testing/DetectionTestingManager.py
@@ -154,9 +154,16 @@ def create_DetectionTestingInfrastructureObjects(self):
         except Exception as e:
             raise Exception(f"Failed to pull docker container image [{self.input_dto.config.container_settings.full_image_path}]: {str(e)}")

+        already_staged_container_files = False
         for infrastructure in self.input_dto.config.test_instances:
             if (isinstance(self.input_dto.config, test) and isinstance(infrastructure, Container)):
+                # Stage the files in the apps dir so that they can be passed directly to
+                # subsequent containers. Do this here, instead of inside each container, to
+                # avoid duplicate downloads/moves/copies
+                if not already_staged_container_files:
+                    self.input_dto.config.getContainerEnvironmentString(stage_file=True)
+                    already_staged_container_files = True

                 self.detectionTestingInfrastructureObjects.append(
                     DetectionTestingInfrastructureContainer(
@@ -164,8 +171,7 @@ def create_DetectionTestingInfrastructureObjects(self):
                     )
                 )

-            elif isinstance(infrastructure, Infrastructure):
-
+            elif (isinstance(self.input_dto.config, test_servers) and isinstance(infrastructure, Infrastructure)):
                 self.detectionTestingInfrastructureObjects.append(
                     DetectionTestingInfrastructureServer(
                         global_config=self.input_dto.config, infrastructure=infrastructure, sync_obj=self.output_dto
diff --git a/contentctl/actions/detection_testing/GitService.py b/contentctl/actions/detection_testing/GitService.py
index 6ed7c6a6..a0d7ff2c 100644
--- a/contentctl/actions/detection_testing/GitService.py
+++ b/contentctl/actions/detection_testing/GitService.py
@@ -14,7 +14,7 @@
 from contentctl.objects.lookup import Lookup
 from contentctl.objects.detection import Detection
 from contentctl.objects.security_content_object import SecurityContentObject
-from contentctl.objects.config import test, All, Changes, Selected
+from contentctl.objects.config import test_common, All, Changes, Selected

 # Logger
 logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
@@ -28,7 +28,7 @@ class GitService(BaseModel):
     director: DirectorOutputDto
-    config: test
+    config: test_common
     gitHash: Optional[str] = None

     def getHash(self)->str:
diff --git a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
index 9d7c40d0..1e892905 100644
--- a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
+++ b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
@@ -23,7 +23,7 @@
 from urllib3 import disable_warnings
 import urllib.parse
-from contentctl.objects.config import test, test_servers, test_common, Infrastructure
+from contentctl.objects.config import test_common, Infrastructure
 from contentctl.objects.enums import PostTestBehavior, AnalyticsType
 from contentctl.objects.detection import Detection
 from contentctl.objects.base_test import BaseTest
@@ -32,7 +32,6 @@
 from contentctl.objects.unit_test_attack_data import UnitTestAttackData
 from contentctl.objects.unit_test_result import UnitTestResult
 from contentctl.objects.integration_test_result import IntegrationTestResult
-#from contentctl.objects.test_config import TestConfig, Infrastructure
 from contentctl.objects.test_group import TestGroup
 from contentctl.objects.base_test_result import TestResultStatus
 from contentctl.objects.correlation_search import CorrelationSearch, PbarData
@@ -79,7 +78,7 @@ class DetectionTestingManagerOutputDto():

 class DetectionTestingInfrastructure(BaseModel, abc.ABC):
     # thread: threading.Thread = threading.Thread()
-    global_config: Union[test,test_servers]
+    global_config: test_common
     infrastructure: Infrastructure
     sync_obj: DetectionTestingManagerOutputDto
     hec_token: str = ""
@@ -396,7 +395,7 @@ def execute(self):
             try:
                 self.test_detection(detection)
             except ContainerStoppedException:
-                self.pbar.write(f"Stopped container [{self.get_name()}]")
+                self.pbar.write(f"Warning - container was stopped when trying to execute detection [{self.get_name()}]")
                 self.finish()
                 return
             except Exception as e:
@@ -1357,7 +1356,7 @@ def status(self):
         pass

     def finish(self):
-        self.pbar.bar_format = f"Stopped container [{self.get_name()}]"
+        self.pbar.bar_format = f"Finished running tests on instance: [{self.get_name()}]"
         self.pbar.update()
         self.pbar.close()
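Throughout this diff, fields and parameters typed as `Union[test, test_servers]` collapse to the shared base class `test_common`. A minimal sketch of why the base-class annotation is the sturdier choice for Pydantic models (class names mirror the diff; the field sets here are invented for illustration):

```python
from pydantic import BaseModel
from typing import Optional

class test_common(BaseModel):        # shared testing options
    post_test_behavior: str = "pause_on_failure"

class test(test_common):             # containerized testing
    num_containers: int = 1

class test_servers(test_common):     # preconfigured servers
    server_info: Optional[str] = None

class GitService(BaseModel):
    # Annotating with the base type accepts any subclass, so adding a new
    # test mode does not require widening a Union at every use site.
    config: test_common

GitService(config=test())            # accepted
GitService(config=test_servers())    # accepted
```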
diff --git a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructureContainer.py b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructureContainer.py
index e16303b6..7371cbfd 100644
--- a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructureContainer.py
+++ b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructureContainer.py
@@ -6,10 +6,6 @@
 import docker.models.containers
 import docker
 import docker.types
-from contentctl.objects.test_config import (
-    CONTAINER_APP_DIR,
-    LOCAL_APP_DIR,
-)

 class DetectionTestingInfrastructureContainer(DetectionTestingInfrastructure):
@@ -78,8 +74,8 @@ def make_container(self) -> docker.models.resource.Model:

         mounts = [
             docker.types.Mount(
-                source=str(LOCAL_APP_DIR.absolute()),
-                target=str(CONTAINER_APP_DIR.absolute()),
+                source=str(self.global_config.getLocalAppDir()),
+                target=str(self.global_config.getContainerAppDir()),
                 type="bind",
                 read_only=True,
             )
@@ -88,7 +84,9 @@
         environment = {}
         environment["SPLUNK_START_ARGS"] = "--accept-license"
         environment["SPLUNK_PASSWORD"] = self.infrastructure.splunk_app_password
-        environment["SPLUNK_APPS_URL"] = self.global_config.getContainerEnvironmentString(stage_file=True)
+        # Files have already been staged by the time that we call this. Files must only be staged
+        # once, not staged by every container
+        environment["SPLUNK_APPS_URL"] = self.global_config.getContainerEnvironmentString(stage_file=False)
         if (
             self.global_config.splunk_api_username is not None
             and self.global_config.splunk_api_password is not None
@@ -119,6 +117,18 @@ def emit_docker_run_equivalent():
             detach=True,
             platform="linux/amd64"
         )
+
+        if self.global_config.enterpriseSecurityInApps():
+            #ES sets up https, so make sure it is included in the link
+            address = f"https://{self.infrastructure.instance_address}:{self.infrastructure.web_ui_port}"
+        else:
+            address = f"http://{self.infrastructure.instance_address}:{self.infrastructure.web_ui_port}"
+        print(f"\nStarted container with the following information:\n"
+              f"\tname    : [{self.get_name()}]\n"
+              f"\taddress : [{address}]\n"
+              f"\tusername: [{self.infrastructure.splunk_app_username}]\n"
+              f"\tpassword: [{self.infrastructure.splunk_app_password}]\n"
+        )
         return container
@@ -140,6 +150,8 @@ def removeContainer(self, removeVolumes: bool = True, forceRemove: bool = True):
             # container was found, so now we try to remove it
             # v also removes volumes linked to the container
             container.remove(v=removeVolumes, force=forceRemove)
+            print(f"container [{self.get_name()}] successfully removed")
+
             # remove it even if it is running. remove volumes as well
             # No need to print that the container has been removed, it is expected behavior
diff --git a/contentctl/actions/detection_testing/views/DetectionTestingView.py b/contentctl/actions/detection_testing/views/DetectionTestingView.py
index 421338fe..9b1d8e0d 100644
--- a/contentctl/actions/detection_testing/views/DetectionTestingView.py
+++ b/contentctl/actions/detection_testing/views/DetectionTestingView.py
@@ -3,8 +3,7 @@

 from pydantic import BaseModel

-from contentctl.objects.test_config import TestConfig
-from contentctl.objects.config import test
+from contentctl.objects.config import test_common

 from contentctl.actions.detection_testing.infrastructures.DetectionTestingInfrastructure import (
     DetectionTestingManagerOutputDto,
@@ -14,7 +13,7 @@

 class DetectionTestingView(BaseModel, abc.ABC):
-    config: test
+    config: test_common
     sync_obj: DetectionTestingManagerOutputDto

     interval: float = 10
diff --git a/contentctl/actions/detection_testing/views/DetectionTestingViewFile.py b/contentctl/actions/detection_testing/views/DetectionTestingViewFile.py
index 7cdf5acf..4b31bca7 100644
--- a/contentctl/actions/detection_testing/views/DetectionTestingViewFile.py
+++ b/contentctl/actions/detection_testing/views/DetectionTestingViewFile.py
@@ -1,11 +1,3 @@
-from pydantic import BaseModel
-import abc
-from typing import Callable
-from contentctl.objects.test_config import TestConfig
-from contentctl.actions.detection_testing.infrastructures.DetectionTestingInfrastructure import (
-    DetectionTestingManagerOutputDto,
-)
-
 from contentctl.actions.detection_testing.views.DetectionTestingView import (
     DetectionTestingView,
 )
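The one-line `initialize.py` fix below is easy to miss: the old call passed `targetDir` (a bare name, resolved against whatever directory contentctl was launched from) instead of the computed `target_directory`. A hedged sketch of the difference, with made-up paths:

```python
import pathlib
import shutil

source_directory = pathlib.Path("templates/app_template")  # hypothetical template dir
targetDir = "my_splunk_content_pack"                       # bare string: relative to the CWD
config_path = pathlib.Path("/home/user/content")           # hypothetical config.path

# Old behavior: lands in ./my_splunk_content_pack, ignoring config.path entirely
# shutil.copytree(source_directory, targetDir, dirs_exist_ok=False)

# Fixed behavior: resolve the destination against the configured path first
target_directory = config_path / targetDir
shutil.copytree(source_directory, target_directory, dirs_exist_ok=False)
```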
diff --git a/contentctl/actions/initialize.py b/contentctl/actions/initialize.py
index 8eccb559..679574b8 100644
--- a/contentctl/actions/initialize.py
+++ b/contentctl/actions/initialize.py
@@ -34,7 +34,7 @@ def execute(self, config: test) -> None:
         source_directory = pathlib.Path(os.path.dirname(__file__))/templateDir
         target_directory = config.path/targetDir
         #Throw an exception if the target exists
-        shutil.copytree(source_directory, targetDir, dirs_exist_ok=False)
+        shutil.copytree(source_directory, target_directory, dirs_exist_ok=False)

         #Create the config file as well
         shutil.copyfile(pathlib.Path(os.path.dirname(__file__))/'../templates/README','README')
diff --git a/contentctl/actions/release_notes.py b/contentctl/actions/release_notes.py
index 48ca3e6c..859fcf87 100644
--- a/contentctl/actions/release_notes.py
+++ b/contentctl/actions/release_notes.py
@@ -1,120 +1,131 @@
 import os
-
-from dataclasses import dataclass
-
-from contentctl.input.director import DirectorInputDto, Director, DirectorOutputDto
-from contentctl.output.svg_output import SvgOutput
-from contentctl.output.attack_nav_output import AttackNavOutput
+from contentctl.objects.config import release_notes
 from git import Repo
 import re
 import yaml
-from typing import Union
+import pathlib
+from typing import List, Union

-@dataclass(frozen=True)
-class ReleaseNotesInputDto:
-    director_input_dto: DirectorInputDto

 class ReleaseNotes:
-    def create_notes(self,repo_path, file_paths):
-        for file_path in file_paths:
-            # Check if the file exists
-            if os.path.exists(file_path) and os.path.isfile(file_path):
-                # Check if the file is a YAML file
-                if file_path.endswith('.yaml') or file_path.endswith('.yml'):
-                    # Read and parse the YAML file
-                    with open(file_path, 'r') as file:
-                        try:
-                            data = yaml.safe_load(file)
-                            # Check and create story link
-                            if 'name' in data and 'stories/' in file_path:
-                                story_link = "https://research.splunk.com/stories/" + data['name']
-                                story_link=story_link.replace(" ","_")
-                                story_link = story_link.lower()
-                                print("- "+"["+f"{data['name']}"+"]"+"("+story_link+")")
-
-                            if 'name' in data and'playbooks/' in file_path:
-                                playbook_link = "https://research.splunk.com" + file_path.replace(repo_path,"")
-                                playbook_link=playbook_link.replace(".yml","/").lower()
-                                print("- "+"["+f"{data['name']}"+"]"+"("+playbook_link+")")
-
-                            if 'name' in data and'macros/' in file_path:
-                                print("- " + f"{data['name']}")
-
-                            if 'name' in data and'lookups/' in file_path:
-                                print("- " + f"{data['name']}")
-
-                            # Create only SSA link when its production
-                            if 'name' in data and 'id' in data and 'ssa_detections/' in file_path:
-                                if data['status'] == "production":
-                                    temp_link = "https://research.splunk.com" + file_path.replace(repo_path,"")
-                                    pattern = r'(?<=/)[^/]*$'
-                                    detection_link = re.sub(pattern, data['id'], temp_link)
-                                    detection_link = detection_link.replace("detections","" )
-                                    detection_link = detection_link.replace("ssa_/","" )
-                                    print("- "+"["+f"{data['name']}"+"]"+"("+detection_link+")")
-
-                                if data['status'] == "validation":
-                                    print("- "+f"{data['name']}"+" (Validation Mode)")
-
-                            # Check and create detection link
-                            if 'name' in data and 'id' in data and 'detections/' in file_path and not 'ssa_detections/' in file_path:
-                                temp_link = "https://research.splunk.com" + file_path.replace(repo_path,"")
+    def create_notes(self,repo_path:pathlib.Path, file_paths:List[pathlib.Path], header:str)->dict[str,Union[List[str], str]]:
+        updates:List[str] = []
+        warnings:List[str] = []
+        for file_path in file_paths:
+            # Check if the file exists
+            if file_path.exists() and file_path.is_file():
+                # Check if the file is a YAML file
+                if file_path.suffix in ['.yaml', '.yml']:
+                    # Read and parse the YAML file
+                    with open(file_path, 'r') as file:
+                        try:
+                            data = yaml.safe_load(file)
+                            # Check and create story link
+                            if 'name' in data and 'stories' in file_path.parts:
+                                story_link = "https://research.splunk.com/stories/" + data['name']
+                                story_link=story_link.replace(" ","_")
+                                story_link = story_link.lower()
+                                updates.append("- "+"["+f"{data['name']}"+"]"+"("+story_link+")")
+
+                            if 'name' in data and 'playbooks' in file_path.parts:
+                                playbook_link = "https://research.splunk.com/" + str(file_path).replace(str(repo_path),"")
+                                playbook_link=playbook_link.replace(".yml","/").lower()
+                                updates.append("- "+"["+f"{data['name']}"+"]"+"("+playbook_link+")")
+
+                            if 'name' in data and 'macros' in file_path.parts:
+                                updates.append("- " + f"{data['name']}")
+
+                            if 'name' in data and 'lookups' in file_path.parts:
+                                updates.append("- " + f"{data['name']}")
+
+                            # Create only SSA link when it's production
+                            if 'name' in data and 'id' in data and 'ssa_detections' in file_path.parts:
+                                if data['status'] == "production":
+                                    temp_link = "https://research.splunk.com/" + str(file_path).replace(str(repo_path),"")
+                                    pattern = r'(?<=/)[^/]*$'
+                                    detection_link = re.sub(pattern, data['id'], temp_link)
+                                    detection_link = detection_link.replace("detections","" )
+                                    detection_link = detection_link.replace("ssa_/","" )
+                                    updates.append("- "+"["+f"{data['name']}"+"]"+"("+detection_link+")")
+
+                                if data['status'] == "validation":
+                                    updates.append("- "+f"{data['name']}"+" (Validation Mode)")
+
+
+                            # Check and create detection link
+                            if 'name' in data and 'id' in data and 'detections' in file_path.parts and not 'ssa_detections' in file_path.parts and 'detections/deprecated' not in file_path.parts:
+
+                                if data['status'] == "production":
+                                    temp_link = "https://research.splunk.com" + str(file_path).replace(str(repo_path),"")
                                     pattern = r'(?<=/)[^/]*$'
                                     detection_link = re.sub(pattern, data['id'], temp_link)
                                     detection_link = detection_link.replace("detections","" )
                                     detection_link = detection_link.replace(".com//",".com/" )
-                                print("- "+"["+f"{data['name']}"+"]"+"("+detection_link+")")
-
-                        except yaml.YAMLError as exc:
-                            print(f"Error parsing YAML file {file_path}: {exc}")
-            else:
-                print(f"File not found or is not a file: {file_path}")
+                                    updates.append("- "+"["+f"{data['name']}"+"]"+"("+detection_link+")")
+
+                                if data['status'] == "deprecated":
+                                    temp_link = "https://research.splunk.com" + str(file_path).replace(str(repo_path),"")
+                                    pattern = r'(?<=/)[^/]*$'
+                                    detection_link = re.sub(pattern, data['id'], temp_link)
+                                    detection_link = detection_link.replace("detections","" )
+                                    detection_link = detection_link.replace(".com//",".com/" )
+                                    updates.append("- "+"["+f"{data['name']}"+"]"+"("+detection_link+")")
+
+                        except yaml.YAMLError as exc:
+                            raise Exception(f"Error parsing YAML file for release_notes {file_path}: {str(exc)}")
+            else:
+                warnings.append(f"Error parsing YAML file for release_notes. File not found or is not a file: {file_path}")
+        #print out all updates at once
+        success_header = f'### {header} - [{len(updates)}]'
+        warning_header = f'### {header} - [{len(warnings)}]'
+        return {'header': success_header, 'changes': sorted(updates),
+                'warning_header': warning_header, 'warnings': warnings}
+

-    def release_notes(self, input_dto: DirectorInputDto, old_tag:Union[str,None], new_tag:str, latest_branch:str) -> None:
+    def release_notes(self, config:release_notes) -> None:
         ### Remove hard coded path
         directories = ['detections/','stories/','macros/','lookups/','playbooks/','ssa_detections/']
-        repo_path = os.path.abspath(input_dto.director_input_dto.input_path)
-        repo = Repo(repo_path)
+
+        repo = Repo(config.path)
         # Ensure the new tag is in the tags if tags are supplied
-        if new_tag:
-            if new_tag not in repo.tags:
-                raise ValueError(f"new_tag {new_tag} does not exist in the repository. Make sure your branch nameis ")
-            if old_tag is None:
+        if config.new_tag:
+            if config.new_tag not in repo.tags:
+                raise Exception(f"new_tag {config.new_tag} does not exist in the repository. Make sure your tag name is correct.")
+            if config.old_tag is None:
                 #Old tag was not supplied, so find the index of the new tag, then get the tag before it
                 tags_sorted = sorted(repo.tags, key=lambda t: t.commit.committed_datetime, reverse=True)
                 tags_names_sorted = [tag.name for tag in tags_sorted]
-                new_tag_index = tags_names_sorted.index(new_tag)
+                new_tag_index = tags_names_sorted.index(config.new_tag)
                 try:
-                    old_tag = tags_names_sorted[new_tag_index+1]
+                    config.old_tag = tags_names_sorted[new_tag_index+1]
                 except Exception:
-                    raise ValueError(f"old_tag cannot be inferred. {new_tag} is the oldest tag in the repo!")
-        latest_tag = new_tag
-        previous_tag = old_tag
+                    raise Exception(f"old_tag cannot be inferred. {config.new_tag} is the oldest tag in the repo!")
+            latest_tag = config.new_tag
+            previous_tag = config.old_tag
             commit1 = repo.commit(latest_tag)
             commit2 = repo.commit(previous_tag)
             diff_index = commit2.diff(commit1)

         # Ensure the branch is in the repo
-        if latest_branch:
+        if config.latest_branch:
             #If a branch name is supplied, compare against develop
-            if latest_branch not in repo.branches:
-                raise ValueError(f"latest branch {latest_branch} does not exist in the repository. Make sure your branch name is correct")
+            if config.latest_branch not in repo.branches:
+                raise ValueError(f"latest branch {config.latest_branch} does not exist in the repository. Make sure your branch name is correct")
             compare_against = "develop"
-            commit1 = repo.commit(latest_branch)
+            commit1 = repo.commit(config.latest_branch)
             commit2 = repo.commit(compare_against)
             diff_index = commit2.diff(commit1)

-        modified_files = []
-        added_files = []
+        modified_files:List[pathlib.Path] = []
+        added_files:List[pathlib.Path] = []
         for diff in diff_index:
-            file_path = diff.a_path
+            file_path = pathlib.Path(diff.a_path)
             # Check if the file is in the specified directories
-            if any(file_path.startswith(directory) for directory in directories):
+            if any(str(file_path).startswith(directory) for directory in directories):
                 # Check if a file is Modified
                 if diff.change_type == 'M':
                     modified_files.append(file_path)
@@ -124,91 +135,104 @@ def release_notes(self, input_dto: DirectorInputDto, old_tag:Union[str,None], ne
                 elif diff.change_type == 'A':
                     added_files.append(file_path)
         # print(added_files)
-        detections_added = []
-        ba_detections_added = []
-        stories_added = []
-        macros_added = []
-        lookups_added = []
-        playbooks_added = []
-        detections_modified = []
-        ba_detections_modified = []
-        stories_modified = []
-        macros_modified = []
-        lookups_modified = []
-        playbooks_modified = []
+        detections_added:List[pathlib.Path] = []
+        ba_detections_added:List[pathlib.Path] = []
+        stories_added:List[pathlib.Path] = []
+        macros_added:List[pathlib.Path] = []
+        lookups_added:List[pathlib.Path] = []
+        playbooks_added:List[pathlib.Path] = []
+        detections_modified:List[pathlib.Path] = []
+        ba_detections_modified:List[pathlib.Path] = []
+        stories_modified:List[pathlib.Path] = []
+        macros_modified:List[pathlib.Path] = []
+        lookups_modified:List[pathlib.Path] = []
+        playbooks_modified:List[pathlib.Path] = []
+        detections_deprecated:List[pathlib.Path] = []

         for file in modified_files:
-            file=repo_path +"/"+file
-            if 'detections/' in file and 'ssa_detections/' not in file:
+            file= config.path / file
+            if 'detections' in file.parts and 'ssa_detections' not in file.parts and 'deprecated' not in file.parts:
                 detections_modified.append(file)
-            if 'stories/' in file:
+            if 'detections' in file.parts and 'ssa_detections' not in file.parts and 'deprecated' in file.parts:
+                detections_deprecated.append(file)
+            if 'stories' in file.parts:
                 stories_modified.append(file)
-            if 'macros/' in file:
+            if 'macros' in file.parts:
                 macros_modified.append(file)
-            if 'lookups/' in file:
+            if 'lookups' in file.parts:
                 lookups_modified.append(file)
-            if 'playbooks/' in file:
+            if 'playbooks' in file.parts:
                 playbooks_modified.append(file)
-            if 'ssa_detections/' in file:
+            if 'ssa_detections' in file.parts:
                 ba_detections_modified.append(file)

         for file in added_files:
-            file=repo_path +"/"+file
-            if 'detections/' in file and 'ssa_detections/' not in file:
+            file=config.path / file
+            if 'detections' in file.parts and 'ssa_detections' not in file.parts:
                 detections_added.append(file)
-            if 'stories/' in file:
+            if 'stories' in file.parts:
                 stories_added.append(file)
-            if 'macros/' in file:
+            if 'macros' in file.parts:
                 macros_added.append(file)
-            if 'lookups/' in file:
+            if 'lookups' in file.parts:
                 lookups_added.append(file)
-            if 'playbooks/' in file:
+            if 'playbooks' in file.parts:
                 playbooks_added.append(file)
-            if 'ssa_detections/' in file:
+            if 'ssa_detections' in file.parts:
                 ba_detections_added.append(file)

-        if new_tag:
+        if config.new_tag:
             print(f"Generating release notes - \033[92m{latest_tag}\033[0m")
             print(f"Compared against - \033[92m{previous_tag}\033[0m")
             print("\n## Release notes for ESCU " + latest_tag)

-        if latest_branch:
-            print(f"Generating release notes - \033[92m{latest_branch}\033[0m")
+        if config.latest_branch:
+            print(f"Generating release notes - \033[92m{config.latest_branch}\033[0m")
             print(f"Compared against - \033[92m{compare_against}\033[0m")
-            print("\n## Release notes for ESCU " + latest_branch)
-
-        print("\n### New Analytics Story")
-        self.create_notes(repo_path, stories_added)
-        print("\n### Updated Analytics Story")
-        self.create_notes(repo_path,stories_modified)
-        print("\n### New Analytics")
-        self.create_notes(repo_path,detections_added)
-        print("\n### Updated Analytics")
-        self.create_notes(repo_path,detections_modified)
-        print("\n### Macros Added")
-        self.create_notes(repo_path,macros_added)
-        print("\n### Macros Updated")
-        self.create_notes(repo_path,macros_modified)
-        print("\n### Lookups Added")
-        self.create_notes(repo_path,lookups_added)
-        print("\n### Lookups Updated")
-        self.create_notes(repo_path,lookups_modified)
-        print("\n### Playbooks Added")
-        self.create_notes(repo_path,playbooks_added)
-        print("\n### Playbooks Updated")
-        self.create_notes(repo_path,playbooks_modified)
-
-        print("\n### Other Updates\n-\n")
-
-        print("\n## BA Release Notes")
-
-        print("\n### New BA Analytics")
-        self.create_notes(repo_path,ba_detections_added)
-
-        print("\n### Updated BA Analytics")
-        self.create_notes(repo_path,ba_detections_modified)
-
-
+            print("\n## Release notes for ESCU " + config.latest_branch)
+
+        notes = [self.create_notes(config.path, stories_added, header="New Analytic Story"),
+                 self.create_notes(config.path,stories_modified, header="Updated Analytic Story"),
+                 self.create_notes(config.path,detections_added, header="New Analytics"),
+                 self.create_notes(config.path,detections_modified, header="Updated Analytics"),
+                 self.create_notes(config.path,macros_added, header="Macros Added"),
+                 self.create_notes(config.path,macros_modified, header="Macros Updated"),
+                 self.create_notes(config.path,lookups_added, header="Lookups Added"),
+                 self.create_notes(config.path,lookups_modified, header="Lookups Updated"),
+                 self.create_notes(config.path,playbooks_added, header="Playbooks Added"),
+                 self.create_notes(config.path,playbooks_modified, header="Playbooks Updated"),
+                 self.create_notes(config.path,detections_deprecated, header="Deprecated Analytics")]
+
+        #generate and show ba_notes in a different section
+        ba_notes = [self.create_notes(config.path,ba_detections_added, header="New BA Analytics"),
+                    self.create_notes(config.path,ba_detections_modified, header="Updated BA Analytics") ]
+
+        def printNotes(notes:List[dict[str,Union[List[str], str]]], outfile:Union[pathlib.Path,None]=None):
+            num_changes = sum([len(note['changes']) for note in notes])
+            num_warnings = sum([len(note['warnings']) for note in notes])
+            lines:List[str] = []
+            lines.append(f"Total New and Updated Content: [{num_changes}]")
+            for note in notes:
+                lines.append("")
+                lines.append(note['header'])
+                lines+=(note['changes'])
+
+            lines.append(f"\n\nTotal Warnings: [{num_warnings}]")
+            for note in notes:
+                if len(note['warnings']) > 0:
+                    lines.append(note['warning_header'])
+                    lines+=note['warnings']
+            text_blob = '\n'.join(lines)
+            print(text_blob)
+            if outfile is not None:
+                with open(outfile,'w') as writer:
+                    writer.write(text_blob)
+
+        printNotes(notes, config.releaseNotesFilename("release_notes.txt"))
+
+        print("\n\n### Other Updates\n-\n")
+        print("\n## BA Release Notes")
+        printNotes(ba_notes, config.releaseNotesFilename("ba_release_notes.txt"))

         print(f"Release notes completed successfully")
\ No newline at end of file
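The rewrite above also swaps substring tests on path strings (`'stories/' in file_path`) for component tests on `pathlib.Path.parts`. A small illustration of why (the path is hypothetical):

```python
from pathlib import Path

p = Path("repo/ssa_detections/endpoint/example.yml")

# Substring matching is fragile: 'detections/' also occurs inside
# 'ssa_detections/', which is why the old code needed extra negations.
print("detections/" in str(p))      # True, despite this being an SSA detection

# Path.parts compares whole directory components, so each category is exact.
print("detections" in p.parts)      # False
print("ssa_detections" in p.parts)  # True
```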
diff --git a/contentctl/actions/reporting.py b/contentctl/actions/reporting.py
index 8e28248d..a7997713 100644
--- a/contentctl/actions/reporting.py
+++ b/contentctl/actions/reporting.py
@@ -2,32 +2,43 @@

 from dataclasses import dataclass

-from contentctl.input.director import DirectorInputDto, Director, DirectorOutputDto
+from contentctl.input.director import DirectorOutputDto
 from contentctl.output.svg_output import SvgOutput
 from contentctl.output.attack_nav_output import AttackNavOutput
-
+from contentctl.objects.config import report

 @dataclass(frozen=True)
 class ReportingInputDto:
-    director_input_dto: DirectorInputDto
+    director_output_dto: DirectorOutputDto
+    config: report


 class Reporting:

     def execute(self, input_dto: ReportingInputDto) -> None:
-        director_output_dto = DirectorOutputDto([],[],[],[],[],[],[],[],[],[])
-        director = Director(director_output_dto)
-        director.execute(input_dto.director_input_dto)
+
+        #Ensure the reporting path exists
+        try:
+            input_dto.config.getReportingPath().mkdir(exist_ok=True,parents=True)
+        except Exception as e:
+            if input_dto.config.getReportingPath().is_file():
+                raise Exception(f"Error writing reporting: '{input_dto.config.getReportingPath()}' is a file, not a directory.")
+            else:
+                raise Exception(f"Error writing reporting : '{input_dto.config.getReportingPath()}': {str(e)}")
+
+        print("Creating GitHub Badges...")
+        #Generate GitHub Badges
         svg_output = SvgOutput()
         svg_output.writeObjects(
-            director_output_dto.detections,
-            os.path.join(input_dto.director_input_dto.input_path, "reporting")
-        )
+            input_dto.director_output_dto.detections,
+            input_dto.config.getReportingPath())

-        attack_nav_output = AttackNavOutput()
+        #Generate coverage json
+        print("Generating coverage.json...")
+        attack_nav_output = AttackNavOutput()
         attack_nav_output.writeObjects(
-            director_output_dto.detections,
-            os.path.join(input_dto.director_input_dto.input_path, "reporting")
+            input_dto.director_output_dto.detections,
+            input_dto.config.getReportingPath()
         )

-        print('Reporting of security content successful.')
\ No newline at end of file
+        print(f"Reporting successfully written to '{input_dto.config.getReportingPath()}'")
\ No newline at end of file
diff --git a/contentctl/actions/test.py b/contentctl/actions/test.py
index 7933992d..b0ee5faf 100644
--- a/contentctl/actions/test.py
+++ b/contentctl/actions/test.py
@@ -1,7 +1,7 @@
 from dataclasses import dataclass
 from typing import List

-from contentctl.objects.config import test
+from contentctl.objects.config import test_common
 from contentctl.objects.enums import DetectionTestingMode, DetectionStatus, AnalyticsType
 from contentctl.objects.detection import Detection
@@ -40,16 +40,13 @@
 @dataclass(frozen=True)
 class TestInputDto:
     detections: List[Detection]
-    config: test
+    config: test_common


-class TestOutputDto:
-    results: list
-
-
 class Test:

-    def filter_detections(self, input_dto: TestInputDto):
+    def filter_detections(self, input_dto: TestInputDto)->TestInputDto:
+
         if not input_dto.config.enable_integration_testing:
             #Skip all integration tests if integration testing is not enabled:
             for detection in input_dto.detections:
@@ -57,15 +54,21 @@
                 if isinstance(test, IntegrationTest):
                     test.skip("TEST SKIPPED: Skipping all integration tests")

+        list_after_filtering:List[Detection] = []
         #extra filtering which may be removed/modified in the future
         for detection in input_dto.detections:
             if (detection.status != DetectionStatus.production.value):
                 #print(f"{detection.name} - Not testing because [STATUS: {detection.status}]")
-                input_dto.detections.remove(detection)
+                pass
             elif detection.type == AnalyticsType.Correlation:
                 #print(f"{detection.name} - Not testing because [ TYPE: {detection.type}]")
-                input_dto.detections.remove(detection)
-
+                pass
+            else:
+                list_after_filtering.append(detection)
+
+        return TestInputDto(list_after_filtering, input_dto.config)
+
+
     def execute(self, input_dto: TestInputDto) -> bool:
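The `filter_detections` change above fixes a classic pitfall: the old code called `list.remove()` on the very list it was iterating, which silently skips the element that slides into the removed slot. The new code builds a fresh list and returns a new `TestInputDto` instead. A standalone demonstration:

```python
# Removing from a list while iterating it advances past the element that
# shifts into the freed slot, so some entries are never examined.
detections = ["nonprod_a", "nonprod_b", "prod_c"]
for d in detections:
    if d.startswith("nonprod"):
        detections.remove(d)
print(detections)  # ['nonprod_b', 'prod_c'] - 'nonprod_b' was never visited

# Filtering into a new list, as filter_detections now does, visits everything.
kept = [d for d in ["nonprod_a", "nonprod_b", "prod_c"] if not d.startswith("nonprod")]
print(kept)        # ['prod_c']
```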
diff --git a/contentctl/contentctl.py b/contentctl/contentctl.py
index 90bb03c3..e5c3718b 100644
--- a/contentctl/contentctl.py
+++ b/contentctl/contentctl.py
@@ -1,6 +1,6 @@
 from contentctl.actions.initialize import Initialize
 import tyro
-from contentctl.objects.config import init, validate, build, new, deploy_acs, deploy_rest, test, test_servers, inspect
+from contentctl.objects.config import init, validate, build, new, deploy_acs, deploy_rest, test, test_servers, inspect, report, test_common, release_notes
 from contentctl.actions.validate import Validate
 from contentctl.actions.new_content import NewContent
 from contentctl.actions.detection_testing.GitService import GitService
@@ -12,12 +12,13 @@
 from contentctl.actions.test import Test
 from contentctl.actions.test import TestInputDto
-
+from contentctl.actions.reporting import ReportingInputDto, Reporting
 from contentctl.actions.inspect import Inspect
 import sys
 import warnings
 import pathlib
 from contentctl.input.yml_reader import YmlReader
+from contentctl.actions.release_notes import ReleaseNotes

 # def print_ascii_art():
 #     print(
@@ -60,6 +61,16 @@ def validate_func(config:validate)->DirectorOutputDto:
     validate = Validate()
     return validate.execute(config)

+def report_func(config:report)->None:
+    # First, perform validation. Remember that the validate
+    # configuration is actually a subset of the build configuration
+    director_output_dto = validate_func(config)
+
+    r = Reporting()
+    return r.execute(ReportingInputDto(director_output_dto=director_output_dto,
+                                       config=config))
+
+
 def build_func(config:build)->DirectorOutputDto:
     # First, perform validation. Remember that the validate
     # configuration is actually a subset of the build configuration
@@ -74,6 +85,9 @@ def inspect_func(config:inspect)->str:

     return inspect_token

+def release_notes_func(config:release_notes)->None:
+    ReleaseNotes().release_notes(config)
+
 def new_func(config:new):
     NewContent().execute(config)

@@ -87,7 +101,7 @@ def deploy_rest_func(config:deploy_rest):
     raise Exception("deploy rest not yet implemented")

-def test_func(config:test):
+def test_common_func(config:test_common):
     director_output_dto = build_func(config)
     gitServer = GitService(director=director_output_dto,config=config)
     detections_to_test = gitServer.getContent()
@@ -100,14 +114,14 @@

     # Remove detections that we do not want to test because they are
     # not production, the correct type, or manual_test only
-    t.filter_detections(test_input_dto)
+    filtered_test_input_dto = t.filter_detections(test_input_dto)

     if config.plan_only:
         #Emit the test plan and quit. Do not actually run the test
-        config.dumpCICDPlanAndQuit(gitServer.getHash(),test_input_dto.detections)
+        config.dumpCICDPlanAndQuit(gitServer.getHash(),filtered_test_input_dto.detections)
         return

-    success = t.execute(test_input_dto)
+    success = t.execute(filtered_test_input_dto)

     if success:
         #Everything passed!
@@ -115,11 +129,6 @@
         return
     raise Exception("There was at least one unsuccessful test")

-def test_servers_func(config:test_servers):
-    raise Exception("Not yet done")
-
-
-
 def main():
     try:
         configFile = pathlib.Path("contentctl.yml")
@@ -127,10 +136,13 @@
         # We MUST load a config (with testing info) object so that we can
         # properly construct the command line, including 'contentctl test' parameters.
         if not configFile.is_file():
-            if "init" not in sys.argv:
+            if "init" not in sys.argv and "--help" not in sys.argv and "-h" not in sys.argv:
                 raise Exception(f"'{configFile}' not found in the current directory.\n"
                                 "Please ensure you are in the correct directory or run 'contentctl init' to create a new content pack.")

+            if "--help" in sys.argv or "-h" in sys.argv:
+                print("Warning - contentctl.yml is missing from this directory. The configuration values shown are the defaults and are informational only.\n"
+                      "Please ensure that contentctl.yml exists by manually creating it or running 'contentctl init'")

             # Otherwise generate a stub config file.
             # It will be used during init workflow
@@ -156,11 +168,13 @@
             {
                 "init":init.model_validate(config_obj),
                 "validate": validate.model_validate(config_obj),
+                "report": report.model_validate(config_obj),
                 "build":build.model_validate(config_obj),
                 "inspect": inspect.model_construct(**t.__dict__),
                 "new":new.model_validate(config_obj),
                 "test":test.model_validate(config_obj),
-                "test_servers":test_servers.model_validate(config_obj),
+                "test_servers":test_servers.model_construct(**t.__dict__),
+                "release_notes": release_notes.model_construct(**config_obj),
                 "deploy_acs": deploy_acs.model_construct(**t.__dict__),
                 #"deploy_rest":deploy_rest()
             }
@@ -182,24 +196,33 @@
             init_func(t)
         elif type(config) == validate:
             validate_func(config)
+        elif type(config) == report:
+            report_func(config)
         elif type(config) == build:
             build_func(config)
         elif type(config) == new:
             new_func(config)
         elif type(config) == inspect:
             inspect_func(config)
+        elif type(config) == release_notes:
+            release_notes_func(config)
         elif type(config) == deploy_acs:
             updated_config = deploy_acs.model_validate(config)
             deploy_acs_func(updated_config)
         elif type(config) == deploy_rest:
             deploy_rest_func(config)
-        elif type(config) == test:
-            test_func(config)
-        elif type(config) == test_servers:
-            test_servers_func(config)
+        elif type(config) == test or type(config) == test_servers:
+            if type(config) == test:
+                #construct the container Infrastructure objects
+                config.getContainerInfrastructureObjects()
+            #otherwise, they have already been passed as servers
+            test_common_func(config)
         else:
             raise Exception(f"Unknown command line type '{type(config).__name__}'")

     except Exception as e:
-        print(e)
+        import traceback
+        traceback.print_exc()
+        traceback.print_stack()
+        #print(e)
         sys.exit(1)
\ No newline at end of file
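The command table above mixes `model_validate` and `model_construct` deliberately. A minimal sketch of the trade-off (the token field is invented for illustration):

```python
from pydantic import BaseModel

class deploy_acs(BaseModel):
    splunk_cloud_jwt_token: str  # required, no default

# model_validate enforces required fields immediately, which would make even
# 'contentctl --help' fail when the token is absent from contentctl.yml:
# deploy_acs.model_validate({})  # raises ValidationError

# model_construct skips validation entirely, so the subcommand can still be
# offered on the command line; the dispatch code above re-validates later
# with deploy_acs.model_validate(config) once real arguments are in hand.
stub = deploy_acs.model_construct()  # no error; the field is simply unset
```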
diff --git a/contentctl/helper/config_handler.py b/contentctl/helper/config_handler.py
deleted file mode 100644
index 1050526b..00000000
--- a/contentctl/helper/config_handler.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import os
-import collections
-import sys
-import pathlib
-
-from contentctl.input.yml_reader import YmlReader
-from contentctl.objects.config import Config, TestConfig, ConfigEnrichments
-from contentctl.objects.test_config import InfrastructureConfig, Infrastructure
-from contentctl.objects.enums import DetectionTestingMode
-from typing import Union
-import argparse
-
-from contentctl.objects.enums import (
-    DetectionTestingTargetInfrastructure,
-)
-
-class ConfigHandler:
-
-    @classmethod
-    def read_config(cls, args:argparse.Namespace) -> Config:
-        config_path = pathlib.Path(args.path)/"contentctl.yml"
-        try:
-            yml_dict = YmlReader.load_file(config_path, add_fields=False)
-
-        except:
-            print("ERROR: no contentctl.yml found in given path")
-            sys.exit(1)
-
-        try:
-
-            config = Config.model_validate(yml_dict)
-
-        except Exception as e:
-            raise Exception(f"Error reading config file: {str(e)}")
-
-
-        return config
-
-    @classmethod
-    def read_test_config(cls, args:argparse.Namespace) -> TestConfig:
-        test_config_path = pathlib.Path(args.path)/"contentctl_test.yml"
-        try:
-            yml_dict = YmlReader.load_file(test_config_path, add_fields=False)
-        except:
-            print("ERROR: no contentctl_test.yml found in given path")
-            sys.exit(1)
-
-        try:
-            if args.dry_run:
-                yml_dict['apps'] = []
-                yml_dict['infrastructure_config'] = InfrastructureConfig(infrastructure_type=DetectionTestingTargetInfrastructure.server, ).__dict__
-            if args.server_info is None:
-                yml_dict['infrastructure_config']['infrastructures'] = [Infrastructure().__dict__]
-            if args.mode != DetectionTestingMode.changes:
-                yml_dict['version_control_config'] = None
-            if yml_dict.get("version_control_config", None) is not None:
-                #If they have been passed, override the target and test branch. If not, keep the defaults
-                yml_dict.get("version_control_config", None)['target_branch'] = args.target_branch or yml_dict.get("version_control_config", None)['target_branch']
-                yml_dict.get("version_control_config", None)['test_branch'] = args.test_branch or yml_dict.get("version_control_config", None)['test_branch']
-            if yml_dict.get("infrastructure_config", None) is not None:
-                yml_dict.get("infrastructure_config", None)['infrastructure_type'] = args.infrastructure or yml_dict.get("infrastructure_config", None)['infrastructure_type']
-            test_config = TestConfig.model_validate(yml_dict)
-        except Exception as e:
-            raise Exception(f"Error reading test config file: {str(e)}")
-
-
-        return test_config
-
-
-
-
-
\ No newline at end of file
diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
index 63406c73..a51eea07 100644
--- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py
+++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
@@ -108,7 +108,7 @@ def annotations(self)->dict[str,Union[List[str],int,str]]:
         annotations_dict["cve"] = self.tags.cve
         annotations_dict["impact"] = self.tags.impact
         annotations_dict["type"] = self.type
-        annotations_dict["version"] = self.version
+        #annotations_dict["version"] = self.version

         #The annotations object is a superset of the mappings object.
         # So start with the mapping object.
diff --git a/contentctl/objects/app.py b/contentctl/objects/app.py
deleted file mode 100644
index 4fdf1abd..00000000
--- a/contentctl/objects/app.py
+++ /dev/null
@@ -1,120 +0,0 @@
-# Needed for a staticmethod to be able to return an instance of the class it belongs to
-from __future__ import annotations
-from typing import Union,Optional, Annotated, TYPE_CHECKING
-from pydantic import BaseModel, validator, FilePath, computed_field, HttpUrl,Field
-from urllib.parse import urlparse
-import pathlib
-import re
-import os
-import yaml
-if TYPE_CHECKING:
-    from contentctl.objects.config import Config
-    from contentctl.objects.test_config import TestConfig, CONTAINER_APP_DIR, LOCAL_APP_DIR
-from contentctl.helper.utils import Utils
-
-
-
-SPLUNKBASE_URL = "https://splunkbase.splunk.com/app/{uid}/release/{release}/download"
-ENVIRONMENT_PATH_NOT_SET = "ENVIRONMENT_PATH_NOT_SET"
-
-class App(BaseModel, extra="forbid"):
-
-    # uid is a numeric identifier assigned by splunkbase, so
-    # homemade applications will not have this
-    uid: Annotated[int, Field(gt=1,lt=100000)]
-
-    # appid is basically the internal name of your app
-    appid: Optional[Annotated[str,Field(pattern="^[a-zA-Z0-9_-]+$")]]
-
-    # Title is the human readable name for your application
-    title: Annotated[str,Field(min_length=1)]
-
-    # Self explanatory
-    description: Optional[Annotated[str,Field(min_length=1)]] = None
-    release: Annotated[str,Field(min_length=1)]
-
-    hardcoded_path: Optional[Union[FilePath,HttpUrl]]
-
-    # Splunkbase path is made of the combination of uid and release fields
-    @computed_field
-    @property
-    def splunkbase_path(self)->Optional[HttpUrl]:
-        if self.uid is not None and self.release is not None:
-            return HttpUrl(SPLUNKBASE_URL.format(uid=self.uid,release=self.release))
-        return None
-
-
-    @classmethod
-    def appFromConfig(cls, config:Config, built_app_path:pathlib.Path):
-        config_no_splunkbase_creds = config.model_copy(deep=True)
-        assert config_no_splunkbase_creds.test != None, "Error - test config MUST exist to create app from config"
-        config_no_splunkbase_creds.test.splunkbase_username = None
-        config_no_splunkbase_creds.test.splunkbase_password = None
-        new_app = cls(uid=config.build.uid,
-                      appid=config.build.name,
-                      title=config.build.title,
-                      description=config.build.description,
-                      release=config.build.version,
-                      hardcoded_path=FilePath(built_app_path))
-
-
-
-
-
-    def get_app_source(
-        self,
-        config:Config,
-    )->str:
-
-        assert config.test is not None, f"Error - config.test was 'None'. It should be an instance of TestConfig."
-
-        test_config:TestConfig = config.test
-
-
-        if test_config.splunkbase_password is not None and \
-           test_config.splunkbase_username is not None:
-            if self.appid == config.build.name:
-                # This is a special case. This is the app that we have
-                # just built, which we obviously CANNOT get from splunkbase!
-                pass
-            else:
-                return str(self.splunkbase_path)
-
-
-        if isinstance(self.hardcoded_path, FilePath):
-            filename = pathlib.Path(self.hardcoded_path)
-            destination = LOCAL_APP_DIR / filename.name
-            Utils.copy_local_file(str(self.hardcoded_path), str(destination), verbose_print=True)
-
-        elif isinstance(self.hardcoded_path, HttpUrl):
-
-            file_url_string = str(self.hardcoded_path)
-            server_path = pathlib.Path(urlparse(file_url_string).path)
-            destination = LOCAL_APP_DIR / server_path.name
-            Utils.download_file_from_http(file_url_string, str(destination))
-
-        else:
-            raise (
-                Exception(
-                    f"Unable to download app {self.title}:\n"
-                    f"Splunkbase Path : {self.splunkbase_path}\n"
-                    f"hardcoded_path  : {self.hardcoded_path}\n"
-                    f"Splunkbase Creds: {False}\n"
-                )
-            )
-
-        return str(CONTAINER_APP_DIR/destination.name)
-
-    @staticmethod
-    def get_default_apps() -> list[App]:
-        return []
-        all_app_objs: list[App] = []
-        with open(
-            os.path.join(os.path.dirname(__file__), "../", "templates/app_default.yml"),
-            "r",
-        ) as app_data:
-            all_apps_raw = yaml.safe_load(app_data)
-            for a in all_apps_raw:
-                app_obj = App.model_validate(a)
-                all_app_objs.append(app_obj)
-        return all_app_objs
diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py
index bc822675..f036d132 100644
--- a/contentctl/objects/config.py
+++ b/contentctl/objects/config.py
@@ -2,13 +2,13 @@
 from pydantic import (
     BaseModel, Field, field_validator, field_serializer, ConfigDict, DirectoryPath,
-    PositiveInt, FilePath, HttpUrl, AnyUrl, computed_field, model_validator,
+    PositiveInt, FilePath, HttpUrl, AnyUrl, model_validator,
     ValidationInfo
 )
 from contentctl.output.yml_writer import YmlWriter
-
+from os import environ
 from datetime import datetime, UTC
-from typing import Optional,Any,Dict,Annotated,List,Union, Self
+from typing import Optional,Any,Annotated,List,Union, Self
 import semantic_version
 import random
 from enum import StrEnum, auto
@@ -17,9 +17,10 @@
 from urllib.parse import urlparse
 from abc import ABC, abstractmethod
 from contentctl.objects.enums import PostTestBehavior
-from contentctl.input.yml_reader import YmlReader
 from contentctl.objects.detection import Detection
+import tqdm
+from functools import partialmethod

 ENTERPRISE_SECURITY_UID = 263
 COMMON_INFORMATION_MODEL_UID = 1621
@@ -168,13 +169,19 @@ class validate(Config_Base):
                             "This is useful when outputting a release build "\
                             "and validating these values, but should otherwise "\
                             "be avoided for performance reasons.")
-    build_app: bool = Field(default=True, description="Should an app be built and output in the {build_path}?")
-    build_api: bool = Field(default=False, description="Should api objects be built and output in the {build_path}?")
-    build_ssa: bool = Field(default=False, description="Should ssa objects be built and output in the {build_path}?")
+    build_app: bool = Field(default=True, description="Should an app be built and output in the build_path?")
+    build_api: bool = Field(default=False, description="Should api objects be built and output in the build_path?")
+    build_ssa: bool = Field(default=False, description="Should ssa objects be built and output in the build_path?")

     def getAtomicRedTeamRepoPath(self, atomic_red_team_repo_name:str = "atomic-red-team"):
         return self.path/atomic_red_team_repo_name

+class report(validate):
+    #reporting takes no extra args, but we define it here so that it can be a mode on the command line
+    def getReportingPath(self)->pathlib.Path:
+        return self.path/"reporting/"
+
+
 class build(validate):
     model_config = ConfigDict(use_enum_values=True,validate_default=True, arbitrary_types_allowed=True)
@@ -223,6 +230,7 @@ def getAPIPath(self)->pathlib.Path:

     def getAppTemplatePath(self)->pathlib.Path:
         return self.path/"app_template"

+
 class StackType(StrEnum):
@@ -532,7 +540,16 @@ def serialize_path(paths: List[FilePath])->List[str]:

 class test_common(build):
     mode:Union[Changes, Selected, All] = Field(All(), union_mode='left_to_right')
-    post_test_behavior: PostTestBehavior = Field(default=PostTestBehavior.pause_on_failure, description="")
+    post_test_behavior: PostTestBehavior = Field(default=PostTestBehavior.pause_on_failure, description="Controls what to do when a test completes.\n\n"
+                                                 f"'{PostTestBehavior.always_pause.value}' - execution will always pause after each test, allowing the user to log into the "
+                                                 "server and experiment with the search and data before it is removed.\n\n"
+                                                 f"'{PostTestBehavior.pause_on_failure.value}' - pause execution ONLY when a test fails. The user may press ENTER in the terminal "
+                                                 "running the test to move on to the next test.\n\n"
+                                                 f"'{PostTestBehavior.never_pause.value}' - never stop testing, even if a test fails.\n\n"
+                                                 "***SPECIAL NOTE FOR CI/CD*** 'never_pause' MUST be used for a test to "
+                                                 "run in an unattended manner or in a CI/CD system - otherwise a single failed test "
+                                                 "will result in the testing never finishing as the tool waits for input.")
     test_instances:List[Infrastructure] = Field(...)
     enable_integration_testing: bool = Field(default=False, description="Enable integration testing, which REQUIRES Splunk Enterprise Security "
                                              "to be installed on the server. This checks for a number of different things including generation "
@@ -542,10 +559,60 @@ class test_common(build):
                            "This flag is useful for building your app and generating a test plan to run on different infrastructure. "
                            "This flag does not actually perform the test. Instead, it validates all content and builds the app(s). "
                            "It MUST be used with mode.changes and must run in the context of a git repo.")
+    disable_tqdm:bool = Field(default=False, exclude=True, description="The tqdm library (https://github.com/tqdm/tqdm) is used to facilitate a richer,"
+                              " interactive command line workflow that can display progress bars and status information frequently. "
+                              "Unfortunately it is incompatible with, or may cause poorly formatted logs in, many CI/CD systems or other unattended environments. "
+                              "If you are running contentctl in CI/CD, then please set this argument to True. Note that if you are running in a CI/CD context, "
+                              f"you also MUST set post_test_behavior to {PostTestBehavior.never_pause.value}. Otherwise, a failed detection will cause "
+                              "the CI/CD runner to pause indefinitely.")
     apps: List[TestApp] = Field(default=DEFAULT_APPS, exclude=False, description="List of apps to install in test environment")

+    def dumpCICDPlanAndQuit(self, githash: str, detections:List[Detection]):
+        output_file = self.path / "test_plan.yml"
+        self.mode = Selected(files=sorted([detection.file_path for detection in detections], key=lambda path: str(path)))
+        self.post_test_behavior = PostTestBehavior.never_pause.value
+        #required so that CI/CD does not get too much output or hang
+        self.disable_tqdm = True
+
+        # We will still parse the app, but no need to do enrichments or
+        # output to dist. We have already built it!
+        self.build_app = False
+        self.build_api = False
+        self.build_ssa = False
+        self.enrichments = False
+
+        self.enable_integration_testing = True
+
+        data = self.model_dump()
+
+        #Add the hash of the current commit
+        data['githash'] = str(githash)
+
+        #Remove some fields that are not relevant
+        for k in ['container_settings', 'test_instances']:
+            if k in data:
+                del(data[k])
+
+
+
+        try:
+            YmlWriter.writeYmlFile(str(output_file), data)
+            print(f"Successfully wrote a test plan for [{len(self.mode.files)} detections] using [{len(self.apps)} apps] to [{output_file}]")
+        except Exception as e:
+            raise Exception(f"Error writing test plan file [{output_file}]: {str(e)}")
+
+
+    def getLocalAppDir(self)->pathlib.Path:
+        #docker really wants absolute paths
+        path = self.path / "apps"
+        return path.absolute()
+
+    def getContainerAppDir(self)->pathlib.Path:
+        #docker really wants absolute paths
+        return pathlib.Path("/tmp/apps").absolute()

     def enterpriseSecurityInApps(self)->bool:
         for app in self.apps:
@@ -563,10 +630,24 @@ def commonInformationModelInApps(self)->bool:

     def ensureCommonInformationModel(self)->Self:
         if self.commonInformationModelInApps():
             return self
-        raise ValueError(f"Common Information Model/CIM "
-                         f"(uid: [{COMMON_INFORMATION_MODEL_UID}]) is not listed in apps. "
-                         f"contentctl test MUST include Common Information Model")
-
+        print(f"INFO: Common Information Model/CIM "
+              f"(uid: [{COMMON_INFORMATION_MODEL_UID}]) is not listed in apps.\n"
+              f"contentctl test MUST include Common Information Model.\n"
+              f"Please note this message is only informational.")
+        return self
+
+    @model_validator(mode='after')
+    def suppressTQDM(self)->Self:
+        if self.disable_tqdm:
+            tqdm.tqdm.__init__ = partialmethod(tqdm.tqdm.__init__, disable=True)
+            if self.post_test_behavior != PostTestBehavior.never_pause.value:
+                raise ValueError(f"You have disabled tqdm, presumably because you are "
+                                 f"running in CI/CD or another unattended context.\n"
+                                 f"However, post_test_behavior is set to [{self.post_test_behavior}].\n"
+                                 f"If that is the case, then you MUST set post_test_behavior "
+                                 f"to [{PostTestBehavior.never_pause.value}].\n"
+                                 "Otherwise, if a detection fails in CI/CD, your CI/CD runner will hang forever.")
+        return self
@@ -577,10 +658,12 @@ def ensureEnterpriseSecurityForIntegrationTesting(self)->Self:

         if self.enterpriseSecurityInApps():
             return self
-        raise ValueError(f"enable_integration_testing is [{self.enable_integration_testing}], "
-                         f"but the Splunk Enterprise Security "
-                         f"App (uid: [{ENTERPRISE_SECURITY_UID}]) is not listed in apps. "
-                         f"Integration Testing MUST include Enterprise Security.")
+        print(f"INFO: enable_integration_testing is [{self.enable_integration_testing}], "
+              f"but the Splunk Enterprise Security "
+              f"App (uid: [{ENTERPRISE_SECURITY_UID}]) is not listed in apps.\n"
+              f"Integration Testing MUST include Enterprise Security.\n"
+              f"Please note this message is only informational.")
+        return self
@@ -613,51 +696,15 @@ class test(test_common):

     splunk_api_password: Optional[str] = Field(default=None, exclude = True, description="Splunk API password used for running appinspect or installing apps from Splunkbase")

-    @model_validator(mode='after')
-    def get_test_instances(self)->Self:
-
-        if len(self.test_instances) > 0:
-            return self
+    def getContainerInfrastructureObjects(self)->Self:
         try:
             self.test_instances = self.container_settings.getContainers()
             return self

         except Exception as e:
-            raise ValueError(f"Error constructing test_instances: {str(e)}")
+            raise ValueError(f"Error constructing container test_instances: {str(e)}")

-    def dumpCICDPlanAndQuit(self, githash: str, detections:List[Detection]):
-        output_file = self.path / "test_plan.yml"
-        self.mode = Selected(files=sorted([detection.file_path for detection in detections], key=lambda path: str(path)))
-        self.post_test_behavior = PostTestBehavior.never_pause.value
-
-        # We will still parse the app, but no need to do enrichments or
-        # output to dist. We have already built it!
-        self.build_app = False
-        self.build_api = False
-        self.build_ssa = False
-        self.enrichments = False
-        self.enable_integration_testing = True
-
-        data = self.model_dump()
-        #Add relevant fields
-        data['githash'] = str(githash)
-
-        #Remove some fields that are not relevant
-        del(data['container_settings'])
-        #del(data['apps'])
-
-        try:
-            YmlWriter.writeYmlFile(str(output_file), data)
-            print(f"Successfully wrote a test plan for [{len(self.mode.files)} detections] using [{len(self.apps)} apps] to [{output_file}]")
-        except Exception as e:
-            raise Exception(f"Error writing test plan file [{output_file}]: {str(e)}")
-
-
-    def getLocalAppDir(self)->pathlib.Path:
-        return self.path / "apps"
-
-    def getContainerAppDir(self)->pathlib.Path:
-        return pathlib.Path("/tmp/apps")

     @model_validator(mode='after')
@@ -684,7 +731,7 @@ def ensureAppsAreGood(self)->Self:

         return self

-    def getContainerEnvironmentString(self,stage_file:bool=True, include_custom_app:bool=True)->str:
+    def getContainerEnvironmentString(self,stage_file:bool=False, include_custom_app:bool=True)->str:
         apps:List[App_Base] = self.apps
         if include_custom_app:
             apps.append(self.app)
@@ -704,9 +751,155 @@

     def getAppFilePath(self):
         return self.path / "apps.yml"

-
+TEST_ARGS_ENV = "CONTENTCTL_TEST_INFRASTRUCTURES"

 class test_servers(test_common):
     model_config = ConfigDict(use_enum_values=True,validate_default=True, arbitrary_types_allowed=True)
-    test_instances:List[Infrastructure] = Field([Infrastructure(instance_name="splunk_target", instance_address="splunkServerAddress.com")],description="Test against one or more preconfigured servers.")
+    test_instances:List[Infrastructure] = Field([],description="Test against one or more preconfigured servers.", validate_default=True)
+    server_info:Optional[str] = Field(None, validate_default=True, description='String of pre-configured servers to use for testing. The list MUST be in the format:\n'
+                                      'address,username,password,web_ui_port,hec_port,api_port;address_2,username_2,password_2,web_ui_port_2,hec_port_2,api_port_2'
+                                      '\nFor example, the following string will use 2 preconfigured test instances:\n'
+                                      '127.0.0.1,firstUser,firstUserPassword,8000,8088,8089;1.2.3.4,secondUser,secondUserPassword,8000,8088,8089\n'
+                                      'Note that these test_instances may be hosted on the same system, such as localhost/127.0.0.1 or a docker server, or different hosts.\n'
+                                      f'This value may also be passed by setting the environment variable [{TEST_ARGS_ENV}] with the value above.')
+
+    @model_validator(mode='before')
+    @classmethod
+    def parse_config(cls, data:Any, info: ValidationInfo)->Any:
+        #Ignore whatever is in the file or defaults, these must be supplied on command line
+        #if len(v) != 0:
+        #    return v
+
+
+        if isinstance(data.get("server_info"),str) :
+            server_info = data.get("server_info")
+        elif isinstance(environ.get(TEST_ARGS_ENV),str):
+            server_info = environ.get(TEST_ARGS_ENV)
+        else:
+            raise ValueError(f"server_info not passed on command line or in environment variable {TEST_ARGS_ENV}")
+
+        infrastructures:List[Infrastructure] = []
+
+
+        index = 0
+        for server in server_info.split(';'):
+            address, username, password, web_ui_port, hec_port, api_port = server.split(",")
+            infrastructures.append(Infrastructure(splunk_app_username = username, splunk_app_password=password,
+                                                  instance_address=address, hec_port = int(hec_port),
+                                                  web_ui_port= int(web_ui_port),api_port=int(api_port), instance_name=f"test_server_{index}")
+            )
+            index+=1
+        data['test_instances'] = infrastructures
+        return data
+
+    @field_validator('test_instances',mode='before')
+    @classmethod
+    def check_environment_variable_for_config(cls, v:List[Infrastructure]):
+        return v
+        #Ignore whatever is in the file or defaults, these must be supplied on command line
+        #if len(v) != 0:
+        #    return v
+        TEST_ARGS_ENV = "CONTENTCTL_TEST_INFRASTRUCTURES"
+
+
+        #environment variable is present. try to parse it
+        infrastructures:List[Infrastructure] = []
+        server_info:str|None = environ.get(TEST_ARGS_ENV)
+        if server_info is None:
+            raise ValueError(f"test_instances not passed on command line or in environment variable {TEST_ARGS_ENV}")
+
+
+        index = 0
+        for server in server_info.split(';'):
+            address, username, password, web_ui_port, hec_port, api_port = server.split(",")
+            infrastructures.append(Infrastructure(splunk_app_username = username, splunk_app_password=password,
+                                                  instance_address=address, hec_port = int(hec_port),
+                                                  web_ui_port= int(web_ui_port),api_port=int(api_port), instance_name=f"test_server_{index}")
+            )
+            index+=1
+
+
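For reference, `parse_config` above consumes the same string whether it arrives via the `server_info` argument or the `CONTENTCTL_TEST_INFRASTRUCTURES` environment variable: ';' separates servers, ',' separates the six fields of each. A runnable sketch of that parsing, using the example values from the field's own documentation:

```python
import os

os.environ["CONTENTCTL_TEST_INFRASTRUCTURES"] = (
    "127.0.0.1,firstUser,firstUserPassword,8000,8088,8089;"
    "1.2.3.4,secondUser,secondUserPassword,8000,8088,8089"
)

# Mirrors parse_config: one ';'-separated entry per server,
# six ','-separated fields per entry.
for index, server in enumerate(os.environ["CONTENTCTL_TEST_INFRASTRUCTURES"].split(";")):
    address, username, password, web_ui_port, hec_port, api_port = server.split(",")
    print(f"test_server_{index}: {address} ui={web_ui_port} hec={hec_port} api={api_port}")
```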
+
+
+class release_notes(Config_Base):
+    old_tag:Optional[str] = Field(None, description="Name of the tag to diff against to find new content. "
+                                  "If it is not supplied, then it will be inferred as the "
+                                  "second newest tag at runtime.")
+    new_tag:Optional[str] = Field(None, description="Name of the tag containing new content. If it is not supplied,"
+                                  " then it will be inferred as the newest tag at runtime.")
+    latest_branch:Optional[str] = Field(None, description="Branch for which we are generating release notes")
+
+    def releaseNotesFilename(self, filename:str)->pathlib.Path:
+        #Assume that notes are written to dist/. This does not respect build_dir since that is
+        #only a member of build
+        p = self.path / "dist"
+        try:
+            p.mkdir(exist_ok=True,parents=True)
+        except Exception as e:
+            raise Exception(f"Error making the directory '{p}' to hold release_notes: {str(e)}")
+        return p/filename
+
+    @model_validator(mode='after')
+    def ensureNewTagOrLatestBranch(self):
+        '''
+        Exactly one of latest_branch or new_tag must be defined. Otherwise, throw an error.
+        '''
+        if self.new_tag is not None and self.latest_branch is not None:
+            raise ValueError("Both new_tag and latest_branch are defined. EXACTLY one of these MUST be defined.")
+        elif self.new_tag is None and self.latest_branch is None:
+            raise ValueError("Neither new_tag nor latest_branch are defined. EXACTLY one of these MUST be defined.")
+        return self
+
+    # @model_validator(mode='after')
+    # def ensureTagsAndBranch(self)->Self:
+    #     #get the repo
+    #     import pygit2
+    #     from pygit2 import Commit
+    #     repo = pygit2.Repository(path=str(self.path))
+    #     tags = list(repo.references.iterator(references_return_type=pygit2.enums.ReferenceFilter.TAGS))
+
+    #     #Sort all tags by commit time from newest to oldest
+    #     sorted_tags = sorted(tags, key=lambda tag: repo.lookup_reference(tag.name).peel(Commit).commit_time, reverse=True)
+
+
+    #     tags_names:List[str] = [t.shorthand for t in sorted_tags]
+    #     print(tags_names)
+    #     if self.new_tag is not None and self.new_tag not in tags_names:
+    #         raise ValueError(f"The new_tag '{self.new_tag}' was not found in the set of named tags for this repo: {tags_names}")
+    #     elif self.new_tag is None:
+    #         try:
+    #             self.new_tag = tags_names[0]
+    #         except Exception:
+    #             raise ValueError("Error getting new_tag - there were no tags in the repo")
+    #     elif self.new_tag in tags_names:
+    #         pass
+    #     else:
+    #         raise ValueError(f"Unknown error getting new_tag {self.new_tag}")
+
+
+
+    #     if self.old_tag is not None and self.old_tag not in tags_names:
+    #         raise ValueError(f"The old_tag '{self.old_tag}' was not found in the set of named tags for this repo: {tags_names}")
+    #     elif self.new_tag == self.old_tag:
+    #         raise ValueError(f"old_tag '{self.old_tag}' cannot equal new_tag '{self.new_tag}'")
+    #     elif self.old_tag is None:
+    #         try:
+    #             self.old_tag = tags_names[tags_names.index(self.new_tag) + 1]
+    #         except Exception:
+    #             raise ValueError(f"Error getting old_tag. new_tag '{self.new_tag}' is the oldest tag in the repo.")
+    #     elif self.old_tag in tags_names:
+    #         pass
+    #     else:
+    #         raise ValueError(f"Unknown error getting old_tag {self.old_tag}")
+
+
+
+    #     if not tags_names.index(self.new_tag) < tags_names.index(self.old_tag):
+    #         raise ValueError(f"The new_tag '{self.new_tag}' is not newer than the old_tag '{self.old_tag}'")
+
+    #     if self.latest_branch is not None:
+    #         if repo.lookup_branch(self.latest_branch) is None:
+    #             raise ValueError(f"The latest_branch '{self.latest_branch}' was not found in the repository")
+
+
+    #     return self
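
The exclusive-or check in ensureNewTagOrLatestBranch reduces to a single comparison; a minimal standalone mirror of the rule (tag and branch names invented):

    def exactly_one_defined(new_tag, latest_branch):
        # Mirrors ensureNewTagOrLatestBranch: exactly one argument may be non-None.
        if (new_tag is None) == (latest_branch is None):
            raise ValueError("EXACTLY one of new_tag or latest_branch must be defined.")

    exactly_one_defined("v4.0.0", None)   # ok: tag-to-tag release notes
    exactly_one_defined(None, "develop")  # ok: branch-based release notes
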
diff --git a/contentctl/objects/integration_test_result.py b/contentctl/objects/integration_test_result.py
index 0460f633..e746731e 100644
--- a/contentctl/objects/integration_test_result.py
+++ b/contentctl/objects/integration_test_result.py
@@ -1,6 +1,4 @@
 from typing import Optional
-
-from contentctl.objects.test_config import Infrastructure
 from contentctl.objects.base_test_result import BaseTestResult
 
diff --git a/contentctl/objects/repo_config.py b/contentctl/objects/repo_config.py
deleted file mode 100644
index ce9effb3..00000000
--- a/contentctl/objects/repo_config.py
+++ /dev/null
@@ -1,163 +0,0 @@
-
-
-# import pathlib
-
-
-# from pydantic import BaseModel, root_validator, validator, ValidationError, Extra, Field
-# from pydantic.main import ModelMetaclass
-# from dataclasses import dataclass
-# from datetime import datetime
-# from typing import Union
-
-# import validators
-
-# from contentctl.objects.enums import SecurityContentProduct
-
-# from contentctl.helper.utils import Utils
-
-# from semantic_version import Version
-
-# import git
-# ALWAYS_PULL = True
-
-# SPLUNKBASE_URL = "https://splunkbase.splunk.com/app/{uid}/release/{release}/download"
-
-# class Manifest(BaseModel):
-#     #Note that many of these fields are mirrored from App
-
-#     #Some information about the developer of the app
-#     author_name: str = Field(default=None, title="Enter the name of the app author")
-#     author_email: str = Field(default=None, title="Enter a contact email for the develop(s) of the app")
-#     author_company: str = Field(default=None, title="Enter the company who is developing the app")
-
-#     #uid is a numeric identifier assigned by splunkbase, so
-#     #homemade applications will not have this
-#     uid: Union[int, None] = Field(default=None, title="Unique numeric identifier assigned by Splunkbase to identify your app. You can find it in the URL of your app's landing page. If you do not have one, leave this blank.")
-
-#     #appid is basically the internal name of you app
-#     appid: str = Field(default=None, title="Internal name of your app. Note that it MUST be alphanumeric with underscores, but no spaces or other special characters")
-
-#     #Title is the human readable name for your application
-#     title: str = Field(default=None, title="Human-Readable name for your app. This can include any characters you want")
-
-#     #Self explanatory
-#     description: Union[str,None] = Field(default=None, title="Provide a helpful description of the app.")
-#     release: str = Field(default=None, title="Provide a name for the current release of the app. 
This MUST follow semantic version format MAJOR.MINOR.PATCH[-tag]") - - - -# @validator('author_email', always=True) -# def validate_author_email(cls, v): -# print("email is") -# print(v) -# if bool(validators.email(v)) == False: -# raise(ValueError(f"Email address {v} is invalid")) -# return v - -# @validator('release', always=True) -# def validate_release(cls, v): -# try: -# Version(v) -# except Exception as e: -# raise(ValueError(f"The string '{v}' is not a valid Semantic Version. For more information on Semantic Versioning, please refer to https://semver.org/")) - -# return v - - -# class RepoConfig(BaseModel): - -# #Needs a manifest to be able to properly generate the app -# manifest:Manifest = Field(default=None, title="Manifest Object") -# repo_path: str = Field(default='.', title="Path to the root of your app") -# repo_url: Union[str,None] = Field(default=None, title="HTTP(s) path to the repo for repo_path. If this field is blank, it will be inferred from the repo") -# main_branch: str = Field(title="Main branch of the repo.") - - - - -# type: SecurityContentProduct = Field(default=SecurityContentProduct.SPLUNK_ENTERPRISE_APP, title=f"What type of product would you like to build. Choose one of {SecurityContentProduct._member_names_}") -# skip_enrichment: bool = Field(default=True, title="Whether or not to skip the enrichment processes when validating the app. Enrichment increases the amount of time it takes to build an app significantly because it must hit a number of Web APIs.") - -# input_path: str = Field(default='.', title="Path to the root of your app") -# output_path: str = Field(default='./dist', title="Path where 'generate' will write out your raw app") -# #output_path: str = Field(default='./build', title="Path where 'build' will write out your custom app") - -# #test_config: TestConfig = Field(default=TestConfig, title="Test Configuration") - -# #@validator('manifest', always=True, pre=True) -# ''' -# @root_validator(pre=True) -# def validate_manifest(cls, values): - -# try: -# print(Manifest.parse_obj(values)) -# except Exception as e: -# raise(ValueError(f"error validating manifest: {str(e)}")) - - -# return values -# print("TWO") -# #return {} -# #return Manifest.parse_obj({"email":"invalid_email@gmail.com"}) -# ''' -# @validator('repo_path', always=True) -# def validate_repo_path(cls,v): - -# try: -# path = pathlib.Path(v) -# except Exception as e: -# raise(ValueError(f"Error, the provided path is is not a valid path: '{v}'")) - -# try: -# r = git.Repo(path) -# except Exception as e: -# raise(ValueError(f"Error, the provided path is not a valid git repo: '{path}'")) - -# try: - -# if ALWAYS_PULL: -# r.remotes.origin.pull() -# except Exception as e: -# raise ValueError(f"Error pulling git repository {v}: {str(e)}") - - -# return v - - -# @validator('repo_url') -# def validate_repo_url(cls, v, values): - - -# #First try to get the value from the repo -# try: -# remote_url_from_repo = git.Repo(values['repo_path']).remotes.origin.url -# except Exception as e: -# raise(ValueError(f"Error reading remote_url from the repo located at {values['repo_path']}")) - -# if v is not None and remote_url_from_repo != v: -# raise(ValueError(f"The url of the remote repo supplied in the config file {v} does not "\ -# f"match the value read from the repository at {values['repo_path']}, {remote_url_from_repo}")) - - -# if v is None: -# v = remote_url_from_repo - -# #Ensure that the url is the proper format -# try: -# if bool(validators.url(v)) == False: -# raise(Exception) -# except: -# 
raise(ValueError(f"Error validating the repo_url. The url is not valid: {v}")) - - -# return v - -# @validator('main_branch') -# def valid_main_branch(cls, v, values): - - -# try: -# Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v) -# except Exception as e: -# raise ValueError(f"Error validating main_branch: {str(e)}") -# return v \ No newline at end of file diff --git a/contentctl/objects/story.py b/contentctl/objects/story.py index c0645577..05a36fb8 100644 --- a/contentctl/objects/story.py +++ b/contentctl/objects/story.py @@ -1,10 +1,8 @@ from __future__ import annotations from typing import TYPE_CHECKING,List from contentctl.objects.story_tags import StoryTags -from pydantic import field_validator, Field, ValidationInfo, model_serializer,computed_field, model_validator +from pydantic import Field, model_serializer,computed_field, model_validator import re -from contentctl.objects.enums import DataModel, KillChainPhase -from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment, MitreTactics if TYPE_CHECKING: from contentctl.objects.detection import Detection from contentctl.objects.investigation import Investigation diff --git a/contentctl/objects/test_config.py b/contentctl/objects/test_config.py deleted file mode 100644 index e29670c3..00000000 --- a/contentctl/objects/test_config.py +++ /dev/null @@ -1,630 +0,0 @@ -# Needed for a staticmethod to be able to return an instance of the class it belongs to -from __future__ import annotations - -import git -import validators -import pathlib -import yaml -import os -from pydantic import BaseModel, validator, root_validator, Extra, Field -from typing import Union -import re -import docker -import docker.errors - - -from contentctl.objects.enums import ( - PostTestBehavior, - DetectionTestingMode, - DetectionTestingTargetInfrastructure, -) - -from contentctl.objects.app import App, ENVIRONMENT_PATH_NOT_SET -from contentctl.helper.utils import Utils - - -ALWAYS_PULL_REPO = False -PREVIOUSLY_ALLOCATED_PORTS: set[int] = set() - -LOCAL_APP_DIR = pathlib.Path("apps") -CONTAINER_APP_DIR = pathlib.Path("/tmp/apps") - - -def getTestConfigFromYMLFile(path: pathlib.Path): - try: - with open(path, "r") as config_handle: - cfg = yaml.safe_load(config_handle) - return TestConfig.parse_obj(cfg) - - except Exception as e: - print(f"Error loading test configuration file '{path}': {str(e)}") - - -class Infrastructure(BaseModel, extra=Extra.forbid, validate_assignment=True): - splunk_app_username: Union[str, None] = Field( - default="admin", title="The name of the user for testing" - ) - splunk_app_password: Union[str, None] = Field( - default="password", title="Password for logging into Splunk Server" - ) - instance_address: str = Field( - default="127.0.0.1", - title="Domain name of IP address of Splunk server to be used for testing. 
Do NOT use a protocol, like http(s):// or 'localhost'", - ) - - instance_name: str = Field( - default="Splunk_Server_Name", - title="Template to be used for naming the Splunk Test Containers or referring to Test Servers.", - ) - - hec_port: int = Field(default=8088, title="HTTP Event Collector Port") - web_ui_port: int = Field(default=8000, title="Web UI Port") - api_port: int = Field(default=8089, title="REST API Port") - - @staticmethod - def get_infrastructure_containers(num_containers:int=1, splunk_app_username:str="admin", splunk_app_password:str="password", instance_name_template="splunk_contentctl_{index}")->list[Infrastructure]: - containers:list[Infrastructure] = [] - if num_containers < 0: - raise ValueError(f"Error - you must specifiy 1 or more containers, not {num_containers}.") - - #Get the starting ports - i = Infrastructure() #Instantiate to get the base port numbers - - for index in range(0, num_containers): - containers.append(Infrastructure(splunk_app_username=splunk_app_username, - splunk_app_password=splunk_app_password, - instance_name=instance_name_template.format(index=index), - hec_port=i.hec_port+(index*2), - web_ui_port=i.web_ui_port+index, - api_port=i.api_port+(index*2))) - - - return containers - - @validator("instance_name") - def validate_instance_name(cls,v,values): - if not re.fullmatch("[a-zA-Z0-9][a-zA-Z0-9_.-]*", v): - raise ValueError(f"The instance_name '{v}' is not valid. Please use an instance name which matches the regular expression '[a-zA-Z0-9][a-zA-Z0-9_.-]*'") - else: - return v - - @validator("instance_address") - def validate_instance_address(cls, v, values): - try: - if v.startswith("http"): - raise (Exception("should not begin with http")) - is_ipv4 = validators.ipv4(v) - if bool(is_ipv4): - return v - is_domain_name = validators.domain(v) - if bool(is_domain_name): - import socket - - try: - socket.gethostbyname(v) - return v - except Exception as e: - pass - raise (Exception("DNS Lookup failed")) - raise (Exception(f"not an IPV4 address or a domain name")) - except Exception as e: - raise ( - Exception( - f"Error, failed to validate instance_address '{v}': {str(e)}" - ) - ) - - - - @validator("splunk_app_password") - def validate_splunk_app_password(cls, v): - if v == None: - # No app password was provided, so generate one - v = Utils.get_random_password() - else: - MIN_PASSWORD_LENGTH = 6 - if len(v) < MIN_PASSWORD_LENGTH: - raise ( - ValueError( - f"Password is less than {MIN_PASSWORD_LENGTH} characters long. This password is extremely weak, please change it." - ) - ) - return v - - @validator("hec_port", "web_ui_port", "api_port") - def validate_ports_range(cls, v): - if v < 2: - raise ( - ValueError( - f"Error, invalid Port number. Port must be between 2-65535: {v}" - ) - ) - elif v > 65535: - raise ( - ValueError( - f"Error, invalid Port number. Port must be between 2-65535: {v}" - ) - ) - return v - - @validator("hec_port", "web_ui_port", "api_port") - def validate_ports_overlap(cls, v): - - if type(v) is not list: - # Otherwise this throws error when we update a single field - return v - if len(set(v)) != len(v): - raise (ValueError(f"Duplicate ports detected: [{v}]")) - - return v - -class InfrastructureConfig(BaseModel, extra=Extra.forbid, validate_assignment=True): - infrastructure_type: DetectionTestingTargetInfrastructure = Field( - default=DetectionTestingTargetInfrastructure.container, - title=f"Control where testing should be launched. 
Choose one of {DetectionTestingTargetInfrastructure._member_names_}", - ) - - persist_and_reuse_container:bool = True - - full_image_path: str = Field( - default="registry.hub.docker.com/splunk/splunk:latest", - title="Full path to the container image to be used", - ) - infrastructures: list[Infrastructure] = [] - - - @validator("infrastructure_type") - def validate_infrastructure_type(cls, v, values): - if v == DetectionTestingTargetInfrastructure.server: - # No need to validate that the docker client is available - return v - elif v == DetectionTestingTargetInfrastructure.container: - # we need to make sure we can actually get the docker client from the environment - try: - docker.client.from_env() - except Exception as e: - raise ( - Exception( - f"Error, failed to get docker client. Is Docker Installed and running " - f"and are docker environment variables set properly? Error:\n\t{str(e)}" - ) - ) - return v - - - - - @validator("full_image_path") - def validate_full_image_path(cls, v, values): - if ( - values.get("infrastructure_type", None) - == DetectionTestingTargetInfrastructure.server.value - ): - print( - f"No need to validate target image path {v}, testing target is preconfigured server" - ) - return v - # This behavior may change if we start supporting local/offline containers and - # the logic to build them - if ":" not in v: - raise ( - ValueError( - f"Error, the image_name {v} does not include a tag. A tagged container MUST be included to ensure consistency when testing" - ) - ) - - # Check to make sure we have the latest version of the image - # We have this as a wrapped, nested try/except because if we - # encounter some error in trying to get the latest version, but - # we do have some version already, we will allow the test to continue. - # For example, this may occur if an image has been previously downloaded, - # but the server no longer has internet connectivity and can't get the - # image again. in this case, don't fail - continue with the test - try: - try: - # connectivity to docker server is validated previously - client = docker.from_env() - print( - f"Getting the latest version of the container image: {v}...", - end="", - flush=True, - ) - client.images.pull(v, platform="linux/amd64") - print("done") - except docker.errors.APIError as e: - print("error") - if e.is_client_error(): - if "invalid reference format" in str(e.explanation): - simple_explanation = f"The format of the docker image reference is incorrect. Please use a valid image reference" - else: - simple_explanation = ( - f"The most likely cause of this error is that the image/tag " - "does not exist or it is stored in a private repository and you are not logged in." - ) - - elif e.is_server_error(): - simple_explanation = ( - f"The mostly likely cause is that the server cannot be reached. " - "Please ensure that the server hosting your docker image is available " - "and you have internet access, if required." - ) - - else: - simple_explanation = f"Unable to pull image {v} for UNKNOWN reason. Please consult the detailed error below." - - verbose_explanation = e.explanation - - raise ( - ValueError( - f"Error Pulling Docker Image '{v}'\n - EXPLANATION: {simple_explanation} (full error text: '{verbose_explanation}'" - ) - ) - except Exception as e: - print("error") - raise (ValueError(f"Uknown error pulling Docker Image '{v}': {str(e)}")) - - except Exception as e: - # There was some exception that prevented us from getting the latest version - # of the image. 
However, if we already have it, use the current version and - # down fully raise the exception - just use it - client = docker.from_env() - try: - client.api.inspect_image(v) - print(e) - print( - f"We will default to using the version of the image {v} which has " - "already been downloaded to this machine. Please note that it may be out of date." - ) - - except Exception as e2: - raise ( - ValueError( - f"{str(e)}Image is not previously cached, so we could not use an old version." - ) - ) - - return v - - @validator("infrastructures", always=True) - def validate_infrastructures(cls, v, values): - MAX_RECOMMENDED_CONTAINERS_BEFORE_WARNING = 2 - if values.get("infrastructure_type",None) == DetectionTestingTargetInfrastructure.container and len(v) == 0: - v = [Infrastructure()] - - if len(v) < 1: - #print("Fix number of infrastructure validation later") - return v - raise ( - ValueError( - f"Error validating infrastructures. Test must be run with AT LEAST 1 infrastructure, not {len(v)}" - ) - ) - if (values.get("infrastructure_type", None) == DetectionTestingTargetInfrastructure.container.value) and len(v) > MAX_RECOMMENDED_CONTAINERS_BEFORE_WARNING: - print( - f"You requested to run with [{v}] containers which may use a very large amount of resources " - "as they all run in parallel. The maximum suggested number of parallel containers is " - f"[{MAX_RECOMMENDED_CONTAINERS_BEFORE_WARNING}]. We will do what you asked, but be warned!" - ) - return v - - - @validator("infrastructures", each_item=False) - def validate_ports_overlap(cls, v, values): - ports = set() - if values.get("infrastructure_type", None) == DetectionTestingTargetInfrastructure.server.value: - #ports are allowed to overlap, they are on different servers - return v - - if len(v) == 0: - raise ValueError("Error, there must be at least one test infrastructure defined in infrastructures.") - for infrastructure in v: - for k in ["hec_port", "web_ui_port", "api_port"]: - if getattr(infrastructure, k) in ports: - raise ValueError(f"Port {getattr(infrastructure, k)} used more than once in container infrastructure ports") - ports.add(getattr(infrastructure, k)) - return v - -class VersionControlConfig(BaseModel, extra=Extra.forbid, validate_assignment=True): - repo_path: str = Field(default=".", title="Path to the root of your app") - repo_url: str = Field( - default="https://github.com/your_organization/your_repo", - title="HTTP(s) path to the repo for repo_path. 
If this field is blank, it will be inferred from the repo", - ) - target_branch: str = Field(default="main", title="Main branch of the repo or target of a Pull Request/Merge Request.") - test_branch: str = Field(default="main", title="Branch of the repo to be tested, if applicable.") - commit_hash: Union[str,None] = Field(default=None, title="Commit hash of the repo state to be tested, if applicable") - pr_number: Union[int,None] = Field(default=None, title="The number of the PR to test") - - @validator('repo_path') - def validate_repo_path(cls,v): - print(f"checking repo path '{v}'") - try: - path = pathlib.Path(v) - except Exception as e: - - raise(ValueError(f"Error, the provided path is is not a valid path: '{v}'")) - - try: - r = git.Repo(path) - except Exception as e: - - raise(ValueError(f"Error, the provided path is not a valid git repo: '{path}'")) - - try: - - if ALWAYS_PULL_REPO: - r.remotes.origin.pull() - except Exception as e: - raise ValueError(f"Error pulling git repository {v}: {str(e)}") - print("repo path looks good") - return v - - @validator('repo_url') - def validate_repo_url(cls, v, values): - #First try to get the value from the repo - try: - remotes = git.Repo(values['repo_path']).remotes - except Exception as e: - raise ValueError(f"Error - repo at {values['repo_path']} has no remotes. Repo must be tracked in a remote git repo.") - - try: - remote_url_from_repo = remotes.origin.url - except Exception as e: - raise(ValueError(f"Error reading remote_url from the repo located at '{values['repo_path']}'")) - - if v is not None and remote_url_from_repo != v: - raise(ValueError(f"The url of the remote repo supplied in the config file {v} does not "\ - f"match the value read from the repository at {values['repo_path']}, {remote_url_from_repo}")) - - if v is None: - v = remote_url_from_repo - - #Ensure that the url is the proper format - # try: - # if bool(validators.url(v)) == False: - # raise(Exception) - # except: - # raise(ValueError(f"Error validating the repo_url. The url is not valid: {v}")) - - return v - - @validator('target_branch') - def valid_target_branch(cls, v, values): - if v is None: - print(f"target_branch is not supplied. 
Inferring from '{values['repo_path']}'...",end='') - - target_branch = Utils.get_default_branch_name(values['repo_path'], values['repo_url']) - print(f"target_branch name '{target_branch}' inferred'") - #continue with the validation - v = target_branch - - try: - Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v) - except Exception as e: - raise ValueError(f"Error validating target_branch: {str(e)}") - return v - - @validator('test_branch') - def validate_test_branch(cls, v, values): - if v is None: - print(f"No test_branch provided, so we will default to using the target_branch '{values['target_branch']}'") - v = values['target_branch'] - try: - Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v) - except Exception as e: - raise ValueError(f"Error validating test_branch: {str(e)}") - - r = git.Repo(values.get("repo_path")) - try: - if r.active_branch.name != v: - print(f"We are trying to test {v} but the current active branch is {r.active_branch}") - print(f"Checking out {v}") - r.git.checkout(v) - except Exception as e: - raise ValueError(f"Error checking out test_branch '{v}': {str(e)}") - return v - - @validator('commit_hash') - def validate_commit_hash(cls, v, values): - try: - #We can a hash with this function too - Utils.validate_git_hash(values['repo_path'],values['repo_url'], v, values['test_branch']) - except Exception as e: - raise ValueError(f"Error validating commit_hash '{v}': {str(e)}") - return v - - @validator('pr_number') - def validate_pr_number(cls, v, values): - if v == None: - return v - - hash = Utils.validate_git_pull_request(values['repo_path'], v) - - #Ensure that the hash is equal to the one in the config file, if it exists. - if values['commit_hash'] is None: - values['commit_hash'] = hash - else: - if values['commit_hash'] != hash: - raise(ValueError(f"commit_hash specified in configuration was {values['commit_hash']}, but commit_hash"\ - f" from pr_number {v} was {hash}. These must match. If you're testing"\ - " a PR, you probably do NOT want to provide the commit_hash in the configuration file "\ - "and always want to test the head of the PR. This will be done automatically if you do "\ - "not provide the commit_hash.")) - - return v - - -class TestConfig(BaseModel, extra=Extra.forbid, validate_assignment=True): - - version_control_config: Union[VersionControlConfig,None] = VersionControlConfig() - - infrastructure_config: InfrastructureConfig = Field( - default=InfrastructureConfig(), - title=f"The infrastructure for testing to be run on", - ) - - - post_test_behavior: PostTestBehavior = Field( - default=PostTestBehavior.pause_on_failure, - title=f"What to do after a test has completed. Choose one of {PostTestBehavior._member_names_}", - ) - mode: DetectionTestingMode = Field( - default=DetectionTestingMode.all, - title=f"Control which detections should be tested. 
Choose one of {DetectionTestingMode._member_names_}", - ) - detections_list: Union[list[str], None] = Field( - default=None, title="List of paths to detections which should be tested" - ) - - - splunkbase_username: Union[str, None] = Field( - default=None, - title="The username for logging into Splunkbase in case apps must be downloaded", - ) - splunkbase_password: Union[str, None] = Field( - default=None, - title="The password for logging into Splunkbase in case apps must be downloaded", - ) - apps: list[App] = Field( - default=App.get_default_apps(), - title="A list of all the apps to be installed on each container", - ) - enable_integration_testing: bool = Field( - default=False, - title="Whether integration testing should be enabled, in addition to unit testing (requires a configured Splunk" - " instance with ES installed)" - ) - - - - - - - - - - # Ensure that at least 1 of test_branch, commit_hash, and/or pr_number were passed. - # Otherwise, what are we testing?? - # @root_validator(pre=False) - def ensure_there_is_something_to_test(cls, values): - if 'test_branch' not in values and 'commit_hash' not in values and'pr_number' not in values: - if 'mode' in values and values['mode'] == DetectionTestingMode.changes: - raise(ValueError(f"Under mode [{DetectionTestingMode.changes}], 'test_branch', 'commit_hash', and/or 'pr_number' must be defined so that we know what to test.")) - - return values - - - - # presumably the post test behavior is validated by the enum? - # presumably the mode is validated by the enum? - - @validator("detections_list", always=True) - def validate_detections_list(cls, v, values): - # A detections list can only be provided if the mode is selected - # otherwise, we must throw an error - - # First check the mode - if values["mode"] != DetectionTestingMode.selected: - if v is not None: - # We intentionally raise an error even if the list is an empty list - raise ( - ValueError( - f"For Detection Testing Mode '{values['mode']}', " - f"'detections_list' MUST be none. Instead, it was a list containing {len(v)} detections." - ) - ) - return v - - # Mode is DetectionTestingMode.selected - verify the paths of all the detections - all_errors = [] - if v == None: - raise ( - ValueError( - f"mode is '{DetectionTestingMode.selected}', but detections_list was not provided." - ) - ) - for detection in v: - try: - if not pathlib.Path(detection).exists(): - all_errors.append(detection) - except Exception as e: - all_errors.append( - f"Unexpected error validating path '{detection}': {str(e)}" - ) - if len(all_errors): - joined_errors = "\n\t".join(all_errors) - raise ( - ValueError( - f"Paths to the following detections in 'detections_list' " - f"were invalid: \n\t{joined_errors}" - ) - ) - - return v - - - - - - - - @validator("splunkbase_username") - def validate_splunkbase_username(cls, v): - return v - - @validator("splunkbase_password") - def validate_splunkbase_password(cls, v, values): - if values["splunkbase_username"] == None: - return v - elif (v == None and values["splunkbase_username"] != None) or ( - v != None and values["splunkbase_username"] == None - ): - raise ( - ValueError( - "splunkbase_username OR splunkbase_password " - "was provided, but not both. 
You must provide" - " neither of these value or both, but not just " - "1 of them" - ) - ) - - else: - return v - - @validator("apps",) - def validate_apps(cls, v, values): - - - app_errors = [] - - # ensure that the splunkbase username and password are provided - username = values["splunkbase_username"] - password = values["splunkbase_password"] - app_directory = LOCAL_APP_DIR - try: - os.makedirs(LOCAL_APP_DIR, exist_ok=True) - except Exception as e: - raise ( - Exception(f"Error: When trying to create {CONTAINER_APP_DIR}: {str(e)}") - ) - - for app in v: - if app.environment_path != ENVIRONMENT_PATH_NOT_SET: - #Avoid re-configuring the apps that have already been configured. - continue - - try: - app.configure_app_source_for_container( - username, password, app_directory, CONTAINER_APP_DIR - ) - except Exception as e: - error_string = f"Unable to prepare app '{app.title}': {str(e)}" - app_errors.append(error_string) - - if len(app_errors) != 0: - error_string = "\n\t".join(app_errors) - raise (ValueError(f"Error preparing apps to install:\n\t{error_string}")) - - return v - - \ No newline at end of file diff --git a/contentctl/objects/unit_test.py b/contentctl/objects/unit_test.py index 8d195233..93520f7a 100644 --- a/contentctl/objects/unit_test.py +++ b/contentctl/objects/unit_test.py @@ -1,7 +1,5 @@ from __future__ import annotations - -from typing import Optional -from pydantic import BaseModel, Field +from pydantic import Field from typing import TYPE_CHECKING if TYPE_CHECKING: from contentctl.objects.unit_test_attack_data import UnitTestAttackData diff --git a/contentctl/objects/unit_test_baseline.py b/contentctl/objects/unit_test_baseline.py index a3573772..9ba49336 100644 --- a/contentctl/objects/unit_test_baseline.py +++ b/contentctl/objects/unit_test_baseline.py @@ -1,6 +1,6 @@ -from pydantic import BaseModel, validator, ValidationError +from pydantic import BaseModel from typing import Union class UnitTestBaseline(BaseModel): diff --git a/contentctl/objects/unit_test_result.py b/contentctl/objects/unit_test_result.py index 40924790..8c40da10 100644 --- a/contentctl/objects/unit_test_result.py +++ b/contentctl/objects/unit_test_result.py @@ -1,10 +1,12 @@ -from typing import Union +from __future__ import annotations +from typing import Union,TYPE_CHECKING from splunklib.data import Record - -from contentctl.objects.test_config import Infrastructure from contentctl.objects.base_test_result import BaseTestResult, TestResultStatus +if TYPE_CHECKING: + from contentctl.objects.config import Infrastructure + FORCE_TEST_FAILURE_FOR_MISSING_OBSERVABLE = False NO_SID = "Testing Failed, NO Search ID" diff --git a/contentctl/objects/unit_test_ssa.py b/contentctl/objects/unit_test_ssa.py index 6cd049ef..150b9efe 100644 --- a/contentctl/objects/unit_test_ssa.py +++ b/contentctl/objects/unit_test_ssa.py @@ -1,25 +1,7 @@ from __future__ import annotations - from typing import Optional -from pydantic import BaseModel, Field, HttpUrl, FilePath -import pathlib -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from contentctl.objects.unit_test_attack_data import UnitTestAttackData - from contentctl.objects.unit_test_result import UnitTestResult - -from typing import Union - -from pydantic import Field, field_serializer - -# from contentctl.objects.security_content_object import SecurityContentObject -# from contentctl.objects.enums import SecurityContentType -from contentctl.objects.unit_test_baseline import UnitTestBaseline -from contentctl.objects.unit_test_attack_data import 
UnitTestAttackData
-from contentctl.objects.unit_test_result import UnitTestResult
-from contentctl.objects.base_test import BaseTest, TestType
-from contentctl.objects.base_test_result import TestResultStatus
-
+from pydantic import BaseModel, Field
 
 
 class UnitTestAttackDataSSA(BaseModel):
diff --git a/contentctl/output/attack_nav_output.py b/contentctl/output/attack_nav_output.py
index 1ff07183..e6c3e35b 100644
--- a/contentctl/output/attack_nav_output.py
+++ b/contentctl/output/attack_nav_output.py
@@ -1,14 +1,25 @@
 import os
+from typing import List, Union
+import pathlib
 
-
-from contentctl.objects.enums import SecurityContentType
+from contentctl.objects.detection import Detection
 from contentctl.output.attack_nav_writer import AttackNavWriter
 
 
 class AttackNavOutput():
 
-    def writeObjects(self, objects: list, output_path: str, type: SecurityContentType = None) -> None:
-        techniques = dict()
+    def writeObjects(self, detections: List[Detection], output_path: pathlib.Path) -> None:
+        techniques:dict[str,dict[str,Union[List[str],int]]] = {}
+        for detection in detections:
+            for technique_id in detection.tags.mitre_attack_id:
+                if technique_id not in techniques:
+                    techniques[technique_id] = {'score':0,'file_paths':[]}
+
+                detection_url = f"https://github.com/splunk/security_content/blob/develop/detections/{detection.source}/{detection.file_path.name}"
+                techniques[technique_id]['score'] += 1
+                techniques[technique_id]['file_paths'].append(detection_url)
+
+        '''
         for detection in objects:
             if detection.tags.mitre_attack_enrichments:
                 for mitre_attack_enrichment in detection.tags.mitre_attack_enrichments:
@@ -20,9 +31,9 @@ def writeObjects(self, objects: list, output_path: str, type: SecurityContentTyp
                     else:
                         techniques[mitre_attack_enrichment.mitre_attack_id]['score'] = techniques[mitre_attack_enrichment.mitre_attack_id]['score'] + 1
                         techniques[mitre_attack_enrichment.mitre_attack_id]['file_paths'].append('https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name))
-
-        AttackNavWriter.writeAttackNavFile(techniques, os.path.join(output_path, 'coverage.json'))
-
+        '''
+        AttackNavWriter.writeAttackNavFile(techniques, output_path / 'coverage.json')
+
     def convertNameToFileName(self, name: str):
         file_name = name \
diff --git a/contentctl/output/attack_nav_writer.py b/contentctl/output/attack_nav_writer.py
index 6ea6859f..78e8c514 100644
--- a/contentctl/output/attack_nav_writer.py
+++ b/contentctl/output/attack_nav_writer.py
@@ -1,7 +1,7 @@
 import json
-
-
+from typing import Union, List
+import pathlib
 VERSION = "4.3"
 NAME = "Detection Coverage"
 DESCRIPTION = "security_content detection coverage"
@@ -11,7 +11,7 @@ class AttackNavWriter():
 
     @staticmethod
-    def writeAttackNavFile(mitre_techniques : dict, output_path : str) -> None:
+    def writeAttackNavFile(mitre_techniques : dict[str,dict[str,Union[List[str],int]]], output_path : pathlib.Path) -> None:
         max_count = 0
         for technique_id in mitre_techniques.keys():
             if mitre_techniques[technique_id]['score'] > max_count:
diff --git a/contentctl/output/jinja_writer.py b/contentctl/output/jinja_writer.py
index 97d41c2c..05690ea8 100644
--- a/contentctl/output/jinja_writer.py
+++ b/contentctl/output/jinja_writer.py
@@ -1,5 +1,5 @@
 import os
-
+from typing import Any
 from jinja2 import Environment, FileSystemLoader
 
 
@@ -20,7 +20,7 @@ def writeObjectsList(template_name : str, output_path : str, objects : list) ->
 
     @staticmethod
-    def writeObject(template_name : str, output_path : str, object : dict) -> None:
+    def writeObject(template_name : str, output_path : str, object: dict[str,Any]) -> None:
         j2_env = Environment(
             loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), 'templates')),
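
With the tightened signature, callers pass a plain dict[str, Any]; a hypothetical call for illustration (the template name exists in the repo, but the badge keys and output path here simply mirror what SvgOutput.get_badge_dict builds and are not taken from the patch):

    from typing import Any
    from contentctl.output.jinja_writer import JinjaWriter

    badge: dict[str, Any] = {"name": "Detections", "color": "Green", "count": 42, "coverage": "100%"}
    JinjaWriter.writeObject("detection_count.j2", "dist/detection_count.svg", badge)
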
diff --git a/contentctl/output/svg_output.py b/contentctl/output/svg_output.py
index a832a0fb..d454ccb2 100644
--- a/contentctl/output/svg_output.py
+++ b/contentctl/output/svg_output.py
@@ -1,15 +1,18 @@
 import os
 import pathlib
+from typing import List, Any, Optional
 
 from contentctl.objects.enums import SecurityContentType
 
 from contentctl.output.jinja_writer import JinjaWriter
-from contentctl.objects.config import Config
 from contentctl.objects.enums import DetectionStatus
+from contentctl.objects.detection import Detection
 
 class SvgOutput():
-    def get_badge_dict(self, name:str, total_detections:list, these_detections:list):
-        obj = dict()
+
+    def get_badge_dict(self, name:str, total_detections:List[Detection], these_detections:List[Detection])->dict[str,Any]:
+        obj:dict[str,Any] = {}
         obj['name'] = name
+
         if name == "Production":
             obj['color'] = "Green"
         elif name == "Detections":
@@ -26,40 +29,27 @@ def get_badge_dict(self, name:str, total_detections:list, these_detections:list)
            obj['coverage'] = len(these_detections) / obj['count']
            obj['coverage'] = "{:.0%}".format(obj['coverage'])
         return obj
-
-    def writeObjects(self, objects: list, path: str, type: SecurityContentType = None) -> None:
+
+    def writeObjects(self, detections: List[Detection], output_path: pathlib.Path, type: Optional[SecurityContentType] = None) -> None:
 
-        detections_tmp = objects
-
-        output_path = pathlib.Path(path)
-
-        production_detections = []
-        deprecated_detections = []
-        experimental_detections = []
-        obj = dict()
-        for detection in detections_tmp:
-            if detection.status == DetectionStatus.production.value:
-                production_detections.append(detection)
-            if detection.status == DetectionStatus.deprecated.value:
-                deprecated_detections.append(detection)
-            elif detection.status == DetectionStatus.experimental.value:
-                experimental_detections.append(detection)
+        total_dict:dict[str,Any] = self.get_badge_dict("Detections", detections, detections)
+        production_dict:dict[str,Any] = self.get_badge_dict("% Production", detections, [detection for detection in detections if detection.status == DetectionStatus.production.value])
+        #deprecated_dict = self.get_badge_dict("Deprecated", detections, [detection for detection in detections if detection.status == DetectionStatus.deprecated])
+        #experimental_dict = self.get_badge_dict("Experimental", detections, [detection for detection in detections if detection.status == DetectionStatus.experimental])
 
-        total_detections = production_detections + deprecated_detections + experimental_detections
-        total_dict = self.get_badge_dict("Detections", total_detections, production_detections)
-        production_dict = self.get_badge_dict("Production", total_detections, production_detections)
-        deprecated_dict = self.get_badge_dict("Deprecated", total_detections, deprecated_detections)
-        experimental_dict = self.get_badge_dict("Experimental", total_detections, experimental_detections)
 
-        JinjaWriter.writeObject('detection_count.j2', os.path.join(output_path, 'detection_count.svg'), total_dict)
+
+        #Total number of detections
+        JinjaWriter.writeObject('detection_count.j2', output_path / 'detection_count.svg', total_dict)
         #JinjaWriter.writeObject('detection_count.j2', os.path.join(output_path, 'production_count.svg'), production_dict)
        #JinjaWriter.writeObject('detection_count.j2', os.path.join(output_path, 'deprecated_count.svg'), deprecated_dict)
        #JinjaWriter.writeObject('detection_count.j2', os.path.join(output_path, 'experimental_count.svg'), experimental_dict)
 
-        JinjaWriter.writeObject('detection_coverage.j2', os.path.join(output_path, 'detection_coverage.svg'), total_dict)
+        #Percentage of detections that are production
+        JinjaWriter.writeObject('detection_coverage.j2', output_path / 'detection_coverage.svg', production_dict)
         #JinjaWriter.writeObject('detection_coverage.j2', os.path.join(output_path, 'detection_coverage.svg'), deprecated_dict)
         #JinjaWriter.writeObject('detection_coverage.j2', os.path.join(output_path, 'detection_coverage.svg'), experimental_dict)
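
The coverage value rendered into the badge is just a ratio formatted as a percentage; a sketch with invented numbers:

    # e.g. 8 of 10 detections carry production status
    count, production = 10, 8
    coverage = "{:.0%}".format(production / count)  # -> "80%"
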
diff --git a/contentctl/output/templates/detection_coverage.j2 b/contentctl/output/templates/detection_coverage.j2
index d9dca89f..c9c28fd4 100644
--- a/contentctl/output/templates/detection_coverage.j2
+++ b/contentctl/output/templates/detection_coverage.j2
@@ -1,18 +1,16 @@
-
+
 
-
-
-
-
+
+
 
-
-  coverage
-  {{ object.coverage }}
+
+  % Production
+  {{object.coverage}}
diff --git a/contentctl/output/yml_writer.py b/contentctl/output/yml_writer.py
index 09d8e311..6ceb02a3 100644
--- a/contentctl/output/yml_writer.py
+++ b/contentctl/output/yml_writer.py
@@ -1,11 +1,11 @@
 
 import yaml
-
+from typing import Any
 
 class YmlWriter:
 
     @staticmethod
-    def writeYmlFile(file_path : str, obj : dict) -> None:
+    def writeYmlFile(file_path : str, obj : dict[Any,Any]) -> None:
 
         with open(file_path, 'w') as outfile:
             yaml.safe_dump(obj, outfile, default_flow_style=False, sort_keys=False)
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index b2b2cd03..fd35d90c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "contentctl"
-version = "3.6.0"
+version = "4.0.0"
 description = "Splunk Content Control Tool"
 authors = ["STRT "]
 license = "Apache 2.0"
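
The jump from 3.6.0 to 4.0.0 signals the breaking configuration changes above (test_config.py and repo_config.py are removed outright). A downstream consumer of contentctl as a library might guard against running on the wrong major version; a rough sketch using only the standard library (the guard itself is hypothetical, not part of the patch):

    from importlib.metadata import version

    # Fail fast if the installed contentctl predates the 4.x config schema.
    major = version("contentctl").split(".")[0]
    assert major == "4", f"expected contentctl 4.x, found {version('contentctl')}"
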