/
__init__.py
352 lines (289 loc) · 12.9 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
import atexit
import json
import logging
import socket
import time
import traceback
from threading import Timer
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# Default cap on the number of formatted events a handler keeps queued; used as
# the default for SplunkHandler's ``queue_size`` argument.
DEFAULT_QUEUE_SIZE = 5000

# Registry of the SplunkHandler objects created in this process. Handlers add
# themselves on construction; the module-level helpers below iterate over it.
instances = list()
# Called when application exit imminent (main thread ended / got kill signal)
@atexit.register
def perform_exit():
    """Shut down every registered handler when the interpreter exits.

    Registered through ``atexit``. Each handler is shut down independently:
    a failure in one handler is reported on stderr and does not stop the
    remaining handlers from being shut down, and it never propagates out of
    the exit hook.
    """
    for instance in instances:
        try:
            instance.shutdown()
        except Exception:
            # Best effort at exit, but do not hide the failure: an exception
            # here means queued events may not have been delivered.
            traceback.print_exc()
def force_flush():
    """Call ``force_flush()`` on every registered handler.

    Handlers are flushed one after another. A failure in one handler is
    reported on stderr and does not prevent the remaining handlers from
    being flushed; no exception is propagated to the caller.
    """
    for instance in instances:
        try:
            instance.force_flush()
        except Exception:
            # Keep going with the other handlers, but make the failure
            # visible instead of silently discarding it.
            traceback.print_exc()
def wait_until_empty():
    """Call ``wait_until_empty()`` on every registered handler.

    Handlers are waited on one after another. A failure in one handler is
    reported on stderr and does not prevent waiting on the remaining
    handlers; no exception is propagated to the caller.
    """
    for instance in instances:
        try:
            instance.wait_until_empty()
        except Exception:
            # Keep going with the other handlers, but make the failure
            # visible instead of silently discarding it.
            traceback.print_exc()
class SplunkHandler(logging.Handler):
    """
    A logging handler to send events to a Splunk Enterprise instance
    running the Splunk HTTP Event Collector.

    With a positive ``flush_interval`` formatted events are appended to an
    in-memory queue and posted in batches by a recurring ``threading.Timer``.
    With a zero or negative ``flush_interval`` each event is posted
    synchronously from ``emit()``.
    """

    # Approximate budget, in characters, used to estimate how many queued
    # events are moved into the pending payload per worker run (512 KiB).
    _MAX_BATCH_CHARS = 524288

    def __init__(self, host, port, token, index,
                 allow_overrides=False, debug=False, flush_interval=15.0,
                 force_keep_ahead=False, hostname=None, protocol='https',
                 proxies=None, queue_size=DEFAULT_QUEUE_SIZE, record_format=False,
                 retry_backoff=2.0, retry_count=5, source=None,
                 sourcetype='text', timeout=60, url=None, verify=True):
        """
        Args:
            host (str): The Splunk host param
            port (int): The port the host is listening on
            token (str): Authentication token
            index (str): Splunk index to write to
            allow_overrides (bool): Whether to look for _<param> in log data (ex: _index)
            debug (bool): Whether to print debug console messages
            flush_interval (float): How often to push events to splunk host in seconds;
                a value <= 0 sends every event immediately from emit()
            force_keep_ahead (bool): Sleep instead of dropping logs when queue fills
            hostname (str): Value sent as the event 'host'; defaults to
                socket.gethostname()
            protocol (str): The web protocol to use
            proxies (dict): The proxies to use for the request
            queue_size (int): The max number of logs to queue, set to 0 for no max
            record_format (bool): Whether the log record will be json
            retry_backoff (float): The requests lib backoff factor
            retry_count (int): The number of times to retry a failed request
            source (str): The Splunk source param; defaults to the record's pathname
            sourcetype (str): The Splunk sourcetype param
            timeout (float): The time to wait for a response from Splunk
            url (str): Override of the url to send the event to
            verify (bool): Whether to perform ssl certificate validation
        """
        # Register with the module-level list so the module helpers and the
        # atexit hook can reach this handler.
        instances.append(self)
        logging.Handler.__init__(self)

        self.allow_overrides = allow_overrides
        self.host = host
        self.port = port
        self.token = token
        self.index = index
        self.source = source
        self.sourcetype = sourcetype
        self.verify = verify
        self.timeout = timeout
        self.flush_interval = flush_interval
        self.force_keep_ahead = force_keep_ahead
        self.log_payload = ""  # Events already pulled off the queue, awaiting POST
        self.SIGTERM = False  # 'True' if application requested exit
        self.timer = None
        # It is possible to get 'behind' and never catch up, so we limit the queue size
        self.queue = []
        self.max_queue_size = max(queue_size, 0)  # 0 is min queue size
        self.debug = debug
        self.session = requests.Session()
        self.retry_count = retry_count
        self.retry_backoff = retry_backoff
        self.protocol = protocol
        self.proxies = proxies
        self.record_format = record_format
        self.processing_payload = False  # 'True' while _splunk_worker() is running
        if not url:
            self.url = '%s://%s:%s/services/collector/event' % (self.protocol, self.host, self.port)
        else:
            self.url = url

        # Keep ahead depends on queue size, so cannot be 0
        if self.force_keep_ahead and not self.max_queue_size:
            self.write_log("Cannot keep ahead of unbound queue, using default queue size")
            self.max_queue_size = DEFAULT_QUEUE_SIZE

        self.write_debug_log("Starting debug mode")

        if hostname is None:
            self.hostname = socket.gethostname()
        else:
            self.hostname = hostname

        self.write_debug_log("Preparing to override loggers")

        # prevent infinite recursion by silencing requests and urllib3 loggers
        logging.getLogger('requests').propagate = False
        logging.getLogger('urllib3').propagate = False

        # and do the same for ourselves
        logging.getLogger(__name__).propagate = False

        # disable all warnings from urllib3 package
        if not self.verify:
            requests.packages.urllib3.disable_warnings()

        if self.verify and self.protocol == 'http':
            print("[SplunkHandler DEBUG] " + 'cannot use SSL Verify and unsecure connection')

        if self.proxies is not None:
            self.session.proxies = self.proxies

        # Set up automatic retry with back-off
        self.write_debug_log("Preparing to create a Requests session")
        retry = Retry(total=self.retry_count,
                      backoff_factor=self.retry_backoff,
                      allowed_methods=None,  # Retry for any HTTP verb
                      status_forcelist=[500, 502, 503, 504])
        self.session.mount(self.protocol + '://', HTTPAdapter(max_retries=retry))

        self.start_worker_thread()

        self.write_debug_log("Class initialize complete")

    def emit(self, record):
        """Format ``record`` and either queue it or send it immediately.

        Formatting errors are printed and the record is discarded. When
        ``flush_interval`` is <= 0 the event is posted synchronously;
        otherwise it is appended to the queue, or dropped with a console
        message if the queue is full and ``force_keep_ahead`` is off.
        """
        self.write_debug_log("emit() called")

        try:
            record = self.format_record(record)
        except Exception as e:
            self.write_log("Exception in Splunk logging handler: %s" % str(e))
            self.write_log(traceback.format_exc())
            return

        if self.flush_interval <= 0:
            # Flush log immediately; is blocking call
            self._splunk_worker(payload=record)
            return

        self.write_debug_log("Writing record to log queue")

        # If force keep ahead, sleep until space in queue to prevent falling behind
        while self.force_keep_ahead and len(self.queue) >= self.max_queue_size:
            time.sleep(self.alt_flush_interval)

        # Put log message into queue; worker thread will pick up
        if not self.max_queue_size or len(self.queue) < self.max_queue_size:
            self.queue.append(record)
        else:
            self.write_log("Log queue full; log data will be dropped.")

    def close(self):
        """Flush remaining events via ``shutdown()`` and close the handler."""
        self.shutdown()
        logging.Handler.close(self)

    #
    # helper methods
    #

    def start_worker_thread(self):
        """Schedule the first timer-driven worker run (no-op if flush_interval <= 0)."""
        # Start a worker thread responsible for sending logs
        if self.flush_interval > 0:
            self.write_debug_log("Preparing to spin off first worker thread Timer")
            self.timer = Timer(self.flush_interval, self._splunk_worker)
            self.timer.daemon = True  # Auto-kill thread if main process exits
            self.timer.start()

    def write_log(self, log_message):
        """Print ``log_message`` to stdout with the handler prefix."""
        print("[SplunkHandler] " + log_message)

    def write_debug_log(self, log_message):
        """Print ``log_message`` with the debug prefix when ``debug`` is enabled."""
        if self.debug:
            print("[SplunkHandler DEBUG] " + log_message)

    def format_record(self, record):
        """Build the JSON event string sent to the HTTP Event Collector.

        Returns:
            str: JSON object with the keys ``time``, ``host``, ``index``,
            ``source``, ``sourcetype`` and ``event`` (the formatted message).
        """
        self.write_debug_log("format_record() called")

        if self.record_format:
            # NOTE(review): for a standard LogRecord json.dumps() raises
            # TypeError, which is swallowed here, so this branch appears to
            # have no effect -- confirm the intended behaviour.
            try:
                record = json.dumps(record)
            except Exception:
                pass

        # The override attributes are read (and removed from the record)
        # before self.format() runs; dict literals evaluate in key order.
        params = {
            'time': self.getsplunkattr(record, '_time', time.time()),
            'host': self.getsplunkattr(record, '_host', self.hostname),
            'index': self.getsplunkattr(record, '_index', self.index),
            'source': record.pathname if self.source is None else self.source,
            'sourcetype': self.getsplunkattr(record, '_sourcetype', self.sourcetype),
            'event': self.format(record)
        }

        self.write_debug_log("Record dictionary created")

        formatted_record = json.dumps(params, sort_keys=True)
        self.write_debug_log("Record formatting complete")

        return formatted_record

    def getsplunkattr(self, obj, attr, default=None):
        """Return the per-record override ``attr`` from ``obj``, else ``default``.

        Overrides are only honoured when ``allow_overrides`` is set; in that
        case the attribute is also removed from ``obj`` after being read.
        """
        val = default
        if self.allow_overrides:
            val = getattr(obj, attr, default)
            try:
                delattr(obj, attr)
            except Exception:
                # The attribute was not present on obj; nothing to remove.
                pass
        return val

    def _splunk_worker(self, payload=None):
        """Send pending events to Splunk and re-arm the timer.

        Args:
            payload (str): Event data to send right away. When omitted, the
                events pulled from the queue by ``empty_queue()`` are sent.

        HTTP and connection errors are printed, not raised.
        """
        self.write_debug_log("_splunk_worker() called")

        if self.flush_interval > 0 and self.timer is not None:
            # Stop the timer. Happens automatically if this is called
            # via the timer, does not if invoked by force_flush()
            self.timer.cancel()

        self.processing_payload = True
        try:
            queue_is_empty = self.empty_queue()

            if not payload:
                payload = self.log_payload
                self.log_payload = ""

            if payload:
                self.write_debug_log("Payload available for sending")
                self.write_debug_log("Destination URL is " + self.url)

                try:
                    self.write_debug_log("Sending payload: " + payload)
                    r = self.session.post(
                        self.url,
                        data=payload,
                        headers={'Authorization': "Splunk %s" % self.token},
                        verify=self.verify,
                        timeout=self.timeout
                    )
                    r.raise_for_status()  # Throws exception for 4xx/5xx status
                    self.write_debug_log("Payload sent successfully")

                except Exception as e:
                    try:
                        self.write_log("Exception in Splunk logging handler: %s" % str(e))
                        self.write_log(traceback.format_exc())
                    except Exception:
                        self.write_debug_log("Exception encountered," +
                                             "but traceback could not be formatted")
            else:
                self.write_debug_log("Timer thread executed but no payload was available to send")

            # Restart the timer
            if self.flush_interval > 0:
                timer_interval = self.flush_interval
                if self.SIGTERM:
                    self.write_debug_log("Timer reset aborted due to SIGTERM received")
                else:
                    if not queue_is_empty:
                        self.write_debug_log("Queue not empty, scheduling timer to run immediately")
                        # Start up again right away if queue was not cleared
                        timer_interval = self.alt_flush_interval

                    self.write_debug_log("Resetting timer thread")
                    self.timer = Timer(timer_interval, self._splunk_worker)
                    self.timer.daemon = True  # Auto-kill thread if main process exits
                    self.timer.start()
                    self.write_debug_log("Timer thread scheduled")
        finally:
            # Always clear the flag: wait_until_empty() loops while it is set,
            # so leaving it True after an unexpected error would hang callers.
            self.processing_payload = False

    def empty_queue(self):
        """Move queued events into ``log_payload``.

        During shutdown (``SIGTERM`` set) the whole queue is moved. Otherwise
        a batch is moved whose size is estimated from the length of the first
        queued event.

        Returns:
            bool: True if the queue is empty afterwards, False if events
            remain and another worker run is needed soon.
        """
        if len(self.queue) == 0:
            self.write_debug_log("Queue was empty")
            return True

        self.write_debug_log("Recursing through queue")
        if self.SIGTERM:
            self.log_payload += ''.join(self.queue)
            self.queue.clear()
        else:
            # without looking at each item, estimate how many fit in ~512 KiB;
            # guard against a zero-length first item
            apprx_size_base = max(len(self.queue[0]), 1)
            # dont eval max/event size ratio as less than 1
            # dont count more than what is in queue to ensure the same number as pulled are deleted
            count = min(max(self._MAX_BATCH_CHARS // apprx_size_base, 1), len(self.queue))
            self.log_payload += ''.join(self.queue[:count])
            del self.queue[:count]
        self.write_debug_log("Queue task completed")

        # True means "nothing left", matching the early return above and the
        # `if not queue_is_empty` check in _splunk_worker().
        return len(self.queue) == 0

    def force_flush(self):
        """Run the worker now and block until the queue has been drained."""
        self.write_debug_log("Force flush requested")
        self._splunk_worker()
        self.wait_until_empty()  # guarantees queue is emptied

    def shutdown(self):
        """Stop the timer and send everything still queued; runs only once."""
        self.write_debug_log("Immediate shutdown requested")

        # Only initiate shutdown once
        if self.SIGTERM:
            return

        self.write_debug_log("Setting instance SIGTERM=True")
        self.SIGTERM = True

        if self.flush_interval > 0 and self.timer is not None:
            self.timer.cancel()  # Cancels the scheduled Timer, allows exit immediately

        self.write_debug_log("Starting up the final run of the worker thread before shutdown")
        # Send the remaining items that might be sitting in queue.
        self._splunk_worker()
        self.wait_until_empty()  # guarantees queue is emptied before exit

    def wait_until_empty(self):
        """Block until the queue is empty and no worker run is in progress."""
        self.write_debug_log("Waiting until queue empty")
        while len(self.queue) > 0 or self.processing_payload:
            self.write_debug_log("Current queue size: " + str(len(self.queue)))
            time.sleep(self.alt_flush_interval)

    @property
    def alt_flush_interval(self):
        """Short polling/retry delay: half of ``flush_interval``, within [0, 1] seconds.

        Clamped at zero because ``time.sleep()`` rejects negative values,
        which a negative ``flush_interval`` would otherwise produce.
        """
        return max(0.0, min(1.0, self.flush_interval / 2))