Skip to content

Commit

Permalink
better error handling (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
thefab committed Aug 10, 2015
1 parent b21431f commit 4d0ff16
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 23 deletions.
2 changes: 1 addition & 1 deletion examples/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


def ping_callback(result):
if not isinstance(result, tornadis.ConnectionError):
if not isinstance(result, tornadis.TornadisException):
print result


Expand Down
2 changes: 1 addition & 1 deletion examples/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def ping_redis(num):
# it will be automatically released to the pool thanks to the
# "with" keyword
reply = yield client.call("PING")
if not isinstance(reply, tornadis.ConnectionError):
if not isinstance(reply, tornadis.TornadisException):
print("reply #%i : %s" % (num, reply))


Expand Down
3 changes: 2 additions & 1 deletion examples/coroutines.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
@tornado.gen.coroutine
def talk_to_redis():
result = yield client.call("PING")
print "Result: %s" % result
if not isinstance(result, tornadis.TornadisException):
print "Result: %s" % result


loop = tornado.ioloop.IOLoop.instance()
Expand Down
2 changes: 1 addition & 1 deletion examples/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class HelloHandler(RequestHandler):
def get(self):
with (yield POOL.connected_client()) as client:
reply = yield client.call("PING")
if not isinstance(reply, tornadis.ConnectionError):
if not isinstance(reply, tornadis.TornadisException):
self.write("Hello, %s" % reply)
self.finish()

Expand Down
10 changes: 9 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import functools

from tornadis.client import Client
from tornadis.exceptions import ConnectionError
from tornadis.exceptions import ConnectionError, ClientError
from support import test_redis_or_raise_skiptest


Expand All @@ -35,6 +35,14 @@ def test_ping(self):
self.assertEquals(res, b"PONG")
c.disconnect()

@tornado.testing.gen_test
def test_reply_error(self):
c = Client()
yield c.connect()
res = yield c.call('BADCOMMAND')
self.assertTrue(isinstance(res, ClientError))
c.disconnect()

@tornado.testing.gen_test
def test_autoconnect_future(self):
c = Client(autoconnect=True)
Expand Down
5 changes: 3 additions & 2 deletions tornadis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from tornadis.pubsub import PubSubClient
from tornadis.pool import ClientPool
from tornadis.pipeline import Pipeline
from tornadis.exceptions import ConnectionError, ClientError
from tornadis.exceptions import ConnectionError, ClientError, TornadisException

__all__ = ['Client', 'ClientPool', 'Pipeline',
'ConnectionError', 'ClientError', 'PubSubClient', 'WriteBuffer']
'ConnectionError', 'ClientError', 'TornadisException',
'PubSubClient', 'WriteBuffer']
41 changes: 25 additions & 16 deletions tornadis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import collections
import functools
import toro
import logging

from tornadis.connection import Connection
from tornadis.pipeline import Pipeline
Expand All @@ -17,6 +18,9 @@
from tornadis.exceptions import ConnectionError, ClientError


LOG = logging.getLogger(__name__)


def discard_reply_cb(reply):
pass

Expand Down Expand Up @@ -82,7 +86,7 @@ def connect(self):
cb2 = self._close_callback
self.__callback_queue = collections.deque()
self._reply_list = []
self.__reader = hiredis.Reader()
self.__reader = hiredis.Reader(replyError=ClientError)
kwargs = self.connection_kwargs
self.__connection = Connection(cb1, cb2, **kwargs)
return self.__connection.connect()
Expand Down Expand Up @@ -119,21 +123,26 @@ def _read_callback(self, data=None):
Args:
data (str): string (buffer) read on the socket.
"""
if data is not None:
self.__reader.feed(data)
while True:
reply = self.__reader.gets()
if reply is not False:
try:
callback = self.__callback_queue.popleft()
# normal client (1 reply = 1 callback)
callback(reply)
except IndexError:
# pubsub clients
self._reply_list.append(reply)
self._condition.notify_all()
else:
break
try:
if data is not None:
self.__reader.feed(data)
while True:
reply = self.__reader.gets()
if reply is not False:
try:
callback = self.__callback_queue.popleft()
# normal client (1 reply = 1 callback)
callback(reply)
except IndexError:
# pubsub clients
self._reply_list.append(reply)
self._condition.notify_all()
else:
break
except hiredis.ProtocolError:
# something nasty occured (corrupt stream => no way to recover)
LOG.warning("corrupted stream => disconnect")
self.disconnect()

def call(self, *args, **kwargs):
"""Calls a redis command and returns a Future of the reply.
Expand Down

0 comments on commit 4d0ff16

Please sign in to comment.