Skip to content
This repository has been archived by the owner on Nov 3, 2021. It is now read-only.

Commit

Permalink
Set geomodel alert severity to be configurable (#1675)
Browse files Browse the repository at this point in the history
  • Loading branch information
pwnbus committed Sep 9, 2020
1 parent 3291e8d commit ddd2114
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 30 deletions.
20 changes: 4 additions & 16 deletions alerts/geomodel/alert.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from datetime import datetime
from enum import Enum
import math
from operator import attrgetter
from typing import List, NamedTuple, Optional
Expand Down Expand Up @@ -44,26 +43,14 @@ def to_json(self):
}


class Severity(Enum):
'''A representation of the different levels of severity that an alert can
be raised to.
'''

STATUS = 'STATUS'
INFO = 'INFO'
WARNING = 'WARNING'
CRITICAL = 'CRITICAL'
ERROR = 'ERROR'


class Alert(NamedTuple):
'''A container for the data the alerts output by GeoModel contain.
'''

username: str
hops: List[Hop]
severity: Severity
severity: str
# Because we cannot know ahead of time what factors (see factors.py) will
# have been implemented and registered for use, this container should be
# thought of as something of a black-box useful only for humans looking
Expand Down Expand Up @@ -92,7 +79,8 @@ def _travel_possible(loc1: Locality, loc2: Locality) -> bool:
def alert(
username: str,
from_evts: List[Locality],
from_es: List[Locality]
from_es: List[Locality],
severity: str
) -> Optional[Alert]:
'''Determine whether an alert should fire given a particular user's
locality state. If an alert should fire, an `Alert` is returned, otherwise
Expand Down Expand Up @@ -120,7 +108,7 @@ def alert(
if len(hops) == 0:
return None

return Alert(username, hops, Severity.INFO, [])
return Alert(username, hops, severity, [])


def summary(alert: Alert) -> str:
Expand Down
2 changes: 2 additions & 0 deletions alerts/geomodel/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class Config(NamedTuple):
'''The top-level configuration type.
'''

asn_movement_severity: str
severity: str
localities: Localities
events: Events
whitelist: Whitelist
Expand Down
6 changes: 3 additions & 3 deletions alerts/geomodel/factors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Callable, List, NamedTuple
from functools import reduce

from .alert import Alert, Severity
from .alert import Alert


class Enhancement(NamedTuple):
Expand All @@ -11,7 +11,7 @@ class Enhancement(NamedTuple):
'''

extras: dict
severity: Severity
severity: str


# A factor is a sort of plugin intended to enrich a GeoModel alert with extra
Expand All @@ -38,7 +38,7 @@ def _apply_enhancement(alert: Alert, enhance: Enhancement) -> Alert:
alert)


def asn_movement(db, escalate: Severity) -> FactorInterface:
def asn_movement(db, escalate: str) -> FactorInterface:
'''Enriches GeoModel alerts with information about the ASNs from which IPs
in hops originate. When movement from one ASN to another is detected, the
alert's severity will be raised.
Expand Down
5 changes: 5 additions & 0 deletions alerts/geomodel_location.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
{
// severity to (de-)escalate the alert to/from in the case
// that movement from one ASN to another is detected in the alert.
"asn_movement_severity": "DEBUG",
// Default severity of alert
"severity": "INFO",
"localities": {
"es_index": "localities",
"valid_duration_days": 1,
Expand Down
11 changes: 6 additions & 5 deletions alerts/geomodel_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# Copyright (c) 2015 Mozilla Corporation

from datetime import datetime, timedelta
import json
import hjson
import os

import maxminddb as mmdb
Expand Down Expand Up @@ -144,7 +144,8 @@ def _utctimestamp(event):
new_alert = alert.alert(
cleaned.state.username,
new_state.localities,
cleaned.state.localities)
cleaned.state.localities,
cfg.severity)

updated = locality.update(cleaned.state, new_state)

Expand All @@ -165,7 +166,7 @@ def _utctimestamp(event):
'geomodel',
['geomodel'],
events,
modded_alert.severity.value)
modded_alert.severity)

# The IP that the user is acting from is the one they hopped to.

Expand All @@ -184,7 +185,7 @@ def _utctimestamp(event):

def _load_config(self):
with open(_CONFIG_FILE) as cfg_file:
cfg = json.load(cfg_file)
cfg = hjson.load(cfg_file)

cfg['localities'] = config.Localities(**cfg['localities'])

Expand All @@ -208,7 +209,7 @@ def _prepare_factor_pipeline(self, cfg):
pipeline.append(
factors.asn_movement(
mmdb.open_database(cfg.factors.asn_movement.maxmind_db_path),
alert.Severity.WARNING
cfg.asn_movement_severity
)
)

Expand Down
6 changes: 3 additions & 3 deletions tests/alerts/geomodel/test_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_do_not_alert_when_travel_possible(self):
radius=50)
]

alert_produced = alert('tester1', evts, [])
alert_produced = alert('tester1', evts, [], 'INFO')

assert alert_produced is None

Expand All @@ -54,7 +54,7 @@ def test_do_alert_when_travel_impossible(self):
radius=50)
]

alert_produced = alert('testuser', evts, [])
alert_produced = alert('testuser', evts, [], 'INFO')

assert alert_produced is not None
assert alert_produced.username == 'testuser'
Expand Down Expand Up @@ -89,7 +89,7 @@ def test_alerts_include_all_impossible_hops(self):
radius=50)
]

alert_produced = alert('tester', evts, [])
alert_produced = alert('tester', evts, [], 'INFO')

assert alert_produced is not None
assert len(alert_produced.hops) == 2
Expand Down
6 changes: 3 additions & 3 deletions tests/alerts/geomodel/test_factors.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def null_origin(ip):
def test_asn_movement():
factor = factors.asn_movement(
MockMMDB(asn_mvmt_records),
alert.Severity.WARNING)
'WARNING')

test_hops = [
alert.Hop(
Expand All @@ -65,15 +65,15 @@ def test_asn_movement():
test_alert = alert.Alert(
username='tester',
hops=test_hops,
severity=alert.Severity.INFO,
severity='INFO',
factors=[])

pipeline = [factor]

modified_alert = factors.pipe(test_alert, pipeline)

assert modified_alert.username == test_alert.username
assert modified_alert.severity == alert.Severity.WARNING
assert modified_alert.severity == 'WARNING'
assert len(modified_alert.factors) == 1
assert 'asn_hops' in modified_alert.factors[0]
assert len(modified_alert.factors[0]['asn_hops']) == 2
Expand Down

0 comments on commit ddd2114

Please sign in to comment.