Skip to content

Commit

Permalink
implemented analysis
Browse files Browse the repository at this point in the history
  • Loading branch information
ncc-erik-steringer committed Aug 24, 2019
1 parent 2e62c6d commit f55c7b4
Show file tree
Hide file tree
Showing 2 changed files with 306 additions and 1 deletion.
11 changes: 10 additions & 1 deletion principalmapper/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pathlib import Path
import sys

from principalmapper.analysis.risks import gen_findings_and_print
import principalmapper.graphing.graph_actions
from principalmapper.graphing.edge_identification import checker_map
from principalmapper.querying import query_actions
Expand Down Expand Up @@ -282,4 +283,12 @@ def handle_visualization(parsed_args):

def handle_analysis(parsed_args):
"""Processes the arguments for the analysis subcommand and executes related tasks"""
raise NotImplementedError('analysis subcommand is not ready for use')
# get Graph
if parsed_args.account is None:
session = botocore_tools.get_session(parsed_args.profile)
else:
session = None
graph = principalmapper.graphing.graph_actions.get_existing_graph(session, parsed_args.account, parsed_args.debug)

# execute analysis
gen_findings_and_print(graph, parsed_args.output_type)
296 changes: 296 additions & 0 deletions principalmapper/analysis/risks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
"""Python code for identifying risks using a Graph generated by Principal Mapper. The findings are tracked using
dictionary objects with the format:
{
"title": <str>,
"severity": "Low|Medium|High",
"impact": <str>,
"description": <str>,
"recommendation": <str>
}
"""

import datetime as dt
import json
from typing import List

import principalmapper
from principalmapper.common.graphs import Graph
from principalmapper.common.nodes import Node
from principalmapper.querying import query_interface
from principalmapper.querying.query_result import QueryResult
from principalmapper.querying.presets.privesc import can_privesc
from principalmapper.util import arns


def gen_findings_and_print(graph: Graph, formatting: str) -> None:
"""Generates findings of risk, prints them out."""

findings = {
'account': graph.metadata['account_id'],
'date and time': dt.datetime.now(dt.timezone.utc).isoformat(),
'findings': [],
'source': 'Findings identified using Principal Mapper ({}) from NCC Group: '
'https://github.com/nccgroup/PMapper'.format(principalmapper.__version__)
}

findings['findings'].extend(gen_all_findings(graph))

if formatting == 'text':
print_findings(findings)
else: # format == 'json'
print(json.dumps(findings, indent=4))


def gen_all_findings(graph: Graph) -> List[dict]:
"""Generates findings of risk, returns a list of finding-dictionary objects."""
result = []
result.extend(gen_privesc_findings(graph))
result.extend(gen_mfa_actions_finding(graph))
result.extend(gen_overprivileged_function_finding(graph))
result.extend(gen_overprivileged_instance_profile_finding(graph))
result.extend(gen_overprivileged_stack_finding(graph))
return result


def gen_privesc_findings(graph: Graph) -> List[dict]:
"""Generates findings related to privilege escalation risks."""
result = []

node_path_list = []

for node in graph.nodes:
privesc_res, edge_list = can_privesc(graph, node)
if privesc_res:
node_path_list.append((node, edge_list))

if len(node_path_list) > 0:
finding = {
'title': 'IAM Principal{} Can Escalate Privileges'.format('s' if len(node_path_list) > 1 else ''),
'severity': 'High',
'impact': 'A lower-privilege IAM User or Role is able to gain administrative privileges. This could lead '
'to the lower-privilege principal being used to compromise the account\'s resources.',
'recommendation': 'Review the IAM Policies that are applicable to the affected IAM User(s) or Role(s). '
'Either reduce the permissions of the administrative principal(s), or reduce the '
'permissions of the principal(s) that can access the administrative principals.'
}
description_preamble = 'In AWS, IAM Principals such as IAM Users or IAM Roles have their permissions defined ' \
'using IAM Policies. These policies describe different actions, resources, and ' \
'conditions where the principal can make a given API call to a service.\n\n' \
'The following principals could escalate privileges:\n\n'

description_body = ''
for node, edge_list in node_path_list:
end_of_list = edge_list[-1].destination
description_body += '{} can escalate privileges by accessing the administrative principal {}:\n'.format(
node.searchable_name(), end_of_list.searchable_name())
for edge in edge_list:
description_body += ' {}\n'.format(edge.describe_edge())
description_body += '\n'

finding['description'] = description_preamble + description_body

result.append(finding)

return result


def gen_mfa_actions_finding(graph: Graph) -> List[dict]:
"""Generates findings related to risk from IAM Users able to call sensitive actions without needing MFA."""
result = []
affected_users = []
for node in graph.nodes:
if ':user/' in node.arn and node.is_admin and node.access_keys > 0:
# Check if the given admin user with access keys can call sensitive actions without MFA
# TODO: Should include more sensitive actions here
actions = ['iam:CreateUser', 'iam:CreateRole', 'iam:CreateGroup', 'iam:PutUserPolicy', 'iam:PutRolePolicy',
'iam:PutGroupPolicy', 'iam:AttachUserPolicy', 'iam:AttachRolePolicy', 'iam:AttachGroupPolicy',
'sts:AssumeRole']
if _can_call_without_mfa(node, actions):
affected_users.append(node)

if len(affected_users) > 0:
finding = {
'title': 'Administrative IAM {} Can Call Sensitive Actions Without MFA'.format(
'Users' if len(affected_users) > 1 else 'User'
),
'severity': 'Medium',
'impact': 'An adminstrative IAM User is able to call sensitive actions, such as creating more '
'principals or modifying permissions, without using MFA.',
'recommendation': 'Implement and attach an IAM Policy to the noted user(s) that rejects requests when MFA '
'is not used.'
}

description_preamble = 'In AWS, IAM Users can be configured to use an MFA device. When an IAM User has MFA ' \
'enabled, they are required to provide the second factor of authentication when they ' \
'log in to the AWS Console. However, unless there is a specific IAM policy attached ' \
'to the user, they will not need to provide a second factor of authentication when ' \
'making API calls.\n\nThe following administrative IAM Users have at least one set of ' \
'access keys, and can call sensitive actions to alter permissions or add users ' \
'without using a second factor of authentication:\n\n'

description_body = ''
for node in affected_users:
description_body += '* {}\n'.format(node.searchable_name())

finding['description'] = description_preamble + description_body
result.append(finding)

return result


def _can_call_without_mfa(node: Node, actions: List[str]) -> bool:
"""Returns true if node can call sensitive action without MFA"""
for action in actions:
auth, needmfa = query_interface.local_check_authorization_handling_mfa(
node,
action,
'*',
{}
)
if auth and not needmfa:
return True
return False


def gen_overprivileged_instance_profile_finding(graph: Graph) -> List[dict]:
"""Generates findings related to risk from EC2 instances being loaded with overprivileged instance profiles."""
result = []
affected_roles = []
for node in graph.nodes:
if ':role/' in node.arn and node.is_admin and node.instance_profile is not None:
affected_roles.append(node)

if len(affected_roles) > 0:
finding = {
'title': 'Instance Profile Has Administrator Privileges',
'severity': 'High',
'impact': 'If an instance with the noted instance profile(s) is compromised, then the AWS account as a '
'whole is at risk of compromise.',
'recommendation': 'Reduce the scope of permissions attached to the noted instance profile(s).'
}

description_preamble = 'In AWS, EC2 instances can be given an instance profile. These instance profiles ' \
'are associated with an IAM Role, and grants access to the permissions of the IAM ' \
'Role. Because EC2 instances are at a higher risk of exposure and compromise, both ' \
'to external attackers and authorized users in the AWS account, they should not have ' \
'access to administrative privileges. The following IAM Roles have administrative ' \
'permissions and are associated with an instance profile:\n\n'

description_body = ''
for node in affected_roles:
description_body += '* {}\n'.format(node.searchable_name())

finding['description'] = description_preamble + description_body

result.append(finding)

return result


def gen_overprivileged_function_finding(graph: Graph) -> List[dict]:
"""Generates findings related to risk from Lambda functions being loaded with overprivileged roles"""
result = []
affected_roles = []
for node in graph.nodes:
if ':role/' in node.arn and node.is_admin:
if query_interface.resource_policy_authorization('lambda.amazonaws.com', arns.get_account_id(node.arn),
node.trust_policy, 'sts:AssumeRole', node.arn, {}, False)\
== query_interface.ResourcePolicyEvalResult.SERVICE_MATCH:
affected_roles.append(node)

if len(affected_roles) > 0:
finding = {
'title': 'IAM Role Available to Lambda Functions Has Administrator Privileges',
'severity': 'Medium',
'impact': 'If an attacker can inject code or commands into the function, or if a lower-privileged '
'principal can alter the function, the AWS account as a whole could be compromised.',
'recommendation': 'Reduce the scope of permissions attached to the noted IAM Role(s).'
}

description_preamble = 'In AWS, Lambda functions can be assigned an IAM Role to use during execution. These ' \
'IAM Roles give the function access to call the AWS API with the permissions of the ' \
'IAM Role, depending on the policies attached to it. If the Lambda function can be ' \
'compromised, and the attacker can alter the code it executes, the attacker could ' \
'make AWS API calls with the IAM Role\'s permissions. The following IAM Roles have ' \
'administrative privileges, and can be passed to Lambda functions:\n\n'

description_body = ''
for node in affected_roles:
description_body += '* {}\n'.format(node.searchable_name())

finding['description'] = description_preamble + description_body

result.append(finding)

return result


def gen_overprivileged_stack_finding(graph: Graph) -> List[dict]:
"""Generates findings related to risk from CloudFormation stacks being loaded with overprivileged roles"""
result = []
affected_roles = []
for node in graph.nodes:
if ':role/' in node.arn and node.is_admin:
if query_interface.resource_policy_authorization('cloudformation.amazonaws.com',
arns.get_account_id(node.arn), node.trust_policy,
'sts:AssumeRole', node.arn, {}, False) == \
query_interface.ResourcePolicyEvalResult.SERVICE_MATCH:
affected_roles.append(node)

if len(affected_roles) > 0:
finding = {
'title': 'IAM Role Available to CloudFormation Stacks Has Administrator Privileges',
'severity': 'Low',
'impact': 'If an attacker has the right permissions in the AWS Account, they can grant themselves '
'adminstrative access to the account to compromise the account.',
'recommendation': 'Reduce the scope of permissions attached to the noted IAM Role(s).'
}

description_preamble = 'In AWS, CloudFormation stacks can be given an IAM Role. When a stack has an IAM ' \
'Role, it can use that IAM Role to make AWS API calls to create the resources ' \
'defined in the template for that stack. If the IAM Role has administrator access ' \
'to the account, and an attacker is able to make the right CloudFormation API calls, ' \
'they would be able to use the IAM Role to escalate privileges and compromise the ' \
'account as a whole. The following IAM Roles can be used in CloudFormation and ' \
'have administrative privileges:\n\n'

description_body = ''
for node in affected_roles:
description_body += '* {}\n'.format(node.searchable_name())

finding['description'] = description_preamble + description_body

result.append(finding)

return result


def print_findings(findings: dict) -> None:
"""Given a set of findings, uses print() to print out their contents in a nice format."""

# Header
print('Findings identified in account {}'.format(findings['account']))
print('Date and Time: {}'.format(findings['date and time']))
print()

# Body
if len(findings['findings']) == 0:
print('None found.')
print()
else:
for finding in findings['findings']:
print('# {}'.format(finding['title']))
print()
print('Severity: {}'.format(finding['severity']))
print()
print('Impact: {}'.format(finding['impact']))
print()
print('Description: {}'.format(finding['description']))
print()
print('Recommendation: {}'.format(finding['recommendation']))
print()
print()

# Footer
print(findings['source'])

0 comments on commit f55c7b4

Please sign in to comment.