diff --git a/agents/registry/registry.py b/agents/registry/registry.py index ba74fdf1..c4618e3c 100644 --- a/agents/registry/registry.py +++ b/agents/registry/registry.py @@ -1,9 +1,12 @@ from ocs import ocs_agent, site_config +from ocs.base import OpCode import time from twisted.internet.defer import inlineCallbacks from autobahn.twisted.util import sleep as dsleep from collections import defaultdict +from ocs.ocs_feed import Feed +from ocs.agent.aggregator import Provider class RegisteredAgent: """ @@ -17,26 +20,37 @@ class RegisteredAgent: is not expired. last_updated (float): ctime at which the agent was last updated + op_codes (dict): + Dictionary of operation codes for each of the agent's + operations. For details on what the operation codes mean, see + docs from the ``ocs_agent`` module """ def __init__(self): self.expired = False self.time_expired = None self.last_updated = time.time() + self.op_codes = {} - def refresh(self): + def refresh(self, op_codes=None): self.expired = False self.time_expired = None self.last_updated = time.time() + if op_codes: + self.op_codes.update(op_codes) + def expire(self): self.expired = True self.time_expired = time.time() + for k in self.op_codes: + self.op_codes[k] = OpCode.EXPIRED.value def encoded(self): return { 'expired': self.expired, 'time_expired': self.time_expired, - 'last_updated': self.last_updated + 'last_updated': self.last_updated, + 'op_codes': self.op_codes, } @@ -76,13 +90,20 @@ def __init__(self, agent): options={'match': 'wildcard'} ) + agg_params = { + 'frame_length': 60, + 'fresh_time': 10, + } + self.agent.register_feed('agent_operations', record=True, + agg_params=agg_params, buffer_time=0) + def _register_heartbeat(self, _data): - """ + """ Function that is called whenever a heartbeat is received from an agent. It will update that agent in the Registry's registered_agent dict. """ - data, feed = _data - self.registered_agents[feed['agent_address']].refresh() + op_codes, feed = _data + self.registered_agents[feed['agent_address']].refresh(op_codes=op_codes) @inlineCallbacks def main(self, session: ocs_agent.OpSession, params=None): @@ -94,7 +115,7 @@ def main(self, session: ocs_agent.OpSession, params=None): The session.data object for this process will be a dictionary containing the encoded RegisteredAgent object for each agent observed during the lifetime of the registry. For instance, this might look like - + >>> session.data {'observatory.aggregator': {'expired': False, @@ -119,11 +140,30 @@ def main(self, session: ocs_agent.OpSession, params=None): for k, agent in self.registered_agents.items(): if time.time() - agent.last_updated > self.agent_timeout: - agent.expire() + agent.expire() session.data = { k: agent.encoded() for k, agent in self.registered_agents.items() } + + for addr, agent in self.registered_agents.items(): + msg = { 'block_name': addr, + 'timestamp': time.time(), + 'data': {}} + for op_name, op_code in agent.op_codes.items(): + field = f'{addr}_{op_name}' + field = field.replace('.', '_') + field = field.replace('-', '_') + field = Provider._enforce_field_name_rules(field) + try: + Feed.verify_data_field_string(field) + except ValueError as e: + self.log.warn(f"Improper field name: {field}\n{e}") + continue + msg['data'][field] = op_code + if msg['data']: + self.agent.publish_to_feed('agent_operations', msg) + return True, "Stopped registry main process" def stop(self, session, params=None): diff --git a/docs/_static/operation_monitor_screenshot.png b/docs/_static/operation_monitor_screenshot.png new file mode 100644 index 00000000..f15aa30d Binary files /dev/null and b/docs/_static/operation_monitor_screenshot.png differ diff --git a/docs/agents/registry.rst b/docs/agents/registry.rst index efad71ea..6534e133 100644 --- a/docs/agents/registry.rst +++ b/docs/agents/registry.rst @@ -10,10 +10,11 @@ It listens to the heartbeat feeds of all agents on the crossbar server, and keeps track of the last heartbeat time of each agent and whether or not each agent has agent has "expired" (gone 5 seconds without a heartbeat). -This check happens in the registry's single "main" process. The session.data object -of this process is set to a dict of agents on the system, including their last -heartbeat time, whether they have expired, and the time at which they expired. -This data can the be viewed by checking the session variable of the main process. +This check happens in the registry's single "main" process. The session.data +object of this process is set to a dict of agents on the system, including +their last heartbeat time, whether they have expired, the time at which they +expired, and a dictionary of their operation codes. This data can the be +viewed by checking the session variable of the main process. For instance, the following code will print agent's that have been on the system since the registry started running:: @@ -24,21 +25,40 @@ since the registry started running:: status, msg, session = registry_client.main.status() print(session['data']) - which will print a dictionary that might look like:: {'observatory.aggregator': {'expired': False, 'last_updated': 1583179794.5175, - 'time_expired': None}, + 'time_expired': None, + 'op_codes': {'record': 3}}, 'observatory.faker1': {'expired': False, 'last_updated': 1583179795.072248, - 'time_expired': None}, + 'time_expired': None, + 'op_codes': {'acq': 3, 'set_heartbeat': 1, 'delay_task': 1}}, 'observatory.faker2': {'expired': True, 'last_updated': 1583179777.0211036, - 'time_expired': 1583179795.3862052}} + 'time_expired': 1583179795.3862052, + 'op_codes': {'acq': 3, 'set_heartbeat': 1, 'delay_task': 1}}} + +Operation Monitor +------------------- + +The registry is also used to track the status of each agent's tasks and +processes. `Operation codes` for each operation are regularly passed through an +agent's heartbeat feed, which the registry assembles and publishes through its +own OCS feed. This makes it possible to monitor individual operation states in +grafana and to easily set alerts when a process stops running or when a task +fails. + +By mapping the enumeration values described in the ``OpCode`` documentation in +the :ref:`ocs_base api `, one can make a grafana panel to monitor +all operations on a network as pictured below: + +.. image:: ../_static/operation_monitor_screenshot.png + diff --git a/docs/api.rst b/docs/api.rst index 904dfcf9..e6aa59fd 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -17,6 +17,8 @@ agent.aggregator :undoc-members: :show-inheritance: +.. _ocs_base_api: + ocs.base -------- diff --git a/ocs/base.py b/ocs/base.py index e5f22197..a56124e4 100644 --- a/ocs/base.py +++ b/ocs/base.py @@ -1,4 +1,5 @@ import ocs +from enum import Enum RET_VALS = { 'OK': 0, @@ -9,3 +10,15 @@ OK = RET_VALS['OK'] ERROR = RET_VALS['ERROR'] TIMEOUT = RET_VALS['TIMEOUT'] + +class OpCode(Enum): + """ + Enumeration of OpSession states. + """ + NONE = 1 + STARTING = 2 + RUNNING = 3 + STOPPING = 4 + SUCCEEDED = 5 + FAILED = 6 + EXPIRED = 7 diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index cf173a1d..d17091dc 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -22,6 +22,7 @@ from deprecation import deprecated from ocs import client_t from ocs import ocs_feed +from ocs.base import OpCode def init_site_agent(args, address=None): """ @@ -178,7 +179,14 @@ def heartbeat(): if self._heartbeat_on: self.log.debug(' {:.1f} {address} heartbeat ' .format(time.time(), address=self.agent_address)) - self.publish_to_feed("heartbeat", 0, from_reactor=True) + + op_codes = {} + for name, session in self.sessions.items(): + if session is None: + op_codes[name] = OpCode.NONE.value + else: + op_codes[name] = session.op_code.value + self.publish_to_feed("heartbeat", op_codes, from_reactor=True) self.heartbeat_call = task.LoopingCall(heartbeat) self.heartbeat_call.start(1.0) # Calls the hearbeat every second @@ -739,6 +747,7 @@ def encoded(self): SESSION_STATUS_CODES = [None, 'starting', 'running', 'stopping', 'done'] + class OpSession: """ When a caller requests that an Operation (Process or Task) is @@ -807,6 +816,22 @@ def encoded(self): 'data': self.data, 'messages': self.messages} + @property + def op_code(self): + """ + Returns the OpCode for the given session. This is what will be + published to the registry's ``operation_status`` feed. + """ + if self.status is None: + return OpCode.NONE + elif self.status in ['starting', 'running', 'stopping']: + return {'starting': OpCode.STARTING, 'running': OpCode.RUNNING, + 'stopping': OpCode.STOPPING}[self.status] + elif self.success: + return OpCode.SUCCEEDED + else: + return OpCode.FAILED + def set_status(self, status, timestamp=None, log_status=True): """Update the OpSession status and possibly post a message about it. diff --git a/ocs/ocs_feed.py b/ocs/ocs_feed.py index 95264f22..7bdd98b0 100644 --- a/ocs/ocs_feed.py +++ b/ocs/ocs_feed.py @@ -301,8 +301,8 @@ def verify_data_field_string(field): # check for invalid characters result = check_invalid.search(field) if result: - raise ValueError("message 'data' block contains a key with the " + - f"invalid character '{result.group(0)}'. " + raise ValueError(f"message 'data' block contains the key {field} " + f"with the invalid character '{result.group(0)}'. " "Valid characters are a-z, A-Z, 0-9, and underscore.") # check for non-letter start, even after underscores