Skip to content

Commit

Permalink
Merge 0de0442 into 5d8c2fb
Browse files Browse the repository at this point in the history
  • Loading branch information
zhjwpku committed Jan 11, 2024
2 parents 5d8c2fb + 0de0442 commit 9cba7b9
Show file tree
Hide file tree
Showing 21 changed files with 128 additions and 98 deletions.
11 changes: 9 additions & 2 deletions patroni/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,13 @@ def do_POST_switchover(self) -> None:
def do_POST_citus(self) -> None:
"""Handle a ``POST`` request to ``/citus`` path.
We keep this entrypoint and dispatch the request to do_POST_mpp.
"""
self.do_POST_mpp()

def do_POST_mpp(self) -> None:
"""Handle a ``POST`` request to ``/mpp`` path.
Call :func:`~patroni.postgresql.mpp.AbstractMPPHandler.handle_event` to handle the request,
then write a response with HTTP status code ``200``.
Expand All @@ -1164,9 +1171,9 @@ def do_POST_citus(self) -> None:
return

patroni = self.server.patroni
if patroni.postgresql.citus_handler.is_coordinator() and patroni.ha.is_leader():
if patroni.postgresql.mpp_handler.is_coordinator() and patroni.ha.is_leader():
cluster = patroni.dcs.get_cluster()
patroni.postgresql.citus_handler.handle_event(cluster, request)
patroni.postgresql.mpp_handler.handle_event(cluster, request)
self.write_response(200, 'OK')

def parse_request(self) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion patroni/ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def get_dcs(scope: str, group: Optional[int]) -> AbstractDCS:
try:
dcs = _get_dcs(config)
if is_citus_cluster() and group is None:
dcs.is_citus_coordinator = lambda: True
dcs.is_mpp_coordinator = lambda: True
click.get_current_context().obj['__mpp'] = dcs.mpp
return dcs
except PatroniException as e:
Expand Down
58 changes: 30 additions & 28 deletions patroni/dcs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ class Cluster(NamedTuple('Cluster',
('history', Optional[TimelineHistory]),
('failsafe', Optional[Dict[str, str]]),
('workers', Dict[int, 'Cluster'])])):
"""Immutable object (namedtuple) which represents PostgreSQL or Citus cluster.
"""Immutable object (namedtuple) which represents PostgreSQL or MPP cluster.
.. note::
We are using an old-style attribute declaration here because otherwise it is not possible to override `__new__`
Expand All @@ -799,8 +799,8 @@ class Cluster(NamedTuple('Cluster',
:ivar sync: reference to :class:`SyncState` object, last observed synchronous replication state.
:ivar history: reference to `TimelineHistory` object.
:ivar failsafe: failsafe topology. Node is allowed to become the leader only if its name is found in this list.
:ivar workers: dictionary of workers of the Citus cluster, optional. Each key is an :class:`int` representing
the group, and the corresponding value is a :class:`Cluster` instance.
:ivar workers: dictionary of workers of the MPP cluster, optional. Each key representing the group and the
corresponding value is a :class:`Cluster` instance.
"""

def __new__(cls, *args: Any, **kwargs: Any):
Expand Down Expand Up @@ -1263,11 +1263,11 @@ class AbstractDCS(abc.ABC):
Functional methods that are critical in their timing, required to complete within ``retry_timeout`` period in order
to prevent the DCS considered inaccessible, each perform construction of complex data objects:
* :meth:`~AbstractDCS._cluster_loader`:
* :meth:`~AbstractDCS._postgresql_cluster_loader`:
method which processes the structure of data stored in the DCS used to build the :class:`Cluster` object
with all relevant associated data.
* :meth:`~AbstractDCS._citus_cluster_loader`:
Similar to above but specifically representing Citus group and workers information.
* :meth:`~AbstractDCS._mpp_cluster_loader`:
Similar to above but specifically representing MPP group and workers information.
* :meth:`~AbstractDCS._load_cluster`:
main method for calling specific ``loader`` method to build the :class:`Cluster` object representing the
state and topology of the cluster.
Expand Down Expand Up @@ -1337,7 +1337,7 @@ class AbstractDCS(abc.ABC):
_FAILSAFE = 'failsafe'

def __init__(self, config: Dict[str, Any], mpp: 'AbstractMPP') -> None:
"""Prepare DCS paths, Citus group ID, initial values for state information and processing dependencies.
"""Prepare DCS paths, MPP object, initial values for state information and processing dependencies.
:ivar config: :class:`dict`, reference to config section of selected DCS.
i.e.: ``zookeeper`` for zookeeper, ``etcd`` for etcd, etc...
Expand Down Expand Up @@ -1472,21 +1472,21 @@ def last_seen(self) -> int:
return self._last_seen

@abc.abstractmethod
def _cluster_loader(self, path: Any) -> Cluster:
"""Load and build the :class:`Cluster` object from DCS, which represents a single Patroni or Citus cluster.
def _postgresql_cluster_loader(self, path: Any) -> Cluster:
"""Load and build the :class:`Cluster` object from DCS, which represents a single PostgreSQL cluster.
:param path: the path in DCS where to load Cluster(s) from.
:returns: :class:`Cluster` instance.
"""

@abc.abstractmethod
def _citus_cluster_loader(self, path: Any) -> Dict[int, Cluster]:
"""Load and build all Patroni clusters from a single Citus cluster.
def _mpp_cluster_loader(self, path: Any) -> Dict[int, Cluster]:
"""Load and build all PostgreSQL clusters from a single MPP cluster.
:param path: the path in DCS where to load Cluster(s) from.
:returns: all Citus groups as :class:`dict`, with group IDs as keys and :class:`Cluster` objects as values or a
:returns: all MPP groups as :class:`dict`, with group IDs as keys and :class:`Cluster` objects as values or a
:class:`Cluster` object representing the coordinator with filled `Cluster.workers` attribute.
"""

Expand All @@ -1502,13 +1502,14 @@ def _load_cluster(
the :meth:`~AbstractDCS.get_cluster` method.
:param path: the path in DCS where to load Cluster(s) from.
:param loader: one of :meth:`~AbstractDCS._cluster_loader` or :meth:`~AbstractDCS._citus_cluster_loader`.
:param loader: one of :meth:`~AbstractDCS._postgresql_cluster_loader` or
:meth:`~AbstractDCS._mpp_cluster_loader`.
:raise: :exc:`~DCSError` in case of communication problems with DCS. If the current node was running as a
primary and exception raised, instance would be demoted.
"""

def __get_patroni_cluster(self, path: Optional[str] = None) -> Cluster:
def __get_postgresql_cluster(self, path: Optional[str] = None) -> Cluster:
"""Low level method to load a :class:`Cluster` object from DCS.
:param path: optional client path in DCS backend to load from.
Expand All @@ -1517,39 +1518,40 @@ def __get_patroni_cluster(self, path: Optional[str] = None) -> Cluster:
"""
if path is None:
path = self.client_path('')
cluster = self._load_cluster(path, self._cluster_loader)
cluster = self._load_cluster(path, self._postgresql_cluster_loader)
if TYPE_CHECKING: # pragma: no cover
assert isinstance(cluster, Cluster)
return cluster

def is_citus_coordinator(self) -> bool:
""":class:`Cluster` instance has a Citus Coordinator group ID.
def is_mpp_coordinator(self) -> bool:
""":class:`Cluster` instance has a Coordinator group ID.
:returns: ``True`` if the given node is running as the MPP Coordinator.
"""
return self._mpp.is_coordinator()

def get_citus_coordinator(self) -> Optional[Cluster]:
"""Load the Patroni cluster for the Citus Coordinator.
def get_mpp_coordinator(self) -> Optional[Cluster]:
"""Load the PostgreSQL cluster for the MPP Coordinator.
.. note::
This method is only executed on the worker nodes (``group!=0``) to find the coordinator.
This method is only executed on the worker nodes to find the coordinator.
:returns: Select :class:`Cluster` instance associated with the MPP Coordinator group ID.
"""
try:
return self.__get_patroni_cluster(f'{self._base_path}/{self._mpp.coordinator_group_id}/')
return self.__get_postgresql_cluster(f'{self._base_path}/{self._mpp.coordinator_group_id}/')
except Exception as e:
logger.error('Failed to load Citus coordinator cluster from %s: %r', self.__class__.__name__, e)
logger.error('Failed to load %s coordinator cluster from %s: %r' %
(self._mpp.type, self.__class__.__name__, e))
return None

def _get_citus_cluster(self) -> Cluster:
"""Load Citus cluster from DCS.
def _get_mpp_cluster(self) -> Cluster:
"""Load MPP cluster from DCS.
:returns: A Citus :class:`Cluster` instance for the coordinator with workers clusters in the `Cluster.workers`
:returns: A MPP :class:`Cluster` instance for the coordinator with workers clusters in the `Cluster.workers`
dict.
"""
groups = self._load_cluster(self._base_path + '/', self._citus_cluster_loader)
groups = self._load_cluster(self._base_path + '/', self._mpp_cluster_loader)
if TYPE_CHECKING: # pragma: no cover
assert isinstance(groups, dict)
cluster = groups.pop(self._mpp.coordinator_group_id, Cluster.empty())
Expand All @@ -1563,12 +1565,12 @@ def get_cluster(self) -> Cluster:
Stores copy of time, status and failsafe values for comparison in DCS update decisions.
Caching is required to avoid overhead placed upon the REST API.
Returns either a Citus or Patroni implementation of :class:`Cluster` depending on availability.
Returns either a PostgreSQL or MPP implementation of :class:`Cluster` depending on availability.
:returns:
"""
try:
cluster = self._get_citus_cluster() if self.is_citus_coordinator() else self.__get_patroni_cluster()
cluster = self._get_mpp_cluster() if self.is_mpp_coordinator() else self.__get_postgresql_cluster()
except Exception:
self.reset_cluster()
raise
Expand Down
4 changes: 2 additions & 2 deletions patroni/dcs/consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def _cluster_from_nodes(self, nodes: Dict[str, Any]) -> Cluster:
def _consistency(self) -> str:
return 'consistent' if self._ctl else self._client.consistency

def _cluster_loader(self, path: str) -> Cluster:
def _postgresql_cluster_loader(self, path: str) -> Cluster:
_, results = self.retry(self._client.kv.get, path, recurse=True, consistency=self._consistency)
if results is None:
return Cluster.empty()
Expand All @@ -431,7 +431,7 @@ def _cluster_loader(self, path: str) -> Cluster:

return self._cluster_from_nodes(nodes)

def _citus_cluster_loader(self, path: str) -> Dict[int, Cluster]:
def _mpp_cluster_loader(self, path: str) -> Dict[int, Cluster]:
_, results = self.retry(self._client.kv.get, path, recurse=True, consistency=self._consistency)
clusters: Dict[int, Dict[str, Cluster]] = defaultdict(dict)
for node in results or []:
Expand Down
4 changes: 2 additions & 2 deletions patroni/dcs/etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,15 +710,15 @@ def _cluster_from_nodes(self, etcd_index: int, nodes: Dict[str, etcd.EtcdResult]

return Cluster(initialize, config, leader, status, members, failover, sync, history, failsafe)

def _cluster_loader(self, path: str) -> Cluster:
def _postgresql_cluster_loader(self, path: str) -> Cluster:
try:
result = self.retry(self._client.read, path, recursive=True, quorum=self._ctl)
except etcd.EtcdKeyNotFound:
return Cluster.empty()
nodes = {node.key[len(result.key):].lstrip('/'): node for node in result.leaves}
return self._cluster_from_nodes(result.etcd_index, nodes)

def _citus_cluster_loader(self, path: str) -> Dict[int, Cluster]:
def _mpp_cluster_loader(self, path: str) -> Dict[int, Cluster]:
try:
result = self.retry(self._client.read, path, recursive=True, quorum=self._ctl)
except etcd.EtcdKeyNotFound:
Expand Down
6 changes: 3 additions & 3 deletions patroni/dcs/etcd3.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ def create_lease(self) -> None:

@property
def cluster_prefix(self) -> str:
return self._base_path + '/' if self.is_citus_coordinator() else self.client_path('')
return self._base_path + '/' if self.is_mpp_coordinator() else self.client_path('')

@staticmethod
def member(node: Dict[str, str]) -> Member:
Expand Down Expand Up @@ -787,13 +787,13 @@ def _cluster_from_nodes(self, nodes: Dict[str, Any]) -> Cluster:

return Cluster(initialize, config, leader, status, members, failover, sync, history, failsafe)

def _cluster_loader(self, path: str) -> Cluster:
def _postgresql_cluster_loader(self, path: str) -> Cluster:
nodes = {node['key'][len(path):]: node
for node in self._client.get_cluster(path)
if node['key'].startswith(path)}
return self._cluster_from_nodes(nodes)

def _citus_cluster_loader(self, path: str) -> Dict[int, Cluster]:
def _mpp_cluster_loader(self, path: str) -> Dict[int, Cluster]:
clusters: Dict[int, Dict[str, Dict[str, Any]]] = defaultdict(dict)
path = self._base_path + '/'
for node in self._client.get_cluster(path):
Expand Down
23 changes: 12 additions & 11 deletions patroni/dcs/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,8 @@ def is_ready(self) -> bool:

class Kubernetes(AbstractDCS):

_CITUS_LABEL = 'citus-group'
# For backward compatibility, we keep citus-group here.
_MPP_GROUP_LABEL = 'citus-group'

def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None:
self._labels = deepcopy(config['labels'])
Expand All @@ -761,7 +762,7 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None:
self._ca_certs = os.environ.get('PATRONI_KUBERNETES_CACERT', config.get('cacert')) or SERVICE_CERT_FILENAME
super(Kubernetes, self).__init__({**config, 'namespace': ''}, mpp)
if self._mpp.is_enabled():
self._labels[self._CITUS_LABEL] = str(self._mpp.group)
self._labels[self._MPP_GROUP_LABEL] = str(self._mpp.group)

self._retry = Retry(deadline=config['retry_timeout'], max_delay=1, max_tries=-1,
retry_exceptions=KubernetesRetriableException)
Expand Down Expand Up @@ -936,19 +937,19 @@ def _cluster_from_nodes(self, group: str, nodes: Dict[str, K8sObject], pods: Col

return Cluster(initialize, config, leader, status, members, failover, sync, history, failsafe)

def _cluster_loader(self, path: Dict[str, Any]) -> Cluster:
def _postgresql_cluster_loader(self, path: Dict[str, Any]) -> Cluster:
return self._cluster_from_nodes(path['group'], path['nodes'], path['pods'].values())

def _citus_cluster_loader(self, path: Dict[str, Any]) -> Dict[int, Cluster]:
def _mpp_cluster_loader(self, path: Dict[str, Any]) -> Dict[int, Cluster]:
clusters: Dict[str, Dict[str, Dict[str, K8sObject]]] = defaultdict(lambda: defaultdict(dict))

for name, pod in path['pods'].items():
group = pod.metadata.labels.get(self._CITUS_LABEL)
group = pod.metadata.labels.get(self._MPP_GROUP_LABEL)
if group and self._mpp.group_re.match(group):
clusters[group]['pods'][name] = pod

for name, kind in path['nodes'].items():
group = kind.metadata.labels.get(self._CITUS_LABEL)
group = kind.metadata.labels.get(self._MPP_GROUP_LABEL)
if group and self._mpp.group_re.match(group):
clusters[group]['nodes'][name] = kind
return {int(group): self._cluster_from_nodes(group, value['nodes'], value['pods'].values())
Expand All @@ -965,9 +966,9 @@ def __load_cluster(
with self._condition:
self._wait_caches(stop_time)
pods = {name: pod for name, pod in self._pods.copy().items()
if not group or pod.metadata.labels.get(self._CITUS_LABEL) == group}
if not group or pod.metadata.labels.get(self._MPP_GROUP_LABEL) == group}
nodes = {name: kind for name, kind in self._kinds.copy().items()
if not group or kind.metadata.labels.get(self._CITUS_LABEL) == group}
if not group or kind.metadata.labels.get(self._MPP_GROUP_LABEL) == group}
return loader({'group': group, 'pods': pods, 'nodes': nodes})
except Exception:
logger.exception('get_cluster')
Expand All @@ -979,14 +980,14 @@ def _load_cluster(
group = str(self._mpp.group) if self._mpp.is_enabled() and path == self.client_path('') else None
return self.__load_cluster(group, loader)

def get_citus_coordinator(self) -> Optional[Cluster]:
def get_mpp_coordinator(self) -> Optional[Cluster]:
try:
ret = self.__load_cluster(str(self._mpp.coordinator_group_id), self._cluster_loader)
ret = self.__load_cluster(str(self._mpp.coordinator_group_id), self._postgresql_cluster_loader)
if TYPE_CHECKING: # pragma: no cover
assert isinstance(ret, Cluster)
return ret
except Exception as e:
logger.error('Failed to load Citus coordinator cluster from Kubernetes: %r', e)
logger.error('Failed to load %s coordinator cluster from Kubernetes: %r' % (self._mpp.type, e))

@staticmethod
def compare_ports(p1: K8sObject, p2: K8sObject) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions patroni/dcs/raft.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,14 +375,14 @@ def _cluster_from_nodes(self, nodes: Dict[str, Any]) -> Cluster:

return Cluster(initialize, config, leader, status, members, failover, sync, history, failsafe)

def _cluster_loader(self, path: str) -> Cluster:
def _postgresql_cluster_loader(self, path: str) -> Cluster:
response = self._sync_obj.get(path, recursive=True)
if not response:
return Cluster.empty()
nodes = {key[len(path):]: value for key, value in response.items()}
return self._cluster_from_nodes(nodes)

def _citus_cluster_loader(self, path: str) -> Dict[int, Cluster]:
def _mpp_cluster_loader(self, path: str) -> Dict[int, Cluster]:
clusters: Dict[int, Dict[str, Any]] = defaultdict(dict)
response = self._sync_obj.get(path, recursive=True)
for key, value in (response or {}).items():
Expand Down
6 changes: 3 additions & 3 deletions patroni/dcs/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def load_members(self, path: str) -> List[Member]:
members.append(self.member(member, *data))
return members

def _cluster_loader(self, path: str) -> Cluster:
def _postgresql_cluster_loader(self, path: str) -> Cluster:
nodes = set(self.get_children(path))

# get initialize flag
Expand Down Expand Up @@ -258,11 +258,11 @@ def _cluster_loader(self, path: str) -> Cluster:

return Cluster(initialize, config, leader, status, members, failover, sync, history, failsafe)

def _citus_cluster_loader(self, path: str) -> Dict[int, Cluster]:
def _mpp_cluster_loader(self, path: str) -> Dict[int, Cluster]:
ret: Dict[int, Cluster] = {}
for node in self.get_children(path):
if self._mpp.group_re.match(node):
ret[int(node)] = self._cluster_loader(path + node + '/')
ret[int(node)] = self._postgresql_cluster_loader(path + node + '/')
return ret

def _load_cluster(
Expand Down

0 comments on commit 9cba7b9

Please sign in to comment.