Skip to content

Commit

Permalink
Merge pull request #241 from simonsobs/ocsbow-updates-prep
Browse files Browse the repository at this point in the history
Core updates, prelude for major ocsbow
  • Loading branch information
BrianJKoopman committed Nov 17, 2021
2 parents 58f7e3f + aba51a0 commit e1d89f0
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 52 deletions.
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ ocs.ocs_agent
.. automodule:: ocs.ocs_agent
:members:
:undoc-members:
:private-members:
:show-inheritance:

.. _ocs_feed_api:
Expand Down
34 changes: 28 additions & 6 deletions ocs/client_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import json
import requests

class ControlClientError(RuntimeError):
pass

class ControlClient():
def __init__(self, agent_addr, **kwargs):
self.agent_addr = agent_addr
Expand All @@ -25,18 +28,37 @@ def call(self, procedure, *args, **kwargs):
try:
r = requests.post(self.call_url, data=params)
except requests.exceptions.ConnectionError as e:
raise RuntimeError([0,0,0,0,'client_http.error.connection_error',
['Failed to connect to %s' % self.call_url], {}])
raise ControlClientError([0,0,0,0,'client_http.error.connection_error',
['Failed to connect to %s' % self.call_url], {}])
if r.status_code != 200:
raise RuntimeError([0,0,0,0,'client_http.error.request_error',
['Server replied with code %i' % r.status_code], {}])
raise ControlClientError([0,0,0,0,'client_http.error.request_error',
['Server replied with code %i' % r.status_code], {}])
decoded = r.json()
if 'error' in decoded:
# Return errors in the same way wampy does, roughly.
raise RuntimeError([0,0,0,0,decoded['error'],decoded['args'],decoded['kwargs']])
raise ControlClientError([0,0,0,0,decoded['error'],decoded['args'],decoded['kwargs']])
return decoded['args'][0]

# These are API we want to add.
def get_api(self, simple=False):
"""Query the API and other info from the Agent; this includes lists of
Processes, Tasks, and Feeds, docstrings, operation session
structures, and info about the Agent instance (class, PID,
host).
Args:
simple (bool): If True, then return just the lists of the op
and feed names without accompanying detail.
Returns:
A dict, see :func:`ocs.ocs_agent.OCSAgent._management_handler`
for detail.
"""
data = self.call(self.agent_addr, 'get_api')
if not simple:
return data
return {k: [_v[0] for _v in v]
for k, v in data.items() if isinstance(v, dict)}

def get_tasks(self):
"""
Expand Down
109 changes: 86 additions & 23 deletions ocs/ocs_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .ocs_twisted import in_reactor_context

import time, datetime
import socket
import os
from deprecation import deprecated
from ocs import client_t
Expand Down Expand Up @@ -167,8 +168,8 @@ def onJoin(self, details):
# Register our processes...
# Register the device interface functions.
try:
yield self.register(self.my_device_handler, self.agent_address + '.ops')
yield self.register(self.my_management_handler, self.agent_address)
yield self.register(self._ops_handler, self.agent_address + '.ops')
yield self.register(self._management_handler, self.agent_address)
except ApplicationError:
self.log.error('Failed to register basic handlers @ %s; '
'agent probably running already.' % self.agent_address)
Expand Down Expand Up @@ -267,7 +268,7 @@ def encoded(self):
'processes': list(self.processes.keys())
}

def my_device_handler(self, action, op_name, params=None, timeout=None):
def _ops_handler(self, action, op_name, params=None, timeout=None):
if action == 'start':
return self.start(op_name, params=params)
if action == 'stop':
Expand All @@ -288,30 +289,69 @@ def _gather_sessions(self, parent):
parent: either self.tasks or self.processes.
Returns:
A list of session data blocks. Each session block contains
at least entries for 'op_name' and 'status'. In the case
that the operation has ever run, it will contain all the
stuff from OpSession.encode; otherwise 'no_history' is
returned for the status.
A list of Operation description tuples, one per registered
Task or Process. Each tuple consists of elements `(name,
session, op_info)`:
- `name`: The name of the operation.
- `session`: dict with OpSession.encode(() info for the
active or most recent session. If no such session exists
the result will have member 'status' set to 'no_history'.
- `op_info`: information registered about the operation,
such as `op_type`, `docstring` and `blocking`.
"""
result = []
for k, v in sorted(parent.items()):
session = self.sessions.get(k)
for name, op_info in sorted(parent.items()):
session = self.sessions.get(name)
if session is None:
session = {'op_name': k, 'status': 'no_history'}
session = {'op_name': name, 'status': 'no_history'}
else:
session = session.encoded()
result.append((k, session, v.encoded()))
result.append((name, session, op_info.encoded()))
return result

def my_management_handler(self, q, **kwargs):
def _management_handler(self, q, **kwargs):
"""Get a description of this Agent's API. This is for adaptive
clients (such as MatchedClient) to construct their interfaces.
Params
------
q : string
One of 'get_api', 'get_tasks', 'get_processes', 'get_feeds',
'get_agent_class'.
Returns
-------
api_description : dict
If the argument is 'get_api', then a dict with the following
entries is returned:
- 'agent_class': The class name of this agent.
- 'instance_hostname': The host name where the Agent is
running, as returned by socket.gethostname().
- 'instance_pid': The PID of the Agent interpreter session,
as returned by os.getpid().
- 'feeds': The list of encoded feed information, tuples
(feed_name, feed_info).
- 'processes': The list of Process api description info, as
returned by :func:`_gather_sessions`.
- 'tasks': The list of Task api description info, as
returned by :func:`_gather_sessions`.
Passing get_X will, for some values of X, return only that
subset of the full API; treat that as deprecated.
"""
if q == 'get_api':
return {
'tasks': self._gather_sessions(self.tasks),
'processes': self._gather_sessions(self.processes),
'agent_class': self.class_name,
'instance_hostname': socket.gethostname(),
'instance_pid': os.getpid(),
'feeds': [(k, v.encoded()) for k, v in self.feeds.items()],
'agent_class': self.class_name
'processes': self._gather_sessions(self.processes),
'tasks': self._gather_sessions(self.tasks),
}
if q == 'get_tasks':
return self._gather_sessions(self.tasks)
Expand Down Expand Up @@ -442,15 +482,34 @@ def register_feed(self, feed_name, **kwargs):
self.feeds[feed_name] = ocs_feed.Feed(self, feed_name, **kwargs)
return self.feeds[feed_name]

def publish_to_feed(self, feed_name, message, from_reactor=False):
if feed_name not in self.feeds.keys():
def publish_to_feed(self, feed_name, message, from_reactor=None):
"""Publish data to named feed.
Args:
feed_name (str): should match the name of a registered feed.
message (serializable): data to publish. Acceptable format
depends on feed configuration; see Feed.publish_message.
from_reactor (bool or None): This is deprecated; the code
will check whether you're in a thread or not.
Notes:
If an unknown feed_name is passed in, an error is printed to
the log and that's all.
If you are running a "blocking" operation, in a thread, then
it is best if the message is not a persistent data structure
from your thread (especially something you might modify soon
after this call). The code will take a copy of your
structure and pass that to the reactor thread, but the copy
may not be deep enough!
"""
if feed_name not in self.feeds:
self.log.error("Feed {} is not registered.".format(feed_name))
return

if from_reactor:
self.feeds[feed_name].publish_message(message)
else:
reactor.callFromThread(self.feeds[feed_name].publish_message, message)
# We expect that publish_message will check threading context
# and do the right thing (as of this writing, it does).
self.feeds[feed_name].publish_message(message)

def subscribe(self, handler, topic, options=None, force_subscribe=False):
"""
Expand Down Expand Up @@ -753,9 +812,11 @@ def __init__(self, launcher, blocking=None):
self.docstring = launcher.__doc__

def encoded(self):
"""Dict of static info for API self-description."""
return {
'blocking': self.blocking,
'docstring': self.docstring,
'op_type': 'task',
}

class AgentProcess:
Expand All @@ -766,9 +827,11 @@ def __init__(self, launcher, stopper, blocking=None):
self.docstring = launcher.__doc__

def encoded(self):
"""Dict of static info for API self-description."""
return {
'blocking': self.blocking,
'docstring': self.docstring,
'op_type': 'process',
}


Expand Down
5 changes: 3 additions & 2 deletions ocs/ocs_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ def __init__(self, instance_id, **kwargs):

self._client = site_config.get_control_client(instance_id, **kwargs)
self.instance_id = instance_id
self._api = self._client.get_api()

for name, _, encoded in self._client.get_tasks():
for name, _, encoded in self._api['tasks']:
setattr(self, _opname_to_attr(name),
_get_op('task', name, encoded, self._client))

for name, _, encoded in self._client.get_processes():
for name, _, encoded in self._api['processes']:
setattr(self, _opname_to_attr(name),
_get_op('process', name, encoded, self._client))

Expand Down
24 changes: 14 additions & 10 deletions ocs/site_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,6 @@ def from_dict(cls, data, parent=None):
self = cls()
self.parent = parent
self.binary = data.get('bin', shutil.which('crossbar'))
if self.binary is None:
raise RuntimeError("Unable to locate crossbar binary")
if not os.path.exists(self.binary):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self.binary)
self.cbdir = data.get('config-dir')
if self.cbdir is None:
self.cbdir_args = []
Expand All @@ -147,6 +143,12 @@ def from_dict(cls, data, parent=None):
return self

def get_cmd(self, cmd):
if self.binary is None:
raise RuntimeError("Crossbar binary could not be found in PATH; "
"specify the binary in site_config?")
if not os.path.exists(self.binary):
raise RuntimeError("The crossbar binary specified in site_config "
"does not seem to exist: %s" % self.binary)
return [self.binary, cmd] + self.cbdir_args

def summary(self):
Expand Down Expand Up @@ -228,16 +230,17 @@ def from_dict(cls, data, parent=None):
``arguments`` (list, optional):
A list of arguments that should be passed back to the
agent. Each element of the list should be a pair of
strings, like ``['--option-name', 'value']``. This is not
as general as one might like, but is required in the
current scheme.
agent. Historically the arguments have been grouped into
into key value pairs, e.g. [['--key1', 'value'],
['--key2', 'value']] but these days whatever you passed in
gets flattened to a single list (i.e. that is equivalent
to ['--key1', 'value', '--key2', 'value'].
"""
self = cls()
self.parent = parent
self.data = data
self.arguments = self.data['arguments']
self.arguments = self.data.get('arguments', [])
return self


Expand Down Expand Up @@ -490,7 +493,8 @@ def get_config(args, agent_class=None):
dev, parent=host_config)
if instance_config is None and not no_dev_match:
raise RuntimeError("Could not find matching device description.")
return (site_config, host_config, instance_config)
return collections.namedtuple('SiteConfig', ['site', 'host', 'instance'])\
(site_config, host_config, instance_config)


def add_site_attributes(args, site, host=None):
Expand Down
11 changes: 8 additions & 3 deletions tests/test_site_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,14 @@ def test_missing_crossbar(self):
config = CrossbarConfig.from_dict({})
assert config.binary == "someplace/bin/crossbar"

# CrossbarConfig should only raise errors if someone tries to
# use the invalid config.
crossbar_not_found = MagicMock(return_value=None)
with patch("shutil.which", crossbar_not_found), pytest.raises(RuntimeError):
with patch("shutil.which", crossbar_not_found):
config = CrossbarConfig.from_dict({})
with pytest.raises(RuntimeError):
config.get_cmd('start')

with pytest.raises(FileNotFoundError):
config = CrossbarConfig.from_dict({"bin": "not/a/valid/path/to/crossbar"})
config = CrossbarConfig.from_dict({"bin": "not/a/valid/path/to/crossbar"})
with pytest.raises(RuntimeError):
config.get_cmd('start')
24 changes: 16 additions & 8 deletions tests/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from unittest.mock import MagicMock

import os

def fake_get_control_client(instance_id, **kwargs):
"""Quick function to return a Client like you'd expect from
Expand All @@ -9,12 +9,20 @@ def fake_get_control_client(instance_id, **kwargs):
"""
encoded_op = {'blocking': True,
'docstring': 'Example docstring'}
client = MagicMock()
client.get_tasks = MagicMock(return_value=([('task_name',
None,
encoded_op)]))
client.get_processes = MagicMock(return_value=([('process_name',
None,
encoded_op)]))
client = MagicMock() # mocks client_http.ControlClient

# The api structure is defined by OCSAgent._management_handler
api = {
'tasks': [('task_name', None, encoded_op)],
'processes': [('process_name', None, encoded_op)],
'feeds': [],
'agent_class': 'Mock',
'instance_hostname': 'testhost',
'instance_pid': os.getpid(),
}

client.get_tasks = MagicMock(return_value=api['tasks'])
client.get_processes = MagicMock(return_value=api['processes'])
client.get_api = MagicMock(return_value=api)

return client

0 comments on commit e1d89f0

Please sign in to comment.