-
Notifications
You must be signed in to change notification settings - Fork 28
/
lambda_handler.py
145 lines (118 loc) · 5.76 KB
/
lambda_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import boto3
import json
import logging
import os
import sys
from dynamodb_json import json_util as dynamodb_json
from config_rules import *
from sns_logging_handler import *
class Remediate:
    """
    Routes AWS Config compliance messages to the matching remediation.

    The event is expected to carry a list under 'Records'; each record's
    'body' is parsed as JSON and inspected for the Config rule name and
    its new compliance type. Non-compliant rules are remediated unless
    the settings loaded from DynamoDB opt the rule out, and messages whose
    remediation reports failure are forwarded to the SQS queue named by
    the DLQ environment variable.
    """

    def __init__(self, logging, event):
        """
        Stores the collaborators and loads the settings.

        :param logging: object exposing debug/info/warning/error; the
            handler in this file passes the ``logging`` module itself.
        :param event: invocation payload (a dict with a 'Records' list).
        """
        # parameters
        self.logging = logging
        self.event = event

        # event payload
        self.logging.debug("Event payload: %s" % self.event)

        # variables
        self.settings = self.get_settings()

        # classes
        self.config = ConfigRules(self.logging)
        self.security_hub = SecurityHubRules(self.logging)
        self.custom = CustomRules(self.logging)

    def remediate(self):
        """
        Processes every record of the event.

        Compliant (or otherwise not NON_COMPLIANT) evaluations are skipped.
        Rules the user opted out of are logged and skipped. A falsy result
        from a remediation sends the original message to the DLQ.
        """
        # A payload without 'Records' is treated as an empty batch
        # instead of failing on iteration over None.
        for record in self.event.get('Records') or []:
            config_message = json.loads(record.get('body'))
            config_rule_name = Remediate.get_config_rule_name(config_message)
            config_rule_compliance = Remediate.get_config_rule_compliance(config_message)

            if config_rule_compliance != 'NON_COMPLIANT':
                continue

            if not self.intend_to_remediate(config_rule_name):
                self.logging.info("Config Rule '%s' was not remediated "
                                  "based on user preferences." % config_rule_name)
                continue

            # if remediation was not successful, send message to DLQ
            if not self._remediate_rule(config_rule_name, config_message):
                self.send_to_dlq(config_message)

    def _remediate_rule(self, config_rule_name, config_message):
        """
        Runs the remediation matching the rule name, if there is one.

        :return: the remediation's own result, or True when no remediation
            exists for the rule (so the message is not sent to the DLQ).
        """
        if 'auto-remediate' in config_rule_name:
            # AWS Config Managed Rules
            if 'access-keys-rotated' in config_rule_name:
                return self.config.access_keys_rotated(config_message)
            if 'restricted-ssh' in config_rule_name:
                return self.config.restricted_ssh(config_message)
            if 'rds-instance-public-access-check' in config_rule_name:
                return self.config.rds_instance_public_access_check(config_message)

        # Unknown managed rules, AWS Security Hub rules ('securityhub' in
        # the name) and custom Config rules have no remediation here.
        self.logging.warning("No remediation available for Config Rule '%s' "
                             "with payload '%s'." % (config_rule_name, config_message))
        return True

    def intend_to_remediate(self, config_rule_name):
        """
        Tells whether the settings allow remediating the given rule.

        Reads settings['rules'][config_rule_name]['remediate'] and defaults
        to True whenever any level of that path is absent, including when
        the settings could not be loaded at all.
        """
        rules = self.settings.get('rules') or {}
        rule_settings = rules.get(config_rule_name) or {}
        return rule_settings.get('remediate', True)

    def get_settings(self):
        """
        Loads the settings from the DynamoDB table named by SETTINGSTABLE.

        Every item's 'key' attribute becomes a dictionary key and its
        'value' attribute the associated value. Failures are logged and
        whatever was read so far (possibly nothing) is returned.

        :return: dict of settings.
        """
        settings = {}
        # Read with .get() so a missing variable is reported by the handler
        # below instead of raising a second KeyError from inside it.
        table_name = os.environ.get('SETTINGSTABLE')

        try:
            # A single Scan call returns one page only; walk all of them.
            paginator = boto3.client('dynamodb').get_paginator('scan')
            for page in paginator.paginate(TableName=table_name):
                for record in page.get('Items', []):
                    record_json = dynamodb_json.loads(record, True)
                    settings[record_json.get('key')] = record_json.get('value')
        except Exception as error:
            self.logging.error("Could not read DynamoDB table '%s'." % table_name)
            self.logging.error(error)

        return settings

    def send_to_dlq(self, message):
        """
        Sends a message to the DLQ

        :param message: JSON-serialisable payload to forward.
        """
        queue_name = os.environ.get('DLQ')
        queue_url = self.get_queue_url()

        if not queue_url:
            # get_queue_url() already logged the cause; do not attempt a
            # send that can only fail on parameter validation.
            self.logging.error("Could not send payload to DLQ '%s'." % queue_name)
            return

        client = boto3.client('sqs')

        try:
            # TODO Check how many times this message was retried, if 3 times then stop sending to DLQ
            client.send_message(
                QueueUrl=queue_url,
                MessageBody=json.dumps(message))

            self.logging.debug("Remediation failed. Payload has been sent to DLQ '%s'." % queue_name)
        except Exception as error:
            self.logging.error("Could not send payload to DLQ '%s'." % queue_name)
            self.logging.error(error)

    def get_queue_url(self):
        """
        Retrieves the SQS Queue URL from the SQS Queue Name.

        :return: the queue URL, or None when the lookup fails.
        """
        client = boto3.client('sqs')
        queue_name = os.environ.get('DLQ')

        try:
            response = client.get_queue_url(QueueName=queue_name)
            return response.get('QueueUrl')
        except Exception as error:
            self.logging.error("Could not retrieve SQS Queue URL "
                               "for SQS Queue '%s'." % queue_name)
            self.logging.error(error)
            return None

    @staticmethod
    def get_config_rule_name(record):
        """
        Returns record['detail']['configRuleName'].
        """
        return record.get('detail').get('configRuleName')

    @staticmethod
    def get_config_rule_compliance(record):
        """
        Returns record['detail']['newEvaluationResult']['complianceType'].
        """
        return record.get('detail').get('newEvaluationResult').get('complianceType')
def lambda_handler(event, context):
    """
    Lambda entry point: configures logging, then remediates the event.

    :param event: invocation payload, handed to Remediate unchanged.
    :param context: invocation context (unused).
    """
    logger = logging.getLogger()

    # Iterate over a copy: removeHandler() mutates logger.handlers, and
    # removing from the live list while looping over it skips every other
    # handler. A leftover handler would turn basicConfig() below into a
    # no-op and let SNS handlers pile up when the root logger is reused.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    # change logging levels for boto and others
    logging.getLogger('boto3').setLevel(logging.ERROR)
    logging.getLogger('botocore').setLevel(logging.ERROR)
    logging.getLogger('urllib3').setLevel(logging.ERROR)

    # set logging format
    logging.basicConfig(format="[%(levelname)s] %(message)s (%(filename)s, %(funcName)s(), line %(lineno)d)",
                        level=os.environ.get('LOGLEVEL', 'WARNING'))

    # add SNS logger
    sns_logger = SNSLoggingHandler(os.environ.get('LOGTOPIC'))
    sns_logger.setLevel(logging.INFO)
    logger.addHandler(sns_logger)

    # instantiate class
    remediate = Remediate(logging, event)

    # run functions
    remediate.remediate()