Skip to content

Commit

Permalink
Root patch-kind events to the root of the daemon status structure
Browse files Browse the repository at this point in the history
This will allow api clients to maintain their daemon_status
cache in-sync using the events feed only, instead of refetching
the whole daemon status structure upon every event.

This change breaks most janitors (envoy at least). So they will
have to follow the change.

Also implement a stricter RBAC events filter, capable of mangling
the patch data for a specific requester.

This patch also enables SSE for the logs handlers.

And add a simple url path router:

/node/<node>/logs => node_logs + options.node=<node>
/object/<path>/logs => service_logs + options.path=<path>
  • Loading branch information
cvaroqui committed Jul 13, 2019
1 parent 21cc1fd commit af377c3
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 81 deletions.
4 changes: 3 additions & 1 deletion lib/nodemgr_parser.py
Expand Up @@ -376,7 +376,9 @@
],
},
"events": {
"msg": "Follow the daemon events",
"msg": "Follow the daemon events feed. Two kinds of event "
"can be received: event and patch. Patch data "
"applies to the daemon status structure.",
},
"frozen": {
"msg": "Return 0 if the services are frozen node-wide, "
Expand Down
195 changes: 153 additions & 42 deletions lib/osvcd_lsnr.py
Expand Up @@ -2,6 +2,7 @@
Listener Thread
"""
import base64
import copy
import json
import os
import sys
Expand Down Expand Up @@ -443,18 +444,18 @@ def janitor_events(self):
if thr not in self.threads:
to_remove.append(idx)
continue
event = self.filter_event(event, thr)
if event is None:
fevent = self.filter_event(event, thr)
if fevent is None:
continue
if thr.h2conn:
if not thr.events_stream_ids:
to_remove.append(idx)
continue
_msg = event
_msg = fevent
elif thr.encrypted:
_msg = self.encrypt(event)
_msg = self.encrypt(fevent)
else:
_msg = self.msg_encode(event)
_msg = self.msg_encode(fevent)
thr.event_queue.put(_msg)
for idx in to_remove:
try:
Expand All @@ -467,6 +468,8 @@ def filter_event(self, event, thr):
return event
if event is None:
return
if "root" in thr.usr_grants:
return event
if event.get("kind") == "patch":
return self.filter_patch_event(event, thr)
else:
Expand All @@ -491,24 +494,84 @@ def valid(change):
def filter_patch_event(self, event, thr):
namespaces = thr.usr_grants.get("guest", [])

def valid(change):
if not change[0]:
return False
if not change[0][0] == "services":
return True
def filter_change(change):
try:
path = change[0][2]
except IndexError:
return False
_, namespace, _ = split_path(path)
if namespace in namespaces:
return True
return False
key, value = change
except:
key = change[0]
value = None
try:
key_len = len(key)
except:
return change
if key_len == 0:
if value is None:
return change
value = self.filter_daemon_status(value, namespaces)
return [key, value]
elif key[0] == "monitor":
if key_len == 1:
if value is None:
return change
value = self.filter_daemon_status({"monitor": value}, namespaces)["monitor"]
return [key, value]
if key[1] == "services":
if key_len == 2:
if value is None:
return change
value = dict((k, v) for k, v in value.items() if split_path(k)[1] in namespaces)
return [key, value]
if split_path(key[2])[1] in namespaces:
return change
else:
return
if key[1] == "nodes":
if key_len == 2:
if value is None:
return change
value = self.filter_daemon_status({"monitor": {"nodes": value}}, namespaces)["monitor"]["nodes"]
return [key, value]
if key_len == 3:
if value is None:
return change
value = self.filter_daemon_status({"monitor": {"nodes": {key[2]: value}}}, namespaces)["monitor"]["nodes"][key[2]]
return [key, value]
if key[3] == "services":
if key_len == 4:
if value is None:
return change
value = self.filter_daemon_status({"monitor": {"nodes": {key[2]: {"services": value}}}}, namespaces)["monitor"]["nodes"][key[2]]["services"]
return [key, value]
if key[4] == "status":
if key_len == 5:
if value is None:
return change
value = dict((k, v) for k, v in value.items() if split_path(k)[1] in namespaces)
return [key, value]
if split_path(key[5])[1] in namespaces:
return change
else:
return
if key[4] == "config":
if key_len == 5:
if value is None:
return change
value = dict((k, v) for k, v in value.items() if split_path(k)[1] in namespaces)
return [key, value]
if split_path(key[5])[1] in namespaces:
return change
else:
return
return change

changes = []
for change in event.get("data", []):
if valid(change):
changes.append(change)
filtered_change = filter_change(change)
if filtered_change:
changes.append(filtered_change)
# print("ACCEPT", thr.usr.name, filtered_change)
#else:
# print("DROP ", thr.usr.name, change)
event["data"] = changes
return event

Expand Down Expand Up @@ -1000,6 +1063,8 @@ def handle_h2_client(self):
return
except Exception as exc:
self.log.error("exit on %s %s", type(exc), exc)
#import traceback
#traceback.print_exc()
return

# execute all registered pushers
Expand Down Expand Up @@ -1447,6 +1512,8 @@ def do_node(nodename):
except Exception:
continue

if mode == "stream":
return
return result

def push_peer_stream(self, stream_id, nodename, client_stream_id, conn, resp):
Expand Down Expand Up @@ -1498,6 +1565,37 @@ def create_multiplex(self, fname, options, data, original_nodename, action, stre
result["status"] += _result.get("status", 0)
return result

@staticmethod
def parse_path(s):
l = s.split("/")
path = None
node = None
if len(l) == 1:
return node, path, s

if l[0] == "node":
node = l[1]
action = "/".join(l[2:])
elif l[0] == "object":
path = fmt_path(l[3], l[1], l[2])
action = "/".join(l[4:])
elif l[0] == "instance":
node = l[1]
path = fmt_path(l[4], l[2], l[3])
action = "/".join(l[5:])

# translate action
if action == "logs" and path:
action = "service_logs"
elif action == "backlogs" and path:
action = "service_backlogs"
elif action == "logs" and node:
action = "node_logs"
elif action == "backlogs" and node:
action = "node_backlogs"

return node, path, action

def router(self, nodename, data, stream_id=None):
"""
For a request data, extract the requested action and options,
Expand All @@ -1508,7 +1606,19 @@ def router(self, nodename, data, stream_id=None):
return {"error": "invalid data format", "status": 1}
if "action" not in data:
return {"error": "action not specified", "status": 1}

# url path router
# ex: nodes/n1/logs => n1, None, node_logs
action = data["action"]
node, path, action = self.parse_path(action)
if node:
data["node"] = node
if path:
if "options" in data:
data["options"]["path"] = path
else:
data["options"] = {"path": path}

fname = "action_" + action
if not hasattr(self, fname):
raise HTTP(501, "handler '%s' not supported" % action)
Expand Down Expand Up @@ -1676,23 +1786,11 @@ def action_daemon_status(self, nodename, **kwargs):
Return a hash indexed by thead id, containing the status data
structure of each thread.
"""
options = kwargs.get("options", {})
data = {
"pid": shared.DAEMON.pid,
"cluster": {
"name": self.cluster_name,
"id": self.cluster_id,
"nodes": self.cluster_nodes,
}
}
data = copy.deepcopy(self.daemon_status())
if self.usr is False or "root" in self.usr_grants:
allowed_namespaces = None
else:
allowed_namespaces = self.usr_grants.get("guest", [])
with shared.THREADS_LOCK:
for thr_id, thread in shared.THREADS.items():
data[thr_id] = thread.status(namespaces=allowed_namespaces, **options)
return data
return data
namespaces = self.usr_grants.get("guest", [])
return self.filter_daemon_status(data, namespaces)

def wait_shutdown(self):
def still_shutting():
Expand Down Expand Up @@ -2617,16 +2715,12 @@ def action_events(self, nodename, stream_id=None, **kwargs):
self.raw_push_action_events()

def h2_push_action_events(self, stream_id):
content_type = self.streams[stream_id]["content_type"]
while True:
try:
msg = self.event_queue.get(False, 0)
except queue.Empty:
break
if content_type == "text/event-stream":
self.h2_sse_stream_send(stream_id, msg)
else:
self.h2_stream_send(stream_id, msg)
self.h2_stream_send(stream_id, msg)

def raw_push_action_events(self):
while True:
Expand Down Expand Up @@ -2687,6 +2781,12 @@ def action_service_logs(self, nodename, stream_id=None, **kwargs):
svc = self.get_service(path)
if svc is None:
raise HTTP(404, "%s not found" % path)
request_headers = HTTPHeaderMap(self.streams[stream_id]["request"].headers)
try:
content_type = bdecode(request_headers.get("accept").pop())
except:
content_type = "application/json"
self.streams[stream_id]["content_type"] = content_type
logfile = os.path.join(svc.log_d, svc.name+".log")
ofile = self._action_logs_open(logfile, 0, "node")
self.streams[stream_id]["pushers"].append({
Expand Down Expand Up @@ -2716,10 +2816,14 @@ def action_node_logs(self, nodename, stream_id=None, **kwargs):
A negative value means send the whole file.
The 0 value means follow the file.
"""
options = kwargs.get("options", {})
follow = options.get("follow")
logfile = os.path.join(rcEnv.paths.pathlog, "node.log")
ofile = self._action_logs_open(logfile, 0, "node")
request_headers = HTTPHeaderMap(self.streams[stream_id]["request"].headers)
try:
content_type = bdecode(request_headers.get("accept").pop())
except:
content_type = "application/json"
self.streams[stream_id]["content_type"] = content_type
self.streams[stream_id]["pushers"].append({
"fn": "h2_push_logs",
"args": [ofile, True],
Expand Down Expand Up @@ -2792,6 +2896,13 @@ def h2_sse_stream_send(self, stream_id, data):
self.send_outbound(stream_id)

def h2_stream_send(self, stream_id, data):
try:
content_type = self.streams[stream_id]["content_type"]
except KeyError:
content_type = None
if content_type == "text/event-stream":
self.h2_sse_stream_send(stream_id, data)
return
promised_stream_id = self.h2conn.get_next_available_stream_id()
request_headers = self.streams[stream_id]["request"].headers
self.h2conn.push_stream(stream_id, promised_stream_id, request_headers)
Expand Down

0 comments on commit af377c3

Please sign in to comment.