Skip to content

Commit

Permalink
zmq test: fix flakiness by using more robust sync method
Browse files Browse the repository at this point in the history
After connecting the subscriber sockets to the node, there is no
guarantee that the node's zmq publisher interfaces are ready yet, which
means that potentially the first expected notification messages could
get lost and the test fails. Currently this is handled by just waiting
for a short period of time (200ms), which works most of the time but is
still problematic, as in some rare cases the setup time takes much
longer, even in the range of multiple seconds.

The solution in this commit approaches the problem by using a more
robust method of syncing up, originally proposed by instagibbs:
    1. Generate a block on the node
    2. Try to receive a notification on all subscribers
    3. If all subscribers get a message within the timeout (1 second),
       we are done, otherwise repeat starting from step 1
  • Loading branch information
theStack committed Feb 9, 2021
1 parent 8666033 commit 5c65463
Showing 1 changed file with 37 additions and 12 deletions.
49 changes: 37 additions & 12 deletions test/functional/interface_zmq.py
Expand Up @@ -87,23 +87,45 @@ def run_test(self):

# Restart node with the specified zmq notifications enabled, subscribe to
# all of them and return the corresponding ZMQSubscriber objects.
def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False):
def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True):
subscribers = []
for topic, address in services:
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, recv_timeout*1000)
subscribers.append(ZMQSubscriber(socket, topic.encode()))

self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services])

if connect_nodes:
self.connect_nodes(0, 1)

for i, sub in enumerate(subscribers):
sub.socket.connect(services[i][1])

# Relax so that the subscribers are ready before publishing zmq messages
sleep(0.2)
# Ensure that all zmq publisher notification interfaces are ready by
# running the following "sync up" procedure:
# 1. Generate a block on the node
# 2. Try to receive a notification on all subscribers
# 3. If all subscribers get a message within the timeout (1 second),
# we are done, otherwise repeat starting from step 1
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, 1000)
while True:
self.nodes[0].generate(1)
recv_failed = False
for sub in subscribers:
try:
sub.receive()
except zmq.error.Again:
self.log.debug("Didn't receive sync-up notification, trying again.")
recv_failed = True
if not recv_failed:
self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
break

# set subscriber's desired timeout for the test
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)

self.connect_nodes(0, 1)
if sync_blocks:
self.sync_blocks()

return subscribers

Expand All @@ -113,9 +135,7 @@ def test_basic(self):
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])

address = 'tcp://127.0.0.1:28332'
subs = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]],
connect_nodes=True)
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])

hashblock = subs[0]
hashtx = subs[1]
Expand Down Expand Up @@ -192,6 +212,7 @@ def test_reorg(self):
hashblock, hashtx = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx"]],
recv_timeout=2) # 2 second timeout to check end of notifications
self.disconnect_nodes(0, 1)

# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
Expand Down Expand Up @@ -240,6 +261,7 @@ def test_sequence(self):
"""
self.log.info("Testing 'sequence' publisher")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
self.disconnect_nodes(0, 1)

# Mempool sequence number starts at 1
seq_num = 1
Expand Down Expand Up @@ -390,7 +412,7 @@ def test_mempool_sync(self):
return

self.log.info("Testing 'mempool sync' usage of sequence notifier")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True)
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])

# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
Expand Down Expand Up @@ -490,10 +512,13 @@ def test_mempool_sync(self):

def test_multiple_interfaces(self):
# Set up two subscribers with different addresses
# (note that after the reorg test, syncing would fail due to different
# chain lengths on node0 and node1; for this test we only need node0, so
# we can disable syncing blocks on the setup)
subscribers = self.setup_zmq_test([
("hashblock", "tcp://127.0.0.1:28334"),
("hashblock", "tcp://127.0.0.1:28335"),
])
], sync_blocks=False)

# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
Expand Down

0 comments on commit 5c65463

Please sign in to comment.