Skip to content

Commit

Permalink
Port from requests to urllib3 in an unsuccessful attempt to be 15x fa…
Browse files Browse the repository at this point in the history
…ster at bulk indexing.
  • Loading branch information
erikrose committed May 21, 2014
1 parent 0234adc commit 13023c0
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 19 deletions.
35 changes: 19 additions & 16 deletions pyelasticsearch/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
# PY2
from urllib import urlencode, quote_plus

import requests
import simplejson as json # for use_decimal
from simplejson import JSONDecodeError
from urllib3 import PoolManager

from pyelasticsearch.downtime import DowntimePronePool
from pyelasticsearch.exceptions import (Timeout, ConnectionError,
Expand Down Expand Up @@ -122,10 +122,9 @@ def __init__(self, urls, timeout=60, max_retries=0, revival_delay=300):
urls = [u.rstrip('/') for u in urls]
self.servers = DowntimePronePool(urls, revival_delay)
self.revival_delay = revival_delay
self.timeout = timeout
self.max_retries = max_retries
self.logger = getLogger('pyelasticsearch')
self.session = requests.session()
self.pool_manager = PoolManager(timeout=timeout)
self.json_encoder = JsonEncoder

def _concat(self, items):
Expand Down Expand Up @@ -206,7 +205,7 @@ def send_request(self,
:arg method: An HTTP method, like "GET"
:arg path_components: An iterable of path components, to be joined by
"/"
:arg body: The request body
:arg body: A map of key/value pairs to be sent as the request body
:arg query_params: A map of querystring param names to values or
``None``
:arg encode_body: Whether to encode the body of the request as JSON
Expand All @@ -219,7 +218,6 @@ def send_request(self,
iteritems(query_params)))])

request_body = self._encode_json(body) if encode_body else body
req_method = getattr(self.session, method.lower())

# We do our own retrying rather than using urllib3's; we want to retry
# a different node in the cluster if possible, not the same one again
Expand All @@ -232,10 +230,15 @@ def send_request(self,
method, url, request_body)

try:
resp = req_method(
url,
timeout=self.timeout,
**({'data': request_body} if body else {}))
if method == 'GET' and not body:
response = self.pool_manager.urlopen(
method,
url)
else:
response = self.pool_manager.urlopen(
method,
url,
body=request_body)
except (ConnectionError, Timeout):
self.servers.mark_dead(server_url)
self.logger.info('%s marked as dead for %s seconds.',
Expand All @@ -248,10 +251,10 @@ def send_request(self,
self.servers.mark_live(server_url)
break

self.logger.debug('response status: %s', resp.status_code)
prepped_response = self._decode_response(resp)
if resp.status_code >= 400:
self._raise_exception(resp, prepped_response)
self.logger.debug('response status: %s', response.status)
prepped_response = self._decode_response(response)
if response.status >= 400:
self._raise_exception(response, prepped_response)
self.logger.debug('got response %s', prepped_response)
return prepped_response

Expand All @@ -260,13 +263,13 @@ def _raise_exception(self, response, decoded_body):
error_message = decoded_body.get('error', decoded_body)

error_class = ElasticHttpError
if response.status_code == 404:
if response.status == 404:
error_class = ElasticHttpNotFoundError
elif (error_message.startswith('IndexAlreadyExistsException') or
'nested: IndexAlreadyExistsException' in error_message):
error_class = IndexAlreadyExistsError

raise error_class(response.status_code, error_message)
raise error_class(response.status, error_message)

def _encode_json(self, value):
"""
Expand All @@ -277,7 +280,7 @@ def _encode_json(self, value):
def _decode_response(self, response):
"""Return a native-Python representation of a response's JSON blob."""
try:
json_response = response.json()
json_response = json.loads(response.data.decode('utf-8'))
except JSONDecodeError:
raise InvalidJsonResponseError(response)
return json_response
Expand Down
2 changes: 1 addition & 1 deletion pyelasticsearch/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from requests import Timeout, ConnectionError
from urllib3.exceptions import TimeoutError as Timeout, ConnectionError


class ElasticHttpError(Exception):
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ def find_version(file_path):
],
requires=[ # Needed?
'six',
'requests(>=1.0,<3.0)',
'urllib3(>=1.8)',
'simplejson(>=2.1.0)',
],
install_requires=[
'requests>=1.0,<3.0',
'urllib3>=1.8',
'simplejson>=2.1.0',
'six'
],
Expand Down

0 comments on commit 13023c0

Please sign in to comment.