Skip to content

Commit

Permalink
Merge pull request #4368 from safwanrahman/comman
Browse files Browse the repository at this point in the history
[Fix  #4333] Implement asynchronous search reindex functionality using celery
  • Loading branch information
ericholscher committed Jul 31, 2018
2 parents 665cc08 + 652f869 commit 463f9e2
Show file tree
Hide file tree
Showing 20 changed files with 382 additions and 93 deletions.
2 changes: 1 addition & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import pytest
from django.conf import settings
from rest_framework.test import APIClient

try:
Expand Down Expand Up @@ -46,7 +47,6 @@ def pytest_configure(config):
def settings_modification(settings):
settings.CELERY_ALWAYS_EAGER = True


@pytest.fixture
def api_client():
return APIClient()
58 changes: 0 additions & 58 deletions readthedocs/core/management/commands/reindex_elasticsearch.py

This file was deleted.

22 changes: 22 additions & 0 deletions readthedocs/projects/migrations/0028_importedfile_modified_date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.13 on 2018-07-27 09:54
from __future__ import unicode_literals

from django.db import migrations, models
import django.utils.timezone


class Migration(migrations.Migration):

dependencies = [
('projects', '0027_add_htmlfile_model'),
]

operations = [
migrations.AddField(
model_name='importedfile',
name='modified_date',
field=models.DateTimeField(auto_now=True, default=django.utils.timezone.now, verbose_name='Modified date'),
preserve_default=False,
),
]
2 changes: 2 additions & 0 deletions readthedocs/projects/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from django.contrib.auth.models import User
from django.core.urlresolvers import NoReverseMatch, reverse
from django.db import models
from django.utils import timezone
from django.utils.encoding import python_2_unicode_compatible
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
Expand Down Expand Up @@ -911,6 +912,7 @@ class ImportedFile(models.Model):
path = models.CharField(_('Path'), max_length=255)
md5 = models.CharField(_('MD5 checksum'), max_length=255)
commit = models.CharField(_('Commit'), max_length=255)
modified_date = models.DateTimeField(_('Modified date'), auto_now=True)

def get_absolute_url(self):
return resolve(project=self.project, version_slug=self.version.slug, filename=self.path)
Expand Down
4 changes: 4 additions & 0 deletions readthedocs/projects/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@
project_import = django.dispatch.Signal(providing_args=["project"])

files_changed = django.dispatch.Signal(providing_args=["project", "files"])

bulk_post_create = django.dispatch.Signal(providing_args=["instance_list"])

bulk_post_delete = django.dispatch.Signal(providing_args=["instance_list"])
30 changes: 22 additions & 8 deletions readthedocs/projects/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
from .constants import LOG_TEMPLATE
from .exceptions import RepositoryError
from .models import ImportedFile, Project, Domain, Feature, HTMLFile
from .signals import before_vcs, after_vcs, before_build, after_build, files_changed
from .signals import before_vcs, after_vcs, before_build, after_build, files_changed, \
bulk_post_create, bulk_post_delete
from readthedocs.builds.constants import (
BUILD_STATE_BUILDING, BUILD_STATE_CLONING, BUILD_STATE_FINISHED,
BUILD_STATE_INSTALLING, LATEST, LATEST_VERBOSE_NAME, STABLE_VERBOSE_NAME)
Expand Down Expand Up @@ -986,6 +987,7 @@ def _manage_imported_files(version, path, commit):
:param commit: Commit that updated path
"""
changed_files = set()
created_html_files = []
for root, __, filenames in os.walk(path):
for filename in filenames:
if fnmatch.fnmatch(filename, '*.html'):
Expand Down Expand Up @@ -1015,15 +1017,27 @@ def _manage_imported_files(version, path, commit):
obj.commit = commit
obj.save()

# Delete the HTMLFile first from previous versions
HTMLFile.objects.filter(project=version.project,
version=version
).exclude(commit=commit).delete()
if model_class == HTMLFile:
# the `obj` is HTMLFile, so add it to the list
created_html_files.append(obj)

# Send bulk_post_create signal for bulk indexing to Elasticsearch
bulk_post_create.send(sender=HTMLFile, instance_list=created_html_files)

# Delete the HTMLFile first from previous commit and
# send bulk_post_delete signal for bulk removing from Elasticsearch
delete_queryset = (HTMLFile.objects.filter(project=version.project, version=version)
.exclude(commit=commit))
# Keep the objects into memory to send it to signal
instance_list = list(delete_queryset)
# Safely delete from database
delete_queryset.delete()
# Always pass the list of instance, not queryset.
bulk_post_delete.send(sender=HTMLFile, instance_list=instance_list)

# Delete ImportedFiles from previous versions
ImportedFile.objects.filter(project=version.project,
version=version
).exclude(commit=commit).delete()
(ImportedFile.objects.filter(project=version.project, version=version)
.exclude(commit=commit).delete())
changed_files = [
resolve_path(
version.project, filename=file, version_slug=version.slug,
Expand Down
1 change: 1 addition & 0 deletions readthedocs/search/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
default_app_config = 'readthedocs.search.apps.SearchConfig'
10 changes: 10 additions & 0 deletions readthedocs/search/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Project app config"""

from django.apps import AppConfig


class SearchConfig(AppConfig):
name = 'readthedocs.search'

def ready(self):
from .signals import index_html_file, remove_html_file
27 changes: 5 additions & 22 deletions readthedocs/search/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from elasticsearch_dsl.query import SimpleQueryString, Bool

from readthedocs.projects.models import Project, HTMLFile
from .conf import SEARCH_EXCLUDED_FILE

from readthedocs.search.faceted_search import ProjectSearch, FileSearch
from .conf import SEARCH_EXCLUDED_FILE
from .mixins import RTDDocTypeMixin

project_conf = settings.ES_INDEXES['project']
project_index = Index(project_conf['name'])
Expand All @@ -17,7 +17,7 @@


@project_index.doc_type
class ProjectDocument(DocType):
class ProjectDocument(RTDDocTypeMixin, DocType):

class Meta(object):
model = Project
Expand Down Expand Up @@ -47,11 +47,12 @@ def faceted_search(cls, query, language=None, using=None, index=None):


@page_index.doc_type
class PageDocument(DocType):
class PageDocument(RTDDocTypeMixin, DocType):

class Meta(object):
model = HTMLFile
fields = ('commit',)
ignore_signals = settings.ES_PAGE_IGNORE_SIGNALS

project = fields.KeywordField(attr='project.slug')
version = fields.KeywordField(attr='version.slug')
Expand Down Expand Up @@ -121,21 +122,3 @@ def get_queryset(self):
queryset = (queryset.filter(project__documentation_type='sphinx')
.exclude(name__in=SEARCH_EXCLUDED_FILE))
return queryset

def update(self, thing, refresh=None, action='index', **kwargs):
"""Overwrite in order to index only certain files"""
# Object not exist in the provided queryset should not be indexed
# TODO: remove this overwrite when the issue has been fixed
# See below link for more information
# https://github.com/sabricot/django-elasticsearch-dsl/issues/111
# Moreover, do not need to check if its a delete action
# Because while delete action, the object is already remove from database
if isinstance(thing, HTMLFile) and action != 'delete':
# Its a model instance.
queryset = self.get_queryset()
obj = queryset.filter(pk=thing.pk)
if not obj.exists():
return None

return super(PageDocument, self).update(thing=thing, refresh=refresh,
action=action, **kwargs)
Empty file.
Empty file.
101 changes: 101 additions & 0 deletions readthedocs/search/management/commands/reindex_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import datetime
import logging

from celery import chord, chain
from django.apps import apps
from django.conf import settings
from django.core.management import BaseCommand
from django.utils import timezone
from django_elasticsearch_dsl.registries import registry

from ...tasks import (index_objects_to_es, switch_es_index, create_new_es_index,
index_missing_objects)
from ...utils import chunk_queryset

log = logging.getLogger(__name__)


class Command(BaseCommand):

@staticmethod
def _get_indexing_tasks(app_label, model_name, queryset, document_class, index_name):
queryset = queryset.values_list('id', flat=True)
chunked_queryset = chunk_queryset(queryset, settings.ES_TASK_CHUNK_SIZE)

for chunk in chunked_queryset:
data = {
'app_label': app_label,
'model_name': model_name,
'document_class': document_class,
'index_name': index_name,
'objects_id': list(chunk)
}
yield index_objects_to_es.si(**data)

def _run_reindex_tasks(self, models):
for doc in registry.get_documents(models):
queryset = doc().get_queryset()
# Get latest object from the queryset
index_time = timezone.now()

app_label = queryset.model._meta.app_label
model_name = queryset.model.__name__

index_name = doc._doc_type.index
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
new_index_name = "{}_{}".format(index_name, timestamp)

pre_index_task = create_new_es_index.si(app_label=app_label,
model_name=model_name,
index_name=index_name,
new_index_name=new_index_name)

indexing_tasks = self._get_indexing_tasks(app_label=app_label, model_name=model_name,
queryset=queryset,
document_class=str(doc),
index_name=new_index_name)

post_index_task = switch_es_index.si(app_label=app_label, model_name=model_name,
index_name=index_name,
new_index_name=new_index_name)

# Task to run in order to add the objects
# that has been inserted into database while indexing_tasks was running
# We pass the creation time of latest object, so its possible to index later items
missed_index_task = index_missing_objects.si(app_label=app_label,
model_name=model_name,
document_class=str(doc),
index_generation_time=index_time)

# http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords
chord_tasks = chord(header=indexing_tasks, body=post_index_task)
# http://celery.readthedocs.io/en/latest/userguide/canvas.html#chain
chain(pre_index_task, chord_tasks, missed_index_task).apply_async()

message = ("Successfully issued tasks for {}.{}, total {} items"
.format(app_label, model_name, queryset.count()))
log.info(message)

def add_arguments(self, parser):
parser.add_argument(
'--models',
dest='models',
type=str,
nargs='*',
help=("Specify the model to be updated in elasticsearch."
"The format is <app_label>.<model_name>")
)

def handle(self, *args, **options):
"""
Index models into Elasticsearch index asynchronously using celery.
You can specify model to get indexed by passing
`--model <app_label>.<model_name>` parameter.
Otherwise, it will reindex all the models
"""
models = None
if options['models']:
models = [apps.get_model(model_name) for model_name in options['models']]

self._run_reindex_tasks(models=models)
Loading

0 comments on commit 463f9e2

Please sign in to comment.