Skip to content
This repository has been archived by the owner on Nov 3, 2021. It is now read-only.

Commit

Permalink
Merge pull request #230 from ameihm0912/master
Browse files Browse the repository at this point in the history
add MozDef vulnerability processing plugin
  • Loading branch information
jeffbryner committed Jan 30, 2015
2 parents aa53e90 + 9a4efd1 commit 73d210b
Showing 1 changed file with 92 additions and 0 deletions.
92 changes: 92 additions & 0 deletions mq/plugins/vulnerability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Copyright (c) 2015 Mozilla Corporation
#
# Contributors:
# Aaron Meihm ameihm@mozilla.com

import hashlib
import sys
import unittest

class message(object):
def __init__(self):
self.registration = ['vulnerability']
self.priority = 20

def validate(self,message):
for k in ['utctimestamp', 'description', 'vuln', 'asset']:
if k not in message.keys():
return False
for k in ['assetid', 'ipv4address', 'hostname', 'macaddress']:
if k not in message['asset'].keys():
return False
for k in ['status', 'vulnid', 'title', 'discovery_time', 'age_days',
'known_malware', 'known_exploits', 'cvss', 'cves']:
if k not in message['vuln'].keys():
return False
return True

def calculate_id(self, message):
s = '{0}|{1}'.format(message['asset']['assetid'], message['vuln']['vulnid'])
return hashlib.md5(s).hexdigest()

def onMessage(self, message, metadata):
if metadata['doc_type'] != 'vulnerability':
return (message, metadata)
if not self.validate(message):
sys.stderr.write('error: invalid format for vulnerability {0}'.format(message))
return (None, None)
metadata['id'] = self.calculate_id(message)
metadata['doc_type'] = 'vulnerability_state'
metadata['index'] = 'vulnerability'
return (message, metadata)

class MessageTestFunctions(unittest.TestCase):
def setUp(self):
self.msgobj = message()

self.msg = {}
self.msg['description'] = 'system vulnerability management automation'
self.msg['utctimestamp'] = '2015-01-21T15:33:51.136378+00:00'
self.msg['asset'] = {}
self.msg['asset']['assetid'] = 23
self.msg['asset']['ipv4address'] = '1.2.3.4'
self.msg['asset']['macaddress'] = ''
self.msg['asset']['hostname'] = 'git.mozilla.com'
self.msg['vuln'] = {}
self.msg['vuln']['status'] = 'new'
self.msg['vuln']['vulnid'] = 'nexpose:43883'
self.msg['vuln']['title'] = 'RHSA-2013:1475: postgresql and postgresql84 security update'
self.msg['vuln']['discovery_time'] = 1421845863
self.msg['vuln']['age_days'] = 32.7
self.msg['vuln']['known_malware'] = False
self.msg['vuln']['known_exploits'] = False
self.msg['vuln']['cvss'] = 8.5
self.msg['vuln']['cves'] = ['CVE-2013-022', 'CVE-2013-1900']

def test_onMessage(self):
metadata = {}
metadata['doc_type'] = 'vulnerability'
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
self.assertIsNotNone(retmessage)
self.assertIsNotNone(retmeta)
self.assertEquals(retmeta['id'], 'ed40885b25e1c5b03c95f689924b5d8e')

def test_calculate_id(self):
self.assertEquals(self.msgobj.calculate_id(self.msg), 'ed40885b25e1c5b03c95f689924b5d8e')

def test_validate_correct(self):
self.assertTrue(self.msgobj.validate(self.msg))

def test_validate_incorrect(self):
del self.msg['utctimestamp']
self.assertFalse(self.msgobj.validate(self.msg))

def test_validate_incorrect_vuln(self):
del self.msg['vuln']['age_days']
self.assertFalse(self.msgobj.validate(self.msg))

if __name__ == '__main__':
unittest.main(verbosity=2)

0 comments on commit 73d210b

Please sign in to comment.