Skip to content

Commit

Permalink
Merge pull request #160 from simonsobs/develop
Browse files Browse the repository at this point in the history
Prepare release v0.7.0
  • Loading branch information
BrianJKoopman committed Aug 4, 2020
2 parents d7d89b3 + 336da8f commit c8c7979
Show file tree
Hide file tree
Showing 15 changed files with 696 additions and 67 deletions.
19 changes: 18 additions & 1 deletion .github/workflows/develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ jobs:
# Fetch all history for all tags and branches
with:
fetch-depth: 0
- name: Set up Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.8

# Test
# Test (steps from pytest workflow)
- name: Build docker images
run: |
docker-compose build
Expand All @@ -23,6 +27,19 @@ jobs:
run: |
docker run -v $PWD:/coverage --rm ocs sh -c "COVERAGE_FILE=/coverage/.coverage.docker python3 -m pytest -p no:wampy --cov /app/ocs/ocs/ ./tests/"
- name: Report test coverage
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
pip install coveralls
coverage combine
coverage report
coveralls
- name: Test documentation build
run: |
docker run --rm ocs sh -c "make -C docs/ html"
# Dockerize
- name: Build and push development docker image
env:
Expand Down
11 changes: 10 additions & 1 deletion .github/workflows/official-docker-images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,25 @@ jobs:
# Fetch all history for all tags and branches
with:
fetch-depth: 0
- name: Set up Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.8

# Test
# Build
- name: Build docker images
run: |
docker-compose build
# Test (already been run by pytest workflow, but they don't take long...)
- name: Test with pytest wtihin a docker container
run: |
docker run -v $PWD:/coverage --rm ocs sh -c "COVERAGE_FILE=/coverage/.coverage.docker python3 -m pytest -p no:wampy --cov /app/ocs/ocs/ ./tests/"
- name: Test documentation build
run: |
docker run --rm ocs sh -c "make -C docs/ html"
# Dockerize
- name: Build and push official docker image
env:
Expand Down
49 changes: 49 additions & 0 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: Run Tests

on:
push:
branches-ignore: [ develop ]
pull_request:

jobs:
build:
runs-on: ubuntu-18.04

steps:
- uses: actions/checkout@v2
- name: Set up Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.8

- name: Build docker images
run: |
docker-compose build
# I like this idea, but let's hold off for now.
#- name: Lint with flake8
# run: |
# # stop the build if there are Python syntax errors or undefined names
# flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
# flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics

- name: Test with pytest wtihin a docker container
run: |
docker run -v $PWD:/coverage --rm ocs sh -c "COVERAGE_FILE=/coverage/.coverage.docker python3 -m pytest -p no:wampy --cov /app/ocs/ocs/ ./tests/"
- name: Report test coverage
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
pip install coveralls
coverage combine
coverage report
coveralls
- name: Test documentation build
run: |
docker run --rm ocs sh -c "make -C docs/ html"
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# local sqlite db's
*.db

# iPython Notebooks
*.ipynb

Expand Down
35 changes: 0 additions & 35 deletions .travis.yml

This file was deleted.

2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# A container setup with an installation of ocs.

# Use ubuntu base image
FROM simonsobs/so3g:v0.0.8
FROM simonsobs/so3g:v0.1.0

# Set locale
ENV LANG C.UTF-8
Expand Down
5 changes: 3 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
OCS - Observatory Control System
================================

.. image:: https://travis-ci.com/simonsobs/ocs.svg?branch=master
:target: https://travis-ci.com/simonsobs/ocs
.. image:: https://img.shields.io/github/workflow/status/simonsobs/ocs/Build%20Develop%20Images
:target: https://github.com/simonsobs/ocs/actions?query=workflow%3A%22Build+Develop+Images%22
:alt: GitHub Workflow Status

.. image:: https://readthedocs.org/projects/ocs/badge/?version=latest
:target: https://ocs.readthedocs.io/en/latest/?badge=latest
Expand Down
55 changes: 36 additions & 19 deletions agents/influxdb_publisher/influxdb_publisher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import time
import datetime
import os
import queue
import argparse
import txaio
Expand All @@ -16,6 +15,7 @@
txaio.use_twisted()
LOG = txaio.make_logger()


def timestamp2influxtime(time):
"""Convert timestamp for influx
Expand All @@ -27,6 +27,7 @@ def timestamp2influxtime(time):
t_dt = datetime.datetime.fromtimestamp(time)
return t_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")


class Publisher:
"""
Data publisher. This manages data to be published to the InfluxDB.
Expand All @@ -39,6 +40,8 @@ class Publisher:
A thread-safe queue of (data, feed) pairs.
host (str):
host for InfluxDB instance.
database (str):
database name within InfluxDB to publish to
port (int, optional):
port for InfluxDB instance, defaults to 8086.
Expand All @@ -47,27 +50,38 @@ class Publisher:
host for InfluxDB instance.
port (int, optional):
port for InfluxDB instance, defaults to 8086.
db (str):
database name within InfluxDB to publish to (from database arg)
incoming_data:
data to be published
client:
InfluxDB client connection
"""
def __init__(self, host, incoming_data, port=8086):
def __init__(self, host, database, incoming_data, port=8086):
self.host = host
self.port = port
self.db = database
self.incoming_data = incoming_data

self.client = InfluxDBClient(host=self.host, port=self.port)

db_list = self.client.get_list_database()
db_list = None
# ConnectionError here is indicative of InfluxDB being down
while db_list is None:
try:
db_list = self.client.get_list_database()
except RequestsConnectionError:
LOG.error("Connection error, attempting to reconnect to DB.")
self.client = InfluxDBClient(host=self.host, port=self.port)
time.sleep(1)
db_names = [x['name'] for x in db_list]
if 'ocs_feeds' not in db_names:
print("ocs_feeds DB doesn't exist, creating DB")
self.client.create_database('ocs_feeds')
self.client.switch_database('ocs_feeds')

if self.db not in db_names:
print(f"{self.db} DB doesn't exist, creating DB")
self.client.create_database(self.db)

self.client.switch_database(self.db)

def process_incoming_data(self):
"""
Expand All @@ -78,8 +92,6 @@ def process_incoming_data(self):
data, feed = self.incoming_data.get()

LOG.debug("Pulling data from queue.")
#LOG.debug("data: {d}", d=data)
#LOG.debug("feed: {f}", f=feed)

# Formatted for writing to InfluxDB
payload = self.format_data(data, feed)
Expand All @@ -89,7 +101,7 @@ def process_incoming_data(self):
except RequestsConnectionError:
LOG.error("InfluxDB unavailable, attempting to reconnect.")
self.client = InfluxDBClient(host=self.host, port=self.port)
self.client.switch_database('ocs_feeds')
self.client.switch_database(self.db)
except InfluxDBClientError as err:
LOG.error("InfluxDB Client Error: {e}", e=err)

Expand Down Expand Up @@ -128,13 +140,13 @@ def format_data(self, data, feed):
grouped_dict[data_key] = data_value[i]
grouped_data_points.append(grouped_dict)

for fields, time in zip(grouped_data_points, times):
for fields, time_ in zip(grouped_data_points, times):
json_body.append(
{
"measurement": measurement,
"time": timestamp2influxtime(time),
"time": timestamp2influxtime(time_),
"fields": fields,
"tags" : {
"tags": {
"feed": feed_tag
}
}
Expand All @@ -160,7 +172,8 @@ class InfluxDBAgent:
"""
This class provide a WAMP wrapper for the data publisher. The run function
and the data handler **are** thread-safe, as long as multiple run functions
are not started at the same time, which should be prevented through OCSAgent.
are not started at the same time, which should be prevented through
OCSAgent.
Args:
agent (OCSAgent):
Expand All @@ -174,8 +187,8 @@ class InfluxDBAgent:
aggregate (bool):
Specifies if the agent is currently aggregating data.
incoming_data (queue.Queue):
Thread-safe queue where incoming (data, feed) pairs are stored before
being passed to the Publisher.
Thread-safe queue where incoming (data, feed) pairs are stored
before being passed to the Publisher.
loop_time (float):
Time between iterations of the run loop.
"""
Expand Down Expand Up @@ -223,7 +236,8 @@ def start_aggregate(self, session: ocs_agent.OpSession, params=None):
self.aggregate = True

LOG.debug("Instatiating Publisher class")
publisher = Publisher(self.args.host, self.incoming_data, port=self.args.port)
publisher = Publisher(self.args.host, self.args.database,
self.incoming_data, port=self.args.port)

session.set_status('running')
while self.aggregate:
Expand Down Expand Up @@ -255,6 +269,9 @@ def make_parser(parser=None):
pgroup.add_argument('--port',
default=8086,
help="InfluxDB port.")
pgroup.add_argument('--database',
default='ocs_feeds',
help="Database within InfluxDB to publish data to.")

return parser

Expand Down

0 comments on commit c8c7979

Please sign in to comment.