Skip to content

Commit

Permalink
Limit time spent in _process_quorum_replication by loop_wait seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberDem0n committed Jul 18, 2023
1 parent b0d8b21 commit e6d251b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
12 changes: 10 additions & 2 deletions patroni/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,8 @@ def _process_quorum_replication(self) -> None:
In case any of those steps causes an error, we can just bail out and let the next iteration rediscover the state
and retry the necessary transitions.
"""
start_time = time.time()

min_sync = self.global_config.min_synchronous_nodes
sync_wanted = self.global_config.synchronous_node_count

Expand All @@ -652,6 +654,9 @@ def _process_quorum_replication(self) -> None:
if not sync:
return logger.warning("Updating sync state failed")

def _check_timeout(offset: float = 0) -> bool:
return time.time() - start_time + offset >= self.dcs.loop_wait

while True:
transition = 'break' # we need to define a transition value in case `QuorumStateResolver` produces no changes
sync_state = self.state_handler.sync_handler.current_state(self.cluster)
Expand All @@ -664,6 +669,9 @@ def _process_quorum_replication(self) -> None:
active=sync_state.active,
sync_wanted=sync_wanted,
leader_wanted=self.state_handler.name):
if _check_timeout():
return

if transition == 'quorum':
logger.info("Setting leader to %s, quorum to %d of %d (%s)",
leader, num, len(nodes), ", ".join(sorted(nodes)))
Expand All @@ -680,8 +688,8 @@ def _process_quorum_replication(self) -> None:
" Commits will be delayed.", min_sync + 1, num)
num = min_sync
self.state_handler.sync_handler.set_synchronous_standby_names(nodes, num)
if transition != 'restart':
break
if transition != 'restart' or _check_timeout(1):
return
# synchronous_standby_names was transitioned from empty to non-empty, and it may take
# some time for nodes to become synchronous. In this case we want to restart the state machine,
# hoping that we can update the /sync key sooner than after loop_wait seconds.
Expand Down
4 changes: 4 additions & 0 deletions tests/test_ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -1555,3 +1555,7 @@ def test_process_quorum_replication(self):
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (*)',))

# Test that _process_quorum_replication doesn't take longer than loop_wait
with patch('time.time', Mock(side_effect=[30, 60, 90, 120])):
self.ha.process_sync_replication()

0 comments on commit e6d251b

Please sign in to comment.