Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion contentctl/actions/validate.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@

import pathlib

from contentctl.input.director import Director, DirectorOutputDto
from contentctl.objects.config import validate
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.objects.atomic import AtomicTest
from contentctl.helper.utils import Utils
from contentctl.objects.data_source import DataSource
from contentctl.helper.splunk_app import SplunkApp


class Validate:
Expand Down Expand Up @@ -33,6 +36,9 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
director = Director(director_output_dto)
director.execute(input_dto)
self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto)
if input_dto.data_source_TA_validation:
self.validate_latest_TA_information(director_output_dto.data_sources)

return director_output_dto


Expand Down Expand Up @@ -72,4 +78,37 @@ def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_o
if len(unusedLookupFiles) > 0:
raise Exception(f"The following .csv or .mlmodel files exist in '{lookupsDirectory}', but are not referenced by a lookup file: {[str(path) for path in unusedLookupFiles]}")
return



def validate_latest_TA_information(self, data_sources: list[DataSource]) -> None:
    """
    Check that every TA referenced by the data sources pins the latest Splunkbase version.

    Each distinct (name, version) pair that has a URL is looked up once on Splunkbase via
    SplunkApp; TAs without a URL are skipped. All problems are collected and reported
    together rather than stopping at the first one.

    :param data_sources: data sources whose ``supported_TA`` entries should be checked
    :raises Exception: if any TA version differs from the latest Splunkbase version, or if
        any lookup (including parsing the app UID out of the URL) failed
    """
    # a set is sufficient here: it is only used for membership tests
    validated_TAs: set[tuple[str, str]] = set()
    errors: list[str] = []
    print("----------------------")
    print("Validating latest TA:")
    print("----------------------")
    for data_source in data_sources:
        for supported_TA in data_source.supported_TA:
            ta_identifier = (supported_TA.name, supported_TA.version)
            # skip TAs that were already checked and TAs that cannot be looked up
            if ta_identifier in validated_TAs or supported_TA.url is None:
                continue
            validated_TAs.add(ta_identifier)
            try:
                # the numeric app UID is the last path segment of the TA URL; parsing it
                # inside the try ensures a malformed URL is reported like any other error
                # instead of aborting the whole validation run
                uid = int(str(supported_TA.url).rstrip('/').split("/")[-1])
                splunk_app = SplunkApp(app_uid=uid)
                if splunk_app.latest_version != supported_TA.version:
                    errors.append(f"Version mismatch in '{data_source.file_path}' supported TA '{supported_TA.name}'"
                                  f"\n  Latest version on Splunkbase    : {splunk_app.latest_version}"
                                  f"\n  Version specified in data source: {supported_TA.version}")
            except Exception as e:
                # deliberately broad: any failure for one TA is recorded so the remaining
                # TAs are still validated
                errors.append(f"Error processing checking version of TA {supported_TA.name}: {str(e)}")

    if errors:
        errorString = '\n\n'.join(errors)
        raise Exception(f"[{len(errors)}] or more TA versions are out of date or have other errors. "
                        f"Please update the following data sources with the latest versions of "
                        f"their supported tas:\n\n{errorString}")
    print("All TA versions are up to date.")



263 changes: 263 additions & 0 deletions contentctl/helper/splunk_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
import os
import time
import json
import xml.etree.ElementTree as ET
from typing import List, Tuple, Optional
from urllib.parse import urlencode

import requests
import urllib3
import xmltodict
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# Suppress urllib3's InsecureRequestWarning; this runs as a side effect of importing this module.
# NOTE(review): none of the requests visible in this module pass verify=False, so this
# suppression may be unnecessary -- confirm before relying on it.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# NOTE(review): not referenced anywhere in the visible part of this module; the retry count
# actually used as a default is RetryConstant.RETRY_COUNT -- confirm whether this is still needed.
MAX_RETRY = 3

class APIEndPoint:
    """
    Namespace for the Splunkbase URL templates used by this module.

    Templates containing ``{app_name_id}`` or ``{app_uid}`` are meant to be filled in
    with ``str.format`` before use.
    """

    # account login endpoint
    SPLUNK_BASE_AUTH_URL = "https://splunkbase.splunk.com/api/account:login/"

    # app entries, addressed by the app name identifier
    SPLUNK_BASE_FETCH_APP_BY_ENTRY_ID = "https://apps.splunk.com/api/apps/entriesbyid/{app_name_id}"

    # page addressed by the app name identifier
    SPLUNK_BASE_GET_UID_REDIRECT = "https://apps.splunk.com/apps/id/{app_name_id}"

    # app info, addressed by the numeric app UID
    SPLUNK_BASE_APP_INFO = "https://splunkbase.splunk.com/api/v1/app/{app_uid}"

class RetryConstant:
    """
    Namespace for the retry-related constants used when talking to Splunkbase.
    """

    # default number of retries for SplunkApp.requests_retry_session
    RETRY_COUNT: int = 3

    # passed as the ``timeout`` argument of the HTTP requests issued by SplunkApp
    RETRY_INTERVAL: int = 15


class SplunkBaseError(requests.HTTPError):
    """Raised when communication with Splunkbase fails or yields an unusable response."""


# TODO (PEX-306): validate w/ Splunkbase team if there are better APIs we can rely on being supported
class SplunkApp:
    """
    A Splunk app available for download on Splunkbase

    Unless ``manual_setup`` is requested, constructing an instance issues HTTP requests to
    Splunkbase in order to populate ``app_name_id``, ``app_uid``, ``app_title``,
    ``latest_version`` and ``latest_version_download_url``.
    """

    class InitializationError(Exception):
        """An initialization error during SplunkApp setup"""
        pass

    @staticmethod
    def requests_retry_session(
        retries=RetryConstant.RETRY_COUNT,
        backoff_factor=1,
        status_forcelist=(500, 502, 503, 504),
        session=None,
    ):
        """
        Build a requests session whose HTTP and HTTPS adapters retry failed requests
        :param retries: retry budget, applied to total, read and connect retries alike
        :param backoff_factor: backoff factor handed to the Retry policy
        :param status_forcelist: HTTP status codes handed to the Retry policy
        :param session: an existing session to configure; a new one is created if omitted
        :return: the configured session
        """
        session = session or requests.Session()
        retry = Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session

    def __init__(
        self,
        app_uid: Optional[int] = None,
        app_name_id: Optional[str] = None,
        manual_setup: bool = False,
    ) -> None:
        """
        :param app_uid: the numeric app UID (e.g. 742)
        :param app_name_id: the app name identifier (e.g. Splunk_TA_windows)
        :param manual_setup: if True, skip all Splunkbase lookups; the caller is then
            responsible for invoking the ``set_*`` methods as needed
        :raises SplunkApp.InitializationError: if neither app_uid nor app_name_id is given
        :raises SplunkBaseError: if a Splunkbase lookup fails (only when not manual_setup)
        """
        if app_uid is None and app_name_id is None:
            raise SplunkApp.InitializationError(
                "Either app_uid (the numeric app UID e.g. 742) or app_name_id (the app name "
                "idenitifier e.g. Splunk_TA_windows) must be provided"
            )

        # init or declare instance vars
        self.app_uid: Optional[int] = app_uid
        self.app_name_id: Optional[str] = app_name_id
        self.manual_setup = manual_setup
        self.app_title: str
        self.latest_version: str
        self.latest_version_download_url: str
        self._app_info_cache: Optional[dict] = None

        # set instance vars as needed; skip if manual setup was indicated
        if not self.manual_setup:
            self.set_app_name_id()
            self.set_app_uid()
            self.set_app_title()
            self.set_latest_version_info()

    def __eq__(self, __value: object) -> bool:
        # two apps are considered equal when they share the same app_uid
        if isinstance(__value, SplunkApp):
            return self.app_uid == __value.app_uid
        return False

    def __repr__(self) -> str:
        # latest_version_download_url is only declared (not assigned) in __init__, so it may
        # be missing after a manual setup; repr must not raise in that case
        download_url = getattr(self, "latest_version_download_url", None)
        download_url_repr = "None" if download_url is None else f"'{download_url}'"
        return (
            f"SplunkApp(app_name_id='{self.app_name_id}', app_uid={self.app_uid}, "
            f"latest_version_download_url={download_url_repr})"
        )

    def __str__(self) -> str:
        return f"<'{self.app_name_id}' ({self.app_uid})>"

    @staticmethod
    def _as_list(value) -> list:
        """Normalize an xmltodict value (None, a single item, or a list) to a list"""
        if value is None:
            return []
        if isinstance(value, list):
            return value
        return [value]

    def _get(self, url: str, what: str, **kwargs) -> requests.Response:
        """
        Issue a GET request through a retrying session
        :param url: the URL to fetch
        :param what: description of what is being fetched, used in the error message
        :param kwargs: extra keyword arguments forwarded to ``Session.get``
        :return: the response
        :raises SplunkBaseError: if the request fails or raise_for_status() rejects it
        """
        try:
            # close the session once the (non-streamed) response has been received
            with self.requests_retry_session() as session:
                response = session.get(url, timeout=RetryConstant.RETRY_INTERVAL, **kwargs)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            raise SplunkBaseError(f"Error fetching {what}: {str(e)}") from e
        return response

    def get_app_info_by_uid(self) -> dict:
        """
        Retrieve app info via app_uid (e.g. 742)
        :return: dictionary of app info
        :raises SplunkApp.InitializationError: if app_uid is not set
        :raises SplunkBaseError: if the request to Splunkbase fails
        """
        # return the cache if already set, and raise if app_uid is not set
        if self._app_info_cache is not None:
            return self._app_info_cache
        elif self.app_uid is None:
            raise SplunkApp.InitializationError("app_uid must be set in order to fetch app info")

        # NOTE: auth not required
        # Get app info by uid
        response = self._get(
            APIEndPoint.SPLUNK_BASE_APP_INFO.format(app_uid=self.app_uid),
            f"app info for app_uid {self.app_uid}",
        )

        # parse JSON and set cache
        self._app_info_cache = json.loads(response.content)

        return self._app_info_cache

    def set_app_name_id(self) -> None:
        """
        Set app_name_id (from the 'appid' key of the app info) unless it is already set
        :raises SplunkBaseError: if the app info has no 'appid' key
        """
        # return if app_name_id is already set
        if self.app_name_id is not None:
            return

        # get app info by app_uid
        app_info = self.get_app_info_by_uid()

        # set app_name_id if found
        if "appid" in app_info:
            self.app_name_id = app_info["appid"]
        else:
            raise SplunkBaseError(f"Invalid response from Splunkbase; missing key 'appid': {app_info}")

    def set_app_uid(self) -> None:
        """
        Set app_uid (from the redirect issued for app_name_id) unless it is already set
        :raises SplunkApp.InitializationError: if app_name_id is not set
        :raises SplunkBaseError: if the request fails, or the response carries no usable
            'Location' header
        """
        # return if app_uid is already set and raise if app_name_id was not set
        if self.app_uid is not None:
            return
        elif self.app_name_id is None:
            raise SplunkApp.InitializationError("app_name_id must be set in order to fetch app_uid")

        # NOTE: auth not required
        # Get app_uid by app_name_id via a redirect; the redirect is not followed because
        # the target location itself is the information we need
        response = self._get(
            APIEndPoint.SPLUNK_BASE_GET_UID_REDIRECT.format(app_name_id=self.app_name_id),
            f"app_uid for app_name_id '{self.app_name_id}'",
            allow_redirects=False,
        )

        # Extract the app_uid from the last segment of the redirect path
        location = response.headers.get("Location")
        if location is None:
            raise SplunkBaseError(
                "Invalid response from Splunkbase; missing 'Location' in redirect header"
            )
        try:
            self.app_uid = int(location.rstrip("/").split("/")[-1])
        except ValueError as e:
            raise SplunkBaseError(
                f"Invalid response from Splunkbase; no numeric app_uid in 'Location': {location}"
            ) from e

    def set_app_title(self) -> None:
        """
        Set app_title (from the 'title' key of the app info)
        :raises SplunkBaseError: if the app info has no 'title' key
        """
        # get app info by app_uid
        app_info = self.get_app_info_by_uid()

        # set app_title if found
        if "title" in app_info:
            self.app_title = app_info["title"]
        else:
            raise SplunkBaseError(f"Invalid response from Splunkbase; missing key 'title': {app_info}")

    def __fetch_url_latest_version_info(self) -> str:
        """
        Identify latest version of the app and return a URL pointing to download info for the build
        :return: url for download info on the latest build
        :raises SplunkBaseError: if the request fails or no entry is flagged as latest
        """
        # retrieve app entries using the app_name_id
        response = self._get(
            APIEndPoint.SPLUNK_BASE_FETCH_APP_BY_ENTRY_ID.format(app_name_id=self.app_name_id),
            f"app entries for app_name_id '{self.app_name_id}'",
        )

        # parse xml
        app_xml = xmltodict.parse(response.content)

        # xmltodict yields a bare item instead of a list when an element occurs only once
        # (and nothing at all when it is absent), so normalize before iterating
        app_entries = self._as_list((app_xml.get("feed") or {}).get("entry"))

        # iterate over multiple entries if present
        for entry in app_entries:
            for key in self._as_list(entry.get("content").get("s:dict").get("s:key")):
                if key.get("@name") == "islatest" and key.get("#text") == "True":
                    return entry.get("link").get("@href")

        # raise if no entry was found
        raise SplunkBaseError(f"No app entry found with 'islatest' tag set to True: {self.app_name_id}")

    def __fetch_url_latest_version_download(self, info_url: str) -> str:
        """
        Fetch the download URL via the provided URL to build info
        :param info_url: URL for download info for the latest build
        :return: URL for downloading the latest build
        :raises SplunkBaseError: if the request fails
        """
        # fetch download info
        response = self._get(info_url, f"download info for app_name_id '{self.app_name_id}'")

        # parse XML and extract download URL
        build_xml = xmltodict.parse(response.content)
        download_url = build_xml.get("feed").get("entry").get("link").get("@href")
        return download_url

    def set_latest_version_info(self) -> None:
        """
        Set latest_version and latest_version_download_url
        :raises SplunkApp.InitializationError: if app_name_id is not set
        :raises SplunkBaseError: if a request to Splunkbase fails
        """
        # raise if app_name_id not set
        if self.app_name_id is None:
            raise SplunkApp.InitializationError("app_name_id must be set in order to fetch latest version info")

        # fetch the info URL
        info_url = self.__fetch_url_latest_version_info()

        # the version number is the last path segment of the info URL; tolerate a trailing
        # slash so it cannot produce an empty version string
        self.latest_version = info_url.rstrip("/").split("/")[-1]
        self.latest_version_download_url = self.__fetch_url_latest_version_download(info_url)
1 change: 1 addition & 0 deletions contentctl/objects/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class validate(Config_Base):
build_app: bool = Field(default=True, description="Should an app be built and output in the build_path?")
build_api: bool = Field(default=False, description="Should api objects be built and output in the build_path?")
build_ssa: bool = Field(default=False, description="Should ssa objects be built and output in the build_path?")
data_source_TA_validation: bool = Field(default=False, description="Validate latest TA information from Splunkbase")

def getAtomicRedTeamRepoPath(self, atomic_red_team_repo_name:str = "atomic-red-team"):
    """Return ``self.path`` joined with the name of the Atomic Red Team repo directory."""
    base_path = self.path
    return base_path / atomic_red_team_repo_name
Expand Down
9 changes: 7 additions & 2 deletions contentctl/objects/data_source.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from __future__ import annotations
from typing import Optional, Any
from pydantic import Field, FilePath, model_serializer
from pydantic import Field, HttpUrl, model_serializer, BaseModel
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.event_source import EventSource


class TA(BaseModel):
    """A technology add-on (TA) listed under a data source's ``supported_TA``."""

    # name of the TA
    name: str
    # link to the TA; optional, and TAs without one are skipped by the Splunkbase version check
    url: Optional[HttpUrl] = None
    # version of the TA the data source declares support for
    version: str
class DataSource(SecurityContentObject):
source: str = Field(...)
sourcetype: str = Field(...)
separator: Optional[str] = None
configuration: Optional[str] = None
supported_TA: Optional[list] = None
supported_TA: list[TA] = []
fields: Optional[list] = None
field_mappings: Optional[list] = None
convert_to_log_source: Optional[list] = None
Expand Down
8 changes: 4 additions & 4 deletions contentctl/output/data_source_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ def writeDataSourceCsv(data_source_objects: List[DataSource], file_path: pathlib
])
# Write the data
for data_source in data_source_objects:
if data_source.supported_TA and isinstance(data_source.supported_TA, list) and len(data_source.supported_TA) > 0:
supported_TA_name = data_source.supported_TA[0].get('name', '')
supported_TA_version = data_source.supported_TA[0].get('version', '')
supported_TA_url = data_source.supported_TA[0].get('url', '')
if len(data_source.supported_TA) > 0:
supported_TA_name = data_source.supported_TA[0].name
supported_TA_version = data_source.supported_TA[0].version
supported_TA_url = data_source.supported_TA[0].url or ''
else:
supported_TA_name = ''
supported_TA_version = ''
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "contentctl"
version = "4.2.2"
version = "4.2.3"
description = "Splunk Content Control Tool"
authors = ["STRT <research@splunk.com>"]
license = "Apache 2.0"
Expand Down