Skip to content
This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

Commit

Permalink
Draft heartbeat support
Browse files Browse the repository at this point in the history
  • Loading branch information
oldpatricka committed Jan 4, 2013
1 parent 2696792 commit 6b87f28
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
29 changes: 26 additions & 3 deletions dashi/__init__.py
Expand Up @@ -6,7 +6,8 @@
import sys import sys
import logging import logging


from kombu.connection import BrokerConnection from datetime import datetime, timedelta
from kombu.connection import Connection
from kombu.messaging import Consumer from kombu.messaging import Consumer
from kombu.pools import connections, producers from kombu.pools import connections, producers
from kombu.entity import Queue, Exchange from kombu.entity import Queue, Exchange
Expand All @@ -16,6 +17,7 @@


log = logging.getLogger(__name__) log = logging.getLogger(__name__)


DEFAULT_HEARTBEAT = 30


class DashiConnection(object): class DashiConnection(object):


Expand All @@ -24,7 +26,8 @@ class DashiConnection(object):
#TODO support connection info instead of uri #TODO support connection info instead of uri


def __init__(self, name, uri, exchange, durable=False, auto_delete=True, def __init__(self, name, uri, exchange, durable=False, auto_delete=True,
serializer=None, transport_options=None, ssl=False): serializer=None, transport_options=None, ssl=False,
heartbeat=DEFAULT_HEARTBEAT):
"""Set up a Dashi connection """Set up a Dashi connection
@param name: name of destination service queue used by consumers @param name: name of destination service queue used by consumers
Expand All @@ -38,7 +41,9 @@ def __init__(self, name, uri, exchange, durable=False, auto_delete=True,
@param transport_options: custom parameter dict for the transport backend @param transport_options: custom parameter dict for the transport backend
""" """


self._conn = BrokerConnection(uri, transport_options=transport_options, ssl=ssl) self._heartbeat_interval = heartbeat
self._conn = Connection(uri, transport_options=transport_options,
ssl=ssl, heartbeat=self._heartbeat_interval)
self._name = name self._name = name
self._exchange_name = exchange self._exchange_name = exchange
self._exchange = Exchange(name=exchange, type='direct', self._exchange = Exchange(name=exchange, type='direct',
Expand Down Expand Up @@ -216,6 +221,7 @@ def __init__(self, dashi, connection, name, exchange):
self._ops = {} self._ops = {}
self._cancelled = False self._cancelled = False
self._consumer_lock = threading.Lock() self._consumer_lock = threading.Lock()
self._last_heartbeat_check = datetime.min


self.connect() self.connect()


Expand Down Expand Up @@ -272,6 +278,9 @@ def _consume_one(self, timeout=None):


# keep trying until a single event is drained or timeout hit # keep trying until a single event is drained or timeout hit
while not self._cancelled: while not self._cancelled:

self.heartbeat()

try: try:
self._conn.drain_events(timeout=inner_timeout) self._conn.drain_events(timeout=inner_timeout)
break break
Expand All @@ -285,6 +294,20 @@ def _consume_one(self, timeout=None):
if elapsed + inner_timeout > timeout: if elapsed + inner_timeout > timeout:
inner_timeout = timeout - elapsed inner_timeout = timeout - elapsed


def heartbeat(self):
time_between_tics = timedelta(seconds=self._dashi._heartbeat_interval / 2)

if self._dashi.consumer_timeout > time_between_tics.seconds:
msg = "dashi consumer timeout (%s) must be half or smaller than the heartbeat interval %s" % (
self._dashi.consumer_timeout, self._dashi._heartbeat_interval)

raise DashiError(msg)

if datetime.now() - self._last_heartbeat_check > time_between_tics:
self._last_heartbeat_check = datetime.now()
self._conn.heartbeat_check()


def cancel(self, block=True): def cancel(self, block=True):
self._cancelled = True self._cancelled = True
if block: if block:
Expand Down
28 changes: 28 additions & 0 deletions dashi/tests/test_dashi.py
Expand Up @@ -6,12 +6,14 @@
import logging import logging
import time import time


from mock import Mock
from nose.plugins.skip import SkipTest from nose.plugins.skip import SkipTest
from kombu.pools import connections from kombu.pools import connections
import kombu.pools import kombu.pools


import dashi import dashi
import dashi.util import dashi.util
from dashi.exceptions import DashiError
from dashi.tests.util import who_is_calling from dashi.tests.util import who_is_calling


log = logging.getLogger(__name__) log = logging.getLogger(__name__)
Expand Down Expand Up @@ -366,6 +368,32 @@ def test_handle_sender_kwarg(self):
receiver.disconnect() receiver.disconnect()
assert_kombu_pools_empty() assert_kombu_pools_empty()


def test_heartbeats(self):

receiver = TestReceiver(uri=self.uri, exchange="x1",
transport_options=self.transport_options)
receiver.conn.consumer_timeout = 100

receiver.handle("test1", "hello", sender_kwarg="sender")

caught_exp = None
try:
receiver.consume(1)
except DashiError, e:
caught_exp = e
assert caught_exp

receiver.conn.consumer_timeout = 1
receiver.consume(1)

receiver.clear()

receiver.cancel()

receiver.disconnect()
assert_kombu_pools_empty()


def test_exceptions(self): def test_exceptions(self):
class CustomNotFoundError(Exception): class CustomNotFoundError(Exception):
pass pass
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Expand Up @@ -33,8 +33,8 @@
"download_url" : "http://www.nimbusproject.org/downloads/dashi-%s.tar.gz" % VERSION, "download_url" : "http://www.nimbusproject.org/downloads/dashi-%s.tar.gz" % VERSION,
} }


install_requires = ['kombu>=2.1.2,<2.5.0', 'pyyaml'] install_requires = ['kombu>=2.5', 'pyyaml']
tests_require = ["nose"] tests_require = ["nose", "mock"]


from setuptools import setup, find_packages from setuptools import setup, find_packages
setupdict['packages'] = find_packages() setupdict['packages'] = find_packages()
Expand Down

0 comments on commit 6b87f28

Please sign in to comment.