Merge pull request #1064 from mozilla/replace_timer_with_threads

Replace timer with threads
pwnbus committed Feb 28, 2019
2 parents 4190c8d + 40611de commit e9566f614acc0d79def0d04e50e3b6ef5a4439fd
@@ -24,3 +24,9 @@ Update Geolite db location
------------------

Add is_ip utility function


1.0.4 (2019-01-23)
------------------

* Replaced timer with threads for cleaner bulk importing
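
The changelog entry summarizes the core pattern change in this commit: a one-shot threading.Timer that has to re-arm itself after every flush is replaced by a single long-lived daemon Thread that loops. A minimal sketch of the two patterns follows; flush() and the 30-second interval are illustrative stand-ins, not MozDef's API.

import time
from threading import Thread, Timer

def flush():
    print("flushing")

# Old pattern: a one-shot Timer that must re-arm itself after every firing.
def timer_over():
    flush()
    Timer(30, timer_over).start()  # re-create and re-start each interval

# New pattern: one long-lived daemon thread looping until told to stop.
stopping = False

def flush_periodically():
    while not stopping:
        time.sleep(30)
        flush()

worker = Thread(target=flush_periodically)
worker.daemon = True  # a daemon thread won't block process shutdown
worker.start()
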
@@ -1,4 +1,5 @@
from threading import Timer
from threading import Thread, Lock
import time


class BulkQueue():
@@ -8,24 +9,25 @@ def __init__(self, es_client, threshold=10, flush_time=30):
self.threshold = threshold
self.list = list()
self.flush_time = flush_time
self.time_thread = Timer(self.flush_time, self.timer_over)
self.flush_thread = Thread(target=self.flush_periodically)
self.flush_thread.daemon = True
self.lock = Lock()
self.running = False

def timer_over(self):
self.flush()
self.time_thread = Timer(self.flush_time, self.timer_over)
self.start_timer()

def start_timer(self):
""" Start timer thread that flushes queue every X seconds """
self.time_thread.start()
def start_thread(self):
self.stopping_thread = False
self.running = True
self.flush_thread.start()

def stop_timer(self):
""" Stop timer thread """
self.time_thread.cancel()
def stop_thread(self):
self.stopping_thread = True
self.running = False

def flush_periodically(self):
        while not self.stopping_thread:
time.sleep(self.flush_time)
self.flush()

def started(self):
return self.running
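
One side effect of the sleep-then-check structure: stop_thread() flips stopping_thread, but flush_periodically() only re-reads the flag after a full time.sleep(self.flush_time), so shutdown can lag by up to flush_time seconds. A threading.Event gives the same loop with a prompt wake-up; the sketch below is a hypothetical variant, not what the commit ships.

from threading import Event, Thread

class StoppableFlusher(object):
    """Hypothetical variant: Event.wait() returns as soon as the event
    is set, so stop() takes effect immediately instead of after a full
    sleep interval."""

    def __init__(self, flush, flush_time=30):
        self.flush = flush
        self.flush_time = flush_time
        self._stop = Event()
        self._thread = Thread(target=self._run)
        self._thread.daemon = True

    def _run(self):
        # wait() blocks for flush_time seconds, or returns True early
        # if stop() set the event; flush only on the timeout path
        while not self._stop.wait(self.flush_time):
            self.flush()

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()
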

@@ -37,7 +39,11 @@ def add(self, index, doc_type, body, doc_id=None):
"_id": doc_id,
"_source": body
}
self.list.append(bulk_doc)
        with self.lock:
            self.list.append(bulk_doc)
if self.size() >= self.threshold:
self.flush()

@@ -48,4 +54,8 @@ def size(self):
def flush(self):
""" Write all stored events to ES """
self.es_client.save_documents(self.list)
self.list = list()
        with self.lock:
            self.list = list()
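
Note that save_documents(self.list) still runs outside the lock, and the list is replaced afterwards, so an add() that lands between the save and the re-assignment could be silently dropped. One way to close that window is to swap the list out under the lock first and then save the snapshot; this is a sketch of an alternative, not what the commit does.

    def flush(self):
        """ Write all stored events to ES """
        # Illustrative alternative: swap the list out under the lock, so
        # an add() racing with the save lands in the fresh list instead
        # of being lost to the re-assignment.
        with self.lock:
            pending, self.list = self.list, list()
        self.es_client.save_documents(pending)
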
@@ -114,15 +114,13 @@ def save_documents(self, documents):
except BulkIndexError as e:
logger.error("Error bulk indexing: " + str(e))

def start_bulk_timer(self):
if not self.bulk_queue.started():
self.bulk_queue.start_timer()

def finish_bulk(self):
self.bulk_queue.stop_timer()
self.bulk_queue.flush()
self.bulk_queue.stop_thread()

def __bulk_save_document(self, index, doc_type, body, doc_id=None):
self.start_bulk_timer()
if not self.bulk_queue.started():
self.bulk_queue.start_thread()
self.bulk_queue.add(index=index, doc_type=doc_type, body=body, doc_id=doc_id)

def __save_document(self, index, doc_type, body, doc_id=None, bulk=False):
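
With this change the flush thread starts lazily on the first bulk save, guarded by started(). If two threads ever raced through __bulk_save_document at once, both could pass the check and the second Thread.start() would raise RuntimeError. A sketch of a double-checked start follows; the class and names are illustrative, not the client's real API.

from threading import Lock, Thread

class LazyWorker(object):
    """Illustrative start-on-first-use guard: the lock makes the
    check-then-start step atomic, so two concurrent callers can't both
    reach Thread.start() (which raises RuntimeError if the thread was
    already started)."""

    def __init__(self, target):
        self._thread = Thread(target=target)
        self._thread.daemon = True
        self._started = False
        self._lock = Lock()

    def ensure_started(self):
        if not self._started:          # unlocked fast path
            with self._lock:
                if not self._started:  # re-check under the lock
                    self._thread.start()
                    self._started = True
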
@@ -56,6 +56,6 @@
test_suite='tests',
tests_require=[],
url='https://github.com/mozilla/MozDef/tree/master/lib',
version='1.0.3',
version='1.0.4',
zip_safe=False,
)
@@ -17,11 +17,11 @@
from boto.sqs.message import RawMessage
import gzip
from StringIO import StringIO
from threading import Timer
import re
import time
import kombu
from ssl import SSLEOFError, SSLError
from threading import Thread

from mozdef_util.utilities.toUTC import toUTC
from mozdef_util.elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
@@ -283,16 +283,12 @@ def __init__(self, mqConnection, taskQueue, esConnection):
# This value controls how long we sleep
# between reauthenticating and getting a new set of creds
self.flush_wait_time = 1800

if options.esbulksize != 0:
            # if we are bulk posting, enable a timer to occasionally flush the bulk queue even if it's not full,
            # so that events don't sit around in an idle worker
self.esConnection.start_bulk_timer()

self.authenticate()
        # This cycles the role manager creds every 30 minutes;
        # otherwise we would start getting errors after a while
Timer(self.flush_wait_time, self.flush_s3_creds).start()

# Run thread to flush s3 credentials
reauthenticate_thread = Thread(target=self.reauth_timer)
reauthenticate_thread.daemon = True
reauthenticate_thread.start()

def authenticate(self):
if options.cloudtrail_arn not in ['<cloudtrail_arn>', 'cloudtrail_arn']:
@@ -306,10 +302,11 @@ def authenticate(self):
role_creds = {}
self.s3_connection = boto.connect_s3(**role_creds)

def flush_s3_creds(self):
logger.debug('Recycling credentials and reassuming role')
self.authenticate()
Timer(self.flush_wait_time, self.flush_s3_creds).start()
def reauth_timer(self):
while True:
time.sleep(self.flush_wait_time)
logger.debug('Recycling credentials and reassuming role')
self.authenticate()

def process_file(self, s3file):
logger.debug("Fetching %s" % s3file.name)
@@ -169,10 +169,6 @@ def __init__(self, mqConnection, taskQueue, topicExchange, esConnection):
self.muleid = uwsgi.mule_id()
else:
self.muleid = 0
if options.esbulksize != 0:
            # if we are bulk posting, enable a timer to occasionally flush the bulk queue even if it's not full,
            # so that events don't sit around in an idle worker
self.esConnection.start_bulk_timer()

def get_consumers(self, Consumer, channel):
consumer = Consumer(self.taskQueue, callbacks=[self.on_message], accept=['json', 'text/plain'], no_ack=(not options.mqack))
@@ -224,11 +224,6 @@ def __init__(self, ptRequestor, esConnection):
self.lastRequestTime = toUTC(datetime.now()) - timedelta(seconds=options.ptinterval) - \
timedelta(seconds=options.ptbackoff)

if options.esbulksize != 0:
            # if we are bulk posting, enable a timer to occasionally flush the bulk queue even if it's not full,
            # so that events don't sit around in an idle worker
self.esConnection.start_bulk_timer()

def run(self):
while True:
try:
@@ -47,16 +47,9 @@ def __init__(self, mqConnection, taskQueue, esConnection, options):
self.connection = mqConnection
self.esConnection = esConnection
self.taskQueue = taskQueue

self.pluginList = registerPlugins()

self.options = options

if self.options.esbulksize != 0:
            # if we are bulk posting, enable a timer to occasionally flush the bulk queue even if it's not full,
            # so that events don't sit around in an idle worker
self.esConnection.start_bulk_timer()

def run(self):
self.taskQueue.set_message_class(RawMessage)

@@ -161,11 +161,6 @@ def __init__(self, mqConnection, taskQueue, esConnection):
self.esConnection = esConnection
self.taskQueue = taskQueue

if options.esbulksize != 0:
            # if we are bulk posting, enable a timer to occasionally flush the bulk queue even if it's not full,
            # so that events don't sit around in an idle worker
self.esConnection.start_bulk_timer()

def run(self):
# Boto expects base64 encoded messages - but if the writer is not boto it's not necessarily base64 encoded
# Thus we've to detect that and decode or not decode accordingly
@@ -31,7 +31,7 @@ jmespath==0.9.3
kombu==4.1.0
meld3==1.0.2
mozdef-client==1.0.11
mozdef-util==1.0.3
mozdef-util==1.0.4
MySQL-python==1.2.5
netaddr==0.7.1
nose==1.3.7
@@ -84,30 +84,30 @@ class TestTimer(BulkQueueTest):
def test_basic_timer(self):
queue = BulkQueue(self.es_client, flush_time=2)
assert queue.started() is False
queue.start_timer()
queue.start_thread()
assert queue.started() is True
queue.add(index='events', doc_type='event', body={'keyname': 'valuename'})
assert queue.size() == 1
time.sleep(3)
assert queue.size() == 0
queue.stop_timer()
queue.stop_thread()
assert queue.started() is False

def test_over_threshold(self):
queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
queue.start_timer()
queue.start_thread()
for num in range(0, 201):
queue.add(index='events', doc_type='event', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 200
assert queue.size() == 1
time.sleep(4)
assert self.num_objects_saved() == 201
assert queue.size() == 0
queue.stop_timer()
queue.stop_thread()

def test_two_iterations(self):
queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
queue.start_timer()
queue.start_thread()
for num in range(0, 201):
queue.add(index='events', doc_type='event', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 200
@@ -120,17 +120,17 @@ def test_two_iterations(self):
assert self.num_objects_saved() == 401
time.sleep(3)
assert self.num_objects_saved() == 402
queue.stop_timer()
queue.stop_thread()

def test_ten_iterations(self):
queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
queue.start_timer()
queue.start_thread()
total_events = 0
for num_rounds in range(0, 10):
for num in range(0, 20):
total_events += 1
queue.add(index='events', doc_type='event', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == total_events
assert queue.size() == 0
queue.stop_timer()
queue.stop_thread()
assert self.num_objects_saved() == 200
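
These tests pin behavior to wall-clock sleeps (flush_time=2 or 3, then time.sleep(3) or time.sleep(4)), which can go flaky on a loaded CI machine. A common alternative is to poll for the expected state up to a deadline; the helper below is a hypothetical sketch, not part of this commit.

import time

def wait_until(condition, timeout=10, interval=0.1):
    """Hypothetical test helper: poll instead of sleeping a fixed
    amount, so the assertion passes as soon as the flush thread has run
    and only fails after the full timeout."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return False

# usage, replacing: time.sleep(3); assert queue.size() == 0
# assert wait_until(lambda: queue.size() == 0)
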
