Skip to content

Commit

Permalink
api: optimistic concurrency control for delete
Browse files Browse the repository at this point in the history
* Adds usage of optimistic concurrency control for delete requests
  to ensure proper operation of delete operation as well as ensuring
  that documents can be recreated and reindexed immediately after a
  delete request.
  • Loading branch information
lnielsen committed Sep 10, 2020
1 parent b858090 commit a91d295
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 1 deletion.
10 changes: 10 additions & 0 deletions CHANGES.rst
Expand Up @@ -8,6 +8,16 @@
Changes
=======

Version 1.2.0 (released TBD)

- Changes delete requests to optimistic concurrency control by providing the
the version and version_type in delete requests. The previous behavior can
restored by calling
``RecordIndexer().delete(record, version=None, version_type=None)`` instead.

- Adds support for using new-style record dumping controlled via the
``Record.enable_jsonref`` flag.

Version 1.1.2 (released 2020-04-28)

- Introduces ``RecordIndexer.record_cls`` for customizing the record class.
Expand Down
12 changes: 12 additions & 0 deletions invenio_indexer/api.py
Expand Up @@ -178,6 +178,8 @@ def delete(self, record, **kwargs):
id=str(record.id),
index=index,
doc_type=doc_type,
version=kwargs.pop('version', record.revision_id),
version_type=kwargs.pop('version_type', self._version_type),
**kwargs
)

Expand Down Expand Up @@ -295,18 +297,28 @@ def _delete_action(self, payload):
:param payload: Decoded message body.
:returns: Dictionary defining an Elasticsearch bulk 'delete' action.
"""
kwargs = {}
index, doc_type = payload.get('index'), payload.get('doc_type')
if not (index and doc_type):
record = self.record_cls.get_record(
payload['id'], with_deleted=True)
index, doc_type = self.record_to_index(record)
kwargs['_version'] = record.revision_id
kwargs['_version_type'] = self._version_type
else:
# Allow version to be sent in the payload (but only use if we
# haven't loaded the record.
if 'version' in payload:
kwargs['_version'] = payload['version']
kwargs['_version_type'] = self._version_type
index, doc_type = self._prepare_index(index, doc_type)

return {
'_op_type': 'delete',
'_index': index,
'_type': doc_type,
'_id': payload['id'],
**kwargs,
}

def _index_action(self, payload):
Expand Down
2 changes: 1 addition & 1 deletion invenio_indexer/version.py
Expand Up @@ -14,4 +14,4 @@

from __future__ import absolute_import, print_function

__version__ = '1.1.2'
__version__ = '1.2.0a1.dev0'
2 changes: 2 additions & 0 deletions tests/test_api.py
Expand Up @@ -221,6 +221,8 @@ def test_delete(app):
id=str(recid),
index=app.config['INDEXER_DEFAULT_INDEX'],
doc_type=doc_type,
version=record.revision_id,
version_type='external_gte',
)

with patch('invenio_indexer.api.RecordIndexer.delete') as fun:
Expand Down
2 changes: 2 additions & 0 deletions tests/test_invenio_indexer.py
Expand Up @@ -176,4 +176,6 @@ def test_index_prefixing(base_app):
id=str(record3.id),
index='test-' + default_index,
doc_type=default_doc_type if lt_es7 else '_doc',
version=record3.revision_id,
version_type='external_gte',
)

0 comments on commit a91d295

Please sign in to comment.