Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
Merge pull request #136 from postatum/107303060_es_2_0
Browse files Browse the repository at this point in the history
Support Elasticsearch >2.0
  • Loading branch information
jstoiko committed Feb 9, 2016
2 parents b81b04c + 850df08 commit 7fab246
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 144 deletions.
25 changes: 8 additions & 17 deletions nefertari/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ def create_index(cls, index_name=None):
except (IndexNotFoundException, JHTTPNotFound):
cls.api.indices.create(index_name)

@classmethod
def delete_index(cls, index_name=None):
index_name = index_name or cls.settings.index_name
try:
cls.api.indices.delete([index_name])
except (IndexNotFoundException, JHTTPNotFound):
return

@classmethod
def setup_mappings(cls, force=False):
""" Setup ES mappings for all existing models.
Expand Down Expand Up @@ -241,12 +249,6 @@ def setup_mappings(cls, force=False):
raise Exception(ex.json['extra']['data'])
cls._mappings_setup = True

def delete_mapping(self):
self.api.indices.delete_mapping(
index=self.index_name,
doc_type=self.doc_type,
)

def put_mapping(self, body, **kwargs):
self.api.indices.put_mapping(
doc_type=self.doc_type,
Expand Down Expand Up @@ -309,12 +311,6 @@ def _bulk(self, action, documents, request=None):

documents_actions = self.prep_bulk_documents(action, documents)

if action == 'index':
for doc in documents_actions:
doc_data = doc.get('_source', {})
if 'timestamp' in doc_data:
doc['_timestamp'] = doc_data['timestamp']

if documents_actions:
operation = partial(_bulk_body, request=request)
self.process_chunks(
Expand Down Expand Up @@ -503,13 +499,9 @@ def aggregate(self, **params):
:_raise_on_empty: Boolean indicating whether to raise exception
when IndexNotFoundException exception happens. Optional,
defaults to False.
:_search_type: Type of search to use. Optional, defaults to
'count'. You might want to provide this argument explicitly
when performing nested aggregations on buckets.
"""
_aggregations_params = params.pop('_aggregations_params', None)
_raise_on_empty = params.pop('_raise_on_empty', False)
_search_type = params.pop('_search_type', 'count')

if not _aggregations_params:
raise Exception('Missing _aggregations_params')
Expand All @@ -521,7 +513,6 @@ def aggregate(self, **params):
search_params.pop('from_', None)
search_params.pop('sort', None)

search_params['search_type'] = _search_type
search_params['body']['aggregations'] = _aggregations_params

log.debug('Performing aggregation: {}'.format(_aggregations_params))
Expand Down
72 changes: 38 additions & 34 deletions nefertari/scripts/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ def __init__(self, argv, log):
parser.add_argument(
'--quiet', help='Quiet mode', action='store_true',
default=False)
parser.add_argument(
'--models',
help=('Comma-separated list of model names to index '
'(required)'),
required=True)
parser.add_argument(
'--params', help='Url-encoded params for each model')
parser.add_argument('--index', help='Index name', default=None)
Expand All @@ -51,11 +46,14 @@ def __init__(self, argv, log):
help=('Index chunk size. If chunk size not provided '
'`elasticsearch.chunk_size` setting is used'),
type=int)
parser.add_argument(
'--force',
help=('Recreate ES mappings and reindex all documents of provided '
'models. By default, only documents that are missing from '
'index are indexed.'),

group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
'--models',
help=('Comma-separated list of model names to index'))
group.add_argument(
'--recreate',
help='Recreate index and reindex all documents',
action='store_true',
default=False)

Expand Down Expand Up @@ -84,36 +82,42 @@ def __init__(self, argv, log):

self.settings = dictset(registry.settings)

def run(self):
ES.setup(self.settings)
model_names = split_strip(self.options.models)
def index_models(self, model_names):
self.log.info('Indexing models documents')
params = self.options.params or ''
params = dict([
[k, v[0]] for k, v in urllib.parse.parse_qs(params).items()
])
params.setdefault('_limit', params.get('_limit', 10000))
chunk_size = self.options.chunk or params['_limit']

for model_name in model_names:
self.log.info('Processing model `{}`'.format(model_name))
model = engine.get_document_cls(model_name)

params = self.options.params or ''
params = dict([
[k, v[0]] for k, v in urllib.parse.parse_qs(params).items()
])
params.setdefault('_limit', params.get('_limit', 10000))
chunk_size = self.options.chunk or params['_limit']

es = ES(source=model_name, index_name=self.options.index,
chunk_size=chunk_size)
query_set = model.get_collection(**params)
documents = to_dicts(query_set)
self.log.info('Indexing missing `{}` documents'.format(
model_name))
es.index_missing_documents(documents)

def recreate_index(self):
self.log.info('Deleting index')
ES.delete_index()
self.log.info('Creating index')
ES.create_index()
self.log.info('Creating mappings')
ES.setup_mappings()

if self.options.force:
self.log.info('Recreating `{}` ES mapping'.format(model_name))
es.delete_mapping()
es.put_mapping(body=model.get_es_mapping())
self.log.info('Indexing all `{}` documents'.format(
model_name))
es.index(documents)
else:
self.log.info('Indexing missing `{}` documents'.format(
model_name))
es.index_missing_documents(documents)

return 0
def run(self):
ES.setup(self.settings)
if self.options.recreate:
self.recreate_index()
models = engine.get_document_classes()
model_names = [
name for name, model in models.items()
if getattr(model, '_index_enabled', False)]
else:
model_names = split_strip(self.options.models)
self.index_models(model_names)
5 changes: 0 additions & 5 deletions nefertari/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,12 @@ def setup_default_wrappers(self):
Wrappers are applied when view method does not return instance
of Response. In this case nefertari renderers call wrappers and
handle response generation.
Note: It's important for `add_etag` wrapper be applied before
`apply_privacy` as later may remove response data that
is used to generate etag
"""
# Index
self._after_calls['index'] = [
wrappers.wrap_in_dict(self.request),
wrappers.add_meta(self.request),
wrappers.add_object_url(self.request),
wrappers.add_etag(self.request),
]

# Show
Expand Down
38 changes: 0 additions & 38 deletions nefertari/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ def _set_object_self(self, obj):
location = self.request.route_url(
resource.uid, **route_kwargs)
obj.setdefault('_self', location)
log.info('Added `_self` attr: {}'.format(location))

def __call__(self, **kwargs):
result = kwargs['result']
Expand Down Expand Up @@ -359,43 +358,6 @@ def __call__(self, **kwargs):
q_or_a, self.request.method))


class add_etag(object):
""" Add ETAG header to response.
Etag is generated md5-encoding '_version' + '_pk' of each object
in a sequence of objects returned.
This wrapper should be applied before `apply_privacy` if later is
used or before any wrapper that may remove `_version` and `_pk` keys
from output.
"""
def __init__(self, request):
self.request = request

def __call__(self, **kwargs):
result = kwargs['result']

etag_src = ''

def etag(data):
return str(data.get('_version', '')) + str(data.get('_pk', ''))

try:
etag_src += etag(result)

for each in result['data']:
etag_src += etag(each)

except (TypeError, KeyError):
pass

finally:
if etag_src:
etag = md5(six.b(etag_src)).hexdigest()
self.request.response.etag = etag
return result


class set_total(object):
def __init__(self, request, total):
self.request = request
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
VERSION = open(os.path.join(here, 'VERSION')).read()

install_requires = [
'pyramid',
'pyramid==1.5.7',
'tempita',
'requests',
'simplejson',
Expand Down
5 changes: 2 additions & 3 deletions tests/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,10 @@ def test_bulk(self, mock_proc, mock_prep, mock_part):
es._bulk_body, request=None)
mock_proc.assert_called_once_with(
documents=[{
'_id': 'story1', '_op_type': 'index', '_timestamp': 1,
'_id': 'story1', '_op_type': 'index',
'_source': {'timestamp': 1, '_type': 'Story', 'id': 'story1'}
}, {
'_id': 'story2', '_op_type': 'index', '_timestamp': 2,
'_id': 'story2', '_op_type': 'index',
'_source': {'timestamp': 2, '_type': 'Story', 'id': 'story2'}
}],
operation=mock_part(),
Expand Down Expand Up @@ -624,7 +624,6 @@ def test_aggregation(self, mock_search, mock_build):
assert resp == {'foo': 1}
mock_build.assert_called_once_with({'_limit': 0, 'param1': 6})
mock_search.assert_called_once_with(
search_type='count',
body={'aggregations': {'zoo': 5}, 'query': 'query1'},
)

Expand Down
6 changes: 3 additions & 3 deletions tests/test_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def test_setup_default_wrappers_with_auth(self, run, wrap):
context={}, request=request, _query_params={'foo': 'bar'})
view._auth_enabled = True
view.setup_default_wrappers()
assert len(view._after_calls['index']) == 5
assert len(view._after_calls['index']) == 4
assert len(view._after_calls['show']) == 4
assert len(view._after_calls['create']) == 4
assert len(view._after_calls['update']) == 4
Expand All @@ -426,7 +426,7 @@ def test_setup_default_wrappers_no_auth(self, run, wrap):
context={}, request=request, _query_params={'foo': 'bar'})
view._auth_enabled = False
view.setup_default_wrappers()
assert len(view._after_calls['index']) == 4
assert len(view._after_calls['index']) == 3
assert len(view._after_calls['show']) == 3
assert not wrap.apply_privacy.called

Expand Down Expand Up @@ -455,7 +455,7 @@ def convert_ids2objects(self, *args, **kwargs):
resource = MagicMock(actions=['index'])
view = MyView(resource, request)

assert len(view._after_calls['index']) == 4
assert len(view._after_calls['index']) == 3
assert len(view._after_calls['show']) == 3

assert view.index._before_calls == [before_call]
Expand Down
43 changes: 0 additions & 43 deletions tests/test_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,49 +246,6 @@ def test_add_confirmation_url_no_request_params(self, mock_eng):
assert result['confirmation_url'] == (
'http://example.com/api?__confirmation&_m=GET')

def test_add_etag_no_data(self):
wrapper = wrappers.add_etag(Mock())
wrapper.request.response.etag = None
wrapper(result={'data': []})
assert wrapper.request.response.etag is None
wrapper(result={})
assert wrapper.request.response.etag is None

def test_add_etag(self):
wrapper = wrappers.add_etag(Mock())
wrapper.request.response.etag = None
wrapper(result={'data': [
{'_pk': 1, '_version': 1},
{'_pk': 2, '_version': 1},
]})
expected1 = '20d135f0f28185b84a4cf7aa51f29500'
assert wrapper.request.response.etag == expected1

# Etag is the same when data isn't changed
wrapper(result={'data': [
{'_pk': 1, '_version': 1},
{'_pk': 2, '_version': 1},
]})
assert isinstance(wrapper.request.response.etag, six.string_types)
assert wrapper.request.response.etag == expected1

# New object added
wrapper(result={'data': [
{'_pk': 1, '_version': 1},
{'_pk': 2, '_version': 1},
{'_pk': 3, '_version': 1},
]})
assert isinstance(wrapper.request.response.etag, six.string_types)
assert wrapper.request.response.etag != expected1

# Existing object's version changed
wrapper(result={'data': [
{'_pk': 1, '_version': 1},
{'_pk': 2, '_version': 2},
]})
assert isinstance(wrapper.request.response.etag, six.string_types)
assert wrapper.request.response.etag != expected1

def test_set_total(self):
result = Mock(_nefertari_meta={'total': 5})
processed = wrappers.set_total(None, 2)(result=result)
Expand Down

0 comments on commit 7fab246

Please sign in to comment.