Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed etcd join. #172

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 71 additions & 51 deletions cmd/etcdsvr/etcdsvr.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import logging
import os
import random
import shlex
import signal
import socket
import subprocess
Expand Down Expand Up @@ -62,13 +63,27 @@ def ip_in_use(ip, port):
s.close()
return True

except socket.error as e:
except socket.error:
pass
except Exception as e:
except Exception:
pass

return False


def run_cmd(cmd, get_result=False):
    """Run a command line without a shell and optionally return its stdout.

    The command string is tokenised with ``shlex.split`` and executed via
    ``subprocess.run`` with ``check=True``.

    Args:
        cmd: The full command line as a single string.
        get_result: When true, stdout and stderr are piped and the text
            captured from stdout is returned. When false, the child inherits
            this process's stdout/stderr and nothing is captured.

    Returns:
        The captured stdout text when ``get_result`` is true and the command
        exits with status 0; otherwise an empty string.
    """
    # Pipe the streams only when the caller wants the output back; ``None``
    # lets the child write straight to this process's stdout/stderr.
    stream = subprocess.PIPE if get_result else None
    try:
        completed = subprocess.run(shlex.split(cmd), stdout=stream, stderr=stream,
                                   universal_newlines=True, check=True)
    except subprocess.CalledProcessError:
        # Best effort: a non-zero exit status is reported as an empty result.
        return ""

    # NOTE(review): stderr is piped separately and never returned, so any
    # diagnostics a tool writes to stderr are dropped in capture mode.
    # ``stdout`` is None when nothing was piped; return "" instead of the
    # literal text "None" that ``str(None)`` would produce.
    return completed.stdout or ""

class Config():
# Init default.
def __init__(self):
Expand All @@ -95,8 +110,8 @@ def get_members(self):
cmd_list = "%s member list" % (etcd_cmd)
members = ""
try:
members = subprocess.check_output(cmd_list, shell=False)
except subprocess.CalledProcessError as e:
members = run_cmd(cmd_list, get_result=True)
except subprocess.CalledProcessError:
pass
return members

Expand All @@ -105,25 +120,23 @@ def display_status(self):

os.environ["ETCDCTL_API"] = "3"
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints)
cmd_list = "%s member list 2>&1 | cat" % (etcd_cmd)
cmd_status = "%s endpoint status 2>&1 | cat" % (etcd_cmd)
cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd)

out = etcd_cmd
out += "\n\n===== member list\n" + subprocess.check_output(cmd_list, shell=False)
print(out)
out = "===== endpoint status\n" + subprocess.check_output(cmd_status, shell=False)
print(out)
out = "===== endpoint health\n" + subprocess.check_output(cmd_health, shell=False)
print(out)
cmd_list = "%s member list" % (etcd_cmd)
cmd_status = "%s endpoint status" % (etcd_cmd)
cmd_health = "%s endpoint health" % (etcd_cmd)

print(etcd_cmd + "\n\n===== member list\n")
run_cmd(cmd_list)

print("\n===== endpoint status\n")
run_cmd(cmd_status)

print("\n===== endpoint health\n")
run_cmd(cmd_health)

# Join an existing cluster.
def join_cluster(self):

etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints)
cmd_select = "%s member list | grep ', %s, http' | awk -F',' '{print $1}'" % (
etcd_cmd, self.etcd_name
)

cmd_add = "%s member add %s --peer-urls=%s" % (
etcd_cmd, self.etcd_name, self.peer_url
Expand All @@ -133,41 +146,48 @@ def join_cluster(self):

ok = True
err = None
resp = ">> Members:\n"
try:
os.environ["ETCDCTL_API"] = "3"
resp += self.get_members()
text = self.get_members()

hexid = ""

resp = ">> Members:\n" + text
print(resp)

# Remove the current entry if any
resp += "\n>> Select:\n%s\n\n" % (cmd_select)
hexid = subprocess.check_output(cmd_select, shell=False)
lines = text.split("\n")
for li in lines:
tokens = li.split(", ")
if len(tokens) > 3 and self.etcd_name == tokens[2]:
hexid = tokens[0]
break

if len(hexid) > 0:
cmd_remove = "%s member remove %s" % (etcd_cmd, hexid)
resp += "\n>> Remove:\n%s\n\n" % (cmd_remove)

resp += subprocess.check_output(cmd_remove, stderr=subprocess.STDOUT, shell=False)
cmd_remove = "%s member remove %s\n\n" % (etcd_cmd, hexid)
print("\n>> Remove:\n%s" % (cmd_remove))
resp += cmd_remove

run_cmd(cmd_remove)
sleep(5)

# Add a new entry
resp += "\n>> Add:\n%s\n\n" % (cmd_add)

resp += subprocess.check_output(cmd_add, stderr=subprocess.STDOUT, shell=False)

resp += "\n>> Members:\n"
resp += self.get_members()
resp += "\n"

resp += cmd_rm
resp += "\n"
msg = "\n>> Add:\n%s\n\n" % (cmd_add)
print(msg)
resp += msg

run_cmd(cmd_add)
msg = "\n>> Members:\n" + self.get_members()
print(msg)
resp += msg

msg = "\n" + cmd_rm + "\n"
print(msg)
resp += msg

except subprocess.CalledProcessError as e:
err = e.output
ok = False

print(resp)
with open("join.log", "w+") as f:
f.write(resp)

Expand All @@ -189,7 +209,6 @@ def add_json_cfg(self):
h["advertise-client-urls"] = client_url
h["initial-advertise-peer-urls"] = self.peer_url

dir = self.etcd_name + ".etcd"
if self.is_existing_cluster:
# Join an existing cluster
h["initial-cluster-state"] = "existing"
Expand Down Expand Up @@ -339,23 +358,22 @@ def sig_handler(self, sig, frame):
def is_endpoint_healthy(self, wait_time):
os.environ["ETCDCTL_API"] = "3"
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.local_endpoint)
cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd)
cmd_health = "%s endpoint health" % (etcd_cmd)
result = ""

now = int(time())
for i in range(50):
sleep(2)
for i in range(10):
sleep(5)
t = int(time()) - now
if t > wait_time:
break

if t > 60:
msg = "unhealthy_%s" % (self.etcd_name)
if t > 50:
msg = "unhealthy_%s retry ..." % (self.etcd_name)
self.logger.error("[MANAGER] %s" % (msg))

result = subprocess.check_output(cmd_health, shell=False)
health_check_bytes = str.encode("is healthy")
if health_check_bytes in result:
result = run_cmd(cmd_health, get_result=True)
if "is health" in result:
return True

self.logger.info("[MANAGER] %s" % (result))
Expand Down Expand Up @@ -418,8 +436,8 @@ def watch_and_recycle(self, cfg):
print(" ")
self.logger.info("[MANAGER] Started etcd process %d" % (self.pid))

wait_time = 85 + random.randint(0,10)
while False:
wait_time = 60 + random.randint(0,10)
while False: #not self.is_endpoint_healthy(wait_time):

if restartCount > 0:
self.shutdown_server()
Expand All @@ -436,6 +454,8 @@ def watch_and_recycle(self, cfg):
self.shutdown(-1) # exit

print("Starting etcd process %d succeeded." % (self.pid))
if os.path.exists("join.log"):
os.remove("join.log")
self.server.wait()

# etcd server has exited.
Expand Down Expand Up @@ -469,7 +489,7 @@ def watch_and_recycle(self, cfg):

with open("./etcdsvr.pid", "w") as f:
f.write("%d\n" % (os.getpid()))

cfg = Config()
err = cfg.parse_cfg(False)
if err:
Expand Down