forked from hubblestack/hubble
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
129 additions
and
130 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
|
||
import urllib3 | ||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | ||
|
||
_max_content_bytes = 100000 | ||
http_event_collector_debug = False | ||
|
||
# Thanks to George Starcher for the http_event_collector class (https://github.com/georgestarcher/) | ||
# Default batch max size to match splunk's default limits for max byte | ||
# See http_input stanza in limits.conf; note in testing I had to limit to 100,000 to avoid http event collector breaking connection | ||
# Auto flush will occur if next event payload will exceed limit | ||
class http_event_collector: | ||
|
||
def __init__(self, token, http_event_server, host='', http_event_port='8088', | ||
http_event_server_ssl=True, http_event_collector_ssl_verify=True, | ||
max_bytes=_max_content_bytes, proxy=None, timeout=9.05): | ||
self.timeout = timeout | ||
self.token = token | ||
self.batchEvents = [] | ||
self.maxByteLength = max_bytes | ||
self.currentByteLength = 0 | ||
self.server_uri = [] | ||
self.http_event_collector_ssl_verify = http_event_collector_ssl_verify | ||
if proxy and http_event_server_ssl: | ||
self.proxy = {'https': 'https://{0}'.format(proxy)} | ||
elif proxy: | ||
self.proxy = {'http': 'http://{0}'.format(proxy)} | ||
else: | ||
self.proxy = {} | ||
|
||
# Set host to specified value or default to localhostname if no value provided | ||
if host: | ||
self.host = host | ||
else: | ||
self.host = socket.gethostname() | ||
|
||
# Build and set server_uri for http event collector | ||
# Defaults to SSL if flag not passed | ||
# Defaults to port 8088 if port not passed | ||
|
||
servers = http_event_server | ||
if not isinstance(servers, list): | ||
servers = [servers] | ||
for server in servers: | ||
if http_event_server_ssl: | ||
self.server_uri.append(['https://%s:%s/services/collector/event' % (server, http_event_port), True]) | ||
else: | ||
self.server_uri.append(['http://%s:%s/services/collector/event' % (server, http_event_port), True]) | ||
|
||
if http_event_collector_debug: | ||
print self.token | ||
print self.server_uri | ||
|
||
def sendEvent(self, payload, eventtime=''): | ||
# Method to immediately send an event to the http event collector | ||
|
||
headers = {'Authorization': 'Splunk ' + self.token} | ||
|
||
# If eventtime in epoch not passed as optional argument use current system time in epoch | ||
if not eventtime: | ||
eventtime = str(int(time.time())) | ||
|
||
# Fill in local hostname if not manually populated | ||
if 'host' not in payload: | ||
payload.update({'host': self.host}) | ||
|
||
# Update time value on payload if need to use system time | ||
data = {'time': eventtime} | ||
data.update(payload) | ||
|
||
# send event to http event collector | ||
r = requests.post(self.server_uri, data=json.dumps(data), headers=headers, | ||
verify=self.http_event_collector_ssl_verify, proxies=self.proxy) | ||
|
||
# Print debug info if flag set | ||
if http_event_collector_debug: | ||
logger.debug(r.text) | ||
logger.debug(data) | ||
|
||
def batchEvent(self, payload, eventtime=''): | ||
# Method to store the event in a batch to flush later | ||
|
||
# Fill in local hostname if not manually populated | ||
if 'host' not in payload: | ||
payload.update({'host': self.host}) | ||
|
||
payloadLength = len(json.dumps(payload)) | ||
|
||
if (self.currentByteLength + payloadLength) > self.maxByteLength: | ||
self.flushBatch() | ||
# Print debug info if flag set | ||
if http_event_collector_debug: | ||
print 'auto flushing' | ||
else: | ||
self.currentByteLength = self.currentByteLength + payloadLength | ||
|
||
# If eventtime in epoch not passed as optional argument use current system time in epoch | ||
if not eventtime: | ||
eventtime = str(int(time.time())) | ||
|
||
# Update time value on payload if need to use system time | ||
data = {'time': eventtime} | ||
data.update(payload) | ||
|
||
self.batchEvents.append(json.dumps(data)) | ||
|
||
def flushBatch(self): | ||
# Method to flush the batch list of events | ||
|
||
if len(self.batchEvents) > 0: | ||
headers = {'Authorization': 'Splunk ' + self.token} | ||
self.server_uri = [x for x in self.server_uri if x[1] is not False] | ||
for server in self.server_uri: | ||
try: | ||
r = requests.post(server[0], data=' '.join(self.batchEvents), headers=headers, | ||
verify=self.http_event_collector_ssl_verify, | ||
proxies=self.proxy, timeout=self.timeout) | ||
r.raise_for_status() | ||
server[1] = True | ||
break | ||
except requests.exceptions.RequestException: | ||
#log.info('Request to splunk server "%s" failed. Marking as bad.' % server[0]) | ||
server[1] = False | ||
except Exception as e: | ||
#log.error('Request to splunk threw an error: {0}'.format(e)) | ||
pass | ||
self.batchEvents = [] | ||
self.currentByteLength = 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters