Skip to content
This repository has been archived by the owner on Jan 28, 2020. It is now read-only.

Commit

Permalink
Chunked bulk indexing to lower memory footprint
Browse files Browse the repository at this point in the history
  • Loading branch information
George Schneeloch committed Oct 7, 2015
1 parent 85b1f42 commit 40c4e4f
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 99 deletions.
6 changes: 3 additions & 3 deletions learningresources/tests/base.py
Expand Up @@ -28,7 +28,7 @@
update_description_path
)
from learningresources.models import Repository, StaticAsset
from search.utils import clear_index, refresh_index
from search.utils import recreate_index, refresh_index

log = logging.getLogger(__name__)
# Using the md5 hasher speeds up tests.
Expand Down Expand Up @@ -87,7 +87,7 @@ def create_resource(self, **kwargs):
def setUp(self):
"""set up"""
super(LoreTestCase, self).setUp()
clear_index()
recreate_index()
self.user = User.objects.create_user(
username=self.USERNAME, password=self.PASSWORD
)
Expand Down Expand Up @@ -134,7 +134,7 @@ def tearDown(self):
for key, _ in haystack.connections.connections_info.items():
haystack.connections.reload(key)
call_command('clear_index', interactive=False, verbosity=0)
clear_index()
recreate_index()
refresh_index()

def _make_archive(self, path, make_zip=False, ext=None):
Expand Down
11 changes: 11 additions & 0 deletions search/exceptions.py
@@ -0,0 +1,11 @@
"""
Exceptions related to search.
"""

from __future__ import unicode_literals


class ReindexException(Exception):
    """
    Exception thrown when reindexing Elasticsearch.

    Adds no behavior of its own on top of ``Exception``; it exists so that
    callers can catch search-index problems specifically instead of a
    generic exception.
    """
10 changes: 5 additions & 5 deletions search/management/commands/recreate_index.py
Expand Up @@ -6,15 +6,15 @@

from django.core.management.base import BaseCommand

from search.utils import clear_index
from search.utils import recreate_index


class Command(BaseCommand):
"""
Command for sync_permissions
Command for recreate_index.
"""
help = "Refreshes the Elasticsearch index."
help = "Clears the Elasticsearch index and recreates it."

def handle(self, *args, **options):
"""Command handler"""
clear_index()
"""Command for recreate_index"""
recreate_index()
9 changes: 5 additions & 4 deletions search/management/commands/refresh_index.py
Expand Up @@ -7,15 +7,16 @@
from django.core.management.base import BaseCommand

from learningresources.models import LearningResource
from search.utils import index_resources
from search.utils import index_resources, create_mapping


class Command(BaseCommand):
"""
Command for sync_permissions
Command for refresh_index
"""
help = "Refreshes the Elasticsearch index."
help = "Updates the Elasticsearch index and mapping."

def handle(self, *args, **options):
"""Command handler"""
"""Refreshes the Elasticsearch index."""
create_mapping()
index_resources(LearningResource.objects.all())
7 changes: 3 additions & 4 deletions search/migrations/0001_initial.py
Expand Up @@ -8,15 +8,14 @@

from __future__ import unicode_literals

from django.db import models, migrations

from search.utils import create_mapping, index_resources
from django.db import migrations

# pylint: skip-file


def create_learning_resource_mapping(apps, schema_editor):
create_mapping()
pass # changed to no-op


class Migration(migrations.Migration):

Expand Down
7 changes: 3 additions & 4 deletions search/migrations/0002_update_mapping.py
Expand Up @@ -8,15 +8,14 @@

from __future__ import unicode_literals

from django.db import models, migrations

from search.utils import create_mapping, index_resources
from django.db import migrations

# pylint: skip-file


def update_mapping(apps, schema_editor):
create_mapping()
pass # Changed to no-op


class Migration(migrations.Migration):

Expand Down
4 changes: 2 additions & 2 deletions search/signals.py
Expand Up @@ -71,8 +71,8 @@ def handle_resource_deletion(sender, **kwargs):
instance = kwargs.pop("instance")
if instance.__class__.__name__ != "LearningResource":
return
from search.utils import delete_index
delete_index(instance)
from search.utils import delete_resource_from_index
delete_resource_from_index(instance)


@statsd.timer('lore.elasticsearch.taxonomy_create')
Expand Down
148 changes: 147 additions & 1 deletion search/tests/test_es_indexing.py
Expand Up @@ -8,15 +8,31 @@

from __future__ import unicode_literals

import json
import logging

from django.contrib.auth.models import User
from django.test.testcases import call_command
from rest_framework.status import HTTP_200_OK

from learningresources.api import create_repo
from learningresources.tests.base import LoreTestCase
from learningresources.models import LearningResource
from importer.api import import_course_from_file
from rest.tests.base import API_BASE
from search.exceptions import ReindexException
from search.search_indexes import cache
from search.sorting import LoreSortingFields
from search.tests.base_es import SearchTestCase
from search.utils import index_resources, search_index, refresh_index
from search.utils import (
INDEX_NAME,
create_mapping,
get_conn,
index_resources,
search_index,
refresh_index,
remove_index,
)

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -255,3 +271,133 @@ def test_sorting(self):
sec_res.id,
res4.id
)

def test_index_exception(self):
    """
    Searching must raise ReindexException while the index is missing,
    and work again once the index and mapping have been rebuilt.
    """
    # Start from a state where the index does not exist at all.
    remove_index()

    url = "/api/v1/repositories/{repo}/search/".format(repo=self.repo.slug)

    # Without an index the search endpoint raises.
    with self.assertRaises(ReindexException):
        self.client.get(url)

    # A verified connection raises as well while the index is missing.
    with self.assertRaises(ReindexException):
        get_conn()

    # Rebuild the index by hand through an unverified connection,
    # then install the mapping.
    connection = get_conn(verify=False)
    connection.indices.create(INDEX_NAME)
    connection.indices.refresh()
    create_mapping()

    # The index exists but is empty, so the search succeeds with no hits.
    response = self.client.get(url)
    self.assertEqual(response.status_code, HTTP_200_OK)
    empty_payload = json.loads(response.content.decode('utf-8'))
    self.assertEqual(empty_payload['count'], 0)

    # After indexing every resource, the search reports all of them.
    all_resources = LearningResource.objects.all()
    index_resources(all_resources)

    response = self.client.get(url)
    self.assertEqual(response.status_code, HTTP_200_OK)
    full_payload = json.loads(response.content.decode('utf-8'))
    self.assertEqual(full_payload['count'], all_resources.count())


class TestIndexFromScratch(LoreTestCase):
    """
    Test behavior of indexing on first run of application.
    """

    def setUp(self):
        """
        Create and log in a user, then remove the index.

        Overrides the base class setUp without calling it, so that no
        index exists when each test starts.
        """
        username = 'user'
        password = 'pass'
        self.user = User.objects.create_user(
            username=username, password=password
        )
        self.client.login(username=username, password=password)
        remove_index()

    def tearDown(self):
        """Override to only remove index."""
        remove_index()

    def _create_repo_and_search_url(self):
        """
        Create a fresh repository owned by the test user.

        Returns:
            tuple: the new repository and the URL of its search endpoint.
        """
        repo = create_repo("new repo", "new repo", self.user.id)
        search_url = "{api_base}repositories/{repo_slug}/search/".format(
            api_base=API_BASE,
            repo_slug=repo.slug
        )
        return repo, search_url

    def _search_count(self, search_url):
        """
        GET the search URL, assert a 200 response and return the
        ``count`` field of the decoded JSON body.
        """
        resp = self.client.get(search_url)
        self.assertEqual(resp.status_code, HTTP_200_OK)
        return json.loads(resp.content.decode('utf-8'))['count']

    def _assert_import_is_indexed(self):
        """
        Check that a new repo starts with no search results and that
        importing a course makes its resources searchable.
        """
        repo, search_url = self._create_repo_and_search_url()
        self.assertEqual(0, self._search_count(search_url))

        # Import. This should index the resources automatically.
        import_course_from_file(self.get_course_zip(), repo.id, self.user.id)

        self.assertGreater(self._search_count(search_url), 0)

    def test_create_repo_without_reindex(self):
        """
        Test that user gets an error message if they have never reindexed.
        """
        _, search_url = self._create_repo_and_search_url()

        # When user installs LORE they must call refresh_index to create the
        # index, else they get this error message.
        with self.assertRaises(ReindexException):
            self.client.get(search_url)

    def test_create_repo_with_refresh_index(self):
        """
        Test that we can create a new repo and that the refresh_index command
        indexes appropriately. Note that this test has custom setUp
        and tearDown code.
        """
        # When user installs LORE they must call refresh_index
        # or recreate_index to create the index.
        call_command("refresh_index")
        self._assert_import_is_indexed()

    def test_create_repo_with_recreate_index(self):
        """
        Test that we can create a new repo and that the recreate_index
        command indexes appropriately. Note that this test has custom setUp
        and tearDown code.
        """
        # When user installs LORE they must call refresh_index
        # or recreate_index to create the index.
        call_command("recreate_index")
        self._assert_import_is_indexed()
38 changes: 0 additions & 38 deletions search/tests/test_mgmt_cmds.py

This file was deleted.

0 comments on commit 40c4e4f

Please sign in to comment.