This repository has been archived by the owner on Mar 15, 2018. It is now read-only.

Rewrite reindex command to parallelize indexing tasks (bug 1025301)
robhudson committed Oct 3, 2014
1 parent 01051ad commit 721a8cf
Showing 3 changed files with 81 additions and 108 deletions.
lib/es/management/commands/reindex.py: 185 changes (79 additions, 106 deletions)
@@ -4,13 +4,13 @@
 Currently creates the indexes and re-indexes apps and feed elements.
 """
 import logging
-import os
 import sys
 import time
+from math import ceil
 from optparse import make_option

 import elasticsearch
-from celery import task
+from celery import chain, chord, task

 from django.conf import settings
 from django.core.management.base import BaseCommand, CommandError
@@ -62,21 +62,19 @@
 @task
-def delete_index(old_index):
-    """Removes the index."""
-    sys.stdout.write('Removing index %r\n' % old_index)
-    ES.indices.delete(index=old_index)
-
-
-@task
-def create_index(new_index, alias, indexer, settings):
-    """Creates a mapping for the new index.
-
-    - new_index: new index name
-    - alias: alias name
-    - settings: a dictionary of settings
-
+def pre_index(new_index, old_index, alias, indexer, settings):
+    """
+    This sets up everything needed before indexing:
+
+    * Flags the database.
+    * Creates the new index.
+
     """
+    # Flag the database to indicate that the reindexing has started.
+    sys.stdout.write('Flagging the database to start the reindexation\n')
+    Reindexing.flag_reindexing(new_index=new_index, old_index=old_index,
+                               alias=alias)
+    time.sleep(5)  # Give celeryd some time to flag the DB.
+
     sys.stdout.write(
         'Create the mapping for index %r, alias: %r\n' % (new_index, alias))
@@ -98,52 +96,16 @@ def create_index(new_index, alias, indexer, settings):
         wait_for_relocating_shards=0)


-@task(time_limit=time_limits['hard'], soft_time_limit=time_limits['soft'])
-def run_indexing(index, indexer, chunk_size):
-    """Index the objects.
-
-    - index: name of the index
-
-    Note: Our ES doc sizes are about 5k in size. Chunking by 100 sends ~500kb
-    of data to ES at a time.
-
-    TODO: Use celery chords here to parallelize these indexing chunks. This
-    requires celery 3 (bug 825938).
-
-    """
-    sys.stdout.write('Indexing apps into index: %s\n' % index)
-
-    qs = indexer.get_indexable().values_list('id', flat=True)
-    for ids in chunked(list(qs), chunk_size):
-        indexer.run_indexing(ids, ES, index=index)
-
-
-@task
-def flag_database(new_index, old_index, alias):
-    """Flags the database to indicate that the reindexing has started."""
-    sys.stdout.write('Flagging the database to start the reindexation\n')
-    Reindexing.flag_reindexing(new_index=new_index, old_index=old_index,
-                               alias=alias)
-    time.sleep(5)  # Give celeryd some time to flag the DB.
-
-
 @task
-def unflag_database():
-    """Unflag the database to indicate that the reindexing is over."""
-    sys.stdout.write('Unflagging the database\n')
-    Reindexing.unflag_reindexing()
-
-
-@task
-def update_alias(new_index, old_index, alias, settings):
+def post_index(new_index, old_index, alias, indexer, settings):
     """
-    Update the alias now that indexing is over.
-
-    We do 3 things:
+    Perform post-indexing tasks:
 
-    1. Optimize (which also does a refresh and a flush by default).
-    2. Update settings to reset number of replicas.
-    3. Point the alias to this new index.
+    * Optimize (which also does a refresh and a flush by default).
+    * Update settings to reset number of replicas.
+    * Point the alias to this new index.
+    * Unflag the database.
+    * Remove the old index.
+    * Output the current alias configuration.
 
     """
     sys.stdout.write('Optimizing, updating settings and aliases.\n')
@@ -164,17 +126,42 @@ def update_alias(new_index, old_index, alias, settings):
     )
     ES.indices.update_aliases(body=dict(actions=actions))
 
-
-@task
-def output_summary():
+    sys.stdout.write('Unflagging the database\n')
+    Reindexing.unflag_reindexing()
+
+    sys.stdout.write('Removing index %r\n' % old_index)
+    try:
+        ES.indices.delete(index=old_index)
+    except elasticsearch.NotFoundError:
+        pass
+
     alias_output = ''
     for ALIAS, INDEXER, CHUNK_SIZE in INDEXES:
         alias_output += unicode(ES.indices.get_aliases(index=ALIAS)) + '\n'
     sys.stdout.write(
-        'Reindexation done. Current Aliases configuration: %s\n' %
+        'Reindexation done. Current aliases configuration: %s\n' %
         alias_output)
 
 
+@task(ignore_result=False)
+def run_indexing(index, indexer, ids):
+    """Index the objects.
+
+    - index: name of the index
+
+    Note: `ignore_result=False` is required for the chord to work and trigger
+    the callback.
+
+    """
+    indexer.run_indexing(ids, ES, index=index)
+
+
+def chunk_indexing(indexer, chunk_size):
+    """Chunk the items to index."""
+    chunks = list(indexer.get_indexable().values_list('id', flat=True))
+    return chunked(chunks, chunk_size), len(chunks)
+
+
 class Command(BaseCommand):
     help = 'Reindex all ES indexes'
     option_list = BaseCommand.option_list + (
@@ -210,18 +197,27 @@ def handle(self, *args, **kwargs):
             raise CommandError('Indexation already occuring - use --force to '
                                'bypass')
         elif force:
-            unflag_database()
+            Reindexing.unflag_reindexing()
 
-        chain = None
-        old_indexes = []
         for ALIAS, INDEXER, CHUNK_SIZE in INDEXES:
+            chunks, total = chunk_indexing(INDEXER, CHUNK_SIZE)
+            if not total:
+                self.stdout.write('\nNo tasks to queue for %s' % ALIAS)
+                continue
+            else:
+                total_chunks = int(ceil(total / float(CHUNK_SIZE)))
+                self.stdout.write(
+                    '\nParallel indexing {total} items into {n} chunks of '
+                    'size {size}'.format(total=total, n=total_chunks,
+                                         size=CHUNK_SIZE))
 
             # Get the old index if it exists.
             try:
                 aliases = ES.indices.get_alias(name=ALIAS).keys()
             except elasticsearch.NotFoundError:
                 aliases = []
             old_index = aliases[0] if aliases else None
-            old_indexes.append(old_index)
 
             # Create a new index, using the index name with a timestamp.
             new_index = timestamp_index(prefix + ALIAS)
@@ -240,45 +236,22 @@ def handle(self, *args, **kwargs):
             num_shards = s.get('number_of_shards',
                                settings.ES_DEFAULT_NUM_SHARDS)
 
-            # Flag the database to mark as currently indexing.
-            if not chain:
-                chain = flag_database.si(new_index, old_index, ALIAS)
-            else:
-                chain |= flag_database.si(new_index, old_index, ALIAS)
-
-            # Create the indexes and mappings.
-            # Note: We set num_replicas=0 here to lower load while re-indexing.
-            # In later step we increase it which results in more efficient bulk
-            # copy in ES. For ES < 0.90 we manually enable compression.
-            chain |= create_index.si(new_index, ALIAS, INDEXER, {
-                'analysis': INDEXER.get_analysis(),
-                'number_of_replicas': 0, 'number_of_shards': num_shards,
-                'store.compress.tv': True, 'store.compress.stored': True,
-                'refresh_interval': '-1'})
-
-            # Index all the things!
-            chain |= run_indexing.si(new_index, INDEXER, CHUNK_SIZE)
-
-            # After indexing we optimize the index, adjust settings, and point
-            # alias to the new index.
-            chain |= update_alias.si(new_index, old_index, ALIAS, {
-                'number_of_replicas': num_replicas, 'refresh_interval': '5s'})
-
-            # Unflag the database to mark as done indexing.
-            chain |= unflag_database.si()
-
-        # Delete the old index, if any.
-        for old_index in old_indexes:
-            if old_index:
-                chain |= delete_index.si(old_index)
-
-        # All done!
-        chain |= output_summary.si()
-
-        # Ship it.
+            # Ship it.
+            chain(
+                pre_index.si(new_index, old_index, ALIAS, INDEXER, {
+                    'analysis': INDEXER.get_analysis(),
+                    'number_of_replicas': 0,
+                    'number_of_shards': num_shards,
+                    'store.compress.tv': True,
+                    'store.compress.stored': True,
+                    'refresh_interval': '-1'}),
+                chord(
+                    header=[run_indexing.si(new_index, INDEXER, chunk)
+                            for chunk in chunks],
+                    body=post_index.si(new_index, old_index, ALIAS, INDEXER, {
+                        'number_of_replicas': num_replicas,
+                        'refresh_interval': '5s'})
+                )
+            ).apply_async()
+
         self.stdout.write('\nNew index and indexing tasks all queued up.\n')
-        os.environ['FORCE_INDEXING'] = '1'
-        try:
-            chain.apply_async()
-        finally:
-            del os.environ['FORCE_INDEXING']
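
The chain-plus-chord construction at the end of this file is the heart of the rewrite: pre_index runs first, the chord fans the run_indexing chunks out to workers in parallel, and post_index fires exactly once as the chord callback after every chunk completes. Below is a minimal sketch of the same shape, not code from this commit: the task names, broker, and backend URLs are assumptions, and it presumes a Celery 3.x app configured with a result backend, which chords need in order to track their header tasks.

    # Illustrative sketch only; names and URLs are assumed, not from the commit.
    from celery import Celery, chain, chord

    app = Celery('sketch', broker='amqp://', backend='redis://')

    @app.task
    def pre_index(index):
        print('setting up %s' % index)

    @app.task(ignore_result=False)  # chord header tasks must publish results
    def run_indexing(index, ids):
        print('indexing %s into %s' % (ids, index))
        return len(ids)

    @app.task
    def post_index(index):
        print('finalizing %s' % index)

    chunks = [[1, 2], [3, 4], [5]]  # stands in for chunked(ids, chunk_size)
    chain(
        pre_index.si('my-index'),
        chord(header=[run_indexing.si('my-index', c) for c in chunks],
              body=post_index.si('my-index')),
    ).apply_async()

The .si() immutable signatures matter here: without them Celery would append each parent task's return value to the next task's arguments, which none of these tasks expect.
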
mkt/webapps/indexers.py: 2 changes (1 addition, 1 deletion)
@@ -437,7 +437,7 @@ def run_indexing(cls, ids, ES, index=None, **kw):
             sys.stdout.write('Failed to index webapp {0}: {1}\n'.format(
                 obj.id, e))
 
-        WebappIndexer.bulk_index(docs, es=ES, index=index or cls.get_index())
+        cls.bulk_index(docs, es=ES, index=index or cls.get_index())
 
     @classmethod
     def get_app_filter(cls, request, additional_data=None, sq=None,
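
The one-line change above swaps the hard-coded class for cls, so any indexer subclass that inherits run_indexing dispatches to its own bulk_index override rather than WebappIndexer's. A hypothetical sketch of the difference, with made-up class names:

    class BaseIndexer(object):
        @classmethod
        def bulk_index(cls, docs):
            print('BaseIndexer: %d docs' % len(docs))

        @classmethod
        def run_indexing(cls, docs):
            cls.bulk_index(docs)  # resolves to the most-derived override

    class FeedAppIndexer(BaseIndexer):
        @classmethod
        def bulk_index(cls, docs):
            print('FeedAppIndexer: %d docs' % len(docs))

    FeedAppIndexer.run_indexing([{'id': 1}])
    # prints 'FeedAppIndexer: 1 docs'; a hard-coded call to
    # BaseIndexer.bulk_index(docs) would have skipped the subclass override.
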
requirements/prod.txt: 2 changes (1 addition, 1 deletion)
@@ -7,7 +7,7 @@ billiard==2.7.3.34
 bleach==1.4
 boto==2.20.0
 cef==0.5
-celery==3.0.24
+celery==3.0.25
 celery-tasktree==0.3.2
 certifi==0.0.8
 chardet==2.1.1
