Skip to content
Browse files

Merge pull request #1516 from mpurzynski/gdnew

A new version of the guardduty plugin and a dedicated worker
  • Loading branch information
Phrozyn committed Dec 2, 2019
2 parents 1ca268f + 6090dfe commit 1e5eb6d1f38b041431f6c93a78c43743a633e814
Showing with 2,979 additions and 70 deletions.
  1. +166 −0 mq/
  2. +2 −2 mq/
  3. +121 −68 mq/plugins/
  4. +1,195 −0 mq/plugins/guardduty_mapping.yml
  5. +1,495 −0 tests/mq/plugins/
@@ -0,0 +1,166 @@
#!/usr/bin/env python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at
# Copyright (c) 2017 Mozilla Corporation

import json
import sys
import socket
from configlib import getConfig, OptionParser
from datetime import datetime
from mozdef_util.utilities.toUTC import toUTC
from mozdef_util.utilities.logger import logger, initLogger

from esworker_sns_sqs import taskConsumer
from lib.plugins import sendEventToPlugins
from lib.sqs import connect_sqs
from mozdef_util.elasticsearch_client import ElasticsearchClient
from mozdef_util.utilities.key_exists import key_exists

# running under uwsgi?
import uwsgi

hasUWSGI = True
except ImportError as e:
hasUWSGI = False

class GDtaskConsumer(taskConsumer):
def build_submit_message(self, message):
# default elastic search metadata for an event
metadata = {"index": "events", "id": None}

event = {}

event["receivedtimestamp"] = toUTC(
event["mozdefhostname"] = self.options.mozdefhostname

if "tags" in event:
event["tags"] = [self.options.taskexchange]

event["severity"] = "INFO"
event["source"] = "guardduty"
event["details"] = {}

event["details"] = message["details"]
if "hostname" in message:
event["hostname"] = message["hostname"]
if "summary" in message:
event["summary"] = message["summary"]
if "category" in message:
event["details"]["category"] = message["category"]
if "tags" in message:
event["details"]["tags"] = message["tags"]
event["utctimestamp"] = toUTC(message["timestamp"]).isoformat()
event["timestamp"] = event["utctimestamp"]
(event, metadata) = sendEventToPlugins(event, metadata, self.pluginList)
# Drop message if plugins set to None
if event is None:

self.save_event(event, metadata)

def on_message(self, message_raw):
if "Message" in message_raw:
message = json.loads(message_raw["Message"])
if key_exists('details.finding.action.actionType', message):
if message["details"]["finding"]["action"]["actionType"] == "PORT_PROBE":
if "portProbeDetails" in message["details"]["finding"]["action"]["portProbeAction"]:
for probe in message["details"]["finding"]["action"]["portProbeAction"]["portProbeDetails"]:
isolatedmessage = message
isolatedmessage["details"]["finding"]["probeevent"] = probe
elif message["details"]["finding"]["action"]["actionType"] == "AWS_API_CALL":
if "recentApiCalls" in message["details"]["finding"]["additionalInfo"]:
message["details"]["finding"]["additionalInfo"]["apiCalls"] = message["details"]["finding"][
for call in message["details"]["finding"]["additionalInfo"]["apiCalls"]:
isolatedmessage = message
isolatedmessage["details"]["finding"]["apicalls"] = call

def esConnect():
"""open or re-open a connection to elastic search"""
return ElasticsearchClient((list("{0}".format(s) for s in options.esservers)), options.esbulksize)

def initConfig():
# capture the hostname
options.mozdefhostname = getConfig("mozdefhostname", socket.gethostname(), options.configfile)

# elastic search options. set esbulksize to a non-zero value to enable bulk posting, set timeout to post no matter how many events after X seconds.
options.esservers = list(getConfig("esservers", "http://localhost:9200", options.configfile).split(","))
options.esbulksize = getConfig("esbulksize", 0, options.configfile)
options.esbulktimeout = getConfig("esbulktimeout", 30, options.configfile)

# set to sqs for Amazon
options.mqprotocol = getConfig("mqprotocol", "sqs", options.configfile)

# rabbit message queue options
options.taskexchange = getConfig("taskexchange", "eventtask", options.configfile)
# rabbit: how many messages to ask for at once from the message queue
options.prefetch = getConfig("prefetch", 10, options.configfile)

# aws options
options.accesskey = getConfig("accesskey", "", options.configfile)
options.secretkey = getConfig("secretkey", "", options.configfile)
options.region = getConfig("region", "", options.configfile)

# How long to sleep between polling
options.sleep_time = getConfig("sleep_time", 0.1, options.configfile)

def main():
if hasUWSGI:"started as uwsgi mule {0}".format(uwsgi.mule_id()))
else:"started without uwsgi")

if options.mqprotocol not in ("sqs"):
logger.error("Can only process SQS queues, terminating")

sqs_queue = connect_sqs(
# consume our queue
GDtaskConsumer(sqs_queue, es, options).run()

if __name__ == "__main__":
# configure ourselves
parser = OptionParser()
"-c", dest="configfile", default=sys.argv[0].replace(".py", ".conf"), help="configuration file to use"
(options, args) = parser.parse_args()

# open ES connection globally so we don't waste time opening it per message
es = esConnect()

except KeyboardInterrupt as e:"Exiting worker")
if options.esbulksize != 0:
except Exception as e:
if options.esbulksize != 0:
@@ -53,7 +53,7 @@ def __init__(self, queue, esConnection, options):
def run(self):
while True:
records = self.sqs_queue.receive_messages(MaxNumberOfMessages=options.prefetch)
records = self.sqs_queue.receive_messages(MaxNumberOfMessages=self.options.prefetch)
for msg in records:
msg_body = msg.body
@@ -66,7 +66,7 @@ def run(self):
logger.error("Invalid message, not JSON <dropping message and continuing>: %r" % msg_body)
except (SSLEOFError, SSLError, socket.error):"Received network related error...reconnecting")
@@ -3,87 +3,140 @@
# file, You can obtain one at
# Copyright (c) 2017 Mozilla Corporation

from mozdef_util.utilities.key_exists import key_exists
import os
import yaml
import jmespath
from mozdef_util.utilities.toUTC import toUTC
from mozdef_util.utilities.dot_dict import DotDict
from platform import node

class message(object):
def __init__(self):
Plugin used to fix object type discretions with cloudtrail messages
self.registration = ['guardduty']
self.priority = 10
self.registration = ["guardduty"]
self.priority = 3

self.mozdefhostname = "{0}".format(node())
self.mozdefhostname = "failed to fetch mozdefhostname"

with open(os.path.join(os.path.dirname(__file__), "guardduty_mapping.yml"), "r") as f:
mapping_map =

yap = yaml.safe_load(mapping_map)
self.eventtypes = list(yap.keys())
self.yap = yap
del mapping_map

# AWS guard duty sends dates as iso_8601 which ES doesn't appreciate
# here's a list of date fields we'll convert to isoformat
self.date_keys = [

# AWS guard duty can send IPs in a bunch of places
# Lets pick out some likely targets and format them
# so other mozdef plugins can rely on their location
self.ipaddress_keys = [

def convert_key_date_format(self, needle, haystack):
num_levels = needle.split(".")
if len(num_levels) == 0:
return False
current_pointer = haystack
for updated_key in num_levels:
if updated_key == num_levels[-1]:
current_pointer[updated_key] = toUTC(
return haystack
if updated_key in current_pointer:
current_pointer = current_pointer[updated_key]
return haystack
self.date_keys = ["gdeventcreatedts", "gdeventupdatedts", "gdeventfirstseents", "gdeventlastseents"]

def onMessage(self, message, metadata):
if 'source' not in message:
if "source" not in message:
return (message, metadata)

if not message['source'] == 'guardduty':
if not message["source"] == "guardduty":
return (message, metadata)

# reformat the date fields to iosformat
if "details" not in message:
return (message, metadata)

newmessage = dict()
newmessage["receivedtimestamp"] = message["receivedtimestamp"]
newmessage["timestamp"] = message["timestamp"]
newmessage["utctimestamp"] = message["utctimestamp"]
newmessage["mozdefhostname"] = message["mozdefhostname"]
newmessage["tags"] = ["aws", "guardduty"] + message["tags"]
newmessage["category"] = "guardduty"
newmessage["source"] = "guardduty"
newmessage["customendpoint"] = ""
newmessage["details"] = {}
newmessage["details"]["type"] = message["details"]["finding"]["action"]["actionType"].lower()
newmessage["details"]["finding"] = message['details']["category"]
newmessage["summary"] = message["details"]["title"]
newmessage["details"]["resourcerole"] = message["details"]["finding"]["resourceRole"].lower()

# This is a hack to let the following code match and extract useful information about local network configuration
# Sometimes AWS does not feel like sending it at all or sends an empty list or a single element list or a multiple-elements list or a dictionary - so try to handle them all
if message["details"]["finding"]["action"]["actionType"] != "AWS_API_CALL":
if "networkInterfaces" in message["details"]["resource"]["instanceDetails"]:
nic = message["details"]["resource"]["instanceDetails"]["networkInterfaces"]
if isinstance(nic, list):
if len(nic) > 0:
message["details"]["resource"]["instanceDetails"]["networkInterfaces"] = nic[0]
if message["details"]["category"] in self.eventtypes:
for key in self.yap[newmessage["details"]["finding"]]:
mappedvalue =[newmessage["details"]["finding"]][key], message)
# JMESPath likes to silently return a None object
if mappedvalue is not None:
newmessage["details"][key] = mappedvalue

# reformat the date fields to isoformat
for date_key in self.date_keys:
if key_exists(date_key, message):
if message.get(date_key) is None:
message = self.convert_key_date_format(date_key, message)

# convert the dict to a dot dict for saner deep key/value processing
message = DotDict(message)
# pull out the likely source IP address
for ipaddress_key in self.ipaddress_keys:
if 'sourceipaddress' not in message['details']:
if key_exists(ipaddress_key, message):
message.details.sourceipaddress = message.get(

# if we still haven't found what we are looking for #U2
# sometimes it's in a list
if 'sourceipaddress' not in message['details']:
if key_exists('details.finding.action.portprobeaction.portprobedetails', message) \
and isinstance(message.details.finding.action.portprobeaction.portprobedetails, list):

# inspect the first list entry and see if it contains an IP
portprobedetails = DotDict(
if key_exists('remoteipdetails.ipaddressv4', portprobedetails):
message.details.sourceipaddress = portprobedetails.remoteipdetails.ipaddressv4

# recovert the message back to a plain dict
return (dict(message), metadata)
if date_key in newmessage["details"]:
newmessage["details"][date_key] = toUTC(newmessage["details"][date_key]).isoformat()

# Handle some special cases

# Propagate domain
if "miscinfo" in newmessage["details"]:
if "domain" in newmessage["details"]["miscinfo"]:
newmessage["details"]["query"] = newmessage["details"]["miscinfo"]["domain"]

# Flatten tags
if "tags" in newmessage["details"]:
newmessage["details"]["awstags"] = []
for tagkve in newmessage["details"]["tags"]:
for k, v in tagkve.items():
del newmessage["details"]["tags"]

# Find something that remotely resembles an FQDN
if "publicdnsname" in newmessage["details"]:
newmessage["hostname"] = newmessage["details"]["publicdnsname"]
elif "privatednsname" in newmessage["details"]:
newmessage["hostname"] = newmessage["details"]["privatednsname"]

# Flip IP addresses in we are the source of attacks
if (newmessage["details"]["finding"] == "UnauthorizedAccess:EC2/RDPBruteForce" or newmessage["details"]["finding"] == "UnauthorizedAccess:EC2/SSHBruteForce"):
if newmessage["details"]["direction"] == "OUTBOUND":
# could be more optimized here but need to be careful
truedstip = ""
truesrcip = ""
if "destinationipaddress" in newmessage["details"]:
truedstip = newmessage["details"]["sourceipaddress"]
if "sourceipaddress" in newmessage["details"]:
truesrcip = newmessage["details"]["destinationipaddress"]
newmessage["details"]["destinationipaddress"] = truedstip
newmessage["details"]["sourceipaddress"] = truesrcip
del newmessage["details"]["sourceport"]
del newmessage["details"]["destinationport"]

# Last resort in case we don't have any local IP address yet
# Fake it till you make it
attdir = {
"Recon:EC2/PortProbeUnprotectedPort": "INBOUND",
"CryptoCurrency:EC2/BitcoinTool.B!DNS": "INBOUND",
"Trojan:EC2/DGADomainRequest.B": "INBOUND",
"UnauthorizedAccess:IAMUser/TorIPCaller": "INBOUND",
"Persistence:IAMUser/ResourcePermissions": "INBOUND",
"Persistence:IAMUser/NetworkPermissions": "INBOUND",
"Persistence:IAMUser/UserPermissions": "INBOUND",
if "direction" not in newmessage["details"]:
newmessage["details"]["direction"] = attdir[newmessage["details"]["finding"]]
if newmessage["details"]["direction"] == "INBOUND":
if "destinationipaddress" not in newmessage["details"]:
if "publicip" in newmessage["details"]:
newmessage["details"]["destinationipaddress"] = newmessage["details"]["publicip"]
if newmessage["details"]["direction"] == "OUTBOUND":
if "sourceipaddress" not in newmessage["details"]:
if "publicip" in newmessage["details"]:
newmessage["details"]["sourceipaddress"] = newmessage["details"]["publicip"]

return (newmessage, metadata)

0 comments on commit 1e5eb6d

Please sign in to comment.
You can’t perform that action at this time.