diff --git a/hyper/http20/connection.py b/hyper/http20/connection.py index 8b2a71e8..b81b60eb 100644 --- a/hyper/http20/connection.py +++ b/hyper/http20/connection.py @@ -139,36 +139,9 @@ def __init__(self, host, port=None, secure=None, window_manager=None, # Concurrency # - # Use one lock (_lock) to synchronize any interaction with global - # connection state, e.g. stream creation/deletion. - # - # It's ok to use the same in lock all these cases as they occur at - # different/linked points in the connection's lifecycle. - # - # Use another 2 locks (_write_lock, _read_lock) to synchronize - # - _send_cb - # - _recv_cb - # respectively. - # - # I.e, send/recieve on the connection and its streams are serialized - # separately across the threads accessing the connection. This is a - # simple way of providing thread-safety. - # - # _write_lock and _read_lock synchronize all interactions between - # streams and the connnection. There is a third I/O callback, - # _close_stream, passed to a stream's constructor. It does not need to - # be synchronized, it uses _send_cb internally (which is serialized); - # its other activity (safe deletion of the stream from self.streams) - # does not require synchronization. - # - # _read_lock may be acquired when already holding the _write_lock, - # when they both held it is always by acquiring _write_lock first. - # - # Either _read_lock or _write_lock may be acquired whilst holding _lock - # which should always be acquired before either of the other two. + # Use one universal lock (_lock) to synchronize all interaction + # with global connection state, _send_cb and _recv_cb. self._lock = threading.RLock() - self._write_lock = threading.RLock() - self._read_lock = threading.RLock() # Create the mutable state. self.__wm_class = window_manager or FlowControlManager @@ -232,7 +205,7 @@ def ping(self, opaque_data): :returns: Nothing """ self.connect() - with self._write_lock: + with self._lock: with self._conn as conn: conn.ping(to_bytestring(opaque_data)) self._send_outstanding_data() @@ -271,7 +244,7 @@ def request(self, method, url, body=None, headers=None): # If threads interleave these operations, it could result in messages # being sent in the wrong order, which can lead to the out-of-order # messages with lower stream IDs being closed prematurely. - with self._write_lock: + with self._lock: stream_id = self.putrequest(method, url) default_headers = (':method', ':scheme', ':authority', ':path') @@ -464,10 +437,10 @@ def _send_outstanding_data(self, tolerate_peer_gone=False, send_empty=True): # Concurrency # - # Hold _write_lock; getting and writing data from _conn is synchronized + # Hold _lock; getting and writing data from _conn is synchronized # # I/O occurs while the lock is held; waiting threads will see a delay. - with self._write_lock: + with self._lock: with self._conn as conn: data = conn.data_to_send() if data or send_empty: @@ -557,9 +530,9 @@ def endheaders(self, message_body=None, final=False, stream_id=None): # Concurrency: # - # Hold _write_lock: synchronize access to the connection's HPACK + # Hold _lock: synchronize access to the connection's HPACK # encoder and decoder and the subsquent write to the connection - with self._write_lock: + with self._lock: stream.send_headers(headers_only) # Send whatever data we have. @@ -622,10 +595,10 @@ def _send_cb(self, data, tolerate_peer_gone=False): """ # Concurrency # - # Hold _write_lock: ensures only writer at a time + # Hold _lock: ensures only writer at a time # # I/O occurs while the lock is held; waiting threads will see a delay. - with self._write_lock: + with self._lock: try: self._sock.sendall(data) except socket.error as e: @@ -640,12 +613,12 @@ def _adjust_receive_window(self, frame_len): """ # Concurrency # - # Hold _write_lock; synchronize the window manager update and the + # Hold _lock; synchronize the window manager update and the # subsequent potential write to the connection # # I/O may occur while the lock is held; waiting threads may see a # delay. - with self._write_lock: + with self._lock: increment = self.window_manager._handle_frame(frame_len) if increment: @@ -667,7 +640,7 @@ def _single_read(self): # Synchronizes reading the data # # I/O occurs while the lock is held; waiting threads will see a delay. - with self._read_lock: + with self._lock: if self._sock is None: raise ConnectionError('tried to read after connection close') self._sock.fill() @@ -761,7 +734,7 @@ def _recv_cb(self, stream_id=0): # re-acquired in the calls to self._single_read. # # I/O occurs while the lock is held; waiting threads will see a delay. - with self._read_lock: + with self._lock: log.debug('recv for stream %d with %s already present', stream_id, self.recent_recv_streams) @@ -812,11 +785,11 @@ def _send_rst_frame(self, stream_id, error_code): """ # Concurrency # - # Hold _write_lock; synchronize generating the reset frame and writing + # Hold _lock; synchronize generating the reset frame and writing # it # # I/O occurs while the lock is held; waiting threads will see a delay. - with self._write_lock: + with self._lock: with self._conn as conn: conn.reset_stream(stream_id, error_code=error_code) self._send_outstanding_data() diff --git a/test/test_integration.py b/test/test_integration.py index 72da6156..e3b1c827 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -14,6 +14,7 @@ import pytest from contextlib import contextmanager from mock import patch +from concurrent.futures import ThreadPoolExecutor, TimeoutError from h2.frame_buffer import FrameBuffer from hyper.compat import ssl from hyper.contrib import HTTP20Adapter @@ -1039,7 +1040,6 @@ def test_version_after_http_upgrade(self): def socket_handler(listener): sock = listener.accept()[0] - # We should get the initial request. data = b'' while not data.endswith(b'\r\n\r\n'): @@ -1089,6 +1089,44 @@ def socket_handler(listener): self.tear_down() + def test_connection_and_send_simultaneously(self): + # Since deadlock occurs probabilistic, + # It still has deadlock probability + # even the testcase is passed. + self.set_up() + + recv_event = threading.Event() + + def socket_handler(listener): + sock = listener.accept()[0] + + receive_preamble(sock) + sock.recv(65535) + + recv_event.set() + sock.close() + + def do_req(conn): + conn.request('GET', '/') + recv_event.wait() + + def do_connect(conn): + conn.connect() + + self._start_server(socket_handler) + conn = self.get_connection() + + pool = ThreadPoolExecutor(max_workers=2) + pool.submit(do_connect, conn) + f = pool.submit(do_req, conn) + + try: + f.result(timeout=10) + except TimeoutError: + assert False + + self.tear_down() + @patch('hyper.http20.connection.H2_NPN_PROTOCOLS', PROTOCOLS) class TestRequestsAdapter(SocketLevelTest):