Skip to content

Commit

Permalink
Merge fe7f85f into 16e6b1e
Browse files Browse the repository at this point in the history
  • Loading branch information
slint committed Nov 22, 2019
2 parents 16e6b1e + fe7f85f commit 2242821
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 109 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
Changes
=======

Version 1.1.1 (released 2019-11-21)

- Fix bulk action parameters compatibility for Elasticsearch v7.

Version 1.1.0 (released 2019-07-19)

- Add support for Elasticsearch v7.
Expand Down
9 changes: 7 additions & 2 deletions invenio_indexer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import pytz
from celery import current_app as current_celery_app
from elasticsearch import VERSION as ES_VERSION
from elasticsearch.helpers import bulk
from elasticsearch.helpers import expand_action as default_expand_action
from flask import current_app
from invenio_records.api import Record
from invenio_search import current_search_client
Expand All @@ -26,7 +28,7 @@

from .proxies import current_record_to_index
from .signals import before_record_index
from .utils import es_bulk_param_compatibility
from .utils import _es7_expand_action


class Producer(KombuProducer):
Expand Down Expand Up @@ -201,6 +203,10 @@ def process_bulk_queue(self, es_bulk_kwargs=None):
self._actionsiter(consumer.iterqueue()),
stats_only=True,
request_timeout=req_timeout,
expand_action_callback=(
_es7_expand_action if ES_VERSION[0] >= 7
else default_expand_action
),
**es_bulk_kwargs
)

Expand Down Expand Up @@ -280,7 +286,6 @@ def _delete_action(self, payload):
'_id': payload['id'],
}

@es_bulk_param_compatibility
def _index_action(self, payload):
"""Bulk index action.
Expand Down
83 changes: 48 additions & 35 deletions invenio_indexer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os
from functools import wraps

import six
from elasticsearch import VERSION as ES_VERSION
from flask import current_app
from invenio_search import current_search
Expand Down Expand Up @@ -73,38 +74,50 @@ def default_record_to_index(record):
return index, doc_type


def es_bulk_param_compatibility(f):
    """Decorate an action builder so its params suit the ES client version.

    Elasticsearch v7 removed the deprecated bulk indexing parameter names.
    When the wrapped callable returns a ``dict`` and the installed client's
    major version is 7 or above, every deprecated key found in that dict is
    renamed in place. Any other return value is handed back untouched.
    """
    # Deprecated name -> replacement name. The order is significant: several
    # old names map onto ``version_type`` and the last one present wins.
    renames = (
        ('opType', 'op_type'),
        ('versionType', 'version_type'),
        ('_versionType', 'version_type'),
        ('_parent', 'parent'),
        ('_retry_on_conflict', 'retry_on_conflict'),
        ('_routing', 'routing'),
        ('_version', 'version'),
        ('_version_type', 'version_type'),
    )

    @wraps(f)
    def inner(*args, **kwargs):
        action = f(*args, **kwargs)
        # Non-dict results and pre-v7 clients need no translation.
        if not isinstance(action, dict) or ES_VERSION[0] < 7:
            return action
        for old_key, new_key in renames:
            if old_key in action:
                action[new_key] = action.pop(old_key)
        return action
    return inner
# NOTE: Remove when https://github.com/elastic/elasticsearch-py/pull/1062 is
# merged.
def _es7_expand_action(data):
    """Expand a bulk action into an ES7-compatible ``(action, source)`` pair.

    A string is treated as raw JSON to index and is returned next to a bare
    ``index`` action line. Otherwise ``data`` is shallow-copied, the known
    metadata keys are moved out of the copy into the action line, and the
    remaining document (or its ``_source`` entry, when present) is returned
    as the payload. Delete actions carry no payload.
    """
    # When given a string, assume the user wants to index raw JSON.
    if isinstance(data, six.string_types):
        return '{"index":{}}', data

    # Metadata keys whose leading underscore is stripped in the action line.
    underscored = (
        "_parent",
        "_retry_on_conflict",
        "_routing",
        "_version",
        "_version_type",
    )

    # Work on a copy so the caller's action is never altered.
    document = data.copy()
    op_type = document.pop("_op_type", "index")

    metadata = {}
    for key in (
        "_id",
        "_index",
        "_parent",
        "_percolate",
        "_retry_on_conflict",
        "_routing",
        "_timestamp",
        "_type",
        "_version",
        "_version_type",
        "parent",
        "pipeline",
        "retry_on_conflict",
        "routing",
        "version",
        "version_type",
    ):
        if key not in document:
            continue
        target = key[1:] if key in underscored else key
        metadata[target] = document.pop(key)

    action = {op_type: metadata}

    # No data payload for delete.
    if op_type == "delete":
        return action, None

    return action, document.get("_source", document)
2 changes: 1 addition & 1 deletion invenio_indexer/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

from __future__ import absolute_import, print_function

__version__ = '1.1.0'
__version__ = '1.1.1'
10 changes: 0 additions & 10 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
drop_database

from invenio_indexer import InvenioIndexer
from invenio_indexer.utils import es_bulk_param_compatibility


@pytest.fixture()
Expand Down Expand Up @@ -93,12 +92,3 @@ def queue(app):
q.purge()

return queue


@pytest.fixture()
def generate_action():
    """Provide an identity function wrapped by the bulk param decorator.

    The returned callable hands back the action it receives, after it has
    gone through ``es_bulk_param_compatibility``.
    """
    def inner(action):
        return action
    # Apply the decorator explicitly; this equals the ``@`` form.
    return es_bulk_param_compatibility(inner)
6 changes: 2 additions & 4 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,10 @@ def receiver(sender, json=None, record=None, arguments=None, **kwargs):
if lt_es7:
assert action['_type'] == \
app.config['INDEXER_DEFAULT_DOC_TYPE']
assert action['_version'] == record.revision_id
assert action['_version_type'] == 'external_gte'
else:
assert action['_type'] == '_doc'
assert action['version'] == record.revision_id
assert action['version_type'] == 'external_gte'
assert action['_version'] == record.revision_id
assert action['_version_type'] == 'external_gte'
assert action['pipeline'] == 'foobar'
assert 'title' in action['_source']
assert 'extra' in action['_source']
Expand Down
58 changes: 1 addition & 57 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from elasticsearch import VERSION as ES_VERSION
from mock import patch

from invenio_indexer.utils import es_bulk_param_compatibility, schema_to_index
from invenio_indexer.utils import schema_to_index


def test_schema_to_index_with_names(app):
Expand Down Expand Up @@ -62,59 +62,3 @@ def test_schema_to_index(schema, expected, index_names, app):
if ES_VERSION[0] >= 7 and expected[0]:
expected = (expected[0], '_doc')
assert result == expected


def test_es6_bulk_param_compat(generate_action):
    """Check that the decorator leaves bulk params unchanged for ES 6."""
    untouched = {
        '_index': 'index',
        '_type': '_doc',
        'test': 'test',
        'opType': 'opType',
        'versionType': 'version type',
        '_parent': 'parent',
        '_retry_on_conflict': 'retry',
        '_routing': 'routing',
        '_version': 'version',
    }
    # Pass a copy so the expectation stays independent of the input.
    action = dict(untouched)
    with patch('invenio_indexer.utils.ES_VERSION', (6, 3, 0)):
        result = generate_action(action)
    assert result == untouched


def test_es7_bulk_param_compat(generate_action):
    """Check that the decorator renames deprecated bulk params for ES 7."""
    # Fields that are identical before and after the translation.
    common = {
        '_index': 'index',
        '_type': '_doc',
        'test': 'test',
    }
    deprecated = {
        'opType': 'opType',
        'versionType': 'version type',
        '_parent': 'parent',
        '_retry_on_conflict': 'retry',
        '_routing': 'routing',
        '_version': 'version',
    }
    renamed = {
        'op_type': 'opType',
        'version_type': 'version type',
        'parent': 'parent',
        'retry_on_conflict': 'retry',
        'routing': 'routing',
        'version': 'version',
    }
    action = dict(common, **deprecated)
    expected_action = dict(common, **renamed)
    with patch('invenio_indexer.utils.ES_VERSION', (7, 0, 1)):
        result = generate_action(action)
    assert result == expected_action

0 comments on commit 2242821

Please sign in to comment.