Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
Merge pull request #30 from postatum/94098496_esindex_not_found
Browse files Browse the repository at this point in the history
Handle IndexNotFound errors on ES reads
  • Loading branch information
jstoiko committed May 8, 2015
2 parents 26c336b + 685e2f0 commit 1ed8afa
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 15 deletions.
72 changes: 58 additions & 14 deletions nefertari/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
]


class IndexNotFoundException(Exception):
pass


class ESHttpConnection(elasticsearch.Urllib3HttpConnection):
def perform_request(self, *args, **kw):
try:
Expand All @@ -34,6 +38,8 @@ def perform_request(self, *args, **kw):
return super(ESHttpConnection, self).perform_request(*args, **kw)
except Exception as e:
status_code = e.status_code
if status_code == 404:
raise IndexNotFoundException()
if status_code == 'N/A':
status_code = 400
raise exception_response(
Expand Down Expand Up @@ -227,13 +233,19 @@ def index_missing(self, documents, chunk_size=None):
if not documents:
log.info('No documents to index')
return
response = ES.api.mget(
query_kwargs = dict(
index=self.index_name,
doc_type=self.doc_type,
fields=['_id'],
body={'ids': [d['id'] for d in documents]},
)
indexed_ids = set(d['_id'] for d in response['docs'] if d.get('found'))
try:
response = ES.api.mget(**query_kwargs)
except IndexNotFoundException:
indexed_ids = set()
else:
indexed_ids = set(
d['_id'] for d in response['docs'] if d.get('found'))
documents = [d for d in documents if str(d['id']) not in indexed_ids]

if not documents:
Expand Down Expand Up @@ -277,9 +289,21 @@ def get_by_ids(self, ids, **params):
)
if fields:
params['fields'] = fields

data = ES.api.mget(**params)
documents = _ESDocs()
documents._nefertari_meta = dict(
start=_start,
fields=fields,
)

try:
data = ES.api.mget(**params)
except IndexNotFoundException:
if __raise_on_empty:
raise JHTTPNotFound(
'{}({}) resource not found (Index does not exist)'.format(
self.doc_type, params))
documents._nefertari_meta.update(total=0)
return documents

for _d in data['docs']:
try:
Expand All @@ -295,10 +319,8 @@ def get_by_ids(self, ids, **params):

documents.append(dict2obj(dictset(_d)))

documents._nefertari_meta = dict(
documents._nefertari_meta.update(
total=len(documents),
start=_start,
fields=fields,
)

return documents
Expand Down Expand Up @@ -352,7 +374,10 @@ def do_count(self, params):
params.pop('size', None)
params.pop('from_', None)
params.pop('sort', None)
return ES.api.count(**params)['count']
try:
return ES.api.count(**params)['count']
except IndexNotFoundException:
return 0

def get_collection(self, **params):
__raise_on_empty = params.pop('__raise_on_empty', False)
Expand All @@ -368,23 +393,34 @@ def get_collection(self, **params):
# pop the fields before passing to search.
# ES does not support passing names of nested structures
_fields = _params.pop('fields', '')
data = ES.api.search(**_params)
documents = _ESDocs()
documents._nefertari_meta = dict(
start=_params['from_'],
fields=_fields)

try:
data = ES.api.search(**_params)
except IndexNotFoundException:
if __raise_on_empty:
raise JHTTPNotFound(
'{}({}) resource not found (Index does not exist)'.format(
self.doc_type, params))
documents._nefertari_meta.update(
total=0, took=0)
return documents

for da in data['hits']['hits']:
_d = da['fields'] if _fields else da['_source']
_d['_score'] = da['_score']
documents.append(dict2obj(_d))

documents._nefertari_meta = dict(
documents._nefertari_meta.update(
total=data['hits']['total'],
start=_params['from_'],
fields=_fields,
took=data['took'],
)

if not documents:
msg = "'%s(%s)' resource not found" % (self.doc_type, params)
msg = "%s(%s) resource not found" % (self.doc_type, params)
if __raise_on_empty:
raise JHTTPNotFound(msg)
else:
Expand All @@ -402,7 +438,15 @@ def get_resource(self, **kw):
params.setdefault('ignore', 404)
params.update(kw)

data = ES.api.get_source(**params)
try:
data = ES.api.get_source(**params)
except IndexNotFoundException:
if __raise:
raise JHTTPNotFound(
"{}({}) resource not found (Index does not exist)".format(
self.doc_type, params))
data = {}

if not data:
msg = "'%s(%s)' resource not found" % (self.doc_type, params)
if __raise:
Expand Down
97 changes: 96 additions & 1 deletion tests/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ def test_perform_request_exception(self):
with pytest.raises(JHTTPBadRequest):
conn.perform_request('POST', 'http://localhost:9200')

@patch('nefertari.elasticsearch.log')
def test_perform_request_no_index(self, mock_log):
mock_log.level = logging.DEBUG
mock_log.debug.side_effect = TransportError(404, '')
conn = es.ESHttpConnection()
with pytest.raises(es.IndexNotFoundException):
conn.perform_request('POST', 'http://localhost:9200')


class TestHelperFunctions(object):
@patch('nefertari.elasticsearch.ES')
Expand Down Expand Up @@ -269,6 +277,24 @@ def test_index_missing(self, mock_mget, mock_bulk):
mock_bulk.assert_called_once_with(
'index', [{'id': 1, 'name': 'foo'}, {'id': 3, 'name': 'baz'}], 10)

@patch('nefertari.elasticsearch.ES._bulk')
@patch('nefertari.elasticsearch.ES.api.mget')
def test_index_missing_no_index(self, mock_mget, mock_bulk):
obj = es.ES('Foo', 'foondex')
documents = [
{'id': 1, 'name': 'foo'},
]
mock_mget.side_effect = es.IndexNotFoundException()
obj.index_missing(documents, 10)
mock_mget.assert_called_once_with(
index='foondex',
doc_type='foo',
fields=['_id'],
body={'ids': [1]}
)
mock_bulk.assert_called_once_with(
'index', [{'id': 1, 'name': 'foo'}], 10)

@patch('nefertari.elasticsearch.ES._bulk')
@patch('nefertari.elasticsearch.ES.api.mget')
def test_index_missing_no_docs_passed(self, mock_mget, mock_bulk):
Expand Down Expand Up @@ -351,6 +377,26 @@ def test_get_by_ids_fields(self, mock_mget):
assert docs._nefertari_meta['start'] == 0
assert docs._nefertari_meta['fields'] == ['name']

@patch('nefertari.elasticsearch.ES.api.mget')
def test_get_by_ids_no_index_raise(self, mock_mget):
obj = es.ES('Foo', 'foondex')
documents = [{'_id': 1, '_type': 'Story'}]
mock_mget.side_effect = es.IndexNotFoundException()
with pytest.raises(JHTTPNotFound) as ex:
obj.get_by_ids(documents, __raise_on_empty=True)
assert 'resource not found (Index does not exist)' in str(ex.value)

@patch('nefertari.elasticsearch.ES.api.mget')
def test_get_by_ids_no_index_not_raise(self, mock_mget):
obj = es.ES('Foo', 'foondex')
documents = [{'_id': 1, '_type': 'Story'}]
mock_mget.side_effect = es.IndexNotFoundException()
try:
docs = obj.get_by_ids(documents, __raise_on_empty=False)
except JHTTPNotFound:
raise Exception('Unexpected error')
assert len(docs) == 0

@patch('nefertari.elasticsearch.ES.api.mget')
def test_get_by_ids_not_found_raise(self, mock_mget):
obj = es.ES('Foo', 'foondex')
Expand All @@ -365,9 +411,10 @@ def test_get_by_ids_not_found_not_raise(self, mock_mget):
documents = [{'_id': 1, '_type': 'Story'}]
mock_mget.return_value = {'docs': [{'_type': 'foo', '_id': 1}]}
try:
obj.get_by_ids(documents, __raise_on_empty=False)
docs = obj.get_by_ids(documents, __raise_on_empty=False)
except JHTTPNotFound:
raise Exception('Unexpected error')
assert len(docs) == 0

def test_build_search_params_no_body(self):
obj = es.ES('Foo', 'foondex')
Expand Down Expand Up @@ -435,6 +482,15 @@ def test_do_count(self, mock_count):
assert val == 123
mock_count.assert_called_once_with(foo=1)

@patch('nefertari.elasticsearch.ES.api.count')
def test_do_count_no_index(self, mock_count):
obj = es.ES('Foo', 'foondex')
mock_count.side_effect = es.IndexNotFoundException()
val = obj.do_count(
{'foo': 1, 'size': 2, 'from_': 0, 'sort': 'foo:asc'})
assert val == 0
mock_count.assert_called_once_with(foo=1)

@patch('nefertari.elasticsearch.ES.build_search_params')
@patch('nefertari.elasticsearch.ES.do_count')
def test_get_collection_count_without_body(self, mock_count, mock_build):
Expand Down Expand Up @@ -496,6 +552,28 @@ def test_get_collection_source(self, mock_search):
assert docs._nefertari_meta['fields'] == ''
assert docs._nefertari_meta['took'] == 2.8

@patch('nefertari.elasticsearch.ES.api.search')
def test_get_collection_no_index_raise(self, mock_search):
obj = es.ES('Foo', 'foondex')
mock_search.side_effect = es.IndexNotFoundException()
with pytest.raises(JHTTPNotFound) as ex:
obj.get_collection(
body={'foo': 'bar'}, __raise_on_empty=True,
from_=0)
assert 'resource not found (Index does not exist)' in str(ex.value)

@patch('nefertari.elasticsearch.ES.api.search')
def test_get_collection_no_index_not_raise(self, mock_search):
obj = es.ES('Foo', 'foondex')
mock_search.side_effect = es.IndexNotFoundException()
try:
docs = obj.get_collection(
body={'foo': 'bar'}, __raise_on_empty=False,
from_=0)
except JHTTPNotFound:
raise Exception('Unexpected error')
assert len(docs) == 0

@patch('nefertari.elasticsearch.ES.api.search')
def test_get_collection_not_found_raise(self, mock_search):
obj = es.ES('Foo', 'foondex')
Expand Down Expand Up @@ -539,6 +617,23 @@ def test_get_resource(self, mock_get):
mock_get.assert_called_once_with(
name='foo', index='foondex', doc_type='foo', ignore=404)

@patch('nefertari.elasticsearch.ES.api.get_source')
def test_get_resource_no_index_raise(self, mock_get):
obj = es.ES('Foo', 'foondex')
mock_get.side_effect = es.IndexNotFoundException()
with pytest.raises(JHTTPNotFound) as ex:
obj.get_resource(name='foo')
assert 'resource not found (Index does not exist)' in str(ex.value)

@patch('nefertari.elasticsearch.ES.api.get_source')
def test_get_resource_no_index_not_raise(self, mock_get):
obj = es.ES('Foo', 'foondex')
mock_get.side_effect = es.IndexNotFoundException()
try:
obj.get_resource(name='foo', __raise_on_empty=False)
except JHTTPNotFound:
raise Exception('Unexpected error')

@patch('nefertari.elasticsearch.ES.api.get_source')
def test_get_resource_not_found_raise(self, mock_get):
obj = es.ES('Foo', 'foondex')
Expand Down

0 comments on commit 1ed8afa

Please sign in to comment.