This repository has been archived by the owner on Nov 3, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 329
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
260 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
from typing import Callable, List, NamedTuple | ||
from functools import reduce | ||
|
||
from .alert import Alert, Severity | ||
|
||
|
||
class Enhancement(NamedTuple): | ||
'''Information to enhance an `Alert` with, produced by an implementation of | ||
a `FactorInterface`. The `pipe` function handles constructing a modified | ||
`Alert` with enhancements applied. | ||
''' | ||
|
||
extras: dict | ||
severity: Severity | ||
|
||
|
||
# A factor is a sort of plugin intended to enrich a GeoModel alert with extra | ||
# information that may be useful to incident responders as well as to modify | ||
# the alert's severity. | ||
FactorInterface = Callable[[Alert], Enhancement] | ||
|
||
|
||
def pipe(alert: Alert, factors: List[FactorInterface]) -> Alert: | ||
'''Run an alert through an ordered pipeline of factors, applying the | ||
`Enhancement`s produced by each in turn. | ||
''' | ||
|
||
def _apply_enhancement(alert: Alert, enhance: Enhancement) -> Alert: | ||
return Alert( | ||
username=alert.username, | ||
hops=alert.hops, | ||
severity=enhance.severity, | ||
factors=alert.factors + [enhance.extras]) | ||
|
||
return reduce( | ||
lambda alrt, fctr: _apply_enhancement(alrt, fctr(alrt)), | ||
factors, | ||
alert) | ||
|
||
|
||
def asn_movement(db, escalate: Severity) -> FactorInterface: | ||
'''Enriches GeoModel alerts with information about the ASNs from which IPs | ||
in hops originate. When movement from one ASN to another is detected, the | ||
alert's severity will be raised. | ||
`maxmind_db_path` is the path to a MaxMind database file containing | ||
information about ASNs. | ||
`escalate` is the severity to (de-)escalate the alert to/from in the case | ||
that movement from one ASN to another is detected in the alert. | ||
''' | ||
|
||
# Keys in the dictionaries returned by MaxMind. | ||
# asn = 'autonomous_system_number' # currently not used | ||
org = 'autonomous_system_organization' | ||
|
||
def factor(alert: Alert) -> Enhancement: | ||
ips = [hop.origin.ip for hop in alert.hops] | ||
if len(alert.hops) > 0: | ||
ips.append(alert.hops[-1].destination.ip) | ||
|
||
# Converting the list of IPs to a set to get the unique items can | ||
# result in items being re-arranged. | ||
unique_ips = list({ip: True for ip in ips}.keys()) | ||
|
||
asn_info = [db.get(ip) for ip in unique_ips] | ||
asn_pairs = [ | ||
(asn_info[i], asn_info[i + 1]) | ||
for i in range(len(asn_info) - 1) | ||
] | ||
asn_hops = [ | ||
pair | ||
for pair in asn_pairs | ||
if pair[0][org] != pair[1][org] | ||
] | ||
|
||
return Enhancement( | ||
extras={'asn_hops': asn_hops}, | ||
severity=escalate if len(asn_hops) > 0 else alert.severity) | ||
|
||
return factor |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,5 +13,8 @@ | |
"whitelist": { | ||
"users": [], | ||
"cidrs": [] | ||
}, | ||
"factors": { | ||
"asn_movement": null | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
from datetime import datetime | ||
|
||
import alerts.geomodel.alert as alert | ||
import alerts.geomodel.factors as factors | ||
|
||
|
||
class MockMMDB: | ||
'''Mocks a MaxMind database connection with a dictionary of records mapping | ||
IP adresses to dictionaries containing information about ASNs. | ||
''' | ||
|
||
def __init__(self, records): | ||
self.records = records | ||
|
||
def get(self, ip): | ||
return self.records.get(ip) | ||
|
||
def close(self): | ||
return | ||
|
||
|
||
def null_origin(ip): | ||
return alert.Origin( | ||
ip=ip, | ||
city='Null', | ||
country='NA', | ||
latitude=0.0, | ||
longitude=0.0, | ||
observed=datetime.now(), | ||
geopoint='0.0,0.0') | ||
|
||
|
||
# A set of records for a mocked MaxMind database containing information about | ||
# ASNs used to test the `asn_movement` factor implementation with. | ||
asn_mvmt_records = { | ||
'1.2.3.4': { | ||
'autonomous_system_number': 54321, | ||
'autonomous_system_organization': 'CLOUDFLARENET' | ||
}, | ||
'4.3.2.1': { | ||
'autonomous_system_number': 12345, | ||
'autonomous_system_organization': 'MOZILLA_SFO1' | ||
}, | ||
'5.6.7.8': { | ||
'autonomous_system_number': 67891, | ||
'autonomous_system_organization': 'AMAZONAWSNET' | ||
} | ||
} | ||
|
||
|
||
def test_asn_movement(): | ||
factor = factors.asn_movement( | ||
MockMMDB(asn_mvmt_records), | ||
alert.Severity.WARNING) | ||
|
||
test_hops = [ | ||
alert.Hop( | ||
origin=null_origin('1.2.3.4'), | ||
destination=null_origin('4.3.2.1')), | ||
alert.Hop( | ||
origin=null_origin('4.3.2.1'), | ||
destination=null_origin('5.6.7.8')) | ||
] | ||
|
||
test_alert = alert.Alert( | ||
username='tester', | ||
hops=test_hops, | ||
severity=alert.Severity.INFO, | ||
factors=[]) | ||
|
||
pipeline = [factor] | ||
|
||
modified_alert = factors.pipe(test_alert, pipeline) | ||
|
||
assert modified_alert.username == test_alert.username | ||
assert modified_alert.severity == alert.Severity.WARNING | ||
assert len(modified_alert.factors) == 1 | ||
assert 'asn_hops' in modified_alert.factors[0] | ||
assert len(modified_alert.factors[0]['asn_hops']) == 2 | ||
|
||
asn_key = 'autonomous_system_organization' | ||
asn1 = modified_alert.factors[0]['asn_hops'][0][0][asn_key] | ||
asn2 = modified_alert.factors[0]['asn_hops'][0][1][asn_key] | ||
asn3 = modified_alert.factors[0]['asn_hops'][1][0][asn_key] | ||
asn4 = modified_alert.factors[0]['asn_hops'][1][1][asn_key] | ||
|
||
assert asn1 == 'CLOUDFLARENET' | ||
assert asn2 == 'MOZILLA_SFO1' | ||
assert asn3 == 'MOZILLA_SFO1' | ||
assert asn4 == 'AMAZONAWSNET' |
Oops, something went wrong.