Skip to content
Permalink
Browse files

Create simple cloudtrail excessive describe calls alert

  • Loading branch information...
pwnbus committed May 14, 2019
1 parent 18e12c4 commit d5a543038ece8b76ab78b00852e8159344d578e6
@@ -0,0 +1,37 @@
#!/usr/bin/env python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Copyright (c) 2014 Mozilla Corporation


from lib.alerttask import AlertTask
from mozdef_util.query_models import SearchQuery, TermMatch, ExistsMatch


class AlertCloudtrailExcessiveDescribe(AlertTask):
def main(self):
# Create a query to look back the last 20 minutes
search_query = SearchQuery(minutes=20)

# Add search terms to our query
search_query.add_must([
TermMatch('source', 'cloudtrail'),
TermMatch('details.eventverb', 'Describe'),
ExistsMatch('details.source')
])

self.filtersManual(search_query)
# We aggregate on details.hostname which is the AWS service name
self.searchEventsAggregated('details.source', samplesLimit=2)
self.walkAggregations(threshold=50)

def onAggregation(self, aggreg):
category = 'access'
tags = ['cloudtrail']
severity = 'WARNING'
summary = "Excessive Describe calls on {0} ({1})".format(aggreg['value'], aggreg['count'])

# Create the alert object based on these properties
return self.createAlertDict(summary, category, tags, aggreg['events'], severity)
@@ -173,7 +173,12 @@ def verify_saved_events(self, found_alert, test_case):
if self.deadman:
return

assert len(found_alert['_source']['events']) == len(test_case.full_events)
# If we override the number of expected events, let's use that value
num_events = len(test_case.full_events)
if hasattr(self, 'num_samples'):
num_events = self.num_samples
assert len(found_alert['_source']['events']) == num_events

for event in found_alert['_source']['events']:
event_id = event['documentid']
found_event = self.es_client.get_event_by_id(event_id)
@@ -0,0 +1,83 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation
from positive_alert_test_case import PositiveAlertTestCase
from negative_alert_test_case import NegativeAlertTestCase
from alert_test_suite import AlertTestSuite


class TestCloudtrailExcessiveDescribe(AlertTestSuite):
alert_filename = "cloudtrail_excessive_describe"
alert_classname = "AlertCloudtrailExcessiveDescribe"
num_samples = 2

default_event = {
"_source": {
"category": "AwsApiCall",
"source": "cloudtrail",
"details": {
"eventverb": "Describe",
"source": "dynamodb.application-autoscaling.amazonaws.com",
}
}
}

# This alert is the expected result from running this task
default_alert = {
"category": "access",
"tags": ['cloudtrail'],
"severity": "WARNING",
"summary": 'Excessive Describe calls on dynamodb.application-autoscaling.amazonaws.com (50)',
}

test_cases = []

test_cases.append(
PositiveAlertTestCase(
description="Positive test with default events and default alert expected",
events=AlertTestSuite.create_events(default_event, 50),
expected_alert=default_alert
)
)

events = AlertTestSuite.create_events(default_event, 50)
for event in events:
event['_source']['source'] = 'bad'
test_cases.append(
NegativeAlertTestCase(
description="Negative test case with events with incorrect source",
events=events,
)
)

events = AlertTestSuite.create_events(default_event, 50)
for event in events:
event['_source']['details']['eventverb'] = 'bad'
test_cases.append(
NegativeAlertTestCase(
description="Negative test case with events with incorrect details.eventverb",
events=events,
)
)

events = AlertTestSuite.create_events(default_event, 50)
for event in events:
event['_source']['details']['source'] = None
test_cases.append(
NegativeAlertTestCase(
description="Negative test case with events with non-existent details.source",
events=events,
)
)

events = AlertTestSuite.create_events(default_event, 50)
for event in events:
event['_source']['utctimestamp'] = AlertTestSuite.subtract_from_timestamp_lambda(date_timedelta={'minutes': 21})
event['_source']['receivedtimestamp'] = AlertTestSuite.subtract_from_timestamp_lambda(date_timedelta={'minutes': 21})
test_cases.append(
NegativeAlertTestCase(
description="Negative test case with old timestamp",
events=events,
)
)

0 comments on commit d5a5430

Please sign in to comment.
You can’t perform that action at this time.