From af377c36d971712b36312994df89da0184b7412d Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Sat, 13 Jul 2019 17:06:18 +0200 Subject: [PATCH] Root patch-kind events to the root of the daemon status structure This will allow api clients to maintain their daemon_status cache in-sync using the events feed only, instead of refetching the whole daemon status structure upon every event. This change breaks most janitors (envoy at least). So they will have to follow the change. Also implement a stricter RBAC events filter, capable of mangling the patch data for a specific requester. This patch also enables SSE for the logs handlers. And add a simple url path router: /node//logs => node_logs + options.node= /object//logs => service_logs + options.path= --- lib/nodemgr_parser.py | 4 +- lib/osvcd_lsnr.py | 195 +++++++++++++++++++++++++++++++++--------- lib/osvcd_mon.py | 42 ++------- lib/osvcd_shared.py | 70 ++++++++++++++- 4 files changed, 230 insertions(+), 81 deletions(-) diff --git a/lib/nodemgr_parser.py b/lib/nodemgr_parser.py index 5ccfbc614d..088ee7a056 100644 --- a/lib/nodemgr_parser.py +++ b/lib/nodemgr_parser.py @@ -376,7 +376,9 @@ ], }, "events": { - "msg": "Follow the daemon events", + "msg": "Follow the daemon events feed. Two kinds of event " + "can be received: event and patch. Patch data " + "applies to the daemon status structure.", }, "frozen": { "msg": "Return 0 if the services are frozen node-wide, " diff --git a/lib/osvcd_lsnr.py b/lib/osvcd_lsnr.py index 90940576dd..99c43cae05 100644 --- a/lib/osvcd_lsnr.py +++ b/lib/osvcd_lsnr.py @@ -2,6 +2,7 @@ Listener Thread """ import base64 +import copy import json import os import sys @@ -443,18 +444,18 @@ def janitor_events(self): if thr not in self.threads: to_remove.append(idx) continue - event = self.filter_event(event, thr) - if event is None: + fevent = self.filter_event(event, thr) + if fevent is None: continue if thr.h2conn: if not thr.events_stream_ids: to_remove.append(idx) continue - _msg = event + _msg = fevent elif thr.encrypted: - _msg = self.encrypt(event) + _msg = self.encrypt(fevent) else: - _msg = self.msg_encode(event) + _msg = self.msg_encode(fevent) thr.event_queue.put(_msg) for idx in to_remove: try: @@ -467,6 +468,8 @@ def filter_event(self, event, thr): return event if event is None: return + if "root" in thr.usr_grants: + return event if event.get("kind") == "patch": return self.filter_patch_event(event, thr) else: @@ -491,24 +494,84 @@ def valid(change): def filter_patch_event(self, event, thr): namespaces = thr.usr_grants.get("guest", []) - def valid(change): - if not change[0]: - return False - if not change[0][0] == "services": - return True + def filter_change(change): try: - path = change[0][2] - except IndexError: - return False - _, namespace, _ = split_path(path) - if namespace in namespaces: - return True - return False + key, value = change + except: + key = change[0] + value = None + try: + key_len = len(key) + except: + return change + if key_len == 0: + if value is None: + return change + value = self.filter_daemon_status(value, namespaces) + return [key, value] + elif key[0] == "monitor": + if key_len == 1: + if value is None: + return change + value = self.filter_daemon_status({"monitor": value}, namespaces)["monitor"] + return [key, value] + if key[1] == "services": + if key_len == 2: + if value is None: + return change + value = dict((k, v) for k, v in value.items() if split_path(k)[1] in namespaces) + return [key, value] + if split_path(key[2])[1] in namespaces: + return change + else: + return + if key[1] == "nodes": + if key_len == 2: + if value is None: + return change + value = self.filter_daemon_status({"monitor": {"nodes": value}}, namespaces)["monitor"]["nodes"] + return [key, value] + if key_len == 3: + if value is None: + return change + value = self.filter_daemon_status({"monitor": {"nodes": {key[2]: value}}}, namespaces)["monitor"]["nodes"][key[2]] + return [key, value] + if key[3] == "services": + if key_len == 4: + if value is None: + return change + value = self.filter_daemon_status({"monitor": {"nodes": {key[2]: {"services": value}}}}, namespaces)["monitor"]["nodes"][key[2]]["services"] + return [key, value] + if key[4] == "status": + if key_len == 5: + if value is None: + return change + value = dict((k, v) for k, v in value.items() if split_path(k)[1] in namespaces) + return [key, value] + if split_path(key[5])[1] in namespaces: + return change + else: + return + if key[4] == "config": + if key_len == 5: + if value is None: + return change + value = dict((k, v) for k, v in value.items() if split_path(k)[1] in namespaces) + return [key, value] + if split_path(key[5])[1] in namespaces: + return change + else: + return + return change changes = [] for change in event.get("data", []): - if valid(change): - changes.append(change) + filtered_change = filter_change(change) + if filtered_change: + changes.append(filtered_change) + # print("ACCEPT", thr.usr.name, filtered_change) + #else: + # print("DROP ", thr.usr.name, change) event["data"] = changes return event @@ -1000,6 +1063,8 @@ def handle_h2_client(self): return except Exception as exc: self.log.error("exit on %s %s", type(exc), exc) + #import traceback + #traceback.print_exc() return # execute all registered pushers @@ -1447,6 +1512,8 @@ def do_node(nodename): except Exception: continue + if mode == "stream": + return return result def push_peer_stream(self, stream_id, nodename, client_stream_id, conn, resp): @@ -1498,6 +1565,37 @@ def create_multiplex(self, fname, options, data, original_nodename, action, stre result["status"] += _result.get("status", 0) return result + @staticmethod + def parse_path(s): + l = s.split("/") + path = None + node = None + if len(l) == 1: + return node, path, s + + if l[0] == "node": + node = l[1] + action = "/".join(l[2:]) + elif l[0] == "object": + path = fmt_path(l[3], l[1], l[2]) + action = "/".join(l[4:]) + elif l[0] == "instance": + node = l[1] + path = fmt_path(l[4], l[2], l[3]) + action = "/".join(l[5:]) + + # translate action + if action == "logs" and path: + action = "service_logs" + elif action == "backlogs" and path: + action = "service_backlogs" + elif action == "logs" and node: + action = "node_logs" + elif action == "backlogs" and node: + action = "node_backlogs" + + return node, path, action + def router(self, nodename, data, stream_id=None): """ For a request data, extract the requested action and options, @@ -1508,7 +1606,19 @@ def router(self, nodename, data, stream_id=None): return {"error": "invalid data format", "status": 1} if "action" not in data: return {"error": "action not specified", "status": 1} + + # url path router + # ex: nodes/n1/logs => n1, None, node_logs action = data["action"] + node, path, action = self.parse_path(action) + if node: + data["node"] = node + if path: + if "options" in data: + data["options"]["path"] = path + else: + data["options"] = {"path": path} + fname = "action_" + action if not hasattr(self, fname): raise HTTP(501, "handler '%s' not supported" % action) @@ -1676,23 +1786,11 @@ def action_daemon_status(self, nodename, **kwargs): Return a hash indexed by thead id, containing the status data structure of each thread. """ - options = kwargs.get("options", {}) - data = { - "pid": shared.DAEMON.pid, - "cluster": { - "name": self.cluster_name, - "id": self.cluster_id, - "nodes": self.cluster_nodes, - } - } + data = copy.deepcopy(self.daemon_status()) if self.usr is False or "root" in self.usr_grants: - allowed_namespaces = None - else: - allowed_namespaces = self.usr_grants.get("guest", []) - with shared.THREADS_LOCK: - for thr_id, thread in shared.THREADS.items(): - data[thr_id] = thread.status(namespaces=allowed_namespaces, **options) - return data + return data + namespaces = self.usr_grants.get("guest", []) + return self.filter_daemon_status(data, namespaces) def wait_shutdown(self): def still_shutting(): @@ -2617,16 +2715,12 @@ def action_events(self, nodename, stream_id=None, **kwargs): self.raw_push_action_events() def h2_push_action_events(self, stream_id): - content_type = self.streams[stream_id]["content_type"] while True: try: msg = self.event_queue.get(False, 0) except queue.Empty: break - if content_type == "text/event-stream": - self.h2_sse_stream_send(stream_id, msg) - else: - self.h2_stream_send(stream_id, msg) + self.h2_stream_send(stream_id, msg) def raw_push_action_events(self): while True: @@ -2687,6 +2781,12 @@ def action_service_logs(self, nodename, stream_id=None, **kwargs): svc = self.get_service(path) if svc is None: raise HTTP(404, "%s not found" % path) + request_headers = HTTPHeaderMap(self.streams[stream_id]["request"].headers) + try: + content_type = bdecode(request_headers.get("accept").pop()) + except: + content_type = "application/json" + self.streams[stream_id]["content_type"] = content_type logfile = os.path.join(svc.log_d, svc.name+".log") ofile = self._action_logs_open(logfile, 0, "node") self.streams[stream_id]["pushers"].append({ @@ -2716,10 +2816,14 @@ def action_node_logs(self, nodename, stream_id=None, **kwargs): A negative value means send the whole file. The 0 value means follow the file. """ - options = kwargs.get("options", {}) - follow = options.get("follow") logfile = os.path.join(rcEnv.paths.pathlog, "node.log") ofile = self._action_logs_open(logfile, 0, "node") + request_headers = HTTPHeaderMap(self.streams[stream_id]["request"].headers) + try: + content_type = bdecode(request_headers.get("accept").pop()) + except: + content_type = "application/json" + self.streams[stream_id]["content_type"] = content_type self.streams[stream_id]["pushers"].append({ "fn": "h2_push_logs", "args": [ofile, True], @@ -2792,6 +2896,13 @@ def h2_sse_stream_send(self, stream_id, data): self.send_outbound(stream_id) def h2_stream_send(self, stream_id, data): + try: + content_type = self.streams[stream_id]["content_type"] + except KeyError: + content_type = None + if content_type == "text/event-stream": + self.h2_sse_stream_send(stream_id, data) + return promised_stream_id = self.h2conn.get_next_available_stream_id() request_headers = self.streams[stream_id]["request"].headers self.h2conn.push_stream(stream_id, promised_stream_id, request_headers) diff --git a/lib/osvcd_mon.py b/lib/osvcd_mon.py index 35f29b79e8..e6ab344c68 100644 --- a/lib/osvcd_mon.py +++ b/lib/osvcd_mon.py @@ -200,7 +200,6 @@ def do(self): self.stop() else: self.update_cluster_data() - self.merge_hb_data() self.orchestrator() self.update_hb_data() shared.wake_collector() @@ -3066,6 +3065,8 @@ def get_arbitrators_data(self): def update_cluster_data(self): self.update_node_data() self.purge_left_nodes() + self.merge_hb_data() + self.update_daemon_status() def purge_left_nodes(self): left = set([node for node in shared.CLUSTER_DATA]) - set(self.cluster_nodes) @@ -3134,13 +3135,6 @@ def update_hb_data(self): if diff is None: return - shared.EVENT_Q.put({ - "nodename": rcEnv.nodename, - "kind": "patch", - "ts": time.time(), - "data": diff, - }) - # don't store the diff if we have no peers if len(shared.LOCAL_GEN) == 0: return @@ -3435,7 +3429,7 @@ def get_all_paths(self): continue return paths - def get_agg_services(self, paths=None): + def get_agg_services(self): data = {} all_paths = self.get_all_paths() for path in all_paths: @@ -3449,8 +3443,6 @@ def get_agg_services(self, paths=None): data[path] = self.get_agg(path) with shared.AGG_LOCK: shared.AGG = data - if paths is not None: - return dict((path, data[path]) for path in paths if path in data) return data def update_completions(self): @@ -3469,33 +3461,13 @@ def update_completion(self, otype): print(exc) pass - def filter_cluster_data(self, paths=None): - with shared.CLUSTER_DATA_LOCK: - data = copy.deepcopy(shared.CLUSTER_DATA) - if paths is None: - return data - nodes = list([n for n in data]) - for node in data: - cpaths = [p for p in data[node]["services"]["config"] if p in paths] - data[node]["services"]["config"] = dict((path, data[node]["services"]["config"][path]) for path in cpaths) - spaths = [p for p in data[node]["services"]["status"] if p in paths] - data[node]["services"]["status"] = dict((path, data[node]["services"]["status"][path]) for path in spaths) - return data - - def status(self, **kwargs): - if kwargs.get("refresh"): - self.update_hb_data() - namespaces = kwargs.get("namespaces") - if namespaces is None: - paths = None - else: - paths = [p for p in self.get_all_paths() if split_path(p)[1] in namespaces] - data = shared.OsvcThread.status(self, **kwargs) - data["nodes"] = self.filter_cluster_data(paths) + def status(self): + data = shared.OsvcThread.status(self) + data["nodes"] = json.loads(json.dumps(shared.CLUSTER_DATA)) data["compat"] = self.compat data["transitions"] = self.transition_count() data["frozen"] = self.get_clu_agg_frozen() - data["services"] = self.get_agg_services(paths) + data["services"] = self.get_agg_services() return data def get_last_shutdown(self): diff --git a/lib/osvcd_shared.py b/lib/osvcd_shared.py index 2bf69c13d3..cfff28cd7c 100644 --- a/lib/osvcd_shared.py +++ b/lib/osvcd_shared.py @@ -1,6 +1,7 @@ """ A module to share variables used by osvcd threads. """ +import copy import os import sys import threading @@ -8,13 +9,14 @@ import codecs import hashlib import json +import json_delta from subprocess import Popen, PIPE import six from six.moves import queue import rcExceptions as ex -from rcUtilities import lazy, unset_lazy, is_string, factory +from rcUtilities import lazy, unset_lazy, is_string, factory, split_path from rcGlobalEnv import rcEnv from storage import Storage from freezer import Freezer @@ -25,6 +27,10 @@ # a global to store the Daemon() instance DAEMON = None +# daemon_status cache +LAST_DAEMON_STATUS = {} +DAEMON_STATUS = {} + # disable orchestration if a peer announces a different compat version than # ours COMPAT_VERSION = 10 @@ -238,10 +244,10 @@ def status(self, **kwargs): state = "running" else: state = "terminated" - data = Storage({ + data = { "state": state, "created": self.created, - }) + } if self.tid: data["tid"] = self.tid return data @@ -1349,3 +1355,61 @@ def get_node(self): """ return NODE + def _daemon_status(self): + """ + Return a hash indexed by thead id, containing the status data + structure of each thread. + """ + data = { + "pid": DAEMON.pid, + "cluster": { + "name": self.cluster_name, + "id": self.cluster_id, + "nodes": self.cluster_nodes, + } + } + with THREADS_LOCK: + for thr_id, thread in THREADS.items(): + data[thr_id] = thread.status() + return data + + def update_daemon_status(self): + global LAST_DAEMON_STATUS + global DAEMON_STATUS + global EVENT_Q + LAST_DAEMON_STATUS = json.loads(json.dumps(DAEMON_STATUS)) + DAEMON_STATUS = self._daemon_status() + diff = json_delta.diff( + LAST_DAEMON_STATUS, DAEMON_STATUS, + verbose=False, array_align=False, compare_lengths=False + ) + if not diff: + return + EVENT_Q.put({ + "kind": "patch", + "ts": time.time(), + "data": diff, + }) + + def daemon_status(self): + return DAEMON_STATUS + + @staticmethod + def filter_daemon_status(data, namespaces): + keep = [] + for path in [p for p in data.get("monitor", {}).get("services", {})]: + namespace = split_path(path)[1] + if namespace not in namespaces: + del data["monitor"]["services"][path] + else: + keep.append(path) + for node in [n for n in data.get("monitor", {}).get("nodes", {})]: + for path in [p for p in data["monitor"]["nodes"][node].get("services", {}).get("status", {})]: + if path not in keep: + del data["monitor"]["nodes"][node]["services"]["status"][path] + for path in [p for p in data["monitor"]["nodes"][node].get("services", {}).get("config", {})]: + if path not in keep: + del data["monitor"]["nodes"][node]["services"]["config"][path] + return data + +