[bug 718826, 715932] Make ES indexing less sucky

* the ES connection already has code for forcing bulk, so we don't need to
  repeat that. This changes the code to push the setting to ES.
* this also tweaks the estimation code so that it shows minutes and seconds
  and shows the total elapsed time at the end. Now I can stop running
  "time ./manage.py esreindex".
* fix esreindex so that you can specify doctypes. This creates/deletes only
  the indexes for the doctypes involved, so indexes you don't want touched
  won't get deleted.
* adds basic handling for bad data.

  This does a log.exception, but we really should log more than that and/or
  make it more obvious to developers that there's bad data out there.

  In the meantime, this allows us to continue indexing.
* reduced memory usage of indexing by iterating over ids---now it runs on
  my laptop.
* changes _get_index() to get_es_index(). We use it so often it might as
  well be part of the "public API".
* fixed create/delete indexes so that switching a doctype to its own index
  is now just a change in settings; no code changes needed.
* fix DEBUG = True case by resetting queries
* this also adds a bunch of helpful comments, moves reindex_model to
  SearchMixin.index_all, and has some other cosmetic code cleanup.

End result of this is that indexing doesn't die if it hits bad data, indexing
takes much less memory to run, you can specify specific doctypes to index at
the command line, and the code is better.
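
For reference, a minimal sketch of how the reworked entry points are called
(module path and doctype name are taken from the diff below)::

    from search.es_utils import es_reindex, es_reindex_with_progress

    # Reindex everything: deletes and recreates the default index.
    es_reindex()

    # Reindex only questions, 10% of them, reporting progress as you
    # go; the generator yields done/total ratios as floats.
    for ratio in es_reindex_with_progress(doctypes=['questions_question'],
                                          percent=10):
        print 'reindexing: %d%% done' % (ratio * 100)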
1 parent 7c4190d · commit 57f4faf507f9e995b2b8f533530ad678171ce483 · willkg committed Jan 13, 2012
Showing with 168 additions and 72 deletions.
  1. +1 −1 apps/questions/models.py
  2. +42 −59 apps/search/es_utils.py
  3. +13 −1 apps/search/management/commands/esreindex.py
  4. +93 −6 apps/search/models.py
  5. +18 −4 docs/searchchapter.rst
  6. +1 −1 settings.py
2 apps/questions/models.py
@@ -346,7 +346,7 @@ def extract_document(self):
# answer_votes is the sum of votes for all of the answers.
answer_votes = 0
- for ans in self.answers.all():
+ for ans in self.answers.iterator():
answer_content.append(ans.content)
has_helpful = has_helpful or bool(ans.num_helpful_votes)
answer_creator.add(ans.creator.username)
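
The one-line change above is the memory fix mentioned in the commit message:
``.all()`` fills the queryset's result cache with every answer, while
``.iterator()`` streams objects off the cursor without caching (assumed
Django queryset behavior; ``handle()`` below is hypothetical)::

    # .all() keeps every answer object alive in the result cache:
    for ans in self.answers.all():
        handle(ans)

    # .iterator() yields one object at a time and skips the cache, so
    # memory stays roughly flat for questions with many answers:
    for ans in self.answers.iterator():
        handle(ans)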
101 apps/search/es_utils.py
@@ -1,7 +1,6 @@
from itertools import chain, count, izip
import logging
from pprint import pprint
-import time
import elasticutils
import pyes
@@ -39,66 +38,40 @@ def get_doctype_stats():
return stats
-def reindex_model(cls, percent=100):
- """Reindexes all the objects for a single mode.
+def get_es(**kwargs):
+ """Returns a fresh ES instance
- Yields number of documents done.
+ Defaults for these arguments come from settings. Specifying them
+ in the function call will override the default.
- Note: This gets run from the command line, so we log stuff to let
- the user know what's going on.
-
- :arg cls: the model class
- :arg percent: The percentage of questions to index. Defaults to
- 100--e.g. all of them.
+ :arg server: settings.ES_HOSTS
+ :arg timeout: settings.ES_INDEXING_TIMEOUT
+ :arg bulk_size: settings.ES_FLUSH_BULK_EVERY
"""
- doc_type = cls._meta.db_table
- index = cls._get_index()
-
- start_time = time.time()
-
- log.info('reindex %s into %s index', doc_type, index)
-
- es = pyes.ES(settings.ES_HOSTS, timeout=settings.ES_INDEXING_TIMEOUT)
-
- log.info('setting up mapping....')
- mapping = cls.get_mapping()
- es.put_mapping(doc_type, mapping, index)
-
- log.info('iterating through %s....', doc_type)
- total = cls.objects.count()
- to_index = int(total * (percent / 100.0))
- log.info('total %s: %s (to be indexed: %s)', doc_type, total, to_index)
- total = to_index
+ defaults = {
+ 'server': settings.ES_HOSTS,
+ 'timeout': settings.ES_INDEXING_TIMEOUT,
+ 'bulk_size': settings.ES_FLUSH_BULK_EVERY
+ }
+ defaults.update(kwargs)
- t = 0
- for obj in cls.objects.order_by('id').all():
- t += 1
- if t % 1000 == 0:
- time_to_go = (total - t) * ((time.time() - start_time) / t)
- if time_to_go < 60:
- time_to_go = "%d secs" % time_to_go
- else:
- time_to_go = "%d min" % (time_to_go / 60)
- log.info('%s/%s... (%s to go)', t, total, time_to_go)
+ return pyes.ES(**defaults)
- if t % settings.ES_FLUSH_BULK_EVERY == 0:
- es.flush_bulk()
- if t > total:
- break
+def format_time(time_to_go):
+ """Returns minutes and seconds string for given time in seconds"""
+ if time_to_go < 60:
+ return "%ds" % time_to_go
+ return "%dm %ds" % (time_to_go / 60, time_to_go % 60)
- cls.index(obj.extract_document(), bulk=True, es=es)
- yield t
- es.flush_bulk(forced=True)
- log.info('done!')
- es.refresh()
-
-
-def es_reindex_with_progress(percent=100):
+def es_reindex_with_progress(doctypes=None, percent=100):
"""Rebuild Elastic indexes as you iterate over yielded progress ratios.
+ :arg doctypes: Defaults to None which will index all doctypes.
+ Otherwise indexes the doctypes specified. See
+ :py:func:`.get_doctype_stats()` for what doctypes look like.
:arg percent: Defaults to 100. Allows you to specify how much of
each doctype you want to index. This is useful for
development where doing a full reindex takes an hour.
@@ -108,24 +81,34 @@ def es_reindex_with_progress(percent=100):
es = elasticutils.get_es()
- # Go through and delete, then recreate the indexes.
- for index in settings.ES_INDEXES.values():
- es.delete_index_if_exists(index)
- es.create_index(index)
-
search_models = get_search_models()
+ if doctypes:
+ search_models = [cls for cls in search_models
+ if cls._meta.db_table in doctypes]
+
+ if len(search_models) == len(get_search_models()):
+ index = settings.ES_INDEXES.get('default')
+ if index is not None:
+ # If we're indexing everything and there's a default index
+ # specified in settings, then we delete and recreate it.
+ es.delete_index_if_exists(index)
+ es.create_index(index)
total = sum([cls.objects.count() for cls in search_models])
- to_index = [reindex_model(cls, percent) for cls in search_models]
+ to_index = [cls.index_all(percent) for cls in search_models]
return (float(done) / total for done, _ in
izip(count(1), chain(*to_index)))
-def es_reindex(percent=100):
- """Rebuild ElasticSearch indexes"""
- [x for x in es_reindex_with_progress(percent) if False]
+def es_reindex(doctypes=None, percent=100):
+ """Rebuild ElasticSearch indexes
+
+ See :py:func:`.es_reindex_with_progress` for argument details.
+
+ """
+ [x for x in es_reindex_with_progress(doctypes, percent) if False]
def es_whazzup():
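
A quick sketch of how the two new helpers behave; the keyword arguments are
the ones the ``get_es()`` docstring names::

    from search import es_utils

    # Defaults come from settings.ES_HOSTS, ES_INDEXING_TIMEOUT and
    # ES_FLUSH_BULK_EVERY; keyword arguments override them per call.
    es = es_utils.get_es()
    es_patient = es_utils.get_es(timeout=120)

    print es_utils.format_time(42)    # -> "42s"
    print es_utils.format_time(150)   # -> "2m 30s"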
14 apps/search/management/commands/esreindex.py
@@ -2,6 +2,7 @@
from django.core.management.base import BaseCommand, CommandError
from optparse import make_option
from search.es_utils import es_reindex
+from search.models import get_search_models
class Command(BaseCommand):
@@ -15,4 +16,15 @@ def handle(self, *args, **options):
percent = options['percent']
if percent > 100 or percent < 1:
raise CommandError('percent should be between 1 and 100')
- es_reindex(percent)
+
+ if args:
+ search_models = get_search_models()
+ possible_doctypes = dict((cls._meta.db_table, cls)
+ for cls in search_models)
+ for mem in args:
+ if mem not in possible_doctypes:
+ raise CommandError('"%s" is not a valid doctype (%s)' %
+ (mem, possible_doctypes.keys()))
+
+ # args are the list of doctypes to index.
+ es_reindex(args, percent)
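
With that validation in place, command-line usage looks like this (the
``questions_question`` invocation is the one the docs show; the multi-doctype
and failure cases follow from the loop over ``args``)::

    $ ./manage.py esreindex                      # all doctypes
    $ ./manage.py esreindex questions_question   # just questions
    $ ./manage.py esreindex wiki_document forums_thread
    $ ./manage.py esreindex no_such_doctype      # raises CommandError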
99 apps/search/models.py
@@ -1,14 +1,17 @@
import elasticutils
import logging
import pyes
+import time
from threading import local
from django.conf import settings
from django.core import signals
+from django.db import reset_queries
from django.db.models.signals import pre_delete, post_save
from django.dispatch import receiver
from search.tasks import index_task, unindex_task
+from search import es_utils
log = logging.getLogger('es_search')
@@ -78,7 +81,7 @@ def extract_document(self):
raise NotImplementedError
@classmethod
- def _get_index(cls):
+ def get_es_index(cls):
"""Returns the index for this class"""
indexes = settings.ES_INDEXES
return indexes.get(cls._meta.db_table) or indexes['default']
@@ -87,12 +90,94 @@ def index_later(self):
"""Register myself to be indexed at the end of the request."""
_local_tasks().add((index_task.delay, (self.__class__, (self.id,))))
-
def unindex_later(self):
"""Register myself to be unindexed at the end of the request."""
_local_tasks().add((unindex_task.delay, (self.__class__, (self.id,))))
@classmethod
+ def index_all(cls, percent=100):
+ """Reindexes all the objects for this model.
+
+ Yields number of documents done.
+
+ Note: This can get run from the command line, so we log stuff
+ to let the user know what's going on.
+
+ :arg percent: The percentage of questions to index. Defaults to
+ 100--e.g. all of them.
+
+ """
+ es = es_utils.get_es()
+
+ doc_type = cls._meta.db_table
+ index = cls.get_es_index()
+
+ if index != settings.ES_INDEXES.get('default'):
+ # If this doctype isn't using the default index, then this
+ # doctype is responsible for deleting and re-creating the
+ # index.
+ es.delete_index_if_exists(index)
+ es.create_index(index)
+
+ start_time = time.time()
+
+ log.info('reindex %s into %s index', doc_type, index)
+
+ log.info('setting up mapping....')
+ mapping = cls.get_mapping()
+ es.put_mapping(doc_type, mapping, index)
+
+ log.info('iterating through %s....', doc_type)
+ total = cls.objects.count()
+ to_index = int(total * (percent / 100.0))
+ log.info('total %s: %s (to be indexed: %s)', doc_type, total, to_index)
+ total = to_index
+
+ # Some models have a gazillion instances. So we want to go
+ # through them one at a time in a way that doesn't pull all
+ # the data into memory all at once. So we iterate through ids
+ # and pull the objects one at a time.
+ qs = cls.objects.order_by('id').values_list('id', flat=True)
+
+ for t, obj_id in enumerate(qs.iterator()):
+ if t > total:
+ break
+
+ obj = cls.objects.get(pk=obj_id)
+
+ if t % 1000 == 0 and t > 0:
+ time_to_go = (total - t) * ((time.time() - start_time) / t)
+ log.info('%s/%s... (%s to go)', t, total,
+ es_utils.format_time(time_to_go))
+
+ # We call this every 1000 or so because we're
+ # essentially loading the whole db and if DEBUG=True,
+ # then Django saves every sql statement which causes
+ # our memory to go up up up. So we reset it and that
+ # makes things happier even in DEBUG environments.
+ reset_queries()
+
+ if t % settings.ES_FLUSH_BULK_EVERY == 0:
+ # We built the ES with this setting, but it doesn't
+ # actually do anything with it unless we call
+ # flush_bulk which causes it to check its bulk_size
+ # and flush it if it's too big.
+ es.flush_bulk()
+
+ try:
+ cls.index(obj.extract_document(), bulk=True, es=es)
+ except Exception:
+ log.exception('Unable to extract/index document (id: %d)',
+ obj.id)
+
+ yield t
+
+ es.flush_bulk(forced=True)
+ end_time = time.time()
+ log.info('done! (%s)', es_utils.format_time(end_time - start_time))
+ es.refresh()
+
+ @classmethod
def index(cls, document, bulk=False, force_insert=False, refresh=False,
es=None):
"""Indexes a single document"""
@@ -102,7 +187,7 @@ def index(cls, document, bulk=False, force_insert=False, refresh=False,
if es is None:
es = elasticutils.get_es()
- index = cls._get_index()
+ index = cls.get_es_index()
doc_type = cls._meta.db_table
# TODO: handle pyes.urllib3.TimeoutErrors here.
@@ -118,7 +203,7 @@ def unindex(cls, id):
if not settings.ES_LIVE_INDEXING:
return
- index = cls._get_index()
+ index = cls.get_es_index()
doc_type = cls._meta.db_table
try:
elasticutils.get_es().delete(index, doc_type, id)
@@ -129,9 +214,11 @@ def unindex(cls, id):
_identity = lambda s: s
+
+
def register_for_indexing(sender_class,
- app,
- instance_to_indexee=_identity):
+ app,
+ instance_to_indexee=_identity):
"""Register a model whose changes might invalidate ElasticSearch indexes.
Specifically, each time an instance of this model is saved or deleted, the
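
The ``reset_queries()`` call in ``index_all()`` above guards against Django's
DEBUG-mode query log; a small sketch of the leak it prevents
(``do_orm_work()`` is hypothetical)::

    from django.db import connection, reset_queries

    # With DEBUG = True, Django appends every executed SQL statement
    # to connection.queries, so walking a whole table grows memory
    # without bound.
    do_orm_work()
    print len(connection.queries)   # keeps growing under DEBUG
    reset_queries()                 # clears the log
    print len(connection.queries)   # back to 0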
22 docs/searchchapter.rst
@@ -188,7 +188,7 @@ Other things you can change:
``ES_FLUSH_BULK_EVERY``
- Defaults to 1000.
+ Defaults to 100.
We do bulk indexing meaning we queue up a bunch and then push them
through all at the same time. This requires memory to queue them,
@@ -248,7 +248,7 @@ Do a complete reindexing of everything by::
$ ./manage.py esreindex
This will delete the existing indexes, create new ones, and reindex
-everything in your database. On my machine it takes about > 30 minutes.
+everything in your database. On my machine it takes about an hour.
If you need to get stuff done and don't want to wait for a full indexing,
you can index a percentage of things.
@@ -263,12 +263,25 @@ This indexes 50% of your data ordered by id::
I use this when I'm fiddling with mappings and the indexing code.
+Also, you can index specific doctypes. Doctypes are named after the
+``_meta.db_table`` of the model they map to. At the time of this writing,
+there are three doctypes:
+
+* questions_question
+* wiki_document
+* forums_thread
+
+You can index specific doctypes by specifying the doctypes on the command
+line. This reindexes just questions::
+
+ $ ./manage.py esreindex questions_question
+
.. Note::
Once you've indexed everything, you won't have to do it again unless
- indexing code changes. The models have post_save and pre_delete hooks
- that will update the index as the data changes.
+ indexing code changes. The models have ``post_save`` and ``pre_delete``
+ hooks that will update the index as the data changes.
Health/statistics
@@ -278,4 +291,5 @@ You can see Elastic Search statistics/health with::
$ ./manage.py eswhazzup
+The last few lines tell you how many documents are in the index by doctype.
I use this to make sure I've got stuff in my index.
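
Putting the docs together with the code above, the bulk knob maps onto pyes
roughly like this (a sketch; ``documents`` and the ``'sumo'`` index name are
hypothetical)::

    es = es_utils.get_es(bulk_size=100)   # queue up to 100 docs at a time

    for doc in documents:
        es.index(doc, 'sumo', 'questions_question', bulk=True)  # queued
        es.flush_bulk()   # no-op until the queue reaches bulk_size

    es.flush_bulk(forced=True)   # push whatever is left
    es.refresh()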
2 settings.py
@@ -598,7 +598,7 @@ def JINJA_CONFIG():
ES_INDEXING_TIMEOUT = 30 # 30 second timeouts for all things indexing
# Seconds between updating admin progress bar:
ES_REINDEX_PROGRESS_BAR_INTERVAL = 5
-ES_FLUSH_BULK_EVERY = 1000
+ES_FLUSH_BULK_EVERY = 100
#
# Connection information for Sphinx search
