Skip to content

Commit

Permalink
Merge "Socket errors don't warrant tracebacks when talking to memcached"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Aug 23, 2017
2 parents 33830dd + 7392953 commit daea993
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 41 deletions.
10 changes: 6 additions & 4 deletions swift/common/memcached.py
Expand Up @@ -49,7 +49,6 @@
import logging
import time
from bisect import bisect
from swift import gettext_ as _
from hashlib import md5

from eventlet.green import socket
Expand Down Expand Up @@ -163,10 +162,13 @@ def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
def _exception_occurred(self, server, e, action='talking',
sock=None, fp=None, got_connection=True):
if isinstance(e, Timeout):
logging.error(_("Timeout %(action)s to memcached: %(server)s"),
logging.error("Timeout %(action)s to memcached: %(server)s",
{'action': action, 'server': server})
elif isinstance(e, socket.error):
logging.error("Error %(action)s to memcached: %(server)s: %(err)s",
{'action': action, 'server': server, 'err': e})
else:
logging.exception(_("Error %(action)s to memcached: %(server)s"),
logging.exception("Error %(action)s to memcached: %(server)s",
{'action': action, 'server': server})
try:
if fp:
Expand All @@ -191,7 +193,7 @@ def _exception_occurred(self, server, e, action='talking',
if err > now - ERROR_LIMIT_TIME]
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
self._error_limited[server] = now + ERROR_LIMIT_DURATION
logging.error(_('Error limiting server %s'), server)
logging.error('Error limiting server %s', server)

def _get_conns(self, key):
"""
Expand Down
97 changes: 60 additions & 37 deletions test/unit/common/test_memcached.py
Expand Up @@ -17,19 +17,22 @@
"""Tests for swift.common.utils"""

from collections import defaultdict
import errno
from hashlib import md5
import logging
import socket
import time
import unittest
from uuid import uuid4
import os

import mock

from eventlet import GreenPool, sleep, Queue
from eventlet.pools import Pool

from swift.common import memcached
from mock import patch, MagicMock
from test.unit import NullLoggingHandler
from test.unit import debug_logger


class MockedMemcachePool(memcached.MemcacheConnPool):
Expand All @@ -48,15 +51,15 @@ class ExplodingMockMemcached(object):

def sendall(self, string):
self.exploded = True
raise socket.error()
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))

def readline(self):
self.exploded = True
raise socket.error()
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))

def read(self, size):
self.exploded = True
raise socket.error()
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))

def close(self):
pass
Expand Down Expand Up @@ -169,6 +172,12 @@ def close(self):
class TestMemcached(unittest.TestCase):
"""Tests for swift.common.memcached"""

def setUp(self):
self.logger = debug_logger()
patcher = mock.patch('swift.common.memcached.logging', self.logger)
self.addCleanup(patcher.stop)
patcher.start()

def test_get_conns(self):
sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.bind(('127.0.0.1', 0))
Expand Down Expand Up @@ -397,18 +406,32 @@ def test_decr(self):
memcache_client.decr, 'some_key', delta=15)

def test_retry(self):
logging.getLogger().addHandler(NullLoggingHandler())
memcache_client = memcached.MemcacheRing(
['1.2.3.4:11211', '1.2.3.5:11211'])
mock1 = ExplodingMockMemcached()
mock2 = MockMemcached()
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
[(mock2, mock2)])
memcache_client._client_cache['1.2.3.5:11211'] = MockedMemcachePool(
[(mock1, mock1)])
[(mock1, mock1), (mock1, mock1)])
memcache_client.set('some_key', [1, 2, 3])
self.assertEqual(mock1.exploded, True)
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
])

self.logger.clear()
mock1.exploded = False
self.assertEqual(memcache_client.get('some_key'), [1, 2, 3])
self.assertEqual(mock1.exploded, True)
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
])
# Check that we really did call create() twice
self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks,
[])

def test_delete(self):
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
Expand Down Expand Up @@ -544,25 +567,23 @@ def wait_connect(addr):
self.assertTrue(connections.empty())

def test_connection_pool_timeout(self):
orig_conn_pool = memcached.MemcacheConnPool
try:
connections = defaultdict(Queue)
pending = defaultdict(int)
served = defaultdict(int)

class MockConnectionPool(orig_conn_pool):
def get(self):
pending[self.host] += 1
conn = connections[self.host].get()
pending[self.host] -= 1
return conn

def put(self, *args, **kwargs):
connections[self.host].put(*args, **kwargs)
served[self.host] += 1

memcached.MemcacheConnPool = MockConnectionPool

connections = defaultdict(Queue)
pending = defaultdict(int)
served = defaultdict(int)

class MockConnectionPool(memcached.MemcacheConnPool):
def get(self):
pending[self.host] += 1
conn = connections[self.host].get()
pending[self.host] -= 1
return conn

def put(self, *args, **kwargs):
connections[self.host].put(*args, **kwargs)
served[self.host] += 1

with mock.patch.object(memcached, 'MemcacheConnPool',
MockConnectionPool):
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211',
'1.2.3.5:11211'],
io_timeout=0.5,
Expand All @@ -587,18 +608,20 @@ def put(self, *args, **kwargs):
# Wait for the dust to settle.
p.waitall()

self.assertEqual(pending['1.2.3.5'], 8)
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8)
self.assertEqual(served['1.2.3.5'], 2)
self.assertEqual(pending['1.2.3.4'], 0)
self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0)
self.assertEqual(served['1.2.3.4'], 8)
self.assertEqual(pending['1.2.3.5'], 8)
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8)
self.assertEqual(
self.logger.get_lines_for_level('error'),
['Timeout getting a connection to memcached: 1.2.3.5:11211'] * 8)
self.assertEqual(served['1.2.3.5'], 2)
self.assertEqual(pending['1.2.3.4'], 0)
self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0)
self.assertEqual(served['1.2.3.4'], 8)

# and we never got more put in that we gave out
self.assertEqual(connections['1.2.3.5'].qsize(), 2)
self.assertEqual(connections['1.2.3.4'].qsize(), 2)

# and we never got more put in that we gave out
self.assertEqual(connections['1.2.3.5'].qsize(), 2)
self.assertEqual(connections['1.2.3.4'].qsize(), 2)
finally:
memcached.MemcacheConnPool = orig_conn_pool

if __name__ == '__main__':
unittest.main()

0 comments on commit daea993

Please sign in to comment.