generated from jacobm3/template-python-terraform-gitignore
-
Notifications
You must be signed in to change notification settings - Fork 4
/
vault-audit-log-handler.py
114 lines (89 loc) · 3.52 KB
/
vault-audit-log-handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/python3
import argparse
import configparser
import json
import pprint as pp
import requests
import sys
import traceback
def options():
'Parse command line options with argparse.'
global args,config
parser = argparse.ArgumentParser()
parser.add_argument("-d", dest="debug", action='store_true', help="print debug information")
parser.add_argument("-c", dest="configpath", default='/usr/local/etc/vault-log-handler.ini', help='config file path')
args = parser.parse_args()
# if not options.feedlistfile:
# print("\nError: Must specify -l <feed list file>\n")
# parser.print_help()
# sys.exit(1)
config = configparser.ConfigParser()
config.read(args.configpath)
def check(j,ln):
# any audit log entries with these policies or namespaces will generate an alert
alert_policies = ['root','some-other-policy']
alert_namespaces = ['break-glass']
# Parse commonly referenced parameters for easy reference
path = j['request']['path']
policies = j['auth']['policies']
# Namespace section not present with Vault OSS
namespace = None
try:
namespace = j['request']['namespace']['path']
except Exception:
pass
path = j['request']['path']
# Policies only present on authn calls
policies = []
try:
policies = j['auth']['policies']
except KeyError as e:
pass
alert_msgs = []
# Alert if polices from alert_policies are used
for policy in policies:
if policy in alert_policies:
alert_msgs.append('%s policy used' % policy)
# Alert if namespaces from alert_namespaces are used
for alert_namespace in alert_namespaces:
if namespace == alert_namespace:
alert_msgs.append('%s namespace used' % namespace)
if path.startswith('sys/generate-root'):
alert_msgs.append('Root token generation path accessed: %s' % path)
if path.startswith('secret/data/emergency-only'):
alert_msgs.append('sensitive path accessed: %s' % path)
if namespace == 'prod/' and path.startswith('secret/data/never-use'):
alert_msgs.append('namespace %s, path %s accessed' % ('prod',path))
if namespace == 'prod-us/' and path.startswith('pki/issue/hashicorp-test-dot-com'):
alert_msgs.append('namespace %s, path %s accessed' % ('prod-us',path))
if alert_msgs:
if args.debug: print('check():',alert_msgs)
alert_slack(j,ln,alert_msgs)
def alert_slack(j,ln,alert_msgs):
if args.debug: print('alert_slack():',alert_msgs)
with open('/var/log/vault/audit.alert', "a") as f:
f.write('{"text":"ALERT: %s, request_id: %s "}' % (alert_msgs,j['request']['id']))
try:
payload = '{"text":"ALERT: %s, request_id: %s "}' % (alert_msgs,j['request']['id'])
headers = {'Content-type': 'application/json'}
r = requests.post(config['slack']['url'], headers=headers, data=payload)
# TODO - figure out how to post the full audit entry, nicely formatted
#unquoted = ln.replace('"','')
#payload = '{"text":"%s"}' % unquoted
#r = requests.post(slack_webhook.url, headers=headers, data=payload)
except Exception as e:
pass
#print(e)
def read_stdin():
ln = sys.stdin.readline().rstrip()
while ln:
try:
j = json.loads(ln)
check(j,ln)
except Exception as e:
pass
#traceback.print_exc()
ln = sys.stdin.readline().rstrip()
if __name__ == '__main__':
options()
read_stdin()