Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
Merge pull request #45 from postatum/94850096_es_race_condition
Browse files Browse the repository at this point in the history
Call ES.api.bulk with refresh=True
  • Loading branch information
jstoiko committed May 29, 2015
2 parents 0878f78 + 7aaf17f commit 9d36012
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 35 deletions.
46 changes: 30 additions & 16 deletions nefertari/elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import absolute_import
import logging
from functools import partial

import elasticsearch

Expand Down Expand Up @@ -56,8 +57,11 @@ def includeme(config):
ES.create_index()


def _bulk_body(body, refresh_index=None):
    """Send `body` to Elasticsearch via the bulk API.

    :param body: Sequence of bulk meta/document dicts.
    :param refresh_index: When not None, forwarded to ES as the
        `refresh` kwarg so the index is (or is not) refreshed right
        after the operation. None omits the kwarg entirely, keeping
        the ES client's default behaviour.
    :return: Whatever `ES.api.bulk` returns.
    """
    kwargs = {'body': body}
    if refresh_index is not None:
        kwargs['refresh'] = refresh_index
    return ES.api.bulk(**kwargs)


def process_fields_param(fields):
Expand Down Expand Up @@ -138,6 +142,7 @@ def src2type(cls, source):
@classmethod
def setup(cls, settings):
ES.settings = settings.mget('elasticsearch')
ES.settings.setdefault('chunk_size', 500)

try:
_hosts = ES.settings.hosts
Expand Down Expand Up @@ -181,9 +186,11 @@ def setup_mappings(cls):
except JHTTPBadRequest as ex:
raise Exception(ex.json['extra']['data'])

def __init__(self, source='', index_name=None, chunk_size=None):
    """Set up doc type, target index and bulk chunk size.

    :param source: Model/source name; converted to an ES doc type
        via `src2type`.
    :param index_name: Index to operate on; defaults to the
        `elasticsearch.index_name` setting.
    :param chunk_size: Bulk chunk size; defaults to the
        `elasticsearch.chunk_size` setting (read with `asint`).
    """
    self.doc_type = self.src2type(source)
    self.index_name = index_name or ES.settings.index_name
    if chunk_size is None:
        chunk_size = ES.settings.asint('chunk_size')
    self.chunk_size = chunk_size

def put_mapping(self, body, **kwargs):
Expand All @@ -193,10 +200,12 @@ def put_mapping(self, body, **kwargs):
index=self.index_name,
**kwargs)

def process_chunks(self, documents, operation, chunk_size):
def process_chunks(self, documents, operation, chunk_size=None):
""" Apply `operation` to chunks of `documents` of size `chunk_size`.
"""
if chunk_size is None:
chunk_size = self.chunk_size
start = end = 0
count = len(documents)

Expand All @@ -206,7 +215,7 @@ def process_chunks(self, documents, operation, chunk_size):
end += chunk_size

bulk = documents[start:end]
operation(bulk)
operation(body=bulk)

start += chunk_size
count -= chunk_size
Expand Down Expand Up @@ -240,7 +249,8 @@ def prep_bulk_documents(self, action, documents):

return _docs

def _bulk(self, action, documents, chunk_size=None):
def _bulk(self, action, documents, chunk_size=None,
refresh_index=None):
if chunk_size is None:
chunk_size = self.chunk_size

Expand All @@ -263,18 +273,21 @@ def _bulk(self, action, documents, chunk_size=None):
if body:
# Use chunk_size*2, because `body` is a sequence of
# meta, document, meta, ...
operation = partial(_bulk_body, refresh_index=refresh_index)
self.process_chunks(
documents=body,
operation=_bulk_body,
operation=operation,
chunk_size=chunk_size*2)
else:
log.warning('Empty body')

def index(self, documents, chunk_size=None, refresh_index=None):
    """ Reindex all `document`s.

    :param documents: Documents to (re)index in bulk.
    :param chunk_size: Bulk chunk size; None falls back to the
        instance/setting default inside `_bulk`.
    :param refresh_index: Passed through to the bulk call; None means
        keep the ES client default.
    """
    self._bulk('index', documents, chunk_size, refresh_index)

def index_missing_documents(self, documents, chunk_size=None):
def index_missing_documents(self, documents, chunk_size=None,
refresh_index=None):
""" Index documents that are missing from ES index.
Determines which documents are missing using ES `mget` call which
Expand Down Expand Up @@ -306,14 +319,14 @@ def index_missing_documents(self, documents, chunk_size=None):
'index `{}`'.format(self.doc_type, self.index_name))
return

self._bulk('index', documents, chunk_size)
self._bulk('index', documents, chunk_size, refresh_index)

def delete(self, ids, refresh_index=None):
    """Delete document(s) with `ids` from the index via bulk 'delete'.

    :param ids: A single id or a list of ids.
    :param refresh_index: Passed through to the bulk call; None means
        keep the ES client default.
    """
    if not isinstance(ids, list):
        ids = [ids]

    documents = [{'id': _id, '_type': self.doc_type} for _id in ids]
    self._bulk('delete', documents, refresh_index=refresh_index)

def get_by_ids(self, ids, **params):
if not ids:
Expand Down Expand Up @@ -520,7 +533,8 @@ def get(self, **kw):
return self.get_resource(**kw)

@classmethod
def index_refs(cls, db_obj, refresh_index=None):
    """Reindex documents that reference `db_obj`.

    Iterates `db_obj.get_reference_documents()`; only models with
    `_index_enabled` set and a non-empty documents list are indexed.

    :param db_obj: Object exposing `get_reference_documents()` which
        yields (model_class, documents) pairs.
    :param refresh_index: Forwarded to `index` and ultimately to the
        ES bulk call; None keeps the client default.
    """
    for model_cls, documents in db_obj.get_reference_documents():
        if getattr(model_cls, '_index_enabled', False) and documents:
            cls(model_cls.__name__).index(
                documents, refresh_index=refresh_index)
22 changes: 16 additions & 6 deletions nefertari/scripts/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from pyramid.paster import bootstrap
from pyramid.config import Configurator
from zope.dottedname.resolve import resolve

from nefertari.utils import dictset, split_strip, to_dicts
from nefertari import engine
Expand Down Expand Up @@ -45,11 +44,20 @@ def __init__(self, argv, log):
parser.add_argument(
'--params', help='Url-encoded params for each model')
parser.add_argument('--index', help='Index name', default=None)
parser.add_argument('--chunk', help='Index chunk size', type=int)
parser.add_argument(
'--chunk',
help=('Index chunk size. If chunk size not provided '
'`elasticsearch.chunk_size` setting is used'),
type=int)
parser.add_argument(
'--force',
help=('Force reindexing of all documents. By default, only '
'documents that are missing from index are indexed.'),
'documents that are missing from index are indexed.'),
action='store_true',
default=False)
parser.add_argument(
'--refresh',
help='Refresh the index after performing the operation',
action='store_true',
default=False)

Expand Down Expand Up @@ -86,13 +94,15 @@ def run(self):
params.setdefault('_limit', params.get('_limit', 10000))
chunk_size = self.options.chunk or params['_limit']

es = ES(source=model_name, index_name=self.options.index)
es = ES(source=model_name, index_name=self.options.index,
chunk_size=chunk_size)
query_set = model.get_collection(**params)
documents = to_dicts(query_set)

if self.options.force:
es.index(documents, chunk_size=chunk_size)
es.index(documents, refresh_index=self.options.refresh)
else:
es.index_missing_documents(documents, chunk_size=chunk_size)
es.index_missing_documents(
documents, refresh_index=self.options.refresh)

return 0
32 changes: 19 additions & 13 deletions tests/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ def test_es_docs(self):

@patch('nefertari.elasticsearch.ES')
def test_bulk_body(self, mock_es):
    # `refresh_index` must be translated into the ES `refresh` kwarg.
    es._bulk_body('foo', refresh_index=True)
    mock_es.api.bulk.assert_called_once_with(
        body='foo', refresh=True)


class TestES(object):
Expand All @@ -115,7 +116,7 @@ def test_init(self, mock_set):
obj = es.ES(source='Foo')
assert obj.index_name == mock_set.index_name
assert obj.doc_type == 'foo'
assert obj.chunk_size == 100
assert obj.chunk_size == mock_set.asint()
obj = es.ES(source='Foo', index_name='a', chunk_size=2)
assert obj.index_name == 'a'
assert obj.doc_type == 'foo'
def test_process_chunks(self):
    # A chunk size larger than the document count yields one call,
    # with the documents passed as the `body` keyword.
    obj = es.ES('Foo', 'foondex')
    operation = Mock()
    documents = [1, 2, 3, 4, 5]
    obj.process_chunks(documents, operation, chunk_size=100)
    operation.assert_called_once_with(body=[1, 2, 3, 4, 5])

def test_process_chunks_multiple(self):
    # Documents are split into chunk_size-sized slices, each passed
    # as the `body` keyword.
    obj = es.ES('Foo', 'foondex')
    operation = Mock()
    documents = [1, 2, 3, 4, 5]
    obj.process_chunks(documents, operation, chunk_size=3)
    operation.assert_has_calls([call(body=[1, 2, 3]), call(body=[4, 5])])

def test_process_chunks_no_docs(self):
obj = es.ES('Foo', 'foondex')
Expand Down Expand Up @@ -216,9 +217,10 @@ def test_bulk_no_docs(self):
obj = es.ES('Foo', 'foondex')
assert obj._bulk('myaction', []) is None

@patch('nefertari.elasticsearch.partial')
@patch('nefertari.elasticsearch.ES.prep_bulk_documents')
@patch('nefertari.elasticsearch.ES.process_chunks')
def test_bulk(self, mock_proc, mock_prep):
def test_bulk(self, mock_proc, mock_prep, mock_part):
obj = es.ES('Foo', 'foondex', chunk_size=1)
docs = [
[{'delete': {'action': 'delete', '_id': 'story1'}},
Expand All @@ -229,14 +231,15 @@ def test_bulk(self, mock_proc, mock_prep):
mock_prep.return_value = docs
obj._bulk('myaction', docs)
mock_prep.assert_called_once_with('myaction', docs)
mock_part.assert_called_once_with(es._bulk_body, refresh_index=None)
mock_proc.assert_called_once_with(
documents=[
{'delete': {'action': 'delete', '_id': 'story1'}},
{'index': {'action': 'index', '_id': 'story2'},
'_timestamp': 2},
{'_type': 'Story', 'id': 'story2', 'timestamp': 2},
],
operation=es._bulk_body,
operation=mock_part(),
chunk_size=2
)

Expand All @@ -253,21 +256,23 @@ def test_bulk_no_prepared_docs(self, mock_proc, mock_prep):
def test_index(self, mock_bulk):
    # `index` forwards chunk_size and a (default None) refresh_index
    # positionally to `_bulk`.
    obj = es.ES('Foo', 'foondex')
    obj.index(['a'], chunk_size=4)
    mock_bulk.assert_called_once_with('index', ['a'], 4, None)

@patch('nefertari.elasticsearch.ES._bulk')
def test_delete(self, mock_bulk):
    # A list of ids becomes one meta dict per id; refresh_index is
    # forwarded as a keyword with its None default.
    obj = es.ES('Foo', 'foondex')
    obj.delete(ids=[1, 2])
    mock_bulk.assert_called_once_with(
        'delete', [{'id': 1, '_type': 'foo'}, {'id': 2, '_type': 'foo'}],
        refresh_index=None)

@patch('nefertari.elasticsearch.ES._bulk')
def test_delete_single_obj(self, mock_bulk):
    # A bare (non-list) id is wrapped into a one-element list.
    obj = es.ES('Foo', 'foondex')
    obj.delete(ids=1)
    mock_bulk.assert_called_once_with(
        'delete', [{'id': 1, '_type': 'foo'}],
        refresh_index=None)

@patch('nefertari.elasticsearch.ES._bulk')
@patch('nefertari.elasticsearch.ES.api.mget')
Expand All @@ -291,7 +296,8 @@ def test_index_missing_documents(self, mock_mget, mock_bulk):
body={'ids': [1, 2, 3]}
)
mock_bulk.assert_called_once_with(
'index', [{'id': 1, 'name': 'foo'}, {'id': 3, 'name': 'baz'}], 10)
'index', [{'id': 1, 'name': 'foo'}, {'id': 3, 'name': 'baz'}],
10, None)

@patch('nefertari.elasticsearch.ES._bulk')
@patch('nefertari.elasticsearch.ES.api.mget')
Expand All @@ -309,7 +315,7 @@ def test_index_missing_documents_no_index(self, mock_mget, mock_bulk):
body={'ids': [1]}
)
mock_bulk.assert_called_once_with(
'index', [{'id': 1, 'name': 'foo'}], 10)
'index', [{'id': 1, 'name': 'foo'}], 10, None)

@patch('nefertari.elasticsearch.ES._bulk')
@patch('nefertari.elasticsearch.ES.api.mget')
Expand Down Expand Up @@ -688,7 +694,7 @@ class Foo(object):
db_obj.get_reference_documents.return_value = [(Foo, docs)]
mock_settings.index_name = 'foo'
es.ES.index_refs(db_obj)
mock_ind.assert_called_once_with(docs)
mock_ind.assert_called_once_with(docs, refresh_index=None)

@patch('nefertari.elasticsearch.ES.settings')
@patch('nefertari.elasticsearch.ES.index')
Expand Down

0 comments on commit 9d36012

Please sign in to comment.