From e6d251bda060064e959c311a20527c6cd82fd23e Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Tue, 18 Jul 2023 14:16:28 +0200 Subject: [PATCH] Limit time spent in _process_quorum_replication by loop_wait seconds --- patroni/ha.py | 12 ++++++++++-- tests/test_ha.py | 4 ++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/patroni/ha.py b/patroni/ha.py index 24f8d7f4e..dccd68f2d 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -642,6 +642,8 @@ def _process_quorum_replication(self) -> None: In case any of those steps causes an error we can just bail out and let next iteration rediscover the state and retry necessary transitions. """ + start_time = time.time() + min_sync = self.global_config.min_synchronous_nodes sync_wanted = self.global_config.synchronous_node_count @@ -652,6 +654,9 @@ def _process_quorum_replication(self) -> None: if not sync: return logger.warning("Updating sync state failed") + def _check_timeout(offset: float = 0) -> bool: + return time.time() - start_time + offset >= self.dcs.loop_wait + while True: transition = 'break' # we need define transition value if `QuorumStateResolver` produced no changes sync_state = self.state_handler.sync_handler.current_state(self.cluster) @@ -664,6 +669,9 @@ def _process_quorum_replication(self) -> None: active=sync_state.active, sync_wanted=sync_wanted, leader_wanted=self.state_handler.name): + if _check_timeout(): + return + if transition == 'quorum': logger.info("Setting leader to %s, quorum to %d of %d (%s)", leader, num, len(nodes), ", ".join(sorted(nodes))) @@ -680,8 +688,8 @@ def _process_quorum_replication(self) -> None: " Commits will be delayed.", min_sync + 1, num) num = min_sync self.state_handler.sync_handler.set_synchronous_standby_names(nodes, num) - if transition != 'restart': - break + if transition != 'restart' or _check_timeout(1): + return # synchronous_standby_names was transitioned from empty to non-empty and it may take # some time for nodes to become synchronous. In this case we want to restart state machine # hoping that we can update /sync key earlier than in loop_wait seconds. diff --git a/tests/test_ha.py b/tests/test_ha.py index 473a6db99..116cc5af1 100644 --- a/tests/test_ha.py +++ b/tests/test_ha.py @@ -1555,3 +1555,7 @@ def test_process_quorum_replication(self): self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0}) self.assertEqual(mock_set_sync.call_count, 1) self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (*)',)) + + # Test that _process_quorum_replication doesn't take longer than loop_wait + with patch('time.time', Mock(side_effect=[30, 60, 90, 120])): + self.ha.process_sync_replication()