Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions contentctl/objects/annotated_types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from pydantic import Field
from typing import Annotated

from pydantic import Field

CVE_TYPE = Annotated[str, Field(pattern=r"^CVE-[1|2]\d{3}-\d+$")]
MITRE_ATTACK_ID_TYPE = Annotated[str, Field(pattern=r"^T\d{4}(.\d{3})?$")]
MITRE_ATTACK_ID_TYPE_PARENT = Annotated[str, Field(pattern=r"^T\d{4}$")]
MITRE_ATTACK_ID_TYPE_SUBTYPE = Annotated[str, Field(pattern=r"^T\d{4}(.\d{3})$")]
MITRE_ATTACK_ID_TYPE = MITRE_ATTACK_ID_TYPE_PARENT | MITRE_ATTACK_ID_TYPE_SUBTYPE
APPID_TYPE = Annotated[str, Field(pattern="^[a-zA-Z0-9_-]+$")]
61 changes: 57 additions & 4 deletions contentctl/objects/detection_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
SecurityContentProductName,
SecurityDomain,
)
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment
from contentctl.objects.mitre_attack_enrichment import (
MitreAttackEnrichment,
MitreAttackGroup,
)


class DetectionTags(BaseModel):
Expand All @@ -44,7 +47,7 @@ class DetectionTags(BaseModel):
asset_type: AssetType = Field(...)
group: list[str] = []

mitre_attack_id: List[MITRE_ATTACK_ID_TYPE] = []
mitre_attack_id: list[MITRE_ATTACK_ID_TYPE] = []
nist: list[NistCategory] = []

product: list[SecurityContentProductName] = Field(..., min_length=1)
Expand All @@ -68,6 +71,15 @@ def kill_chain_phases(self) -> list[KillChainPhase]:
phases.add(phase)
return sorted(list(phases))

# We do not want this to be included in serialization. By default, @property
# objects are not included in dumps
@property
def unique_mitre_attack_groups(self) -> list[MitreAttackGroup]:
group_set: set[MitreAttackGroup] = set()
for enrichment in self.mitre_attack_enrichments:
group_set.update(set(enrichment.mitre_attack_group_objects))
return sorted(group_set, key=lambda k: k.group)

# enum is intentionally Cis18 even though field is named cis20 for legacy reasons
@computed_field
@property
Expand Down Expand Up @@ -134,8 +146,8 @@ def addAttackEnrichment(self, info: ValidationInfo):

if len(missing_tactics) > 0:
raise ValueError(f"Missing Mitre Attack IDs. {missing_tactics} not found.")
else:
self.mitre_attack_enrichments = mitre_enrichments

self.mitre_attack_enrichments = mitre_enrichments

return self

Expand All @@ -159,6 +171,44 @@ def addAttackEnrichments(cls, v:list[MitreAttackEnrichment], info:ValidationInfo
return enrichments
"""

@field_validator("mitre_attack_id", mode="after")
@classmethod
def sameTypeAndSubtypeNotPresent(
cls, techniques_and_subtechniques: list[MITRE_ATTACK_ID_TYPE]
) -> list[MITRE_ATTACK_ID_TYPE]:
techniques: list[str] = [
f"{unknown_technique}."
for unknown_technique in techniques_and_subtechniques
if "." not in unknown_technique
]
subtechniques: list[MITRE_ATTACK_ID_TYPE] = [
unknown_technique
for unknown_technique in techniques_and_subtechniques
if "." in unknown_technique
]
subtype_and_parent_exist_exceptions: list[ValueError] = []

for subtechnique in subtechniques:
for technique in techniques:
if subtechnique.startswith(technique):
subtype_and_parent_exist_exceptions.append(
ValueError(
f" Technique : {technique.split('.')[0]}\n"
f" SubTechnique: {subtechnique}\n"
)
)

if len(subtype_and_parent_exist_exceptions):
error_string = "\n".join(
str(e) for e in subtype_and_parent_exist_exceptions
)
raise ValueError(
"Overlapping MITRE Attack ID Techniques and Subtechniques may not be defined. "
f"Remove the Technique and keep the Subtechnique:\n{error_string}"
)

return techniques_and_subtechniques

@field_validator("analytic_story", mode="before")
@classmethod
def mapStoryNamesToStoryObjects(
Expand Down Expand Up @@ -238,3 +288,6 @@ def mapAtomicGuidsToAtomicTests(
return matched_tests + [
AtomicTest.AtomicTestWhenTestIsMissing(test) for test in missing_tests
]
return matched_tests + [
AtomicTest.AtomicTestWhenTestIsMissing(test) for test in missing_tests
]
19 changes: 16 additions & 3 deletions contentctl/objects/mitre_attack_enrichment.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations
from pydantic import BaseModel, Field, ConfigDict, HttpUrl, field_validator
from typing import List
from enum import StrEnum

import datetime
from enum import StrEnum
from typing import List

from pydantic import BaseModel, ConfigDict, Field, HttpUrl, field_validator

from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE


Expand Down Expand Up @@ -84,6 +87,16 @@ def standardize_contributors(cls, contributors: list[str] | None) -> list[str]:
return []
return contributors

def __lt__(self, other: MitreAttackGroup) -> bool:
if not isinstance(object, MitreAttackGroup):
raise Exception(
f"Cannot compare object of type MitreAttackGroup to object of type [{type(object).__name__}]"
)
return self.group < other.group

def __hash__(self) -> int:
return hash(self.group)


class MitreAttackEnrichment(BaseModel):
ConfigDict(extra="forbid")
Expand Down
11 changes: 6 additions & 5 deletions contentctl/objects/story_tags.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from __future__ import annotations
from pydantic import BaseModel, Field, model_serializer, ConfigDict
from typing import List, Set, Optional

from enum import Enum
from typing import List, Optional, Set

from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment
from pydantic import BaseModel, ConfigDict, Field, model_serializer

from contentctl.objects.annotated_types import CVE_TYPE, MITRE_ATTACK_ID_TYPE
from contentctl.objects.enums import (
StoryCategory,
DataModel,
KillChainPhase,
SecurityContentProductName,
StoryCategory,
)
from contentctl.objects.annotated_types import CVE_TYPE, MITRE_ATTACK_ID_TYPE
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment


class StoryUseCase(str, Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ tags:
asset_type: Endpoint
mitre_attack_id:
- T1560.001
- T1560
product:
- Splunk Enterprise
- Splunk Enterprise Security
Expand Down
Loading