Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions agents/registry/registry.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from ocs import ocs_agent, site_config
from ocs.base import OpCode
import time
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.util import sleep as dsleep
from collections import defaultdict
from ocs.ocs_feed import Feed

from ocs.agent.aggregator import Provider

class RegisteredAgent:
"""
Expand All @@ -17,26 +20,37 @@ class RegisteredAgent:
is not expired.
last_updated (float):
ctime at which the agent was last updated
op_codes (dict):
Dictionary of operation codes for each of the agent's
operations. For details on what the operation codes mean, see
docs from the ``ocs_agent`` module
"""
def __init__(self):
self.expired = False
self.time_expired = None
self.last_updated = time.time()
self.op_codes = {}

def refresh(self):
def refresh(self, op_codes=None):
self.expired = False
self.time_expired = None
self.last_updated = time.time()

if op_codes:
self.op_codes.update(op_codes)

def expire(self):
self.expired = True
self.time_expired = time.time()
for k in self.op_codes:
self.op_codes[k] = OpCode.EXPIRED.value

def encoded(self):
return {
'expired': self.expired,
'time_expired': self.time_expired,
'last_updated': self.last_updated
'last_updated': self.last_updated,
'op_codes': self.op_codes,
}


Expand Down Expand Up @@ -76,13 +90,20 @@ def __init__(self, agent):
options={'match': 'wildcard'}
)

agg_params = {
'frame_length': 60,
'fresh_time': 10,
}
self.agent.register_feed('agent_operations', record=True,
agg_params=agg_params, buffer_time=0)

def _register_heartbeat(self, _data):
"""
"""
Function that is called whenever a heartbeat is received from an agent.
It will update that agent in the Registry's registered_agent dict.
"""
data, feed = _data
self.registered_agents[feed['agent_address']].refresh()
op_codes, feed = _data
self.registered_agents[feed['agent_address']].refresh(op_codes=op_codes)

@inlineCallbacks
def main(self, session: ocs_agent.OpSession, params=None):
Expand All @@ -94,7 +115,7 @@ def main(self, session: ocs_agent.OpSession, params=None):
The session.data object for this process will be a dictionary containing
the encoded RegisteredAgent object for each agent observed during the
lifetime of the registry. For instance, this might look like

>>> session.data
{'observatory.aggregator':
{'expired': False,
Expand All @@ -119,11 +140,30 @@ def main(self, session: ocs_agent.OpSession, params=None):

for k, agent in self.registered_agents.items():
if time.time() - agent.last_updated > self.agent_timeout:
agent.expire()
agent.expire()

session.data = {
k: agent.encoded() for k, agent in self.registered_agents.items()
}

for addr, agent in self.registered_agents.items():
msg = { 'block_name': addr,
'timestamp': time.time(),
'data': {}}
for op_name, op_code in agent.op_codes.items():
field = f'{addr}_{op_name}'
field = field.replace('.', '_')
field = field.replace('-', '_')
field = Provider._enforce_field_name_rules(field)
try:
Feed.verify_data_field_string(field)
except ValueError as e:
self.log.warn(f"Improper field name: {field}\n{e}")
continue
msg['data'][field] = op_code
if msg['data']:
self.agent.publish_to_feed('agent_operations', msg)

return True, "Stopped registry main process"

def stop(self, session, params=None):
Expand Down
Binary file added docs/_static/operation_monitor_screenshot.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
36 changes: 28 additions & 8 deletions docs/agents/registry.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ It listens to the heartbeat feeds of all agents on the crossbar server,
and keeps track of the last heartbeat time of each agent and whether
or not each agent has agent has "expired" (gone 5 seconds without a heartbeat).

This check happens in the registry's single "main" process. The session.data object
of this process is set to a dict of agents on the system, including their last
heartbeat time, whether they have expired, and the time at which they expired.
This data can the be viewed by checking the session variable of the main process.
This check happens in the registry's single "main" process. The session.data
object of this process is set to a dict of agents on the system, including
their last heartbeat time, whether they have expired, the time at which they
expired, and a dictionary of their operation codes. This data can the be
viewed by checking the session variable of the main process.

For instance, the following code will print agent's that have been on the system
since the registry started running::
Expand All @@ -24,21 +25,40 @@ since the registry started running::
status, msg, session = registry_client.main.status()

print(session['data'])

which will print a dictionary that might look like::

{'observatory.aggregator':
{'expired': False,
'last_updated': 1583179794.5175,
'time_expired': None},
'time_expired': None,
'op_codes': {'record': 3}},
'observatory.faker1':
{'expired': False,
'last_updated': 1583179795.072248,
'time_expired': None},
'time_expired': None,
'op_codes': {'acq': 3, 'set_heartbeat': 1, 'delay_task': 1}},
'observatory.faker2':
{'expired': True,
'last_updated': 1583179777.0211036,
'time_expired': 1583179795.3862052}}
'time_expired': 1583179795.3862052,
'op_codes': {'acq': 3, 'set_heartbeat': 1, 'delay_task': 1}}}

Operation Monitor
-------------------

The registry is also used to track the status of each agent's tasks and
processes. `Operation codes` for each operation are regularly passed through an
agent's heartbeat feed, which the registry assembles and publishes through its
own OCS feed. This makes it possible to monitor individual operation states in
grafana and to easily set alerts when a process stops running or when a task
fails.

By mapping the enumeration values described in the ``OpCode`` documentation in
the :ref:`ocs_base api <ocs_base_api>`, one can make a grafana panel to monitor
all operations on a network as pictured below:

.. image:: ../_static/operation_monitor_screenshot.png




Expand Down
2 changes: 2 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ agent.aggregator
:undoc-members:
:show-inheritance:

.. _ocs_base_api:

ocs.base
--------

Expand Down
13 changes: 13 additions & 0 deletions ocs/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ocs
from enum import Enum

RET_VALS = {
'OK': 0,
Expand All @@ -9,3 +10,15 @@
OK = RET_VALS['OK']
ERROR = RET_VALS['ERROR']
TIMEOUT = RET_VALS['TIMEOUT']

class OpCode(Enum):
"""
Enumeration of OpSession states.
"""
NONE = 1
STARTING = 2
RUNNING = 3
STOPPING = 4
SUCCEEDED = 5
FAILED = 6
EXPIRED = 7
27 changes: 26 additions & 1 deletion ocs/ocs_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from deprecation import deprecated
from ocs import client_t
from ocs import ocs_feed
from ocs.base import OpCode

def init_site_agent(args, address=None):
"""
Expand Down Expand Up @@ -178,7 +179,14 @@ def heartbeat():
if self._heartbeat_on:
self.log.debug(' {:.1f} {address} heartbeat '
.format(time.time(), address=self.agent_address))
self.publish_to_feed("heartbeat", 0, from_reactor=True)

op_codes = {}
for name, session in self.sessions.items():
if session is None:
op_codes[name] = OpCode.NONE.value
else:
op_codes[name] = session.op_code.value
self.publish_to_feed("heartbeat", op_codes, from_reactor=True)

self.heartbeat_call = task.LoopingCall(heartbeat)
self.heartbeat_call.start(1.0) # Calls the hearbeat every second
Expand Down Expand Up @@ -739,6 +747,7 @@ def encoded(self):

SESSION_STATUS_CODES = [None, 'starting', 'running', 'stopping', 'done']


class OpSession:
"""
When a caller requests that an Operation (Process or Task) is
Expand Down Expand Up @@ -807,6 +816,22 @@ def encoded(self):
'data': self.data,
'messages': self.messages}

@property
def op_code(self):
"""
Returns the OpCode for the given session. This is what will be
published to the registry's ``operation_status`` feed.
"""
if self.status is None:
return OpCode.NONE
elif self.status in ['starting', 'running', 'stopping']:
return {'starting': OpCode.STARTING, 'running': OpCode.RUNNING,
'stopping': OpCode.STOPPING}[self.status]
elif self.success:
return OpCode.SUCCEEDED
else:
return OpCode.FAILED

def set_status(self, status, timestamp=None, log_status=True):
"""Update the OpSession status and possibly post a message about it.

Expand Down
4 changes: 2 additions & 2 deletions ocs/ocs_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ def verify_data_field_string(field):
# check for invalid characters
result = check_invalid.search(field)
if result:
raise ValueError("message 'data' block contains a key with the " +
f"invalid character '{result.group(0)}'. "
raise ValueError(f"message 'data' block contains the key {field} "
f"with the invalid character '{result.group(0)}'. "
"Valid characters are a-z, A-Z, 0-9, and underscore.")

# check for non-letter start, even after underscores
Expand Down