Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.

try to avoid deadlock #280

Merged
merged 1 commit into from
Jun 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 16 additions & 43 deletions hyper/http20/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,36 +139,9 @@ def __init__(self, host, port=None, secure=None, window_manager=None,

# Concurrency
#
# Use one lock (_lock) to synchronize any interaction with global
# connection state, e.g. stream creation/deletion.
#
# It's ok to use the same in lock all these cases as they occur at
# different/linked points in the connection's lifecycle.
#
# Use another 2 locks (_write_lock, _read_lock) to synchronize
# - _send_cb
# - _recv_cb
# respectively.
#
# I.e, send/recieve on the connection and its streams are serialized
# separately across the threads accessing the connection. This is a
# simple way of providing thread-safety.
#
# _write_lock and _read_lock synchronize all interactions between
# streams and the connnection. There is a third I/O callback,
# _close_stream, passed to a stream's constructor. It does not need to
# be synchronized, it uses _send_cb internally (which is serialized);
# its other activity (safe deletion of the stream from self.streams)
# does not require synchronization.
#
# _read_lock may be acquired when already holding the _write_lock,
# when they both held it is always by acquiring _write_lock first.
#
# Either _read_lock or _write_lock may be acquired whilst holding _lock
# which should always be acquired before either of the other two.
# Use one universal lock (_lock) to synchronize all interaction
# with global connection state, _send_cb and _recv_cb.
self._lock = threading.RLock()
self._write_lock = threading.RLock()
self._read_lock = threading.RLock()

# Create the mutable state.
self.__wm_class = window_manager or FlowControlManager
Expand Down Expand Up @@ -232,7 +205,7 @@ def ping(self, opaque_data):
:returns: Nothing
"""
self.connect()
with self._write_lock:
with self._lock:
with self._conn as conn:
conn.ping(to_bytestring(opaque_data))
self._send_outstanding_data()
Expand Down Expand Up @@ -271,7 +244,7 @@ def request(self, method, url, body=None, headers=None):
# If threads interleave these operations, it could result in messages
# being sent in the wrong order, which can lead to the out-of-order
# messages with lower stream IDs being closed prematurely.
with self._write_lock:
with self._lock:
stream_id = self.putrequest(method, url)

default_headers = (':method', ':scheme', ':authority', ':path')
Expand Down Expand Up @@ -464,10 +437,10 @@ def _send_outstanding_data(self, tolerate_peer_gone=False,
send_empty=True):
# Concurrency
#
# Hold _write_lock; getting and writing data from _conn is synchronized
# Hold _lock; getting and writing data from _conn is synchronized
#
# I/O occurs while the lock is held; waiting threads will see a delay.
with self._write_lock:
with self._lock:
with self._conn as conn:
data = conn.data_to_send()
if data or send_empty:
Expand Down Expand Up @@ -557,9 +530,9 @@ def endheaders(self, message_body=None, final=False, stream_id=None):

# Concurrency:
#
# Hold _write_lock: synchronize access to the connection's HPACK
# Hold _lock: synchronize access to the connection's HPACK
# encoder and decoder and the subsquent write to the connection
with self._write_lock:
with self._lock:
stream.send_headers(headers_only)

# Send whatever data we have.
Expand Down Expand Up @@ -622,10 +595,10 @@ def _send_cb(self, data, tolerate_peer_gone=False):
"""
# Concurrency
#
# Hold _write_lock: ensures only writer at a time
# Hold _lock: ensures only writer at a time
#
# I/O occurs while the lock is held; waiting threads will see a delay.
with self._write_lock:
with self._lock:
try:
self._sock.sendall(data)
except socket.error as e:
Expand All @@ -640,12 +613,12 @@ def _adjust_receive_window(self, frame_len):
"""
# Concurrency
#
# Hold _write_lock; synchronize the window manager update and the
# Hold _lock; synchronize the window manager update and the
# subsequent potential write to the connection
#
# I/O may occur while the lock is held; waiting threads may see a
# delay.
with self._write_lock:
with self._lock:
increment = self.window_manager._handle_frame(frame_len)

if increment:
Expand All @@ -667,7 +640,7 @@ def _single_read(self):
# Synchronizes reading the data
#
# I/O occurs while the lock is held; waiting threads will see a delay.
with self._read_lock:
with self._lock:
if self._sock is None:
raise ConnectionError('tried to read after connection close')
self._sock.fill()
Expand Down Expand Up @@ -761,7 +734,7 @@ def _recv_cb(self, stream_id=0):
# re-acquired in the calls to self._single_read.
#
# I/O occurs while the lock is held; waiting threads will see a delay.
with self._read_lock:
with self._lock:
log.debug('recv for stream %d with %s already present',
stream_id,
self.recent_recv_streams)
Expand Down Expand Up @@ -812,11 +785,11 @@ def _send_rst_frame(self, stream_id, error_code):
"""
# Concurrency
#
# Hold _write_lock; synchronize generating the reset frame and writing
# Hold _lock; synchronize generating the reset frame and writing
# it
#
# I/O occurs while the lock is held; waiting threads will see a delay.
with self._write_lock:
with self._lock:
with self._conn as conn:
conn.reset_stream(stream_id, error_code=error_code)
self._send_outstanding_data()
Expand Down
40 changes: 39 additions & 1 deletion test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import pytest
from contextlib import contextmanager
from mock import patch
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from h2.frame_buffer import FrameBuffer
from hyper.compat import ssl
from hyper.contrib import HTTP20Adapter
Expand Down Expand Up @@ -1039,7 +1040,6 @@ def test_version_after_http_upgrade(self):

def socket_handler(listener):
sock = listener.accept()[0]

# We should get the initial request.
data = b''
while not data.endswith(b'\r\n\r\n'):
Expand Down Expand Up @@ -1089,6 +1089,44 @@ def socket_handler(listener):

self.tear_down()

def test_connection_and_send_simultaneously(self):
# Since deadlock occurs probabilistic,
# It still has deadlock probability
# even the testcase is passed.
self.set_up()

recv_event = threading.Event()

def socket_handler(listener):
sock = listener.accept()[0]

receive_preamble(sock)
sock.recv(65535)

recv_event.set()
sock.close()

def do_req(conn):
conn.request('GET', '/')
recv_event.wait()

def do_connect(conn):
conn.connect()

self._start_server(socket_handler)
conn = self.get_connection()

pool = ThreadPoolExecutor(max_workers=2)
pool.submit(do_connect, conn)
f = pool.submit(do_req, conn)

try:
f.result(timeout=10)
except TimeoutError:
assert False

self.tear_down()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to see a comment on this test that points out that it's fundamentally probabilistic: it'll only fail transiently, and that we need to accept that. That will help us take it more seriously if it does transiently fail.



@patch('hyper.http20.connection.H2_NPN_PROTOCOLS', PROTOCOLS)
class TestRequestsAdapter(SocketLevelTest):
Expand Down