Skip to content

Commit

Permalink
[Bug 832037] Update for pyelasticsearch-based elasticutils
Browse files Browse the repository at this point in the history
This updates the codebase to use the pyelasticsearch-based elasticutils
from master tip.

Generally the changes are pretty minor. Most of them are to handle
the pyelasticsearch exceptions rather than the pyes ones. A few are
because some of the API bits changed in elasticutils. Then there's
some very minor cleanup.
  • Loading branch information
willkg committed Jan 24, 2013
1 parent 5ee255d commit ac1707b
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 74 deletions.
4 changes: 2 additions & 2 deletions fjord/analytics/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging

from nose.tools import eq_
import pyes
from pyelasticsearch.exceptions import Timeout
from pyquery import PyQuery

from fjord.analytics import views
Expand Down Expand Up @@ -260,7 +260,7 @@ def test_frontpage_es_down(self):
old_counts_to_options = views.counts_to_options
try:
def mock_counts_to_options(*args, **kwargs):
raise pyes.urllib3.MaxRetryError()
raise Timeout()
views.counts_to_options = mock_counts_to_options

resp = self.client.get(reverse('dashboard'))
Expand Down
1 change: 0 additions & 1 deletion fjord/analytics/views.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
from datetime import timedelta, datetime
from math import floor

Expand Down
7 changes: 4 additions & 3 deletions fjord/base/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

from celery.messaging import establish_connection
from mobility.decorators import mobile_template
import pyes
from pyelasticsearch.exceptions import (
ConnectionError, ElasticHttpNotFoundError, Timeout)

from fjord.search.index import get_index, get_index_stats

Expand Down Expand Up @@ -98,11 +99,11 @@ def monitor_view(request):
(INFO, ('Successfully connected to ElasticSearch and index '
'exists.')))

except pyes.urllib3.MaxRetryError as exc:
except (ConnectionError, Timeout) as exc:
es_results.append(
(ERROR, 'Cannot connect to ElasticSearch: %s' % str(exc)))

except pyes.exceptions.IndexMissingException:
except ElasticHttpNotFoundError:
es_results.append(
(ERROR, 'Index "%s" missing.' % get_index()))

Expand Down
17 changes: 8 additions & 9 deletions fjord/search/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from django.http import HttpResponseRedirect
from django.shortcuts import render

import pyes
from pyelasticsearch.exceptions import (
ConnectionError, ElasticHttpNotFoundError, Timeout)

from fjord.search.index import (
chunked, get_index, get_index_stats, get_indexes, get_indexable,
Expand Down Expand Up @@ -86,20 +87,18 @@ def search_admin_view(request):
# a bad state.
try:
stats = get_index_stats()
except pyes.exceptions.IndexMissingException:
except ElasticHttpNotFoundError:
stats = None
indexes = get_indexes()
indexes.sort(key=lambda m: m[0])

except pyes.urllib3.MaxRetryError:
except (ConnectionError, Timeout):
error_messages.append('Error: Elastic Search is not set up on this '
'machine or is not responding. (MaxRetryError)')
except pyes.exceptions.IndexMissingException:
'machine or timed out trying to respond. '
'(ConnectionError/Timeout)')
except ElasticHttpNotFoundError:
error_messages.append('Error: Index is missing. Press the reindex '
'button below. (IndexMissingException)')
except pyes.urllib3.TimeoutError:
error_messages.append('Error: Connection to Elastic Search timed out. '
'(TimeoutError)')
'button below. (ElasticHttpNotFoundError)')

outstanding_records = Record.outstanding()
recent_records = Record.objects.order_by('-creation_time')[:20]
Expand Down
114 changes: 66 additions & 48 deletions fjord/search/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from django.conf import settings
from django.db import reset_queries

import pyes

from elasticutils.contrib.django import get_es, S
from elasticutils.contrib.django.models import DjangoMappingType
from pyelasticsearch.exceptions import (ConnectionError, Timeout,
ElasticHttpNotFoundError)


# Note: This module should not import any Fjord modules. Otherwise we
Expand Down Expand Up @@ -134,14 +134,17 @@ def get_indexes(all_indexes=False):
:returns: list of (name, count) tuples.
"""
es = get_indexing_es(default_indexes=['_all'])
es = get_indexing_es()

status = es.status()
indexes = status['indices']

if not all_indexes:
indexes = dict((k, v) for k, v in indexes.items()
if k.startswith(settings.ES_INDEX_PREFIX))

indexes = [(k, v['docs']['num_docs']) for k, v in indexes.items()]

indexes = es.get_indices()
if all_indexes:
indexes = [(k, v['num_docs']) for k, v in indexes.items()]
else:
indexes = [(k, v['num_docs']) for k, v in indexes.items()
if k.startswith(settings.ES_INDEX_PREFIX)]
return indexes


Expand All @@ -151,7 +154,12 @@ def delete_index_if_exists(index):
:arg index: The name of the index to delete.
"""
get_indexing_es(default_indexes=['_all']).delete_index_if_exists(index)
try:
get_indexing_es().delete_index(index)
except ElasticHttpNotFoundError:
# Can ignore this since it indicates the index doesn't exist
# and therefore there's nothing to delete.
pass


def get_index_stats():
Expand All @@ -169,8 +177,12 @@ def get_index_stats():
:returns: mapping type name -> count for documents indexes.
:throws pyes.urllib3.MaxRetryError: if it can't connect to ElasticSearch
:throws pyes.exceptions.IndexMissingException: if the index doesn't exist
:throws pyelasticsearch.exceptions.Timeout: if the request
times out
:throws pyelasticsearch.exceptions.ConnectionError: if there's a
connection error
:throws pyelasticsearch.exceptions.ElasticHttpNotFoundError: if the
index doesn't exist
"""
stats = {}
Expand Down Expand Up @@ -245,29 +257,47 @@ def index_chunk(cls, chunk, reraise=False, es=None):
if you want errors to be thrown.
:arg es: The ES to use. Defaults to creating a new indexing ES.
.. Note::
This indexes all the documents in the chunk in one single bulk
indexing call. Keep that in mind when you break your indexing
task into chunks.
"""
if es is None:
es = get_indexing_es()

try:
for id_ in chunk:
try:
cls.index(cls.extract_document(id_), bulk=True, es=es)
except Exception:
log.exception('Unable to extract/index document (id: %d)', id_)
if reraise:
raise

finally:
# Try to do these things, but if we fail, it's probably the
# case that many things are broken, so just move on.
documents = []
for id_ in chunk:
try:
es.flush_bulk(forced=True)
es.refresh(get_index(), timesleep=0)
documents.append(cls.extract_document(id_))
except Exception:
log.exception('Unable to flush/refresh')
log.exception('Unable to extract/index document (id: %d)', id_)
if reraise:
raise

cls.bulk_index(documents, id_field='id', es=es)


def requires_good_connection(fun):
    """Decorator that logs an error on connection issues.

    9 out of 10 doctors say that connection errors are usually because
    ES_URLS is set wrong. This catches those errors and helps you out
    with fixing it.

    :arg fun: the function to wrap

    :returns: a wrapper that calls ``fun`` and returns its result; on
        ``ConnectionError``/``Timeout`` it logs a helpful error message
        and returns ``None`` instead of propagating the exception.
    """
    def _requires_good_connection(*args, **kwargs):
        try:
            return fun(*args, **kwargs)
        except (ConnectionError, Timeout):
            # Note the trailing space inside each literal: adjacent
            # string literals are concatenated, and without the space
            # the message would read "...or ES_URLSis set wrong...".
            log.error('Either your ElasticSearch process is not quite '
                      'ready to rumble, is not running at all, or ES_URLS '
                      'is set wrong in your settings_local.py file.')
    return _requires_good_connection


@requires_good_connection
def es_reindex_cmd(percent=100, mapping_types=None):
"""Rebuild ElasticSearch indexes.
Expand Down Expand Up @@ -324,14 +354,10 @@ def es_reindex_cmd(percent=100, mapping_types=None):
log.info('Done! (total time: %s)', format_time(delta_time))


@requires_good_connection
def es_delete_cmd(index):
"""Delete a specified index."""
try:
indexes = [name for name, count in get_indexes()]
except pyes.urllib3.MaxRetryError:
log.error('Your ElasticSearch process is not running or ES_HOSTS '
'is set wrong in your settings_local.py file.')
return
indexes = [name for name, count in get_indexes()]

if index not in indexes:
log.error('Index "%s" is not a valid index.', index)
Expand All @@ -350,29 +376,21 @@ def es_delete_cmd(index):
log.info('Done!')


@requires_good_connection
def es_status_cmd(checkindex=False):
"""Show ElasticSearch index status."""
try:
try:
mt_stats = get_index_stats()
except pyes.exceptions.IndexMissingException:
mt_stats = None

except pyes.urllib3.MaxRetryError:
log.error('Your ElasticSearch process is not running or ES_HOSTS '
'is set wrong in your settings_local.py file.')
return

log.info('Settings:')
log.info(' ES_HOSTS : %s', settings.ES_HOSTS)
log.info(' ES_URLS : %s', settings.ES_URLS)
log.info(' ES_INDEX_PREFIX : %s', settings.ES_INDEX_PREFIX)
log.info(' ES_INDEXES : %s', settings.ES_INDEXES)

log.info('Index (%s) stats:', get_index())

if mt_stats is None:
log.info(' Index does not exist. (%s)', get_index())
else:
try:
mt_stats = get_index_stats()
log.info(' Index (%s):', get_index())
for name, count in mt_stats.items():
log.info(' %-20s: %d', name, count)

except ElasticHttpNotFoundError:
log.info(' Index does not exist. (%s)', get_index())
23 changes: 16 additions & 7 deletions fjord/search/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import time

from django.conf import settings

import pyes.urllib3
import pyes.exceptions
from nose import SkipTest
from pyelasticsearch.exceptions import (Timeout, ConnectionError,
ElasticHttpNotFoundError)
from test_utils import TestCase

from fjord.base.tests import with_save
Expand All @@ -18,14 +20,14 @@ class ElasticTestCase(TestCase):
def setUpClass(cls):
super(ElasticTestCase, cls).setUpClass()

if not getattr(settings, 'ES_HOSTS'):
if not getattr(settings, 'ES_URLS', None):
cls.skipme = True
return

# try to connect to ES and if it fails, skip ElasticTestCases.
try:
get_indexing_es().collect_info()
except pyes.urllib3.MaxRetryError:
get_indexing_es().health()
except (Timeout, ConnectionError):
cls.skipme = True
return

Expand Down Expand Up @@ -60,7 +62,9 @@ def refresh(self, timesleep=0):
# TODO: uncomment this when we have live indexing.
# generate_tasks()

get_indexing_es().refresh(index, timesleep=timesleep)
get_indexing_es().refresh(index)
if timesleep > 0:
time.sleep(timesleep)

def setup_indexes(self, empty=False):
"""(Re-)create ES indexes."""
Expand All @@ -79,7 +83,12 @@ def setup_indexes(self, empty=False):

def teardown_indexes(self):
es = get_indexing_es()
es.delete_index_if_exists(get_index())
try:
es.delete_index(get_index())
except ElasticHttpNotFoundError:
# If we get this error, it means the index didn't exist
# so there's nothing to delete.
pass


@with_save
Expand Down
2 changes: 2 additions & 0 deletions fjord/search/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def test_index_chunk_task(self):
chunk = (SimpleIndex, [item.id for item in simple_items])
index_chunk_task.delay(get_index(), batch_id, rec.id, chunk)

SimpleIndex.refresh_index()

# Verify everything is in the index now.
eq_(len(SimpleIndex.search()), 10)

Expand Down
5 changes: 2 additions & 3 deletions fjord/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,8 @@

# ElasticSearch settings.

# List of hostname:port strings for all the ES hosts we should connect
# to.
ES_HOSTS = ['127.0.0.1:9200']
# List of host urls for the ES hosts we should connect to.
ES_URLS = ['http://localhost:9200']

# Dict of mapping-type-name -> index-name to use. Input pretty much
# uses one index, so this should be some variation of:
Expand Down
2 changes: 1 addition & 1 deletion vendor-local/src/elasticutils

0 comments on commit ac1707b

Please sign in to comment.