
[bug 788527] Add indexing to search admin
* add indexing-related celery tasks
* add indexing form in admin
* add reset form in admin
* add record history to admin
* add test for index_chunk
* fix ES test infrastructure to work correctly
* update ES docs
willkg committed Sep 10, 2012
1 parent 29634e3 commit 4561ae7
Showing 12 changed files with 417 additions and 21 deletions.
19 changes: 7 additions & 12 deletions docs/es.rst
@@ -101,25 +101,20 @@ See ``--help`` for more details::

.. Note::

   Once you've indexed everything, if you have ``ES_LIVE_INDEXING``
   set to ``True``, you won't have to do it again unless indexing code
   changes. The models have ``post_save`` and ``pre_delete`` hooks
   that will update the index as the data changes.


.. Note::

   TODO: This doesn't work with celery 2.1, but will when we upgrade
   to something more recent. Leaving it here in the docs because we're
   definitely upgrading.

   If you kick off indexing with the admin, then indexing gets done in
   chunks by celery tasks. If you need to halt indexing, you can purge
   the tasks with::

       $ ./manage.py celeryctl purge

-   If you purge the tasks, you need to reset the Redis scoreboard.
-   Connect to the appropriate Redis and set the value for the magic
-   key to 0. For example, my Redis is running at port 6383, so I::
+   If you purge the tasks, you need to cancel outstanding records. Run
+   this SQL in mysql::

-       $ redis-cli -p 6383 set search:outstanding_index_chunks 0
+       UPDATE search_record SET status=2 WHERE status=0 OR status=1;

   If you do this often, it helps to write a shell script for it.
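
   If you'd rather do this from a Django shell than raw SQL, an
   equivalent sketch using the ``Record`` model this commit adds
   (status 2 is ``Record.STATUS_FAIL``; this mirrors the admin's
   reset handler)::

       from fjord.search.models import Record

       for rec in Record.outstanding():
           rec.mark_fail('Cancelled.')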

@@ -150,7 +145,7 @@ Maintaining your index
======================

When you add data to the database, it needs to be added to the index.
-This happens automatically in the post_save hook as long as celery
+This happens automatically in the ``post_save`` hook as long as celery
tasks are being handled.
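
For context, live indexing is typically wired through Django's
``post_save`` signal; a generic sketch (not this project's exact hook;
``index_item_task`` is a hypothetical celery task)::

    from django.db.models.signals import post_save

    def register_live_indexing(model_cls):
        def handler(sender, instance, **kwargs):
            # Queue the index write so the request isn't blocked on ES.
            index_item_task.delay(sender, instance.id)
        post_save.connect(handler, sender=model_cls, weak=False)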

You can also reindex everything using the admin or using the esreindex
70 changes: 69 additions & 1 deletion fjord/search/admin.py
@@ -2,22 +2,85 @@
from datetime import datetime

from django.contrib import admin
from django.http import HttpResponseRedirect
from django.shortcuts import render

import pyes

-from fjord.search.index import get_index, get_index_stats, get_indexes
+from fjord.search.index import (
+    chunked, get_index, get_index_stats, get_indexes, get_indexable,
+    recreate_index, create_batch_id)
from fjord.search.models import Record
from fjord.search.tasks import index_chunk_task


log = logging.getLogger('i.search')


CHUNK_SIZE = 50000


def handle_reset(request):
"""Mark outstanding Records as failed.
Why? You'd want to reset the system if it gets itself wedged
thinking there are outstanding tasks, but there aren't. This
lets you fix that.
"""
for rec in Record.outstanding():
rec.mark_fail('Cancelled.')
return HttpResponseRedirect(request.path)


def handle_reindex(request):
"""Caculate chunks and kick off indexing tasks."""
index = get_index()

batch_id = create_batch_id()

# Break up all the things we want to index into chunks. This
# chunkifies by class then by chunk size.
chunks = []
for cls, indexable in get_indexable():
chunks.extend(
(cls, chunk) for chunk in chunked(indexable, CHUNK_SIZE))

# The previous lines do a lot of work and take some time to
# execute. So we wait until here to wipe and rebuild the
# index. That reduces the time that there is no index by a little.
recreate_index()

for cls, id_list in chunks:
chunk_name = '%s %d -> %d' % (cls.get_mapping_type_name(),
id_list[0], id_list[-1])
rec = Record(batch_id=batch_id, name=chunk_name)
rec.save()
index_chunk_task.delay(index, batch_id, rec.id, (cls, id_list))

return HttpResponseRedirect(request.path)
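
# Illustrative sketch, not part of this commit: with CHUNK_SIZE = 50000
# and a hypothetical mapping type exposing 120000 sequential ids,
# handle_reindex builds three chunks (assuming chunked yields successive
# fixed-size slices), hence three Records and three queued tasks:
#
#     >>> ids = range(120000)
#     >>> [(c[0], c[-1]) for c in chunked(ids, CHUNK_SIZE)]
#     [(0, 49999), (50000, 99999), (100000, 119999)]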


def search_admin_view(request):
"""Render the admin view containing search tools"""
error_messages = []
stats = None
indexes = []

reset_requested = 'reset' in request.POST
if reset_requested:
try:
return handle_reset(request)
except Exception as exc:
            error_messages.append(u'Error: %s' % exc)

reindex_requested = 'reindex' in request.POST
if reindex_requested:
try:
return handle_reindex(request)
except Exception as exc:
            error_messages.append(u'Error: %s' % exc)

try:
# This gets index stats, but also tells us whether ES is in
# a bad state.
@@ -38,12 +101,17 @@ def search_admin_view(request):
error_messages.append('Error: Connection to Elastic Search timed out. '
'(TimeoutError)')

outstanding_records = Record.outstanding()
recent_records = Record.objects.order_by('-creation_time')[:20]

return render(request, 'admin/search_admin_view.html', {
'title': 'Search',
'mapping_type_stats': stats,
'indexes': indexes,
'index': get_index(),
'error_messages': error_messages,
'recent_records': recent_records,
'outstanding_records': outstanding_records,
'now': datetime.now(),
})

8 changes: 8 additions & 0 deletions fjord/search/index.py
@@ -73,6 +73,14 @@ def format_time(time_to_go):
return "%dm %ds" % (time_to_go / 60, time_to_go % 60)


def create_batch_id():
"""Returns a batch_id"""
# TODO: This is silly, but it's a good enough way to distinguish
# between batches by looking at a Record. This is just over the
# number of seconds in a day.
return str(int(time.time()))[-6:]
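
# Illustrative, not part of this commit: the batch id is the tail of
# the unix timestamp, so the value depends on the clock, e.g.:
#
#     >>> str(1347292800)[-6:]    # with int(time.time()) == 1347292800
#     '292800'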


def chunked(iterable, n):
"""Return chunks of n length of iterable.
44 changes: 44 additions & 0 deletions fjord/search/migrations/0001_initial.py
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models


class Migration(SchemaMigration):

def forwards(self, orm):
# Adding model 'Record'
db.create_table('search_record', (
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('batch_id', self.gf('django.db.models.fields.CharField')(max_length=10)),
('name', self.gf('django.db.models.fields.CharField')(max_length=255)),
('creation_time', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, blank=True)),
('start_time', self.gf('django.db.models.fields.DateTimeField')(null=True)),
('end_time', self.gf('django.db.models.fields.DateTimeField')(null=True)),
('status', self.gf('django.db.models.fields.IntegerField')(default=0)),
('message', self.gf('django.db.models.fields.CharField')(max_length=255, blank=True)),
))
db.send_create_signal('search', ['Record'])


def backwards(self, orm):
# Deleting model 'Record'
db.delete_table('search_record')


models = {
'search.record': {
'Meta': {'object_name': 'Record'},
'batch_id': ('django.db.models.fields.CharField', [], {'max_length': '10'}),
'creation_time': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
'end_time': ('django.db.models.fields.DateTimeField', [], {'null': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'message': ('django.db.models.fields.CharField', [], {'max_length': '255', 'blank': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'start_time': ('django.db.models.fields.DateTimeField', [], {'null': 'True'}),
'status': ('django.db.models.fields.IntegerField', [], {'default': '0'})
}
}

complete_apps = ['search']
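
A usage note: with South installed, as this project assumes, the
migration is applied with the usual management command, for example
from Python:

    from django.core.management import call_command

    call_command('migrate', 'search')  # same as ./manage.py migrate search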
Empty file.
67 changes: 67 additions & 0 deletions fjord/search/models.py
@@ -0,0 +1,67 @@
import datetime

from django.db import models


# Note: This doesn't extend our caching ModelBase because we
# explicitly want the freshest data possible, not cached data.

class Record(models.Model):
"""Indexing record."""
STATUS_NEW = 0
STATUS_IN_PROGRESS = 1
STATUS_FAIL = 2
STATUS_SUCCESS = 3

STATUS_CHOICES = (
(STATUS_NEW, 'new'),
(STATUS_IN_PROGRESS, 'in progress'),
(STATUS_FAIL, 'done - fail'),
(STATUS_SUCCESS, 'done - success'),
)

batch_id = models.CharField(max_length=10)
name = models.CharField(max_length=255)
creation_time = models.DateTimeField(auto_now_add=True)
start_time = models.DateTimeField(null=True)
end_time = models.DateTimeField(null=True)
status = models.IntegerField(choices=STATUS_CHOICES, default=STATUS_NEW)
message = models.CharField(max_length=255, blank=True)

def delta(self):
"""Return the timedelta."""
if self.start_time and self.end_time:
return self.end_time - self.start_time
return None

def _complete(self, status, msg='Done'):
self.end_time = datetime.datetime.now()
self.status = status
self.message = msg

def mark_fail(self, msg):
"""Mark as failed.
:arg msg: the error message it failed with
"""
self._complete(self.STATUS_FAIL, msg)
self.save()

def mark_success(self, msg='Success'):
"""Mark as succeeded.
:arg msg: success message if any
"""
self._complete(self.STATUS_SUCCESS, msg)
self.save()

@classmethod
def outstanding(cls):
"""Return queryset of outstanding records."""
return cls.objects.filter(status__in=[
cls.STATUS_NEW, cls.STATUS_IN_PROGRESS])

    def __unicode__(self):
        return '%s:%s:%s' % (self.batch_id, self.name, self.status)
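
A minimal sketch of the Record lifecycle as the admin and task code
drive it (run in a Django shell; the name string is hypothetical):

    import datetime

    from fjord.search.models import Record

    rec = Record(batch_id='292800', name='FooMappingType 1 -> 50000')
    rec.save()                                # status defaults to STATUS_NEW

    rec.start_time = datetime.datetime.now()  # as index_chunk_task does
    rec.status = Record.STATUS_IN_PROGRESS
    rec.save()

    rec.mark_success()    # sets end_time, status, message, and saves
    print rec.delta()     # timedelta between start_time and end_time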
48 changes: 48 additions & 0 deletions fjord/search/tasks.py
@@ -0,0 +1,48 @@
import datetime
import logging
import sys

from celery.decorators import task
from multidb.pinning import pin_this_thread, unpin_this_thread

from fjord.search.index import index_chunk
from fjord.search.models import Record


log = logging.getLogger('i.task')


@task
def index_chunk_task(index, batch_id, rec_id, chunk):
"""Index a chunk of things.
:arg index: the name of the index to index to
:arg batch_id: the name for the batch this chunk belongs to
:arg rec_id: the id for the record for this task
:arg chunk: a (class, id_list) of things to index
"""
cls, id_list = chunk

try:
# Pin to master db to avoid replication lag issues and stale
# data.
pin_this_thread()

# Update record data.
rec = Record.objects.get(pk=rec_id)
rec.start_time = datetime.datetime.now()
rec.message = u'Reindexing into %s' % index
rec.status = Record.STATUS_IN_PROGRESS
rec.save()

index_chunk(cls, id_list, reraise=True)

rec.mark_success()

except Exception:
        # sys.exc_info() is reliable where sys.exc_type/exc_value are not.
        rec.mark_fail(u'Errored out %s %s' % sys.exc_info()[:2])
raise

finally:
unpin_this_thread()
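
A sketch of exercising the task synchronously, for example from a test:
celery's .apply() runs the task in-process, so no worker is needed.
This assumes the index exists and get_indexable yields at least one
class:

    from fjord.search.index import create_batch_id, get_index, get_indexable
    from fjord.search.models import Record
    from fjord.search.tasks import index_chunk_task

    cls, indexable = list(get_indexable())[0]
    id_list = list(indexable)[:5]

    batch_id = create_batch_id()
    rec = Record(batch_id=batch_id,
                 name='%s test chunk' % cls.get_mapping_type_name())
    rec.save()

    index_chunk_task.apply(
        args=[get_index(), batch_id, rec.id, (cls, id_list)])

    print Record.objects.get(pk=rec.id).status  # 3 == STATUS_SUCCESS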
