From 16098b1d5811f46cf023f9c448207aa303e6f54b Mon Sep 17 00:00:00 2001 From: art2ip <126820369+art2ip@users.noreply.github.com> Date: Mon, 8 Jan 2024 16:04:09 -0800 Subject: [PATCH] Fixed etcd join. --- cmd/etcdsvr/etcdsvr.py | 122 ++++++++++++++++++++++++----------------- 1 file changed, 71 insertions(+), 51 deletions(-) diff --git a/cmd/etcdsvr/etcdsvr.py b/cmd/etcdsvr/etcdsvr.py index 31b46b1d..b4b6dc49 100755 --- a/cmd/etcdsvr/etcdsvr.py +++ b/cmd/etcdsvr/etcdsvr.py @@ -24,6 +24,7 @@ import logging import os import random +import shlex import signal import socket import subprocess @@ -62,13 +63,27 @@ def ip_in_use(ip, port): s.close() return True - except socket.error as e: + except socket.error: pass - except Exception as e: + except Exception: pass return False - + +def run_cmd(cmd, get_result=False): + result = "" + try: + out = None + if get_result: + out = subprocess.PIPE + re = subprocess.run(shlex.split(cmd), stdout=out, stderr=out, + universal_newlines=True, check=True) + result = str(re.stdout) + + except subprocess.CalledProcessError: + pass + return result + class Config(): # Init default. def __init__(self): @@ -95,8 +110,8 @@ def get_members(self): cmd_list = "%s member list" % (etcd_cmd) members = "" try: - members = subprocess.check_output(cmd_list, shell=False) - except subprocess.CalledProcessError as e: + members = run_cmd(cmd_list, get_result=True) + except subprocess.CalledProcessError: pass return members @@ -105,25 +120,23 @@ def display_status(self): os.environ["ETCDCTL_API"] = "3" etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints) - cmd_list = "%s member list 2>&1 | cat" % (etcd_cmd) - cmd_status = "%s endpoint status 2>&1 | cat" % (etcd_cmd) - cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd) - - out = etcd_cmd - out += "\n\n===== member list\n" + subprocess.check_output(cmd_list, shell=False) - print(out) - out = "===== endpoint status\n" + subprocess.check_output(cmd_status, shell=False) - print(out) - out = "===== endpoint health\n" + subprocess.check_output(cmd_health, shell=False) - print(out) + cmd_list = "%s member list" % (etcd_cmd) + cmd_status = "%s endpoint status" % (etcd_cmd) + cmd_health = "%s endpoint health" % (etcd_cmd) + + print(etcd_cmd + "\n\n===== member list\n") + run_cmd(cmd_list) + + print("\n===== endpoint status\n") + run_cmd(cmd_status) + + print("\n===== endpoint health\n") + run_cmd(cmd_health) # Join an existing cluster. def join_cluster(self): etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints) - cmd_select = "%s member list | grep ', %s, http' | awk -F',' '{print $1}'" % ( - etcd_cmd, self.etcd_name - ) cmd_add = "%s member add %s --peer-urls=%s" % ( etcd_cmd, self.etcd_name, self.peer_url @@ -133,41 +146,48 @@ def join_cluster(self): ok = True err = None - resp = ">> Members:\n" try: os.environ["ETCDCTL_API"] = "3" - resp += self.get_members() + text = self.get_members() hexid = "" - + resp = ">> Members:\n" + text + print(resp) + # Remove the current entry if any - resp += "\n>> Select:\n%s\n\n" % (cmd_select) - hexid = subprocess.check_output(cmd_select, shell=False) + lines = text.split("\n") + for li in lines: + tokens = li.split(", ") + if len(tokens) > 3 and self.etcd_name == tokens[2]: + hexid = tokens[0] + break if len(hexid) > 0: - cmd_remove = "%s member remove %s" % (etcd_cmd, hexid) - resp += "\n>> Remove:\n%s\n\n" % (cmd_remove) - - resp += subprocess.check_output(cmd_remove, stderr=subprocess.STDOUT, shell=False) + cmd_remove = "%s member remove %s\n\n" % (etcd_cmd, hexid) + print("\n>> Remove:\n%s" % (cmd_remove)) + resp += cmd_remove + + run_cmd(cmd_remove) sleep(5) # Add a new entry - resp += "\n>> Add:\n%s\n\n" % (cmd_add) - - resp += subprocess.check_output(cmd_add, stderr=subprocess.STDOUT, shell=False) - - resp += "\n>> Members:\n" - resp += self.get_members() - resp += "\n" - - resp += cmd_rm - resp += "\n" + msg = "\n>> Add:\n%s\n\n" % (cmd_add) + print(msg) + resp += msg + + run_cmd(cmd_add) + msg = "\n>> Members:\n" + self.get_members() + print(msg) + resp += msg + + msg = "\n" + cmd_rm + "\n" + print(msg) + resp += msg except subprocess.CalledProcessError as e: err = e.output ok = False - print(resp) with open("join.log", "w+") as f: f.write(resp) @@ -189,7 +209,6 @@ def add_json_cfg(self): h["advertise-client-urls"] = client_url h["initial-advertise-peer-urls"] = self.peer_url - dir = self.etcd_name + ".etcd" if self.is_existing_cluster: # Join an existing cluster h["initial-cluster-state"] = "existing" @@ -339,23 +358,22 @@ def sig_handler(self, sig, frame): def is_endpoint_healthy(self, wait_time): os.environ["ETCDCTL_API"] = "3" etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.local_endpoint) - cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd) + cmd_health = "%s endpoint health" % (etcd_cmd) result = "" now = int(time()) - for i in range(50): - sleep(2) + for i in range(10): + sleep(5) t = int(time()) - now if t > wait_time: break - if t > 60: - msg = "unhealthy_%s" % (self.etcd_name) + if t > 50: + msg = "unhealthy_%s retry ..." % (self.etcd_name) self.logger.error("[MANAGER] %s" % (msg)) - result = subprocess.check_output(cmd_health, shell=False) - health_check_bytes = str.encode("is healthy") - if health_check_bytes in result: + result = run_cmd(cmd_health, get_result=True) + if "is health" in result: return True self.logger.info("[MANAGER] %s" % (result)) @@ -418,8 +436,8 @@ def watch_and_recycle(self, cfg): print(" ") self.logger.info("[MANAGER] Started etcd process %d" % (self.pid)) - wait_time = 85 + random.randint(0,10) - while False: + wait_time = 60 + random.randint(0,10) + while False: #not self.is_endpoint_healthy(wait_time): if restartCount > 0: self.shutdown_server() @@ -436,6 +454,8 @@ def watch_and_recycle(self, cfg): self.shutdown(-1) # exit print("Starting etcd process %d succeeded." % (self.pid)) + if os.path.exists("join.log"): + os.remove("join.log") self.server.wait() # etcd server has exited. @@ -469,7 +489,7 @@ def watch_and_recycle(self, cfg): with open("./etcdsvr.pid", "w") as f: f.write("%d\n" % (os.getpid())) - + cfg = Config() err = cfg.parse_cfg(False) if err: