Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

More tests

  • Loading branch information...
commit 6f42c9af673423d71bea63b18ceba77a8a752f71 1 parent 46ad0e8
David LaBissoniere labisso authored
Showing with 53 additions and 11 deletions.
  1. +43 −11 dashi/tests/test_dashi.py
  2. +5 −0 dashi/tests/util.py
  3. +5 −0 dashi/util.py
54 dashi/tests/test_dashi.py
View
@@ -1,4 +1,3 @@
-import socket
import unittest
import threading
from functools import partial
@@ -8,13 +7,11 @@
import time
import sys
-from nose.plugins.skip import SkipTest
from kombu.pools import connections
import kombu.pools
import dashi
import dashi.util
-from dashi.exceptions import DashiError
from dashi.tests.util import who_is_calling, SocatProxy
log = logging.getLogger(__name__)
@@ -427,6 +424,16 @@ class CustomNotFoundError(Exception):
receiver.disconnect()
assert_kombu_pools_empty()
+ def test_call_timeout(self):
+ conn = dashi.DashiConnection("s1", self.uri, "x1",
+ transport_options=self.transport_options)
+
+ countdown = dashi.util.Countdown(0.6)
+ # call with no receiver should timeout
+ self.assertRaises(conn.timeout_error, conn.call, "notarealname", "test", timeout=0.5)
+ delta = countdown.delta_seconds
+ assert 0 < delta < 1, "delta: %s" % delta
+
class RabbitDashiConnectionTests(DashiConnectionTests):
"""The base dashi tests run on rabbit, plus some extras which are
@@ -474,12 +481,6 @@ def _thread_erroneous_replies(self, dashiconn, count):
log.exception("Got expected exception replying to a nonexistent exchange")
def test_pool_problems(self):
- # raise SkipTest("failing test that exposes problem in dashi RPC strategy")
-
- # this test fails (I think) because replies are sent to a nonexistent
- # exchange. Rabbit freaks out about this and poisons the channel.
- # Eventually the sender thread comes across the poisoned channel and
- # its send fails. How to fix??
receiver = TestReceiver(uri=self.uri, exchange="x1",
transport_options=self.transport_options)
@@ -505,6 +506,10 @@ def test_pool_problems(self):
self.assertEqual(len(receiver.received), 100)
+ receiver.cancel()
+ receiver.join_consumer_thread()
+ receiver.disconnect()
+
class RabbitProxyDashiConnectionTests(RabbitDashiConnectionTests):
"""Test rabbitmq dashi through a TCP proxy that we can kill to simulate failures
@@ -622,9 +627,13 @@ def test_fire_kill_pool_connection(self):
assert_kombu_pools_empty()
def test_receiver_kill_connection(self):
+ def errback():
+ log.debug("Errback called", exc_info=True)
+
# restart a consumer's connection. it should reconnect and keep consuming
receiver = TestReceiver(uri=self.uri, exchange="x1",
- transport_options=self.transport_options, retry=retry)
+ transport_options=self.transport_options, retry=retry,
+ errback=errback)
receiver.handle("test", "hats")
receiver.consume_in_thread()
@@ -682,7 +691,7 @@ def errback():
break
else:
self.proxy.start()
- time.sleep(3) # give it time to reconnect
+ time.sleep(2) # give it time to reconnect
self.proxy.stop()
assert event.is_set()
@@ -696,3 +705,26 @@ def errback():
assert_kombu_pools_empty()
+ def test_call_timeout_during_recovery(self):
+ conn = dashi.DashiConnection("s1", self.uri, "x1",
+ transport_options=self.transport_options)
+
+ got_timeout = threading.Event()
+
+ def doit():
+ self.assertRaises(conn.timeout_error, conn.call, "notarealname", "test", timeout=3)
+ got_timeout.set()
+
+ countdown = dashi.util.Countdown(4)
+
+ t = threading.Thread(target=doit)
+ t.daemon = True
+ t.start()
+ time.sleep(0.5)
+
+ try:
+ got_timeout.wait(5)
+ finally:
+ t.join()
+ delta = countdown.delta_seconds
+ assert 0 < delta < 1, "delta: %s" % delta
5 dashi/tests/util.py
View
@@ -5,6 +5,9 @@
import unittest
import signal
import socket
+import logging
+
+log = logging.getLogger(__name__)
def who_is_calling():
@@ -32,6 +35,7 @@ def start(self):
assert not self.process
src_arg = "TCP4-LISTEN:%d,fork,reuseaddr%s" % (self.port, self.source_options)
dest_arg = "TCP4:%s%s" % (self.destination, self.destination_options)
+ log.debug("Starting socat TCP proxy %s -> %s", self.port, self.address)
try:
self.process = subprocess.Popen(args=["socat", src_arg, dest_arg],
preexec_fn=os.setpgrp)
@@ -41,6 +45,7 @@ def start(self):
def stop(self):
if self.process and self.process.returncode is None:
+ log.debug("Stopping socat TCP proxy %s -> %s", self.port, self.address)
try:
os.killpg(self.process.pid, signal.SIGKILL)
except OSError, e:
5 dashi/util.py
View
@@ -30,6 +30,11 @@ def timeleft(self):
"""
return max(0.0, self.expires - self._time_func())
+ @property
+ def delta_seconds(self):
+ """difference in seconds. can be negative"""
+ return self.expires - self._time_func()
+
class RetryBackoff(object):
def __init__(self, max_attempts=0, backoff_start=0.5, backoff_step=0.5, backoff_max=30, timeout=None):
Please sign in to comment.
Something went wrong with that request. Please try again.