From e8950f259bd6d0a5a7d1b4fec0097463ab3071f6 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 1 Feb 2019 12:24:18 -0800 Subject: [PATCH 1/2] Remove _adapter_get_write_buffer_size from Connection Fixes #1106 --- pika/adapters/base_connection.py | 10 ---------- pika/adapters/select_connection.py | 7 +++++++ pika/adapters/twisted_connection.py | 9 --------- pika/connection.py | 11 ----------- tests/unit/channel_tests.py | 2 -- tests/unit/connection_tests.py | 9 --------- 6 files changed, 7 insertions(+), 41 deletions(-) diff --git a/pika/adapters/base_connection.py b/pika/adapters/base_connection.py index b6cc175c8..35cc3cb99 100644 --- a/pika/adapters/base_connection.py +++ b/pika/adapters/base_connection.py @@ -400,16 +400,6 @@ def _adapter_emit_data(self, data): """ self._transport.write(data) - def _adapter_get_write_buffer_size(self): - """ - Subclasses must override this - - :return: Current size of output data buffered by the transport - :rtype: int - - """ - return self._transport.get_write_buffer_size() - def _proto_connection_made(self, transport): """Introduces transport to protocol after transport is connected. diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index b1fb27e49..a8fe5bfc1 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -137,6 +137,13 @@ def connection_factory(params): workflow=workflow, on_done=on_done) + def _adapter_get_write_buffer_size(self): + """ + :return: Current size of output data buffered by the transport + :rtype: int + """ + return self._transport.get_write_buffer_size() + class _Timeout(object): """Represents a timeout""" diff --git a/pika/adapters/twisted_connection.py b/pika/adapters/twisted_connection.py index ff0f00f2d..8d9851e8f 100644 --- a/pika/adapters/twisted_connection.py +++ b/pika/adapters/twisted_connection.py @@ -1147,15 +1147,6 @@ def _adapter_emit_data(self, data): """ self._transport.write(data) - def _adapter_get_write_buffer_size(self): - """Implement pure virtual - :py:ref:meth:`pika.connection.Connection._adapter_emit_data()` method. - - This method only belongs in SelectConnection, no others need it - and twisted transport doesn't expose it. - """ - raise NotImplementedError - def connection_made(self, transport): """Introduces transport to protocol after transport is connected. diff --git a/pika/connection.py b/pika/connection.py index 0c64fe4d8..e94385a55 100644 --- a/pika/connection.py +++ b/pika/connection.py @@ -1459,17 +1459,6 @@ def _adapter_emit_data(self, data): """ raise NotImplementedError - @abc.abstractmethod - def _adapter_get_write_buffer_size(self): - """ - Subclasses must override this - - :return: Current size of output data buffered by the transport - :rtype: int - - """ - raise NotImplementedError - def _add_channel_callbacks(self, channel_number): """Add the appropriate callbacks for the specified channel number. diff --git a/tests/unit/channel_tests.py b/tests/unit/channel_tests.py index a60af030d..aea2f1797 100644 --- a/tests/unit/channel_tests.py +++ b/tests/unit/channel_tests.py @@ -30,8 +30,6 @@ class ConnectionTemplate(connection.Connection): _adapter_connect_stream = connection.Connection._adapter_connect_stream _adapter_disconnect_stream = connection.Connection._adapter_disconnect_stream _adapter_emit_data = connection.Connection._adapter_emit_data - _adapter_get_write_buffer_size = ( - connection.Connection._adapter_get_write_buffer_size) _adapter_add_timeout = connection.Connection._adapter_add_timeout _adapter_remove_timeout = connection.Connection._adapter_remove_timeout _adapter_add_callback_threadsafe = ( diff --git a/tests/unit/connection_tests.py b/tests/unit/connection_tests.py index 995bfd7d2..13a0ba03e 100644 --- a/tests/unit/connection_tests.py +++ b/tests/unit/connection_tests.py @@ -56,9 +56,6 @@ def _adapter_add_callback_threadsafe(self, callback): def _adapter_emit_data(self, data): raise NotImplementedError - def _adapter_get_write_buffer_size(self): - raise NotImplementedError - class ConnectionTests(unittest.TestCase): # pylint: disable=R0904 def setUp(self): @@ -947,14 +944,8 @@ def test_blocked_connection_on_stream_terminated_removes_timer( ConstructibleConnection, '_adapter_emit_data', spec_set=connection.Connection._adapter_emit_data) - @mock.patch.object( - ConstructibleConnection, - '_adapter_get_write_buffer_size', - spec_set=connection.Connection._adapter_get_write_buffer_size, - return_value = 1000) def test_send_message_updates_frames_sent_and_bytes_sent( self, - _adapter_get_write_buffer_size, _adapter_emit_data): self.connection._flush_outbound = mock.Mock() self.connection._body_max_length = 10000 From ecc0e59e02f033ae5bd9b8e5cac1f39b36b09222 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 1 Feb 2019 13:31:33 -0800 Subject: [PATCH 2/2] Rename _adapter_get_write_buffer_size to _get_write_buffer_size --- pika/adapters/blocking_connection.py | 2 +- pika/adapters/select_connection.py | 2 +- tests/unit/blocking_connection_tests.py | 6 +++--- tests/unit/heartbeat_tests.py | 3 --- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/pika/adapters/blocking_connection.py b/pika/adapters/blocking_connection.py index 5dc7add0e..ffdd82e9d 100644 --- a/pika/adapters/blocking_connection.py +++ b/pika/adapters/blocking_connection.py @@ -513,7 +513,7 @@ def _flush_output(self, *waiters): is_done = (lambda: self._closed_result.ready or ((not self._impl._transport or - self._impl._adapter_get_write_buffer_size() == 0) and + self._impl._get_write_buffer_size() == 0) and (not waiters or any(ready() for ready in waiters)))) # Process I/O until our completion condition is satisfied diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index a8fe5bfc1..92d3526c6 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -137,7 +137,7 @@ def connection_factory(params): workflow=workflow, on_done=on_done) - def _adapter_get_write_buffer_size(self): + def _get_write_buffer_size(self): """ :return: Current size of output data buffered by the transport :rtype: int diff --git a/tests/unit/blocking_connection_tests.py b/tests/unit/blocking_connection_tests.py index e2ccc4357..eb37aa686 100644 --- a/tests/unit/blocking_connection_tests.py +++ b/tests/unit/blocking_connection_tests.py @@ -40,7 +40,7 @@ class SelectConnectionTemplate( _channels = None ioloop = None _transport = None - _adapter_get_write_buffer_size = None + _get_write_buffer_size = None class BlockingConnectionTests(unittest.TestCase): @@ -121,7 +121,7 @@ def test_flush_output(self, select_connection_class_mock): connection = blocking_connection.BlockingConnection('params') get_buffer_size_mock = mock.Mock( - name='_adapter_get_write_buffer_size', + name='_get_write_buffer_size', side_effect=[100, 50, 0], spec=nbio_interface.AbstractStreamTransport.get_write_buffer_size) @@ -129,7 +129,7 @@ def test_flush_output(self, select_connection_class_mock): spec_set=nbio_interface.AbstractStreamTransport) connection._impl._transport = transport_mock - connection._impl._adapter_get_write_buffer_size = get_buffer_size_mock + connection._impl._get_write_buffer_size = get_buffer_size_mock connection._flush_output(lambda: False, lambda: True) diff --git a/tests/unit/heartbeat_tests.py b/tests/unit/heartbeat_tests.py index 8a5240550..9665eabf9 100644 --- a/tests/unit/heartbeat_tests.py +++ b/tests/unit/heartbeat_tests.py @@ -40,9 +40,6 @@ def remove_timeout(self, timeout_id): def _adapter_emit_data(self, data): raise NotImplementedError - def _adapter_get_write_buffer_size(self): - raise NotImplementedError - def _adapter_add_callback_threadsafe(self, callback): raise NotImplementedError