Skip to content

Commit

Permalink
Merge pull request #2623 from mitre/bleepbop/VIRTS-4159/update_check_…
Browse files Browse the repository at this point in the history
…reason_skipped

[VIRTS-4159] update check reason skipped
  • Loading branch information
elegantmoose committed Aug 21, 2023
2 parents c1b9aa4 + 9aab656 commit 3140411
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 10 deletions.
32 changes: 23 additions & 9 deletions app/objects/c_operation.py
Expand Up @@ -279,7 +279,7 @@ async def get_skipped_abilities_by_agent(self, data_svc):
state=self.state, agent_ran=agent_ran)
if skipped:
if agent_skipped[skipped['ability_id']]:
if agent_skipped[skipped['ability_id']]['reason_id'] < skipped['reason_id']:
if agent_skipped[skipped['ability_id']]['reason_id'] > skipped['reason_id']:
agent_skipped[skipped['ability_id']] = skipped
else:
agent_skipped[skipped['ability_id']] = skipped
Expand Down Expand Up @@ -452,22 +452,34 @@ def _check_reason_skipped(self, agent, ability, op_facts, state, agent_executors
facts = re.findall(BasePlanningService.re_variable, executor.test) if executor.command else []
if not facts or all(fact in op_facts for fact in facts):
fact_dependency_fulfilled = True
associated_links = set([link.id for link in self.chain if link.paw == agent.paw
and link.ability.ability_id == ability.ability_id])

if not agent.trusted:
return dict(reason='Agent untrusted', reason_id=self.Reason.UNTRUSTED.value,
if agent.platform == 'unknown':
return dict(reason='Platform not available', reason_id=self.Reason.PLATFORM.value,
ability_id=ability.ability_id, ability_name=ability.name)
elif not valid_executors:
return dict(reason='Executor not available', reason_id=self.Reason.EXECUTOR.value,
return dict(reason='Mismatched ability platform and executor', reason_id=self.Reason.EXECUTOR.value,
ability_id=ability.ability_id, ability_name=ability.name)
elif not agent.privileged_to_run(ability):
return dict(reason='Ability privilege not fulfilled', reason_id=self.Reason.PRIVILEGE.value,
ability_id=ability.ability_id, ability_name=ability.name)
elif not fact_dependency_fulfilled:
return dict(reason='Fact dependency not fulfilled', reason_id=self.Reason.FACT_DEPENDENCY.value,
ability_id=ability.ability_id, ability_name=ability.name)
elif not agent.privileged_to_run(ability):
return dict(reason='Ability privilege not fulfilled', reason_id=self.Reason.PRIVILEGE.value,
elif not set(associated_links).isdisjoint(self.ignored_links):
return dict(reason='Link ignored - highly visible or discarded link',
reason_id=self.Reason.LINK_IGNORED.value, ability_id=ability.ability_id,
ability_name=ability.name)
elif not agent.trusted:
return dict(reason='Agent not trusted', reason_id=self.Reason.UNTRUSTED.value,
ability_id=ability.ability_id, ability_name=ability.name)
elif state != 'finished':
return dict(reason='Operation not completed', reason_id=self.Reason.OP_RUNNING.value,
ability_id=ability.ability_id, ability_name=ability.name)
else:
return dict(reason='Other', reason_id=self.Reason.OTHER.value,
ability_id=ability.ability_id, ability_name=ability.name)

def _get_operation_metadata_for_event_log(self):
return dict(operation_name=self.name,
Expand Down Expand Up @@ -524,10 +536,12 @@ async def _get_agent_info_for_event_log(agent_paw, data_svc):
class Reason(Enum):
PLATFORM = 0
EXECUTOR = 1
FACT_DEPENDENCY = 2
PRIVILEGE = 3
OP_RUNNING = 4
PRIVILEGE = 2
FACT_DEPENDENCY = 3
LINK_IGNORED = 4
UNTRUSTED = 5
OP_RUNNING = 6
OTHER = 7

class States(Enum):
RUNNING = 'running'
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Expand Up @@ -502,7 +502,7 @@ def test_agent(event_loop):

@pytest.fixture
def test_executor(test_agent):
return ExecutorSchema().load(dict(timeout=60, platform=test_agent.platform, name='linux', command='ls'))
return ExecutorSchema().load(dict(timeout=60, platform=test_agent.platform, name='sh', command='ls'))


@pytest.fixture
Expand Down
101 changes: 101 additions & 0 deletions tests/objects/test_operation.py
Expand Up @@ -177,6 +177,17 @@ def op_without_learning_parser(ability, adversary):
return op


@pytest.fixture
def custom_agent(test_agent, test_executor):
def _make_agent(platform='windows', trusted=True, executor_name='psh'):
test_executor.name = executor_name
test_agent.platform = platform
test_agent.executors = [test_executor.name]
test_agent.trusted = trusted
return test_agent
return _make_agent


@pytest.fixture
def op_with_learning_and_seeded(ability, adversary, operation_agent, parse_datestring):
sc = Source(id='3124', name='test', facts=[Fact(trait='domain.user.name', value='bob')])
Expand Down Expand Up @@ -486,3 +497,93 @@ def test_update_untrusted_agents_with_untrusted_no_operation_agents(self, operat
op = Operation(name='test', agents=[], adversary=adversary)
op.update_untrusted_agents(operation_agent)
assert not op.untrusted_agents

def test_check_reason_skipped_unknown_platform(self, test_agent, test_ability):
test_agent.platform = 'unknown'
op = Operation(name='test', agents=[test_agent], state='running')
reason = op._check_reason_skipped(agent=test_agent, ability=test_ability, op_facts=[], state=op.state,
agent_executors=test_agent.executors, agent_ran={})
assert reason['reason'] == 'Platform not available'
assert reason['reason_id'] == Operation.Reason.PLATFORM.value
assert reason['ability_id'] == test_ability.ability_id
assert reason['ability_name'] == test_ability.name

async def test_check_reason_skipped_valid_executor(self, test_agent, test_ability):
test_agent.platform = 'darwin'
op = Operation(name='test', agents=[test_agent], state='running')
reason = op._check_reason_skipped(agent=test_agent, ability=test_ability, op_facts=[], state=op.state,
agent_executors=[], agent_ran={})
assert reason['reason'] == 'Mismatched ability platform and executor'
assert reason['reason_id'] == Operation.Reason.EXECUTOR.value
assert reason['ability_id'] == test_ability.ability_id
assert reason['ability_name'] == test_ability.name

async def test_check_reason_skipped_privilege(self, custom_agent, test_ability, mocker, test_executor):
test_executor.name = 'psh'
agent = custom_agent()
test_ability.privilege = 'Elevated'
op = Operation(name='test', agents=[agent], state='running')
reason = op._check_reason_skipped(agent=agent, ability=test_ability, op_facts=[], state=op.state,
agent_executors=agent.executors, agent_ran={})
assert reason['reason'] == 'Ability privilege not fulfilled'
assert reason['reason_id'] == Operation.Reason.PRIVILEGE.value
assert reason['ability_id'] == test_ability.ability_id
assert reason['ability_name'] == test_ability.name

async def test_check_reason_skipped_fact_dependency(self, custom_agent, test_ability, mocker, test_executor, fact):
test_executor.name = 'psh'
agent = custom_agent()
op = Operation(name='test', agents=[agent], state='running')
with mocker.patch('app.objects.c_ability.Ability.find_executors') as mock_find_executors:
mock_find_executors.return_value = [test_executor]
with mocker.patch('re.findall') as mock_findall:
mock_findall.return_value = [fact('test.fact.attribute')]
reason = op._check_reason_skipped(agent=agent, ability=test_ability, op_facts=[], state=op.state,
agent_executors=agent.executors, agent_ran={})
assert reason['reason'] == 'Fact dependency not fulfilled'
assert reason['reason_id'] == Operation.Reason.FACT_DEPENDENCY.value
assert reason['ability_id'] == test_ability.ability_id
assert reason['ability_name'] == test_ability.name

async def test_check_reason_skipped_link_ignored(self, custom_agent, test_ability, mocker, active_link):
agent = custom_agent()
op = Operation(name='test', agents=[agent], state='running')
test_link = Link.load(active_link)
op.chain = [test_link]
op.ignored_links = [test_link.id]
reason = op._check_reason_skipped(agent=agent, ability=test_ability, op_facts=[], state=op.state,
agent_executors=agent.executors, agent_ran={})
assert reason['reason'] == 'Link ignored - highly visible or discarded link'
assert reason['reason_id'] == Operation.Reason.LINK_IGNORED.value
assert reason['ability_id'] == test_ability.ability_id
assert reason['ability_name'] == test_ability.name

async def test_check_reason_skipped_untrusted(self, custom_agent, test_ability, mocker):
agent = custom_agent(trusted=False)
op = Operation(name='test', agents=[agent], state='running')
reason = op._check_reason_skipped(agent=agent, ability=test_ability, op_facts=[], state=op.state,
agent_executors=agent.executors, agent_ran={})
assert reason['reason'] == 'Agent not trusted'
assert reason['reason_id'] == Operation.Reason.UNTRUSTED.value
assert reason['ability_id'] == test_ability.ability_id
assert reason['ability_name'] == test_ability.name

async def test_check_reason_skipped_op_running(self, custom_agent, test_ability, mocker):
agent = custom_agent()
op = Operation(name='test', agents=[agent], state='running')
reason = op._check_reason_skipped(agent=agent, ability=test_ability, op_facts=[], state=op.state,
agent_executors=agent.executors, agent_ran={})
assert reason['reason'] == 'Operation not completed'
assert reason['reason_id'] == Operation.Reason.OP_RUNNING.value
assert reason['ability_id'] == test_ability.ability_id
assert reason['ability_name'] == test_ability.name

async def test_check_reason_skipped_other(self, custom_agent, test_ability, mocker):
agent = custom_agent()
op = Operation(name='test', agents=[agent], state='finished')
reason = op._check_reason_skipped(agent=agent, ability=test_ability, op_facts=[], state=op.state,
agent_executors=agent.executors, agent_ran={})
assert reason['reason'] == 'Other'
assert reason['reason_id'] == Operation.Reason.OTHER.value
assert reason['ability_id'] == test_ability.ability_id
assert reason['ability_name'] == test_ability.name

0 comments on commit 3140411

Please sign in to comment.