Skip to content

Commit

Permalink
restart influx client after drops, minimize logging
Browse files Browse the repository at this point in the history
  • Loading branch information
ryan-williams committed Nov 21, 2018
1 parent fcfc572 commit 65ede40
Showing 1 changed file with 53 additions and 31 deletions.
84 changes: 53 additions & 31 deletions read.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,9 @@

from urllib.parse import urlparse

device = args.device
dry_run = args.dry_run

if not dry_run:
server = urlparse('tcp://%s' % args.server)
client = InfluxDBClient(
host = server.hostname,
port = server.port,
username = server.username,
password = server.password,
database = args.db
)

i2c = busio.I2C(board.SCL, board.SDA)

sensor = None
Expand All @@ -47,20 +38,14 @@
from adafruit_htu21d import HTU21D
sensor = HTU21D(i2c)

interval = args.interval
device = args.device

points = []

from queue import Empty, Queue
from threading import Thread

q = Queue()
points_sizes = Queue()
log_msgs = Queue()

running = True


def drain(q):
elems = []
while True:
Expand All @@ -77,6 +62,9 @@ def now_str(micros = False):


def sensor_reader():
interval = args.interval
since_error = 0
points = []
n = 0
while (not args.n or n < args.n):
now = now_str(micros = True)
Expand All @@ -94,9 +82,13 @@ def sensor_reader():
}
}

print("%d: time: %s, temp: %0.1f C, humidity: %0.1f" % (n, now, temp, humidity))
q.put(point)

if since_error < 60 or since_error % 10 == 0:
print("%d: %s %0.1f C %0.1f%%" % (n, now, temp, humidity))

since_error += 1

except OSError as e:
if e.errno == 121:
stderr.write('Failed to read from sensor! %s\n' % e)
Expand All @@ -109,48 +101,78 @@ def sensor_reader():


def influx_writer():
from influxdb.exceptions import InfluxDBServerError
from requests.exceptions import ConnectionError

interval = args.report_interval
backoff = 1.2
max_interval = 300
failing_since = None
server = urlparse('tcp://%s' % args.server)
def make_client():
return InfluxDBClient(
host = server.hostname,
port = server.port,
username = server.username,
password = server.password,
database = args.db
)
client = make_client()

while running:
points = drain(q)
if points:
try:
if client.write_points(points):
points_sizes.put(len(points))
log_msgs.put({ 'size': len(points) })
interval = args.report_interval
else:
if not failing_since:
failing_since = points[0].time
failing_since = points[0]['time']

stderr.write('%d failed points (influx library failure; since %s)' % (len(points), failing_since))
except Exception as e:
[ q.put(point) for point in points ]
if not failing_since:
failing_since = points[0].time
failing_since = points[0]['time']

stderr.write('%d failed points (since %s):\n%s' % (len(points), failing_since, e))

sleep(args.report_interval)
tpe = type(e)
if tpe == InfluxDBServerError or tpe == ConnectionError:
interval = min(max_interval, interval * backoff)
stderr.write('resetting client; new interval: %ds\n' % int(interval))
client.close()
client = make_client()

sleep(int(interval))


def points_size_hist_printer():
hist = {}
while running:
cur = {}
for size in drain(points_sizes):
if not size in cur:
cur[size] = 0
cur[size] += 1
if not size in hist:
hist[size] = 0
hist[size] += 1
msgs = drain(log_msgs)
n = 0
for msg in msgs:
if 'size' in msg:
size = msg['size']
n += size
if not size in cur:
cur[size] = 0
cur[size] += 1
if not size in hist:
hist[size] = 0
hist[size] += 1

def hist_str(hist):
items = list(hist.items())
items.sort(key=lambda t: t[0])
return ' '.join([ '%dx%d' % (k, v) for k, v in items ])

print(
'%s reported points per request: recent %s, all time %s' % (
now_str(),
'%d reported points per request: recent %s, all time %s' % (
n,
hist_str(cur),
hist_str(hist)
)
Expand Down

0 comments on commit 65ede40

Please sign in to comment.