diff --git a/examples/callbacks.py b/examples/callbacks.py index e998ec0..22c0714 100644 --- a/examples/callbacks.py +++ b/examples/callbacks.py @@ -5,7 +5,7 @@ def ping_callback(result): - if not isinstance(result, tornadis.ConnectionError): + if not isinstance(result, tornadis.TornadisException): print result diff --git a/examples/context_manager.py b/examples/context_manager.py index 86b6cda..85067c6 100644 --- a/examples/context_manager.py +++ b/examples/context_manager.py @@ -9,7 +9,7 @@ def ping_redis(num): # it will be automatically released to the pool thanks to the # "with" keyword reply = yield client.call("PING") - if not isinstance(reply, tornadis.ConnectionError): + if not isinstance(reply, tornadis.TornadisException): print("reply #%i : %s" % (num, reply)) diff --git a/examples/coroutines.py b/examples/coroutines.py index 0a8b3c6..f778d7b 100644 --- a/examples/coroutines.py +++ b/examples/coroutines.py @@ -7,7 +7,8 @@ @tornado.gen.coroutine def talk_to_redis(): result = yield client.call("PING") - print "Result: %s" % result + if not isinstance(result, tornadis.TornadisException): + print "Result: %s" % result loop = tornado.ioloop.IOLoop.instance() diff --git a/examples/web.py b/examples/web.py index 86053ca..63edada 100644 --- a/examples/web.py +++ b/examples/web.py @@ -14,7 +14,7 @@ class HelloHandler(RequestHandler): def get(self): with (yield POOL.connected_client()) as client: reply = yield client.call("PING") - if not isinstance(reply, tornadis.ConnectionError): + if not isinstance(reply, tornadis.TornadisException): self.write("Hello, %s" % reply) self.finish() diff --git a/tests/test_client.py b/tests/test_client.py index fc842e6..6a21757 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -7,7 +7,7 @@ import functools from tornadis.client import Client -from tornadis.exceptions import ConnectionError +from tornadis.exceptions import ConnectionError, ClientError from support import test_redis_or_raise_skiptest @@ -34,6 +34,14 @@ def test_ping(self): self.assertEquals(res, b"PONG") c.disconnect() + @tornado.testing.gen_test + def test_reply_error(self): + c = Client() + yield c.connect() + res = yield c.call('BADCOMMAND') + self.assertTrue(isinstance(res, ClientError)) + c.disconnect() + @tornado.testing.gen_test def test_autoconnect_future(self): c = Client(autoconnect=True) diff --git a/tornadis/__init__.py b/tornadis/__init__.py index 89899aa..e81135a 100644 --- a/tornadis/__init__.py +++ b/tornadis/__init__.py @@ -15,7 +15,8 @@ from tornadis.pubsub import PubSubClient from tornadis.pool import ClientPool from tornadis.pipeline import Pipeline -from tornadis.exceptions import ConnectionError, ClientError +from tornadis.exceptions import ConnectionError, ClientError, TornadisException __all__ = ['Client', 'ClientPool', 'Pipeline', - 'ConnectionError', 'ClientError', 'PubSubClient', 'WriteBuffer'] + 'ConnectionError', 'ClientError', 'TornadisException', + 'PubSubClient', 'WriteBuffer'] diff --git a/tornadis/client.py b/tornadis/client.py index 8b09ec9..fbe8e18 100644 --- a/tornadis/client.py +++ b/tornadis/client.py @@ -9,6 +9,7 @@ import hiredis import collections import functools +import logging from tornadis.connection import Connection from tornadis.pipeline import Pipeline @@ -17,6 +18,9 @@ from tornadis.exceptions import ConnectionError, ClientError +LOG = logging.getLogger(__name__) + + def discard_reply_cb(reply): pass @@ -82,7 +86,7 @@ def connect(self): cb2 = self._close_callback self.__callback_queue = collections.deque() self._reply_list = [] - self.__reader = hiredis.Reader() + self.__reader = hiredis.Reader(replyError=ClientError) kwargs = self.connection_kwargs self.__connection = Connection(cb1, cb2, **kwargs) return self.__connection.connect() @@ -119,21 +123,26 @@ def _read_callback(self, data=None): Args: data (str): string (buffer) read on the socket. """ - if data is not None: - self.__reader.feed(data) - while True: - reply = self.__reader.gets() - if reply is not False: - try: - callback = self.__callback_queue.popleft() - # normal client (1 reply = 1 callback) - callback(reply) - except IndexError: - # pubsub clients - self._reply_list.append(reply) - self._condition.notify_all() - else: - break + try: + if data is not None: + self.__reader.feed(data) + while True: + reply = self.__reader.gets() + if reply is not False: + try: + callback = self.__callback_queue.popleft() + # normal client (1 reply = 1 callback) + callback(reply) + except IndexError: + # pubsub clients + self._reply_list.append(reply) + self._condition.notify_all() + else: + break + except hiredis.ProtocolError: + # something nasty occured (corrupt stream => no way to recover) + LOG.warning("corrupted stream => disconnect") + self.disconnect() def call(self, *args, **kwargs): """Calls a redis command and returns a Future of the reply. diff --git a/tornadis/pubsub.py b/tornadis/pubsub.py index d7bf175..6d4c246 100644 --- a/tornadis/pubsub.py +++ b/tornadis/pubsub.py @@ -6,13 +6,10 @@ import tornado.ioloop import tornado.gen -import logging from tornadis.client import Client from tornadis.exceptions import ConnectionError, ClientError -LOG = logging.getLogger() - class PubSubClient(Client): """High level specific object to interact with pubsub redis. @@ -162,4 +159,3 @@ def pubsub_pop_message(self, deadline=None): except IndexError: pass raise tornado.gen.Return(reply) -