@@ -58,7 +58,7 @@ class DetectionTestingManagerOutputDto:
start_time: Union[datetime.datetime, None] = None
replay_index: str = "CONTENTCTL_TESTING_INDEX"
replay_host: str = "CONTENTCTL_HOST"
timeout_seconds: int = 15
timeout_seconds: int = 60
terminate: bool = False


42 changes: 0 additions & 42 deletions contentctl/actions/inspect.py

This file was deleted.

20 changes: 2 additions & 18 deletions contentctl/contentctl.py
@@ -17,7 +17,6 @@
from contentctl.actions.new_content import NewContentInputDto, NewContent
from contentctl.actions.doc_gen import DocGenInputDto, DocGen
from contentctl.actions.initialize import Initialize, InitializeInputDto
from contentctl.actions.inspect import InspectInputDto, Inspect
from contentctl.actions.api_deploy import API_Deploy, API_DeployInputDto

from contentctl.input.director import DirectorInputDto
@@ -116,14 +115,6 @@ def build(args, config:Union[Config,None]=None) -> DirectorOutputDto:
return generate.execute(generate_input_dto)


def inspect(args) -> None:
config=start(args)
app_path = pathlib.Path(config.build.path_root)/f"{config.build.name}.tar.gz"
input_dto = InspectInputDto(path=app_path)
i = Inspect()
i.execute(input_dto=input_dto)


def api_deploy(args) -> None:
config = start(args)
deploy_input_dto = API_DeployInputDto(path=pathlib.Path(args.path), config=config)
@@ -170,6 +161,7 @@ def test(args: argparse.Namespace):
local_path=str(pathlib.Path(config.build.path_root)/f"{config.build.name}.tar.gz"),
description=config.build.description,
splunkbase_path=None,
force_local=True
)

# We need to do this instead of appending to retrigger validation.
@@ -338,15 +330,7 @@ def main():

reporting_parser.set_defaults(func=reporting)

inspect_parser.add_argument(
"-ap",
"--app_path",
required=False,
type=str,
default=None,
help="path to the Splunk app to be inspected",
)
inspect_parser.set_defaults(func=inspect)


api_deploy_parser.set_defaults(func=api_deploy)

@@ -0,0 +1,157 @@
import uuid
import string
import requests
import time
import sys

from pydantic import BaseModel, validator, root_validator, Extra
from dataclasses import dataclass
from typing import Union
from datetime import datetime, timedelta


from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.enums import AnalyticsType
from contentctl.objects.enums import DataModel
from contentctl.objects.enums import DetectionStatus
from contentctl.objects.detection_tags import DetectionTags
from contentctl.objects.config import ConfigDetectionConfiguration
from contentctl.objects.unit_test import UnitTest
from contentctl.objects.macro import Macro
from contentctl.objects.lookup import Lookup
from contentctl.objects.baseline import Baseline
from contentctl.objects.playbook import Playbook
from contentctl.helper.link_validator import LinkValidator
from contentctl.objects.enums import SecurityContentType


class Detection_Abstract(SecurityContentObject):
contentType: SecurityContentType = SecurityContentType.detections
type: str
status: DetectionStatus
data_source: list[str]
search: Union[str, dict]
how_to_implement: str
known_false_positives: str
check_references: bool = False
references: list
tags: DetectionTags
tests: list[UnitTest] = []

# enrichments
datamodel: list = None
deprecated: bool = None
experimental: bool = None
deployment: ConfigDetectionConfiguration = None
annotations: dict = None
risk: list = None
playbooks: list[Playbook] = None
baselines: list[Baseline] = None
mappings: dict = None
macros: list[Macro] = None
lookups: list[Lookup] = None
cve_enrichment: list = None
splunk_app_enrichment: list = None
file_path: str = None
source: str = None
nes_fields: str = None
providing_technologies: list = None
runtime: str = None

class Config:
use_enum_values = True

@validator("type")
def type_valid(cls, v, values):
if v.lower() not in [el.name.lower() for el in AnalyticsType]:
raise ValueError("not valid analytics type: " + values["name"])
return v

@validator('how_to_implement')
def encode_error(cls, v, values, field):
return SecurityContentObject.free_text_field_valid(cls,v,values,field)

# @root_validator
# def search_validation(cls, values):
# if 'ssa_' not in values['file_path']:
# if not '_filter' in values['search']:
# raise ValueError('filter macro missing in: ' + values["name"])
# if any(x in values['search'] for x in ['eventtype=', 'sourcetype=', ' source=', 'index=']):
# if not 'index=_internal' in values['search']:
# raise ValueError('Use source macro instead of eventtype, sourcetype, source or index in detection: ' + values["name"])
# return values

# disable it because of performance reasons
# @validator('references')
# def references_check(cls, v, values):
# return LinkValidator.check_references(v, values["name"])
# return v


@validator("search")
def search_validate(cls, v, values):
# write search validator
return v

@validator("tests")
def tests_validate(cls, v, values):
if values.get("status","") != DetectionStatus.production and not v:
raise ValueError(
"tests value is needed for production detection: " + values["name"]
)
return v

@validator("experimental", always=True)
def experimental_validate(cls, v, values):
if DetectionStatus(values.get("status","")) == DetectionStatus.experimental:
return True
return False

@validator("deprecated", always=True)
def deprecated_validate(cls, v, values):
if DetectionStatus(values.get("status","")) == DetectionStatus.deprecated:
return True
return False

@validator("datamodel")
def datamodel_valid(cls, v, values):
for datamodel in v:
if datamodel not in [el.name for el in DataModel]:
raise ValueError("not valid data model: " + values["name"])
return v

def all_tests_successful(self) -> bool:
if len(self.tests) == 0:
return False
for test in self.tests:
if test.result is None or test.result.success == False:
return False
return True

def get_summary(
self,
detection_fields: list[str] = ["name", "search"],
test_model_fields: list[str] = ["success", "message"],
test_job_fields: list[str] = ["resultCount", "runDuration"],
) -> dict:
summary_dict = {}
for field in detection_fields:
summary_dict[field] = getattr(self, field)
summary_dict["success"] = self.all_tests_successful()
summary_dict["tests"] = []
for test in self.tests:
result: dict[str, Union[str, bool]] = {"name": test.name}
if test.result is not None:
result.update(
test.result.get_summary_dict(
model_fields=test_model_fields,
job_fields=test_job_fields,
)
)
else:
result["success"] = False
result["message"] = "RESULT WAS NONE"

summary_dict["tests"].append(result)

return summary_dict
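
For context on how the two new helpers compose, here is a minimal sketch of a reporting step built only on `all_tests_successful()` and `get_summary()` as defined above; the surrounding test-runner plumbing (how the detection objects are produced) is assumed and not part of this change.

```python
from typing import Any


def summarize_run(detections: list) -> dict[str, Any]:
    # "detections" is a hypothetical list of objects derived from Detection_Abstract,
    # e.g. the output of a completed detection testing run.
    summaries = [
        detection.get_summary(
            detection_fields=["name", "search"],
            test_model_fields=["success", "message"],
            test_job_fields=["resultCount", "runDuration"],
        )
        for detection in detections
    ]
    return {
        "pass_count": sum(1 for d in detections if d.all_tests_successful()),
        "fail_count": sum(1 for d in detections if not d.all_tests_successful()),
        "detections": summaries,
    }
```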
@@ -0,0 +1,60 @@
import abc
import string
import uuid
from datetime import datetime
from pydantic import BaseModel, validator, ValidationError
from contentctl.objects.enums import SecurityContentType


class SecurityContentObject_Abstract(BaseModel, abc.ABC):
contentType: SecurityContentType
name: str
author: str = "UNKNOWN_AUTHOR"
date: str = "1990-01-01"
version: int = 99999
id: str = None
description: str = "UNKNOWN_DESCRIPTION"

@validator('name')
def name_max_length(cls, v):
if len(v) > 67:
print("LENGTH ERROR!")
raise ValueError('name is longer then 67 chars: ' + v)
return v

@validator('name')
def name_invalid_chars(cls, v):
invalidChars = set(string.punctuation.replace("-", ""))
if any(char in invalidChars for char in v):
raise ValueError('invalid chars used in name: ' + v)
return v

@validator('id',always=True)
def id_check(cls, v, values):
try:
uuid.UUID(str(v))
except:
#print(f"Generating missing uuid for {values['name']}")
return str(uuid.uuid4())
raise ValueError('uuid is not valid: ' + values["name"])
return v

@validator('date')
def date_valid(cls, v, values):
try:
datetime.strptime(v, "%Y-%m-%d")
except:
raise ValueError('date is not in format YYYY-MM-DD: ' + values["name"])
return v

@staticmethod
def free_text_field_valid(input_cls, v, values, field):
try:
v.encode('ascii')
except UnicodeEncodeError:
raise ValueError('encoding error in ' + field.name + ': ' + values["name"])
return v

@validator('description')
def description_valid(cls, v, values, field):
return SecurityContentObject_Abstract.free_text_field_valid(cls,v,values,field)
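
A small usage sketch of the validators in this base model. The concrete subclass and the import path are assumptions for illustration, since the new file's location is not shown in this hunk.

```python
from pydantic import ValidationError

from contentctl.objects.enums import SecurityContentType
# The import path below is an assumption; the new file's location is not visible here.
from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import (
    SecurityContentObject_Abstract,
)


class ExampleObject(SecurityContentObject_Abstract):
    """Concrete stand-in so the abstract base model can be instantiated."""
    contentType: SecurityContentType = SecurityContentType.detections


# A missing id is caught by id_check and replaced with a freshly generated UUID.
obj = ExampleObject(name="Example Detection", date="2023-01-01")
print(obj.id)

# Punctuation other than "-" in the name is rejected by name_invalid_chars.
try:
    ExampleObject(name="Bad:Name", date="2023-01-01")
except ValidationError as err:
    print(err)
```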
3 changes: 2 additions & 1 deletion contentctl/objects/app.py
@@ -44,6 +44,7 @@ class App(BaseModel, extra=Extra.forbid):
# This will be set via a function call and should not be provided in the YML
# Note that this is the path relative to the container mount
environment_path: str = ENVIRONMENT_PATH_NOT_SET
force_local:bool = False

def configure_app_source_for_container(
self,
@@ -57,7 +58,7 @@ def configure_app_source_for_container(
splunkbase_username is not None and splunkbase_password is not None
)

if splunkbase_creds_provided and self.splunkbase_path is not None:
if splunkbase_creds_provided and self.splunkbase_path is not None and not self.force_local:
self.environment_path = self.splunkbase_path

elif self.local_path is not None:
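
The one-line change above alters the precedence used when choosing where an app is sourced from. A condensed, standalone restatement of that logic (a simplified sketch; the real method sets self.environment_path and handles error cases not shown here):

```python
from typing import Optional


def pick_app_source(
    splunkbase_path: Optional[str],
    local_path: Optional[str],
    splunkbase_username: Optional[str],
    splunkbase_password: Optional[str],
    force_local: bool,
) -> Optional[str]:
    """Mirror the precedence now used by App.configure_app_source_for_container."""
    splunkbase_creds_provided = (
        splunkbase_username is not None and splunkbase_password is not None
    )

    # Splunkbase wins only when credentials exist, a Splunkbase path is known,
    # and the app has not been pinned to its locally built package.
    if splunkbase_creds_provided and splunkbase_path is not None and not force_local:
        return splunkbase_path
    if local_path is not None:
        return local_path
    return None
```

This is why test() in contentctl.py can pass force_local=True for the freshly built content pack: even when Splunkbase credentials are supplied for the other apps under test, the local .tar.gz is the one that gets installed.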
2 changes: 1 addition & 1 deletion contentctl/objects/constants.py
@@ -103,7 +103,7 @@
"File Name": 7,
"File Hash": 8,
"Process Name": 9,
"Ressource UID": 10,
"Resource UID": 10,
"Endpoint": 20,
"User": 21,
"Email": 22,
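
The constants change corrects the spelling of one key in the observable/risk type mapping. A purely illustrative sketch of the consequence, using a hypothetical mapping name (the real variable name sits above the visible hunk): anything that still indexes the mapping with the old spelling will now raise a KeyError, so a small shim can bridge the transition.

```python
# OBSERVABLE_TYPE_MAPPING is a hypothetical name for the dictionary edited above.
OBSERVABLE_TYPE_MAPPING = {
    "Process Name": 9,
    "Resource UID": 10,  # previously misspelled "Ressource UID"
    "Endpoint": 20,
}


def observable_type_id(object_type: str) -> int:
    # Tolerate the legacy misspelling while downstream content catches up.
    if object_type == "Ressource UID":
        object_type = "Resource UID"
    return OBSERVABLE_TYPE_MAPPING[object_type]
```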