"""Periodic liveness probing through an external command run in a background thread."""
import functools
import logging
import shlex
from threading import Event, Lock, Thread

from psutil import TimeoutExpired

from patroni.postgresql.cancellable import CancellableExecutor

logger = logging.getLogger(__name__)


def synchronized(func):
    """Wrap a method so that it executes while holding ``self._lock``.

    :param func: method of an object that exposes a ``_lock`` context manager.
    :returns: the wrapped method; name and docstring of *func* are preserved.
    """
    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        with self._lock:
            return func(self, *args, **kwargs)
    return wrapped


class LivenessThread(CancellableExecutor, Thread):
    """Daemon thread that runs the configured probe command in a loop.

    The configuration mapping is read for these keys:

    * ``probe`` -- command line, split with :func:`shlex.split`;
    * ``timeout`` -- passed to the probe process ``wait()``;
    * ``interval`` -- pause between two probe runs;
    * ``max_failures`` -- failures are only counted when it is greater than 0.
    """

    def __init__(self, config, connect_args=None):
        """Prepare the thread; it is not started here.

        :param config: probe configuration mapping, or a falsy value when no
            probe is configured.
        :param connect_args: optional ``(connect_arg, env)`` pair. ``connect_arg``
            is appended to the probe command and ``env`` is passed as the
            environment of the probe process.
        """
        CancellableExecutor.__init__(self)
        Thread.__init__(self)
        # These must exist before reload_config(), which builds the command
        # line from them.
        if connect_args is None:
            connect_args = (None, None)
        self.connect_args, self.env = connect_args
        self.reload_config(config)
        self.daemon = True
        self._stopevent = Event()
        self.__lv_failures = 0

    def reload_config(self, config):
        """Store *config* and rebuild the probe command line from it.

        With a falsy *config* the command becomes an empty list.
        """
        self.config = config
        if config:
            cmd = shlex.split(config['probe'])
            # Without connection arguments the probe runs with its own args only.
            if self.connect_args is not None:
                cmd.append(self.connect_args)
        else:
            cmd = []
        self.cmd = cmd

    @property
    def lv_failures(self):
        """Number of failed probes counted since the last successful one."""
        return self.__lv_failures

    def terminate(self):
        """Ask the loop to stop and kill the probe process if it is running.

        Calling it again after the stop event is set does nothing.
        """
        with self._lock:
            if self._stopevent.is_set():
                return
            self._stopevent.set()
            if self._process is None or not self._process.is_running():
                return
        # NOTE(review): in upstream Patroni CancellableExecutor._kill_process()
        # acquires self._lock itself, hence the call after the lock is
        # released -- confirm against the base class in use.
        self._kill_process()
        logger.info("Liveness probe terminated")

    def call_probe(self):
        """Run the probe command once and wait for it to finish.

        :returns: the value returned by the process ``wait()``, or ``None`` when
            the thread was asked to stop or the process could not be started.
        :raises TimeoutExpired: the probe did not finish within
            ``config['timeout']``; the process is killed before re-raising.
        """
        try:
            with self._lock:
                if self._stopevent.is_set():
                    return
                started = self._start_process(self.cmd, env=self.env)
            # Wait without holding the lock so terminate() is not blocked for
            # the duration of the probe.
            if started:
                return self._process.wait(self.config['timeout'])
        except TimeoutExpired:
            self._kill_process()
            raise
        finally:
            with self._lock:
                self._process = None
            # NOTE(review): called outside the lock on the assumption that
            # _kill_children() takes self._lock itself, as in upstream Patroni.
            self._kill_children()

    def _record_failure(self, message):
        """Count one failure (only if ``max_failures`` > 0) and log *message*.

        *message* is a logging format string with two ``%s`` placeholders: the
        current failure count and ``max_failures``.
        """
        max_failures = self.config['max_failures']
        if max_failures > 0:
            self.__lv_failures += 1
        logger.error(message, self.__lv_failures, max_failures)

    def run(self):
        """Thread body: probe, update the failure counter, sleep, repeat.

        A probe result of 0 resets the failure counter; any other result and a
        timeout are recorded as failures. Other exceptions are logged and the
        loop continues. The loop ends once the stop event is set.
        """
        while not self._stopevent.is_set():
            try:
                ret = self.call_probe()
                logger.info("Liveness Probe Completed with status %s", ret)
                if ret == 0:
                    self.__lv_failures = 0
                else:
                    self._record_failure("Liveness probe failed. failures %s out of %s")
            except TimeoutExpired:
                self._record_failure("Liveness probe timed out. failures %s out of %s")
            except Exception:
                logger.exception("Exception during liveness probe")
            # Event.wait() doubles as an interruptible sleep between probes.
            self._stopevent.wait(self.config['interval'])


class Liveness(object):
    """Owner of the liveness thread: starts, stops and queries it under a lock."""

    def __init__(self, config):
        """
        :param config: probe configuration mapping, or a falsy value when no
            liveness check is configured.
        """
        self.livenesscheck = None
        self._lock = Lock()
        self.reload_config(config)

    @synchronized
    def reload_config(self, config):
        """Store the new *config*, hand it to the thread and stop a running check."""
        self.config = config
        if self.livenesscheck:
            self.livenesscheck.reload_config(config)
        # next loop iteration will restart if liveness checks are set in config
        if self.livenesscheck and self.livenesscheck.is_alive():
            self._disable()

    def _disable(self):
        """Terminate the liveness thread if it is alive. Caller holds the lock."""
        if self.livenesscheck and self.livenesscheck.is_alive():
            logger.info("liveness check alive, stopping")
            self.livenesscheck.terminate()

    def _activate(self, connect_args):
        """Start a fresh liveness thread. Caller holds the lock.

        Nothing is started when no config is set, or when the previous thread
        is still alive after being asked to terminate.

        :param connect_args: ``(connect_arg, env)`` pair forwarded to the thread.
        """
        self.connect_args, self.env = connect_args
        if not self.config:
            logger.info("Liveness check activate skipped, No liveness checks in config")
            return
        self._disable()
        if self.livenesscheck and self.livenesscheck.is_alive():
            logger.info("Liveness check from prev run still active after terminate, "
                        "skipping activate")
            return
        self.livenesscheck = LivenessThread(self.config, connect_args)
        self.livenesscheck.start()

    @synchronized
    def disable(self):
        """Stop the liveness thread if it is running."""
        self._disable()

    @synchronized
    def activate(self, connect_args):
        """Start the liveness thread; see :meth:`_activate`."""
        self._activate(connect_args)

    @property
    @synchronized
    def is_running(self):
        """``True`` when a liveness thread exists and is alive, else ``False``."""
        return bool(self.livenesscheck and self.livenesscheck.is_alive())

    @property
    @synchronized
    def is_healthy(self):
        """``False`` only when a live check has reached ``max_failures`` failures.

        Returns ``True`` when there is no config, no live thread, or
        ``max_failures`` is not greater than 0.
        """
        # The config may already be unset while a terminated thread is still
        # winding down; treat that as "no liveness check".
        if not self.config:
            return True
        max_failures = self.config['max_failures']
        if self.livenesscheck and self.livenesscheck.is_alive() and max_failures > 0:
            return self.livenesscheck.lv_failures < max_failures
        return True