Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cli: configurable ES version_type when running indexer #68

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions invenio_indexer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,19 @@ def abort_if_false(ctx, param, value):
@click.option(
'--concurrency', '-c', default=1, type=int,
help='Number of concurrent indexing tasks to start.')
@click.option('--version-type')
@with_appcontext
def run(delayed, concurrency):
def run(delayed, concurrency, version_type=None):
"""Run bulk record indexing."""
if delayed:
click.secho(
'Starting {0} tasks for indexing records...'.format(concurrency),
fg='green')
for c in range(0, concurrency):
process_bulk_queue.delay()
process_bulk_queue.delay(version_type=version_type)
else:
click.secho('Indexing records...', fg='green')
RecordIndexer().process_bulk_queue()
RecordIndexer(version_type=version_type).process_bulk_queue()


@index.command()
Expand All @@ -69,7 +70,7 @@ def run(delayed, concurrency):
prompt='Do you really want to reindex all records?')
@click.option('-t', '--pid-type', multiple=True, required=True)
@with_appcontext
def reindex(pid_type, ids=None):
def reindex(pid_type):
"""Reindex all records.

:param pid_type: Pid type.
Expand Down
6 changes: 4 additions & 2 deletions invenio_indexer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@


@shared_task(ignore_result=True)
def process_bulk_queue():
def process_bulk_queue(version_type=None):
"""Process bulk indexing queue.

:param version_type: Elasticsearch version type.

Note: You can start multiple versions of this task.
"""
RecordIndexer().process_bulk_queue()
RecordIndexer(version_type=version_type).process_bulk_queue()


@shared_task(ignore_result=True)
Expand Down