Skip to content

Commit

Permalink
Merge 7a84377 into 65abaa1
Browse files Browse the repository at this point in the history
  • Loading branch information
slint committed Oct 5, 2018
2 parents 65abaa1 + 7a84377 commit 5725cc2
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 18 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
Changes
=======

Version 1.0.1 (released 2018-10-08)

- Allow forwarding arguments from ``RecordIndexer.process_bulk_queue`` to
``elasticsearch.helpers.bulk`` calls.

Version 1.0.0 (released 2018-03-23)

- Initial public release.
14 changes: 9 additions & 5 deletions invenio_indexer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ def __init__(self, search_client=None, exchange=None, queue=None,
from the record.
"""
self.client = search_client or current_search_client
self._exchange = None
self._queue = None
self._exchange = exchange
self._queue = queue
self._record_to_index = record_to_index or current_record_to_index
self._routing_key = None
self._routing_key = routing_key
self._version_type = version_type or 'external_gte'

def record_to_index(self, record):
Expand Down Expand Up @@ -163,8 +163,11 @@ def bulk_delete(self, record_id_iterator):
"""
self._bulk_op(record_id_iterator, 'delete')

def process_bulk_queue(self):
"""Process bulk indexing queue."""
def process_bulk_queue(self, **kwargs):
"""Process bulk indexing queue.
:param dict kwargs: Passed to ``elasticsearch.helpers.bulk``.
"""
with current_celery_app.pool.acquire(block=True) as conn:
consumer = Consumer(
connection=conn,
Expand All @@ -180,6 +183,7 @@ def process_bulk_queue(self):
self._actionsiter(consumer.iterqueue()),
stats_only=True,
request_timeout=req_timeout,
**kwargs,
)

consumer.close()
Expand Down
22 changes: 13 additions & 9 deletions invenio_indexer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,29 @@ def abort_if_false(ctx, param, value):
@click.option('--queue', '-q', type=str,
help='Name of the celery queue used to put the tasks into.')
@click.option('--version-type')
@click.option('--raise-on-error/--skip-errors', default=True)
@with_appcontext
def run(delayed, concurrency, version_type=None, queue=None):
def run(delayed, concurrency, version_type=None, queue=None,
raise_on_error=True):
"""Run bulk record indexing."""
if delayed:
click.secho(
'Starting {0} tasks for indexing records...'.format(concurrency),
fg='green')
data = {
celery_kwargs = {
'kwargs': {
'version_type': version_type
'version_type': version_type,
'raise_on_error': raise_on_error,
}
}
click.secho(
'Starting {0} tasks for indexing records...'.format(concurrency),
fg='green')
if queue is not None:
data.update({'queue': queue})
celery_kwargs.update({'queue': queue})
for c in range(0, concurrency):
process_bulk_queue.apply_async(**data)
process_bulk_queue.apply_async(**celery_kwargs)
else:
click.secho('Indexing records...', fg='green')
RecordIndexer(version_type=version_type).process_bulk_queue()
RecordIndexer(version_type=version_type).process_bulk_queue(
raise_on_error=raise_on_error)


@index.command()
Expand Down
8 changes: 5 additions & 3 deletions invenio_indexer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@


@shared_task(ignore_result=True)
def process_bulk_queue(version_type=None):
def process_bulk_queue(version_type=None, raise_on_error=True):
"""Process bulk indexing queue.
:param version_type: Elasticsearch version type.
:param str version_type: Elasticsearch version type.
:param bool raise_on_error: If ``True`` raises on bulk indexing errors.
Note: You can start multiple versions of this task.
"""
RecordIndexer(version_type=version_type).process_bulk_queue()
RecordIndexer(version_type=version_type).process_bulk_queue(
raise_on_error=raise_on_error)


@shared_task(ignore_result=True)
Expand Down
2 changes: 1 addition & 1 deletion invenio_indexer/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

from __future__ import absolute_import, print_function

__version__ = '1.0.0'
__version__ = '1.0.1'

0 comments on commit 5725cc2

Please sign in to comment.