Skip to content
This repository has been archived by the owner on Nov 3, 2021. It is now read-only.

Commit

Permalink
Alert plugin possible usernames (#1598)
Browse files Browse the repository at this point in the history
  • Loading branch information
arcrose committed Apr 20, 2020
1 parent 6ede727 commit f75d3d5
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 0 deletions.
4 changes: 4 additions & 0 deletions alerts/plugins/possible_usernames.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"search_window_hours": 24,
"indices_to_search": ["events-*"]
}
119 changes: 119 additions & 0 deletions alerts/plugins/possible_usernames.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# Copyright (c) 2014 Mozilla Corporation

import json
import os
import typing as types

from lib.config import ES
from mozdef_util.query_models import PhraseMatch, SearchQuery, TermMatch
from mozdef_util.elasticsearch_client import ElasticsearchClient


CONFIG_FILE = os.path.join(
os.path.dirname(__file__),
'possible_usernames.json')


# TODO: Switch to dataclasses when we move to Python 3.7+


class Config(types.NamedTuple):
'''Container for the configuration required by the plugin.
'''

search_window_hours: int
indices_to_search: types.List[str]

def load(path: str) -> 'Config':
'''Attempt to load a `Config` from a JSON file.
'''

with open(path) as cfg_file:
return Config(**json.load(cfg_file))


class message:
'''Alert plugin that attempts to enrich any alert with a new
`details.possible_usernames` field containing a list of names of users who
have connected to the host described in the alert within some window of
time.
'''

def __init__(self):
self.registration = ['promisckernel']

self._config = Config.load(CONFIG_FILE)

self._es_client = ElasticsearchClient(ES['servers'])

def onMessage(self, message):
hostname = _most_common_hostname(message.get('events', []))

query = SearchQuery(hours=self._config.search_window_hours)

query.add_must([
TermMatch('category', 'syslog'),
TermMatch('hostname', hostname),
TermMatch('details.program', 'sshd'),
PhraseMatch('summary', 'Accepted publickey for '),
])

results = query.execute(
self._es_client, indices=self._config.indices_to_search)

events = [
hit.get('_source', {})
for hit in results.get('hits', [])
]

return enrich(message, events)


def enrich(alert: dict, syslog_evts: types.List[dict]) -> dict:
'''Scan syslog events looking for usernames and append them to an alert's
new `details.possible_usernames` field.
'''

details = alert.get('details', {})

scan_results = [
evt.get('details', {}).get('username')
for evt in syslog_evts
]

possible_usernames = list(set([
username
for username in scan_results
if username is not None
]))

details['possible_usernames'] = possible_usernames

alert['details'] = details

return alert


def _most_common_hostname(events: types.List[dict]) -> types.Optional[str]:
findings = {}

for event in events:
host = event.get('documentsource', {}).get('hostname')

if host is None:
continue

findings[host] = (findings[host] + 1) if host in findings else 1

# Sorting a list of (int, str) pairs results in a sort on the int.
sorted_findings = sorted(
[(count, hostname) for hostname, count in findings.items()],
reverse=True)

if len(sorted_findings) == 0:
return None

return sorted_findings[0][1]
113 changes: 113 additions & 0 deletions tests/alerts/plugins/test_possible_usernames.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation

import os
import sys


class TestPossibleUsernames:
def setup(self):
self._orig_path = os.getcwd()

self._alerts_path = os.path.join(
os.path.dirname(__file__),
'../../../alerts')

sys.path.insert(0, self._alerts_path)

def teardown(self):
os.chdir(self._orig_path)

sys.path.remove(self._alerts_path)

if 'lib' in sys.modules:
del sys.modules['lib']

def test_enrichment(self):
from alerts.plugins.possible_usernames import enrich

events = [
{
# Expected event
'details': {
'username': 'tester1'
}
},
{
# No username
'details': {
'otherthing': 'somevalue'
}
},
{
# No details
'notwhatwewant': {
'something': 'else'
}
},
{
# Duplicate user
'details': {
'username': 'tester1'
}
}
]

alert = {
'details': {
'username': 'tester2'
}
}

enriched = enrich(alert, events)

# Ensure old fields still present.
assert enriched['details']['username'] == 'tester2'

# Ensure possible users found and duplicates removed.
assert len(enriched['details']['possible_usernames']) == 1
assert enriched['details']['possible_usernames'][0] == 'tester1'

def test_hostname_detection(self):
from alerts.plugins.possible_usernames import _most_common_hostname

# Stripped down version of events expected to be in alert['events'].
events = [
{
# First hostname
'documentsource': {
'hostname': 'host1',
},
},
{
# Missing documentsource
'notdocsource': {
'hostname': 'host1',
},
},
{
# Missing hostname
'documentsource': {
'nothostname': 'notahost',
},
},
{
# Duplicate hostname
'documentsource': {
'hostname': 'host1',
},
},
{
# Alternative hostname
'documentsource': {
'hostname': 'host2',
},
},
]

hostname = _most_common_hostname(events)

# host1 appears twice, host2 appears once.
assert hostname == 'host1'

0 comments on commit f75d3d5

Please sign in to comment.