From b7bbd8a12e644e1de0512453121cdf37a35564b5 Mon Sep 17 00:00:00 2001 From: Niklas Persson Date: Thu, 16 May 2019 16:21:30 +0200 Subject: [PATCH] global: add support for ES7 Closes #104 --- .travis.yml | 5 +- examples/data/v7/__init__.py | 7 +++ .../v7/testrecords/testrecord-v1.0.0.json | 25 +++++++++ invenio_indexer/__init__.py | 3 +- invenio_indexer/api.py | 2 + invenio_indexer/utils.py | 53 ++++++++++++++++-- setup.py | 5 +- tests/conftest.py | 10 ++++ tests/data/v7/__init__.py | 9 +++ .../records/authorities/authority-v1.0.0.json | 1 + .../bibliographic/bibliographic-v1.0.0.json | 1 + tests/data/v7/records/default-v1.0.0.json | 1 + tests/test_api.py | 23 ++++++-- tests/test_cli.py | 4 +- tests/test_invenio_bulkindexer.py | 8 ++- tests/test_utils.py | 55 +++++++++++++++++++ 16 files changed, 192 insertions(+), 20 deletions(-) create mode 100644 examples/data/v7/__init__.py create mode 100644 examples/data/v7/testrecords/testrecord-v1.0.0.json create mode 100644 tests/data/v7/__init__.py create mode 100644 tests/data/v7/records/authorities/authority-v1.0.0.json create mode 100644 tests/data/v7/records/bibliographic/bibliographic-v1.0.0.json create mode 100644 tests/data/v7/records/default-v1.0.0.json create mode 100644 tests/test_utils.py diff --git a/.travis.yml b/.travis.yml index 11d7ec6..00dde4d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,6 +19,8 @@ matrix: - env: REQUIREMENTS=devel EXTRAS=all,elasticsearch5 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES5_DOWNLOAD_URL - env: REQUIREMENTS=devel EXTRAS=all,elasticsearch5 BROKER_URL=amqp://localhost:5672// ES_URL=$ES5_DOWNLOAD_URL - env: REQUIREMENTS=devel EXTRAS=all,elasticsearch2 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES2_DOWNLOAD_URL + - env: REQUIREMENTS=release EXTRAS=all,elasticsearch7 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES7_DOWNLOAD_URL + - env: REQUIREMENTS=devel EXTRAS=all,elasticsearch7 BROKER_URL=amqp://localhost:5672// ES_URL=$ES7_DOWNLOAD_URL cache: - pip @@ -32,16 +34,15 @@ env: 
- ES2_DOWNLOAD_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.4.2/elasticsearch-2.4.2.tar.gz" - ES5_DOWNLOAD_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.4.tar.gz" - ES6_DOWNLOAD_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.0.tar.gz" + - ES7_DOWNLOAD_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.0.1-linux-x86_64.tar.gz" - ES_HOST=127.0.0.1 matrix: # ES 6: Redis vs RabbitMQ - REQUIREMENTS=release EXTRAS=all,elasticsearch6 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES6_DOWNLOAD_URL - REQUIREMENTS=devel EXTRAS=all,elasticsearch6 BROKER_URL=amqp://localhost:5672// ES_URL=$ES6_DOWNLOAD_URL # ES 5: Redis vs RabbitMQ - - REQUIREMENTS=lowest EXTRAS=all,elasticsearch5 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES5_DOWNLOAD_URL - REQUIREMENTS=lowest EXTRAS=all,elasticsearch5 BROKER_URL=amqp://localhost:5672// ES_URL=$ES5_DOWNLOAD_URL - REQUIREMENTS=release EXTRAS=all,elasticsearch5 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES5_DOWNLOAD_URL - - REQUIREMENTS=release EXTRAS=all,elasticsearch5 BROKER_URL=amqp://localhost:5672// ES_URL=$ES5_DOWNLOAD_URL DEPLOY=true # ES 2: Redis only - REQUIREMENTS=lowest EXTRAS=all,elasticsearch2 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES2_DOWNLOAD_URL - REQUIREMENTS=release EXTRAS=all,elasticsearch2 BROKER_URL=redis://localhost:6379/0 ES_URL=$ES2_DOWNLOAD_URL diff --git a/examples/data/v7/__init__.py b/examples/data/v7/__init__.py new file mode 100644 index 0000000..6ee8cbf --- /dev/null +++ b/examples/data/v7/__init__.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2018 CERN. +# +# Invenio is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. 
diff --git a/examples/data/v7/testrecords/testrecord-v1.0.0.json b/examples/data/v7/testrecords/testrecord-v1.0.0.json new file mode 100644 index 0000000..f0f4fde --- /dev/null +++ b/examples/data/v7/testrecords/testrecord-v1.0.0.json @@ -0,0 +1,25 @@ +{ + "mappings": { + "properties": { + "control_number": { + "type": "keyword" + }, + "description": { + "type": "text" + }, + "participants": { + "type": "long" + }, + "title": { + "type": "text", + "copy_to": "suggest_title" + }, + "type": { + "type": "keyword" + }, + "suggest_title": { + "type": "completion" + } + } + } +} diff --git a/invenio_indexer/__init__.py b/invenio_indexer/__init__.py index df7f12f..c7fa6ef 100644 --- a/invenio_indexer/__init__.py +++ b/invenio_indexer/__init__.py @@ -80,8 +80,7 @@ Would be indexed in the following Elasticsearch index/doctype: ->>> print(indexer.record_to_index(record)) -('records-record-v1.0.0', 'record-v1.0.0') +>>> index, doc_type = indexer.record_to_index(record) Bulk indexing ------------- diff --git a/invenio_indexer/api.py b/invenio_indexer/api.py index d3c63ff..a5351c6 100644 --- a/invenio_indexer/api.py +++ b/invenio_indexer/api.py @@ -25,6 +25,7 @@ from .proxies import current_record_to_index from .signals import before_record_index +from .utils import es_bulk_param_compatibility class Producer(KombuProducer): @@ -275,6 +276,7 @@ def _delete_action(self, payload): '_id': payload['id'], } + @es_bulk_param_compatibility def _index_action(self, payload): """Bulk index action. 
diff --git a/invenio_indexer/utils.py b/invenio_indexer/utils.py index d17a155..fe31517 100644 --- a/invenio_indexer/utils.py +++ b/invenio_indexer/utils.py @@ -8,6 +8,9 @@ """Utility functions for data processing.""" +from functools import wraps + +from elasticsearch import VERSION as ES_VERSION from flask import current_app from invenio_search import current_search from invenio_search.utils import schema_to_index @@ -29,8 +32,48 @@ def default_record_to_index(record): index, doc_type = schema_to_index(schema, index_names=index_names) - if index and doc_type: - return index, doc_type - else: - return (current_app.config['INDEXER_DEFAULT_INDEX'], - current_app.config['INDEXER_DEFAULT_DOC_TYPE']) + if not (index and doc_type): + index, doc_type = (current_app.config['INDEXER_DEFAULT_INDEX'], + current_app.config['INDEXER_DEFAULT_DOC_TYPE']) + + if ES_VERSION[0] >= 7: + doc_type = '_doc' + + return index, doc_type + + +def es_bulk_param_compatibility(f): + """Decorator to ensure parameter compatibility. + + ES version 7 removed deprecated params for bulk indexing which are modified + by this decorator. 
+ """ + removed_params = ( + 'opType', + 'versionType', + '_versionType', + '_parent', + '_retry_on_conflict', + '_routing', + '_version', + '_version_type' + ) + + def update_params(obj, from_key, to_key): + if from_key in obj: + obj[to_key] = obj[from_key] + del obj[from_key] + + @wraps(f) + def inner(*args, **kwargs): + action = f(*args, **kwargs) + if isinstance(action, dict) and ES_VERSION[0] >= 7: + for param in removed_params: + if param == 'opType': + update_params(action, 'opType', 'op_type') + elif param in ('versionType', '_versionType'): + update_params(action, param, 'version_type') + else: + update_params(action, param, param[1:]) + return action + return inner diff --git a/setup.py b/setup.py index cc14c7a..a097ed0 100644 --- a/setup.py +++ b/setup.py @@ -47,12 +47,15 @@ 'elasticsearch6': [ 'invenio-search[elasticsearch6]>={}'.format(invenio_search_version), ], + 'elasticsearch7': [ + 'invenio-search[elasticsearch7]>={}'.format(invenio_search_version), + ], 'tests': tests_require, } extras_require['all'] = [] for name, reqs in extras_require.items(): - if name[0] == ':' or name in ('elasticsearch2', 'elasticsearch5', 'elasticsearch6'): + if name[0] == ':' or name in ('elasticsearch2', 'elasticsearch5', 'elasticsearch6', 'elasticsearch7'): continue extras_require['all'].extend(reqs) diff --git a/tests/conftest.py b/tests/conftest.py index a2a0461..e428a88 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -26,6 +26,7 @@ drop_database from invenio_indexer import InvenioIndexer +from invenio_indexer.utils import es_bulk_param_compatibility @pytest.fixture() @@ -92,3 +93,12 @@ def queue(app): q.purge() return queue + + +@pytest.fixture() +def generate_action(): + """Generate action function to test bulk param decorator.""" + @es_bulk_param_compatibility + def inner(action): + return action + return inner diff --git a/tests/data/v7/__init__.py b/tests/data/v7/__init__.py new file mode 100644 index 0000000..ce5c941 --- /dev/null +++ 
b/tests/data/v7/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2017-2018 CERN. +# +# Invenio is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +"""Elasticsearch v7 mappings.""" diff --git a/tests/data/v7/records/authorities/authority-v1.0.0.json b/tests/data/v7/records/authorities/authority-v1.0.0.json new file mode 100644 index 0000000..62d35da --- /dev/null +++ b/tests/data/v7/records/authorities/authority-v1.0.0.json @@ -0,0 +1 @@ +{"mappings": {}} diff --git a/tests/data/v7/records/bibliographic/bibliographic-v1.0.0.json b/tests/data/v7/records/bibliographic/bibliographic-v1.0.0.json new file mode 100644 index 0000000..62d35da --- /dev/null +++ b/tests/data/v7/records/bibliographic/bibliographic-v1.0.0.json @@ -0,0 +1 @@ +{"mappings": {}} diff --git a/tests/data/v7/records/default-v1.0.0.json b/tests/data/v7/records/default-v1.0.0.json new file mode 100644 index 0000000..62d35da --- /dev/null +++ b/tests/data/v7/records/default-v1.0.0.json @@ -0,0 +1 @@ +{"mappings": {}} diff --git a/tests/test_api.py b/tests/test_api.py index 8ac1c87..65a3c25 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -15,6 +15,7 @@ import pytz from celery.messaging import establish_connection +from elasticsearch import VERSION as ES_VERSION from invenio_db import db from invenio_records.api import Record from jsonresolver import JSONResolver @@ -25,6 +26,8 @@ from invenio_indexer.api import BulkRecordIndexer, RecordIndexer from invenio_indexer.signals import before_record_index +lt_es7 = ES_VERSION[0] < 7 + def test_indexer_bulk_index(app, queue): """Test delay indexing.""" @@ -73,7 +76,7 @@ def test_delete_action(app): dict(id='myid', op='delete', index=None, doc_type=None)) assert action['_op_type'] == 'delete' assert action['_index'] == 'records-authorities-authority-v1.0.0' - assert action['_type'] == 'authority-v1.0.0' + assert 
action['_type'] == ('authority-v1.0.0' if lt_es7 else '_doc') assert action['_id'] == 'myid' @@ -94,10 +97,16 @@ def receiver(sender, json=None, record=None, arguments=None, **kwargs): )) assert action['_op_type'] == 'index' assert action['_index'] == app.config['INDEXER_DEFAULT_INDEX'] - assert action['_type'] == app.config['INDEXER_DEFAULT_DOC_TYPE'] assert action['_id'] == str(record.id) - assert action['_version'] == record.revision_id - assert action['_version_type'] == 'external_gte' + if lt_es7: + assert action['_type'] == \ + app.config['INDEXER_DEFAULT_DOC_TYPE'] + assert action['_version'] == record.revision_id + assert action['_version_type'] == 'external_gte' + else: + assert action['_type'] == '_doc' + assert action['version'] == record.revision_id + assert action['version_type'] == 'external_gte' assert action['pipeline'] == 'foobar' assert 'title' in action['_source'] assert 'extra' in action['_source'] @@ -163,12 +172,13 @@ def test_index(app): RecordIndexer(search_client=client_mock, version_type='force').index( record, arguments={'pipeline': 'foobar'}) + doc_type = app.config['INDEXER_DEFAULT_DOC_TYPE'] if lt_es7 else '_doc' client_mock.index.assert_called_with( id=str(recid), version=0, version_type='force', index=app.config['INDEXER_DEFAULT_INDEX'], - doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'], + doc_type=doc_type, body={ 'title': 'Test', '_created': pytz.utc.localize(record.created).isoformat(), @@ -192,10 +202,11 @@ def test_delete(app): client_mock = MagicMock() RecordIndexer(search_client=client_mock).delete(record) + doc_type = app.config['INDEXER_DEFAULT_DOC_TYPE'] if lt_es7 else '_doc' client_mock.delete.assert_called_with( id=str(recid), index=app.config['INDEXER_DEFAULT_INDEX'], - doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'], + doc_type=doc_type, ) with patch('invenio_indexer.api.RecordIndexer.delete') as fun: diff --git a/tests/test_cli.py b/tests/test_cli.py index e7f5e62..c57ff8a 100644 --- a/tests/test_cli.py +++ 
b/tests/test_cli.py @@ -81,7 +81,7 @@ def test_reindex(app, script_info): # Both records should be indexed res = current_search_client.search(index=index) - assert res['hits']['total'] == 2 + assert len(res['hits']['hits']) == 2 # Delete one of the records record2 = Record.get_record(id2) @@ -99,7 +99,7 @@ def test_reindex(app, script_info): # Check that the deleted record is not indexed res = current_search_client.search(index=index) - assert res['hits']['total'] == 1 + assert len(res['hits']['hits']) == 1 assert res['hits']['hits'][0]['_source']['title'] == 'Test 1' # Destroy queue and the index diff --git a/tests/test_invenio_bulkindexer.py b/tests/test_invenio_bulkindexer.py index edb065f..e86ea2b 100644 --- a/tests/test_invenio_bulkindexer.py +++ b/tests/test_invenio_bulkindexer.py @@ -13,6 +13,7 @@ import uuid import pytz +from elasticsearch import VERSION as ES_VERSION from flask import Flask from invenio_db import db from invenio_records import Record @@ -24,6 +25,8 @@ _global_magic_hook = MagicMock() """Iternal importable magic hook instance.""" +lt_es7 = ES_VERSION[0] < 7 + def test_version(): """Test version import.""" @@ -62,9 +65,10 @@ def test_hook_initialization(base_app): RecordIndexer(search_client=client_mock, version_type='force').index( record) args = (app, ) + doc_type = app.config['INDEXER_DEFAULT_DOC_TYPE'] if lt_es7 else '_doc' kwargs = dict( index=app.config['INDEXER_DEFAULT_INDEX'], - doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'], + doc_type=doc_type, arguments={}, record=record, json={ @@ -80,7 +84,7 @@ def test_hook_initialization(base_app): version=0, version_type='force', index=app.config['INDEXER_DEFAULT_INDEX'], - doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'], + doc_type=doc_type, body={ 'title': 'Test', '_created': pytz.utc.localize(record.created).isoformat(), diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..e79f5ef --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,55 @@ +# -*- coding: 
utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2016-2018 CERN. +# +# Invenio is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +"""Test utility functions.""" + +from __future__ import absolute_import, print_function + +from elasticsearch import VERSION as ES_VERSION + +from invenio_indexer.utils import es_bulk_param_compatibility + + +def test_es_bulk_param_compat(generate_action): + """Test ES bulk param compatibility decorator.""" + action = { + '_index': 'index', + '_type': '_doc', + 'test': 'test', + 'opType': 'opType', + 'versionType': 'version type', + '_parent': 'parent', + '_retry_on_conflict': 'retry', + '_routing': 'routing', + '_version': 'version', + } + if ES_VERSION[0] < 7: + expected_action = { + '_index': 'index', + '_type': '_doc', + 'test': 'test', + 'opType': 'opType', + 'versionType': 'version type', + '_parent': 'parent', + '_retry_on_conflict': 'retry', + '_routing': 'routing', + '_version': 'version', + } + else: + expected_action = { + '_index': 'index', + '_type': '_doc', + 'test': 'test', + 'op_type': 'opType', + 'version_type': 'version type', + 'parent': 'parent', + 'retry_on_conflict': 'retry', + 'routing': 'routing', + 'version': 'version', + } + assert generate_action(action) == expected_action