import datetime
import functools
import json
import logging
import os
import random
import socket
import six
import sys
import time
import urllib3
import yaml
from urllib3 import Timeout
from urllib3.exceptions import HTTPError
from six.moves.http_client import HTTPException
from threading import Condition, Lock, Thread
from . import AbstractDCS, Cluster, ClusterConfig, Failover, Leader, Member, SyncState, TimelineHistory
from ..exceptions import DCSError
from ..utils import deep_compare, iter_response_objects, keepalive_socket_options,\
Retry, RetryFailedError, tzutc, uri, USER_AGENT
logger = logging.getLogger(__name__)

KUBE_CONFIG_DEFAULT_LOCATION = os.environ.get('KUBECONFIG', '~/.kube/config')
# Environment variables set by Kubernetes in every pod; together they address the API service.
SERVICE_HOST_ENV_NAME = 'KUBERNETES_SERVICE_HOST'
SERVICE_PORT_ENV_NAME = 'KUBERNETES_SERVICE_PORT'
# Service account credentials mounted into the pod (used by K8sConfig.load_incluster_config).
SERVICE_TOKEN_FILENAME = '/var/run/secrets/kubernetes.io/serviceaccount/token'
SERVICE_CERT_FILENAME = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
class KubernetesError(DCSError):
    """Raised when the Kubernetes API does not respond properly while loading the cluster state."""
    pass
# Maps snake_case => camelCase exactly like the autogenerated swagger code does for > 97% of names.
def to_camel_case(value):
    """Convert a snake_case identifier into the camelCase spelling used by the K8s API.

    The first word is kept unchanged; each following word is title-cased, except for
    well-known abbreviations, which are upper-cased (``pod_ip`` => ``podIP``).
    """
    abbreviations = {'api', 'apiv3', 'cidr', 'cpu', 'csi', 'id', 'io', 'ip', 'ipc', 'pid', 'tls', 'uri', 'url', 'uuid'}
    parts = value.split('_')
    converted = [parts[0]]
    for part in parts[1:]:
        converted.append(part.upper() if part in abbreviations else part.title())
    return ''.join(converted)
class K8sConfig(object):
    """Connection settings for the K8s API: server URL, default headers and urllib3 pool options.

    Filled either from the pod's service account (``load_incluster_config``) or from a
    kubeconfig file (``load_kube_config``).
    """

    class ConfigException(Exception):
        pass

    def __init__(self):
        self.pool_config = {'maxsize': 10, 'num_pools': 10}  # configuration for urllib3.PoolManager
        self._make_headers()

    def _make_headers(self, token=None, **kwargs):
        """(Re)build ``self._headers``; a *token* adds a Bearer ``authorization`` header.

        Extra *kwargs* (e.g. ``basic_auth``) are passed through to ``urllib3.make_headers``.
        Returns nothing - the result is stored on the instance.
        """
        self._headers = urllib3.make_headers(user_agent=USER_AGENT, **kwargs)
        if token:
            self._headers['authorization'] = 'Bearer ' + token

    def load_incluster_config(self, ca_certs=SERVICE_CERT_FILENAME):
        """Configure access through the service account mounted into the pod.

        :param ca_certs: path to the CA bundle used to verify the API server certificate.
        :raises ConfigException: if the service host/port variables, the CA file or the
            token file are missing or empty.
        """
        if SERVICE_HOST_ENV_NAME not in os.environ or SERVICE_PORT_ENV_NAME not in os.environ:
            raise self.ConfigException('Service host/port is not set.')
        if not os.environ[SERVICE_HOST_ENV_NAME] or not os.environ[SERVICE_PORT_ENV_NAME]:
            raise self.ConfigException('Service host/port is set but empty.')

        if not os.path.isfile(ca_certs):
            raise self.ConfigException('Service certificate file does not exists.')
        with open(ca_certs) as f:
            if not f.read():
                raise self.ConfigException('Cert file exists but empty.')
        self.pool_config['ca_certs'] = ca_certs

        if not os.path.isfile(SERVICE_TOKEN_FILENAME):
            raise self.ConfigException('Service token file does not exists.')
        with open(SERVICE_TOKEN_FILENAME) as f:
            token = f.read()
        if not token:
            raise self.ConfigException('Token file exists but empty.')
        self._make_headers(token=token)

        self._server = uri('https', (os.environ[SERVICE_HOST_ENV_NAME], os.environ[SERVICE_PORT_ENV_NAME]))

    @staticmethod
    def _get_by_name(config, section, name):
        """Return the ``section`` payload of the entry called *name* in ``config[section + 's']``.

        :raises K8sConfig.ConfigException: if no entry with that name exists.
        """
        for c in config[section + 's']:
            if c['name'] == name:
                return c[section]
        # Previously this returned None, which surfaced later as an obscure TypeError.
        raise K8sConfig.ConfigException('{0} "{1}" not found in kubeconfig'.format(section, name))

    def load_kube_config(self, context=None):
        """Configure access from the kubeconfig file (``$KUBECONFIG`` or ``~/.kube/config``).

        :param context: kubeconfig context to use; defaults to ``current-context``.
        """
        with open(os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION)) as f:
            config = yaml.safe_load(f)

        context = self._get_by_name(config, 'context', context or config['current-context'])
        cluster = self._get_by_name(config, 'cluster', context['cluster'])
        user = self._get_by_name(config, 'user', context['user'])

        self._server = cluster['server'].rstrip('/')
        if self._server.startswith('https'):
            self.pool_config.update({v: user[k] for k, v in {'client-certificate': 'cert_file',
                                                             'client-key': 'key_file'}.items() if k in user})
            if 'certificate-authority' in cluster:
                self.pool_config['ca_certs'] = cluster['certificate-authority']
            self.pool_config['cert_reqs'] = 'CERT_NONE' if cluster.get('insecure-skip-tls-verify') else 'CERT_REQUIRED'
        if user.get('token'):
            self._make_headers(token=user['token'])
        elif 'username' in user and 'password' in user:
            # _make_headers() stores the headers itself; assigning its (None) result broke `headers`.
            self._make_headers(basic_auth=':'.join((user['username'], user['password'])))

    @property
    def server(self):
        """Base URL of the K8s API server."""
        return self._server

    @property
    def headers(self):
        """A fresh copy of the default request headers (callers may mutate it)."""
        return self._headers.copy()
class K8sObject(object):
    """Read-only wrapper around a JSON object returned by the K8s API.

    Nested dicts are wrapped recursively, so fields can be read either by their
    camelCase key via ``get()`` or as snake_case attributes
    (``obj.metadata.resource_version`` reads key ``resourceVersion``).
    Missing fields read as ``None`` instead of raising.
    """

    def __init__(self, kwargs):
        self._dict = {k: self._wrap(k, v) for k, v in kwargs.items()}

    def get(self, name, default=None):
        """Return the value stored under the camelCase key *name*, or *default*."""
        return self._dict.get(name, default)

    def __getattr__(self, name):
        # Only reached for names not found the normal way: translate snake_case to the API's camelCase.
        return self.get(to_camel_case(name))

    @classmethod
    def _wrap(cls, parent, value):
        """Recursively wrap *value*; *parent* is the key it was stored under (None for list items)."""
        if isinstance(value, dict):
            # we know that `annotations` and `labels` are dicts and therefore don't want to convert them into K8sObject
            return value if parent in {'annotations', 'labels'} and \
                all(isinstance(v, six.string_types) for v in value.values()) else cls(value)
        elif isinstance(value, list):
            return [cls._wrap(None, v) for v in value]
        else:
            return value

    def to_dict(self):
        """Return the underlying dict (nested values stay wrapped)."""
        return self._dict

    def __repr__(self):
        return json.dumps(self, indent=4, default=lambda o: o.to_dict())
class K8sException(Exception):
    """Base class for transport-level errors raised by the K8s API client in this module."""
    pass
class K8sConnectionFailed(K8sException):
    """Raised when none of the known K8s API server nodes could be reached."""
    pass
class K8sClient(object):
    """Lightweight stand-in for the parts of the official ``kubernetes`` client used by this module.

    Attribute access on an instance (e.g. ``k8s_client.V1ObjectMeta``) returns a dynamically
    created and cached subclass of ``_K8sObjectTemplate``.
    """

    class rest(object):

        class ApiException(Exception):
            """Raised for K8s API responses whose status is not 2xx."""

            def __init__(self, status=None, reason=None, http_resp=None):
                self.status = http_resp.status if http_resp else status
                self.reason = http_resp.reason if http_resp else reason
                self.body = http_resp.data if http_resp else None
                self.headers = http_resp.getheaders() if http_resp else None

            def __str__(self):
                error_message = "({0})\nReason: {1}\n".format(self.status, self.reason)
                if self.headers:
                    error_message += "HTTP response headers: {0}\n".format(self.headers)
                if self.body:
                    error_message += "HTTP response body: {0}\n".format(self.body)
                return error_message

    class ApiClient(object):
        """HTTP transport to the K8s API with failover between API server nodes.

        With *bypass_api_service* the node list is read from the ``kubernetes`` endpoint in
        the ``default`` namespace and requests are sent to those addresses directly instead
        of only to ``k8s_config.server``.
        """

        _API_URL_PREFIX = '/api/v1/namespaces/'

        def __init__(self, bypass_api_service=False):
            self._bypass_api_service = bypass_api_service
            self.pool_manager = urllib3.PoolManager(**k8s_config.pool_config)
            self._base_uri = k8s_config.server
            self._api_servers_cache = [k8s_config.server]
            self._api_servers_cache_updated = 0
            # defaults, later overridden by CoreV1ApiProxy.configure_timeouts()
            self.set_api_servers_cache_ttl(10)
            self.set_read_timeout(10)
            try:
                self._load_api_servers_cache()
            except K8sException:
                # _update_api_servers_cache stays True, so the next request() tries again
                pass

        def set_read_timeout(self, timeout):
            self._read_timeout = timeout

        def set_api_servers_cache_ttl(self, ttl):
            self._api_servers_cache_ttl = ttl - 0.5

        def set_base_uri(self, value):
            logger.info('Selected new K8s API server endpoint %s', value)
            # We will connect by IP of the master node which is not listed as alternative name
            self.pool_manager.connection_pool_kw['assert_hostname'] = False
            self._base_uri = value

        @staticmethod
        def _handle_server_response(response, _preload_content):
            """Raise ApiException for non-2xx responses; otherwise return the parsed body or the raw response."""
            if response.status not in range(200, 206):
                raise k8s_client.rest.ApiException(http_resp=response)
            return K8sObject(json.loads(response.data.decode('utf-8'))) if _preload_content else response

        @staticmethod
        def _make_headers(headers):
            ret = k8s_config.headers
            ret.update(headers or {})
            return ret

        @property
        def api_servers_cache(self):
            """Known API server URLs, with the currently selected one (if known) first."""
            base_uri, cache = self._base_uri, self._api_servers_cache
            return ([base_uri] if base_uri in cache else []) + [machine for machine in cache if machine != base_uri]

        def _get_api_servers(self, api_servers_cache):
            """Fetch the https addresses of the ``default/kubernetes`` endpoint, trying each known node in turn.

            :raises ApiException: with status 403 when RBAC forbids reading the endpoint.
            :raises K8sConnectionFailed: when no node returned a usable address list.
            """
            _, per_node_timeout, per_node_retries = self._calculate_timeouts(len(api_servers_cache))
            kwargs = {'headers': self._make_headers({}), 'preload_content': True, 'retries': per_node_retries,
                      'timeout': urllib3.Timeout(connect=max(1, per_node_timeout/2.0), total=per_node_timeout)}
            path = self._API_URL_PREFIX + 'default/endpoints/kubernetes'
            for base_uri in api_servers_cache:
                try:
                    response = self.pool_manager.request('GET', base_uri + path, **kwargs)
                    endpoint = self._handle_server_response(response, True)
                    for subset in endpoint.subsets:
                        for port in subset.ports:
                            if port.name == 'https' and port.protocol == 'TCP':
                                addresses = [uri('https', (a.ip, port.port)) for a in subset.addresses]
                                if addresses:
                                    # spread load between nodes
                                    random.shuffle(addresses)
                                    return addresses
                except Exception as e:
                    if isinstance(e, k8s_client.rest.ApiException) and e.status == 403:
                        raise
                    self.pool_manager.clear()
                    logger.error('Failed to get "kubernetes" endpoint from %s: %r', base_uri, e)
            raise K8sConnectionFailed('No more K8s API server nodes in the cluster')

        def _refresh_api_servers_cache(self, updating_cache=False):
            if self._bypass_api_service:
                try:
                    api_servers_cache = [k8s_config.server] if updating_cache else self.api_servers_cache
                    self._api_servers_cache = self._get_api_servers(api_servers_cache)
                    if updating_cache:
                        self.pool_manager.clear()
                except k8s_client.rest.ApiException:  # 403 Permission denied
                    logger.warning("Kubernetes RBAC doesn't allow GET access to the 'kubernetes' "
                                   "endpoint in the 'default' namespace. Disabling 'bypass_api_service'.")
                    self._bypass_api_service = False
                    self._api_servers_cache = [k8s_config.server]
                    if not updating_cache:
                        self.pool_manager.clear()
                except K8sConnectionFailed:
                    if updating_cache:
                        raise K8sException("Could not get the list of K8s API server nodes")
                    # keep the old list and the old timestamp, so the refresh is retried soon
                    return
            else:
                self._api_servers_cache = [k8s_config.server]

            if self._base_uri not in self._api_servers_cache:
                self.set_base_uri(self._api_servers_cache[0])
            self._api_servers_cache_updated = time.time()

        def refresh_api_servers_cache(self):
            """Refresh the node list when bypassing the API service and the cache TTL has expired."""
            if self._bypass_api_service and time.time() - self._api_servers_cache_updated > self._api_servers_cache_ttl:
                self._refresh_api_servers_cache()

        def _load_api_servers_cache(self):
            # The flag stays True if the refresh raises, forcing request() to retry the load.
            self._update_api_servers_cache = True
            self._refresh_api_servers_cache(True)
            self._update_api_servers_cache = False

        def _calculate_timeouts(self, api_servers, timeout=None):
            """Calculate a request timeout and number of retries per single K8s API server node.
            In case if the timeout per node is too small (less than one second) we will reduce the number of nodes.
            For the cluster with only one API server node we will try to do 1 retry.
            No retries for clusters with 2 or more API server nodes. We better rely on switching to a different node.

            :returns: tuple ``(nodes to try, timeout per node, retries per node)``."""

            per_node_timeout = timeout = float(timeout or self._read_timeout)

            max_retries = 3 - min(api_servers, 2)
            per_node_retries = 1
            min_timeout = 1.0

            while api_servers > 0:
                per_node_timeout = float(timeout) / api_servers
                if per_node_timeout >= min_timeout:
                    # for small clusters we will try to do more than one try on every node
                    while per_node_retries < max_retries and per_node_timeout / (per_node_retries + 1) >= min_timeout:
                        per_node_retries += 1
                    per_node_timeout /= per_node_retries
                    break
                # if the timeout per one node is to small try to reduce number of nodes
                api_servers -= 1
                max_retries = 1

            return api_servers, per_node_timeout, per_node_retries - 1

        def _do_http_request(self, retry, api_servers_cache, method, path, **kwargs):
            """Try *api_servers_cache* nodes in order until one answers.

            :raises K8sException: on the first failure when *retry* is not set.
            :raises K8sConnectionFailed: when every node failed.
            """
            some_request_failed = False
            for i, base_uri in enumerate(api_servers_cache):
                if i > 0:
                    logger.info('Retrying on %s', base_uri)
                try:
                    response = self.pool_manager.request(method, base_uri + path, **kwargs)
                    if some_request_failed:
                        # stick to the node that answered and refresh the list of nodes
                        self.set_base_uri(base_uri)
                        self._refresh_api_servers_cache()
                    return response
                except (HTTPError, HTTPException, socket.error, socket.timeout) as e:
                    self.pool_manager.clear()
                    if not retry:
                        # switch to the next node if request failed and retry is not allowed
                        if i + 1 < len(api_servers_cache):
                            self.set_base_uri(api_servers_cache[i + 1])
                        raise K8sException('{0} {1} request failed'.format(method, path))
                    logger.error('Request to server %s failed: %r', base_uri, e)
                    some_request_failed = True

            raise K8sConnectionFailed('No more API server nodes in the cluster')

        def request(self, retry, method, path, timeout=None, **kwargs):
            """Send a request, failing over between API server nodes within *retry*'s deadline.

            :param retry: ``Retry`` object bounding the whole operation, or ``None`` (no failover loop).
            :param timeout: a number (total timeout) or ``(connect, read)`` tuple; when omitted the
                timeout and retries per node are derived from the read timeout and the number of nodes.
            """
            if self._update_api_servers_cache:
                self._load_api_servers_cache()

            api_servers_cache = self.api_servers_cache
            api_servers = len(api_servers_cache)

            if timeout:
                if isinstance(timeout, six.integer_types + (float,)):
                    timeout = urllib3.Timeout(total=timeout)
                elif isinstance(timeout, tuple) and len(timeout) == 2:
                    timeout = urllib3.Timeout(connect=timeout[0], read=timeout[1])
                retries = 0
            else:
                _, timeout, retries = self._calculate_timeouts(api_servers)
                timeout = urllib3.Timeout(connect=max(1, timeout/2.0), total=timeout)
            kwargs.update(retries=retries, timeout=timeout)

            while True:
                try:
                    return self._do_http_request(retry, api_servers_cache, method, path, **kwargs)
                except K8sConnectionFailed as ex:
                    try:
                        self._load_api_servers_cache()
                        api_servers_cache = self.api_servers_cache
                        # was `len(api_servers)` - an int - which always raised TypeError here
                        api_servers = len(api_servers_cache)
                    except Exception as e:
                        logger.debug('Failed to update list of K8s master nodes: %r', e)

                    sleeptime = retry.sleeptime
                    remaining_time = retry.stoptime - sleeptime - time.time()
                    nodes, timeout, retries = self._calculate_timeouts(api_servers, remaining_time)
                    if nodes == 0:
                        self._update_api_servers_cache = True
                        raise ex
                    retry.sleep_func(sleeptime)
                    retry.update_delay()
                    # We still have some time left. Partially reduce `api_servers_cache` and retry request
                    kwargs.update(timeout=urllib3.Timeout(connect=max(1, timeout/2.0), total=timeout), retries=retries)
                    api_servers_cache = api_servers_cache[:nodes]

        def call_api(self, method, path, headers=None, body=None, _retry=None,
                     _preload_content=True, _request_timeout=None, **kwargs):
            """Call ``/api/v1/namespaces/<path>``; extra *kwargs* become query fields (camelCased)."""
            headers = self._make_headers(headers)
            fields = {to_camel_case(k): v for k, v in kwargs.items()}  # resource_version => resourceVersion
            body = json.dumps(body, default=lambda o: o.to_dict()) if body is not None else None
            response = self.request(_retry, method, self._API_URL_PREFIX + path, headers=headers, fields=fields,
                                    body=body, preload_content=_preload_content, timeout=_request_timeout)
            return self._handle_server_response(response, _preload_content)

    class CoreV1Api(object):
        """Resolves ``<action>_namespaced_<kind>`` attribute names into REST calls."""

        def __init__(self, api_client=None):
            self._api_client = api_client or k8s_client.ApiClient()

        def __getattr__(self, func):  # `func` name pattern: (action)_namespaced_(kind)
            if '_namespaced_' not in func:
                # let hasattr()/copy/pickle probes fail the standard way instead of with ValueError
                raise AttributeError(func)
            action, kind = func.split('_namespaced_')  # (read|list|create|patch|replace|delete|delete_collection)
            kind = kind.replace('_', '') + ('s' * int(kind[-1] != 's'))  # plural, single word

            def wrapper(*args, **kwargs):
                method = {'read': 'GET', 'list': 'GET', 'create': 'POST',
                          'replace': 'PUT'}.get(action, action.split('_')[0]).upper()

                if action == 'create' or len(args) == 1:  # namespace is a first argument and name is not in arguments
                    path = '/'.join([args[0], kind])
                else:  # name, namespace followed by optional body
                    path = '/'.join([args[1], kind, args[0]])

                headers = {'Content-Type': 'application/strategic-merge-patch+json'} if action == 'patch' else {}

                if len(args) == 3:  # name, namespace, body
                    body = args[2]
                elif action == 'create':  # namespace, body
                    body = args[1]
                elif action == 'delete':  # name, namespace
                    body = kwargs.pop('body', None)
                else:
                    body = None

                return self._api_client.call_api(method, path, headers, body, **kwargs)
            return wrapper

    class _K8sObjectTemplate(K8sObject):
        """The template for objects which we create locally, e.g. k8s_client.V1ObjectMeta & co"""
        def __init__(self, **kwargs):
            self._dict = {to_camel_case(k): v for k, v in kwargs.items()}

    def __init__(self):
        self.__cls_cache = {}
        self.__cls_lock = Lock()

    def __getattr__(self, name):
        # Every unknown attribute (e.g. V1Pod) becomes a named _K8sObjectTemplate subclass, created once.
        with self.__cls_lock:
            if name not in self.__cls_cache:
                self.__cls_cache[name] = type(name, (self._K8sObjectTemplate,), {})
        return self.__cls_cache[name]
# Module-wide singletons. Constructing one does not depend on the other, so the order is free.
k8s_config = K8sConfig()  # filled by load_incluster_config() / load_kube_config()
k8s_client = K8sClient()  # model classes (V1*), ApiClient and CoreV1Api
class KubernetesRetriableException(k8s_client.rest.ApiException):
    """An ApiException that ``Retry`` is allowed to retry (server errors, ``Retry-After`` responses)."""

    def __init__(self, orig):
        super(KubernetesRetriableException, self).__init__(orig.status, orig.reason)
        self.body = orig.body
        self.headers = orig.headers

    @property
    def sleeptime(self):
        """Seconds from the ``retry-after`` header, or None if absent/unparsable."""
        try:
            return int(self.headers['retry-after'])
        except Exception:
            return None
class CoreV1ApiProxy(object):
    """Wraps ``CoreV1Api``, mapping ``*_kind`` names to Endpoints or ConfigMaps and marking retriable errors."""

    def __init__(self, use_endpoints=False, bypass_api_service=False):
        self._api_client = k8s_client.ApiClient(bypass_api_service)
        self._core_v1_api = k8s_client.CoreV1Api(self._api_client)
        self._use_endpoints = bool(use_endpoints)

    def configure_timeouts(self, loop_wait, retry_timeout, ttl):
        # Normally every loop_wait seconds we should have receive something from the socket.
        # If we didn't received anything after the loop_wait + retry_timeout it is a time
        # to start worrying (send keepalive messages). Finally, the connection should be
        # considered as dead if we received nothing from the socket after the ttl seconds.
        self._api_client.pool_manager.connection_pool_kw['socket_options'] = \
            list(keepalive_socket_options(ttl, int(loop_wait + retry_timeout)))
        self._api_client.set_read_timeout(retry_timeout)
        self._api_client.set_api_servers_cache_ttl(loop_wait)

    def refresh_api_servers_cache(self):
        self._api_client.refresh_api_servers_cache()

    def __getattr__(self, func):
        if func.endswith('_kind'):
            func = func[:-4] + ('endpoints' if self._use_endpoints else 'config_map')

        def wrapper(*args, **kwargs):
            try:
                return getattr(self._core_v1_api, func)(*args, **kwargs)
            except k8s_client.rest.ApiException as e:
                if e.status in (500, 503, 504) or e.headers and 'retry-after' in e.headers:  # XXX
                    raise KubernetesRetriableException(e)
                raise
        return wrapper

    @property
    def use_endpoints(self):
        return self._use_endpoints
def catch_kubernetes_errors(func):
    """Decorator: turn K8s API / retry / transport errors into a ``False`` return value.

    403 and other unexpected API errors are logged with a traceback; 409 (object exists or
    resource_version conflict) is expected and returned silently.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except k8s_client.rest.ApiException as e:
            if e.status == 403:
                logger.exception('Permission denied')
            elif e.status != 409:  # Object exists or conflict in resource_version
                logger.exception('Unexpected error from Kubernetes API')
            return False
        except (RetryFailedError, K8sException):
            return False
    return wrapper
class ObjectCache(Thread):
    """Daemon thread keeping a local copy of the K8s objects returned by the list call *func*.

    It lists the objects, then watches for changes from the returned ``resourceVersion``;
    when the watch ends or fails it starts over. Readiness is signalled through *condition*.
    """

    def __init__(self, dcs, func, retry, condition, name=None):
        Thread.__init__(self)
        self.daemon = True
        self._dcs = dcs
        self._func = func
        self._retry = retry
        self._condition = condition
        self._name = name  # name of this pod
        self._is_ready = False
        self._object_cache = {}
        self._object_cache_lock = Lock()
        # object name => annotation whose change should wake up the HA loop
        self._annotations_map = {self._dcs.leader_path: self._dcs._LEADER, self._dcs.config_path: self._dcs._CONFIG}
        self.start()

    def _list(self):
        try:
            return self._func(_retry=self._retry.copy())
        except Exception:
            # avoid a busy loop in run() while the API is unavailable
            time.sleep(1)
            raise

    def _watch(self, resource_version):
        return self._func(_request_timeout=(self._retry.deadline, Timeout.DEFAULT_TIMEOUT),
                          _preload_content=False, watch=True, resource_version=resource_version)

    def set(self, name, value):
        """Store *value* unless the cached object has the same or a newer resource_version.

        :returns: ``(stored, previous value)``.
        """
        with self._object_cache_lock:
            old_value = self._object_cache.get(name)
            ret = not old_value or int(old_value.metadata.resource_version) < int(value.metadata.resource_version)
            if ret:
                self._object_cache[name] = value
        return ret, old_value

    def delete(self, name, resource_version):
        """Drop *name* if the cached copy is older than *resource_version*.

        :returns: ``(True if deleted or nothing was cached, previous value)``.
        """
        with self._object_cache_lock:
            old_value = self._object_cache.get(name)
            ret = old_value and int(old_value.metadata.resource_version) < int(resource_version)
            if ret:
                del self._object_cache[name]
        return not old_value or ret, old_value

    def copy(self):
        with self._object_cache_lock:
            return self._object_cache.copy()

    def get(self, name):
        with self._object_cache_lock:
            return self._object_cache.get(name)

    def _build_cache(self):
        objects = self._list()
        return_type = 'V1' + objects.kind[:-4]  # e.g. 'PodList' => 'V1Pod'
        with self._object_cache_lock:
            self._object_cache = {item.metadata.name: item for item in objects.items}
        with self._condition:
            self._is_ready = True
            self._condition.notify()

        response = self._watch(objects.metadata.resource_version)
        try:
            for event in iter_response_objects(response):
                obj = event['object']
                if obj.get('code') == 410:  # resource_version is too old, rebuild the cache from scratch
                    break

                ev_type = event['type']
                name = obj['metadata']['name']

                if ev_type in ('ADDED', 'MODIFIED'):
                    obj = K8sObject(obj)
                    success, old_value = self.set(name, obj)
                    if success:
                        new_value = (obj.metadata.annotations or {}).get(self._annotations_map.get(name))
                elif ev_type == 'DELETED':
                    success, old_value = self.delete(name, obj['metadata']['resourceVersion'])
                    new_value = None
                else:
                    logger.warning('Unexpected event type: %s', ev_type)
                    continue

                if success and return_type != 'V1Pod':
                    if old_value:
                        old_value = (old_value.metadata.annotations or {}).get(self._annotations_map.get(name))

                    value_changed = old_value != new_value and \
                        (name != self._dcs.config_path or old_value is not None and new_value is not None)

                    if value_changed:
                        logger.debug('%s changed from %s to %s', name, old_value, new_value)

                    # Do not wake up HA loop if we run as leader and received leader object update event
                    if value_changed or name == self._dcs.leader_path and self._name != new_value:
                        self._dcs.event.set()
        finally:
            with self._condition:
                self._is_ready = False
            response.close()
            response.release_conn()

    def run(self):
        while True:
            try:
                self._build_cache()
            except Exception as e:
                with self._condition:
                    self._is_ready = False
                logger.error('ObjectCache.run %r', e)

    def is_ready(self):
        """Must be called only when holding the lock on `_condition`"""
        return self._is_ready
class Kubernetes(AbstractDCS):
    """DCS implementation storing cluster state in annotations of K8s Endpoints or ConfigMaps.

    Pods and leader/config/failover/sync objects are cached by two ObjectCache threads;
    member status is written into the pod's own annotations and role label.
    """

    def __init__(self, config):
        self._labels = config['labels']
        self._labels[config.get('scope_label', 'cluster-name')] = config['scope']
        self._label_selector = ','.join('{0}={1}'.format(k, v) for k, v in self._labels.items())
        self._namespace = config.get('namespace') or 'default'
        self._role_label = config.get('role_label', 'role')
        self._ca_certs = os.environ.get('PATRONI_KUBERNETES_CACERT', config.get('cacert')) or SERVICE_CERT_FILENAME
        # NOTE(review): presumably keeps the K8s namespace out of the key prefix built by AbstractDCS - confirm
        config['namespace'] = ''
        super(Kubernetes, self).__init__(config)
        self._retry = Retry(deadline=config['retry_timeout'], max_delay=1, max_tries=-1,
                            retry_exceptions=KubernetesRetriableException)
        self._ttl = None
        try:
            k8s_config.load_incluster_config(ca_certs=self._ca_certs)
        except k8s_config.ConfigException:
            k8s_config.load_kube_config(context=config.get('context', 'local'))

        self.__my_pod = None
        self.__ips = [] if config.get('patronictl') else [config.get('pod_ip')]
        self.__ports = []
        for p in config.get('ports', [{}]):
            port = {'port': int(p.get('port', '5432'))}
            port.update({n: p[n] for n in ('name', 'protocol') if p.get(n)})
            self.__ports.append(k8s_client.V1EndpointPort(**port))

        bypass_api_service = not config.get('patronictl') and config.get('bypass_api_service')
        self._api = CoreV1ApiProxy(config.get('use_endpoints'), bypass_api_service)
        self._should_create_config_service = self._api.use_endpoints
        self.reload_config(config)
        # leader_observed_record, leader_resource_version, and leader_observed_time are used only for leader race!
        self._leader_observed_record = {}
        self._leader_observed_time = None
        self._leader_resource_version = None
        self.__do_not_watch = False

        self._condition = Condition()

        pods_func = functools.partial(self._api.list_namespaced_pod, self._namespace,
                                      label_selector=self._label_selector)
        self._pods = ObjectCache(self, pods_func, self._retry, self._condition)

        kinds_func = functools.partial(self._api.list_namespaced_kind, self._namespace,
                                       label_selector=self._label_selector)
        self._kinds = ObjectCache(self, kinds_func, self._retry, self._condition, self._name)

    def retry(self, *args, **kwargs):
        """Run a call under a fresh copy of the Retry policy, also handing it to the API client as `_retry`."""
        retry = self._retry.copy()
        kwargs['_retry'] = retry
        return retry(*args, **kwargs)

    def client_path(self, path):
        # K8s object names cannot contain '/': drop the leading one and replace the rest with '-'
        return super(Kubernetes, self).client_path(path)[1:].replace('/', '-')

    @property
    def leader_path(self):
        return self._base_path[1:] if self._api.use_endpoints else super(Kubernetes, self).leader_path

    def set_ttl(self, ttl):
        ttl = int(ttl)
        # a changed TTL must be published promptly, so the next watch() returns immediately
        self.__do_not_watch = self._ttl != ttl
        self._ttl = ttl

    @property
    def ttl(self):
        return self._ttl

    def set_retry_timeout(self, retry_timeout):
        self._retry.deadline = retry_timeout

    def reload_config(self, config):
        super(Kubernetes, self).reload_config(config)
        self._api.configure_timeouts(self.loop_wait, self._retry.deadline, self.ttl)

    @staticmethod
    def member(pod):
        """Build a Member from a pod: status JSON from the annotations plus the pod labels."""
        annotations = pod.metadata.annotations or {}
        member = Member.from_node(pod.metadata.resource_version, pod.metadata.name, None, annotations.get('status', ''))
        member.data['pod_labels'] = pod.metadata.labels
        return member

    def _wait_caches(self, stop_time):
        """Wait until both caches are ready. Must be called while holding `_condition`.

        :raises RetryFailedError: if *stop_time* passes first.
        """
        while not (self._pods.is_ready() and self._kinds.is_ready()):
            timeout = stop_time - time.time()
            if timeout <= 0:
                raise RetryFailedError('Exceeded retry deadline')
            self._condition.wait(timeout)

    def _load_cluster(self):
        stop_time = time.time() + self._retry.deadline
        self._api.refresh_api_servers_cache()
        try:
            with self._condition:
                self._wait_caches(stop_time)

                members = [self.member(pod) for pod in self._pods.copy().values()]
                nodes = self._kinds.copy()

            config = nodes.get(self.config_path)
            metadata = config and config.metadata
            annotations = metadata and metadata.annotations or {}

            # get initialize flag
            initialize = annotations.get(self._INITIALIZE)

            # get global dynamic configuration
            config = ClusterConfig.from_node(metadata and metadata.resource_version,
                                             annotations.get(self._CONFIG) or '{}',
                                             metadata.resource_version if self._CONFIG in annotations else 0)

            # get timeline history
            history = TimelineHistory.from_node(metadata and metadata.resource_version,
                                                annotations.get(self._HISTORY) or '[]')

            leader = nodes.get(self.leader_path)
            metadata = leader and leader.metadata
            self._leader_resource_version = metadata.resource_version if metadata else None
            annotations = metadata and metadata.annotations or {}

            # get last known leader lsn
            last_lsn = annotations.get(self._OPTIME)
            try:
                last_lsn = 0 if last_lsn is None else int(last_lsn)
            except Exception:
                last_lsn = 0

            # get permanent slots state (confirmed_flush_lsn)
            slots = annotations.get('slots')
            try:
                slots = slots and json.loads(slots)
            except Exception:
                slots = None

            # get leader
            leader_record = {n: annotations.get(n) for n in (self._LEADER, 'acquireTime',
                             'ttl', 'renewTime', 'transitions') if n in annotations}
            if (leader_record or self._leader_observed_record) and leader_record != self._leader_observed_record:
                self._leader_observed_record = leader_record
                self._leader_observed_time = time.time()

            leader = leader_record.get(self._LEADER)
            try:
                ttl = int(leader_record.get('ttl')) or self._ttl
            except (TypeError, ValueError):
                ttl = self._ttl

            # the lock expires `ttl` seconds after we last saw the leader record change
            if not metadata or not self._leader_observed_time or self._leader_observed_time + ttl < time.time():
                leader = None

            if metadata:
                member = Member(-1, leader, None, {})
                member = ([m for m in members if m.name == leader] or [member])[0]
                leader = Leader(metadata.resource_version, None, member)

            # failover key
            failover = nodes.get(self.failover_path)
            metadata = failover and failover.metadata
            failover = Failover.from_node(metadata and metadata.resource_version,
                                          metadata and (metadata.annotations or {}).copy())

            # get synchronization state
            sync = nodes.get(self.sync_path)
            metadata = sync and sync.metadata
            sync = SyncState.from_node(metadata and metadata.resource_version, metadata and metadata.annotations)

            return Cluster(initialize, config, leader, last_lsn, members, failover, sync, history, slots)
        except Exception:
            logger.exception('get_cluster')
            raise KubernetesError('Kubernetes API is not responding properly')

    @staticmethod
    def compare_ports(p1, p2):
        return p1.name == p2.name and p1.port == p2.port and (p1.protocol or 'TCP') == (p2.protocol or 'TCP')

    @staticmethod
    def subsets_changed(last_observed_subsets, ip, ports):
        """Check whether the leader endpoint subsets differ from a single subset with *ip* and *ports*.

        >>> Kubernetes.subsets_changed([], None, [])
        True
        >>> ip = '1.2.3.4'
        >>> a = [k8s_client.V1EndpointAddress(ip=ip)]
        >>> s = [k8s_client.V1EndpointSubset(addresses=a)]
        >>> Kubernetes.subsets_changed(s, '1.2.3.5', [])
        True
        >>> Kubernetes.subsets_changed(s, ip, [])
        False
        >>> s = [k8s_client.V1EndpointSubset(addresses=a, ports=[k8s_client.V1EndpointPort(protocol='TCP', port=1)])]
        >>> Kubernetes.subsets_changed(s, '1.2.3.4', [k8s_client.V1EndpointPort(port=5432)])
        True
        >>> p1 = k8s_client.V1EndpointPort(name='port1', port=1)
        >>> p2 = k8s_client.V1EndpointPort(name='port2', port=2)
        >>> p3 = k8s_client.V1EndpointPort(name='port3', port=3)
        >>> s = [k8s_client.V1EndpointSubset(addresses=a, ports=[p1, p2])]
        >>> Kubernetes.subsets_changed(s, ip, [p2, p3])
        True
        >>> s2 = [k8s_client.V1EndpointSubset(addresses=a, ports=[p2, p1])]
        >>> Kubernetes.subsets_changed(s2, ip, [p2, p1])
        False
        """

        if len(last_observed_subsets) != 1:
            return True
        # a subset without ports has `ports` == None; treat it as an empty list instead of crashing on len(None)
        observed = last_observed_subsets[0].ports or []
        if len(last_observed_subsets[0].addresses or []) != 1 or \
                last_observed_subsets[0].addresses[0].ip != ip or \
                len(observed) != len(ports):
            return True
        if len(ports) == 1:
            return not Kubernetes.compare_ports(observed[0], ports[0])
        observed_ports = {p.name: p for p in observed}
        for p in ports:
            if p.name not in observed_ports or not Kubernetes.compare_ports(p, observed_ports.pop(p.name)):
                return True
        return False

    def __target_ref(self, leader_ip, latest_subsets, pod):
        # we want to re-use existing target_ref if possible
        for subset in latest_subsets:
            for address in subset.addresses or []:
                if address.ip == leader_ip and address.target_ref and address.target_ref.name == self._name:
                    return address.target_ref
        return k8s_client.V1ObjectReference(kind='Pod', uid=pod.metadata.uid, namespace=self._namespace,
                                            name=self._name, resource_version=pod.metadata.resource_version)

    def _map_subsets(self, endpoints, ips):
        """Set ``endpoints['subsets']`` only when it has to change (empty it when *ips* is empty)."""
        leader = self._kinds.get(self.leader_path)
        latest_subsets = leader and leader.subsets or []
        if not ips:
            # We want to have subsets empty
            if latest_subsets:
                endpoints['subsets'] = []
            return

        pod = self._pods.get(self._name)
        leader_ip = ips[0] or pod and pod.status.pod_ip
        # don't touch subsets if our (leader) ip is unknown or subsets is valid
        if leader_ip and self.subsets_changed(latest_subsets, leader_ip, self.__ports):
            kwargs = {'hostname': pod.spec.hostname, 'node_name': pod.spec.node_name,
                      'target_ref': self.__target_ref(leader_ip, latest_subsets, pod)} if pod else {}
            address = k8s_client.V1EndpointAddress(ip=leader_ip, **kwargs)
            endpoints['subsets'] = [k8s_client.V1EndpointSubset(addresses=[address], ports=self.__ports)]

    def _patch_or_create(self, name, annotations, resource_version=None, patch=False, retry=None, ips=None):
        """Patch (if *patch* or *resource_version*) or create the object *name*; cache the result.

        For a patch, ``None`` annotation values are sent as-is (they remove the key);
        for a create they are dropped.
        """
        metadata = {'namespace': self._namespace, 'name': name, 'labels': self._labels, 'annotations': annotations}
        if patch or resource_version:
            if resource_version is not None:
                metadata['resource_version'] = resource_version
            func = functools.partial(self._api.patch_namespaced_kind, name)
        else:
            func = functools.partial(self._api.create_namespaced_kind)
            # skip annotations with null values
            metadata['annotations'] = {k: v for k, v in metadata['annotations'].items() if v is not None}

        metadata = k8s_client.V1ObjectMeta(**metadata)
        if ips is not None and self._api.use_endpoints:
            endpoints = {'metadata': metadata}
            self._map_subsets(endpoints, ips)
            body = k8s_client.V1Endpoints(**endpoints)
        else:
            body = k8s_client.V1ConfigMap(metadata=metadata)
        ret = retry(func, self._namespace, body) if retry else func(self._namespace, body)
        if ret:
            self._kinds.set(name, ret)
        return ret

    @catch_kubernetes_errors
    def patch_or_create(self, name, annotations, resource_version=None, patch=False, retry=True, ips=None):
        if retry is True:
            retry = self.retry
        return self._patch_or_create(name, annotations, resource_version, patch, retry, ips)

    def patch_or_create_config(self, annotations, resource_version=None, patch=False, retry=True):
        # SCOPE-config endpoint requires corresponding service otherwise it might be "cleaned" by k8s master
        if self._api.use_endpoints and not patch and not resource_version:
            self._should_create_config_service = True
            self._create_config_service()
        return self.patch_or_create(self.config_path, annotations, resource_version, patch, retry)

    def _create_config_service(self):
        metadata = k8s_client.V1ObjectMeta(namespace=self._namespace, name=self.config_path, labels=self._labels)
        body = k8s_client.V1Service(metadata=metadata, spec=k8s_client.V1ServiceSpec(cluster_ip='None'))
        try:
            if not self._api.create_namespaced_service(self._namespace, body):
                return
        except Exception as e:
            if not isinstance(e, k8s_client.rest.ApiException) or e.status != 409:  # Service already exists
                return logger.exception('create_config_service failed')
        self._should_create_config_service = False

    def _write_leader_optime(self, last_lsn):
        """Unused"""

    def _write_status(self, value):
        """Unused"""

    def _update_leader(self):
        """Unused"""

    def _update_leader_with_retry(self, annotations, resource_version, ips):
        """Update the leader object; on a 409 conflict re-read it and retry once if we still hold the lock."""
        retry = self._retry.copy()

        def _retry(*args, **kwargs):
            kwargs['_retry'] = retry
            return retry(*args, **kwargs)

        try:
            return self._patch_or_create(self.leader_path, annotations, resource_version, ips=ips, retry=_retry)
        except k8s_client.rest.ApiException as e:
            if e.status == 409:
                logger.warning('Concurrent update of %s', self.leader_path)
            else:
                logger.exception('Permission denied' if e.status == 403 else 'Unexpected error from Kubernetes API')
                return False
        except (RetryFailedError, K8sException):
            return False

        retry.deadline = retry.stoptime - time.time()
        if retry.deadline < 1:
            return False

        # Try to get the latest version directly from K8s API instead of relying on async cache.
        # `_retry` (like the other calls here) lets the API client fail over between API server nodes.
        try:
            kind = _retry(self._api.read_namespaced_kind, self.leader_path, self._namespace)
        except Exception as e:
            logger.error('Failed to get the leader object "%s": %r', self.leader_path, e)
            return False

        self._kinds.set(self.leader_path, kind)

        retry.deadline = retry.stoptime - time.time()
        if retry.deadline < 0.5:
            return False

        kind_annotations = kind and kind.metadata.annotations or {}
        kind_resource_version = kind and kind.metadata.resource_version

        # There is different leader or resource_version in cache didn't change
        if kind and (kind_annotations.get(self._LEADER) != self._name or kind_resource_version == resource_version):
            return False

        return self.patch_or_create(self.leader_path, annotations, kind_resource_version, ips=ips, retry=_retry)

    def update_leader(self, last_lsn, slots=None):
        kind = self._kinds.get(self.leader_path)
        kind_annotations = kind and kind.metadata.annotations or {}

        if kind and kind_annotations.get(self._LEADER) != self._name:
            return False

        now = datetime.datetime.now(tzutc).isoformat()
        leader_observed_record = kind_annotations or self._leader_observed_record
        annotations = {self._LEADER: self._name, 'ttl': str(self._ttl), 'renewTime': now,
                       'acquireTime': leader_observed_record.get('acquireTime') or now,
                       'transitions': leader_observed_record.get('transitions') or '0'}
        if last_lsn:
            annotations[self._OPTIME] = str(last_lsn)
            annotations['slots'] = json.dumps(slots) if slots else None

        resource_version = kind and kind.metadata.resource_version
        return self._update_leader_with_retry(annotations, resource_version, self.__ips)

    def attempt_to_acquire_leader(self, permanent=False):
        now = datetime.datetime.now(tzutc).isoformat()
        annotations = {self._LEADER: self._name, 'ttl': str(sys.maxsize if permanent else self._ttl),
                       'renewTime': now, 'acquireTime': now, 'transitions': '0'}
        if self._leader_observed_record:
            try:
                transitions = int(self._leader_observed_record.get('transitions'))
            except (TypeError, ValueError):
                transitions = 0

            if self._leader_observed_record.get(self._LEADER) != self._name:
                transitions += 1
            else:
                annotations['acquireTime'] = self._leader_observed_record.get('acquireTime') or now
            annotations['transitions'] = str(transitions)
        ips = [] if self._api.use_endpoints else None
        # the observed resource_version makes the update fail with a conflict if someone else was faster
        ret = self.patch_or_create(self.leader_path, annotations, self._leader_resource_version, ips=ips)
        if not ret:
            logger.info('Could not take out TTL lock')
        return ret

    def take_leader(self):
        return self.attempt_to_acquire_leader()

    def set_failover_value(self, value, index=None):
        """Unused"""

    def manual_failover(self, leader, candidate, scheduled_at=None, index=None):
        annotations = {'leader': leader or None, 'member': candidate or None,
                       'scheduled_at': scheduled_at and scheduled_at.isoformat()}
        patch = bool(self.cluster and isinstance(self.cluster.failover, Failover) and self.cluster.failover.index)
        return self.patch_or_create(self.failover_path, annotations, index, bool(index or patch), False)

    @property
    def _config_resource_version(self):
        config = self._kinds.get(self.config_path)
        return config and config.metadata.resource_version

    def set_config_value(self, value, index=None):
        return self.patch_or_create_config({self._CONFIG: value}, index, bool(self._config_resource_version), False)

    @catch_kubernetes_errors
    def touch_member(self, data, permanent=False):
        cluster = self.cluster
        if cluster and cluster.leader and cluster.leader.name == self._name:
            role = 'promoted' if data['role'] in ('replica', 'promoted') else 'master'
        elif data['state'] == 'running' and data['role'] != 'master':
            role = data['role']
        else:
            role = None

        member = cluster and cluster.get_member(self._name, fallback_to_leader=False)
        # pod_labels was injected by member(); remove it so only the status data is compared
        pod_labels = member and member.data.pop('pod_labels', None)
        ret = pod_labels is not None and pod_labels.get(self._role_label) == role and deep_compare(data, member.data)

        if not ret:
            metadata = {'namespace': self._namespace, 'name': self._name, 'labels': {self._role_label: role},
                        'annotations': {'status': json.dumps(data, separators=(',', ':'))}}
            body = k8s_client.V1Pod(metadata=k8s_client.V1ObjectMeta(**metadata))
            ret = self._api.patch_namespaced_pod(self._name, self._namespace, body)
            if ret:
                self._pods.set(self._name, ret)
        if self._should_create_config_service:
            self._create_config_service()
        return ret

    def initialize(self, create_new=True, sysid=""):
        cluster = self.cluster
        resource_version = cluster.config.index if cluster and cluster.config and cluster.config.index else None
        return self.patch_or_create_config({self._INITIALIZE: sysid}, resource_version)

    def _delete_leader(self):
        """Unused"""

    def delete_leader(self, last_lsn=None):
        kind = self._kinds.get(self.leader_path)
        if kind and (kind.metadata.annotations or {}).get(self._LEADER) == self._name:
            annotations = {self._LEADER: None}
            if last_lsn:
                # annotation values must be strings, same as in update_leader()
                annotations[self._OPTIME] = str(last_lsn)
            self.patch_or_create(self.leader_path, annotations, kind.metadata.resource_version, True, False, [])
            self.reset_cluster()

    def cancel_initialization(self):
        self.patch_or_create_config({self._INITIALIZE: None}, self._config_resource_version, True)

    @catch_kubernetes_errors
    def delete_cluster(self):
        self.retry(self._api.delete_collection_namespaced_kind, self._namespace, label_selector=self._label_selector)

    def set_history_value(self, value):
        return self.patch_or_create_config({self._HISTORY: value}, None, bool(self._config_resource_version), False)

    def set_sync_state_value(self, value, index=None):
        """Unused"""

    def write_sync_state(self, leader, sync_standby, index=None):
        return self.patch_or_create(self.sync_path, self.sync_state(leader, sync_standby), index, False)

    def delete_sync_state(self, index=None):
        return self.write_sync_state(None, None, index)

    def watch(self, leader_index, timeout):
        if self.__do_not_watch:
            self.__do_not_watch = False
            return True

        try:
            return super(Kubernetes, self).watch(None, timeout + 0.5)
        finally:
            self.event.clear()