
[Fix #4333] Implement asynchronous search reindex functionality using celery #4368

Merged (15 commits) on Jul 31, 2018

Conversation

@safwanrahman (Member) commented Jul 13, 2018

The management command reindex_elasticsearch has been rewritten from scratch using Celery tasks.
The idea is taken from @robhudson's blog post and is heavily inspired by the code of mozilla/zamboni.

Some methods of django-elasticsearch-dsl need to be overridden in order to support zero-downtime rebuilds (django-es/django-elasticsearch-dsl#75). I am hoping to send a PR upstream.

This fixes #4333

@ericholscher @rtfd/core r?
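For reviewers, here is a minimal sketch of the chain/chord rebuild flow this PR describes. The task names mirror the ones discussed below, but the bodies are placeholders rather than the actual implementation:

```python
from celery import Celery, chain, chord

app = Celery('reindex_sketch')  # stand-in Celery app for illustration


def chunked(ids, chunk_size):
    """Yield successive chunk_size-sized slices of ids."""
    for i in range(0, len(ids), chunk_size):
        yield ids[i:i + chunk_size]


@app.task
def create_new_es_index(app_label, model_name, new_index_name):
    """Create a fresh index for the rebuild (placeholder body)."""


@app.task
def index_objects_to_es(app_label, model_name, index_name, objects_id):
    """Bulk-index the given object ids into index_name (placeholder body)."""


@app.task
def switch_es_index(app_label, model_name, index_name, new_index_name):
    """Point the alias at the new index and drop the old one (placeholder body)."""


def run_reindex(app_label, model_name, index_name, instance_ids, chunk_size=500):
    new_index_name = '{}_new'.format(index_name)
    # Header of the chord: index every chunk of ids in parallel.
    index_tasks = [
        index_objects_to_es.si(app_label, model_name, new_index_name, ids)
        for ids in chunked(instance_ids, chunk_size)
    ]
    # Chain: create the new index, run the chord, and switch the alias only
    # after every chunk has been indexed (zero-downtime rebuild).
    workflow = chain(
        create_new_es_index.si(app_label, model_name, new_index_name),
        chord(
            index_tasks,
            switch_es_index.si(app_label, model_name, index_name, new_index_name),
        ),
    )
    workflow.apply_async()
```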

@safwanrahman safwanrahman added this to Backlog in Search update via automation Jul 13, 2018
@safwanrahman safwanrahman moved this from Backlog to In progress in Search update Jul 13, 2018
@safwanrahman (Member Author)

I have tested with about 10K files on a single Celery host with 4 workers. The indexing finished within 20 seconds.

@ericholscher (Member) left a comment


I like the approach of using the chain/chord to handle indexing and index creation. Does this work locally with CELERY_ALWAYS_EAGER (I believe Rob mentioned that in his blog post), or will we just use the existing DED indexing locally?

I'm a little concerned about the complexity of this approach. Is there a reason we're using chunks here when we already have a domain object to chunk on which is the Version? This feels like extra work to do and code to maintain, when we already have an existing way to think about this problem.

This approach also doesn't use the same code path for indexing as our application code, so now we have two different ways of indexing files, which doesn't seem great.

It really feels like all we need is (a rough sketch follows below):

  • A management command that iterates over versions and sends them to be indexed
  • A celery task that takes a version and indexes all the files in that version, which is called both in production as well as in reindexing.

I'm happy to talk about this more. There are likely some design decisions that you made that I don't understand :)
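For illustration, a minimal sketch of that shape, assuming hypothetical task and model lookups (the app/model labels, FK name, and queue are assumptions, not code from this PR):

```python
from celery import Celery
from django.apps import apps
from django.core.management.base import BaseCommand

app = Celery('version_index_sketch')  # stand-in Celery app for illustration


@app.task(queue='web')
def index_version(version_pk):
    """Index every HTML file belonging to one version.

    The same task would run after a production build and during a full
    reindex, so both paths share one code path.
    """
    HTMLFile = apps.get_model('projects.HTMLFile')  # assumed app/model label
    files = HTMLFile.objects.filter(version_id=version_pk)  # assumed FK name
    # ... bulk-index `files` into Elasticsearch here ...


class Command(BaseCommand):
    help = 'Queue one indexing task per version.'

    def handle(self, *args, **options):
        Version = apps.get_model('builds.Version')  # assumed app/model label
        for version_pk in Version.objects.values_list('pk', flat=True).iterator():
            index_version.delay(version_pk)
```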

print("unsuccessful", name)


def fetch(url=None, all_projects=[]):
Member

all_projects should be defined outside the function; it's confusing as defined here.
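For context, the usual way to avoid the mutable-default pitfall being pointed out here, as a generic Python sketch rather than code from this PR:

```python
def fetch(url=None, all_projects=None):
    # A mutable default like [] is created once and shared across calls;
    # defaulting to None and building the list inside avoids that surprise.
    if all_projects is None:
        all_projects = []
    ...
```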

Member Author

Oops! This file was pushed mistakenly. I had written it for local fetching purposes. Removing it.

@staticmethod
def _get_models(args):
    for model_name in args:
        yield apps.get_model(model_name)
Member

This doesn't need to be broken out into its own function. It just increases complexity for little value.

"The format is <app_label>.<model_name>")
)

def handle(self, *args, **options):
Member

Needs a docstring showing how to run it.
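For example, something along these lines would do (the exact flag name and invocations are illustrative):

```python
def handle(self, *args, **options):
    """Entry point for the reindex_elasticsearch management command.

    Usage (model labels follow the <app_label>.<model_name> format):

        python manage.py reindex_elasticsearch
        python manage.py reindex_elasticsearch --models projects.HTMLFile
    """
```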

),
}

def _get_actions(self, object_list, action, index_name=None):
Member

Why are we overriding this? Needs a docstring.
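A docstring here could capture both answers, for example (wording is a suggestion, not the PR's text):

```python
def _get_actions(self, object_list, action, index_name=None):
    """Build the Elasticsearch bulk actions for object_list.

    Overridden from django-elasticsearch-dsl so callers can target an
    explicit index_name (e.g. a freshly created index during a rebuild)
    instead of the document's default index, which is what allows a
    zero-downtime reindex.
    """
```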


for document in documents:
    if str(document) == document_class:
        return document
Member

What is the logic here for? Will there be multiple documents for a model ever? Also needs a docstring :)

Member Author

Yes, there can be multiple documents for a model:

class FooDocument(DocType):
    ...
    class Meta:
        model = Bar

class NewFooDocument(DocType):
    ...
    class Meta:
        model = Bar

def _run_reindex_tasks(self, models):
    for doc in registry.get_documents(models):
        qs = doc().get_queryset()
        instance_ids = list(qs.values_list('id', flat=True))
Member

Is this not going to use a ton of memory? This is creating the entire list of objects in memory all at once, instead of streaming them.

@safwanrahman (Member Author), Jul 16, 2018

It's not actually creating a list of objects, it's just a list of integers. I think integers take a much smaller amount of memory.

Member

I'm pretty worried this is going to be both slow & perhaps incredibly memory intensive in production when we have 2 million objects in memory here. I'd like to find another approach that streams the data if possible, but we can try this for now and see what happens.

@safwanrahman (Member Author), Jul 27, 2018

@ericholscher I think we can try this now and check if anything goes wrong.
I have tested with 2 million integers; it takes about 16 MB of RAM, so I think it's not a big issue.

Member Author

@ericholscher I have optimized the memory usage in 143ce7f
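For reference, one way to keep memory flat is to build fixed-size id chunks from an iterator instead of materialising the whole list at once; a sketch (not necessarily what 143ce7f does):

```python
def chunked_ids(queryset, chunk_size=500):
    """Yield lists of primary keys without holding every id in memory at once."""
    chunk = []
    for pk in queryset.values_list('id', flat=True).iterator():
        chunk.append(pk)
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk
```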



@app.task(queue='web')
def switch_es_index_task(app_label, model_name, index_name, new_index_name):
Member

We don't put "task" at the end of our task names.

@safwanrahman (Member Author), Jul 16, 2018

So what suffix do we put to indicate that it's a task? I thought it's clearer to understand from the code that it's a task, so that it does not get called directly.

Member

We don't use any suffix in our code. We should be keeping the same coding standards here as elsewhere. If we want to change this, we need to change it everywhere; otherwise it's even more confusing having half of our tasks end in "task".

Member Author

Yeah. Fixed it in the latest commit.

@ericholscher (Member) commented Jul 16, 2018

Another thought -- should this be implemented as a contribution to DED with a Celery backend, instead of our own custom logic? It sounds like we might be able to do it with Celery via a setting: https://github.com/sabricot/django-elasticsearch-dsl#elasticsearch_dsl_signal_processor

We could also perhaps add a celery flag to the existing indexer command, so that we don't have to maintain our own set of code around indexing.
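For reference, the setting being linked to looks like this; the Celery-backed processor shown commented out is hypothetical (it did not exist in DED at the time) and is exactly what such a contribution would add:

```python
# settings.py
# Default processor shipped with django-elasticsearch-dsl:
ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = (
    'django_elasticsearch_dsl.signals.RealTimeSignalProcessor'
)

# A Celery-backed contribution would be swapped in the same way, e.g.:
# ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = (
#     'django_elasticsearch_dsl.signals.CelerySignalProcessor'  # hypothetical
# )
```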

@safwanrahman (Member Author)

Another thought -- should this be implemented as a contribution to DED with a Celery backend, instead of our own custom logic? It sounds like we might be able to do it with Celery via a setting:
https://github.com/sabricot/django-elasticsearch-dsl#elasticsearch_dsl_signal_processor

Sure, I will port it to DED, but I wonder whether we can use the signal processor, as that is called while saving/updating the objects.

We could also perhaps add a celery flag to the existing indexer command, so that we don't have to maintain our own set of code around indexing.

That's really true, but it needs some time to get reviewed and merged into master. I will open a PR there soon, but to get our deployment out soon, we can keep it here until then.

@ericholscher (Member)

That's really true, but it needs some time to get reviewed and merged into master. I will open a PR there soon, but to get our deployment out soon, we can keep it here until then.

We can always deploy a forked version while we wait for it to get accepted. It's true, though, that we don't want to be waiting for review & merge from them, but perhaps it will be quick.

@safwanrahman (Member Author) commented Jul 16, 2018

I'm a little concerned about the complexity of this approach. Is there a reason we're using chunks here when we already have a domain object to chunk on which is the Version? This feels like extra work to do and code to maintain, when we already have an existing way to think about this problem.

This is a general-purpose management command for indexing all types of documents, like Project/HTMLFile, not only HTMLFile. So the only way I had to make chunks was by number.

This approach also doesn't use the same code path for indexing as our application code, so now we have two different ways of indexing files, which doesn't seem great.

Yeah, that's true. This management command actually reindexes all the documents. On the other side, DED catches the signal when a new file is created and indexes it into Elasticsearch.

It really feels like all we need is:
* A management command that iterates over versions, and sends them to be indexed
* A celery task that takes a version and indexes all the files in that version, which is called both in production as well as in reindexing.

As mentioned above, the management command is general purpose, so something specifically for HTMLFile would be an extra implementation that may not be available when we port it to DED.

@safwanrahman (Member Author)

Does this work locally with CELERY_ALWAYS_EAGER (I believe Rob mentioned that in his blog post), or will we just use the existing DED indexing locally?

Yes, it works with CELERY_ALWAYS_EAGER. Maybe it was broken in the past, but it has since been fixed on the Celery end.
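For local development this is just the standard eager setting:

```python
# settings/dev.py (illustrative location)
CELERY_ALWAYS_EAGER = True  # run tasks synchronously in-process, no broker needed
```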

@ericholscher (Member)

Yeah, that's true. This management command actually reindexes all the documents. On the other side, DED catches the signal when a new file is created and indexes it into Elasticsearch.

Right, but that doesn't scale properly right now. It currently sends an HTTP request per file saved during a project build. We need to batch it, which could use similar logic to this.

@safwanrahman (Member Author)

Right, but that doesn't scale properly right now. It currently sends an HTTP request per file saved during a project build. We need to batch it, which could use similar logic to this.

I understand, but how do we get the files in a batch? Is there any signal that is sent in batch?

@ericholscher (Member) commented Jul 17, 2018

We should just update the code to keep track of the files that are changed, as it works now. We can use the existing files_changed signal, or implement something that just does it in that function natively.
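A rough sketch of that batched shape, with a stand-in signal and task (names and arguments are illustrative, not the existing files_changed signature):

```python
import django.dispatch
from celery import Celery

app = Celery('batch_index_sketch')  # stand-in Celery app for illustration

# Stand-in for the existing files_changed signal mentioned above.
files_changed = django.dispatch.Signal()


@app.task(queue='web')
def bulk_index_files(version_pk, paths):
    """Index all changed files of a version in one bulk request."""
    # ... look up the file objects for `paths` and bulk-index them ...


@django.dispatch.receiver(files_changed)
def on_files_changed(sender, version, changed_files, **kwargs):
    # One task per build carrying the whole batch, instead of one HTTP
    # request per saved file.
    bulk_index_files.delay(version_pk=version.pk, paths=list(changed_files))
```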

@safwanrahman (Member Author)

Do we need this to be another Celery task? It's already running inside of Celery, so I'm not sure we need to delay it here. It could lead to weird race conditions if it got queued up for a long time.

Good catch @ericholscher. I will fix it to run synchronously. Thanks

@safwanrahman (Member Author)

@ericholscher I have fixed the issues you mentioned and fixed the tests. I also added a comment in #4264 (comment) for testing it in the backlog. r?

@safwanrahman (Member Author)

@ericholscher I have also fixed #4409 with 612cfb8, so there will not be any automatic indexing to Elasticsearch in local development.
r?

queryset = doc().get_queryset()
# Get latest object from the queryset
latest_object = queryset.latest('modified_date')
latest_object_time = latest_object.modified_date
Member

This should just use the current time, not query the entire queryset for the time. This will likely be quite a slow query in prod.
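i.e. record the timestamp up front, roughly:

```python
from django.utils import timezone

# Record the start time directly instead of querying the newest object.
index_time = timezone.now()
```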

Member Author

Yeah! I was also thinking about it.

@safwanrahman (Member Author)

@ericholscher Fixed the timestamp issue. r?

@ericholscher (Member)

This looks good. 👍

@ericholscher ericholscher merged commit 463f9e2 into readthedocs:search_upgrade Jul 31, 2018
Search update automation moved this from In progress to Done Jul 31, 2018