This repository has been archived by the owner on Jan 28, 2020. It is now read-only.

Adds elasticsearch-dsl and adds it alongside Haystack for now. #673

Merged: 1 commit merged into master from feature/skm/669_elasticsearch_dsl on Sep 17, 2015

Conversation

ShawnMilo
Contributor

Haystack will be removed once all the features are added and tested. This requires
the addition of searching by facets, React.js updates, and updates to the
RESTful API.

Before code review, please watch this video, which explains how the new libraries are being used.

The search/utils.py file should be read first, in this order:

  • create_mapping
  • resource_to_dict
  • index_resources
  • search_index
  • class SearchResults
  • the rest

This should make the tests easy to follow.

@noisecapella
Contributor

That's a neat video, it does seem to clean up a lot of little messy things about Haystack

@ShawnMilo
Contributor Author

@noisecapella: Thanks for checking it out. It's definitely more straightforward. Haystack was designed to keep things simple for its own authors, which made our use of it pretty clumsy.

We can now get it to work the way we want it to and it's simpler to use. Everybody wins.

DOC_TYPE = "learningresource"
INDEX_NAME = settings.HAYSTACK_CONNECTIONS["default"]["INDEX_NAME"]
URL = settings.HAYSTACK_CONNECTIONS["default"]["URL"]
CONN = connections.create_connection(hosts=[URL])
Contributor

If we initialize the connection here, a network connection will be made whenever we import search.utils. The other globals are just strings, but this one does a whole lot more. My main concern is that this may raise an exception if the connection fails, and it wouldn't be clean to put a try block around an import.

Could you make this a lazy connection instead, where functions call something like get_connection, which may initialize it? This would follow how databases work in Django, where the connection is made on first request.
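The lazy pattern being requested could be sketched like this (a minimal sketch, not code from this PR; the `factory` parameter stands in for `connections.create_connection(hosts=[URL])`):

```python
_conn = None


def get_connection(factory=lambda: object()):
    """Create the connection on first use, Django-style.

    'factory' is a stand-in for connections.create_connection(hosts=[URL]).
    """
    global _conn
    if _conn is None:
        _conn = factory()  # only runs on the first call
    return _conn
```

With this shape, importing the module has no network side effects; a connection failure surfaces only when a function actually needs Elasticsearch.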

Contributor Author

So: keep CONN as a global variable defaulting to None, and have each function in utils.py call get_connection (or maybe initialize_connection), which checks whether it's None, initializes it if necessary, and returns CONN?

Contributor

Yep

@noisecapella
Contributor

Almost through with this, looking at tests now

@ShawnMilo
Contributor Author

Okay. The first round of issues has been addressed and pushed. Diff:

diff --git search/utils.py search/utils.py
index ffe01a1..2b8759b 100644
--- search/utils.py
+++ search/utils.py
@@ -24,10 +24,22 @@ log = logging.getLogger(__name__)
 DOC_TYPE = "learningresource"
 INDEX_NAME = settings.HAYSTACK_CONNECTIONS["default"]["INDEX_NAME"]
 URL = settings.HAYSTACK_CONNECTIONS["default"]["URL"]
-CONN = connections.create_connection(hosts=[URL])
+_CONN = connections.create_connection(hosts=[URL])
 PAGE_LENGTH = 10


+def get_conn():
+    """
+    Lazily create the connection.
+    """
+    # pylint: disable=global-statement
+    # This is ugly. Any suggestions on a way that doesn't require "global"?
+    global _CONN
+    if _CONN is None:
+        _CONN = connections.create_connection(hosts=[URL])
+    return _CONN
+
+
 def get_resource_terms(resource_ids):
     """
     Returns taxonomy metadata for LearningResources.
@@ -104,8 +116,9 @@ def index_resources(resources):
     ensure_vocabulary_mappings(term_info)

     # Perform bulk insert using Elasticsearch directly.
+    conn = get_conn()
     insert_count, errors = bulk(
-        CONN,
+        conn,
         (resource_to_dict(x, term_info.get(x.id, {})) for x in resources),
         index=INDEX_NAME,
         doc_type=DOC_TYPE
@@ -120,8 +133,9 @@ def index_resources(resources):
 @statsd.timer('lore.elasticsearch.delete_index')
 def delete_index(resource):
     """Delete a record from Elasticsearch."""
+    conn = get_conn()
     try:
-        CONN.delete(
+        conn.delete(
             index=INDEX_NAME, doc_type=DOC_TYPE, id=resource.id)
         refresh_index()
     except NotFoundError:
@@ -201,10 +215,11 @@ def resource_to_dict(resource, term_info=None):

 def clear_index():
     """Wipe the index."""
-    if CONN.indices.exists(INDEX_NAME):
-        CONN.indices.delete(INDEX_NAME)
-        CONN.indices.create(INDEX_NAME)
-        CONN.indices.refresh()
+    conn = get_conn()
+    if conn.indices.exists(INDEX_NAME):
+        conn.indices.delete(INDEX_NAME)
+        conn.indices.create(INDEX_NAME)
+        conn.indices.refresh()
     create_mapping()

     # re-index all existing LearningResource instances:
@@ -228,8 +243,9 @@ class SearchResults(object):

     def page_count(self):
         """Total number of result pages."""
-        count = self._search.count() / PAGE_LENGTH
-        if self._search.count() % PAGE_LENGTH > 0:
+        total = self._search.count()
+        count = total / PAGE_LENGTH
+        if total % PAGE_LENGTH > 0:
             count += 1
         return int(count)

@@ -247,7 +263,7 @@ class SearchResults(object):

     def __getitem__(self, i):
         """Return result by index."""
-        return self._search[i:i+1].execute().hits[0]
+        return self._search[i].execute().hits[0]


 def create_mapping():
@@ -262,11 +278,12 @@ def create_mapping():
     """

     # Create the index if it doesn't exist.
-    if not CONN.indices.exists(INDEX_NAME):
-        CONN.indices.create(INDEX_NAME)
+    conn = get_conn()
+    if not conn.indices.exists(INDEX_NAME):
+        conn.indices.create(INDEX_NAME)
     # Delete the mapping if an older version exists.
-    if CONN.indices.exists_type(index=INDEX_NAME, doc_type=DOC_TYPE):
-        CONN.indices.delete_mapping(index=INDEX_NAME, doc_type=DOC_TYPE)
+    if conn.indices.exists_type(index=INDEX_NAME, doc_type=DOC_TYPE):
+        conn.indices.delete_mapping(index=INDEX_NAME, doc_type=DOC_TYPE)

     mapping = Mapping(DOC_TYPE)

@@ -275,7 +292,7 @@ def create_mapping():
     mapping.field("description", "string", index="analyzed")
     mapping.field("preview_url", "string", index="no")
     mapping.field("repository", "string", index="not_analyzed")
-    mapping.field("resource_type", "string", index="analyzed")
+    mapping.field("resource_type", "string", index="not_analyzed")
     mapping.field("content_xml", "string", index="no")
     mapping.field("content_stripped", "string", index="analyzed")
     mapping.field("run", "string", index="not_analyzed")
@@ -293,7 +310,7 @@ def create_mapping():
     # LearningResource instances. This function will probably only
     # ever be called by migrations.
     index_resources(LearningResource.objects.all())
-    CONN.indices.refresh()
+    conn.indices.refresh()


 def refresh_index():
@@ -301,7 +318,8 @@ def refresh_index():
     Force a refresh instead of waiting for it to happen automatically.
     This should only be necessary during tests.
     """
-    CONN.indices.refresh(index=INDEX_NAME)
+    conn = get_conn()
+    conn.indices.refresh(index=INDEX_NAME)


 def ensure_vocabulary_mappings(term_info):
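The pylint comment in the diff above asks for a way to avoid `global`. One common answer on Python 3 is to memoize the factory with `functools.lru_cache`, which still creates the connection lazily and exactly once. A sketch, where `_create_connection` is a stand-in for `connections.create_connection(hosts=[URL])`:

```python
import functools


def _create_connection():
    # Stand-in for connections.create_connection(hosts=[URL]).
    return object()


@functools.lru_cache(maxsize=None)
def get_conn():
    # The first call invokes the factory; later calls return the cached result.
    return _create_connection()
```

Note this wasn't available to the PR as written, since the tracebacks below show the project running on Python 2.7 (where `lru_cache` is not in the standard library).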

@noisecapella
Contributor

I got an error running your migration:

(lore_venv)george@george-laptop:~/Projects/lore$ docker-compose run web ./manage.py migrate
Operations to perform:
  Synchronize unmigrated apps: audit, exporter, roles, compressor, cas, xanalytics, messages, rest, staticfiles, ui, rest_framework, storages, haystack, bootstrap3
  Apply all migrations: learningresources, search, sessions, admin, guardian, auth, contenttypes, taxonomy
Synchronizing apps without migrations:
  Creating tables...
    Running deferred SQL...
  Installing custom SQL...
Running migrations:
  Rendering model states... DONE
  Applying search.0001_initial...No handlers could be found for logger "elasticsearch.trace"
Traceback (most recent call last):
  File "./manage.py", line 16, in <module>
    execute_from_command_line(sys.argv)
  File "/usr/local/lib/python2.7/dist-packages/django/core/management/__init__.py", line 338, in execute_from_command_line
    utility.execute()
  File "/usr/local/lib/python2.7/dist-packages/django/core/management/__init__.py", line 330, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/usr/local/lib/python2.7/dist-packages/django/core/management/base.py", line 393, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/usr/local/lib/python2.7/dist-packages/django/core/management/base.py", line 444, in execute
    output = self.handle(*args, **options)
  File "/usr/local/lib/python2.7/dist-packages/django/core/management/commands/migrate.py", line 222, in handle
    executor.migrate(targets, plan, fake=fake, fake_initial=fake_initial)
  File "/usr/local/lib/python2.7/dist-packages/django/db/migrations/executor.py", line 110, in migrate
    self.apply_migration(states[migration], migration, fake=fake, fake_initial=fake_initial)
  File "/usr/local/lib/python2.7/dist-packages/django/db/migrations/executor.py", line 148, in apply_migration
    state = migration.apply(state, schema_editor)
  File "/usr/local/lib/python2.7/dist-packages/django/db/migrations/migration.py", line 115, in apply
    operation.database_forwards(self.app_label, schema_editor, old_state, project_state)
  File "/usr/local/lib/python2.7/dist-packages/django/db/migrations/operations/special.py", line 183, in database_forwards
    self.code(from_state.apps, schema_editor)
  File "/src/search/migrations/0001_initial.py", line 19, in create_learning_resource_mapping
    create_mapping()
  File "/src/search/utils.py", line 295, in create_mapping
    index_resources(LearningResource.objects.all())
  File "/usr/local/lib/python2.7/dist-packages/statsd/client.py", line 30, in _wrapped
    return_value = f(*args, **kwargs)
  File "/src/search/utils.py", line 111, in index_resources
    doc_type=DOC_TYPE
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 182, in bulk
    for ok, item in streaming_bulk(client, actions, **kwargs):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 110, in streaming_bulk
    for action, data in chunk:
  File "/src/search/utils.py", line 109, in <genexpr>
    (resource_to_dict(x, term_info.get(x.id, {})) for x in resources),
  File "/src/search/utils.py", line 195, in resource_to_dict
    rec[key] = rec[key].decode('utf-8')
  File "/usr/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeEncodeError: 'ascii' codec can't encode character u'\xa0' in position 47: ordinal not in range(128)

@noisecapella
Contributor

If the connection in get_conn fails, does Elasticsearch retry until it succeeds or are we responsible for that? It may be a problem for celery workers which will hold onto the same connection for a while.

@noisecapella
Contributor

Test coverage looks good

@noisecapella
Contributor

Is there any reasonable way to test functionality, or should that wait for the next PR?

@ShawnMilo
Contributor Author

The migration error is fixed, and _CONN now defaults to None. Next I want to figure out having get_conn test the connection and establish a new one if necessary. As for whether we can reasonably test functionality, what did you have in mind beyond what the unit tests (and demo script) already demonstrate?
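A ping-and-rebuild get_conn, as described, might look like this sketch (the `factory` parameter and `_cache` dict are illustrative, not from this PR; Elasticsearch clients do expose a `ping()` method that returns False when the cluster is unreachable):

```python
_cache = {"conn": None}


def get_conn(factory):
    """Return a healthy connection, rebuilding it when ping() fails.

    'factory' stands in for connections.create_connection(hosts=[URL]).
    """
    conn = _cache["conn"]
    if conn is None or not conn.ping():
        # No connection yet, or the cached one is dead: make a fresh one.
        conn = factory()
        _cache["conn"] = conn
    return conn
```

The trade-off is one extra ping round-trip per call; a cheap alternative is to ping only when a request raises a ConnectionError.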

@noisecapella
Contributor

I tried some DSL queries using the shell already and they worked; that's probably enough for this phase.

@ShawnMilo
Contributor Author

On Wed, Sep 16, 2015 at 4:17 PM, George Schneeloch <notifications@github.com> wrote:

> If the connection in get_conn fails, does Elasticsearch retry until it
> succeeds or are we responsible for that? It may be a problem for celery
> workers which will hold onto the same connection for a while.

Created a stand-alone script to test this:

from elasticsearch_dsl.connections import connections
from elasticsearch.exceptions import ConnectionError

conn = connections.create_connection(hosts=["localhost:10200"])

def print_info():
    try:
        print conn.info()
    except ConnectionError:
        print "failed"
        return False
    return True


assert print_info() == True
print "now, stop the service"
raw_input()
assert print_info() == False
print "now, start the service"
raw_input()
assert print_info() == True

This works, so it does repair itself after a temporary loss of
communication.

@ShawnMilo
Contributor Author

@noisecapella:

I think the outstanding issues have been addressed: what happens when the connection dies, and the migration error.

@noisecapella
Contributor

👍 after squash

@ShawnMilo ShawnMilo force-pushed the feature/skm/669_elasticsearch_dsl branch from e238824 to f68ce59 Compare September 17, 2015 16:42
ShawnMilo added a commit that referenced this pull request Sep 17, 2015
Adds elasticsearch-dsl and adds it alongside Haystack for now.
@ShawnMilo ShawnMilo merged commit f3b224d into master Sep 17, 2015
@ShawnMilo ShawnMilo deleted the feature/skm/669_elasticsearch_dsl branch September 17, 2015 18:04