# cloudwatchlogs-sender.py
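# AWS Lambda function that receives CloudWatch Logs subscription events,
# parses them according to a base64-encoded logTypeConfig (line regex, JSON
# paths, or multi-line rules), applies masking/hashing/derived-field
# transforms, and uploads the result to Site24x7.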
import calendar
import datetime
import gzip
import hashlib
import json
import os
import re
import traceback
import urllib.request
from base64 import b64decode
# The full parsing configuration arrives base64-encoded in an environment variable.
logtype_config = json.loads(b64decode(os.environ['logTypeConfig']).decode('utf-8'))
s247_custom_regex = re.compile(logtype_config['regex']) if 'regex' in logtype_config else None
s247_ml_regex = re.compile(logtype_config['ml_regex']) if 'ml_regex' in logtype_config else None
s247_ignored_fields = logtype_config.get('ignoredFields', [])
s247_tz = {'hrs': 0, 'mins': 0}  # default offset: UTC
config_fields = {}
s247_datetime_format_string = logtype_config['dateFormat']
s247_ml_end_regex = re.compile(logtype_config['ml_end_regex']) if 'ml_end_regex' in logtype_config else None
s247_max_ml_count = s247_custom_regex.pattern.count(r'\<NewLine\>') if 'ml_regex' in logtype_config else None
s247_max_trace_line = 100
s247_datetime_regex = re.compile(logtype_config['dateRegex'])
log_size = 0
masking_config = logtype_config.get('maskingConfig')
hashing_config = logtype_config.get('hashingConfig')
derived_eval = logtype_config.get('derivedConfig')
config_types = logtype_config.get('configTypes')
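# Pre-compile the derived-field patterns once at startup, normalizing the
# stored escaping and converting (?<name>) named groups to Python's (?P<name>).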
if derived_eval:
try:
derived_fields = {}
for key in derived_eval:
derived_fields[key] = []
for values in derived_eval[key]:
derived_fields[key].append(re.compile(values.replace('\\\\', '\\').replace('?<', '?P<')))
except Exception as e:
traceback.print_exc()
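# Masking and hashing patterns are likewise compiled once at startup.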
if masking_config:
for key in masking_config:
masking_config[key]["regex"] = re.compile(masking_config[key]["regex"])
if hashing_config:
for key in hashing_config:
hashing_config[key]["regex"] = re.compile(hashing_config[key]["regex"])
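# Each filter field's value patterns are validated and collapsed into a single
# alternation, so is_filters_matched() can test a field with one regex call.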
if "filterConfig" in logtype_config:
for field in logtype_config['filterConfig']:
temp = []
for value in logtype_config['filterConfig'][field]['values']:
temp.append(re.compile(value))
logtype_config['filterConfig'][field]['values'] = '|'.join(x.pattern for x in temp)
if 'unix' not in s247_datetime_format_string:
    is_year_present = '%y' in s247_datetime_format_string or '%Y' in s247_datetime_format_string
    if not is_year_present:
        # The format has no year; get_timestamp() appends the current year.
        s247_datetime_format_string = s247_datetime_format_string + ' %Y'
    is_timezone_present = '%z' in s247_datetime_format_string or 'T' in s247_datetime_format_string
    if not is_timezone_present and 'timezone' in logtype_config:
        # Store the configured offset with its sign inverted, so that adding it
        # to a naive local timestamp converts the value to UTC.
        tz_value = logtype_config['timezone']
        if tz_value.startswith('+'):
            s247_tz['hrs'] = int('-' + tz_value[1:3])
            s247_tz['mins'] = int('-' + tz_value[3:5])
        elif tz_value.startswith('-'):
            s247_tz['hrs'] = int('+' + tz_value[1:3])
            s247_tz['mins'] = int('+' + tz_value[3:5])
def load_config_field_value(payload):
    # Derive static field values from the log stream name. A configType such as
    # '@filepath:2' selects the path component at that index.
    log_stream = payload['logStream']
    for field_name in config_types:
        config_type = config_types[field_name]
        if '@filepath' in config_type:
            if ':' in config_type and int(config_type[10:]) < log_stream.count(os.sep):
                config_fields[field_name] = log_stream.split(os.sep)[int(config_type[10:])]
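# Per-line transforms, applied in order: masking, hashing, derived-field extraction.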
def log_line_filter(formatted_line):
if masking_config:
apply_masking(formatted_line)
if hashing_config:
apply_hashing(formatted_line)
if derived_eval:
derivedFields(formatted_line)
def get_timestamp(datetime_string):
    try:
        # Unix timestamps need no strptime parsing; a plain 'unix' (seconds)
        # value is padded to milliseconds, any other unix variant is returned
        # unchanged.
        if 'unix' in s247_datetime_format_string:
            return datetime_string + '000' if s247_datetime_format_string == 'unix' else datetime_string
        if not is_year_present:
            from datetime import date
            datetime_string = datetime_string + ' ' + str(date.today().year)
        # Normalize offsets like +05:30 to +0530 so %z can parse them.
        if is_timezone_present:
            datetime_string = re.sub(r'([+-])(\d{2}):(\d{2})', r'\1\2\3', datetime_string)
        datetime_data = datetime.datetime.strptime(datetime_string, s247_datetime_format_string)
        if not is_timezone_present:
            datetime_data += datetime.timedelta(hours=s247_tz['hrs'], minutes=s247_tz['mins'])
        timestamp = calendar.timegm(datetime_data.utctimetuple()) * 1000 + int(datetime_data.microsecond / 1000)
        return int(timestamp)
    except Exception:
        return 0
def parse_lines(lines_read, log_group):
    ''' Parse single-line (regex-defined) log events; lines that carry no
    timestamp are treated as continuations of the previous event. '''
    global log_size
    parsed_lines = []
    for line in lines_read:
        message = line['message']
        if message:
            try:
                matcher = s247_custom_regex.search(message)
                if matcher:
                    log_size += len(message)
                    log_fields = matcher.groupdict(default='-')
                    for field_name in s247_ignored_fields:
                        log_size -= len(log_fields.pop(field_name, ''))
                    formatted_line = {}
                    formatted_line.update(log_fields)
                    log_line_filter(formatted_line)
                    add_message_metadata(formatted_line, log_group)
                    parsed_lines.append(formatted_line)
                elif parsed_lines and s247_datetime_regex.search(message) is None:
                    # No timestamp: append to the previous event's message field.
                    parsed_lines[-1][message_key] += '\n' + message
                    log_size += len(message)
            except Exception:
                traceback.print_exc()
    return parsed_lines, log_size
def add_message_metadata(formatted_line, log_group):
    # 'agent_time' stamps the event with the processing time; otherwise the
    # timestamp is parsed out of the configured date field.
    formatted_line.update({
        '_zl_timestamp': datetime.datetime.now().timestamp() if logtype_config['dateFormat'] == 'agent_time' else get_timestamp(formatted_line[logtype_config['dateField']]),
        's247agentuid': log_group
    })
    if config_fields:
        formatted_line.update(config_fields)
def is_filters_matched(formatted_line):
    # A filter with 'match' True keeps events whose field matches one of the
    # configured patterns; 'match' False keeps events whose field does not.
    if 'filterConfig' in logtype_config:
        for config in logtype_config['filterConfig']:
            if config in formatted_line:
                val = bool(re.findall(logtype_config['filterConfig'][config]['values'], formatted_line[config]))
                if logtype_config['filterConfig'][config]['match'] ^ val:
                    return False
    return True
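# Resolve a value from a parsed JSON object; a dotted key such as 'a.b' walks
# one nesting level per segment. Dict and list values are stringified.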
def get_json_value(obj, key):
if key in obj:
val = obj[key]
return str(val) if type(val) in [dict, list] else val
elif '.' in key:
parent_key = key[:key.index('.')]
child_key = key[key.index('.')+1:]
if parent_key in obj:
return get_json_value(obj[parent_key], child_key)
def json_log_parser(lines_read, log_group):
    ''' Parse events whose fields are addressed by JSON paths. With exactly two
    configured paths the fields are read from the CloudWatch event object
    itself; otherwise the message body is parsed as JSON. '''
    global log_size
    parsed_lines = []
    json_keys_size = len(logtype_config['jsonPath'])
    for event_obj in lines_read:
        formatted_line = {}
        json_log_size = 0
        if json_keys_size > 2 and not event_obj['message'].startswith('{'):
            continue
        json_source = event_obj if json_keys_size == 2 else json.loads(event_obj['message'])
        for path_obj in logtype_config['jsonPath']:
            value = get_json_value(json_source, path_obj['key' if 'key' in path_obj else 'name'])
            if value is not None and value != '':
                formatted_line[path_obj['name']] = value
                json_log_size += len(str(value))
        if not is_filters_matched(formatted_line):
            continue
        log_size += json_log_size
        log_line_filter(formatted_line)
        add_message_metadata(formatted_line, log_group)
        parsed_lines.append(formatted_line)
    return parsed_lines, log_size
def get_last_group_inregex(regex):
    ''' Return the name of the last named capture group in the pattern; if the
    last group is unnamed, fall back to the group before it. '''
    for group_name in regex.groupindex:
        if regex.groupindex[group_name] == regex.groups:
            return group_name
    for group_name in regex.groupindex:
        if regex.groupindex[group_name] == regex.groups - 1:
            return group_name
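# The last named group of the line regex is treated as the message body, to
# which continuation lines and overflow trace buffers are appended.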
if 'jsonPath' not in logtype_config:
message_key = get_last_group_inregex(s247_custom_regex)
def ml_regex_applier(line, ml_data, formatted_line):
    # Apply the line regex to a reassembled multi-line record and merge in the
    # fields captured by the multi-line start regex.
    global log_size
    try:
        for matcher in re.finditer(s247_custom_regex, line):
            log_fields = matcher.groupdict(default='-')
            log_fields.update(ml_data)
            for field_name in s247_ignored_fields:
                log_size -= len(log_fields.pop(field_name, ''))
            formatted_line.update(log_fields)
    except Exception:
        traceback.print_exc()
        formatted_line.clear()  # mutate in place; rebinding would not reach the caller
def ml_log_parser(lines_read, log_group):
    ''' Reassemble multi-line records: a line matching ml_regex starts a new
    record, subsequent lines are buffered until the next start (or end) line,
    and each completed record is parsed with the line regex. '''
    global log_size
    log_size = 0
    parsed_lines = []
    ml_trace = ''
    ml_trace_buffer = ''
    ml_found = False
    ml_end_line_found = False
    ml_data = None
    ml_count = 0
    for line_act in lines_read:
        ml_line = line_act['message'].split('\n')
        for line in ml_line:
            if line:
                try:
                    ml_start_matcher = s247_ml_regex.match(line)
                    if ml_start_matcher or ml_end_line_found:
                        # A new record begins (or the previous one was explicitly
                        # ended); flush whatever has been buffered so far.
                        ml_found = ml_start_matcher
                        ml_end_line_found = False
                        formatted_line = {}
                        if len(ml_trace) > 0:
                            try:
                                log_size = log_size + len(ml_trace)
                                ml_regex_applier(ml_trace, ml_data, formatted_line)
                                if ml_trace_buffer and formatted_line:
                                    formatted_line[message_key] = formatted_line[message_key] + ml_trace_buffer
                                    log_size += len(ml_trace_buffer)
                                log_line_filter(formatted_line)
                                add_message_metadata(formatted_line, log_group)
                                parsed_lines.append(formatted_line)
                                ml_trace = ''
                                ml_trace_buffer = ''
                                if ml_found:
                                    ml_data = ml_start_matcher.groupdict()
                                    log_size += len(line)
                                else:
                                    ml_data = {}
                                ml_count = 0
                            except Exception:
                                traceback.print_exc()
                        elif ml_found:
                            log_size += len(line)
                            ml_data = ml_start_matcher.groupdict()
                    elif ml_found:
                        # Continuation line: keep up to s247_max_ml_count lines in
                        # the record itself, then up to s247_max_trace_line extra
                        # lines in an overflow buffer.
                        if ml_count < s247_max_ml_count:
                            ml_trace += '<NewLine>' + line
                        elif s247_ml_end_regex and s247_ml_end_regex.match(line):
                            ml_end_line_found = True
                        elif (ml_count - s247_max_ml_count) < s247_max_trace_line:
                            ml_trace_buffer += "\n" + line
                        ml_count += 1
                except Exception:
                    traceback.print_exc()
    # Flush the final record once all events are consumed.
    if len(ml_trace) > 0:
        try:
            log_size = log_size + len(ml_trace)
            formatted_line = {}
            ml_regex_applier(ml_trace, ml_data, formatted_line)
            if ml_trace_buffer and formatted_line:
                formatted_line[message_key] = formatted_line[message_key] + ml_trace_buffer
                log_size += len(ml_trace_buffer)
            log_line_filter(formatted_line)
            add_message_metadata(formatted_line, log_group)
            parsed_lines.append(formatted_line)
        except Exception:
            traceback.print_exc()
    return parsed_lines, log_size
def send_logs_to_s247(gzipped_parsed_lines, log_size):
    header_obj = {'X-DeviceKey': logtype_config['apiKey'], 'X-LogType': logtype_config['logType'],
                  'X-StreamMode': 1, 'Log-Size': log_size, 'Content-Type': 'application/json',
                  'Content-Encoding': 'gzip', 'User-Agent': 'AWS-Lambda'}
    upload_url = 'https://' + logtype_config['uploadDomain'] + '/upload'
    request = urllib.request.Request(upload_url, headers=header_obj)
    s247_response = urllib.request.urlopen(request, data=gzipped_parsed_lines)
    dict_response_headers = dict(s247_response.getheaders())
    if s247_response and s247_response.status == 200:
        print('{}: All logs are uploaded to Site24x7'.format(dict_response_headers['x-uploadid']))
    else:
        print('{}: Problem in uploading to Site24x7, status: {}, reason: {}'.format(
            dict_response_headers['x-uploadid'], s247_response.status, s247_response.read()))
def apply_masking(formatted_line):
    ''' Replace every value captured by a field's masking regex with the
    configured replacement string, tracking the size delta in log_size. '''
    global log_size
    try:
        for config in masking_config:
            adjust_length = 0
            mask_regex = masking_config[config]['regex']
            if config in formatted_line:
                field_value = str(formatted_line[config])
                for matcher in re.finditer(mask_regex, field_value):
                    for i in range(mask_regex.groups):
                        matched_value = matcher.group(i + 1)
                        if matched_value:
                            start = matcher.start(i + 1)
                            end = matcher.end(i + 1)
                            if start >= 0 and end > 0:
                                # Match offsets refer to the original value, so
                                # shift them by the length already removed.
                                start = start - adjust_length
                                end = end - adjust_length
                                adjust_length += (end - start) - len(masking_config[config]['string'])
                                field_value = field_value[:start] + masking_config[config]['string'] + field_value[end:]
                formatted_line[config] = field_value
                log_size -= adjust_length
    except Exception:
        traceback.print_exc()
def apply_hashing(formatted_line):
    ''' Replace every value captured by a field's hashing regex with its SHA-256
    hex digest, so values stay comparable without being readable. '''
    global log_size
    try:
        for config in hashing_config:
            adjust_length = 0
            hash_regex = hashing_config[config]['regex']
            if config in formatted_line:
                field_value = str(formatted_line[config])
                for matcher in re.finditer(hash_regex, field_value):
                    for i in range(hash_regex.groups):
                        matched_value = matcher.group(i + 1)
                        if matched_value:
                            start = matcher.start(i + 1)
                            end = matcher.end(i + 1)
                            if start >= 0 and end > 0:
                                # Match offsets refer to the original value, so
                                # shift them by the length delta so far.
                                start = start - adjust_length
                                end = end - adjust_length
                                hash_string = hashlib.sha256(matched_value.encode('utf-8')).hexdigest()
                                adjust_length += (end - start) - len(hash_string)
                                field_value = field_value[:start] + hash_string + field_value[end:]
                formatted_line[config] = field_value
                log_size -= adjust_length
    except Exception:
        traceback.print_exc()
def derivedFields(formatted_line):
    ''' Extract additional fields by running each derived-field pattern against
    its source field; the first pattern that matches wins. '''
    global log_size
    try:
        for items in derived_fields:
            for each in derived_fields[items]:
                if items in formatted_line:
                    match_derived = each.search(formatted_line[items])
                    if match_derived:
                        match_derived_field = match_derived.groupdict(default='-')
                        formatted_line.update(match_derived_field)
                        for field_name in match_derived_field:
                            log_size += len(formatted_line[field_name])
                        break
    except Exception:
        traceback.print_exc()
def lambda_handler(event, context):
    ''' Entry point: decode the gzipped, base64-encoded CloudWatch Logs payload,
    parse it with the configured strategy, and ship the result to Site24x7. '''
    global log_size
    try:
        cw_data = event['awslogs']['data']
        compressed_payload = b64decode(cw_data)
        uncompressed_payload = gzip.decompress(compressed_payload)
        payload = json.loads(uncompressed_payload)
        if config_types:
            load_config_field_value(payload)
        log_group = payload['logGroup']
        log_events = payload['logEvents']
        # Pick the parsing strategy from the configuration: JSON paths,
        # multi-line regex, or plain line regex.
        if 'jsonPath' in logtype_config:
            parsed_lines, log_size = json_log_parser(log_events, log_group)
        elif s247_ml_regex:
            parsed_lines, log_size = ml_log_parser(log_events, log_group)
        else:
            parsed_lines, log_size = parse_lines(log_events, log_group)
        if parsed_lines:
            gzipped_parsed_lines = gzip.compress(json.dumps(parsed_lines).encode())
            send_logs_to_s247(gzipped_parsed_lines, log_size)
    except Exception as e:
        print(e)
        raise e
    return {
        'statusCode': 200,
        'body': json.dumps('Logs Uploaded Successfully')
    }
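# Local smoke test -- a minimal sketch, not part of the Lambda deployment. It
# assumes a valid base64-encoded logTypeConfig is already exported in the
# environment, and builds a fake CloudWatch Logs event the way AWS delivers
# it: gzipped, then base64-encoded. Note that a successful parse will attempt
# a real upload to the configured uploadDomain.
if __name__ == '__main__':
    from base64 import b64encode
    sample_payload = {                      # hypothetical names, for illustration only
        'logGroup': 'test-log-group',
        'logStream': 'test/log/stream',
        'logEvents': [{'id': '0', 'timestamp': 0, 'message': 'sample log line'}],
    }
    sample_event = {'awslogs': {'data': b64encode(gzip.compress(json.dumps(sample_payload).encode())).decode()}}
    print(lambda_handler(sample_event, None))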