Skip to content

Commit

Permalink
Merge pull request #191 from simonsobs/influxdb-server-error-handling
Browse files Browse the repository at this point in the history
InfluxDB ServerError handling, line protocol, and gzip support
  • Loading branch information
BrianJKoopman committed Feb 15, 2021
2 parents 1162576 + 3098ba7 commit c91718a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 23 deletions.
97 changes: 74 additions & 23 deletions agents/influxdb_publisher/influxdb_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from os import environ
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
from requests.exceptions import ConnectionError as RequestsConnectionError

from ocs import ocs_agent, site_config
Expand All @@ -16,16 +16,22 @@
LOG = txaio.make_logger()


def timestamp2influxtime(time):
def timestamp2influxtime(time, protocol):
"""Convert timestamp for influx
Args:
time:
ctime timestamp
protocol:
'json' or line'
"""
t_dt = datetime.datetime.fromtimestamp(time)
return t_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")
if protocol == 'json':
t_dt = datetime.datetime.fromtimestamp(time)
influx_t = t_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")
elif protocol == 'line':
influx_t = int(time*1e9) # ns
return influx_t


class Publisher:
Expand All @@ -44,6 +50,10 @@ class Publisher:
database name within InfluxDB to publish to
port (int, optional):
port for InfluxDB instance, defaults to 8086.
protocol (str, optional):
Protocol for writing data. Either 'line' or 'json'.
gzip (bool, optional):
compress influxdb requsts with gzip
Attributes:
host (str):
Expand All @@ -58,13 +68,18 @@ class Publisher:
InfluxDB client connection
"""
def __init__(self, host, database, incoming_data, port=8086):
def __init__(self, host, database, incoming_data, port=8086, protocol='line', gzip=False):
self.host = host
self.port = port
self.db = database
self.incoming_data = incoming_data
self.protocol = protocol
self.gzip = gzip

self.client = InfluxDBClient(host=self.host, port=self.port)
print(f"gzip encoding enabled: {gzip}")
print(f"data protocol: {protocol}")

self.client = InfluxDBClient(host=self.host, port=self.port, gzip=gzip)

db_list = None
# ConnectionError here is indicative of InfluxDB being down
Expand All @@ -73,7 +88,7 @@ def __init__(self, host, database, incoming_data, port=8086):
db_list = self.client.get_list_database()
except RequestsConnectionError:
LOG.error("Connection error, attempting to reconnect to DB.")
self.client = InfluxDBClient(host=self.host, port=self.port)
self.client = InfluxDBClient(host=self.host, port=self.port, gzip=gzip)
time.sleep(1)
db_names = [x['name'] for x in db_list]

Expand All @@ -96,18 +111,23 @@ def process_incoming_data(self):
LOG.debug("Pulling data from queue.")

# Formatted for writing to InfluxDB
payload = self.format_data(data, feed)
payload = self.format_data(data, feed, protocol=self.protocol)
try:
self.client.write_points(payload)
self.client.write_points(payload,
batch_size=10000,
protocol=self.protocol,
)
LOG.debug("wrote payload to influx")
except RequestsConnectionError:
LOG.error("InfluxDB unavailable, attempting to reconnect.")
self.client = InfluxDBClient(host=self.host, port=self.port)
self.client = InfluxDBClient(host=self.host, port=self.port, gzip=self.gzip)
self.client.switch_database(self.db)
except InfluxDBClientError as err:
LOG.error("InfluxDB Client Error: {e}", e=err)
except InfluxDBServerError as err:
LOG.error("InfluxDB Server Error: {e}", e=err)

def format_data(self, data, feed):
def format_data(self, data, feed, protocol):
"""Format the data from an OCS feed into a dict for pushing to InfluxDB.
The scheme here is as follows:
Expand All @@ -124,6 +144,8 @@ def format_data(self, data, feed):
feed (dict):
feed from the OCS Feed subscription, contains feed information
used to structure our influxdb query
protocol (str):
Protocol for writing data. Either 'line' or 'json'.
"""
measurement = feed['agent_address']
Expand All @@ -143,16 +165,31 @@ def format_data(self, data, feed):
grouped_data_points.append(grouped_dict)

for fields, time_ in zip(grouped_data_points, times):
json_body.append(
{
"measurement": measurement,
"time": timestamp2influxtime(time_),
"fields": fields,
"tags": {
"feed": feed_tag
if protocol == 'line':
fields_line = []
for mk, mv in fields.items():
f_line = f"{mk}={mv}"
if isinstance(mv, int):
f_line += "i"
fields_line.append(f_line)

measurement_line = ','.join(fields_line)
t_line = timestamp2influxtime(time_, protocol='line')
line = f"{measurement},feed={feed_tag} {measurement_line} {t_line}"
json_body.append(line)
elif protocol == 'json':
json_body.append(
{
"measurement": measurement,
"time": timestamp2influxtime(time_, protocol='json'),
"fields": fields,
"tags": {
"feed": feed_tag
}
}
}
)
)
else:
self.log.warn(f"Protocol '{protocol}' not supported.")

LOG.debug("payload: {p}", p=json_body)

Expand Down Expand Up @@ -237,13 +274,19 @@ def start_aggregate(self, session: ocs_agent.OpSession, params=None):
session.set_status('starting')
self.aggregate = True

LOG.debug("Instatiating Publisher class")
publisher = Publisher(self.args.host, self.args.database,
self.incoming_data, port=self.args.port)
self.log.debug("Instatiating Publisher class")
publisher = Publisher(self.args.host,
self.args.database,
self.incoming_data,
port=self.args.port,
protocol=self.args.protocol,
gzip=self.args.gzip,
)

session.set_status('running')
while self.aggregate:
time.sleep(self.loop_time)
self.log.debug(f"Approx. queue size: {self.incoming_data.qsize()}")
publisher.run()

publisher.close()
Expand Down Expand Up @@ -274,6 +317,14 @@ def make_parser(parser=None):
pgroup.add_argument('--database',
default='ocs_feeds',
help="Database within InfluxDB to publish data to.")
pgroup.add_argument('--protocol',
default='line',
choices=['json', 'line'],
help="Protocol for writing data. Either 'line' or 'json'.")
pgroup.add_argument('--gzip',
type=bool,
default=False,
help="Use gzip content encoding to compress requests.")

return parser

Expand Down
10 changes: 10 additions & 0 deletions docs/agents/influxdb_publisher.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Add an InfluxDBAgent to your OCS configuration file::
'arguments': [['--initial-state', 'record'],
['--host', 'influxdb'],
['--port', 8086],
['--protocol', 'line'],
['--gzip', True],
['--database', 'ocs_feeds']]},

docker-compose Configuration
Expand Down Expand Up @@ -96,3 +98,11 @@ the SELECT query.
For more information about using InfluxDB in Grafana, see the `Grafana Documentation`_.

.. _`Grafana Documentation`: https://grafana.com/docs/features/datasources/influxdb/

API
---

Publisher
`````````
.. autoclass:: agents.influxdb_publisher.influxdb_publisher.Publisher
:members:

0 comments on commit c91718a

Please sign in to comment.