Skip to content

Commit

Permalink
Merge d35d339 into e52dc11
Browse files Browse the repository at this point in the history
  • Loading branch information
equadon committed May 16, 2019
2 parents e52dc11 + d35d339 commit 2631612
Show file tree
Hide file tree
Showing 16 changed files with 194 additions and 20 deletions.
7 changes: 5 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ language: python
matrix:
fast_finish: true
allow_failures:
- env: REQUIREMENTS=devel EXTRAS=all,elasticsearch7 BROKER_URL=amqp://localhost:5672// ES_URL=$ES7_DOWNLOAD_URL
- env: REQUIREMENTS=devel EXTRAS=all,elasticsearch5 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES5_DOWNLOAD_URL
- env: REQUIREMENTS=devel EXTRAS=all,elasticsearch5 BROKER_URL=amqp://localhost:5672// ES_URL=$ES5_DOWNLOAD_URL
- env: REQUIREMENTS=devel EXTRAS=all,elasticsearch2 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES2_DOWNLOAD_URL
Expand All @@ -32,16 +33,18 @@ env:
- ES2_DOWNLOAD_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.4.2/elasticsearch-2.4.2.tar.gz"
- ES5_DOWNLOAD_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.4.tar.gz"
- ES6_DOWNLOAD_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.0.tar.gz"
- ES7_DOWNLOAD_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.0.1-linux-x86_64.tar.gz"
- ES_HOST=127.0.0.1
matrix:
# ES 7: Redis vs RabbitMQ
- REQUIREMENTS=release EXTRAS=all,elasticsearch7 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES7_DOWNLOAD_URL
- REQUIREMENTS=devel EXTRAS=all,elasticsearch7 BROKER_URL=amqp://localhost:5672// ES_URL=$ES7_DOWNLOAD_URL
# ES 6: Redis vs RabbitMQ
- REQUIREMENTS=release EXTRAS=all,elasticsearch6 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES6_DOWNLOAD_URL
- REQUIREMENTS=devel EXTRAS=all,elasticsearch6 BROKER_URL=amqp://localhost:5672// ES_URL=$ES6_DOWNLOAD_URL
# ES 5: Redis vs RabbitMQ
- REQUIREMENTS=lowest EXTRAS=all,elasticsearch5 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES5_DOWNLOAD_URL
- REQUIREMENTS=lowest EXTRAS=all,elasticsearch5 BROKER_URL=amqp://localhost:5672// ES_URL=$ES5_DOWNLOAD_URL
- REQUIREMENTS=release EXTRAS=all,elasticsearch5 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES5_DOWNLOAD_URL
- REQUIREMENTS=release EXTRAS=all,elasticsearch5 BROKER_URL=amqp://localhost:5672// ES_URL=$ES5_DOWNLOAD_URL DEPLOY=true
# ES 2: Redis only
- REQUIREMENTS=lowest EXTRAS=all,elasticsearch2 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES2_DOWNLOAD_URL
- REQUIREMENTS=release EXTRAS=all,elasticsearch2 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES2_DOWNLOAD_URL
Expand Down
7 changes: 7 additions & 0 deletions examples/data/v7/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2018 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
25 changes: 25 additions & 0 deletions examples/data/v7/testrecords/testrecord-v1.0.0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"mappings": {
"properties": {
"control_number": {
"type": "keyword"
},
"description": {
"type": "text"
},
"participants": {
"type": "long"
},
"title": {
"type": "text",
"copy_to": "suggest_title"
},
"type": {
"type": "keyword"
},
"suggest_title": {
"type": "completion"
}
}
}
}
3 changes: 1 addition & 2 deletions invenio_indexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@
Would be indexed in the following Elasticsearch index/doctype:
>>> print(indexer.record_to_index(record))
('records-record-v1.0.0', 'record-v1.0.0')
>>> index, doc_type = indexer.record_to_index(record)
Bulk indexing
-------------
Expand Down
2 changes: 2 additions & 0 deletions invenio_indexer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from .proxies import current_record_to_index
from .signals import before_record_index
from .utils import es_bulk_param_compatibility


class Producer(KombuProducer):
Expand Down Expand Up @@ -275,6 +276,7 @@ def _delete_action(self, payload):
'_id': payload['id'],
}

@es_bulk_param_compatibility
def _index_action(self, payload):
"""Bulk index action.
Expand Down
53 changes: 48 additions & 5 deletions invenio_indexer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

"""Utility functions for data processing."""

from functools import wraps

from elasticsearch import VERSION as ES_VERSION
from flask import current_app
from invenio_search import current_search
from invenio_search.utils import schema_to_index
Expand All @@ -29,8 +32,48 @@ def default_record_to_index(record):

index, doc_type = schema_to_index(schema, index_names=index_names)

if index and doc_type:
return index, doc_type
else:
return (current_app.config['INDEXER_DEFAULT_INDEX'],
current_app.config['INDEXER_DEFAULT_DOC_TYPE'])
if not (index and doc_type):
index, doc_type = (current_app.config['INDEXER_DEFAULT_INDEX'],
current_app.config['INDEXER_DEFAULT_DOC_TYPE'])

if ES_VERSION[0] >= 7:
doc_type = '_doc'

return index, doc_type


def es_bulk_param_compatibility(f):
"""Decorator to ensure parameter compatibility.
ES version 7 removed deprecated params for bulk indexing which are modified
by this decorator.
"""
removed_params = (
'opType',
'versionType',
'_versionType',
'_parent',
'_retry_on_conflict',
'_routing',
'_version',
'_version_type'
)

def update_params(obj, from_key, to_key):
if from_key in obj:
obj[to_key] = obj[from_key]
del obj[from_key]

@wraps(f)
def inner(*args, **kwargs):
action = f(*args, **kwargs)
if isinstance(action, dict) and ES_VERSION[0] >= 7:
for param in removed_params:
if param == 'opType':
update_params(action, 'opType', 'op_type')
elif param in ('versionType', '_versionType'):
update_params(action, param, 'version_type')
else:
update_params(action, param, param[1:])
return action
return inner
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@
'elasticsearch6': [
'invenio-search[elasticsearch6]>={}'.format(invenio_search_version),
],
'elasticsearch7': [
'invenio-search[elasticsearch7]>={}'.format(invenio_search_version),
],
'tests': tests_require,
}

extras_require['all'] = []
for name, reqs in extras_require.items():
if name[0] == ':' or name in ('elasticsearch2', 'elasticsearch5', 'elasticsearch6'):
if name[0] == ':' or name in ('elasticsearch2', 'elasticsearch5', 'elasticsearch6', 'elasticsearch7'):
continue
extras_require['all'].extend(reqs)

Expand Down
10 changes: 10 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
drop_database

from invenio_indexer import InvenioIndexer
from invenio_indexer.utils import es_bulk_param_compatibility


@pytest.fixture()
Expand Down Expand Up @@ -92,3 +93,12 @@ def queue(app):
q.purge()

return queue


@pytest.fixture()
def generate_action():
"""Generate action function to test bulk param decorator."""
@es_bulk_param_compatibility
def inner(action):
return action
return inner
9 changes: 9 additions & 0 deletions tests/data/v7/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2017-2018 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Elasticsearch v7 mappings."""
1 change: 1 addition & 0 deletions tests/data/v7/records/authorities/authority-v1.0.0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"mappings": {}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"mappings": {}}
1 change: 1 addition & 0 deletions tests/data/v7/records/default-v1.0.0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"mappings": {}}
23 changes: 17 additions & 6 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import pytz
from celery.messaging import establish_connection
from elasticsearch import VERSION as ES_VERSION
from invenio_db import db
from invenio_records.api import Record
from jsonresolver import JSONResolver
Expand All @@ -25,6 +26,8 @@
from invenio_indexer.api import BulkRecordIndexer, RecordIndexer
from invenio_indexer.signals import before_record_index

lt_es7 = ES_VERSION[0] < 7


def test_indexer_bulk_index(app, queue):
"""Test delay indexing."""
Expand Down Expand Up @@ -73,7 +76,7 @@ def test_delete_action(app):
dict(id='myid', op='delete', index=None, doc_type=None))
assert action['_op_type'] == 'delete'
assert action['_index'] == 'records-authorities-authority-v1.0.0'
assert action['_type'] == 'authority-v1.0.0'
assert action['_type'] == 'authority-v1.0.0' if lt_es7 else '_doc'
assert action['_id'] == 'myid'


Expand All @@ -94,10 +97,16 @@ def receiver(sender, json=None, record=None, arguments=None, **kwargs):
))
assert action['_op_type'] == 'index'
assert action['_index'] == app.config['INDEXER_DEFAULT_INDEX']
assert action['_type'] == app.config['INDEXER_DEFAULT_DOC_TYPE']
assert action['_id'] == str(record.id)
assert action['_version'] == record.revision_id
assert action['_version_type'] == 'external_gte'
if lt_es7:
assert action['_type'] == \
app.config['INDEXER_DEFAULT_DOC_TYPE']
assert action['_version'] == record.revision_id
assert action['_version_type'] == 'external_gte'
else:
assert action['_type'] == '_doc'
assert action['version'] == record.revision_id
assert action['version_type'] == 'external_gte'
assert action['pipeline'] == 'foobar'
assert 'title' in action['_source']
assert 'extra' in action['_source']
Expand Down Expand Up @@ -163,12 +172,13 @@ def test_index(app):
RecordIndexer(search_client=client_mock, version_type='force').index(
record, arguments={'pipeline': 'foobar'})

doc_type = app.config['INDEXER_DEFAULT_DOC_TYPE'] if lt_es7 else '_doc'
client_mock.index.assert_called_with(
id=str(recid),
version=0,
version_type='force',
index=app.config['INDEXER_DEFAULT_INDEX'],
doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'],
doc_type=doc_type,
body={
'title': 'Test',
'_created': pytz.utc.localize(record.created).isoformat(),
Expand All @@ -192,10 +202,11 @@ def test_delete(app):
client_mock = MagicMock()
RecordIndexer(search_client=client_mock).delete(record)

doc_type = app.config['INDEXER_DEFAULT_DOC_TYPE'] if lt_es7 else '_doc'
client_mock.delete.assert_called_with(
id=str(recid),
index=app.config['INDEXER_DEFAULT_INDEX'],
doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'],
doc_type=doc_type,
)

with patch('invenio_indexer.api.RecordIndexer.delete') as fun:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_reindex(app, script_info):

# Both records should be indexed
res = current_search_client.search(index=index)
assert res['hits']['total'] == 2
assert len(res['hits']['hits']) == 2

# Delete one of the records
record2 = Record.get_record(id2)
Expand All @@ -99,7 +99,7 @@ def test_reindex(app, script_info):

# Check that the deleted record is not indexed
res = current_search_client.search(index=index)
assert res['hits']['total'] == 1
assert len(res['hits']['hits']) == 1
assert res['hits']['hits'][0]['_source']['title'] == 'Test 1'

# Destroy queue and the index
Expand Down
8 changes: 6 additions & 2 deletions tests/test_invenio_bulkindexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import uuid

import pytz
from elasticsearch import VERSION as ES_VERSION
from flask import Flask
from invenio_db import db
from invenio_records import Record
Expand All @@ -24,6 +25,8 @@
_global_magic_hook = MagicMock()
"""Iternal importable magic hook instance."""

lt_es7 = ES_VERSION[0] < 7


def test_version():
"""Test version import."""
Expand Down Expand Up @@ -62,9 +65,10 @@ def test_hook_initialization(base_app):
RecordIndexer(search_client=client_mock, version_type='force').index(
record)
args = (app, )
doc_type = app.config['INDEXER_DEFAULT_DOC_TYPE'] if lt_es7 else '_doc'
kwargs = dict(
index=app.config['INDEXER_DEFAULT_INDEX'],
doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'],
doc_type=doc_type,
arguments={},
record=record,
json={
Expand All @@ -80,7 +84,7 @@ def test_hook_initialization(base_app):
version=0,
version_type='force',
index=app.config['INDEXER_DEFAULT_INDEX'],
doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'],
doc_type=doc_type,
body={
'title': 'Test',
'_created': pytz.utc.localize(record.created).isoformat(),
Expand Down
55 changes: 55 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2016-2018 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Test API."""

from __future__ import absolute_import, print_function

from elasticsearch import VERSION as ES_VERSION

from invenio_indexer.utils import es_bulk_param_compatibility


def test_es_bulk_param_compat(generate_action):
"""Test ES bulk param compatibility decorator."""
action = {
'_index': 'index',
'_type': '_doc',
'test': 'test',
'opType': 'opType',
'versionType': 'version type',
'_parent': 'parent',
'_retry_on_conflict': 'retry',
'_routing': 'routing',
'_version': 'version',
}
if ES_VERSION[0] < 7:
expected_action = {
'_index': 'index',
'_type': '_doc',
'test': 'test',
'opType': 'opType',
'versionType': 'version type',
'_parent': 'parent',
'_retry_on_conflict': 'retry',
'_routing': 'routing',
'_version': 'version',
}
else:
expected_action = {
'_index': 'index',
'_type': '_doc',
'test': 'test',
'op_type': 'opType',
'version_type': 'version type',
'parent': 'parent',
'retry_on_conflict': 'retry',
'routing': 'routing',
'version': 'version',
}
assert generate_action(action) == expected_action

0 comments on commit 2631612

Please sign in to comment.