Skip to content

Commit

Permalink
Merge branch 'kherrala-heartbeat-fix'
Browse files Browse the repository at this point in the history
  • Loading branch information
gmr committed Apr 29, 2015
2 parents 765107d + d7595ff commit 72d33bd
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 11 deletions.
3 changes: 1 addition & 2 deletions pika/adapters/base_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ def _adapter_connect(self):

def _adapter_disconnect(self):
"""Invoked if the connection is being told to disconnect"""
if hasattr(self, 'heartbeat') and self.heartbeat is not None:
self.heartbeat.stop()
self._remove_heartbeat()
if self.socket:
self.socket.close()
self.socket = None
Expand Down
3 changes: 1 addition & 2 deletions pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ def close(self, reply_code=200, reply_text='Normal shutdown'):
self._send_connection_close(reply_code, reply_text)
while self.is_closing:
self.process_data_events()
if self.heartbeat:
self.heartbeat.stop()
self._remove_heartbeat()
self._remove_connection_callbacks()
self._adapter_disconnect()

Expand Down
13 changes: 11 additions & 2 deletions pika/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ def __init__(self,
on_open_error_callback or self._on_connection_error,
False)

self.heartbeat = None

# On connection callback
if on_open_callback:
self.add_on_open_callback(on_open_callback)
Expand Down Expand Up @@ -994,6 +996,14 @@ def _create_heartbeat_checker(self):
self.params.heartbeat)
return heartbeat.HeartbeatChecker(self, self.params.heartbeat)

def _remove_heartbeat(self):
"""Stop the heartbeat checker if it exists
"""
if self.heartbeat:
self.heartbeat.stop()
self.heartbeat = None

def _deliver_frame_to_channel(self, value):
"""Deliver the frame to the channel specified in the frame.
Expand Down Expand Up @@ -1223,8 +1233,7 @@ def _on_connection_closed(self, method_frame, from_adapter=False):
reply_code, reply_text = self.closing

# Stop the heartbeat checker if it exists
if self.heartbeat:
self.heartbeat.stop()
self._remove_heartbeat()

# If this did not come from the connection adapter, close the socket
if not from_adapter:
Expand Down
1 change: 1 addition & 0 deletions pika/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def _close_connection(self):
duration = self._max_idle_count * self._interval
text = HeartbeatChecker._STALE_CONNECTION % duration
self._connection.close(HeartbeatChecker._CONNECTION_FORCED, text)
self._connection._adapter_disconnect()
self._connection._on_disconnect(HeartbeatChecker._CONNECTION_FORCED,
text)

Expand Down
5 changes: 3 additions & 2 deletions tests/unit/connection_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,13 @@ def test_on_connection_closed(self):
method_frame.method = mock.Mock(spec=spec.Connection.Close)
method_frame.method.reply_code = 1
method_frame.method.reply_text = 'hello'
self.connection.heartbeat = mock.Mock()
heartbeat = mock.Mock()
self.connection.heartbeat = heartbeat
self.connection._adapter_disconnect = mock.Mock()
self.connection._on_connection_closed(method_frame, from_adapter=False)
#Check
self.assertTupleEqual((1, 'hello'), self.connection.closing)
self.connection.heartbeat.stop.assert_called_once_with()
heartbeat.stop.assert_called_once_with()
self.connection._adapter_disconnect.assert_called_once_with()

@mock.patch('pika.frame.decode_frame')
Expand Down
9 changes: 6 additions & 3 deletions tests/unit/heartbeat_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,13 @@ def test_connection_close(self):
self.obj._idle_byte_intervals = 3
self.obj._idle_heartbeat_intervals = 4
self.obj._close_connection()
reason = self.obj._STALE_CONNECTION % (self.obj._max_idle_count *
self.obj._interval)
self.mock_conn.close.assert_called_once_with(
self.obj._CONNECTION_FORCED, self.obj._STALE_CONNECTION % (
self.obj._max_idle_count * self.obj._interval
))
self.obj._CONNECTION_FORCED, reason)
self.mock_conn._on_disconnect.assert_called_once_with(
self.obj._CONNECTION_FORCED, reason)
self.mock_conn._adapter_disconnect.assert_called_once_with()

def test_has_received_data_false(self):
self.obj._bytes_received = 100
Expand Down

0 comments on commit 72d33bd

Please sign in to comment.