Merge pull request #9 from cgalibern/journaled_data
Journaled data
cvaroqui committed Dec 9, 2020
2 parents dcb0f65 + 3825df8 commit 8f7dde6
Showing 11 changed files with 148 additions and 72 deletions.
2 changes: 1 addition & 1 deletion opensvc/core/node/node.py
@@ -4432,7 +4432,7 @@ def __daemon_join(self, joined, secret):
server=joined,
cluster_name="join",
secret=secret,
timeout=20 + DEFAULT_DAEMON_TIMEOUT,
timeout=120,
)
if data is None:
raise ex.Error("join node %s failed" % joined)
40 changes: 23 additions & 17 deletions opensvc/daemon/clusterlock.py
@@ -5,6 +5,9 @@
import daemon.shared as shared
from env import Env

DELAY_TIME = 0.5


class LockMixin(object):
"""
Methods shared between lock/unlock handlers.
@@ -21,12 +24,12 @@ def lock_acquire(self, nodename, name, timeout=None, thr=None):
situation = 0
while time.time() < deadline:
if not lock_id:
lock_id = self._lock_acquire(nodename, name)
lock_id = self._lock_acquire(nodename, name, thr=thr)
if not lock_id:
if situation != 1:
thr.log.info("claim %s lock refused (already claimed)", name)
situation = 1
time.sleep(0.5)
time.sleep(DELAY_TIME)
continue
thr.log.info("claimed %s lock: %s", name, lock_id)
if shared.LOCKS.get(name, {}).get("id") != lock_id:
@@ -36,7 +39,7 @@ def lock_acquire(self, nodename, name, timeout=None, thr=None):
if self.lock_accepted(name, lock_id, thr=thr):
thr.log.info("locked %s", name)
return lock_id
time.sleep(0.5)
time.sleep(DELAY_TIME)
thr.log.warning("claim timeout on %s lock", name)
self.lock_release(name, lock_id, silent=True, thr=thr)

@@ -45,20 +48,20 @@ def lock_release(self, name, lock_id, timeout=None, silent=False, thr=None):
if timeout is None:
timeout = 5
deadline = time.time() + timeout
if not lock_id or shared.LOCKS.get(name, {}).get("id") != lock_id:
return
try:
with shared.LOCKS_LOCK:
if not lock_id or shared.LOCKS.get(name, {}).get("id") != lock_id:
return
del shared.LOCKS[name]
except KeyError:
pass
if thr:
thr.update_cluster_locks_lk()
shared.wake_monitor(reason="unlock", immediate=True)
if not silent:
thr.log.info("released locally %s", name)
while time.time() < deadline:
if self._lock_released(name, lock_id, thr=thr):
released = True
break
time.sleep(0.5)
time.sleep(DELAY_TIME)
if released is False:
thr.log.warning('timeout waiting for lock %s %s release on peers', name, lock_id)

@@ -85,15 +88,18 @@ def _lock_released(self, name, lock_id, thr=None):
return False
return True

def _lock_acquire(self, nodename, name):
if name in shared.LOCKS:
return
def _lock_acquire(self, nodename, name, thr=None):
lock_id = str(uuid.uuid4())
shared.LOCKS[name] = {
"requested": time.time(),
"requester": nodename,
"id": lock_id,
}
with shared.LOCKS_LOCK:
if name in shared.LOCKS:
return
shared.LOCKS[name] = {
"requested": time.time(),
"requester": nodename,
"id": lock_id,
}
if thr:
thr.update_cluster_locks_lk()
shared.wake_monitor(reason="lock", immediate=True)
return lock_id

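For context, the retry loop in lock_acquire() above follows a claim-then-confirm pattern: claim the lock name under LOCKS_LOCK, wait for peers to acknowledge the claim over the heartbeats, and back off by DELAY_TIME between attempts. The following is a minimal standalone sketch of that pattern, assuming plain dicts and a stubbed consensus check; LOCKS, LOCKS_LOCK and peers_accept() here are stand-ins, not opensvc APIs.

import time
import threading
import uuid

DELAY_TIME = 0.5
LOCKS = {}
LOCKS_LOCK = threading.RLock()

def peers_accept(name, lock_id):
    # stand-in for the heartbeat-based consensus check (lock_accepted in the mixin)
    return True

def lock_acquire(nodename, name, timeout=60):
    deadline = time.time() + timeout
    lock_id = None
    while time.time() < deadline:
        if not lock_id:
            # claim the name atomically, as _lock_acquire now does under LOCKS_LOCK
            with LOCKS_LOCK:
                if name not in LOCKS:
                    lock_id = str(uuid.uuid4())
                    LOCKS[name] = {"requested": time.time(), "requester": nodename, "id": lock_id}
        if not lock_id:
            time.sleep(DELAY_TIME)  # already claimed elsewhere, retry later
            continue
        if LOCKS.get(name, {}).get("id") != lock_id:
            lock_id = None          # our claim was superseded by another claim
            continue
        if peers_accept(name, lock_id):
            return lock_id          # all peers acknowledged the claim
        time.sleep(DELAY_TIME)
    return None
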
12 changes: 10 additions & 2 deletions opensvc/daemon/handlers/join/post.py
@@ -6,6 +6,9 @@
import core.exceptions as ex
from env import Env

LOCK_TIMEOUT = 120


class Handler(daemon.handler.BaseHandler, daemon.clusterlock.LockMixin):
"""
Join the cluster.
@@ -17,14 +20,19 @@ class Handler(daemon.handler.BaseHandler, daemon.clusterlock.LockMixin):
prototype = []

def action(self, nodename, thr=None, **kwargs):
lock_id = self.lock_acquire(Env.nodename, "join", 30, thr=thr)
thr.log.info('-> join request from %s', nodename)
lock_id = self.lock_acquire(Env.nodename, "join", timeout=LOCK_TIMEOUT, thr=thr)
if not lock_id:
thr.log.warning('<- join request from %s refused (Lock not acquired)', nodename)
raise ex.HTTP(503, "Lock not acquired")
try:
with shared.JOIN_LOCK:
thr.log.info('-> join request from %s with lock %s', nodename, lock_id)
data = self.join(nodename, thr=thr, **kwargs)
thr.log.info('<- join request from %s with lock %s', nodename, lock_id)
finally:
self.lock_release("join", lock_id, thr=thr)
self.lock_release("join", lock_id, timeout=LOCK_TIMEOUT, thr=thr)
thr.log.info('<- join request from %s', nodename)
return data

def join(self, nodename, thr=None, **kwargs):
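The handler change above tightens the locking discipline around join: announce the request, claim the cluster-wide "join" lock with a 120s budget, run the join under JOIN_LOCK, and always release the lock. A hedged sketch of that shape, with the acquire/release/work steps passed in as callables since the real ones live on the mixin and thread objects:

LOCK_TIMEOUT = 120

def guarded_join(acquire, release, work, log):
    # acquire/release/work/log are injected stand-ins for the mixin and logger methods
    lock_id = acquire("join", timeout=LOCK_TIMEOUT)
    if not lock_id:
        log("join refused: lock not acquired")
        raise RuntimeError("503 Lock not acquired")
    try:
        return work()
    finally:
        release("join", lock_id, timeout=LOCK_TIMEOUT)
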
4 changes: 1 addition & 3 deletions opensvc/daemon/hb/hb.py
@@ -101,7 +101,7 @@ def is_beating(self, nodename="*"):
return self.peers.get(nodename, {"beating": False})["beating"]

def set_peers_beating(self):
for nodename in self.peers:
for nodename in list(self.peers.keys()):
self.set_beating(nodename)

def set_beating(self, nodename="*"):
@@ -198,5 +198,3 @@ def get_message(self, nodename=None):

def queue_rx_data(self, data, nodename):
shared.RX.put((nodename, data, self.name))


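The switch to list(self.peers.keys()) above snapshots the peer names before iterating, so set_beating() can add or drop peers without raising "dictionary changed size during iteration". A tiny illustration with stand-in structures:

peers = {"node1": {}, "node2": {}}

def set_beating(nodename):
    # may mutate the dict, e.g. drop a stale peer
    if nodename == "node2":
        del peers[nodename]

for nodename in list(peers.keys()):  # iterate a copy of the keys, safe against mutation
    set_beating(nodename)
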
25 changes: 17 additions & 8 deletions opensvc/daemon/hb/ucast.py
@@ -107,6 +107,7 @@ def run(self):
while True:
self.do()
if self.stopped():
self.log.info('sys.exit()')
sys.exit(0)
with shared.HB_TX_TICKER:
shared.HB_TX_TICKER.wait(self.interval)
@@ -167,7 +168,7 @@ def __init__(self, name, role="rx"):
self.sock_recv_tmo = 5.0

def _configure(self):
HbUcast._configure(self)
super(HbUcastRx, self)._configure()
if not self.config_change:
return
self.config_change = False
@@ -185,29 +186,37 @@ def _configure(self):
raise ex.AbortAction

def configure_listener(self):
af = socket.AF_INET6 if ":" in self.peer_config[Env.nodename]["addr"] else socket.AF_INET
addr = self.peer_config[Env.nodename]["addr"]
port = self.peer_config[Env.nodename]["port"]
af = socket.AF_INET6 if ":" in addr else socket.AF_INET
self.sock = socket.socket(af, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.peer_config[Env.nodename]["addr"],
self.peer_config[Env.nodename]["port"]))
self.sock.bind((addr, port))
self.sock.listen(5)
self.sock.settimeout(self.sock_accept_tmo)
self.log.info("listening on %s", fmt_listener(addr, port))

def run(self):
self.set_tid()
try:
self.configure()
except ex.AbortAction:
return

cf = self.peer_config[Env.nodename]
self.log.info("listening on %s", fmt_listener(cf["addr"], cf["port"]))
except Exception as exc:
self.log.error("%s", exc)
raise exc

while True:
self.do()
try:
self.do()
except Exception as exc:
self.log.error("during do(): %s", exc)
self.log.exception(exc)
raise exc
if self.stopped():
self.join_threads()
self.sock.close()
self.log.info('sys.exit()')
sys.exit(0)

def do(self):
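The configure_listener() rework above derives the address family from the address literal and logs the bound endpoint. A standalone sketch of that setup, with example values; fmt_listener and the peer_config plumbing are left out:

import socket

def configure_listener(addr, port, accept_timeout=1.0):
    af = socket.AF_INET6 if ":" in addr else socket.AF_INET
    sock = socket.socket(af, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    sock.settimeout(accept_timeout)  # accept() returns periodically so the rx thread can honor stop()
    return sock

# example: sock = configure_listener("::1", 10001) binds an IPv6 listener
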
4 changes: 3 additions & 1 deletion opensvc/daemon/main.py
@@ -429,6 +429,7 @@ def start_threads(self):
self.threads[thr_id].stop()
self.threads[thr_id].join()
del self.threads[thr_id]
shared.DAEMON_STATUS.unset_safe([thr_id])
changed = True

# clean up collector thread no longer needed
@@ -437,6 +438,7 @@
self.threads["collector"].stop()
self.threads["collector"].join()
del self.threads["collector"]
shared.DAEMON_STATUS.unset_safe(["collector"])
changed = True

if changed:
@@ -486,7 +488,7 @@ def _read_config(self):
if mtime is None:
return
if self.last_config_mtime is not None and \
self.last_config_mtime >= mtime:
self.last_config_mtime >= mtime:
return
try:
with shared.NODE_LOCK:
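The unset_safe() calls added above keep the published daemon status in step with the thread registry: when a thread is reaped, its status entry goes away too. Plain-dict illustration of the idea (threads and status are stand-ins for self.threads and DAEMON_STATUS):

threads = {"hb#1.rx": object(), "collector": object()}
status = {"hb#1.rx": {"state": "running"}, "collector": {"state": "running"}}

def reap(thr_id):
    # the real code calls stop() and join() on the thread object first
    del threads[thr_id]
    status.pop(thr_id, None)  # analogous to shared.DAEMON_STATUS.unset_safe([thr_id])

reap("collector")
assert "collector" not in status
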
50 changes: 29 additions & 21 deletions opensvc/daemon/monitor.py
@@ -3613,6 +3613,7 @@ def merge_hb_data(self):
self.merge_hb_data_monitor()

def merge_hb_data_locks(self):
changed = False
for nodename in self.list_nodes():
if nodename == Env.nodename:
continue
@@ -3626,32 +3627,39 @@
if lock["requester"] == Env.nodename and name not in shared.LOCKS:
# don't re-merge a released lock emitted by this node
continue
if name not in shared.LOCKS:
self.log.info("merge lock %s from node %s", name, nodename)
shared.LOCKS[name] = lock
continue
if lock["requested"] < shared.LOCKS[name]["requested"] and \
lock["requester"] != Env.nodename and \
lock["requester"] == nodename:
self.log.info("merge older lock %s from node %s", name, nodename)
shared.LOCKS[name] = lock
continue
with shared.LOCKS_LOCK:
if name not in shared.LOCKS:
self.log.info("merge lock %s from node %s", name, nodename)
shared.LOCKS[name] = lock
changed = True
continue

# Lock name is already present in shared.LOCKS
if lock["requested"] < shared.LOCKS[name]["requested"] and \
lock["requester"] != Env.nodename and \
lock["requester"] == nodename:
self.log.info("merge older lock %s from node %s", name, nodename)
shared.LOCKS[name] = lock
changed = True
continue
for name in list(shared.LOCKS):
try:
lock = shared.LOCKS[name]
except KeyError:
# deleted during iteration
continue
if Env.nodename == lock["requester"]:
continue
requester_lock = self.thread_data.get(["nodes", lock["requester"], "locks", name], default=None)
if requester_lock is None:
self.log.info("drop lock %s from node %s", name, nodename)
with shared.LOCKS_LOCK:
try:
del shared.LOCKS[name]
shared_lock = shared.LOCKS[name]
except KeyError:
# deleted during iteration
continue
shared_lock_requester = shared_lock["requester"]
if shared_lock_requester == Env.nodename:
continue
requester_lock = self.thread_data.get(["nodes", shared_lock_requester, "locks", name], default=None)
if requester_lock is None:
self.log.info("drop lock %s from node %s", name, shared_lock_requester)
del shared.LOCKS[name]
changed = True
if changed:
with shared.LOCKS_LOCK:
self.update_cluster_locks_lk()

def merge_hb_data_compat(self):
compat = set()
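merge_hb_data_locks() now applies its merge rules under LOCKS_LOCK and republishes the result once at the end. The rules are: adopt a peer lock that is unknown locally, and prefer an older claim when it comes from its own requester; the drop pass for requesters that no longer advertise a lock is analogous. A sketch of the merge rules on plain dicts, outside any locking:

def merge_peer_locks(local_locks, local_node, peer_node, peer_locks):
    for name, lock in peer_locks.items():
        if lock["requester"] == local_node and name not in local_locks:
            # don't re-merge a lock this node already released
            continue
        if name not in local_locks:
            local_locks[name] = lock
        elif (lock["requested"] < local_locks[name]["requested"]
              and lock["requester"] != local_node
              and lock["requester"] == peer_node):
            local_locks[name] = lock  # the older claim wins
    return local_locks
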
11 changes: 8 additions & 3 deletions opensvc/daemon/shared.py
@@ -9,6 +9,7 @@
import json
import tempfile
import shutil
from copy import deepcopy
from subprocess import Popen, PIPE

import foreign.six as six
@@ -116,6 +117,7 @@ def __init__(self):
# cluster-wide locks, acquire/release via the listener (usually the unix socket),
# consensus via the heartbeat links.
LOCKS = {}
LOCKS_LOCK = RLock()

# The lock to serialize data updates from rx threads
RX = queue.Queue()
@@ -158,6 +160,7 @@ def __init__(self):
SERVICES_LOCK.name = "SERVICES"
HB_MSG_LOCK.name = "HB_MSG"
RUN_DONE_LOCK.name = "RUN_DONE"
LOCKS_LOCK.name = "LOCKS"
except AttributeError:
pass

@@ -503,7 +506,7 @@ def reload_config(self):
try:
getattr(self, "reconfigure")()
except Exception as exc:
self.log.error("reconfigure error: %s", str(exc))
self.log.error("reconfigure error: %s, stopping thread", str(exc))
self.stop()
self.configured = time.time()

@@ -1633,6 +1636,10 @@ def update_cluster_data(self):
"nodes": self.cluster_nodes,
})

def update_cluster_locks_lk(self):
# this must be called with LOCKS_LOCK held
self.node_data.merge([], {"locks": deepcopy(LOCKS)})

def update_status(self):
data = self.status()
self.thread_data.set([], data)
@@ -1793,5 +1800,3 @@ def object_data(self, path):
except KeyError:
pass
return data


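update_cluster_locks_lk() publishes a deepcopy of LOCKS rather than the dict itself, so later in-place mutations of the live dict cannot leak into the journaled snapshot. Minimal illustration of why the copy matters, with plain dicts:

from copy import deepcopy

LOCKS = {"join": {"requester": "node1", "id": "abc"}}

published = {"locks": deepcopy(LOCKS)}  # snapshot, taken under LOCKS_LOCK in the real code
LOCKS["join"]["id"] = "def"             # later mutation of the live dict

assert published["locks"]["join"]["id"] == "abc"  # the snapshot is unaffected
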
19 changes: 19 additions & 0 deletions opensvc/tests/daemon/conftest.py
@@ -0,0 +1,19 @@
import pytest

import daemon.shared as shared
from core.node import Node
from env import Env


@pytest.fixture(scope='function')
@pytest.mark.usefixtures('osvc_path_tests')
def shared_data(mocker):
"""
mock some of the shared daemon structures
"""
mocker.patch.object(shared, 'RX', shared.queue.Queue())
mocker.patch.object(shared, 'DAEMON_STATUS', shared.OsvcJournaledData())
mocker.patch.object(shared, 'SERVICES', {})
mocker.patch.object(shared, 'LOCKS', {})
shared.DAEMON_STATUS.set([], {"monitor": {"nodes": {Env.nodename: {}}}})
mocker.patch.object(shared, 'NODE', Node())

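A hypothetical test consuming the fixture above could look like this; the test body is illustrative and only touches structures the fixture explicitly mocks:

import daemon.shared as shared


def test_locks_start_empty(shared_data):
    assert shared.LOCKS == {}
    shared.LOCKS["join"] = {"requester": "n1", "id": "fake"}
    # the mock is function-scoped, so this mutation does not leak into other tests
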