Split tests into multiple files for ease of navigation.
Also, add deletion of "another-index" to a tearDown method, because, once one test run fails to delete it, you'd have to delete it manually to get future runs to pass.
erikrose committed Apr 5, 2013
commit 758ef8f
Unit tests for pyelasticsearch
These require an elasticsearch server running on the default port
import unittest

from import eq_

# Test that __all__ is sufficient:
from pyelasticsearch import *

class ElasticSearchTestCase(unittest.TestCase):
def setUp(self):
self.conn = ElasticSearch('http://localhost:9200/')

def tearDown(self):
except Exception:

def assert_result_contains(self, result, expected):
for (key, value) in expected.items():
eq_(value, result[key])
# -*- coding: utf-8 -*-
Unit tests for pyelasticsearch. These require an elasticsearch server running on the default port (localhost:9200).
import sys
from datetime import datetime, date
from decimal import Decimal
import unittest

from mock import patch
Expand All @@ -15,32 +9,17 @@

# Test that __all__ is sufficient:
from pyelasticsearch import *
from pyelasticsearch.client import es_kwargs
from pyelasticsearch.tests import ElasticSearchTestCase

def arbitrary_response():
response = requests.Response()
response._content = six.b('{"some": "json"}')
response.status_code = 200
return response

class ElasticSearchTestCase(unittest.TestCase):
def setUp(self):
self.conn = ElasticSearch('http://localhost:9200/')

class IndexingTestCase(ElasticSearchTestCase):
def tearDown(self):
except Exception:
super(IndexingTestCase, self).tearDown()

def assert_result_contains(self, result, expected):
for (key, value) in expected.items():
eq_(value, result[key])

class IndexingTestCase(ElasticSearchTestCase):
def test_indexing_with_id(self):
result = self.conn.index('test-index', 'test-type', {'name': 'Joe Tester'}, id=1)
self.assert_result_contains(result, {'_type': 'test-type', '_id': '1', 'ok': True, '_index': 'test-index'})
Expand Down Expand Up @@ -456,266 +435,3 @@ def update_all_settings(self):
self.conn.update_all_settings({'joe': 'bob'})
'PUT', ['_settings'], body={'joe': 'bob'})

class DowntimePoolingTests(unittest.TestCase):
"""Tests for failover, pooling, and auto-retry"""

def test_retry(self):
"""Make sure auto-retry works at least a little."""
first_url = [] # a mutable just so we can close over and write to it

def get_but_fail_the_first_time(url, **kwargs):
Raise ConnectionError for the first URL passed, but return a
plausible response for later ones.
# Monkeypatching random instead would have made too many
# assumptions about the code under test.
if first_url and url not in first_url:
return arbitrary_response()
raise ConnectionError

conn = ElasticSearch(['',

with patch.object(conn.session, 'get') as session_get:
session_get.side_effect = get_but_fail_the_first_time
# Try to request something with max_retries=1. This should make 2
# calls to session.get():
conn.get('test-index', 'test-type', 7)

# Assert that one server was tried and then the other.
eq_(session_get.call_count, 2)
calls = session_get.call_args_list
down_server = calls[0][0]
assert_not_equal(calls[1][0], down_server)

# Assert there's one item in the live pool and one in the dead.
# That oughta cover a fair amount.
eq_(len(, 1)
eq_(len(conn.servers.dead), 1)

def test_death_and_rebirth(self):
If a server fails, mark it dead. If there are no remaining live
servers, start trying dead ones. If a dead one starts working, bring it
back to life.
This is kind of an exploratory,
test-as-much-as-you-can-for-the-least-effort test.
conn = ElasticSearch(['',

with patch.object(conn.session, 'get') as session_get:
session_get.side_effect = Timeout

# This should kill off both servers:
for x in range(2):
conn.get('test-index', 'test-type', 7)
except Timeout:

# Make sure the pools are as we expect:
eq_(len(conn.servers.dead), 2)
eq_(len(, 0)

# And this should use a dead server, though the request will still
# time out:
conn.get('test-index', 'test-type', 7)
except Timeout:
raise AssertionError('That should have timed out.')

with patch.object(conn.session, 'get') as session_get:
session_get.return_value = arbitrary_response()

# Then we try another dead server, but this time it works:
conn.get('test-index', 'test-type', 7)

# Then that server should have come back to life:
eq_(len(conn.servers.dead), 1)
eq_(len(, 1)

class KwargsForQueryTests(unittest.TestCase):
"""Tests for the ``es_kwargs`` decorator and such"""

def test_to_query(self):
"""Test the thing that translates objects to query string text."""
to_query = ElasticSearch._to_query
eq_(to_query(4), '4')
eq_(to_query(4.5), '4.5')
eq_(to_query(True), 'true')
eq_(to_query(('4', 'hi', 'thomas')), '4,hi,thomas')
eq_(to_query(datetime(2000, 1, 2, 12, 34, 56)),
eq_(to_query(date(2000, 1, 2)),
assert_raises(TypeError, to_query, object())

# do not use unittest.skipIf because of python 2.6
if not six.PY3:
eq_(to_query(long(4)), '4')

def test_es_kwargs(self):
Make sure ``es_kwargs`` bundles es_ and specifically called out kwargs
into the ``query_params`` map and leaves other args and kwargs alone.
@es_kwargs('refresh', 'es_timeout')
def index(doc, query_params=None, other_kwarg=None):
:arg some_arg: Here just so es_kwargs doesn't crash
return doc, query_params, other_kwarg

eq_(index(3, refresh=True, es_timeout=7, other_kwarg=1),
(3, {'refresh': True, 'timeout': 7}, 1))
eq_(index.__name__, 'index')

def test_index(self):
"""Integration-test ``index()`` with some decorator-handled arg."""
def valid_responder(*args, **kwargs):
"""Return an arbitrary successful Response."""
response = requests.Response()
response._content = six.b('{"some": "json"}')
response.status_code = 200
return response

conn = ElasticSearch('')
with patch.object(conn.session, 'put') as put:
put.side_effect = valid_responder
{'some': 'doc'},

# Make sure all the query string params got into the URL:
url = put.call_args[0][0]
ok_('routing=boogie' in url)
ok_('snorkfest=true' in url)
ok_('borkfest=gerbils%3Agreat' in url)
ok_('es_' not in url) # We stripped the "es_" prefixes.

def test_arg_cross_refs_with_trailing(self):
Make sure ``es_kwargs`` adds "see ES docs" cross references for any
es_kwargs args not already documented in the decorated method's
docstring, in cases where there is trailing material after the arg
@es_kwargs('gobble', 'degook')
def some_method(foo, bar, query_params=None):
Do stuff.
:arg degook: Whether to remove the gook
It's neat.

if some_method.__doc__ is None:
raise SkipTest("This test doesn't work under python -OO.")

# Make sure it adds (only) the undocumented args and preserves anything
# that comes after the args block:
Do stuff.
:arg degook: Whether to remove the gook
:arg gobble: See the ES docs.
It's neat.

def test_arg_cross_refs_with_eof(self):
Make sure ``es_kwargs`` adds "see ES docs" cross references for any
es_kwargs args not already documented in the decorated method's
docstring, in cases where the docstring ends after the arg list.
@es_kwargs('gobble', 'degook')
def some_method(foo, bar, query_params=None):
Do stuff.
:arg degook: Whether to remove the gook

if some_method.__doc__ is None:
raise SkipTest("This test doesn't work under python -OO.")

Do stuff.
:arg degook: Whether to remove the gook
:arg gobble: See the ES docs.

class JsonTests(ElasticSearchTestCase):
"""Tests for JSON encoding and decoding"""

def test_decimal_encoding(self):
"""Make sure we can encode ``Decimal`` objects and that they don't end
up with quotes around them, which would suggest to ES to represent them
as strings if inferring a mapping."""
ones = '1.111111111111111111'
eq_(self.conn._encode_json({'hi': Decimal(ones)}),
'{"hi": %s}' % ones)

def test_set_encoding(self):
"""Make sure encountering a set doesn't raise a circular reference
eq_(self.conn._encode_json({'hi': set([1])}),
'{"hi": [1]}')

def test_tuple_encoding(self):
"""Make sure tuples encode as lists."""
eq_(self.conn._encode_json({'hi': (1, 2, 3)}),
'{"hi": [1, 2, 3]}')

def test_unhandled_encoding(self):
"""Make sure we raise a TypeError when encoding an unsupported type."""
assert_raises(TypeError, self.conn._encode_json, object())

def test_encoding(self):
"""Test encoding a zillion other types."""
eq_(self.conn._encode_json('abc'), u'"abc"')
eq_(self.conn._encode_json(u'☃'), r'"\u2603"')
eq_(self.conn._encode_json(123), '123')
eq_(self.conn._encode_json(12.25), '12.25')
eq_(self.conn._encode_json(True), 'true')
eq_(self.conn._encode_json(False), 'false')
date(2011, 12, 30)),
datetime(2011, 12, 30, 11, 59, 32)),
eq_(self.conn._encode_json([1, 2, 3]), '[1, 2, 3]')
eq_(self.conn._encode_json({'a': 1}), '{"a": 1}')

if __name__ == '__main__':

