Merge pull request #204 from simonsobs/develop
Release v0.8.0
BrianJKoopman committed May 12, 2021
2 parents d8dcb41 + 17767b2 commit e1a813e
Showing 35 changed files with 607 additions and 426 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/develop.yml
@@ -25,7 +25,7 @@ jobs:
- name: Test with pytest within a docker container
run: |
-docker run -v $PWD:/coverage --rm ocs sh -c "COVERAGE_FILE=/coverage/.coverage.docker python3 -m pytest -p no:wampy -m 'not integtest' --cov /app/ocs/ocs/ ./tests/"
+docker run -v $PWD:/coverage --rm ocs sh -c "COVERAGE_FILE=/coverage/.coverage.docker python3 -m pytest -m 'not integtest' --cov /app/ocs/ocs/ ./tests/"
- name: Report test coverage
env:
@@ -34,7 +34,7 @@ jobs:
pip install coveralls
coverage combine
coverage report
-coveralls
+coveralls --service=github
- name: Test documentation build
run: |
2 changes: 1 addition & 1 deletion .github/workflows/official-docker-images.yml
@@ -26,7 +26,7 @@ jobs:
# Test (already run by the pytest workflow, but they don't take long...)
- name: Test with pytest within a docker container
run: |
-docker run -v $PWD:/coverage --rm ocs sh -c "COVERAGE_FILE=/coverage/.coverage.docker python3 -m pytest -p no:wampy -m 'not integtest' --cov /app/ocs/ocs/ ./tests/"
+docker run -v $PWD:/coverage --rm ocs sh -c "COVERAGE_FILE=/coverage/.coverage.docker python3 -m pytest -m 'not integtest' --cov /app/ocs/ocs/ ./tests/"
- name: Test documentation build
run: |
4 changes: 2 additions & 2 deletions .github/workflows/pytest.yml
@@ -33,7 +33,7 @@ jobs:

- name: Test with pytest within a docker container
run: |
-docker run -v $PWD:/coverage --rm ocs sh -c "COVERAGE_FILE=/coverage/.coverage.docker python3 -m pytest -p no:wampy -m 'not integtest' --cov /app/ocs/ocs/ ./tests/"
+docker run -v $PWD:/coverage --rm ocs sh -c "COVERAGE_FILE=/coverage/.coverage.docker python3 -m pytest -m 'not integtest' --cov /app/ocs/ocs/ ./tests/"
- name: Report test coverage
env:
@@ -42,7 +42,7 @@ jobs:
pip install coveralls
coverage combine
coverage report
-coveralls
+coveralls --service=github
- name: Test documentation build
run: |
2 changes: 1 addition & 1 deletion Dockerfile
@@ -2,7 +2,7 @@
# A container setup with an installation of ocs.

# Use ubuntu base image
-FROM simonsobs/so3g:v0.1.0
+FROM simonsobs/so3g:v0.1.0-62-g55ad726

# Set locale
ENV LANG C.UTF-8
180 changes: 20 additions & 160 deletions agents/influxdb_publisher/influxdb_publisher.py
@@ -1,173 +1,18 @@
import time
-import datetime
import queue
import argparse
import txaio

from os import environ
-from influxdb import InfluxDBClient
-from influxdb.exceptions import InfluxDBClientError
-from requests.exceptions import ConnectionError as RequestsConnectionError

from ocs import ocs_agent, site_config
+from ocs.agent.influxdb_publisher import Publisher

# For logging
txaio.use_twisted()
LOG = txaio.make_logger()


-def timestamp2influxtime(time):
-    """Convert timestamp for influx
-    Args:
-        time:
-            ctime timestamp
-    """
-    t_dt = datetime.datetime.fromtimestamp(time)
-    return t_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")
-
-
-class Publisher:
-    """
-    Data publisher. This manages data to be published to the InfluxDB.
-    This class should only be accessed by a single thread. Data can be passed
-    to it by appending it to the referenced `incoming_data` queue.
-    Args:
-        incoming_data (queue.Queue):
-            A thread-safe queue of (data, feed) pairs.
-        host (str):
-            host for InfluxDB instance.
-        database (str):
-            database name within InfluxDB to publish to
-        port (int, optional):
-            port for InfluxDB instance, defaults to 8086.
-    Attributes:
-        host (str):
-            host for InfluxDB instance.
-        port (int, optional):
-            port for InfluxDB instance, defaults to 8086.
-        db (str):
-            database name within InfluxDB to publish to (from database arg)
-        incoming_data:
-            data to be published
-        client:
-            InfluxDB client connection
-    """
-    def __init__(self, host, database, incoming_data, port=8086):
-        self.host = host
-        self.port = port
-        self.db = database
-        self.incoming_data = incoming_data
-
-        self.client = InfluxDBClient(host=self.host, port=self.port)
-
-        db_list = None
-        # ConnectionError here is indicative of InfluxDB being down
-        while db_list is None:
-            try:
-                db_list = self.client.get_list_database()
-            except RequestsConnectionError:
-                LOG.error("Connection error, attempting to reconnect to DB.")
-                self.client = InfluxDBClient(host=self.host, port=self.port)
-                time.sleep(1)
-        db_names = [x['name'] for x in db_list]
-
-        if self.db not in db_names:
-            print(f"{self.db} DB doesn't exist, creating DB")
-            self.client.create_database(self.db)
-
-        self.client.switch_database(self.db)
-
-    def process_incoming_data(self):
-        """
-        Takes all data from the incoming_data queue, and puts them into
-        provider blocks.
-        """
-        while not self.incoming_data.empty():
-            data, feed = self.incoming_data.get()
-
-            LOG.debug("Pulling data from queue.")
-
-            # Formatted for writing to InfluxDB
-            payload = self.format_data(data, feed)
-            try:
-                self.client.write_points(payload)
-                LOG.debug("wrote payload to influx")
-            except RequestsConnectionError:
-                LOG.error("InfluxDB unavailable, attempting to reconnect.")
-                self.client = InfluxDBClient(host=self.host, port=self.port)
-                self.client.switch_database(self.db)
-            except InfluxDBClientError as err:
-                LOG.error("InfluxDB Client Error: {e}", e=err)
-
-    def format_data(self, data, feed):
-        """Format the data from an OCS feed into a dict for pushing to InfluxDB.
-        The scheme here is as follows:
-            - agent_address is the "measurement" (conceptually like an SQL
-              table)
-            - feed names are an indexed "tag" on the data structure
-              (effectively a table column)
-            - keys within an OCS block's 'data' dictionary are the field names
-              (effectively a table column)
-        Args:
-            data (dict):
-                data from the OCS Feed subscription
-            feed (dict):
-                feed from the OCS Feed subscription, contains feed information
-                used to structure our influxdb query
-        """
-        measurement = feed['agent_address']
-        feed_tag = feed['feed_name']
-
-        json_body = []
-
-        # Reshape data for query
-        for bk, bv in data.items():
-            grouped_data_points = []
-            times = bv['timestamps']
-            num_points = len(bv['timestamps'])
-            for i in range(num_points):
-                grouped_dict = {}
-                for data_key, data_value in bv['data'].items():
-                    grouped_dict[data_key] = data_value[i]
-                grouped_data_points.append(grouped_dict)
-
-            for fields, time_ in zip(grouped_data_points, times):
-                json_body.append(
-                    {
-                        "measurement": measurement,
-                        "time": timestamp2influxtime(time_),
-                        "fields": fields,
-                        "tags": {
-                            "feed": feed_tag
-                        }
-                    }
-                )
-
-        LOG.debug("payload: {p}", p=json_body)
-
-        return json_body
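# For illustration (hypothetical names): a block
# {'temps': {'timestamps': [t0], 'data': {'ch1': 295.0}}} received from
# agent_address 'observatory.LS240' on feed 'temps' is formatted as
# [{"measurement": "observatory.LS240", "time": timestamp2influxtime(t0),
#   "fields": {"ch1": 295.0}, "tags": {"feed": "temps"}}].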

-    def run(self):
-        """Main run iterator for the publisher. This processes all incoming
-        data, removes stale providers, and writes active providers to disk.
-        """
-        self.process_incoming_data()
-
-    def close(self):
-        """Flushes all remaining data and closes InfluxDB connection."""
-        pass


class InfluxDBAgent:
"""
This class provides a WAMP wrapper for the data publisher. The run function
@@ -235,13 +80,19 @@ def start_aggregate(self, session: ocs_agent.OpSession, params=None):
session.set_status('starting')
self.aggregate = True

LOG.debug("Instatiating Publisher class")
publisher = Publisher(self.args.host, self.args.database,
self.incoming_data, port=self.args.port)
self.log.debug("Instatiating Publisher class")
publisher = Publisher(self.args.host,
self.args.database,
self.incoming_data,
port=self.args.port,
protocol=self.args.protocol,
gzip=self.args.gzip,
)

session.set_status('running')
while self.aggregate:
time.sleep(self.loop_time)
self.log.debug(f"Approx. queue size: {self.incoming_data.qsize()}")
publisher.run()

publisher.close()
@@ -261,7 +112,7 @@ def make_parser(parser=None):
pgroup = parser.add_argument_group('Agent Options')
pgroup.add_argument('--initial-state',
default='record', choices=['idle', 'record'],
help="Initial state of argument parser. Can be either"
help="Initial state of argument parser. Can be either "
"idle or record")
pgroup.add_argument('--host',
default='influxdb',
@@ -272,6 +123,15 @@
pgroup.add_argument('--database',
default='ocs_feeds',
help="Database within InfluxDB to publish data to.")
+    pgroup.add_argument('--protocol',
+                        default='line',
+                        choices=['json', 'line'],
+                        help="Protocol for writing data. Either 'line' or "
+                             "'json'.")
+    pgroup.add_argument('--gzip',
+                        type=bool,
+                        default=False,
+                        help="Use gzip content encoding to compress requests.")

return parser

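With the Publisher implementation relocated to ocs.agent.influxdb_publisher, this file reduces to the WAMP wrapper around it. A minimal sketch of driving the relocated class directly, assuming only the constructor signature visible in this diff and the parser defaults above (host 'influxdb', database 'ocs_feeds'):

    import queue

    from ocs.agent.influxdb_publisher import Publisher

    incoming_data = queue.Queue()  # thread-safe queue of (data, feed) pairs

    publisher = Publisher('influxdb', 'ocs_feeds', incoming_data,
                          port=8086, protocol='line', gzip=False)

    # Each run() call drains the queue and writes the resulting points to
    # InfluxDB; start_aggregate() above does this once per loop_time.
    publisher.run()

    # close() is documented to flush remaining data and close the connection.
    publisher.close()

Note also that the old timestamp2influxtime() used datetime.datetime.fromtimestamp(), which renders the host's local time, so the timestamp strings it produced were timezone-dependent.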
54 changes: 47 additions & 7 deletions agents/registry/registry.py
@@ -1,9 +1,12 @@
from ocs import ocs_agent, site_config
+from ocs.base import OpCode
import time
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.util import sleep as dsleep
from collections import defaultdict
from ocs.ocs_feed import Feed

+from ocs.agent.aggregator import Provider

class RegisteredAgent:
"""
@@ -17,26 +20,37 @@ class RegisteredAgent:
is not expired.
last_updated (float):
ctime at which the agent was last updated
+        op_codes (dict):
+            Dictionary of operation codes for each of the agent's
+            operations. For details on what the operation codes mean, see
+            docs from the ``ocs_agent`` module
"""
def __init__(self):
self.expired = False
self.time_expired = None
self.last_updated = time.time()
+        self.op_codes = {}

-    def refresh(self):
+    def refresh(self, op_codes=None):
self.expired = False
self.time_expired = None
self.last_updated = time.time()

+        if op_codes:
+            self.op_codes.update(op_codes)

def expire(self):
self.expired = True
self.time_expired = time.time()
+        for k in self.op_codes:
+            self.op_codes[k] = OpCode.EXPIRED.value

def encoded(self):
return {
'expired': self.expired,
'time_expired': self.time_expired,
-            'last_updated': self.last_updated
+            'last_updated': self.last_updated,
+            'op_codes': self.op_codes,
}
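        # For illustration (hypothetical values), encoded() for a live agent
        # might return:
        # {'expired': False, 'time_expired': None,
        #  'last_updated': 1620000000.0,
        #  'op_codes': {'main': OpCode.RUNNING.value}}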


@@ -76,13 +90,20 @@ def __init__(self, agent):
options={'match': 'wildcard'}
)

+        agg_params = {
+            'frame_length': 60,
+            'fresh_time': 10,
+        }
+        self.agent.register_feed('agent_operations', record=True,
+                                 agg_params=agg_params, buffer_time=0)
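        # On agg_params: 'frame_length' sets roughly how many seconds of data
        # go into each aggregated frame, and 'fresh_time' how quickly a silent
        # feed is considered stale; see the ocs.agent.aggregator docs for the
        # precise semantics.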

def _register_heartbeat(self, _data):
"""
"""
Function that is called whenever a heartbeat is received from an agent.
It will update that agent in the Registry's registered_agent dict.
"""
-        data, feed = _data
-        self.registered_agents[feed['agent_address']].refresh()
+        op_codes, feed = _data
+        self.registered_agents[feed['agent_address']].refresh(op_codes=op_codes)

@inlineCallbacks
def main(self, session: ocs_agent.OpSession, params=None):
@@ -94,7 +115,7 @@ def main(self, session: ocs_agent.OpSession, params=None):
The session.data object for this process will be a dictionary containing
the encoded RegisteredAgent object for each agent observed during the
lifetime of the registry. For instance, this might look like
>>> session.data
{'observatory.aggregator':
{'expired': False,
@@ -119,11 +140,30 @@

for k, agent in self.registered_agents.items():
if time.time() - agent.last_updated > self.agent_timeout:
                agent.expire()

session.data = {
k: agent.encoded() for k, agent in self.registered_agents.items()
}

+            for addr, agent in self.registered_agents.items():
+                msg = { 'block_name': addr,
+                        'timestamp': time.time(),
+                        'data': {}}
+                for op_name, op_code in agent.op_codes.items():
+                    field = f'{addr}_{op_name}'
+                    field = field.replace('.', '_')
+                    field = field.replace('-', '_')
+                    field = Provider._enforce_field_name_rules(field)
+                    try:
+                        Feed.verify_data_field_string(field)
+                    except ValueError as e:
+                        self.log.warn(f"Improper field name: {field}\n{e}")
+                        continue
+                    msg['data'][field] = op_code
+                if msg['data']:
+                    self.agent.publish_to_feed('agent_operations', msg)
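            # For illustration: op 'main' on an agent at address
            # 'observatory.registry' yields the field
            # 'observatory_registry_main', since '.' and '-' would fail the
            # feed's field-name rules.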

return True, "Stopped registry main process"

def stop(self, session, params=None):
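Taken together, the registry now tracks an integer operation code for every operation of every agent it hears from, republishes them on the 'agent_operations' feed, and exposes them in session.data. A minimal sketch of a consumer, assuming only the OpCode enum from ocs.base and the session.data layout shown in the docstring above:

    from ocs.base import OpCode

    def report_op_codes(registry_data):
        # registry_data: {agent_address: RegisteredAgent.encoded(), ...},
        # i.e. the registry main process's session.data.
        for addr, info in registry_data.items():
            state = 'expired' if info['expired'] else 'alive'
            print(f"{addr} ({state}):")
            for op_name, code in info['op_codes'].items():
                # OpCode(code).name recovers a readable name (e.g. EXPIRED,
                # as assigned in expire() above) from the stored integer.
                print(f"    {op_name}: {OpCode(code).name}")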
