diff --git a/docs/citus.rst b/docs/citus.rst index f404f47e9..24ad6ffb2 100644 --- a/docs/citus.rst +++ b/docs/citus.rst @@ -34,6 +34,8 @@ There are only a few simple rules you need to follow: After that you just need to start Patroni and it will handle the rest: +0. Patroni will set ``bootstrap.dcs.synchronous_mode`` to :ref:`quorum ` + if it is not explicitly set to any other value. 1. ``citus`` extension will be automatically added to ``shared_preload_libraries``. 2. If ``max_prepared_transactions`` isn't explicitly set in the global :ref:`dynamic configuration ` Patroni will @@ -77,36 +79,36 @@ It results in two major differences in :ref:`patronictl` behaviour when An example of :ref:`patronictl_list` output for the Citus cluster:: postgres@coord1:~$ patronictl list demo - + Citus cluster: demo ----------+--------------+---------+----+-----------+ - | Group | Member | Host | Role | State | TL | Lag in MB | - +-------+---------+-------------+--------------+---------+----+-----------+ - | 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 | - | 0 | coord2 | 172.27.0.6 | Sync Standby | running | 1 | 0 | - | 0 | coord3 | 172.27.0.4 | Leader | running | 1 | | - | 1 | work1-1 | 172.27.0.8 | Sync Standby | running | 1 | 0 | - | 1 | work1-2 | 172.27.0.2 | Leader | running | 1 | | - | 2 | work2-1 | 172.27.0.5 | Sync Standby | running | 1 | 0 | - | 2 | work2-2 | 172.27.0.7 | Leader | running | 1 | | - +-------+---------+-------------+--------------+---------+----+-----------+ + + Citus cluster: demo ----------+----------------+---------+----+-----------+ + | Group | Member | Host | Role | State | TL | Lag in MB | + +-------+---------+-------------+----------------+---------+----+-----------+ + | 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 | + | 0 | coord2 | 172.27.0.6 | Quorum Standby | running | 1 | 0 | + | 0 | coord3 | 172.27.0.4 | Leader | running | 1 | | + | 1 | work1-1 | 172.27.0.8 | Quorum Standby | running | 1 | 0 | + | 1 | work1-2 | 172.27.0.2 | Leader | running | 1 | | + | 2 | work2-1 | 172.27.0.5 | Quorum Standby | running | 1 | 0 | + | 2 | work2-2 | 172.27.0.7 | Leader | running | 1 | | + +-------+---------+-------------+----------------+---------+----+-----------+ If we add the ``--group`` option, the output will change to:: postgres@coord1:~$ patronictl list demo --group 0 - + Citus cluster: demo (group: 0, 7179854923829112860) -----------+ - | Member | Host | Role | State | TL | Lag in MB | - +--------+-------------+--------------+---------+----+-----------+ - | coord1 | 172.27.0.10 | Replica | running | 1 | 0 | - | coord2 | 172.27.0.6 | Sync Standby | running | 1 | 0 | - | coord3 | 172.27.0.4 | Leader | running | 1 | | - +--------+-------------+--------------+---------+----+-----------+ + + Citus cluster: demo (group: 0, 7179854923829112860) -+-----------+ + | Member | Host | Role | State | TL | Lag in MB | + +--------+-------------+----------------+---------+----+-----------+ + | coord1 | 172.27.0.10 | Replica | running | 1 | 0 | + | coord2 | 172.27.0.6 | Quorum Standby | running | 1 | 0 | + | coord3 | 172.27.0.4 | Leader | running | 1 | | + +--------+-------------+----------------+---------+----+-----------+ postgres@coord1:~$ patronictl list demo --group 1 - + Citus cluster: demo (group: 1, 7179854923881963547) -----------+ - | Member | Host | Role | State | TL | Lag in MB | - +---------+------------+--------------+---------+----+-----------+ - | work1-1 | 172.27.0.8 | Sync Standby | running | 1 | 0 | - | work1-2 | 172.27.0.2 | Leader | running | 1 | | - 
+---------+------------+--------------+---------+----+-----------+ + + Citus cluster: demo (group: 1, 7179854923881963547) -+-----------+ + | Member | Host | Role | State | TL | Lag in MB | + +---------+------------+----------------+---------+----+-----------+ + | work1-1 | 172.27.0.8 | Quorum Standby | running | 1 | 0 | + | work1-2 | 172.27.0.2 | Leader | running | 1 | | + +---------+------------+----------------+---------+----+-----------+ Citus worker switchover ----------------------- @@ -122,28 +124,28 @@ new primary worker node is ready to accept read-write queries. An example of :ref:`patronictl_switchover` on the worker cluster:: postgres@coord1:~$ patronictl switchover demo - + Citus cluster: demo ----------+--------------+---------+----+-----------+ - | Group | Member | Host | Role | State | TL | Lag in MB | - +-------+---------+-------------+--------------+---------+----+-----------+ - | 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 | - | 0 | coord2 | 172.27.0.6 | Sync Standby | running | 1 | 0 | - | 0 | coord3 | 172.27.0.4 | Leader | running | 1 | | - | 1 | work1-1 | 172.27.0.8 | Leader | running | 1 | | - | 1 | work1-2 | 172.27.0.2 | Sync Standby | running | 1 | 0 | - | 2 | work2-1 | 172.27.0.5 | Sync Standby | running | 1 | 0 | - | 2 | work2-2 | 172.27.0.7 | Leader | running | 1 | | - +-------+---------+-------------+--------------+---------+----+-----------+ + + Citus cluster: demo ----------+----------------+---------+----+-----------+ + | Group | Member | Host | Role | State | TL | Lag in MB | + +-------+---------+-------------+----------------+---------+----+-----------+ + | 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 | + | 0 | coord2 | 172.27.0.6 | Quorum Standby | running | 1 | 0 | + | 0 | coord3 | 172.27.0.4 | Leader | running | 1 | | + | 1 | work1-1 | 172.27.0.8 | Leader | running | 1 | | + | 1 | work1-2 | 172.27.0.2 | Quorum Standby | running | 1 | 0 | + | 2 | work2-1 | 172.27.0.5 | Quorum Standby | running | 1 | 0 | + | 2 | work2-2 | 172.27.0.7 | Leader | running | 1 | | + +-------+---------+-------------+----------------+---------+----+-----------+ Citus group: 2 Primary [work2-2]: Candidate ['work2-1'] []: When should the switchover take place (e.g. 2022-12-22T08:02 ) [now]: Current cluster topology - + Citus cluster: demo (group: 2, 7179854924063375386) -----------+ - | Member | Host | Role | State | TL | Lag in MB | - +---------+------------+--------------+---------+----+-----------+ - | work2-1 | 172.27.0.5 | Sync Standby | running | 1 | 0 | - | work2-2 | 172.27.0.7 | Leader | running | 1 | | - +---------+------------+--------------+---------+----+-----------+ + + Citus cluster: demo (group: 2, 7179854924063375386) -+-----------+ + | Member | Host | Role | State | TL | Lag in MB | + +---------+------------+----------------+---------+----+-----------+ + | work2-1 | 172.27.0.5 | Quorum Standby | running | 1 | 0 | + | work2-2 | 172.27.0.7 | Leader | running | 1 | | + +---------+------------+----------------+---------+----+-----------+ Are you sure you want to switchover cluster demo, demoting current primary work2-2? 
[y/N]: y 2022-12-22 07:02:40.33003 Successfully switched over to "work2-1" + Citus cluster: demo (group: 2, 7179854924063375386) ------+ @@ -154,17 +156,17 @@ An example of :ref:`patronictl_switchover` on the worker cluster:: +---------+------------+---------+---------+----+-----------+ postgres@coord1:~$ patronictl list demo - + Citus cluster: demo ----------+--------------+---------+----+-----------+ - | Group | Member | Host | Role | State | TL | Lag in MB | - +-------+---------+-------------+--------------+---------+----+-----------+ - | 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 | - | 0 | coord2 | 172.27.0.6 | Sync Standby | running | 1 | 0 | - | 0 | coord3 | 172.27.0.4 | Leader | running | 1 | | - | 1 | work1-1 | 172.27.0.8 | Leader | running | 1 | | - | 1 | work1-2 | 172.27.0.2 | Sync Standby | running | 1 | 0 | - | 2 | work2-1 | 172.27.0.5 | Leader | running | 2 | | - | 2 | work2-2 | 172.27.0.7 | Sync Standby | running | 2 | 0 | - +-------+---------+-------------+--------------+---------+----+-----------+ + + Citus cluster: demo ----------+----------------+---------+----+-----------+ + | Group | Member | Host | Role | State | TL | Lag in MB | + +-------+---------+-------------+----------------+---------+----+-----------+ + | 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 | + | 0 | coord2 | 172.27.0.6 | Quorum Standby | running | 1 | 0 | + | 0 | coord3 | 172.27.0.4 | Leader | running | 1 | | + | 1 | work1-1 | 172.27.0.8 | Leader | running | 1 | | + | 1 | work1-2 | 172.27.0.2 | Quorum Standby | running | 1 | 0 | + | 2 | work2-1 | 172.27.0.5 | Leader | running | 2 | | + | 2 | work2-2 | 172.27.0.7 | Quorum Standby | running | 2 | 0 | + +-------+---------+-------------+----------------+---------+----+-----------+ And this is how it looks on the coordinator side:: diff --git a/docs/dynamic_configuration.rst b/docs/dynamic_configuration.rst index b12aac204..ffd8b80a5 100644 --- a/docs/dynamic_configuration.rst +++ b/docs/dynamic_configuration.rst @@ -25,7 +25,7 @@ In order to change the dynamic configuration you can use either :ref:`patronictl - **max\_timelines\_history**: maximum number of timeline history items kept in DCS. Default value: 0. When set to 0, it keeps the full history in DCS. - **primary\_start\_timeout**: the amount of time a primary is allowed to recover from failures before failover is triggered (in seconds). Default is 300 seconds. When set to 0 failover is done immediately after a crash is detected if possible. When using asynchronous replication a failover can cause lost transactions. Worst case failover time for primary failure is: loop\_wait + primary\_start\_timeout + loop\_wait, unless primary\_start\_timeout is zero, in which case it's just loop\_wait. Set the value according to your durability/availability tradeoff. - **primary\_stop\_timeout**: The number of seconds Patroni is allowed to wait when stopping Postgres and effective only when synchronous_mode is enabled. When set to > 0 and the synchronous_mode is enabled, Patroni sends SIGKILL to the postmaster if the stop operation is running for more than the value set by primary\_stop\_timeout. Set the value according to your durability/availability tradeoff. If the parameter is not set or set <= 0, primary\_stop\_timeout does not apply. -- **synchronous\_mode**: turns on synchronous replication mode. In this mode a replica will be chosen as synchronous and only the latest leader and synchronous replica are able to participate in leader election. 
Synchronous mode makes sure that successfully committed transactions will not be lost at failover, at the cost of losing availability for writes when Patroni cannot ensure transaction durability. See :ref:`replication modes documentation ` for details. +- **synchronous\_mode**: turns on synchronous replication mode. Possible values: ``off``, ``on``, ``quorum``. In this mode the leader manages ``synchronous_standby_names``, and only the last known leader or one of the synchronous replicas is allowed to participate in the leader race. Synchronous mode makes sure that successfully committed transactions will not be lost at failover, at the cost of losing availability for writes when Patroni cannot ensure transaction durability. See :ref:`replication modes documentation ` for details. - **synchronous\_mode\_strict**: prevents disabling synchronous replication if no synchronous replicas are available, blocking all client writes to the primary. See :ref:`replication modes documentation ` for details. - **failsafe\_mode**: Enables :ref:`DCS Failsafe Mode `. Defaults to `false`. - **postgresql**: diff --git a/docs/replication_modes.rst b/docs/replication_modes.rst index 017abf15e..27a75ce74 100644 --- a/docs/replication_modes.rst +++ b/docs/replication_modes.rst @@ -6,8 +6,9 @@ Replication modes Patroni uses PostgreSQL streaming replication. For more information about streaming replication, see the `Postgres documentation `__. By default Patroni configures PostgreSQL for asynchronous replication. Choosing your replication schema is dependent on your business considerations. Investigate both async and sync replication, as well as other HA solutions, to determine which solution is best for you. + Asynchronous mode durability ----------------------------- +============================ In asynchronous mode the cluster is allowed to lose some committed transactions to ensure availability. When the primary server fails or becomes unavailable for any other reason Patroni will automatically promote a sufficiently healthy standby to primary. Any transactions that have not been replicated to that standby remain in a "forked timeline" on the primary, and are effectively unrecoverable [1]_. @@ -15,10 +16,11 @@ The amount of transactions that can be lost is controlled via ``maximum_lag_on_f By default, when running leader elections, Patroni does not take into account the current timeline of replicas, what in some cases could be undesirable behavior. You can prevent the node not having the same timeline as a former primary become the new leader by changing the value of ``check_timeline`` parameter to ``true``. + PostgreSQL synchronous replication ----------------------------------- +================================== -You can use Postgres's `synchronous replication `__ with Patroni. Synchronous replication ensures consistency across a cluster by confirming that writes are written to a secondary before returning to the connecting client with a success. The cost of synchronous replication: reduced throughput on writes. This throughput will be entirely based on network performance. +You can use Postgres's `synchronous replication `__ with Patroni. Synchronous replication ensures consistency across a cluster by confirming that writes are written to a secondary before returning to the connecting client with a success. The cost of synchronous replication: increased latency and reduced throughput on writes. This throughput will be entirely based on network performance.
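The synchronous state described above can be inspected directly on the primary. Below is a minimal illustrative sketch, not part of this patch, assuming a local primary and the ``psycopg2`` driver; the connection parameters are placeholders:

.. code-block:: python

    import psycopg2

    # Connection settings are assumptions for illustration only.
    conn = psycopg2.connect("host=127.0.0.1 port=5432 dbname=postgres user=postgres")
    with conn.cursor() as cur:
        # sync_state is 'sync' for classic synchronous standbys and 'quorum'
        # for members of an ANY n (...) set in synchronous_standby_names.
        cur.execute("SELECT application_name, sync_state FROM pg_stat_replication")
        for name, sync_state in cur.fetchall():
            print(name, sync_state)
    conn.close()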
In hosted datacenter environments (like AWS, Rackspace, or any network you do not control), synchronous replication significantly increases the variability of write performance. If followers become inaccessible from the leader, the leader effectively becomes read-only. @@ -33,10 +35,11 @@ When using PostgreSQL synchronous replication, use at least three Postgres data Using PostgreSQL synchronous replication does not guarantee zero lost transactions under all circumstances. When the primary and the secondary that is currently acting as a synchronous replica fail simultaneously a third node that might not contain all transactions will be promoted. + .. _synchronous_mode: Synchronous mode ----------------- +================ For use cases where losing committed transactions is not permissible you can turn on Patroni's ``synchronous_mode``. When ``synchronous_mode`` is turned on Patroni will not promote a standby unless it is certain that the standby contains all transactions that may have returned a successful commit status to client [2]_. This means that the system may be unavailable for writes even though some servers are available. System administrators can still use manual failover commands to promote a standby even if it results in transaction loss. @@ -55,28 +58,122 @@ up. You can ensure that a standby never becomes the synchronous standby by setting ``nosync`` tag to true. This is recommended to set for standbys that are behind slow network connections and would cause performance degradation when becoming a synchronous standby. Setting tag ``nostream`` to true will also have the same effect. -Synchronous mode can be switched on and off via Patroni REST interface. See :ref:`dynamic configuration ` for instructions. +Synchronous mode can be switched on and off using the ``patronictl edit-config`` command or via the Patroni REST interface. See :ref:`dynamic configuration ` for instructions. Note: Because of the way synchronous replication is implemented in PostgreSQL it is still possible to lose transactions even when using ``synchronous_mode_strict``. If the PostgreSQL backend is cancelled while waiting to acknowledge replication (as a result of packet cancellation due to client timeout or backend failure) transaction changes become visible for other backends. Such changes are not yet replicated and may be lost in case of standby promotion. + Synchronous Replication Factor ------------------------------- -The parameter ``synchronous_node_count`` is used by Patroni to manage number of synchronous standby databases. It is set to 1 by default. It has no effect when ``synchronous_mode`` is set to off. When enabled, Patroni manages precise number of synchronous standby databases based on parameter ``synchronous_node_count`` and adjusts the state in DCS & synchronous_standby_names as members join and leave. +============================== + +The parameter ``synchronous_node_count`` is used by Patroni to manage the number of synchronous standby databases. It is set to ``1`` by default. It has no effect when ``synchronous_mode`` is set to ``off``. When enabled, Patroni manages the precise number of synchronous standby databases based on the ``synchronous_node_count`` parameter and adjusts the state in DCS & ``synchronous_standby_names`` in PostgreSQL as members join and leave. If the parameter is set to a value higher than the number of eligible nodes it will be automatically reduced by Patroni.
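As an illustration of adjusting the replication factor at runtime, the ``/config`` REST endpoint can be PATCHed, mirroring the requests used in ``features/quorum_commit.feature`` later in this patch. A minimal sketch; the address, port, and use of the ``requests`` library are assumptions:

.. code-block:: python

    import requests

    # Ask Patroni to keep two synchronous standbys; Patroni reduces the value
    # automatically if there are fewer eligible nodes, as described above.
    resp = requests.patch('http://127.0.0.1:8008/config',
                          json={'synchronous_node_count': 2})
    print(resp.status_code)  # 200 indicates the dynamic configuration was updated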
+ + +Maximum lag on synchronous node +=============================== + +By default Patroni sticks to nodes that are declared as ``synchronous``, according to the ``pg_stat_replication`` view, even when there are other nodes ahead of them. This is done to minimize the number of changes of ``synchronous_standby_names``. To change this behavior one may use the ``maximum_lag_on_syncnode`` parameter. It controls how much lag the replica can have to still be considered "synchronous". + +Patroni utilizes the max replica LSN if there is more than one standby, otherwise it will use the leader's current WAL LSN. The default is ``-1``, and Patroni will not take action to swap an unhealthy synchronous standby when the value is set to ``0`` or less. Please set the value high enough so that Patroni won't swap synchronous standbys frequently during high transaction volume. + Synchronous mode implementation -------------------------------- +=============================== -When in synchronous mode Patroni maintains synchronization state in the DCS, containing the latest primary and current synchronous standby databases. This state is updated with strict ordering constraints to ensure the following invariants: +When in synchronous mode Patroni maintains synchronization state in the DCS (``/sync`` key), containing the latest primary and current synchronous standby databases. This state is updated with strict ordering constraints to ensure the following invariants: - A node must be marked as the latest leader whenever it can accept write transactions. Patroni crashing or PostgreSQL not shutting down can cause violations of this invariant. -- A node must be set as the synchronous standby in PostgreSQL as long as it is published as the synchronous standby. +- A node must be set as the synchronous standby in PostgreSQL as long as it is published as the synchronous standby in the ``/sync`` key in DCS. - A node that is not the leader or current synchronous standby is not allowed to promote itself automatically. Patroni will only assign one or more synchronous standby nodes based on ``synchronous_node_count`` parameter to ``synchronous_standby_names``. -On each HA loop iteration Patroni re-evaluates synchronous standby nodes choice. If the current list of synchronous standby nodes are connected and has not requested its synchronous status to be removed it remains picked. Otherwise the cluster member available for sync that is furthest ahead in replication is picked. +On each HA loop iteration Patroni re-evaluates the choice of synchronous standby nodes. If the currently selected synchronous standby nodes are connected and have not requested their synchronous status to be removed, they remain picked. Otherwise the cluster members available for sync that are furthest ahead in replication are picked. + +Example: +--------- + +``/config`` key in DCS +^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: YAML + + synchronous_mode: on + synchronous_node_count: 2 + ... + +``/sync`` key in DCS +^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: JSON + + { + "leader": "node0", + "sync_standby": "node1,node2" + } + +postgresql.conf +^^^^^^^^^^^^^^^ + +.. code-block:: INI + + synchronous_standby_names = 'FIRST 2 (node1,node2)' + + +In the above examples only nodes ``node1`` and ``node2`` are known to be synchronous and allowed to be automatically promoted if the primary (``node0``) fails. + + +.. _quorum_mode: + +Quorum commit mode +================== + +Starting from PostgreSQL v10 Patroni supports quorum-based synchronous replication.
+ +In this mode, Patroni maintains synchronization state in the DCS, containing the latest known primary, the number of nodes required for quorum, and the nodes currently eligible to vote on quorum. In steady state, the nodes voting on quorum are the leader and all synchronous standbys. This state is updated with strict ordering constraints, with regard to node promotion and ``synchronous_standby_names``, to ensure that at all times any subset of voters that can achieve quorum includes at least one node with the latest successful commit. + +On each iteration of the HA loop, Patroni re-evaluates synchronous standby choices and quorum, based on node availability and requested cluster configuration. In PostgreSQL versions above 9.6 all eligible nodes are added as synchronous standbys as soon as their replication catches up to the leader. + +Quorum commit helps to reduce worst-case latencies, even during normal operation, as a higher latency of replicating to one standby can be compensated by other standbys. + +The quorum-based synchronous mode can be enabled by setting ``synchronous_mode`` to ``quorum`` using the ``patronictl edit-config`` command or via the Patroni REST interface. See :ref:`dynamic configuration ` for instructions. + +Other parameters, like ``synchronous_node_count``, ``maximum_lag_on_syncnode``, and ``synchronous_mode_strict``, continue to work the same way as with ``synchronous_mode=on``. + +Example: +--------- + +``/config`` key in DCS +^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: YAML + + synchronous_mode: quorum + synchronous_node_count: 2 + ... + +``/sync`` key in DCS +^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: JSON + + { + "leader": "node0", + "sync_standby": "node1,node2,node3", + "quorum": 1 + } + +postgresql.conf +^^^^^^^^^^^^^^^ + +.. code-block:: INI + + synchronous_standby_names = 'ANY 2 (node1,node2,node3)' + + +In the above example, if the primary (``node0``) fails, two of ``node1``, ``node2``, and ``node3`` will have received the latest transaction, but we don't know which two. To figure out whether the node ``node1`` has received the latest transaction, we need to compare its LSN with the LSN on **at least** one node (``quorum=1`` in the ``/sync`` key) among ``node2`` and ``node3``. If ``node1`` isn't behind at least one of them, we can guarantee that there will be no user-visible data loss if ``node1`` is promoted. .. [1] The data is still there, but recovering it requires a manual recovery effort by data recovery specialists. When Patroni is allowed to rewind with ``use_pg_rewind`` the forked timeline will be automatically erased to rejoin the failed primary with the cluster. However, for ``use_pg_rewind`` to function properly, either the cluster must be initialized with ``data page checksums`` (``--data-checksums`` option for ``initdb``) and/or ``wal_log_hints`` must be set to ``on``. diff --git a/docs/rest_api.rst b/docs/rest_api.rst index a03c05d23..ef16de618 100644 --- a/docs/rest_api.rst +++ b/docs/rest_api.rst @@ -45,6 +45,10 @@ For all health check ``GET`` requests Patroni returns a JSON document with the s - ``GET /read-only-sync``: like the above endpoint, but also includes the primary. +- ``GET /quorum``: returns HTTP status code **200** only when this Patroni node is listed as a quorum node in ``synchronous_standby_names`` on the primary. + +- ``GET /read-only-quorum``: like the above endpoint, but also includes the primary.
+ - - ``GET /asynchronous`` or ``GET /async``: returns HTTP status code **200** only when the Patroni node is running as an asynchronous standby. @@ -308,6 +312,9 @@ Retrieve the Patroni metrics in Prometheus format through the ``GET /metrics`` e # HELP patroni_sync_standby Value is 1 if this node is a sync standby replica, 0 otherwise. # TYPE patroni_sync_standby gauge patroni_sync_standby{scope="batman",name="patroni1"} 0 + # HELP patroni_quorum_standby Value is 1 if this node is a quorum standby replica, 0 otherwise. + # TYPE patroni_quorum_standby gauge + patroni_quorum_standby{scope="batman",name="patroni1"} 0 # HELP patroni_xlog_received_location Current location of the received Postgres transaction log, 0 if this node is not a replica. # TYPE patroni_xlog_received_location counter patroni_xlog_received_location{scope="batman",name="patroni1"} 0 diff --git a/features/quorum_commit.feature b/features/quorum_commit.feature new file mode 100644 index 000000000..2204ab596 --- /dev/null +++ b/features/quorum_commit.feature @@ -0,0 +1,68 @@ +Feature: quorum commit + Check basic workflows when quorum commit is enabled + + Scenario: check enable quorum commit and that the only leader promotes after restart + Given I start postgres0 + Then postgres0 is a leader after 10 seconds + And there is a non empty initialize key in DCS after 15 seconds + When I issue a PATCH request to http://127.0.0.1:8008/config with {"ttl": 20, "synchronous_mode": "quorum"} + Then I receive a response code 200 + And sync key in DCS has leader=postgres0 after 20 seconds + And sync key in DCS has quorum=0 after 2 seconds + And synchronous_standby_names on postgres0 is set to "_empty_str_" after 2 seconds + When I shut down postgres0 + And sync key in DCS has leader=postgres0 after 2 seconds + When I start postgres0 + Then postgres0 role is the primary after 10 seconds + When I issue a PATCH request to http://127.0.0.1:8008/config with {"synchronous_mode_strict": true} + Then synchronous_standby_names on postgres0 is set to "ANY 1 (*)" after 10 seconds + + Scenario: check failover with one quorum standby + Given I start postgres1 + Then sync key in DCS has sync_standby=postgres1 after 10 seconds + And synchronous_standby_names on postgres0 is set to "ANY 1 (postgres1)" after 2 seconds + When I shut down postgres0 + Then postgres1 role is the primary after 10 seconds + And sync key in DCS has quorum=0 after 10 seconds + Then synchronous_standby_names on postgres1 is set to "ANY 1 (*)" after 10 seconds + When I start postgres0 + Then sync key in DCS has leader=postgres1 after 10 seconds + Then sync key in DCS has sync_standby=postgres0 after 10 seconds + And synchronous_standby_names on postgres1 is set to "ANY 1 (postgres0)" after 2 seconds + + Scenario: check behavior with three nodes and different replication factor + Given I start postgres2 + Then sync key in DCS has sync_standby=postgres0,postgres2 after 10 seconds + And sync key in DCS has quorum=1 after 2 seconds + And synchronous_standby_names on postgres1 is set to "ANY 1 (postgres0,postgres2)" after 2 seconds + When I issue a PATCH request to http://127.0.0.1:8009/config with {"synchronous_node_count": 2} + Then sync key in DCS has quorum=0 after 10 seconds + And synchronous_standby_names on postgres1 is set to "ANY 2 (postgres0,postgres2)" after 2 seconds + + Scenario: switch from quorum replication to good old multisync and back + Given I issue a PATCH request to http://127.0.0.1:8009/config with {"synchronous_mode": true, "synchronous_node_count": 1} + And I
shut down postgres0 + Then synchronous_standby_names on postgres1 is set to "postgres2" after 10 seconds + And sync key in DCS has sync_standby=postgres2 after 10 seconds + Then sync key in DCS has quorum=0 after 2 seconds + When I issue a PATCH request to http://127.0.0.1:8009/config with {"synchronous_mode": "quorum"} + And I start postgres0 + Then synchronous_standby_names on postgres1 is set to "ANY 1 (postgres0,postgres2)" after 10 seconds + And sync key in DCS has sync_standby=postgres0,postgres2 after 10 seconds + Then sync key in DCS has quorum=1 after 2 seconds + + Scenario: REST API and patronictl + Given I run patronictl.py list batman + Then I receive a response returncode 0 + And I receive a response output "Quorum Standby" + And Status code on GET http://127.0.0.1:8008/quorum is 200 after 3 seconds + And Status code on GET http://127.0.0.1:8010/quorum is 200 after 3 seconds + + Scenario: nosync node is removed from voters and synchronous_standby_names + Given I add tag nosync true to postgres2 config + When I issue an empty POST request to http://127.0.0.1:8010/reload + Then I receive a response code 202 + And sync key in DCS has quorum=0 after 10 seconds + And sync key in DCS has sync_standby=postgres0 after 10 seconds + And synchronous_standby_names on postgres1 is set to "ANY 1 (postgres0)" after 2 seconds + And Status code on GET http://127.0.0.1:8010/quorum is 503 after 10 seconds diff --git a/features/steps/quorum_commit.py b/features/steps/quorum_commit.py new file mode 100644 index 000000000..48fc4b623 --- /dev/null +++ b/features/steps/quorum_commit.py @@ -0,0 +1,60 @@ +import json +import re +import time + +from behave import step, then + + +@step('sync key in DCS has {key:w}={value} after {time_limit:d} seconds') +def check_sync(context, key, value, time_limit): + time_limit *= context.timeout_multiplier + max_time = time.time() + int(time_limit) + dcs_value = None + while time.time() < max_time: + try: + response = json.loads(context.dcs_ctl.query('sync')) + dcs_value = response.get(key) + if key == 'sync_standby' and set((dcs_value or '').split(',')) == set(value.split(',')): + return + elif str(dcs_value) == value: + return + except Exception: + pass + time.sleep(1) + assert False, "sync does not have {0}={1} (found {2}) in dcs after {3} seconds".format(key, value, + dcs_value, time_limit) + + +def _parse_synchronous_standby_names(value): + if '(' in value: + m = re.match(r'.*(\d+) \(([^)]+)\)', value) + expected_value = set(m.group(2).split()) + expected_num = m.group(1) + else: + expected_value = set([value]) + expected_num = '1' + return expected_num, expected_value + + +@then('synchronous_standby_names on {name:2} is set to "{value}" after {time_limit:d} seconds') +def check_synchronous_standby_names(context, name, value, time_limit): + time_limit *= context.timeout_multiplier + max_time = time.time() + int(time_limit) + + if value == '_empty_str_': + value = '' + + expected_num, expected_value = _parse_synchronous_standby_names(value) + + ssn = None + while time.time() < max_time: + try: + ssn = context.pctl.query(name, "SHOW synchronous_standby_names").fetchone()[0] + db_num, db_value = _parse_synchronous_standby_names(ssn) + if expected_value == db_value and expected_num == db_num: + return + except Exception: + pass + time.sleep(1) + assert False, "synchronous_standby_names is not set to '{0}' (found '{1}') after {2} seconds".format(value, ssn, + time_limit) diff --git a/patroni/api.py b/patroni/api.py index d9136d242..18ed4bff5 100644 --- 
a/patroni/api.py +++ b/patroni/api.py @@ -254,6 +254,14 @@ def do_GET(self, write_status_code_only: bool = False) -> None: * HTTP status ``200``: if up and running and without ``noloadbalance`` tag. + * ``/quorum``: + + * HTTP status ``200``: if up and running as a quorum synchronous standby. + + * ``/read-only-quorum``: + + * HTTP status ``200``: if up and running as a quorum synchronous standby or primary. + * ``/synchronous`` or ``/sync``: * HTTP status ``200``: if up and running as a synchronous standby. @@ -334,16 +342,24 @@ def do_GET(self, write_status_code_only: bool = False) -> None: ignore_tags = True elif 'replica' in path: status_code = replica_status_code - elif 'read-only' in path and 'sync' not in path: + elif 'read-only' in path and 'sync' not in path and 'quorum' not in path: status_code = 200 if 200 in (primary_status_code, standby_leader_status_code) else replica_status_code elif 'health' in path: status_code = 200 if response.get('state') == 'running' else 503 elif cluster: # dcs is available + is_quorum = response.get('quorum_standby') is_synchronous = response.get('sync_standby') if path in ('/sync', '/synchronous') and is_synchronous: status_code = replica_status_code - elif path in ('/async', '/asynchronous') and not is_synchronous: + elif path == '/quorum' and is_quorum: + status_code = replica_status_code + elif path in ('/async', '/asynchronous') and not is_synchronous and not is_quorum: status_code = replica_status_code + elif path == '/read-only-quorum': + if 200 in (primary_status_code, standby_leader_status_code): + status_code = 200 + elif is_quorum: + status_code = replica_status_code elif path in ('/read-only-sync', '/read-only-synchronous'): if 200 in (primary_status_code, standby_leader_status_code): status_code = 200 @@ -510,6 +526,7 @@ def do_GET_metrics(self) -> None: * ``patroni_standby_leader``: ``1`` if standby leader node, else ``0``; * ``patroni_replica``: ``1`` if a replica, else ``0``; * ``patroni_sync_standby``: ``1`` if a sync replica, else ``0``; + * ``patroni_quorum_standby``: ``1`` if a quorum sync replica, else ``0``; * ``patroni_xlog_received_location``: ``pg_wal_lsn_diff(pg_last_wal_receive_lsn(), '0/0')``; * ``patroni_xlog_replayed_location``: ``pg_wal_lsn_diff(pg_last_wal_replay_lsn(), '0/0)``; * ``patroni_xlog_replayed_timestamp``: ``pg_last_xact_replay_timestamp``; @@ -572,10 +589,14 @@ def do_GET_metrics(self) -> None: metrics.append("# TYPE patroni_replica gauge") metrics.append("patroni_replica{0} {1}".format(labels, int(postgres['role'] == 'replica'))) - metrics.append("# HELP patroni_sync_standby Value is 1 if this node is a sync standby replica, 0 otherwise.") + metrics.append("# HELP patroni_sync_standby Value is 1 if this node is a sync standby, 0 otherwise.") metrics.append("# TYPE patroni_sync_standby gauge") metrics.append("patroni_sync_standby{0} {1}".format(labels, int(postgres.get('sync_standby', False)))) + metrics.append("# HELP patroni_quorum_standby Value is 1 if this node is a quorum standby, 0 otherwise.") + metrics.append("# TYPE patroni_quorum_standby gauge") + metrics.append("patroni_quorum_standby{0} {1}".format(labels, int(postgres.get('quorum_standby', False)))) + metrics.append("# HELP patroni_xlog_received_location Current location of the received" " Postgres transaction log, 0 if this node is not a replica.") metrics.append("# TYPE patroni_xlog_received_location counter") @@ -1035,16 +1056,17 @@ def is_failover_possible(self, cluster: Cluster, leader: Optional[str], candidat :returns: a string with the 
error message or ``None`` if good nodes are found. """ - is_synchronous_mode = global_config.from_cluster(cluster).is_synchronous_mode + config = global_config.from_cluster(cluster) if leader and (not cluster.leader or cluster.leader.name != leader): return 'leader name does not match' if candidate: - if action == 'switchover' and is_synchronous_mode and not cluster.sync.matches(candidate): + if action == 'switchover' and config.is_synchronous_mode\ + and not config.is_quorum_commit_mode and not cluster.sync.matches(candidate): return 'candidate name does not match with sync_standby' members = [m for m in cluster.members if m.name == candidate] if not members: return 'candidate does not exists' - elif is_synchronous_mode: + elif config.is_synchronous_mode and not config.is_quorum_commit_mode: members = [m for m in cluster.members if cluster.sync.matches(m.name)] if not members: return action + ' is not possible: can not find sync_standby' @@ -1251,6 +1273,7 @@ def get_postgresql_status(self, retry: bool = False) -> Dict[str, Any]: * ``paused``: ``pg_is_wal_replay_paused()``; * ``sync_standby``: ``True`` if replication mode is synchronous and this is a sync standby; + * ``quorum_standby``: ``True`` if replication mode is quorum and this is a quorum standby; * ``timeline``: PostgreSQL primary node timeline; * ``replication``: :class:`list` of :class:`dict` entries, one for each replication connection. Each entry contains the following keys: @@ -1306,7 +1329,7 @@ def get_postgresql_status(self, retry: bool = False) -> Dict[str, Any]: if result['role'] == 'replica' and config.is_synchronous_mode\ and cluster and cluster.sync.matches(postgresql.name): - result['sync_standby'] = True + result['quorum_standby' if global_config.is_quorum_commit_mode else 'sync_standby'] = True if row[1] > 0: result['timeline'] = row[1] diff --git a/patroni/config.py b/patroni/config.py index 6b46f4bb1..16a0ab5e6 100644 --- a/patroni/config.py +++ b/patroni/config.py @@ -756,7 +756,7 @@ def _build_effective_configuration(self, dynamic_configuration: Dict[str, Any], if 'citus' in config: bootstrap = config.setdefault('bootstrap', {}) dcs = bootstrap.setdefault('dcs', {}) - dcs.setdefault('synchronous_mode', True) + dcs.setdefault('synchronous_mode', 'quorum') updated_fields = ( 'name', diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index 33a39464f..ee2ed77bf 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -549,11 +549,15 @@ class SyncState(NamedTuple): :ivar version: modification version of a synchronization key in a Configuration Store. :ivar leader: reference to member that was leader. :ivar sync_standby: synchronous standby list (comma delimited) which are last synchronized to leader. + :ivar quorum: if the node from :attr:`~SyncState.sync_standby` list is doing a leader race it should + see at least :attr:`~SyncState.quorum` other nodes from the + :attr:`~SyncState.sync_standby` + :attr:`~SyncState.leader` list. 
""" version: Optional[_Version] leader: Optional[str] sync_standby: Optional[str] + quorum: int @staticmethod def from_node(version: Optional[_Version], value: Union[str, Dict[str, Any], None]) -> 'SyncState': @@ -588,7 +592,9 @@ def from_node(version: Optional[_Version], value: Union[str, Dict[str, Any], Non if value and isinstance(value, str): value = json.loads(value) assert isinstance(value, dict) - return SyncState(version, value.get('leader'), value.get('sync_standby')) + leader = value.get('leader') + quorum = value.get('quorum') + return SyncState(version, leader, value.get('sync_standby'), int(quorum) if leader and quorum else 0) except (AssertionError, TypeError, ValueError): return SyncState.empty(version) @@ -600,7 +606,7 @@ def empty(version: Optional[_Version] = None) -> 'SyncState': :returns: empty synchronisation state object. """ - return SyncState(version, None, None) + return SyncState(version, None, None, 0) @property def is_empty(self) -> bool: @@ -618,10 +624,17 @@ def _str_to_list(value: str) -> List[str]: return list(filter(lambda a: a, [s.strip() for s in value.split(',')])) @property - def members(self) -> List[str]: + def voters(self) -> List[str]: """:attr:`~SyncState.sync_standby` as list or an empty list if undefined or object considered ``empty``.""" return self._str_to_list(self.sync_standby) if not self.is_empty and self.sync_standby else [] + @property + def members(self) -> List[str]: + """:attr:`~SyncState.sync_standby` and :attr:`~SyncState.leader` as list + or an empty list if object considered ``empty``. + """ + return [] if not self.leader else [self.leader] + self.voters + def matches(self, name: Optional[str], check_leader: bool = False) -> bool: """Checks if node is presented in the /sync state. @@ -635,7 +648,7 @@ def matches(self, name: Optional[str], check_leader: bool = False) -> bool: the sync state. :Example: - >>> s = SyncState(1, 'foo', 'bar,zoo') + >>> s = SyncState(1, 'foo', 'bar,zoo', 0) >>> s.matches('foo') False @@ -1875,18 +1888,23 @@ def delete_cluster(self) -> bool: """ @staticmethod - def sync_state(leader: Optional[str], sync_standby: Optional[Collection[str]]) -> Dict[str, Any]: + def sync_state(leader: Optional[str], sync_standby: Optional[Collection[str]], + quorum: Optional[int]) -> Dict[str, Any]: """Build ``sync_state`` dictionary. :param leader: name of the leader node that manages ``/sync`` key. :param sync_standby: collection of currently known synchronous standby node names. + :param quorum: if the node from :attr:`~SyncState.sync_standby` list is doing a leader race it should + see at least :attr:`~SyncState.quorum` other nodes from the + :attr:`~SyncState.sync_standby` + :attr:`~SyncState.leader` list :returns: dictionary that later could be serialized to JSON or saved directly to DCS. """ - return {'leader': leader, 'sync_standby': ','.join(sorted(sync_standby)) if sync_standby else None} + return {'leader': leader, 'quorum': quorum, + 'sync_standby': ','.join(sorted(sync_standby)) if sync_standby else None} def write_sync_state(self, leader: Optional[str], sync_standby: Optional[Collection[str]], - version: Optional[Any] = None) -> Optional[SyncState]: + quorum: Optional[int], version: Optional[Any] = None) -> Optional[SyncState]: """Write the new synchronous state to DCS. 
Calls :meth:`~AbstractDCS.sync_state` to build a dictionary and then calls DCS specific @@ -1895,10 +1913,13 @@ def write_sync_state(self, leader: Optional[str], sync_standby: Optional[Collect :param leader: name of the leader node that manages ``/sync`` key. :param sync_standby: collection of currently known synchronous standby node names. :param version: for conditional update of the key/object. + :param quorum: if the node from :attr:`~SyncState.sync_standby` list is doing a leader race it should + see at least :attr:`~SyncState.quorum` other nodes from the + :attr:`~SyncState.sync_standby` + :attr:`~SyncState.leader` list :returns: the new :class:`SyncState` object or ``None``. """ - sync_value = self.sync_state(leader, sync_standby) + sync_value = self.sync_state(leader, sync_standby, quorum) ret = self.set_sync_state_value(json.dumps(sync_value, separators=(',', ':')), version) if not isinstance(ret, bool): return SyncState.from_node(ret, sync_value) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 9590bb98f..884a5753c 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -1373,15 +1373,18 @@ def set_sync_state_value(self, value: str, version: Optional[str] = None) -> boo raise NotImplementedError # pragma: no cover def write_sync_state(self, leader: Optional[str], sync_standby: Optional[Collection[str]], - version: Optional[str] = None) -> Optional[SyncState]: + quorum: Optional[int], version: Optional[str] = None) -> Optional[SyncState]: """Prepare and write annotations to $SCOPE-sync Endpoint or ConfigMap. :param leader: name of the leader node that manages /sync key :param sync_standby: collection of currently known synchronous standby node names + :param quorum: if the node from sync_standby list is doing a leader race it should + see at least quorum other nodes from the sync_standby + leader list :param version: last known `resource_version` for conditional update of the object :returns: the new :class:`SyncState` object or None """ - sync_state = self.sync_state(leader, sync_standby) + sync_state = self.sync_state(leader, sync_standby, quorum) + sync_state['quorum'] = str(sync_state['quorum']) if sync_state['quorum'] is not None else None ret = self.patch_or_create(self.sync_path, sync_state, version, False) if not isinstance(ret, bool): return SyncState.from_node(ret.metadata.resource_version, sync_state) @@ -1393,7 +1396,7 @@ def delete_sync_state(self, version: Optional[str] = None) -> bool: :param version: last known `resource_version` for conditional update of the object :returns: `True` if "delete" was successful """ - return self.write_sync_state(None, None, version=version) is not None + return self.write_sync_state(None, None, None, version=version) is not None def watch(self, leader_version: Optional[str], timeout: float) -> bool: if self.__do_not_watch: diff --git a/patroni/global_config.py b/patroni/global_config.py index 5423d7e58..924961fd0 100644 --- a/patroni/global_config.py +++ b/patroni/global_config.py @@ -105,10 +105,16 @@ def is_paused(self) -> bool: """``True`` if cluster is in maintenance mode.""" return self.check_mode('pause') + @property + def is_quorum_commit_mode(self) -> bool: + """:returns: ``True`` if quorum commit replication is requested""" + return str(self.get('synchronous_mode')).lower() == 'quorum' + @property def is_synchronous_mode(self) -> bool: """``True`` if synchronous replication is requested and it is not a standby cluster config.""" - return self.check_mode('synchronous_mode') and not 
self.is_standby_cluster + return (self.check_mode('synchronous_mode') is True or self.is_quorum_commit_mode) \ + and not self.is_standby_cluster @property def is_synchronous_mode_strict(self) -> bool: diff --git a/patroni/ha.py b/patroni/ha.py index 8bbcaf660..60b0b73d8 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -14,12 +14,13 @@ from .__main__ import Patroni from .async_executor import AsyncExecutor, CriticalTask from .collections import CaseInsensitiveSet -from .dcs import AbstractDCS, Cluster, Leader, Member, RemoteMember, Status, slot_name_from_member_name +from .dcs import AbstractDCS, Cluster, Leader, Member, RemoteMember, Status, SyncState, slot_name_from_member_name from .exceptions import DCSError, PostgresConnectionException, PatroniFatalException from .postgresql.callback_executor import CallbackAction from .postgresql.misc import postgres_version_to_int from .postgresql.postmaster import PostmasterProcess from .postgresql.rewind import Rewind +from .quorum import QuorumStateResolver from .tags import Tags from .utils import polling_loop, tzutc @@ -161,6 +162,7 @@ def __init__(self, patroni: Patroni): self._leader_expiry_lock = RLock() self._failsafe = Failsafe(patroni.dcs) self._was_paused = False + self._promote_timestamp = 0 self._leader_timeline = None self.recovering = False self._async_response = CriticalTask() @@ -220,6 +222,8 @@ def set_is_leader(self, value: bool) -> None: """ with self._leader_expiry_lock: self._leader_expiry = time.time() + self.dcs.ttl if value else 0 + if not value: + self._promote_timestamp = 0 def sync_mode_is_active(self) -> bool: """Check whether synchronous replication is requested and already active. @@ -228,6 +232,13 @@ def sync_mode_is_active(self) -> bool: """ return self.is_synchronous_mode() and not self.cluster.sync.is_empty + def quorum_commit_mode_is_active(self) -> bool: + """Checks whether quorum replication is requested and already active. + + :returns: ``True`` if the primary already put its name into the ``/sync`` in DCS. + """ + return self.is_quorum_commit_mode() and not self.cluster.sync.is_empty + def _get_failover_action_name(self) -> str: """Return the currently requested manual failover action name or the default ``failover``. @@ -697,81 +708,221 @@ def is_synchronous_mode(self) -> bool: """:returns: `True` if synchronous replication is requested.""" return global_config.is_synchronous_mode + def is_quorum_commit_mode(self) -> bool: + """``True`` if quorum commit replication is requested and "supported".""" + return global_config.is_quorum_commit_mode and self.state_handler.supports_multiple_sync + def is_failsafe_mode(self) -> bool: """:returns: `True` if failsafe_mode is enabled in global configuration.""" return global_config.check_mode('failsafe_mode') - def process_sync_replication(self) -> None: - """Process synchronous standby beahvior. + def _maybe_enable_synchronous_mode(self) -> Optional[SyncState]: + """Explicitly enable synchronous mode if not yet enabled. + + We are trying to solve a corner case: synchronous mode needs to be explicitly enabled + by updating the ``/sync`` key with the current leader name and empty members. In opposite + case it will never be automatically enabled if there are no eligible candidates. + + :returns: the latest version of :class:`~patroni.dcs.SyncState` object. 
+ """ + sync = self.cluster.sync + if sync.is_empty: + sync = self.dcs.write_sync_state(self.state_handler.name, None, 0, version=sync.version) + if sync: + logger.info("Enabled synchronous replication") + else: + logger.warning("Updating sync state failed") + return sync + + def disable_synchronous_replication(self) -> None: + """Cleans up ``/sync`` key in DCS and updates ``synchronous_standby_names``. + + .. note:: + We fall back to using the value configured by the user for ``synchronous_standby_names``, if any. + """ + # If synchronous_mode was turned off, we need to update synchronous_standby_names in Postgres + if not self.cluster.sync.is_empty and self.dcs.delete_sync_state(version=self.cluster.sync.version): + logger.info("Disabled synchronous replication") + self.state_handler.sync_handler.set_synchronous_standby_names(CaseInsensitiveSet()) + + # As synchronous_mode is off, check if the user configured Postgres synchronous replication instead + ssn = self.state_handler.config.synchronous_standby_names + self.state_handler.config.set_synchronous_standby_names(ssn) + + def _process_quorum_replication(self) -> None: + """Process synchronous replication state when quorum commit is requested. + + Synchronous standbys are registered in two places: ``postgresql.conf`` and DCS. The order of updating them must + keep the invariant that ``quorum + sync >= len(set(quorum pool)|set(sync pool))``. This is done using + :class:`QuorumStateResolver` that given a current state and set of desired synchronous nodes and replication + level outputs changes to DCS and synchronous replication in correct order to reach the desired state. + In case any of those steps causes an error we can just bail out and let next iteration rediscover the state + and retry necessary transitions. + """ + start_time = time.time() + + min_sync = global_config.min_synchronous_nodes + sync_wanted = global_config.synchronous_node_count + + sync = self._maybe_enable_synchronous_mode() + if not sync or not sync.leader: + return + + leader = sync.leader + + def _check_timeout(offset: float = 0) -> bool: + return time.time() - start_time + offset >= self.dcs.loop_wait + + while True: + transition = 'break' # we need define transition value if `QuorumStateResolver` produced no changes + sync_state = self.state_handler.sync_handler.current_state(self.cluster) + for transition, leader, num, nodes in QuorumStateResolver(leader=leader, + quorum=sync.quorum, + voters=sync.voters, + numsync=sync_state.numsync, + sync=sync_state.sync, + numsync_confirmed=sync_state.numsync_confirmed, + active=sync_state.active, + sync_wanted=sync_wanted, + leader_wanted=self.state_handler.name): + if _check_timeout(): + return + + if transition == 'quorum': + logger.info("Setting leader to %s, quorum to %d of %d (%s)", + leader, num, len(nodes), ", ".join(sorted(nodes))) + sync = self.dcs.write_sync_state(leader, nodes, num, version=sync.version) + if not sync: + return logger.info('Synchronous replication key updated by someone else.') + elif transition == 'sync': + logger.info("Setting synchronous replication to %d of %d (%s)", + num, len(nodes), ", ".join(sorted(nodes))) + # Bump up number of num nodes to meet minimum replication factor. Commits will have to wait until + # we have enough nodes to meet replication target. + if num < min_sync: + logger.warning("Replication factor %d requested, but %d synchronous standbys available." 
+ " Commits will be delayed.", min_sync + 1, num) + num = min_sync + self.state_handler.sync_handler.set_synchronous_standby_names(nodes, num) + if transition != 'restart' or _check_timeout(1): + return + # synchronous_standby_names was transitioned from empty to non-empty and it may take + # some time for nodes to become synchronous. In this case we want to restart state machine + # hoping that we can update /sync key earlier than in loop_wait seconds. + time.sleep(1) + self.state_handler.reset_cluster_info_state(None) + + def _process_multisync_replication(self) -> None: + """Process synchronous replication state with one or more sync standbys. Synchronous standbys are registered in two places postgresql.conf and DCS. The order of updating them must be right. The invariant that should be kept is that if a node is primary and sync_standby is set in DCS, then that node must have synchronous_standby set to that value. Or more simple, first set in postgresql.conf and then in DCS. When removing, first remove in DCS, then in postgresql.conf. This is so we only consider promoting standbys that were guaranteed to be replicating synchronously. - - .. note:: - If ``synchronous_mode`` is disabled, we fall back to using the value configured by the user for - ``synchronous_standby_names``, if any. """ - if self.is_synchronous_mode(): - sync = self.cluster.sync - if sync.is_empty: - # corner case: we need to explicitly enable synchronous mode by updating the - # ``/sync`` key with the current leader name and empty members. In opposite case - # it will never be automatically enabled if there are not eligible candidates. - sync = self.dcs.write_sync_state(self.state_handler.name, None, version=sync.version) - if not sync: - return logger.warning("Updating sync state failed") - logger.info("Enabled synchronous replication") + sync = self._maybe_enable_synchronous_mode() + if not sync: + return - current = CaseInsensitiveSet(sync.members) - picked, allow_promote = self.state_handler.sync_handler.current_state(self.cluster) - - if picked == current and current != allow_promote: - logger.warning('Inconsistent state between synchronous_standby_names = %s and /sync = %s key ' - 'detected, updating synchronous replication key...', list(allow_promote), list(current)) - sync = self.dcs.write_sync_state(self.state_handler.name, allow_promote, version=sync.version) - if not sync: - return logger.warning("Updating sync state failed") - current = CaseInsensitiveSet(sync.members) - - if picked != current: - # update synchronous standby list in dcs temporarily to point to common nodes in current and picked - sync_common = current & allow_promote - if sync_common != current: - logger.info("Updating synchronous privilege temporarily from %s to %s", - list(current), list(sync_common)) - sync = self.dcs.write_sync_state(self.state_handler.name, sync_common, version=sync.version) - if not sync: - return logger.info('Synchronous replication key updated by someone else.') + current_state = self.state_handler.sync_handler.current_state(self.cluster) + picked = current_state.active + allow_promote = current_state.sync + voters = CaseInsensitiveSet(sync.voters) - # When strict mode and no suitable replication connections put "*" to synchronous_standby_names - if global_config.is_synchronous_mode_strict and not picked: - picked = CaseInsensitiveSet('*') - logger.warning("No standbys available!") - - # Update postgresql.conf and wait 2 secs for changes to become active - logger.info("Assigning synchronous standby status to %s", 
list(picked)) - self.state_handler.sync_handler.set_synchronous_standby_names(picked) - - if picked and picked != CaseInsensitiveSet('*') and allow_promote != picked: - # Wait for PostgreSQL to enable synchronous mode and see if we can immediately set sync_standby - time.sleep(2) - _, allow_promote = self.state_handler.sync_handler.current_state(self.cluster) - if allow_promote and allow_promote != sync_common: - if not self.dcs.write_sync_state(self.state_handler.name, allow_promote, version=sync.version): - return logger.info("Synchronous replication key updated by someone else") - logger.info("Synchronous standby status assigned to %s", list(allow_promote)) + if picked == voters and voters != allow_promote: + logger.warning('Inconsistent state between synchronous_standby_names = %s and /sync = %s key ' + 'detected, updating synchronous replication key...', list(allow_promote), list(voters)) + sync = self.dcs.write_sync_state(self.state_handler.name, allow_promote, 0, version=sync.version) + if not sync: + return logger.warning("Updating sync state failed") + voters = CaseInsensitiveSet(sync.voters) + + if picked == voters: + return + + # update synchronous standby list in dcs temporarily to point to common nodes in current and picked + sync_common = voters & allow_promote + if sync_common != voters: + logger.info("Updating synchronous privilege temporarily from %s to %s", + list(voters), list(sync_common)) + sync = self.dcs.write_sync_state(self.state_handler.name, sync_common, 0, version=sync.version) + if not sync: + return logger.info('Synchronous replication key updated by someone else.') + + # When strict mode and no suitable replication connections put "*" to synchronous_standby_names + if global_config.is_synchronous_mode_strict and not picked: + picked = CaseInsensitiveSet('*') + logger.warning("No standbys available!") + + # Update postgresql.conf and wait 2 secs for changes to become active + logger.info("Assigning synchronous standby status to %s", list(picked)) + self.state_handler.sync_handler.set_synchronous_standby_names(picked) + + if picked and picked != CaseInsensitiveSet('*') and allow_promote != picked: + # Wait for PostgreSQL to enable synchronous mode and see if we can immediately set sync_standby + time.sleep(2) + allow_promote = self.state_handler.sync_handler.current_state(self.cluster).sync + + if allow_promote and allow_promote != sync_common: + if self.dcs.write_sync_state(self.state_handler.name, allow_promote, 0, version=sync.version): + logger.info("Synchronous standby status assigned to %s", list(allow_promote)) + else: + logger.info("Synchronous replication key updated by someone else") + + def process_sync_replication(self) -> None: + """Process synchronous replication behavior on the primary.""" + if self.is_quorum_commit_mode(): + # The synchronous_standby_names was adjusted right before promote. + # After that, when postgres has become a primary, we need to reflect this change + # in the /sync key. Further changes of synchronous_standby_names and /sync key should + # be postponed for `loop_wait` seconds, to give a chance to some replicas to start streaming. + # In opposite case the /sync key will end up without synchronous nodes. 
+ if self.state_handler.is_primary(): + if self._promote_timestamp == 0 or time.time() - self._promote_timestamp > self.dcs.loop_wait: + self._process_quorum_replication() + if self._promote_timestamp == 0: + self._promote_timestamp = time.time() + elif self.is_synchronous_mode(): + self._process_multisync_replication() else: - # If synchronous_mode was turned off, we need to update synchronous_standby_names in Postgres - if not self.cluster.sync.is_empty and self.dcs.delete_sync_state(version=self.cluster.sync.version): - logger.info("Disabled synchronous replication") - self.state_handler.sync_handler.set_synchronous_standby_names(CaseInsensitiveSet()) + self.disable_synchronous_replication() + + def process_sync_replication_prepromote(self) -> bool: + """Handle sync replication state before promote. + + If quorum replication is requested, and we can keep syncing to enough nodes satisfying the quorum invariant + we can promote immediately and let normal quorum resolver process handle any membership changes later. + Otherwise, we will just reset DCS state to ourselves and add replicas as they connect. + + :returns: ``True`` if on success or ``False`` if failed to update /sync key in DCS. + """ + if not self.is_synchronous_mode(): + self.disable_synchronous_replication() + return True + + if self.quorum_commit_mode_is_active(): + sync = CaseInsensitiveSet(self.cluster.sync.members) + numsync = len(sync) - self.cluster.sync.quorum - 1 + if self.state_handler.name not in sync: # Node outside voters achieved quorum and got leader + numsync += 1 + else: + sync.discard(self.state_handler.name) + else: + sync = CaseInsensitiveSet() + numsync = global_config.min_synchronous_nodes - # As synchronous_mode is off, check if the user configured Postgres synchronous replication instead - ssn = self.state_handler.config.synchronous_standby_names - self.state_handler.config.set_synchronous_standby_names(ssn) + if not self.is_quorum_commit_mode() or not self.state_handler.supports_multiple_sync and numsync > 1: + sync = CaseInsensitiveSet() + numsync = global_config.min_synchronous_nodes + + # Just set ourselves as the authoritative source of truth for now. We don't want to wait for standbys + # to connect. We will try finding a synchronous standby in the next cycle. + if not self.dcs.write_sync_state(self.state_handler.name, None, 0, version=self.cluster.sync.version): + return False + + self.state_handler.sync_handler.set_synchronous_standby_names(sync, numsync) + return True def is_sync_standby(self, cluster: Cluster) -> bool: """:returns: `True` if the current node is a synchronous standby.""" @@ -875,15 +1026,10 @@ def enforce_primary_role(self, message: str, promote_message: str) -> str: self.process_sync_replication() return message else: - if self.is_synchronous_mode(): - # Just set ourselves as the authoritative source of truth for now. We don't want to wait for standbys - # to connect. We will try finding a synchronous standby in the next cycle. - if not self.dcs.write_sync_state(self.state_handler.name, None, version=self.cluster.sync.version): - # Somebody else updated sync state, it may be due to us losing the lock. To be safe, postpone - # promotion until next cycle. 
TODO: trigger immediate retry of run_cycle - return 'Postponing promotion because synchronous replication state was updated by somebody else' - self.state_handler.sync_handler.set_synchronous_standby_names( - CaseInsensitiveSet('*') if global_config.is_synchronous_mode_strict else CaseInsensitiveSet()) + if not self.process_sync_replication_prepromote(): + # Somebody else updated sync state, it may be due to us losing the lock. To be safe, + # postpone promotion until next cycle. TODO: trigger immediate retry of run_cycle. + return 'Postponing promotion because synchronous replication state was updated by somebody else' if self.state_handler.role not in ('master', 'promoted', 'primary'): # reset failsafe state when promote self._failsafe.set_is_active(0) @@ -899,10 +1045,14 @@ def before_promote(): return promote_message def fetch_node_status(self, member: Member) -> _MemberStatus: - """This function perform http get request on member.api_url and fetches its status - :returns: `_MemberStatus` object - """ + """Perform http get request on member.api_url to fetch its status. + + Usually this happens during the leader race and we can't afford to wait an indefinite time + for a response, therefore the request timeout is hardcoded to 2 seconds, which seems to be a + good compromise. The node which is slow to respond is most likely unhealthy. + :returns: :class:`_MemberStatus` object + """ try: response = self.patroni.request(member, timeout=2, retries=0) data = response.data.decode('utf-8') @@ -987,18 +1137,26 @@ def check_failsafe_topology(self) -> bool: return all(results) def is_lagging(self, wal_position: int) -> bool: - """Returns if instance with an wal should consider itself unhealthy to be promoted due to replication lag. + """Check if node should consider itself unhealthy to be promoted due to replication lag. :param wal_position: Current wal position. - :returns True when node is lagging + :returns: ``True`` when node is lagging """ lag = (self.cluster.last_lsn or 0) - wal_position return lag > global_config.maximum_lag_on_failover def _is_healthiest_node(self, members: Collection[Member], check_replication_lag: bool = True) -> bool: - """This method tries to determine whether I am healthy enough to became a new leader candidate or not.""" - + """Determine whether the current node is healthy enough to become a new leader candidate. + + :param members: the list of nodes to check against + :param check_replication_lag: whether to take the replication lag into account. + If the lag exceeds configured threshold the node disqualifies itself. + :returns: ``True`` if the node is eligible to become the new leader. Since this method is executed + on multiple nodes independently it is possible that multiple nodes could count + themselves as the healthiest because they received/replayed up to the same LSN, + but this is totally fine. 
+ """ my_wal_position = self.state_handler.last_operation() if check_replication_lag and self.is_lagging(my_wal_position): logger.info('My wal position exceeds maximum replication lag') @@ -1014,8 +1172,26 @@ def _is_healthiest_node(self, members: Collection[Member], check_replication_lag logger.info('My timeline %s is behind last known cluster timeline %s', my_timeline, cluster_timeline) return False - # Prepare list of nodes to run check against - members = [m for m in members if m.name != self.state_handler.name and not m.nofailover and m.api_url] + if self.quorum_commit_mode_is_active(): + quorum = self.cluster.sync.quorum + voting_set = CaseInsensitiveSet(self.cluster.sync.members) + else: + quorum = 0 + voting_set = CaseInsensitiveSet() + + # Prepare list of nodes to run check against. If quorum commit is enabled + # we also include members with nofailover tag if they are listed in voters. + members = [m for m in members if m.name != self.state_handler.name + and m.api_url and (not m.nofailover or m.name in voting_set)] + + # If there is a quorum active then at least one of the quorum contains latest commit. A quorum member saying + # their WAL position is not ahead counts as a vote saying we may become new leader. Note that a node doesn't + # have to be a member of the voting set to gather the necessary votes. + + # Regardless of voting, if we observe a node that can become a leader and is ahead, we defer to that node. + # This can lead to failure to act on quorum if there is asymmetric connectivity. + quorum_votes = 0 if self.state_handler.name in voting_set else -1 + nodes_ahead = 0 for st in self.fetch_nodes_statuses(members): if st.failover_limitation() is None: @@ -1023,22 +1199,34 @@ def _is_healthiest_node(self, members: Collection[Member], check_replication_lag logger.warning('Primary (%s) is still alive', st.member.name) return False if my_wal_position < st.wal_position: + nodes_ahead += 1 logger.info('Wal position of %s is ahead of my wal position', st.member.name) # In synchronous mode the former leader might be still accessible and even be ahead of us. # We should not disqualify himself from the leader race in such a situation. if not self.sync_mode_is_active() or not self.cluster.sync.leader_matches(st.member.name): return False logger.info('Ignoring the former leader being ahead of us') - if my_wal_position == st.wal_position and self.patroni.failover_priority < st.failover_priority: - # There's a higher priority non-lagging replica - logger.info( - '%s has equally tolerable WAL position and priority %s, while this node has priority %s', - st.member.name, - st.failover_priority, - self.patroni.failover_priority, - ) - return False - return True + elif st.wal_position > 0: # we want to count votes only from nodes with postgres up and running! + quorum_vote = st.member.name in voting_set + low_priority = my_wal_position == st.wal_position \ + and self.patroni.failover_priority < st.failover_priority + + if low_priority and (not self.sync_mode_is_active() or quorum_vote): + # There's a higher priority non-lagging replica + logger.info( + '%s has equally tolerable WAL position and priority %s, while this node has priority %s', + st.member.name, st.failover_priority, self.patroni.failover_priority) + return False + + if quorum_vote: + logger.info('Got quorum vote from %s', st.member.name) + quorum_votes += 1 + + # When not in quorum commit we just want to return `True`. + # In quorum commit the former leader is special and counted healthy even when there are no other nodes. 
+ # Otherwise check that the number of votes exceeds the quorum field from the /sync key. + return not self.quorum_commit_mode_is_active() or quorum_votes >= quorum\ + or nodes_ahead == 0 and self.cluster.sync.leader == self.state_handler.name def is_failover_possible(self, *, cluster_lsn: int = 0, exclude_failover_candidate: bool = False) -> bool: """Checks whether any of the cluster members is allowed to promote and is healthy enough for that. @@ -1098,9 +1286,10 @@ def manual_failover_process_no_leader(self) -> Optional[bool]: return None return False - # in synchronous mode when our name is not in the /sync key - # we shouldn't take any action even if the candidate is unhealthy - if self.is_synchronous_mode() and not self.cluster.sync.matches(self.state_handler.name, True): + # in synchronous mode (except quorum commit!) when our name is not in the + # /sync key we shouldn't take any action even if the candidate is unhealthy + if self.is_synchronous_mode() and not self.is_quorum_commit_mode()\ + and not self.cluster.sync.matches(self.state_handler.name, True): return False # find specific node and check that it is healthy @@ -1198,9 +1387,11 @@ def is_healthiest_node(self) -> bool: all_known_members += [RemoteMember(name, {'api_url': url}) for name, url in failsafe_members.items()] all_known_members += self.cluster.members - # When in sync mode, only last known primary and sync standby are allowed to promote automatically. + # Special handling if synchronous mode was requested and activated (the leader in /sync is not empty) if self.sync_mode_is_active(): - if not self.cluster.sync.matches(self.state_handler.name, True): + # In quorum commit mode we allow nodes outside of "voters" to take part in + # the leader race. They just need to get enough votes to `reach quorum + 1`. + if not self.is_quorum_commit_mode() and not self.cluster.sync.matches(self.state_handler.name, True): return False # pick between synchronous candidates so we minimize unnecessary failovers/demotions members = {m.name: m for m in all_known_members if self.cluster.sync.matches(m.name, True)} @@ -2093,8 +2284,11 @@ def get_failover_candidates(self, exclude_failover_candidate: bool) -> List[Memb exclude = [self.state_handler.name] + ([failover.candidate] if failover and exclude_failover_candidate else []) def is_eligible(node: Member) -> bool: + # If quorum commit is requested we want to check all nodes (even not voters), + # because they could get enough votes and reach necessary quorum + 1. # in synchronous mode we allow failover (not switchover!) to async node - if self.sync_mode_is_active() and not self.cluster.sync.matches(node.name)\ + if self.sync_mode_is_active()\ + and not (self.is_quorum_commit_mode() or self.cluster.sync.matches(node.name))\ and not (failover and not failover.leader): return False # Don't spend time on "nofailover" nodes checking. 
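The vote-counting above is terse, so here is a minimal, self-contained sketch of the rule it implements when quorum commit is active (an illustration only, not Patroni's actual code; the function name and signature are invented for the example). A candidate needs to see at least ``quorum + 1`` members of the voter set that are known not to be ahead of it, and its own membership in the voter set counts as one of those confirmations::

    from typing import Set

    def enough_quorum_votes(my_name: str, voters: Set[str], quorum: int,
                            confirmed_not_ahead: Set[str]) -> bool:
        """Illustrative only: did *my_name* see at least ``quorum + 1`` voters?

        *confirmed_not_ahead* holds the names of nodes that answered the REST API
        probe with Postgres running and a WAL position not ahead of ours.
        """
        votes = 1 if my_name in voters else 0             # own membership is one vote
        votes += sum(1 for name in confirmed_not_ahead if name in voters)
        return votes >= quorum + 1

    # With /sync = {leader: a, voters: {b, c}, quorum: 1}:
    assert enough_quorum_votes('b', {'b', 'c'}, 1, {'c'})       # voter b plus voter c
    assert not enough_quorum_votes('b', {'b', 'c'}, 1, set())   # b alone may miss a commit acked only by c
    assert not enough_quorum_votes('d', {'b', 'c'}, 1, {'b'})   # a non-voter needs two voters
    assert enough_quorum_votes('d', {'b', 'c'}, 1, {'b', 'c'})

Counting toward ``quorum + 1`` with the candidate's own vote included is arithmetically the same as the ``0``/``-1`` initialisation of ``quorum_votes`` used above. The sketch deliberately omits the two exceptions handled in ``_is_healthiest_node``: outside of quorum commit mode the final check short-circuits to ``True``, and the former leader is still considered healthy when no other node is ahead of it.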
diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py index f6deb97de..71756c452 100644 --- a/patroni/postgresql/__init__.py +++ b/patroni/postgresql/__init__.py @@ -181,6 +181,11 @@ def wal_flush(self) -> str: def lsn_name(self) -> str: return 'lsn' if self._major_version >= 100000 else 'location' + @property + def supports_quorum_commit(self) -> bool: + """``True`` if quorum commit is supported by Postgres.""" + return self._major_version >= 100000 + @property def supports_multiple_sync(self) -> bool: """:returns: `True` if Postgres version supports more than one synchronous node.""" diff --git a/patroni/postgresql/sync.py b/patroni/postgresql/sync.py index 577422b5d..37c6c9065 100644 --- a/patroni/postgresql/sync.py +++ b/patroni/postgresql/sync.py @@ -3,7 +3,7 @@ import time from copy import deepcopy -from typing import Collection, List, NamedTuple, Tuple, TYPE_CHECKING +from typing import Collection, List, NamedTuple, Optional, TYPE_CHECKING from .. import global_config from ..collections import CaseInsensitiveDict, CaseInsensitiveSet @@ -138,7 +138,7 @@ def parse_sync_standby_names(value: str) -> _SSN: if len(synclist) == i + 1: # except the last token raise ValueError("Unparseable synchronous_standby_names value %r: Unexpected token %s %r at %d" % (value, a_type, a_value, a_pos)) - elif a_type != 'comma': + if a_type != 'comma': raise ValueError("Unparseable synchronous_standby_names value %r: ""Got token %s %r while" " expecting comma at %d" % (value, a_type, a_value, a_pos)) elif a_type in {'ident', 'first', 'any'}: @@ -154,6 +154,26 @@ def parse_sync_standby_names(value: str) -> _SSN: return _SSN(sync_type, has_star, num, members) +class _SyncState(NamedTuple): + """Class representing the current synchronous state. + + :ivar sync_type: possible values: ``off``, ``priority``, ``quorum`` + :ivar numsync: how many nodes are required to be synchronous (according to ``synchronous_standby_names``). + Is ``0`` if ``synchronous_standby_names`` value is invalid or contains ``*``. + :ivar numsync_confirmed: how many nodes are known to be synchronous according to the ``pg_stat_replication`` view. + Only nodes that caught up with the :attr:`SyncHandler._primary_flush_lsn` are counted. + :ivar sync: collection of synchronous node names. In case of quorum commit all nodes listed + in ``synchronous_standby_names``, otherwise nodes that are confirmed to be synchronous according + to the ``pg_stat_replication`` view. + :ivar active: collection of node names that are streaming and have no restrictions to become synchronous. + """ + sync_type: str + numsync: int + numsync_confirmed: int + sync: CaseInsensitiveSet + active: CaseInsensitiveSet + + class _Replica(NamedTuple): """Class representing a single replica that is eligible to be synchronous. @@ -217,6 +237,8 @@ def __init__(self, postgresql: 'Postgresql', cluster: Cluster) -> None: # Prefer replicas that are in state ``sync`` and with higher values of ``write``/``flush``/``replay`` LSN. self.sort(key=lambda r: (r.sync_state, r.lsn), reverse=True) + # When checking ``maximum_lag_on_syncnode`` we want to compare with the most + # up-to-date replica otherwise with cluster LSN if there is only one replica. 
self.max_lsn = max(self, key=lambda x: x.lsn).lsn if len(self) > 1 else postgresql.last_operation() @@ -278,12 +300,22 @@ def _process_replica_readiness(self, cluster: Cluster, replica_list: _ReplicaLis # if standby name is listed in the /sync key we can count it as synchronous, otherwise # it becomes really synchronous when sync_state = 'sync' and it is known that it managed to catch up if replica.application_name not in self._ready_replicas\ - and replica.application_name in self._ssn_data.members\ - and (cluster.sync.matches(replica.application_name) - or replica.sync_state == 'sync' and replica.lsn >= self._primary_flush_lsn): - self._ready_replicas[replica.application_name] = replica.pid - - def current_state(self, cluster: Cluster) -> Tuple[CaseInsensitiveSet, CaseInsensitiveSet]: + and replica.application_name in self._ssn_data.members: + if global_config.is_quorum_commit_mode: + # When quorum commit is enabled we can't check against cluster.sync because nodes + # are written there when at least one of them caught up with _primary_flush_lsn. + if replica.lsn >= self._primary_flush_lsn\ + and (replica.sync_state == 'quorum' + or (not self._postgresql.supports_quorum_commit + and replica.sync_state in ('sync', 'potential'))): + self._ready_replicas[replica.application_name] = replica.pid + elif cluster.sync.matches(replica.application_name)\ + or replica.sync_state == 'sync' and replica.lsn >= self._primary_flush_lsn: + # if standby name is listed in the /sync key we can count it as synchronous, otherwise it becomes + # "really" synchronous when sync_state = 'sync' and we known that it managed to catch up + self._ready_replicas[replica.application_name] = replica.pid + + def current_state(self, cluster: Cluster) -> _SyncState: """Find the best candidates to be the synchronous standbys. Current synchronous standby is always preferred, unless it has disconnected or does not want to be a @@ -291,51 +323,86 @@ def current_state(self, cluster: Cluster) -> Tuple[CaseInsensitiveSet, CaseInsen Standbys are selected based on values from the global configuration: - - `maximum_lag_on_syncnode`: would help swapping unhealthy sync replica in case if it stops - responding (or hung). Please set the value high enough so it won't unncessarily swap sync - standbys during high loads. Any value less or equal of 0 keeps the behavior backward compatible. - Please note that it will not also swap sync standbys in case where all replicas are hung. - - `synchronous_node_count`: controlls how many nodes should be set as synchronous. + - ``maximum_lag_on_syncnode``: would help swapping unhealthy sync replica in case it stops + responding (or hung). Please set the value high enough, so it won't unnecessarily swap sync + standbys during high loads. Any value less or equal to ``0`` keeps the behavior backwards compatible. + Please note that it will also not swap sync standbys when all replicas are hung. - :returns: tuple of candidates :class:`CaseInsensitiveSet` and synchronous standbys :class:`CaseInsensitiveSet`. + - ``synchronous_node_count``: controls how many nodes should be set as synchronous. 
+ + :param cluster: current cluster topology from DCS + + :returns: current synchronous replication state as a :class:`_SyncState` object """ self._handle_synchronous_standby_names_change() replica_list = _ReplicaList(self._postgresql, cluster) self._process_replica_readiness(cluster, replica_list) + active = CaseInsensitiveSet() + sync_nodes = CaseInsensitiveSet() + numsync_confirmed = 0 + sync_node_count = global_config.synchronous_node_count if self._postgresql.supports_multiple_sync else 1 sync_node_maxlag = global_config.maximum_lag_on_syncnode - candidates = CaseInsensitiveSet() - sync_nodes = CaseInsensitiveSet() # Prefer members without nofailover tag. We are relying on the fact that sorts are guaranteed to be stable. for replica in sorted(replica_list, key=lambda x: x.nofailover): if sync_node_maxlag <= 0 or replica_list.max_lsn - replica.lsn <= sync_node_maxlag: - candidates.add(replica.application_name) - if replica.sync_state == 'sync' and replica.application_name in self._ready_replicas: - sync_nodes.add(replica.application_name) - if len(candidates) >= sync_node_count: - break - - return candidates, sync_nodes - - def set_synchronous_standby_names(self, sync: Collection[str]) -> None: - """Constructs and sets "synchronous_standby_names" GUC value. + if global_config.is_quorum_commit_mode: + # We do not add nodes with `nofailover` enabled because that reduces availability. + # We need to check LSN quorum only among nodes that are promotable because + # there is a chance that a non-promotable node is ahead of a promotable one. + if not replica.nofailover or len(active) < sync_node_count: + if replica.application_name in self._ready_replicas: + numsync_confirmed += 1 + active.add(replica.application_name) + else: + active.add(replica.application_name) + if replica.sync_state == 'sync' and replica.application_name in self._ready_replicas: + sync_nodes.add(replica.application_name) + numsync_confirmed += 1 + if len(active) >= sync_node_count: + break + + if global_config.is_quorum_commit_mode: + sync_nodes = CaseInsensitiveSet() if self._ssn_data.has_star else self._ssn_data.members + + return _SyncState( + self._ssn_data.sync_type, + 0 if self._ssn_data.has_star else self._ssn_data.num, + numsync_confirmed, + sync_nodes, + active) + + def set_synchronous_standby_names(self, sync: Collection[str], num: Optional[int] = None) -> None: + """Constructs and sets ``synchronous_standby_names`` GUC value. + + .. note:: + standbys in ``synchronous_standby_names`` will be sorted by name. :param sync: set of nodes to sync to + :param num: specifies number of nodes to sync to. The *num* is set only in case if quorum commit is enabled """ - has_asterisk = '*' in sync + # Special case. 
If sync nodes set is empty but requested num of sync nodes >= 1 + # we want to set synchronous_standby_names to '*' + has_asterisk = '*' in sync or num and num >= 1 and not sync if has_asterisk: sync = ['*'] else: - sync = [quote_ident(x) for x in sync] + sync = [quote_ident(x) for x in sorted(sync)] if self._postgresql.supports_multiple_sync and len(sync) > 1: - sync_param = '{0} ({1})'.format(len(sync), ','.join(sync)) + if num is None: + num = len(sync) + sync_param = ','.join(sync) else: sync_param = next(iter(sync), None) + if global_config.is_quorum_commit_mode and sync or self._postgresql.supports_multiple_sync and len(sync) > 1: + prefix = 'ANY ' if global_config.is_quorum_commit_mode and self._postgresql.supports_quorum_commit else '' + sync_param = f'{prefix}{num} ({sync_param})' + if not (self._postgresql.config.set_synchronous_standby_names(sync_param) and self._postgresql.state == 'running' and self._postgresql.is_primary()) or has_asterisk: return diff --git a/patroni/quorum.py b/patroni/quorum.py new file mode 100644 index 000000000..059c8acc7 --- /dev/null +++ b/patroni/quorum.py @@ -0,0 +1,431 @@ +"""Implement state machine to manage ``synchronous_standby_names`` GUC and ``/sync`` key in DCS.""" +import logging + +from typing import Collection, Iterator, NamedTuple, Optional + +from .collections import CaseInsensitiveSet +from .exceptions import PatroniException + +logger = logging.getLogger(__name__) + + +class Transition(NamedTuple): + """Object describing transition of ``/sync`` or ``synchronous_standby_names`` to the new state. + + .. note:: + Object attributes represent the new state. + + :ivar transition_type: possible values: + + * ``sync`` - indicates that we needed to update ``synchronous_standby_names``. + * ``quorum`` - indicates that we need to update ``/sync`` key in DCS. + * ``restart`` - caller should stop iterating over transitions and restart :class:`QuorumStateResolver`. + :ivar leader: the new value of the ``leader`` field in the ``/sync`` key. + :ivar num: the new value of the synchronous nodes count in ``synchronous_standby_names`` or value of the ``quorum`` + field in the ``/sync`` key for :attr:`transition_type` values ``sync`` and ``quorum`` respectively. + :ivar names: the new value of node names listed in ``synchronous_standby_names`` or value of ``voters`` + field in the ``/sync`` key for :attr:`transition_type` values ``sync`` and ``quorum`` respectively. + """ + + transition_type: str + leader: str + num: int + names: CaseInsensitiveSet + + +class QuorumError(PatroniException): + """Exception indicating that the quorum state is broken.""" + + +class QuorumStateResolver: + """Calculates a list of state transitions and yields them as :class:`Transition` named tuples. + + Synchronous replication state is set in two places: + + * PostgreSQL configuration sets how many and which nodes are needed for a commit to succeed, abbreviated as + ``numsync`` and ``sync`` set here; + * DCS contains information about how many and which nodes need to be interrogated to be sure to see an wal position + containing latest confirmed commit, abbreviated as ``quorum`` and ``voters`` set. + + .. note:: + Both of above pairs have the meaning "ANY n OF set". + + The number of nodes needed for commit to succeed, ``numsync``, is also called the replication factor. 
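As a concrete illustration of these two "ANY n OF set" pairs, consider a hypothetical three-node cluster with leader ``a``, replicas ``b`` and ``c``, and a desired replication factor of ``2`` (the serialisation of the ``/sync`` key shown here is abbreviated)::

    # postgresql.conf side -- numsync = 2, sync = {b, c}; on PostgreSQL 10+ the
    # quorum flavour of the GUC is used, so ANY 2 of (b, c) must acknowledge a commit:
    #     synchronous_standby_names = 'ANY 2 (b,c)'
    #
    # DCS side -- quorum = 0, voters = {b, c}; during the leader race a candidate
    # must see ANY 0 + 1 = 1 node of (b, c), which for b or c is already satisfied
    # by their own membership in the voter set:
    #     /sync = {"leader": "a", "sync_standby": "b,c", "quorum": 0}
    #
    # Any subset that can acknowledge a commit (any 2 of {b, c}) overlaps with any
    # subset that can reach quorum (any 1 of {b, c}), which is the invariant this
    # resolver keeps while moving between such states.
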
+ + To guarantee zero transaction loss on failover we need to keep the invariant that at all times any subset of + nodes that can acknowledge a commit overlaps with any subset of nodes that can achieve quorum to promote a new + leader. Given a desired replication factor and a set of nodes able to participate in sync replication there + is one optimal state satisfying this condition. Given the node set ``active``, the optimal state is:: + + sync = voters = active + + numsync = min(sync_wanted, len(active)) + + quorum = len(active) - numsync + + We need to be able to produce a series of state changes that take the system to this desired state from any + other arbitrary state given arbitrary changes is node availability, configuration and interrupted transitions. + + To keep the invariant the rule to follow is that when increasing ``numsync`` or ``quorum``, we need to perform the + increasing operation first. When decreasing either, the decreasing operation needs to be performed later. In other + words: + + * If a user increases ``synchronous_node_count`` configuration, first we increase ``synchronous_standby_names`` + (``numsync``), then we decrease ``quorum`` field in the ``/sync`` key; + * If a user decreases ``synchronous_node_count`` configuration, first we increase ``quorum`` field in the ``/sync`` + key, then we decrease ``synchronous_standby_names`` (``numsync``). + + Order of adding or removing nodes from ``sync`` and ``voters`` depends on the state of + ``synchronous_standby_names``. + + When adding new nodes:: + + if ``sync`` (``synchronous_standby_names``) is empty: + add new nodes first to ``sync`` and then to ``voters`` when ``numsync_confirmed`` > ``0``. + else: + add new nodes first to ``voters`` and then to ``sync``. + + When removing nodes:: + + if ``sync`` (``synchronous_standby_names``) will become empty after removal: + first remove nodes from ``voters`` and then from ``sync``. + else: + first remove nodes from ``sync`` and then from ``voters``. + Make ``voters`` empty if ``numsync_confirmed`` == ``0``. + + :ivar leader: name of the leader, according to the ``/sync`` key. + :ivar quorum: ``quorum`` value from the ``/sync`` key, the minimal number of nodes we need see + when doing the leader race. + :ivar voters: ``sync_standby`` value from the ``/sync`` key, set of node names we will be + running the leader race against. + :ivar numsync: the number of synchronous nodes from the ``synchronous_standby_names``. + :ivar sync: set of node names listed in the ``synchronous_standby_names``. + :ivar numsync_confirmed: the number of nodes that are confirmed to reach "safe" LSN after they were added to the + ``synchronous_standby_names``. + :ivar active: set of node names that are replicating from the primary (according to ``pg_stat_replication``) + and are eligible to be listed in ``synchronous_standby_names``. + :ivar sync_wanted: desired number of synchronous nodes (``synchronous_node_count`` from the global configuration). + :ivar leader_wanted: the desired leader (could be different from the :attr:`leader` right after a failover). + """ + + def __init__(self, leader: str, quorum: int, voters: Collection[str], + numsync: int, sync: Collection[str], numsync_confirmed: int, + active: Collection[str], sync_wanted: int, leader_wanted: str) -> None: + """Instantiate :class:``QuorumStateResolver`` based on input parameters. + + :param leader: name of the leader, according to the ``/sync`` key. 
+ :param quorum: ``quorum`` value from the ``/sync`` key, the minimal number of nodes we need see + when doing the leader race. + :param voters: ``sync_standby`` value from the ``/sync`` key, set of node names we will be + running the leader race against. + :param numsync: the number of synchronous nodes from the ``synchronous_standby_names``. + :param sync: Set of node names listed in the ``synchronous_standby_names``. + :param numsync_confirmed: the number of nodes that are confirmed to reach "safe" LSN after + they were added to the ``synchronous_standby_names``. + :param active: set of node names that are replicating from the primary (according to ``pg_stat_replication``) + and are eligible to be listed in ``synchronous_standby_names``. + :param sync_wanted: desired number of synchronous nodes + (``synchronous_node_count`` from the global configuration). + :param leader_wanted: the desired leader (could be different from the *leader* right after a failover). + + """ + self.leader = leader + self.quorum = quorum + self.voters = CaseInsensitiveSet(voters) + self.numsync = min(numsync, len(sync)) # numsync can't be bigger than number of listed synchronous nodes. + self.sync = CaseInsensitiveSet(sync) + self.numsync_confirmed = numsync_confirmed + self.active = CaseInsensitiveSet(active) + self.sync_wanted = sync_wanted + self.leader_wanted = leader_wanted + + def check_invariants(self) -> None: + """Checks invariant of ``synchronous_standby_names`` and ``/sync`` key in DCS. + + .. seealso:: + Check :class:`QuorumStateResolver`'s docstring for more information. + + :raises: + :exc:`QuorumError`: in case of broken state""" + voters = CaseInsensitiveSet(self.voters | CaseInsensitiveSet([self.leader])) + sync = CaseInsensitiveSet(self.sync | CaseInsensitiveSet([self.leader_wanted])) + + # We need to verify that subset of nodes that can acknowledge a commit overlaps + # with any subset of nodes that can achieve quorum to promote a new leader. + # ``+ 1`` is required because the leader is included in the set. + if self.voters and not (len(voters | sync) <= self.quorum + self.numsync + 1): + len_nodes = len(voters | sync) + raise QuorumError("Quorum and sync not guaranteed to overlap: " + f"nodes {len_nodes} >= quorum {self.quorum} + sync {self.sync} + 1") + # unstable cases, we are changing synchronous_standby_names and /sync key + # one after another, hence one set is allowed to be a subset of another + if not (voters.issubset(sync) or sync.issubset(voters)): + voters_only = voters - sync + sync_only = sync - voters + raise QuorumError(f"Mismatched sets: voter only={voters_only} sync only={sync_only}") + + def quorum_update(self, quorum: int, voters: CaseInsensitiveSet, leader: Optional[str] = None, + adjust_quorum: Optional[bool] = True) -> Iterator[Transition]: + """Updates :attr:`quorum`, :attr:`voters` and optionally :attr:`leader` fields. + + :param quorum: the new value for :attr:`quorum`, could be adjusted depending + on values of :attr:`numsync_confirmed` and *adjust_quorum*. + :param voters: the new value for :attr:`voters`, could be adjusted if :attr:`numsync_confirmed` == ``0``. + :param leader: the new value for :attr:`leader`, optional. + :param adjust_quorum: if set to ``True`` the quorum requirement will be increased by the + difference between :attr:`numsync` and :attr:`numsync_confirmed`. + + :yields: the new state of the ``/sync`` key as a :class:`Transition` object. 
+ + :raises: + :exc:`QuorumError` in case of invalid data or if the invariant after transition could not be satisfied. + """ + if quorum < 0: + raise QuorumError(f'Quorum {quorum} < 0 of ({voters})') + if quorum > 0 and quorum >= len(voters): + raise QuorumError(f'Quorum {quorum} >= N of ({voters})') + + old_leader = self.leader + if leader is not None: # Change of leader was requested + self.leader = leader + elif self.numsync_confirmed == 0: + # If there are no nodes that known to caught up with the primary we want to reset quorum/voters in /sync key + quorum = 0 + voters = CaseInsensitiveSet() + elif adjust_quorum: + # It could be that the number of nodes that are known to catch up with the primary is below desired numsync. + # We want to increase quorum to guarantee that the sync node will be found during the leader race. + quorum += max(self.numsync - self.numsync_confirmed, 0) + + if (self.leader, quorum, voters) == (old_leader, self.quorum, self.voters): + if self.voters: + return + # If transition produces no change of leader/quorum/voters we want to give a hint to + # the caller to fetch the new state from the database and restart QuorumStateResolver. + yield Transition('restart', self.leader, self.quorum, self.voters) + + self.quorum = quorum + self.voters = voters + self.check_invariants() + logger.debug('quorum %s %s %s', self.leader, self.quorum, self.voters) + yield Transition('quorum', self.leader, self.quorum, self.voters) + + def sync_update(self, numsync: int, sync: CaseInsensitiveSet) -> Iterator[Transition]: + """Updates :attr:`numsync` and :attr:`sync` fields. + + :param numsync: the new value for :attr:`numsync`. + :param sync: the new value for :attr:`sync`: + + :yields: the new state of ``synchronous_standby_names`` as a :class:`Transition` object. + + :raises: + :exc:`QuorumError` in case of invalid data or if invariant after transition could not be satisfied + """ + if numsync < 0: + raise QuorumError(f'Sync {numsync} < 0 of ({sync})') + if numsync > len(sync): + raise QuorumError(f'Sync {numsync} > N of ({sync})') + + self.numsync = numsync + self.sync = sync + self.check_invariants() + logger.debug('sync %s %s %s', self.leader, self.numsync, self.sync) + yield Transition('sync', self.leader, self.numsync, self.sync) + + def __iter__(self) -> Iterator[Transition]: + """Iterate over the transitions produced by :meth:`_generate_transitions`. + + .. note:: + Merge two transitions of the same type to a single one. + + This is always safe because skipping the first transition is equivalent + to no one observing the intermediate state. + + :yields: transitions as :class:`Transition` objects. + """ + transitions = list(self._generate_transitions()) + for cur_transition, next_transition in zip(transitions, transitions[1:] + [None]): + if isinstance(next_transition, Transition) \ + and cur_transition.transition_type == next_transition.transition_type: + continue + yield cur_transition + if cur_transition.transition_type == 'restart': + break + + def __handle_non_steady_cases(self) -> Iterator[Transition]: + """Handle cases when set of transitions produced on previous run was interrupted. + + :yields: transitions as :class:`Transition` objects. + """ + if self.sync < self.voters: + logger.debug("Case 1: synchronous_standby_names %s is a subset of DCS state %s", self.sync, self.voters) + # Case 1: voters is superset of sync nodes. In the middle of changing voters (quorum). + # Evict dead nodes from voters that are not being synced. 
+ remove_from_voters = self.voters - (self.sync | self.active) + if remove_from_voters: + yield from self.quorum_update( + quorum=len(self.voters) - len(remove_from_voters) - self.numsync, + voters=CaseInsensitiveSet(self.voters - remove_from_voters), + adjust_quorum=not (self.sync - self.active)) + # Start syncing to nodes that are in voters and alive + add_to_sync = (self.voters & self.active) - self.sync + if add_to_sync: + yield from self.sync_update(self.numsync, CaseInsensitiveSet(self.sync | add_to_sync)) + elif self.sync > self.voters: + logger.debug("Case 2: synchronous_standby_names %s is a superset of DCS state %s", self.sync, self.voters) + # Case 2: sync is superset of voters nodes. In the middle of changing replication factor (sync). + # Add to voters nodes that are already synced and active + add_to_voters = (self.sync - self.voters) & self.active + if add_to_voters: + voters = CaseInsensitiveSet(self.voters | add_to_voters) + yield from self.quorum_update(len(voters) - self.numsync, voters) + # Remove from sync nodes that are dead + remove_from_sync = self.sync - self.voters + if remove_from_sync: + yield from self.sync_update( + numsync=min(self.numsync, len(self.sync) - len(remove_from_sync)), + sync=CaseInsensitiveSet(self.sync - remove_from_sync)) + + # After handling these two cases voters and sync must match. + assert self.voters == self.sync + + safety_margin = self.quorum + min(self.numsync, self.numsync_confirmed) - len(self.voters | self.sync) + if safety_margin > 0: # In the middle of changing replication factor. + if self.numsync > self.sync_wanted: + numsync = max(self.sync_wanted, len(self.voters) - self.quorum) + logger.debug('Case 3: replication factor %d is bigger than needed %d', self.numsync, numsync) + yield from self.sync_update(numsync, self.sync) + else: + quorum = len(self.sync) - self.numsync + logger.debug('Case 4: quorum %d is bigger than needed %d', self.quorum, quorum) + yield from self.quorum_update(quorum, self.voters) + else: + safety_margin = self.quorum + self.numsync - len(self.voters | self.sync) + if self.numsync == self.sync_wanted and safety_margin > 0 and self.numsync > self.numsync_confirmed: + yield from self.quorum_update(len(self.sync) - self.numsync, self.voters) + + def __remove_gone_nodes(self) -> Iterator[Transition]: + """Remove inactive nodes from ``synchronous_standby_names`` and from ``/sync`` key. + + :yields: transitions as :class:`Transition` objects. 
+ """ + to_remove = self.sync - self.active + if to_remove and self.sync == to_remove: + logger.debug("Removing nodes: %s", to_remove) + yield from self.quorum_update(0, CaseInsensitiveSet(), adjust_quorum=False) + yield from self.sync_update(0, CaseInsensitiveSet()) + elif to_remove: + logger.debug("Removing nodes: %s", to_remove) + can_reduce_quorum_by = self.quorum + # If we can reduce quorum size try to do so first + if can_reduce_quorum_by: + # Pick nodes to remove by sorted order to provide deterministic behavior for tests + remove = CaseInsensitiveSet(sorted(to_remove, reverse=True)[:can_reduce_quorum_by]) + sync = CaseInsensitiveSet(self.sync - remove) + # when removing nodes from sync we can safely increase numsync if requested + numsync = min(self.sync_wanted, len(sync)) if self.sync_wanted > self.numsync else self.numsync + yield from self.sync_update(numsync, sync) + voters = CaseInsensitiveSet(self.voters - remove) + to_remove &= self.sync + yield from self.quorum_update(len(voters) - self.numsync, voters, + adjust_quorum=not to_remove) + if to_remove: + assert self.quorum == 0 + numsync = self.numsync - len(to_remove) + sync = CaseInsensitiveSet(self.sync - to_remove) + voters = CaseInsensitiveSet(self.voters - to_remove) + sync_decrease = numsync - min(self.sync_wanted, len(sync)) + quorum = min(sync_decrease, len(voters) - 1) if sync_decrease else 0 + yield from self.quorum_update(quorum, voters, adjust_quorum=False) + yield from self.sync_update(numsync, sync) + + def __add_new_nodes(self) -> Iterator[Transition]: + """Add new active nodes to ``synchronous_standby_names`` and to ``/sync`` key. + + :yields: transitions as :class:`Transition` objects. + """ + to_add = self.active - self.sync + if to_add: + # First get to requested replication factor + logger.debug("Adding nodes: %s", to_add) + sync_wanted = min(self.sync_wanted, len(self.sync | to_add)) + increase_numsync_by = sync_wanted - self.numsync + if increase_numsync_by > 0: + if self.sync: + add = CaseInsensitiveSet(sorted(to_add)[:increase_numsync_by]) + increase_numsync_by = len(add) + else: # there is only the leader + add = to_add # and it is safe to add all nodes at once if sync is empty + yield from self.sync_update(self.numsync + increase_numsync_by, CaseInsensitiveSet(self.sync | add)) + voters = CaseInsensitiveSet(self.voters | add) + yield from self.quorum_update(len(voters) - sync_wanted, voters) + to_add -= self.sync + if to_add: + voters = CaseInsensitiveSet(self.voters | to_add) + yield from self.quorum_update(len(voters) - sync_wanted, voters, + adjust_quorum=sync_wanted > self.numsync_confirmed) + yield from self.sync_update(sync_wanted, CaseInsensitiveSet(self.sync | to_add)) + + def __handle_replication_factor_change(self) -> Iterator[Transition]: + """Handle change of the replication factor (:attr:`sync_wanted`, aka ``synchronous_node_count``). + + :yields: transitions as :class:`Transition` objects. 
+ """ + # Apply requested replication factor change + sync_increase = min(self.sync_wanted, len(self.sync)) - self.numsync + if sync_increase > 0: + # Increase replication factor + logger.debug("Increasing replication factor to %s", self.numsync + sync_increase) + yield from self.sync_update(self.numsync + sync_increase, self.sync) + yield from self.quorum_update(len(self.voters) - self.numsync, self.voters) + elif sync_increase < 0: + # Reduce replication factor + logger.debug("Reducing replication factor to %s", self.numsync + sync_increase) + if self.quorum - sync_increase < len(self.voters): + yield from self.quorum_update(len(self.voters) - self.numsync - sync_increase, self.voters, + adjust_quorum=self.sync_wanted > self.numsync_confirmed) + yield from self.sync_update(self.numsync + sync_increase, self.sync) + + def _generate_transitions(self) -> Iterator[Transition]: + """Produce a set of changes to safely transition from the current state to the desired. + + :yields: transitions as :class:`Transition` objects. + """ + logger.debug("Quorum state: leader %s quorum %s, voters %s, numsync %s, sync %s, " + "numsync_confirmed %s, active %s, sync_wanted %s leader_wanted %s", + self.leader, self.quorum, self.voters, self.numsync, self.sync, + self.numsync_confirmed, self.active, self.sync_wanted, self.leader_wanted) + try: + if self.leader_wanted != self.leader: # failover + voters = (self.voters - CaseInsensitiveSet([self.leader_wanted])) | CaseInsensitiveSet([self.leader]) + if not self.sync: + # If sync is empty we need to update synchronous_standby_names first + numsync = len(voters) - self.quorum + yield from self.sync_update(numsync, CaseInsensitiveSet(voters)) + # If leader changed we need to add the old leader to quorum (voters) + yield from self.quorum_update(self.quorum, CaseInsensitiveSet(voters), self.leader_wanted) + # right after promote there could be no replication connections yet + if not self.sync & self.active: + return # give another loop_wait seconds for replicas to reconnect before removing them from quorum + else: + self.check_invariants() + except QuorumError as e: + logger.warning('%s', e) + yield from self.quorum_update(len(self.sync) - self.numsync, self.sync) + + assert self.leader == self.leader_wanted + + # numsync_confirmed could be 0 after restart/failover, we will calculate it from quorum + if self.numsync_confirmed == 0 and self.sync & self.active: + self.numsync_confirmed = min(len(self.sync & self.active), len(self.voters) - self.quorum) + logger.debug('numsync_confirmed=0, adjusting it to %d', self.numsync_confirmed) + + yield from self.__handle_non_steady_cases() + + # We are in a steady state point. Find if desired state is different and act accordingly. + + yield from self.__remove_gone_nodes() + + yield from self.__add_new_nodes() + + yield from self.__handle_replication_factor_change() diff --git a/patroni/utils.py b/patroni/utils.py index 560d25701..fe7854b21 100644 --- a/patroni/utils.py +++ b/patroni/utils.py @@ -922,7 +922,7 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]: * ``members``: list of members in the cluster. Each value is a :class:`dict` that may have the following keys: * ``name``: the name of the host (unique in the cluster). 
The ``members`` list is sorted by this key; - * ``role``: ``leader``, ``standby_leader``, ``sync_standby``, or ``replica``; + * ``role``: ``leader``, ``standby_leader``, ``sync_standby``, ``quorum_standby``, or ``replica``; * ``state``: ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``, ``starting``, ``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``, ``initdb failed``, ``running custom bootstrap script``, ``custom bootstrap failed``, or ``creating replica``; @@ -949,11 +949,12 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]: cluster_lsn = cluster.last_lsn or 0 ret: Dict[str, Any] = {'members': []} + sync_role = 'quorum_standby' if config.is_quorum_commit_mode else 'sync_standby' for m in cluster.members: if m.name == leader_name: role = 'standby_leader' if config.is_standby_cluster else 'leader' elif config.is_synchronous_mode and cluster.sync.matches(m.name): - role = 'sync_standby' + role = sync_role else: role = 'replica' diff --git a/tests/test_api.py b/tests/test_api.py index bc7f5263c..c0866bab2 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -205,7 +205,6 @@ class TestRestApiHandler(unittest.TestCase): def test_do_GET(self): MockPostgresql.pending_restart_reason = {'max_connections': get_param_diff('200', '100')} MockPatroni.dcs.cluster.last_lsn = 20 - MockPatroni.dcs.cluster.sync.members = [MockPostgresql.name] with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)): MockRestApiServer(RestApiHandler, 'GET /replica') MockRestApiServer(RestApiHandler, 'GET /replica?lag=1M') @@ -223,12 +222,16 @@ def test_do_GET(self): Mock(return_value={'role': 'replica', 'sync_standby': True})): MockRestApiServer(RestApiHandler, 'GET /synchronous') MockRestApiServer(RestApiHandler, 'GET /read-only-sync') + with patch.object(RestApiHandler, 'get_postgresql_status', + Mock(return_value={'role': 'replica', 'quorum_standby': True})): + MockRestApiServer(RestApiHandler, 'GET /quorum') + MockRestApiServer(RestApiHandler, 'GET /read-only-quorum') with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'replica'})): - MockPatroni.dcs.cluster.sync.members = [] MockRestApiServer(RestApiHandler, 'GET /asynchronous') with patch.object(MockHa, 'is_leader', Mock(return_value=True)): MockRestApiServer(RestApiHandler, 'GET /replica') MockRestApiServer(RestApiHandler, 'GET /read-only-sync') + MockRestApiServer(RestApiHandler, 'GET /read-only-quorum') with patch.object(global_config.__class__, 'is_standby_cluster', Mock(return_value=True)): MockRestApiServer(RestApiHandler, 'GET /standby_leader') MockPatroni.dcs.cluster = None diff --git a/tests/test_etcd.py b/tests/test_etcd.py index 4c56c4258..ce80b8855 100644 --- a/tests/test_etcd.py +++ b/tests/test_etcd.py @@ -344,7 +344,7 @@ def test_set_ttl(self): self.assertTrue(self.etcd.watch(None, 1)) def test_sync_state(self): - self.assertIsNone(self.etcd.write_sync_state('leader', None)) + self.assertIsNone(self.etcd.write_sync_state('leader', None, 0)) self.assertFalse(self.etcd.delete_sync_state()) def test_set_history_value(self): diff --git a/tests/test_ha.py b/tests/test_ha.py index 6ec28775e..2d7fda7f2 100644 --- a/tests/test_ha.py +++ b/tests/test_ha.py @@ -18,6 +18,7 @@ from patroni.postgresql.postmaster import PostmasterProcess from patroni.postgresql.rewind import Rewind from patroni.postgresql.slots import SlotsHandler +from patroni.postgresql.sync import _SyncState from patroni.utils import tzutc from 
patroni.watchdog import Watchdog @@ -63,7 +64,7 @@ def get_cluster_initialized_without_leader(leader=False, failover=None, sync=Non 'tags': {'clonefrom': True}, 'scheduled_restart': {'schedule': "2100-01-01 10:53:07.560445+00:00", 'postgres_version': '99.0.0'}}) - syncstate = SyncState(0 if sync else None, sync and sync[0], sync and sync[1]) + syncstate = SyncState(0 if sync else None, sync and sync[0], sync and sync[1], 0) failsafe = {m.name: m.api_url for m in (m1, m2)} if failsafe else None return get_cluster(SYSID, leader, [m1, m2], failover, syncstate, cluster_config, failsafe) @@ -207,6 +208,7 @@ class TestHa(PostgresInit): @patch('patroni.dcs.dcs_modules', Mock(return_value=['patroni.dcs.etcd'])) @patch.object(etcd.Client, 'read', etcd_read) @patch.object(AbstractEtcdClientWithFailover, '_get_machines_list', Mock(return_value=['http://remotehost:2379'])) + @patch.object(Config, '_load_cache', Mock()) def setUp(self): super(TestHa, self).setUp() self.p.set_state('running') @@ -1303,7 +1305,7 @@ def test_demote_immediate(self, follow): self.ha.demote('immediate') follow.assert_called_once_with(None) - def test_process_sync_replication(self): + def test__process_multisync_replication(self): self.ha.has_lock = true mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock() mock_cfg_set_sync = self.p.config.set_synchronous_standby_names = Mock() @@ -1333,8 +1335,9 @@ def test_process_sync_replication(self): self.ha.is_synchronous_mode = true # Test sync standby not touched when picking the same node - self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['other']), - CaseInsensitiveSet(['other']))) + self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1, + CaseInsensitiveSet(['other']), + CaseInsensitiveSet(['other']))) self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other')) self.ha.run_cycle() mock_set_sync.assert_not_called() @@ -1343,15 +1346,17 @@ def test_process_sync_replication(self): mock_cfg_set_sync.reset_mock() # Test sync standby is replaced when switching standbys - self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['other2']), CaseInsensitiveSet())) + self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(), + CaseInsensitiveSet(['other2']))) self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) self.ha.run_cycle() mock_set_sync.assert_called_once_with(CaseInsensitiveSet(['other2'])) mock_cfg_set_sync.assert_not_called() # Test sync standby is replaced when new standby is joined - self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['other2', 'other3']), - CaseInsensitiveSet(['other2']))) + self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1, + CaseInsensitiveSet(['other2']), + CaseInsensitiveSet(['other2', 'other3']))) self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) self.ha.run_cycle() self.assertEqual(mock_set_sync.call_args_list[0][0], (CaseInsensitiveSet(['other2']),)) @@ -1372,8 +1377,9 @@ def test_process_sync_replication(self): self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('leader', 'other'))) # self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other')) - self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['other2']), - CaseInsensitiveSet(['other2']))) + 
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1, + CaseInsensitiveSet(['other2']), + CaseInsensitiveSet(['other2']))) self.ha.run_cycle() self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2) @@ -1396,9 +1402,10 @@ def test_process_sync_replication(self): # Test sync set to '*' when synchronous_mode_strict is enabled mock_set_sync.reset_mock() mock_cfg_set_sync.reset_mock() - self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet())) - self.ha.cluster.config.data['synchronous_mode_strict'] = True - self.ha.run_cycle() + self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(), + CaseInsensitiveSet())) + with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)): + self.ha.run_cycle() mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*')) mock_cfg_set_sync.assert_not_called() @@ -1426,8 +1433,8 @@ def test_sync_replication_become_primary(self): # When we just became primary nobody is sync self.assertEqual(self.ha.enforce_primary_role('msg', 'promote msg'), 'promote msg') - mock_set_sync.assert_called_once_with(CaseInsensitiveSet()) - mock_write_sync.assert_called_once_with('leader', None, version=0) + mock_set_sync.assert_called_once_with(CaseInsensitiveSet(), 0) + mock_write_sync.assert_called_once_with('leader', None, 0, version=0) mock_set_sync.reset_mock() @@ -1465,7 +1472,7 @@ def test_unhealthy_sync_mode(self): mock_acquire.assert_called_once() mock_follow.assert_not_called() mock_promote.assert_called_once() - mock_write_sync.assert_called_once_with('other', None, version=0) + mock_write_sync.assert_called_once_with('other', None, 0, version=0) def test_disable_sync_when_restarting(self): self.ha.is_synchronous_mode = true @@ -1507,7 +1514,8 @@ def test_enable_synchronous_mode(self): self.ha.is_synchronous_mode = true self.ha.has_lock = true self.p.name = 'leader' - self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet())) + self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, + CaseInsensitiveSet(), CaseInsensitiveSet())) self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) with patch('patroni.ha.logger.info') as mock_logger: self.ha.run_cycle() @@ -1523,7 +1531,8 @@ def test_inconsistent_synchronous_state(self): self.ha.has_lock = true self.p.name = 'leader' self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'a')) - self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet('a'), CaseInsensitiveSet())) + self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, + CaseInsensitiveSet(), CaseInsensitiveSet('a'))) self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock() with patch('patroni.ha.logger.warning') as mock_logger: @@ -1693,3 +1702,113 @@ def test_notify_citus_coordinator(self): mock_logger.assert_called() self.assertTrue(mock_logger.call_args[0][0].startswith('Request to %s coordinator leader')) self.assertEqual(mock_logger.call_args[0][1], 'Citus') + + @patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)) + @patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True)) + def test_process_sync_replication_prepromote(self): + self.p._major_version = 90500 + self.ha.cluster = 
get_cluster_initialized_without_leader(sync=('other', self.p.name + ',foo')) + self.p.is_primary = false + self.p.set_role('replica') + mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None) + # Postgres 9.5, write_sync_state to DCS failed + self.assertEqual(self.ha.run_cycle(), + 'Postponing promotion because synchronous replication state was updated by somebody else') + self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1) + self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0)) + self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0}) + + mock_set_sync = self.p.config.set_synchronous_standby_names = Mock() + mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=True) + # Postgres 9.5, our name is written to leader of the /sync key, while voters list and ssn is empty + self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') + self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1) + self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0)) + self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0}) + self.assertEqual(mock_set_sync.call_count, 1) + self.assertEqual(mock_set_sync.call_args_list[0][0], (None,)) + + self.p._major_version = 90600 + mock_set_sync.reset_mock() + mock_write_sync.reset_mock() + self.p.set_role('replica') + # Postgres 9.6, with quorum commit we avoid updating /sync key and put some nodes to ssn + self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') + self.assertEqual(mock_write_sync.call_count, 0) + self.assertEqual(mock_set_sync.call_count, 1) + self.assertEqual(mock_set_sync.call_args_list[0][0], ('2 (foo,other)',)) + + self.p._major_version = 150000 + mock_set_sync.reset_mock() + self.p.set_role('replica') + self.p.name = 'nonsync' + self.ha.fetch_node_status = get_node_status() + # Postgres 15, with quorum commit. 
Non-sync node promoted we avoid updating /sync key and put some nodes to ssn + self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') + self.assertEqual(mock_write_sync.call_count, 0) + self.assertEqual(mock_set_sync.call_count, 1) + self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 3 (foo,other,postgresql0)',)) + + @patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)) + @patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True)) + def test__process_quorum_replication(self): + self.p._major_version = 150000 + self.ha.has_lock = true + mock_set_sync = self.p.config.set_synchronous_standby_names = Mock() + self.p.name = 'leader' + + mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None) + # Test /sync key is attempted to set and failed when missing or invalid + self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 1, CaseInsensitiveSet(['other']), + CaseInsensitiveSet(['other']))) + self.ha.run_cycle() + self.assertEqual(mock_write_sync.call_count, 1) + self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0)) + self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None}) + self.assertEqual(mock_set_sync.call_count, 0) + + self.ha._promote_timestamp = 1 + mock_write_sync = self.ha.dcs.write_sync_state = Mock(side_effect=[SyncState(None, self.p.name, None, 0), None]) + # Test /sync key is attempted to set and succeed when missing or invalid + with patch.object(SyncState, 'is_empty', Mock(side_effect=[True, False])): + self.ha.run_cycle() + self.assertEqual(mock_write_sync.call_count, 2) + self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0)) + self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None}) + self.assertEqual(mock_write_sync.call_args_list[1][0], (self.p.name, CaseInsensitiveSet(['other']), 0)) + self.assertEqual(mock_write_sync.call_args_list[1][1], {'version': None}) + self.assertEqual(mock_set_sync.call_count, 0) + + self.p.sync_handler.current_state = Mock(side_effect=[_SyncState('quorum', 1, 0, CaseInsensitiveSet(['foo']), + CaseInsensitiveSet(['other'])), + _SyncState('quorum', 1, 1, CaseInsensitiveSet(['foo']), + CaseInsensitiveSet(['foo']))]) + mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState(1, 'leader', 'foo', 0)) + self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo')) + # Test the sync node is removed from voters, added to ssn + with patch.object(Postgresql, 'synchronous_standby_names', Mock(return_value='other')), \ + patch('time.sleep', Mock()): + self.ha.run_cycle() + self.assertEqual(mock_write_sync.call_count, 1) + self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0)) + self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0}) + self.assertEqual(mock_set_sync.call_count, 1) + self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (other)',)) + + # Test ANY 1 (*) when synchronous_mode_strict and no nodes available + self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 0, + CaseInsensitiveSet(['other', 'foo']), + CaseInsensitiveSet())) + mock_write_sync.reset_mock() + mock_set_sync.reset_mock() + with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)): + self.ha.run_cycle() + self.assertEqual(mock_write_sync.call_count, 1) + 
+        self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0))
+        self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
+        self.assertEqual(mock_set_sync.call_count, 1)
+        self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (*)',))
+
+        # Test that _process_quorum_replication doesn't take longer than loop_wait
+        with patch('time.time', Mock(side_effect=[30, 60, 90, 120])):
+            self.ha.process_sync_replication()
diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py
index 4bb2da19d..27600933b 100644
--- a/tests/test_kubernetes.py
+++ b/tests/test_kubernetes.py
@@ -440,7 +440,7 @@ def test_delete_sync_state(self):
 
     @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', mock_namespaced_kind, create=True)
     def test_write_sync_state(self):
-        self.assertIsNotNone(self.k.write_sync_state('a', ['b'], 1))
+        self.assertIsNotNone(self.k.write_sync_state('a', ['b'], 0, 1))
 
     @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', mock_namespaced_kind, create=True)
     @patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', mock_namespaced_kind, create=True)
diff --git a/tests/test_quorum.py b/tests/test_quorum.py
new file mode 100644
index 000000000..8dddb4f1a
--- /dev/null
+++ b/tests/test_quorum.py
@@ -0,0 +1,473 @@
+import unittest
+
+from typing import List, Set, Tuple
+
+from patroni.quorum import QuorumStateResolver, QuorumError
+
+
+class QuorumTest(unittest.TestCase):
+
+    def check_state_transitions(self, leader: str, quorum: int, voters: Set[str], numsync: int, sync: Set[str],
+                                numsync_confirmed: int, active: Set[str], sync_wanted: int, leader_wanted: str,
+                                expected: List[Tuple[str, str, int, Set[str]]]) -> None:
+        kwargs = {
+            'leader': leader, 'quorum': quorum, 'voters': voters,
+            'numsync': numsync, 'sync': sync, 'numsync_confirmed': numsync_confirmed,
+            'active': active, 'sync_wanted': sync_wanted, 'leader_wanted': leader_wanted
+        }
+        result = list(QuorumStateResolver(**kwargs))
+        self.assertEqual(result, expected)
+
+        # also check interrupted transitions
+        if len(result) > 0 and result[0][0] != 'restart' and kwargs['leader'] == result[0][1]:
+            if result[0][0] == 'sync':
+                kwargs.update(numsync=result[0][2], sync=result[0][3])
+            else:
+                kwargs.update(leader=result[0][1], quorum=result[0][2], voters=result[0][3])
+            kwargs['expected'] = expected[1:]
+            self.check_state_transitions(**kwargs)
+
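+    # (Editorial note) Each expected transition produced by QuorumStateResolver is a tuple of
+    # (transition_type, leader, number, nodes). Judging by the replay logic in
+    # check_state_transitions() above: for 'sync' transitions the number/nodes are the new numsync
+    # and synchronous_standby_names set, for 'quorum' transitions they are the new quorum and the
+    # voters written to the /sync key, and 'restart' appears to ask for the resolution to be
+    # restarted with refreshed state. sync_wanted is the desired replication factor,
+    # numsync_confirmed the number of sync nodes that have actually caught up, and leader_wanted
+    # the node that is about to become the leader.
+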
+    def test_1111(self):
+        leader = 'a'
+
+        # Add node
+        self.check_state_transitions(leader=leader, quorum=0, voters=set(),
+                                     numsync=0, sync=set(), numsync_confirmed=0, active=set('b'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('sync', leader, 1, set('b')),
+                                         ('restart', leader, 0, set()),
+                                     ])
+        self.check_state_transitions(leader=leader, quorum=0, voters=set(),
+                                     numsync=1, sync=set('b'), numsync_confirmed=1, active=set('b'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 0, set('b'))
+                                     ])
+
+        self.check_state_transitions(leader=leader, quorum=0, voters=set(),
+                                     numsync=0, sync=set(), numsync_confirmed=0, active=set('bcde'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('sync', leader, 2, set('bcde')),
+                                         ('restart', leader, 0, set()),
+                                     ])
+        self.check_state_transitions(leader=leader, quorum=0, voters=set(),
+                                     numsync=2, sync=set('bcde'), numsync_confirmed=1, active=set('bcde'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 3, set('bcde')),
+                                     ])
+
+    def test_1222(self):
+        """2 node cluster"""
+        leader = 'a'
+
+        # Active set matches state
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
+                                     numsync=1, sync=set('b'), numsync_confirmed=1, active=set('b'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[])
+
+        # Add node by increasing quorum
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
+                                     numsync=1, sync=set('b'), numsync_confirmed=1, active=set('BC'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 1, set('bC')),
+                                         ('sync', leader, 1, set('bC')),
+                                     ])
+
+        # Add node by increasing sync
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
+                                     numsync=1, sync=set('b'), numsync_confirmed=1, active=set('bc'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('sync', leader, 2, set('bc')),
+                                         ('quorum', leader, 1, set('bc')),
+                                     ])
+        # Reduce quorum after added node caught up
+        self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
+                                     numsync=2, sync=set('bc'), numsync_confirmed=2, active=set('bc'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 0, set('bc')),
+                                     ])
+
+        # Add multiple nodes by increasing both sync and quorum
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
+                                     numsync=1, sync=set('b'), numsync_confirmed=1, active=set('BCdE'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('sync', leader, 2, set('bC')),
+                                         ('quorum', leader, 3, set('bCdE')),
+                                         ('sync', leader, 2, set('bCdE')),
+                                     ])
+        # Reduce quorum after added nodes caught up
+        self.check_state_transitions(leader=leader, quorum=3, voters=set('bcde'),
+                                     numsync=2, sync=set('bcde'), numsync_confirmed=3, active=set('bcde'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 2, set('bcde')),
+                                     ])
+
+        # Primary is alone
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
+                                     numsync=1, sync=set('b'), numsync_confirmed=0, active=set(),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 0, set()),
+                                         ('sync', leader, 0, set()),
+                                     ])
+
+        # Swap out sync replica
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
+                                     numsync=1, sync=set('b'), numsync_confirmed=0, active=set('c'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 0, set()),
+                                         ('sync', leader, 1, set('c')),
+                                         ('restart', leader, 0, set()),
+                                     ])
+        # Update quorum when added node caught up
+        self.check_state_transitions(leader=leader, quorum=0, voters=set(),
+                                     numsync=1, sync=set('c'), numsync_confirmed=1, active=set('c'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 0, set('c')),
+                                     ])
+
+    def test_1233(self):
+        """Interrupted transition from 2 node cluster to 3 node fully sync cluster"""
+        leader = 'a'
+
+        # Node c went away, transition back to 2 node cluster
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
+                                     numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('b'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('sync', leader, 1, set('b')),
+                                     ])
+
+        # Node c is available, transition to larger quorum set, but it is not yet caught up
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
+                                     numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('bc'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 1, set('bc')),
+                                     ])
+
+        # Add in a new node at the same time, but node c didn't catch up yet
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
+                                     numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('bcd'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 2, set('bcd')),
+                                         ('sync', leader, 2, set('bcd')),
+                                     ])
+        # All sync nodes caught up, reduce quorum
+        self.check_state_transitions(leader=leader, quorum=2, voters=set('bcd'),
+                                     numsync=2, sync=set('bcd'), numsync_confirmed=3, active=set('bcd'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 1, set('bcd')),
+                                     ])
+
+        # Change replication factor at the same time
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
+                                     numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('bc'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 1, set('bc')),
+                                         ('sync', leader, 1, set('bc')),
+                                     ])
+
+    def test_2322(self):
+        """Interrupted transition from 2 node cluster to 3 node cluster with replication factor 2"""
+        leader = 'a'
+
+        # Node c went away, transition back to 2 node cluster
+        self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
+                                     numsync=1, sync=set('b'), numsync_confirmed=1, active=set('b'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 0, set('b')),
+                                     ])
+
+        # Node c is available, transition to larger quorum set.
+        self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
+                                     numsync=1, sync=set('b'), numsync_confirmed=1, active=set('bc'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('sync', leader, 1, set('bc')),
+                                     ])
+
+        # Add in a new node at the same time
+        self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
+                                     numsync=1, sync=set('b'), numsync_confirmed=1, active=set('bcd'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('sync', leader, 1, set('bc')),
+                                         ('quorum', leader, 2, set('bcd')),
+                                         ('sync', leader, 1, set('bcd')),
+                                     ])
+
+        # Convert to a fully synced cluster
+        self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
+                                     numsync=1, sync=set('b'), numsync_confirmed=1, active=set('bc'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('sync', leader, 2, set('bc')),
+                                     ])
+        # Reduce quorum after all nodes caught up
+        self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
+                                     numsync=2, sync=set('bc'), numsync_confirmed=2, active=set('bc'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 0, set('bc')),
+                                     ])
+
+    def test_3535(self):
+        leader = 'a'
+
+        # remove nodes
+        self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
+                                     numsync=2, sync=set('bcde'), numsync_confirmed=2, active=set('bc'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('sync', leader, 2, set('bc')),
+                                         ('quorum', leader, 0, set('bc')),
+                                     ])
+        self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
+                                     numsync=2, sync=set('bcde'), numsync_confirmed=3, active=set('bcd'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('sync', leader, 2, set('bcd')),
+                                         ('quorum', leader, 1, set('bcd')),
+                                     ])
+
+        # remove nodes and decrease sync
+        self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
+                                     numsync=2, sync=set('bcde'), numsync_confirmed=2, active=set('bc'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('sync', leader, 2, set('bc')),
+                                         ('quorum', leader, 1, set('bc')),
+                                         ('sync', leader, 1, set('bc')),
+                                     ])
+        self.check_state_transitions(leader=leader, quorum=1, voters=set('bcde'),
+                                     numsync=3, sync=set('bcde'), numsync_confirmed=2, active=set('bc'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('sync', leader, 3, set('bcd')),
+                                         ('quorum', leader, 1, set('bc')),
+                                         ('sync', leader, 1, set('bc')),
+                                     ])
+
+        # Increase replication factor and decrease quorum
+        self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
+                                     numsync=2, sync=set('bcde'), numsync_confirmed=2, active=set('bcde'),
+                                     sync_wanted=3, leader_wanted=leader, expected=[
+                                         ('sync', leader, 3, set('bcde')),
+                                     ])
+        # decrease quorum after more nodes caught up
+        self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
+                                     numsync=3, sync=set('bcde'), numsync_confirmed=3, active=set('bcde'),
+                                     sync_wanted=3, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 1, set('bcde')),
+                                     ])
+
+        # Add node with decreasing sync and increasing quorum
+        self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
+                                     numsync=2, sync=set('bcde'), numsync_confirmed=2, active=set('bcdef'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         # increase quorum by 2, 1 for added node and another for reduced sync
+                                         ('quorum', leader, 4, set('bcdef')),
+                                         # now reduce replication factor to requested value
+                                         ('sync', leader, 1, set('bcdef')),
+                                     ])
+
+        # Remove node with increasing sync and decreasing quorum
+        self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
+                                     numsync=2, sync=set('bcde'), numsync_confirmed=2, active=set('bcd'),
+                                     sync_wanted=3, leader_wanted=leader, expected=[
+                                         # node e removed from sync with replication factor increase
+                                         ('sync', leader, 3, set('bcd')),
+                                         # node e removed from voters with quorum decrease
+                                         ('quorum', leader, 1, set('bcd')),
+                                     ])
+
+    def test_remove_nosync_node(self):
+        leader = 'a'
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('bc'),
+                                     numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('b'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 0, set('b')),
+                                         ('sync', leader, 1, set('b'))
+                                     ])
+
+    def test_swap_sync_node(self):
+        leader = 'a'
+        self.check_state_transitions(leader=leader, quorum=0, voters=set('bc'),
+                                     numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('bd'),
+                                     sync_wanted=2, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 0, set('b')),
+                                         ('sync', leader, 2, set('bd')),
+                                         ('quorum', leader, 1, set('bd'))
+                                     ])
+
+    def test_promotion(self):
+        # Beginning state: 'a' is the primary, 1 of bcd is in sync
+        # a fails, c gets quorum votes and promotes
+        self.check_state_transitions(leader='a', quorum=2, voters=set('bcd'),
+                                     numsync=0, sync=set(), numsync_confirmed=0, active=set(),
+                                     sync_wanted=1, leader_wanted='c', expected=[
+                                         ('sync', 'a', 1, set('abd')),  # set a and b to sync
+                                         ('quorum', 'c', 2, set('abd')),  # set c as a leader and move a to voters
+                                         # and stop because there are no active nodes
+                                     ])
+
+        # next loop, b managed to reconnect
+        self.check_state_transitions(leader='c', quorum=2, voters=set('abd'),
+                                     numsync=1, sync=set('abd'), numsync_confirmed=0, active=set('b'),
+                                     sync_wanted=1, leader_wanted='c', expected=[
+                                         ('sync', 'c', 1, set('b')),  # remove a from sync as inactive
+                                         ('quorum', 'c', 0, set('b')),  # remove a from voters and reduce quorum
+                                     ])
+
+        # alternative reality: next loop, no one reconnected
+        self.check_state_transitions(leader='c', quorum=2, voters=set('abd'),
+                                     numsync=1, sync=set('abd'), numsync_confirmed=0, active=set(),
+                                     sync_wanted=1, leader_wanted='c', expected=[
+                                         ('quorum', 'c', 0, set()),
+                                         ('sync', 'c', 0, set()),
+                                     ])
+
+    def test_nonsync_promotion(self):
+        # Beginning state: 1 of bc in sync. e.g. (a primary, ssn = ANY 1 (b c))
+        # a fails, d sees b and c, knows that it is in sync and decides to promote.
+        # We include the former primary in the sync state, increasing the replication factor,
+        # and let the situation resolve. Node d ssn=ANY 1 (b c)
+        leader = 'd'
+        self.check_state_transitions(leader='a', quorum=1, voters=set('bc'),
+                                     numsync=0, sync=set(), numsync_confirmed=0, active=set(),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         # Set a, b, and c to sync and increase replication factor
+                                         ('sync', 'a', 2, set('abc')),
+                                         # Set ourselves as the leader and move the old leader to voters
+                                         ('quorum', leader, 1, set('abc')),
+                                         # and stop because there are no active nodes
+                                     ])
+        # next loop, b and c managed to reconnect
+        self.check_state_transitions(leader=leader, quorum=1, voters=set('abc'),
+                                     numsync=2, sync=set('abc'), numsync_confirmed=0, active=set('bc'),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('sync', leader, 2, set('bc')),  # Remove a from being synced to.
+                                         ('quorum', leader, 1, set('bc')),  # Remove a from quorum
+                                         ('sync', leader, 1, set('bc')),  # Can now reduce replication factor back
+                                     ])
+        # alternative reality: next loop, no one reconnected
+        self.check_state_transitions(leader=leader, quorum=1, voters=set('abc'),
+                                     numsync=2, sync=set('abc'), numsync_confirmed=0, active=set(),
+                                     sync_wanted=1, leader_wanted=leader, expected=[
+                                         ('quorum', leader, 0, set()),
+                                         ('sync', leader, 0, set()),
+                                     ])
+
+    def test_invalid_states(self):
+        leader = 'a'
+
+        # Main invariant is not satisfied, system is in an unsafe state
+        resolver = QuorumStateResolver(leader=leader, quorum=0, voters=set('bc'),
+                                       numsync=1, sync=set('bc'), numsync_confirmed=1,
+                                       active=set('bc'), sync_wanted=1, leader_wanted=leader)
+        self.assertRaises(QuorumError, resolver.check_invariants)
+        self.assertEqual(list(resolver), [
+            ('quorum', leader, 1, set('bc'))
+        ])
+
+        # Quorum and sync states mismatched, somebody other than Patroni modified system state
+        resolver = QuorumStateResolver(leader=leader, quorum=1, voters=set('bc'),
+                                       numsync=2, sync=set('bd'), numsync_confirmed=1,
+                                       active=set('bd'), sync_wanted=1, leader_wanted=leader)
+        self.assertRaises(QuorumError, resolver.check_invariants)
+        self.assertEqual(list(resolver), [
+            ('quorum', leader, 1, set('bd')),
+            ('sync', leader, 1, set('bd')),
+        ])
+        self.assertTrue(repr(resolver.sync).startswith('