Skip to content

Commit

Permalink
api: allow customization of ES indexing arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Panero authored and slint committed May 14, 2019
1 parent 917465f commit e52dc11
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 13 deletions.
44 changes: 33 additions & 11 deletions invenio_indexer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def mq_routing_key(self):
#
# High-level API
#
def index(self, record):
def index(self, record, arguments=None, **kwargs):
"""Index a record.
The caller is responsible for ensuring that the record has already been
Expand All @@ -115,39 +115,51 @@ def index(self, record):
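:param record: Record instance.
:param arguments: Extra keyword arguments sent to the Elasticsearch
client's ``index`` call (for example ``pipeline``).
:param kwargs: Passed to :meth:`RecordIndexer._prepare_record`.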
"""
index, doc_type = self.record_to_index(record)
arguments = arguments or {}
body = self._prepare_record(
record, index, doc_type, arguments, **kwargs)

return self.client.index(
id=str(record.id),
version=record.revision_id,
version_type=self._version_type,
index=index,
doc_type=doc_type,
body=self._prepare_record(record, index, doc_type),
body=body,
**arguments
)

def index_by_id(self, record_uuid):
def index_by_id(self, record_uuid, **kwargs):
"""Index a record by record identifier.
:param record_uuid: Record identifier.
:param kwargs: Passed to :meth:`RecordIndexer.index`.
"""
return self.index(Record.get_record(record_uuid))
return self.index(Record.get_record(record_uuid), **kwargs)

def delete(self, record):
def delete(self, record, **kwargs):
"""Delete a record.
:param record: Record instance.
:param kwargs: Passed to
:meth:`elasticsearch:elasticsearch.Elasticsearch.delete`.
"""
index, doc_type = self.record_to_index(record)

return self.client.delete(
id=str(record.id),
index=index,
doc_type=doc_type,
**kwargs
)

def delete_by_id(self, record_uuid):
"""Delete record from index by record identifier."""
self.delete(Record.get_record(record_uuid))
def delete_by_id(self, record_uuid, **kwargs):
"""Delete record from index by record identifier.
:param record_uuid: Record identifier.
:param kwargs: Passed to :meth:`RecordIndexer.delete`.
"""
self.delete(Record.get_record(record_uuid), **kwargs)

def bulk_index(self, record_id_iterator):
"""Bulk index records.
Expand Down Expand Up @@ -272,23 +284,31 @@ def _index_action(self, payload):
record = Record.get_record(payload['id'])
index, doc_type = self.record_to_index(record)

return {
arguments = {}
body = self._prepare_record(record, index, doc_type, arguments)

action = {
'_op_type': 'index',
'_index': index,
'_type': doc_type,
'_id': str(record.id),
'_version': record.revision_id,
'_version_type': self._version_type,
'_source': self._prepare_record(record, index, doc_type),
'_source': body
}
action.update(arguments)

return action

@staticmethod
def _prepare_record(record, index, doc_type):
def _prepare_record(record, index, doc_type, arguments=None, **kwargs):
"""Prepare record data for indexing.
:param record: The record to prepare.
:param index: The Elasticsearch index.
:param doc_type: The Elasticsearch document type.
:param arguments: The arguments to send to Elasticsearch upon indexing.
:param kwargs: Extra parameters.
:returns: The record metadata.
"""
if current_app.config['INDEXER_REPLACE_REFS']:
Expand All @@ -308,6 +328,8 @@ def _prepare_record(record, index, doc_type):
record=record,
index=index,
doc_type=doc_type,
arguments={} if arguments is None else arguments,
**kwargs
)

return data
Expand Down
2 changes: 2 additions & 0 deletions invenio_indexer/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@
- ``record``: The record being indexed.
- ``index``: The index in which the record will be indexed.
- ``doc_type``: The doc_type for the record.
- ``arguments``: The arguments to pass to Elasticsearch for indexing.
- ``**kwargs``: Extra arguments.
"""
7 changes: 5 additions & 2 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ def test_index_action(app):
record = Record.create({'title': 'Test'})
db.session.commit()

def receiver(sender, json=None, record=None, **kwargs):
def receiver(sender, json=None, record=None, arguments=None, **kwargs):
json['extra'] = 'extra'
arguments['pipeline'] = 'foobar'

with before_record_index.connected_to(receiver):
action = RecordIndexer()._index_action(dict(
Expand All @@ -97,6 +98,7 @@ def receiver(sender, json=None, record=None, **kwargs):
assert action['_id'] == str(record.id)
assert action['_version'] == record.revision_id
assert action['_version_type'] == 'external_gte'
assert action['pipeline'] == 'foobar'
assert 'title' in action['_source']
assert 'extra' in action['_source']

Expand Down Expand Up @@ -159,7 +161,7 @@ def test_index(app):

client_mock = MagicMock()
RecordIndexer(search_client=client_mock, version_type='force').index(
record)
record, arguments={'pipeline': 'foobar'})

client_mock.index.assert_called_with(
id=str(recid),
Expand All @@ -172,6 +174,7 @@ def test_index(app):
'_created': pytz.utc.localize(record.created).isoformat(),
'_updated': pytz.utc.localize(record.updated).isoformat(),
},
pipeline='foobar',
)

with patch('invenio_indexer.api.RecordIndexer.index') as fun:
Expand Down
1 change: 1 addition & 0 deletions tests/test_invenio_bulkindexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def test_hook_initialization(base_app):
kwargs = dict(
index=app.config['INDEXER_DEFAULT_INDEX'],
doc_type=app.config['INDEXER_DEFAULT_DOC_TYPE'],
arguments={},
record=record,
json={
'title': 'Test',
Expand Down

0 comments on commit e52dc11

Please sign in to comment.