Skip to content

Commit

Permalink
fix #9: introduce read timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
thefab committed Mar 8, 2017
1 parent 42b8b03 commit 4d44173
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 36 deletions.
35 changes: 35 additions & 0 deletions tests/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,38 @@ def test_redis_uds_or_raise_skiptest(uds="/tmp/redis.sock"):
raise unittest.SkipTest("redis must listen on %s" % uds)
finally:
s.close()


def fake_socket_constructor(cls, *args, **kwargs):
return cls(*args, **kwargs)


class FakeSocketObject(object):

def __init__(self, *args, **kwargs):
cls = socket._socket.socket
self.__socket = cls(*args, **kwargs)

def setblocking(self, *args, **kwargs):
return self.__socket.setblocking(*args, **kwargs)

def connect(self, *args, **kwargs):
return self.__socket.connect(*args, **kwargs)

def fileno(self, *args, **kwargs):
return self.__socket.fileno(*args, **kwargs)

def close(self, *args, **kwargs):
return self.__socket.close(*args, **kwargs)

def getsockopt(self, *args, **kwargs):
return self.__socket.getsockopt(*args, **kwargs)

def setsockopt(self, *args, **kwargs):
return self.__socket.setsockopt(*args, **kwargs)

def recv(self, *args, **kwargs):
return self.__socket.recv(*args, **kwargs)

def send(self, *args, **kwargs):
return self.__socket.send(*args, **kwargs)
38 changes: 38 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,23 @@
import tornado.ioloop
import tornado
import functools
import socket

from tornadis.client import Client
from tornadis.exceptions import ConnectionError, ClientError
from support import test_redis_or_raise_skiptest
from support import FakeSocketObject
from support import fake_socket_constructor


class FakeSocketObject5(FakeSocketObject):

def __init__(self, *args, **kwargs):
self.__first = True
FakeSocketObject.__init__(self, *args, **kwargs)

def send(self, *args, **kwargs):
return len(args[0])


class ClientTestCase(tornado.testing.AsyncTestCase):
Expand Down Expand Up @@ -69,6 +82,31 @@ def test_autoconnect_callback(self):
yield condition.wait()
c.disconnect()

@tornado.testing.gen_test
def test_read_timeout1(self):
orig_constructor = socket.socket
socket.socket = functools.partial(fake_socket_constructor,
FakeSocketObject5)
c = Client(read_timeout=1)
res = yield c.connect()
self.assertTrue(res)
res2 = yield c.call('PING')
self.assertTrue(isinstance(res2, ConnectionError))
c.disconnect()
socket.socket = orig_constructor

@tornado.testing.gen_test
def test_read_timeout2(self):
c = Client(read_timeout=1, autoconnect=False)
res = yield c.connect()
self.assertTrue(res)
res2 = yield c.call('PING')
self.assertEqual(res2, "PONG")
yield tornado.gen.sleep(2)
res3 = yield c.call('PING')
self.assertTrue(isinstance(res3, ConnectionError))
c.disconnect()

@tornado.testing.gen_test
def test_discard(self):
c = Client()
Expand Down
37 changes: 2 additions & 35 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from tornadis.utils import format_args_in_redis_protocol
from support import test_redis_or_raise_skiptest
from support import test_redis_uds_or_raise_skiptest
from support import FakeSocketObject
from support import fake_socket_constructor
import hiredis
import functools
import random
Expand All @@ -20,37 +22,6 @@
for x in range(0, 1000000)]))


class FakeSocketObject(object):

def __init__(self, *args, **kwargs):
cls = socket._socket.socket
self.__socket = cls(*args, **kwargs)

def setblocking(self, *args, **kwargs):
return self.__socket.setblocking(*args, **kwargs)

def connect(self, *args, **kwargs):
return self.__socket.connect(*args, **kwargs)

def fileno(self, *args, **kwargs):
return self.__socket.fileno(*args, **kwargs)

def close(self, *args, **kwargs):
return self.__socket.close(*args, **kwargs)

def getsockopt(self, *args, **kwargs):
return self.__socket.getsockopt(*args, **kwargs)

def setsockopt(self, *args, **kwargs):
return self.__socket.setsockopt(*args, **kwargs)

def recv(self, *args, **kwargs):
return self.__socket.recv(*args, **kwargs)

def send(self, *args, **kwargs):
return self.__socket.send(*args, **kwargs)


class FakeSocketObject1(FakeSocketObject):

def connect(self, *args, **kwargs):
Expand Down Expand Up @@ -94,10 +65,6 @@ def recv(self, *args, **kwargs):
return FakeSocketObject.recv(self, *args, **kwargs)


def fake_socket_constructor(cls, *args, **kwargs):
return cls(*args, **kwargs)


class AbstractConnectionTestCase(tornado.testing.AsyncTestCase):

def setUp(self):
Expand Down
1 change: 1 addition & 0 deletions tornadis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 6379
DEFAULT_CONNECT_TIMEOUT = 20
DEFAULT_READ_TIMEOUT = 0
DEFAULT_READ_PAGE_SIZE = 65536
DEFAULT_WRITE_PAGE_SIZE = 65536

Expand Down
20 changes: 19 additions & 1 deletion tornadis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tornadis
import errno
import logging
from datetime import datetime

# Stolen from tornado/iostream.py
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
Expand Down Expand Up @@ -50,6 +51,9 @@ class Connection(object):
tcp_nodelay (boolean): set TCP_NODELAY on socket.
aggressive_write (boolean): try to minimize write latency over
global throughput (default False).
read_timeout (int): timeout (in seconds) to read something on
the socket (if nothing is read during this time, the
connection is closed) (default: 0 means no timeout)
"""

def __init__(self, read_callback, close_callback,
Expand All @@ -58,7 +62,9 @@ def __init__(self, read_callback, close_callback,
read_page_size=tornadis.DEFAULT_READ_PAGE_SIZE,
write_page_size=tornadis.DEFAULT_WRITE_PAGE_SIZE,
connect_timeout=tornadis.DEFAULT_CONNECT_TIMEOUT,
tcp_nodelay=False, aggressive_write=False, ioloop=None):
tcp_nodelay=False, aggressive_write=False,
read_timeout=tornadis.DEFAULT_READ_TIMEOUT,
ioloop=None):
"""Constructor.
Args:
Expand All @@ -76,6 +82,9 @@ def __init__(self, read_callback, close_callback,
tcp_nodelay (boolean): set TCP_NODELAY on socket.
aggressive_write (boolean): try to minimize write latency over
global throughput (default False).
read_timeout (int): timeout (in seconds) to read something on
the socket (if nothing is read during this time, the
connection is closed) (default: 0 means no timeout)
ioloop (IOLoop): the tornado ioloop to use.
"""
self.host = host
Expand All @@ -91,10 +100,12 @@ def __init__(self, read_callback, close_callback,
self.read_page_size = read_page_size
self.write_page_size = write_page_size
self.connect_timeout = connect_timeout
self.read_timeout = read_timeout
self.tcp_nodelay = tcp_nodelay
self.aggressive_write = aggressive_write
self._write_buffer = WriteBuffer()
self._listened_events = 0
self._last_read = datetime.now()

def _redis_server(self):
if self.unix_domain_socket:
Expand Down Expand Up @@ -163,6 +174,11 @@ def _on_every_second(self):
dt = self._state.get_last_state_change_timedelta()
if dt.total_seconds() > self.connect_timeout:
self.disconnect()
if self.read_timeout > 0:
dt = datetime.now() - self._last_read
if dt.total_seconds() > self.read_timeout:
LOG.warning("read timeout => disconnecting")
self.disconnect()

def _register_or_update_event_handler(self, write=True):
if write:
Expand Down Expand Up @@ -236,6 +252,8 @@ def _handle_events(self, fd, event):
def _handle_read(self):
chunk = self._read(self.read_page_size)
if chunk is not None:
if self.read_timeout > 0:
self._last_read = datetime.now()
self._read_callback(chunk)

def _handle_write(self):
Expand Down

0 comments on commit 4d44173

Please sign in to comment.