Skip to content
Permalink
Browse files

Merge pull request #1452 from mozilla/alert-aws-privshare

Alert aws privshare
  • Loading branch information...
pwnbus committed Oct 8, 2019
2 parents 1832078 + c13f611 commit 68764b0739c191d63442b37256bdabd161ee1672
Showing with 151 additions and 0 deletions.
  1. +8 −0 alerts/aws_privilege_share.json
  2. +63 −0 alerts/aws_privilege_share.py
  3. +80 −0 tests/alerts/test_aws_privilege_share.py
@@ -0,0 +1,8 @@
{
"rootUsers": [
"InfosecAdmin"
],
"searchWindow": {
"minutes": 30
}
}
@@ -0,0 +1,63 @@
from functools import reduce
import json
import os

from lib.alerttask import AlertTask
from mozdef_util.query_models import SearchQuery, TermMatch, QueryStringMatch


_CONFIG_FILE = os.path.join(
os.path.dirname(__file__),
'aws_privilege_share.json')

_AGGREGATE_KEY = 'details.requestparameters.username'

_IAM_USER_KEY = 'details.useridentity.sessioncontext.sessionissuer.username'

_AWS_EVENT_KEY = 'details.eventname'

_ATTACH_POLICY_ACTION = 'AttachUserPolicy'


class AlertAWSPrivilegeShare(AlertTask):
'''An alert that fires when any of a configured list of AWS IAM users
perform the AttachUserPolicy action on another user. Such activity may
indicate that root privileges are being shared.
'''

def main(self):
with open(_CONFIG_FILE) as cfg_file:
self.config = json.load(cfg_file)

query_string = ' OR '.join([
'{0}: {1}'.format(_IAM_USER_KEY, user)
for user in self.config['rootUsers']
])

query = SearchQuery(**self.config['searchWindow'])
query.add_must([
QueryStringMatch(query_string),
TermMatch(_AWS_EVENT_KEY, _ATTACH_POLICY_ACTION)
])

self.filtersManual(query)
self.searchEventsAggregated(_AGGREGATE_KEY, samplesLimit=10)
self.walkAggregations(threshold=1)

def onAggregation(self, aggreg):
# Index all the way into the first event to get the name of the IAM
# user that attached a new policy to another user.
issuing_user = reduce(
lambda d, k: d and d.get(k),
_IAM_USER_KEY.split('.'),
aggreg['events'][0])

summary = '{0} granted permissions to {1} in AWS'.format(
issuing_user,
aggreg['value'])
category = 'privileges'
tags = ['aws', 'privileges']
events = aggreg['events']
severity = 'NOTICE'

return self.createAlertDict(summary, category, tags, events, severity)
@@ -0,0 +1,80 @@
from .alert_test_suite import AlertTestSuite
from .negative_alert_test_case import NegativeAlertTestCase
from .positive_alert_test_case import PositiveAlertTestCase


class TestAlertAWSPrivilegeShare(AlertTestSuite):
alert_filename = 'aws_privilege_share'

default_event = {
'_source': {
'details': {
'eventname': 'AttachUserPolicy',
'requestparameters': {
'username': 'tester'
},
'useridentity': {
'sessioncontext': {
'sessionissuer': {
'username': 'InfosecAdmin'
}
}
}
}
}
}

non_root_user_evt = {
'_source': {
'details': {
'eventname': 'AttachUserPolicy',
'requestparameters': {
'username': 'tester'
},
'useridentity': {
'sessioncontext': {
'sessionissuer': {
'username': 'NotAdmin'
}
}
}
}
}
}

not_attach_policy_evt = {
'_source': {
'details': {
'eventname': 'SomethingElse',
'requestparameters': {
'username': 'tester'
},
'useridentity': {
'sessioncontext': {
'sessionissuer': {
'username': 'InfosecAdmin'
}
}
}
}
}
}

default_alert = {
'category': 'privileges',
'tags': ['aws', 'privileges'],
'severity': 'NOTICE',
}

test_cases = [
PositiveAlertTestCase(
description='Fire when InfosecAdmin attaches policy',
events=[default_event],
expected_alert=default_alert),
NegativeAlertTestCase(
description='Does not fire if non-root user attaches policy',
events=[non_root_user_evt]),
NegativeAlertTestCase(
description='Does not fire if action is not AttachUserPolicy',
events=[not_attach_policy_evt])
]

0 comments on commit 68764b0

Please sign in to comment.
You can’t perform that action at this time.