Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/test_against_escu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ jobs:
poetry install --no-interaction


- name: Clone the AtomicRedTeam Repo (for extended validation)
- name: Clone the AtomicRedTeam Repo and the Mitre/CTI repos for testing enrichments
run: |
cd security_content
git clone --depth 1 https://github.com/redcanaryco/atomic-red-team
git clone --single-branch https://github.com/redcanaryco/atomic-red-team external_repos/atomic-red-team
git clone --single-branch https://github.com/mitre/cti external_repos/cti


# We do not separately run validate and build
Expand Down
10 changes: 3 additions & 7 deletions contentctl/actions/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,16 @@
from contentctl.objects.config import validate
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.objects.atomic import AtomicTest
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.helper.utils import Utils
from contentctl.objects.data_source import DataSource
from contentctl.helper.splunk_app import SplunkApp


class Validate:
def execute(self, input_dto: validate) -> DirectorOutputDto:

def execute(self, input_dto: validate) -> DirectorOutputDto:
director_output_dto = DirectorOutputDto(
AtomicTest.getAtomicTestsFromArtRepo(
repo_path=input_dto.getAtomicRedTeamRepoPath(),
enabled=input_dto.enrichments,
),
AtomicEnrichment.getAtomicEnrichment(input_dto),
AttackEnrichment.getAttackEnrichment(input_dto),
CveEnrichment.getCveEnrichment(input_dto),
[],
Expand Down
3 changes: 3 additions & 0 deletions contentctl/contentctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ def main():
test_common_func(config)
else:
raise Exception(f"Unknown command line type '{type(config).__name__}'")
except FileNotFoundError as e:
print(e)
sys.exit(1)
except Exception as e:
if config is None:
print("There was a serious issue where the config file could not be created.\n"
Expand Down
130 changes: 49 additions & 81 deletions contentctl/enrichments/attack_enrichment.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@

from __future__ import annotations
import csv
import os
import sys
from attackcti import attack_client
import logging
from pydantic import BaseModel, Field
from pydantic import BaseModel
from dataclasses import field
from typing import Annotated,Any
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment
from typing import Any
from pathlib import Path
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment, MitreTactics
from contentctl.objects.config import validate
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE
logging.getLogger('taxii2client').setLevel(logging.CRITICAL)
Expand All @@ -21,84 +20,82 @@ class AttackEnrichment(BaseModel):
@staticmethod
def getAttackEnrichment(config:validate)->AttackEnrichment:
enrichment = AttackEnrichment(use_enrichment=config.enrichments)
_ = enrichment.get_attack_lookup(str(config.path))
_ = enrichment.get_attack_lookup(config.mitre_cti_repo_path, config.enrichments)
return enrichment

def getEnrichmentByMitreID(self, mitre_id:MITRE_ATTACK_ID_TYPE)->MitreAttackEnrichment:
if not self.use_enrichment:
raise Exception(f"Error, trying to add Mitre Enrichment, but use_enrichment was set to False")
raise Exception("Error, trying to add Mitre Enrichment, but use_enrichment was set to False")

enrichment = self.data.get(mitre_id, None)
if enrichment is not None:
return enrichment
else:
raise Exception(f"Error, Unable to find Mitre Enrichment for MitreID {mitre_id}")

def addMitreIDViaGroupNames(self, technique:dict, tactics:list[str], groupNames:list[str])->None:
def addMitreIDViaGroupNames(self, technique:dict[str,Any], tactics:list[str], groupNames:list[str])->None:
technique_id = technique['technique_id']
technique_obj = technique['technique']
tactics.sort()

if technique_id in self.data:
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
mitre_attack_technique=technique_obj,
mitre_attack_tactics=tactics,
mitre_attack_groups=groupNames,
mitre_attack_group_objects=[])
self.data[technique_id] = MitreAttackEnrichment.model_validate({'mitre_attack_id':technique_id,
'mitre_attack_technique':technique_obj,
'mitre_attack_tactics':tactics,
'mitre_attack_groups':groupNames,
'mitre_attack_group_objects':[]})

def addMitreIDViaGroupObjects(self, technique:dict, tactics:list[str], groupObjects:list[dict[str,Any]])->None:
def addMitreIDViaGroupObjects(self, technique:dict[str,Any], tactics:list[MitreTactics], groupDicts:list[dict[str,Any]])->None:
technique_id = technique['technique_id']
technique_obj = technique['technique']
tactics.sort()

groupNames:list[str] = sorted([group['group'] for group in groupObjects])
groupNames:list[str] = sorted([group['group'] for group in groupDicts])

if technique_id in self.data:
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
mitre_attack_technique=technique_obj,
mitre_attack_tactics=tactics,
mitre_attack_groups=groupNames,
mitre_attack_group_objects=groupObjects)

self.data[technique_id] = MitreAttackEnrichment.model_validate({'mitre_attack_id': technique_id,
'mitre_attack_technique': technique_obj,
'mitre_attack_tactics': tactics,
'mitre_attack_groups': groupNames,
'mitre_attack_group_objects': groupDicts})


def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cached_or_offline: bool = False, skip_enrichment:bool = False) -> dict:
if not self.use_enrichment:
return {}
print("Getting MITRE Attack Enrichment Data. This may take some time...")
attack_lookup = dict()
file_path = os.path.join(input_path, "app_template", "lookups", "mitre_enrichment.csv")

if skip_enrichment is True:
print("Skipping enrichment")
def get_attack_lookup(self, input_path: Path, enrichments:bool = False) -> dict[str,MitreAttackEnrichment]:
attack_lookup:dict[str,MitreAttackEnrichment] = {}
if not enrichments:
return attack_lookup

try:

if force_cached_or_offline is True:
raise(Exception("WARNING - Using cached MITRE Attack Enrichment. Attack Enrichment may be out of date. Only use this setting for offline environments and development purposes."))
print(f"\r{'Client'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True)
lift = attack_client()
print(f"\r{'Client'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)
print(f"Performing MITRE Enrichment using the repository at {input_path}...",end="", flush=True)
# The existence of the input_path is validated during cli argument validation, but it is
# possible that the repo is in the wrong format. If the following directories do not
# exist, then attack_client will fall back to resolving via REST API. We do not
# want this as it is slow and error prone, so we will force an exception to
# be generated.
enterprise_path = input_path/"enterprise-attack"
mobile_path = input_path/"ics-attack"
ics_path = input_path/"mobile-attack"
if not (enterprise_path.is_dir() and mobile_path.is_dir() and ics_path.is_dir()):
raise FileNotFoundError("One or more of the following paths does not exist: "
f"{[str(enterprise_path),str(mobile_path),str(ics_path)]}. "
f"Please ensure that the {input_path} directory "
"has been git cloned correctly.")
lift = attack_client(
local_paths= {
"enterprise":str(enterprise_path),
"mobile":str(mobile_path),
"ics":str(ics_path)
}
)

print(f"\r{'Techniques'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True)
all_enterprise_techniques = lift.get_enterprise_techniques(stix_format=False)

print(f"\r{'Techniques'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)

print(f"\r{'Relationships'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True)
enterprise_relationships = lift.get_enterprise_relationships(stix_format=False)
print(f"\r{'Relationships'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)

print(f"\r{'Groups'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True)
enterprise_groups = lift.get_enterprise_groups(stix_format=False)
print(f"\r{'Groups'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)


for index, technique in enumerate(all_enterprise_techniques):
progress_percent = ((index+1)/len(all_enterprise_techniques)) * 100
if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()):
print(f"\r\t{'MITRE Technique Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True)
for technique in all_enterprise_techniques:
apt_groups:list[dict[str,Any]] = []
for relationship in enterprise_relationships:
if (relationship['target_object'] == technique['id']) and relationship['source_object'].startswith('intrusion-set'):
Expand All @@ -115,39 +112,10 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach
self.addMitreIDViaGroupObjects(technique, tactics, apt_groups)
attack_lookup[technique['technique_id']] = {'technique': technique['technique'], 'tactics': tactics, 'groups': apt_groups}

if store_csv:
f = open(file_path, 'w')
writer = csv.writer(f)
writer.writerow(['mitre_id', 'technique', 'tactics' ,'groups'])
for key in attack_lookup.keys():
if len(attack_lookup[key]['groups']) == 0:
groups = 'no'
else:
groups = '|'.join(attack_lookup[key]['groups'])

writer.writerow([
key,
attack_lookup[key]['technique'],
'|'.join(attack_lookup[key]['tactics']),
groups
])

f.close()


except Exception as err:
print(f'\nError: {str(err)}')
print('Use local copy app_template/lookups/mitre_enrichment.csv')
with open(file_path, mode='r') as inp:
reader = csv.reader(inp)
attack_lookup = {rows[0]:{'technique': rows[1], 'tactics': rows[2].split('|'), 'groups': rows[3].split('|')} for rows in reader}
attack_lookup.pop('mitre_id')
for key in attack_lookup.keys():
technique_input = {'technique_id': key , 'technique': attack_lookup[key]['technique'] }
tactics_input = attack_lookup[key]['tactics']
groups_input = attack_lookup[key]['groups']
self.addMitreIDViaGroupNames(technique=technique_input, tactics=tactics_input, groups=groups_input)



raise Exception(f"Error getting MITRE Enrichment: {str(err)}")

print("Done!")
return attack_lookup
6 changes: 3 additions & 3 deletions contentctl/input/director.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from contentctl.objects.deployment import Deployment
from contentctl.objects.macro import Macro
from contentctl.objects.lookup import Lookup
from contentctl.objects.atomic import AtomicTest
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.data_source import DataSource
from contentctl.objects.event_source import EventSource
Expand All @@ -39,7 +39,7 @@
class DirectorOutputDto:
# Atomic Tests are first because parsing them
# is far quicker than attack_enrichment
atomic_tests: None | list[AtomicTest]
atomic_enrichment: AtomicEnrichment
attack_enrichment: AttackEnrichment
cve_enrichment: CveEnrichment
detections: list[Detection]
Expand Down Expand Up @@ -145,7 +145,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
f for f in files
]
else:
raise (Exception(f"Cannot createSecurityContent for unknown product."))
raise (Exception(f"Cannot createSecurityContent for unknown product {contentType}."))

validation_errors = []

Expand Down
Loading
Loading