Skip to content

Commit

Permalink
Merge pull request #425 from cgalibern/b2.1
Browse files Browse the repository at this point in the history
[daemon] Fix infinite full->patch heartbeat message oscillation after a monitor thread crash or hb tx crash
  • Loading branch information
cgalibern committed Jun 22, 2021
2 parents 323aae2 + fc439af commit a83c3b3
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 64 deletions.
2 changes: 1 addition & 1 deletion opensvc/daemon/handlers/askfull/post.py
Expand Up @@ -29,7 +29,7 @@ def action(self, nodename, thr=None, **kwargs):
raise ex.Error("Can't ask a full from ourself")
if options.peer not in thr.cluster_nodes:
raise ex.Error("Can't ask a full from %s: not in cluster.nodes" % options.peer)
shared.REMOTE_GEN[options.peer] = 0
shared.PEER_GEN_MERGED[options.peer] = 0
result = {
"info": "remote %s asked for a full" % options.peer,
"status": 0,
Expand Down
19 changes: 12 additions & 7 deletions opensvc/daemon/handlers/sync/get.py
Expand Up @@ -51,11 +51,16 @@ def action(self, nodename, thr=None, stream_id=None, **kwargs):
return {"status": 1, "data": {"satisfied": False, "gen": ref_gen}}

def match(self, ref_gen):
    """Return True when every peer node has merged our local dataset
    up to generation *ref_gen*.

    The diff rendering left the stale pre-fix implementation (iterating
    shared.LOCAL_GEN without a lock) interleaved with the fixed one;
    only the locked implementation is kept here.
    """
    satisfied = True
    with shared.RX_LOCK:
        # Hold RX_LOCK while iterating: hb::delete_peer_data() deletes
        # items from LOCAL_GEN_MERGED_ON_PEER under the same lock, and
        # an unprotected iteration could raise "dictionary changed size
        # during iteration".
        for node, gen in shared.LOCAL_GEN_MERGED_ON_PEER.items():
            try:
                if gen < ref_gen:
                    satisfied = False
                    break
            except TypeError:
                # gen is not comparable yet (e.g. None while the peer
                # has not merged anything): don't count it against us
                continue
    return satisfied

21 changes: 14 additions & 7 deletions opensvc/daemon/hb/hb.py
Expand Up @@ -164,7 +164,7 @@ def get_message(self, nodename=None):
self.log.debug("ping node %s", nodename if nodename else "*")
if self.msg_type != 'ping':
self.msg_type = 'ping'
self.log.info('change message type to %s', self.msg_type)
self.log.info('change message type to %s (gen %s)', self.msg_type, shared.GEN)
message = self.encrypt({
"kind": "ping",
"compat": shared.COMPAT_VERSION,
Expand All @@ -181,7 +181,7 @@ def get_message(self, nodename=None):
return None, 0
if self.msg_type != 'full':
self.msg_type = 'full'
self.log.info('change message type to %s', self.msg_type)
self.log.info('change message type to %s (gen %s)', self.msg_type, shared.GEN)
with shared.HB_MSG_LOCK:
if shared.HB_MSG is not None:
return shared.HB_MSG, shared.HB_MSG_LEN
Expand All @@ -196,12 +196,19 @@ def get_message(self, nodename=None):
self.log.debug("send gen %d-%d deltas to %s", begin, shared.GEN, nodename if nodename else "*") # COMMENT
if self.msg_type != 'patch':
self.msg_type = 'patch'
self.log.info('change message type to %s', self.msg_type)
self.log.info('change message type to %s (gen %s)', self.msg_type, shared.GEN)
data = {}
for gen, delta in shared.GEN_DIFF.items():
if gen <= begin:
continue
data[gen] = delta
try:
for gen, delta in shared.GEN_DIFF.items():
if gen <= begin:
continue
data[gen] = delta
except:
# Protect from GEN_DIFF 'dictionary changed size' during iteration
# - purge_log()
# - reset during monitor crash init_data()
self.log.info("wait next iteration to create patch message (gen %s)", shared.GEN)
return None, 0
message = self.encrypt({
"kind": "patch",
"deltas": data,
Expand Down
110 changes: 70 additions & 40 deletions opensvc/daemon/monitor.py
Expand Up @@ -678,7 +678,20 @@ def wait_listener(self):

def init_data(self):
self._update_cluster_data()
shared.GEN = 0
if shared.GEN > 1:
self.log.warning("monitor init data after thread crashed at gen %s", shared.GEN)
# inc GEN to ensure out of sequence patch detection on peer
shared.GEN = shared.GEN + 2
with shared.RX_LOCK:
# Protect from LOCAL_GEN_MERGED_ON_PEER, PEER_GEN_MERGED delete items
# during hb::delete_peer_data(...)
#
# Ensure ask full from peers
shared.PEER_GEN_MERGED = {peer: 0 for peer in shared.PEER_GEN_MERGED}
# no diff until one peer node receive our full
shared.LOCAL_GEN_MERGED_ON_PEER = {peer: 0 for peer in shared.LOCAL_GEN_MERGED_ON_PEER}
# reset GEN_DIFF to ensure fresh patch sequence
shared.GEN_DIFF = {}
initial_data = {
"compat": shared.COMPAT_VERSION,
"api": shared.API_VERSION,
Expand Down Expand Up @@ -3651,7 +3664,7 @@ def update_hb_data(self):
return

# don't store the diff if we have no peers
if len(shared.LOCAL_GEN) == 0:
if len(shared.LOCAL_GEN_MERGED_ON_PEER) == 0:
return

shared.GEN_DIFF[shared.GEN] = diff
Expand Down Expand Up @@ -4130,12 +4143,12 @@ def missing_beating_peer_data(self):
return True
return False

def update_node_gen(self, nodename, local=0, remote=0):
shared.LOCAL_GEN[nodename] = local
shared.REMOTE_GEN[nodename] = remote
def update_node_gen(self, nodename, local_gen_merged_on_peer=0, peer_gen_merged=0):
shared.LOCAL_GEN_MERGED_ON_PEER[nodename] = local_gen_merged_on_peer
shared.PEER_GEN_MERGED[nodename] = peer_gen_merged
gdata = {
nodename: remote,
Env.nodename: local
nodename: peer_gen_merged,
Env.nodename: local_gen_merged_on_peer
}
if not self.nodes_data.exists([nodename]):
self.nodes_data.set([nodename], {"gen": gdata})
Expand All @@ -4157,8 +4170,10 @@ def merge_rx(self):
def _merge_rx(self, nodename, data, hbname):
if data is None:
self.log.info("drop corrupted rx data from %s", nodename)
current_gen = shared.REMOTE_GEN.get(nodename, 0)
our_gen_on_peer = data.get("gen", {}).get(Env.nodename, 0)
peer_gen_merged = shared.PEER_GEN_MERGED.get(nodename, 0)
local_gen_merged_on_peer = shared.LOCAL_GEN_MERGED_ON_PEER.get(nodename, 0)
local_gen_merged_on_peer_from_message = data.get("gen", {}).get(Env.nodename, 0)
peer_gen_from_message = data.get("gen", {}).get(nodename, 0)
kind = data.get("kind", "full")
change = False
# self.log.debug("received %s from node %s: current gen %d, our gen local:%s peer:%s",
Expand All @@ -4167,71 +4182,84 @@ def _merge_rx(self, nodename, data, hbname):
if not self.nodes_data.exists([nodename]):
# happens during init, or after join. ignore the patch, and ask for a full
self.log.info("%s was not yet in nodes data view, ask for a full", nodename)
if our_gen_on_peer == 0:
if local_gen_merged_on_peer_from_message == 0:
self.log.info("%s ignore us yet, will send a full", nodename)
self.update_node_gen(nodename, remote=0, local=our_gen_on_peer)
self.update_node_gen(nodename, peer_gen_merged=0, local_gen_merged_on_peer=0)
return False
if current_gen == 0:
if peer_gen_merged == 0:
# waiting for a full: ignore patches
# self.log.debug("waiting for a full: ignore patch %s received from %s", list(data.get("deltas", [])),
# nodename)
if shared.REMOTE_GEN.get(nodename) is None:
if shared.PEER_GEN_MERGED.get(nodename) is None:
self.log.info("undefined gen for %s dataset, drop patch and "
"ask for a full (peer has gen %s of our dataset)", nodename, our_gen_on_peer)
self.update_node_gen(nodename, remote=0, local=our_gen_on_peer)
"ask for a full (peer has gen %s of our dataset)", nodename, local_gen_merged_on_peer_from_message)
self.update_node_gen(nodename,
peer_gen_merged=0,
local_gen_merged_on_peer=local_gen_merged_on_peer_from_message)
return False
deltas = data.get("deltas", [])
gens = sorted([int(gen) for gen in deltas])
gens = [gen for gen in gens if gen > current_gen]
gens = [gen for gen in gens if gen > peer_gen_merged]
if len(gens) == 0:
# self.log.info("no more recent gen in received deltas")
if our_gen_on_peer > shared.LOCAL_GEN[nodename]:
shared.LOCAL_GEN[nodename] = our_gen_on_peer
self.nodes_data.set([nodename, "gen", Env.nodename], our_gen_on_peer)
if local_gen_merged_on_peer_from_message > local_gen_merged_on_peer:
shared.LOCAL_GEN_MERGED_ON_PEER[nodename] = local_gen_merged_on_peer_from_message
self.nodes_data.set([nodename, "gen", Env.nodename], local_gen_merged_on_peer_from_message)
return False
nodes_info_change = False
for gen in gens:
# self.log.debug("patch node %s dataset gen %d over %d (%d diffs)", nodename, gen, current_gen,
# len(deltas[str(gen)]))
if gen - 1 != current_gen:
if current_gen:
if gen - 1 != peer_gen_merged:
if peer_gen_merged:
# don't be alarming on daemon start: it is normal we receive a out-of-sequence patch
self.log.warning("unsynchronized node %s dataset. local gen %d, received %d. "
"ask for a full.", nodename, current_gen, gen)
self.update_node_gen(nodename, remote=0, local=our_gen_on_peer)
self.log.warning("unsynchronized node %s dataset. merged gen %d, received out of sequence %d. "
"ask for a full.", nodename, peer_gen_merged, gen)
self.update_node_gen(nodename,
peer_gen_merged=0,
local_gen_merged_on_peer=local_gen_merged_on_peer_from_message)
break
try:
self.nodes_data.patch([nodename], deltas[str(gen)])
current_gen = gen
self.update_node_gen(nodename, remote=gen, local=our_gen_on_peer)
peer_gen_merged = gen
self.update_node_gen(nodename,
peer_gen_merged=peer_gen_merged,
local_gen_merged_on_peer=local_gen_merged_on_peer_from_message)
self.log.debug("patch node %s dataset to gen %d, peer has gen %d of our dataset",
nodename, shared.REMOTE_GEN[nodename],
shared.LOCAL_GEN[nodename])
nodename,
peer_gen_merged,
local_gen_merged_on_peer_from_message)
if not nodes_info_change:
nodes_info_change |= self.patch_has_nodes_info_change(deltas[str(gen)])
change = True
except Exception as exc:
self.log.warning("failed to apply node %s dataset gen %d patch: %s. "
"ask for a full: %s", nodename, gen, deltas[str(gen)], exc)
self.update_node_gen(nodename, remote=0, local=our_gen_on_peer)
self.update_node_gen(nodename,
peer_gen_merged=0,
local_gen_merged_on_peer=local_gen_merged_on_peer_from_message)
break
if nodes_info_change:
self.on_nodes_info_change()
return change
elif kind == "ping":
self.update_node_gen(nodename, remote=0, local=our_gen_on_peer)
self.update_node_gen(nodename,
peer_gen_merged=0,
local_gen_merged_on_peer=local_gen_merged_on_peer_from_message)
self.nodes_data.set([nodename, "monitor"], data["monitor"])
self.log.debug("reset node %s dataset gen, peer has gen %d of our dataset",
nodename, shared.LOCAL_GEN[nodename])
nodename, local_gen_merged_on_peer_from_message)
change = True
else:
data_gen = data.get("gen", {}).get(nodename)
if data_gen is None:
if peer_gen_from_message == 0:
# GEN starts at 1, this message is 1st message after init_data gen: {}
self.log.debug("no 'gen' in full dataset from %s: drop", nodename)
return False
last_gen = shared.REMOTE_GEN.get(nodename)
if last_gen is not None and last_gen >= data_gen:
self.log.debug("already installed or beyond %s gen %d dataset: drop", nodename, data_gen)
if peer_gen_merged >= peer_gen_from_message:
self.log.debug("already installed or beyond %s gen %d dataset (current gen merged: %s): drop",
nodename,
peer_gen_from_message,
peer_gen_merged)
return False
node_status = data.get("monitor", {}).get("status")
if node_status in ("init", "maintenance", "upgrade") and self.nodes_data.exists([nodename]):
Expand All @@ -4242,11 +4270,13 @@ def _merge_rx(self, nodename, data, hbname):
data["services"]["status"][path] = idata

self.nodes_data.set([nodename], data)
new_gen = data.get("gen", {}).get(nodename, 0)
self.update_node_gen(nodename, remote=new_gen, local=our_gen_on_peer)
self.update_node_gen(nodename,
peer_gen_merged=peer_gen_from_message,
local_gen_merged_on_peer=local_gen_merged_on_peer_from_message)
self.log.info("install node %s full dataset gen %d, peer has gen %d of our dataset",
nodename, shared.REMOTE_GEN[nodename],
shared.LOCAL_GEN[nodename])
nodename,
peer_gen_from_message,
local_gen_merged_on_peer_from_message)
self.on_nodes_info_change()
change = True
return change
Expand Down
18 changes: 9 additions & 9 deletions opensvc/daemon/shared.py
Expand Up @@ -39,7 +39,7 @@ def __init__(self):
["updated"],
],
# disable journaling if we have no peer, as nothing purges the journal
journal_condition=lambda: bool(LOCAL_GEN),
journal_condition=lambda: bool(LOCAL_GEN_MERGED_ON_PEER),
)


Expand Down Expand Up @@ -71,10 +71,10 @@ def __init__(self):
GEN = 1

# track the generation of the local dataset on peer nodes
LOCAL_GEN = {}
LOCAL_GEN_MERGED_ON_PEER = {}

# track the generation of the peer datasets we merged
REMOTE_GEN = {}
PEER_GEN_MERGED = {}

# track the local dataset gen diffs pending merge by peers
GEN_DIFF = {}
Expand Down Expand Up @@ -1013,12 +1013,12 @@ def delete_peer_data(self, nodename):
with RX_LOCK:
self.nodes_data.unset_safe([nodename])
try:
del LOCAL_GEN[nodename]
del LOCAL_GEN_MERGED_ON_PEER[nodename]
except KeyError:
pass
try:
# will ask for a full when the node comes back again
del REMOTE_GEN[nodename]
del PEER_GEN_MERGED[nodename]
except KeyError:
pass

Expand Down Expand Up @@ -1349,16 +1349,16 @@ def get_oldest_gen(self, nodename=None):
Get oldest generation of the local dataset on peers.
"""
if nodename is None:
gens = LOCAL_GEN.values()
gens = LOCAL_GEN_MERGED_ON_PEER.values()
if len(gens) == 0:
return 0, 0
gen = min(gens)
num = len(gens)
# self.log.info("oldest gen is %d amongst %d nodes", gen, num)
else:
if nodename not in LOCAL_GEN:
if nodename not in LOCAL_GEN_MERGED_ON_PEER:
return 0, 0
gen = LOCAL_GEN.get(nodename, 0)
gen = LOCAL_GEN_MERGED_ON_PEER.get(nodename, 0)
num = 1
# self.log.info("gen on node %s is %d", nodename, gen)
return gen, num
Expand Down Expand Up @@ -1389,7 +1389,7 @@ def get_gen(inc=False):
if inc:
GEN += 1
gen = {Env.nodename: GEN}
gen.update(REMOTE_GEN)
gen.update(PEER_GEN_MERGED)
return gen

def node_overloaded(self, nodename=None):
Expand Down

0 comments on commit a83c3b3

Please sign in to comment.