Commit

added tests
twobraids committed Jul 6, 2015
1 parent c718a8c commit 808d522
Showing 2 changed files with 228 additions and 128 deletions.
283 changes: 172 additions & 111 deletions socorro/external/es/crashstorage.py
@@ -4,6 +4,10 @@

import elasticsearch

from threading import Thread
from Queue import Queue
from contextlib import contextmanager

from socorro.external.crashstorage_base import CrashStorageBase
from socorro.external.es.index_creator import IndexCreator
from socorro.lib import datetimeutil
@@ -94,24 +98,39 @@ def save_raw_and_processed(self, raw_crash, dumps, processed_crash,
crash_document=crash_document
)

#--------------------------------------------------------------------------
@staticmethod
def reconstitute_datetimes(processed_crash):
datetime_fields = [
'submitted_timestamp',
'date_processed',
'client_crash_date',
'started_datetime',
'startedDateTime',
'completed_datetime',
'completeddatetime',
]
for a_key in datetime_fields:
try:
processed_crash[a_key] = string_to_datetime(
processed_crash[a_key]
)
except KeyError:
# not there? we don't care
pass

#--------------------------------------------------------------------------
def _submit_crash_to_elasticsearch(self, connection, crash_document):
"""Submit a crash report to elasticsearch.
"""

# Massage the crash such that the datetime fields are formatted
# in the fashion of our established mapping.
self.reconstitute_datetimes(crash_document['processed_crash'])

# Obtain the index name.
es_index = self.get_index_for_crash(
crash_document['processed_crash']['date_processed']
)
es_doctype = self.config.elasticsearch.elasticsearch_doctype
crash_id = crash_document['crash_id']

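For context, the new reconstitute_datetimes helper above walks a fixed list of optional field names and replaces any timestamp strings it finds with datetime objects, quietly ignoring fields that are absent. A minimal standalone sketch of the same idea using only the standard library (the formats accepted by socorro.lib.datetimeutil.string_to_datetime are assumed to be broader than the single one shown here):

    from datetime import datetime

    def reconstitute_datetimes_sketch(processed_crash):
        # two of the optional fields named in the method above
        for key in ('submitted_timestamp', 'date_processed'):
            try:
                raw_value = processed_crash[key]
            except KeyError:
                continue  # the field is optional; skip it quietly
            processed_crash[key] = datetime.strptime(
                raw_value, '%Y-%m-%dT%H:%M:%S.%f'
            )

    crash = {'date_processed': '2015-07-06T12:34:56.000000'}
    reconstitute_datetimes_sketch(crash)
    # crash['date_processed'] is now a datetime.datetime instance
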
@@ -145,7 +164,7 @@ def _submit_crash_to_elasticsearch(self, connection, crash_document):


#==============================================================================
class ESCrashStorageRedactedSave(ESCrashStorage):
required_config = Namespace()
required_config.namespace('es_redactor')
required_config.es_redactor.add_option(
@@ -167,7 +186,7 @@ class ESCrashStorageNoStackwalkerOutput(ESCrashStorage):
def __init__(self, config, quit_check_callback=None):
"""Init, you know.
"""
super(ESCrashStorageRedactedSave, self).__init__(
config,
quit_check_callback
)
@@ -178,27 +197,6 @@ def __init__(self, config, quit_check_callback=None):
"find the modified processed crash saved to the other crashstores"
)

#--------------------------------------------------------------------------
def save_raw_and_processed(self, raw_crash, dumps, processed_crash,
crash_id):
@@ -208,109 +206,172 @@ def save_raw_and_processed(self, raw_crash, dumps, processed_crash,
self.reconstitute_datetimes(processed_crash)
self.redactor.redact(processed_crash)

super(ESCrashStorageRedactedSave, self).save_raw_and_processed(
raw_crash,
dumps,
processed_crash,
crash_id
)

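The save path above first turns the timestamp strings back into datetimes, then asks the redactor to strip forbidden keys before handing the document to the parent class. The Redactor itself is configured through the es_redactor namespace and lives elsewhere in Socorro; the dotted-key behaviour sketched below, and the key name used, are assumptions for illustration only:

    def redact_sketch(processed_crash, forbidden_keys):
        # delete each dotted key path (e.g. the hypothetical
        # 'json_dump.threads') from the nested processed_crash dict
        for dotted_key in forbidden_keys:
            parts = dotted_key.split('.')
            target = processed_crash
            for part in parts[:-1]:
                target = target.get(part, {})
            target.pop(parts[-1], None)

    doc = {'json_dump': {'threads': [], 'system_info': {'os': 'Linux'}}}
    redact_sketch(doc, ['json_dump.threads'])
    # doc is now {'json_dump': {'system_info': {'os': 'Linux'}}}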

#==============================================================================
class QueueWrapper(Queue):
"""this class allows a queue to be a standin for a connection to an
external resource. The queue then becomes compatible with the
TransactionExecutor classes"""

#--------------------------------------------------------------------------
def commit(self):
pass

#--------------------------------------------------------------------------
def rollback(self):
pass

#--------------------------------------------------------------------------
def close(self):
pass

#--------------------------------------------------------------------------
@contextmanager
def __call__(self):
yield self


#==============================================================================
class QueueContextSource(object):
"""this class allows a queue to be a standin for a connection to an
external resource. The queue then becomes compatible with the
TransactionExecutor classes"""
#--------------------------------------------------------------------------
def __init__(self, a_queue):
self.queue = a_queue

#--------------------------------------------------------------------------
@contextmanager
def __call__(self):
yield self.queue

#--------------------------------------------------------------------------
operational_exceptions = ()
conditional_exceptions = ()


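A usage sketch of the two adapters above: calling either object produces a context manager that yields a queue, which is the shape the TransactionExecutor classes expect from a connection context. The values below are placeholders and the transaction machinery itself is not shown:

    # QueueWrapper is itself a Queue whose commit/rollback/close are no-ops,
    # so it can stand in where connection-like semantics are expected.
    task_queue = QueueWrapper(512)

    # QueueContextSource wraps an existing queue; calling it returns a
    # context manager that yields that queue, mimicking a connection context.
    context_source = QueueContextSource(task_queue)

    with context_source() as connection:
        connection.put({'crash_id': '00000000-0000-0000-0000-000000000000'})

    with task_queue() as connection:  # QueueWrapper offers the same interface
        queued_item = connection.get()
        assert queued_item['crash_id'].startswith('0000')
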
#------------------------------------------------------------------------------
def _create_bulk_load_crashstore(base_class):

#==========================================================================
class ESBulkClassTemplate(base_class):
required_config = Namespace()
required_config.add_option(
'items_per_bulk_load',
default=500,
doc="the number of crashes that triggers a flush to ES"
)
required_config.add_option(
'maximum_queue_size',
default=512,
doc='the maximum size of the internal queue'
)

#----------------------------------------------------------------------
def __init__(self, config, quit_check_callback=None):
super(ESBulkClassTemplate, self).__init__(
config,
quit_check_callback
)

self.task_queue = QueueWrapper(config.maximum_queue_size)
self.consuming_thread = Thread(
name="ConsumingThread",
target=self._consuming_thread_func
)

# overwrites original
self.transaction = config.transaction_executor_class(
config,
QueueContextSource(self.task_queue),
quit_check_callback
)
self.done = False
self.consuming_thread.start()

#----------------------------------------------------------------------
def _submit_crash_to_elasticsearch(self, queue, crash_document):
# Massage the crash such that the date_processed field is formatted
# in the fashion of our established mapping.
# First create a datetime object from the string in the crash
# report.
self.reconstitute_datetimes(crash_document['processed_crash'])

# Obtain the index name.
es_index = self.get_index_for_crash(
crash_document['processed_crash']['date_processed']
)
es_doctype = self.config.elasticsearch.elasticsearch_doctype
crash_id = crash_document['crash_id']

# Attempt to create the index; it's OK if it already exists.
if es_index not in self.indices_cache:
index_creator = IndexCreator(config=self.config)
index_creator.create_socorro_index(es_index)

action = {
'_index': es_index,
'_type': es_doctype,
'_id': crash_id,
'_source': crash_document,
}
queue.put(action)

#----------------------------------------------------------------------
def _consumer_iter(self):
while True:
try:
crash_document = self.task_queue.get()
except Exception:
self.config.logger.critical(
"Failure in ES Bulktask_queue",
exc_info=True
)
crash_document = None
if crash_document is None:
self.done = True
break
yield crash_document # execute the task

#----------------------------------------------------------------------
def close(self):
self.task_queue.put(None)
self.consuming_thread.join()

#----------------------------------------------------------------------
def _consuming_thread_func(self): # execute the bulk load
with self.es_context() as es:
try:
elasticsearch.helpers.bulk(
es,
self._consumer_iter(),
chunk_size=self.config.items_per_bulk_load
)
except Exception:
self.config.logger.critical(
"Failure in ES elasticsearch.helpers.bulk",
exc_info=True
)

return ESBulkClassTemplate
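
The consuming thread hands elasticsearch.helpers.bulk a generator of action dicts shaped like the one built in _submit_crash_to_elasticsearch. A standalone sketch of that handoff (the host, index name, and doctype below are placeholder values, not Socorro's configuration):

    import elasticsearch
    from elasticsearch import helpers

    def example_actions():
        # mirrors the action dict built in _submit_crash_to_elasticsearch
        yield {
            '_index': 'socorro_example',        # placeholder index name
            '_type': 'crash_reports',           # placeholder doctype
            '_id': '00000000-0000-0000-0000-000000000000',
            '_source': {'processed_crash': {}, 'raw_crash': {}},
        }

    es = elasticsearch.Elasticsearch(hosts=['localhost:9200'])  # placeholder
    helpers.bulk(es, example_actions(), chunk_size=500)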

#==============================================================================
ESBulkCrashStorage = _create_bulk_load_crashstore(ESCrashStorage)


#==============================================================================
ESBulkCrashStorageRedactedSave = _create_bulk_load_crashstore(
ESCrashStorageRedactedSave
)


#==============================================================================
ESCrashStorageNoStackwalkerOutput = ESCrashStorageRedactedSave
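
With the factory in place, the two bulk-loading storage classes are ordinary subclasses of their respective bases, and the old class name is kept as an alias, presumably so existing configuration that names ESCrashStorageNoStackwalkerOutput keeps working:

    # ESBulkClassTemplate adds the queue-plus-consumer-thread bulk loading
    # on top of whichever base class was passed to the factory.
    assert issubclass(ESBulkCrashStorage, ESCrashStorage)
    assert issubclass(ESBulkCrashStorageRedactedSave, ESCrashStorageRedactedSave)
    assert ESCrashStorageNoStackwalkerOutput is ESCrashStorageRedactedSave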
