Skip to content

Commit

Permalink
tests: Cleaned up Actions test cases somewhat
Browse files Browse the repository at this point in the history
* Mainly disabled most of them for non-mock environments.  They had become a
  mess of sleep() calls waiting for ES to update.

Signed-off-by: Joe MacMahon <joe.macmahon@cern.ch>
  • Loading branch information
Joe MacMahon committed Jan 13, 2015
1 parent 4f1d423 commit 5c572c3
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 140 deletions.
17 changes: 8 additions & 9 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,16 @@ Testing

By default, the test suite works by monkey-patching bits of the
elasticsearch-py module and asserting that they were called with the right
arguments. This is OK for automated testing like Travis, but isn't entirely
"real-world".
arguments.

To run the test suite on an actual Elasticsearch cluster, you should edit the
constants at the top of tests/common.py in the obvious way. The main thing you
should do is set MOCK = False.
Some of the tests also can be run using an actual Elasticsearch cluster. To do
this, you should edit the constants at the top of tests/common.py in the
obvious way. The main thing you should do is set MOCK = False.

By default the tests expect an Elasticsearch node at <http://localhost:9199>.
My setup is that I forward this port over SSH to the actual node I want to
connect to; that way I can firewall the nodes and also easily switch between
them.
By default the live tests expect an Elasticsearch node at
<http://localhost:9199>. My setup is that I forward this port over SSH to the
actual node I want to connect to; that way I can firewall the nodes and also
easily switch between them.

To run the tests themselves, you should run::

Expand Down
199 changes: 68 additions & 131 deletions tests/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@

from .common import LumberjackTestCase, HOSTS, MOCK, TestHandler, skipIfNotMock

INTERVAL_SHORT = 2
INTERVAL_SHORT = 1
INTERVAL_LONG = 10*INTERVAL_SHORT
INTERVAL_JUMP_THREAD = 0.1
MAX_QUEUE_LENGTH = 20


Expand All @@ -49,110 +50,76 @@ def test_params_passed_to_action_queue(self):
self.assertEqual(MAX_QUEUE_LENGTH,
self.lj.action_queue.config['max_queue_length'])

@skipIfNotMock
def test_basic_flush(self):
if MOCK:
def mock_bulk_f(es, actions):
self.assertEqual(es, self.elasticsearch)
if len(actions) == 0:
return
self.assertEqual(len(actions), 1)
action = actions[0]

self.assertEqual(action['_type'], __name__)
self.assertEqual(action['_index'],
self.config['index_prefix'] + 'test')
self.assertEqual(action['_source'], {'message': 'testD'})

self.lj.action_queue.bulk = mock_bulk_f
def mock_bulk_f(es, actions):
self.assertEqual(es, self.elasticsearch)
if len(actions) == 0:
return
self.assertEqual(len(actions), 1)
action = actions[0]

self.assertEqual(action['_type'], __name__)
self.assertEqual(action['_index'],
self.config['index_prefix'] + 'test')
self.assertEqual(action['_source'], {'message': 'testD'})

self.lj.action_queue.bulk = mock_bulk_f
self.lj.action_queue.queue_index(suffix='test',
doc_type=__name__,
body={'message': 'testD'})
self.lj.trigger_flush()

if not MOCK:
time.sleep(3)
res = self.elasticsearch.search(
index=self.config['index_prefix'] + '*', doc_type=__name__,
body={
'query': {
'match': {'message': 'testD'}
}
})
self.assertEqual(res['hits']['total'], 1)

@skipIfNotMock
def test_basic_timeouts(self):
if MOCK:
actions_list = []
def mock_bulk_f(es, actions):
actions_list.extend(actions)
actions_list = []
def mock_bulk_f(es, actions):
actions_list.extend(actions)

self.lj.action_queue.bulk = mock_bulk_f
self.lj.action_queue.bulk = mock_bulk_f
self.lj.trigger_flush()
time.sleep(1)
time.sleep(INTERVAL_SHORT)

if MOCK:
self.assertEqual(len(actions_list), 0)
self.assertEqual(len(actions_list), 0)

self.lj.action_queue.queue_index(suffix='test',
doc_type=__name__,
body={'message': 'testE'})
time.sleep(INTERVAL_SHORT + 3)

if MOCK:
self.assertEqual(len(actions_list), 1)
self.assertEqual(actions_list[0]['_source'], {'message': 'testE'})
else:
res = self.elasticsearch.search(
index=self.config['index_prefix'] + '*', doc_type=__name__,
body={
'query': {
'match': {'message': 'testE'}
}
})
self.assertEqual(res['hits']['total'], 1)
time.sleep(INTERVAL_JUMP_THREAD)

self.assertEqual(len(actions_list), 1)
self.assertEqual(actions_list[0]['_source'], {'message': 'testE'})

@skipIfNotMock
def test_change_update_interval(self):
if MOCK:
actions_list = []
def mock_bulk_f(es, actions):
actions_list.extend(actions)
actions_list = []
def mock_bulk_f(es, actions):
actions_list.extend(actions)

self.lj.action_queue.bulk = mock_bulk_f
self.lj.action_queue.bulk = mock_bulk_f

self.lj.action_queue.queue_index(suffix='test',
doc_type=__name__,
body={'message': 'testA'})
time.sleep(INTERVAL_SHORT + 3)

if not MOCK:
res = self.elasticsearch.search(
index=self.config['index_prefix'] + '*', doc_type=__name__,
body={
'query': {
'match': {
'message': 'testA'
}
}
})
self.assertEqual(res['hits']['total'], 1)
else:
self.assertEqual(len(actions_list), 1)
self.assertDictContainsSubset(
{'_source': {'message': 'testA'},
'_type': __name__,
'_index': self.config['index_prefix'] + 'test'},
actions_list[-1])
time.sleep(INTERVAL_SHORT)

self.assertEqual(len(actions_list), 1)
self.assertDictContainsSubset(
{'_source': {'message': 'testA'},
'_type': __name__,
'_index': self.config['index_prefix'] + 'test'},
actions_list[-1])

self.lj.action_queue.config['interval'] = INTERVAL_LONG
self.lj.trigger_flush()

# Wait for the flush to complete before adding to the queue.
time.sleep(1)
time.sleep(INTERVAL_JUMP_THREAD)

self.lj.action_queue.queue_index(suffix='test',
doc_type=__name__,
body={'message': 'testB'})
time.sleep(INTERVAL_SHORT + 3)
time.sleep(INTERVAL_JUMP_THREAD)
b_query = {
'query': {
'match': {
Expand All @@ -161,44 +128,31 @@ def mock_bulk_f(es, actions):
}
}

# Should not have been procesed yet.
if not MOCK:
res = self.elasticsearch.search(
index=self.config['index_prefix'] + '*', doc_type=__name__,
body=b_query)
self.assertEqual(res['hits']['total'], 0)
else:
self.assertEqual(len(actions_list), 1)
self.assertEqual(len(actions_list), 1)

time.sleep(INTERVAL_LONG)

if not MOCK:
res = self.elasticsearch.search(
index=self.config['index_prefix'] + '*', doc_type=__name__,
body=b_query)
self.assertEqual(res['hits']['total'], 1)
else:
self.assertEqual(len(actions_list), 2)
self.assertDictContainsSubset(
{'_source': {'message': 'testB'},
'_type': __name__,
'_index': self.config['index_prefix'] + 'test'},
actions_list[-1])
self.assertEqual(len(actions_list), 2)
self.assertDictContainsSubset(
{'_source': {'message': 'testB'},
'_type': __name__,
'_index': self.config['index_prefix'] + 'test'},
actions_list[-1])

@skipIfNotMock
def test_max_queue_length(self):
if MOCK:
actions_list = []
def mock_bulk_f(es, actions):
actions_list.extend(actions)
actions_list = []
def mock_bulk_f(es, actions):
actions_list.extend(actions)

self.lj.action_queue.bulk = mock_bulk_f
self.lj.action_queue.bulk = mock_bulk_f

# Disable periodic flushing
self.lj.action_queue.config['interval'] = None
self.lj.trigger_flush()

# Wait for flush to complete
time.sleep(1)
time.sleep(INTERVAL_JUMP_THREAD)

self.assertEqual(len(self.lj.action_queue.queue), 0)

Expand All @@ -207,39 +161,20 @@ def mock_bulk_f(es, actions):
self.lj.action_queue.queue_index(suffix='test',
doc_type=__name__,
body=doc)
# Wait for ES indexing in case the test failed and already flushed.
time.sleep(3)

if not MOCK:
res = self.elasticsearch.search(
index=self.config['index_prefix'] + '*', doc_type=__name__,
body={
'query': {
'match': doc
}
})
self.assertEqual(res['hits']['total'], 0)
else:
self.assertEqual(len(actions_list), 0)
# In case the test failed and already flushed.
time.sleep(INTERVAL_JUMP_THREAD)

self.assertEqual(len(actions_list), 0)

self.lj.action_queue.queue_index(suffix='test',
doc_type=__name__,
body=doc)
# Wait for flush and ES indexing
time.sleep(3)

if not MOCK:
res = self.elasticsearch.search(
index=self.config['index_prefix'] + '*', doc_type=__name__,
body={
'query': {
'match': doc
}
})
self.assertEqual(res['hits']['total'], MAX_QUEUE_LENGTH)
else:
self.assertEqual(len(actions_list), MAX_QUEUE_LENGTH)
# Wait for flush
time.sleep(INTERVAL_JUMP_THREAD)

self.assertEqual(len(actions_list), MAX_QUEUE_LENGTH)

@skipIfNotMock
def test_transport_error(self):
my_handler = TestHandler()
logging.getLogger('lumberjack.actions').addHandler(my_handler)
Expand All @@ -264,6 +199,7 @@ def mock_bulk_f(es, actions):
'Error in flushing queue. Falling back to file.',
test_exception)

@skipIfNotMock
def test_general_error(self):
my_handler = TestHandler()
logging.getLogger('lumberjack.actions').addHandler(my_handler)
Expand Down Expand Up @@ -318,14 +254,15 @@ def mock_bulk_f(es, actions):
self.lj.action_queue.queue_index(suffix='test',
doc_type=__name__,
body=doc)
time.sleep(0.1)
time.sleep(INTERVAL_SHORT)
self.assertGreater(len(completed_actions), MAX_QUEUE_LENGTH)

self.lj.action_queue.queue_index(suffix='test',
doc_type=__name__,
body=doc)
self.lj.action_queue._flush()
time.sleep(0.1)
time.sleep(INTERVAL_SHORT)

self.assertTrue(called['called'])
with open(self.lj.config['fallback_log_file'], 'r') as f:
line = f.next()
Expand Down

0 comments on commit 5c572c3

Please sign in to comment.