Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Derive InfluxDB database and measurement from dev_id and signal data from all gateways #2

Merged
merged 20 commits into from Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -4,3 +4,4 @@
.venv*
/dist
/var
/build
20 changes: 14 additions & 6 deletions README.rst
Expand Up @@ -14,16 +14,25 @@ TTN (`The Things Network`_) is building a global open LoRaWAN™ network.
********
Synopsis
********
The ``ttnlogger`` program can be invoked in two ways. Either it obtains four
positional arguments on the command line or it obtains the four parameters
from respective environment variables.
The ``ttnlogger`` program can be invoked in two ways. Either it obtains the TTN
credentials as well as InfluxDB database and measurement as four command-line options
or it obtains the parameters from respective environment variables.

With the ``-s`` option the InfluxDB agent database as well as the measurement are being
derived from the TTN ``dev_id`` while ``-d`` and ``-m`` are being ignored.
In case of using it within the Hiveeyes.org environment please follow this scheme for the ``dev_id``:

``hiveeyes-USER-LOCATION-NAMEOFHIVE``

and replace upper case strings with your individual lower case namings without additional dashes)

The ``-n`` option disables saving geolocations of receiving gateways into the database.

Command line arguments
----------------------
::

ttnlogger "<ttn_app_id>" "<ttn_access_key>" "<influxdb_database>" "<influxdb_measurement>"
ttnlogger -i "<ttn_app_id>" -k "<ttn_access_key>" [-s] [-d "<influxdb_database>"] [-m "<influxdb_measurement>"]


Environment variables
Expand All @@ -34,7 +43,6 @@ Environment variables
export TTN_ACCESS_KEY="ttn-account-v2.UcOZ3_gRRVbzsJ1lR7WfuINLN_DKIlc9oKvgukHPGck"
export INFLUXDB_DATABASE="test"
export INFLUXDB_MEASUREMENT="data"

ttnlogger


Expand All @@ -49,7 +57,7 @@ Setup
python setup.py develop


Run as systemd unit::
Run as systemd unit ()::

cp etc/default/ttnlogger /etc/default/
ln -sr etc/systemd/ttnlogger.service /usr/lib/systemd/system/
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -20,7 +20,7 @@
}

setup(name='ttnlogger',
version='0.1.0',
version='0.2.0',
description='Converge TTN messages into InfluxDB and MongoDB and display in Grafana',
long_description=README,
license="AGPL 3, EUPL 1.2",
Expand Down
98 changes: 78 additions & 20 deletions ttnlogger/ttn_logger.py
Expand Up @@ -8,24 +8,42 @@
import json
from influxdb import InfluxDBClient
from collections import OrderedDict
import argparse

from ttnlogger.util import convert_floats


class TTNDatenpumpe:

def __init__(self, ttn=None, influxdb=None, mongodb=None):
def __init__(self, ttn=None, influxdb=None, mongodb=None, nogeo=False):
self.ttn = ttn
self.influxdb = influxdb
self.mongodb = mongodb
self.nogeo = nogeo

def on_receive(self, message=None, client=None):
#print('on_receive:', message, client)

if self.influxdb is None:
influxdb_env = message.dev_id.split('-')
influxdb_database = '_'.join(influxdb_env[:2])
influxdb_measurement = '_'.join(influxdb_env[2:])

print('influxdb_database :', influxdb_database)
print('influxdb_measurement :', influxdb_measurement)

self.influxdb = InfluxDatabase(database=influxdb_database, measurement=influxdb_measurement, nogeo=self.nogeo)

try:
self.influxdb.store(message)
except Exception as ex:
print('ERROR:', ex)

try:
del self.influxdb
except Exception as ex:
print('ERROR: Could not delete InfluxDB object', ex)

def enable(self):
self.ttn.receive_callback = self.on_receive
while True:
Expand Down Expand Up @@ -74,13 +92,14 @@ def uplink_callback(self, msg, client):

class InfluxDatabase:

def __init__(self, database='ttnlogger', measurement='data'):
def __init__(self, database='ttnlogger', measurement='data', nogeo=False):

assert database, 'Database name missing or empty'
assert measurement, 'Measurement name missing or empty'

self.database = database
self.measurement = measurement
self.nogeo = nogeo
self.connect()

def connect(self):
Expand All @@ -101,15 +120,35 @@ def store(self, ttn_message):
data[field] = value

# Pick up telemetry values from gateway information.
gw0 = ttn_message.metadata.gateways[0]
data['gw_rssi'] = float(gw0.rssi)
data['gw_snr'] = float(gw0.snr)
data['gw_latitude'] = float(gw0.latitude)
data['gw_longitude'] = float(gw0.longitude)
data['gw_altitude'] = float(gw0.altitude)
num_gtws = len(ttn_message.metadata.gateways)
print('Message received from ' + str(num_gtws) + ' gateway(s)')

for i in range(num_gtws):
gtw_id = ttn_message.metadata.gateways[i].gtw_id
print('Gateway ' + str(i+1) + ' ID : ' + gtw_id)

# signal quality
key_rssi = 'gw_' + gtw_id + '_rssi'
key_snr = 'gw_' + gtw_id + '_snr'
data[key_rssi] = int(ttn_message.metadata.gateways[i].rssi)
data[key_snr] = float(ttn_message.metadata.gateways[i].snr)

# gateway location
if not self.nogeo:
key_lat = 'gw_' + gtw_id + '_lat'
key_lon = 'gw_' + gtw_id + '_lon'
key_alt = 'gw_' + gtw_id + '_alt'
data[key_lat] = float(ttn_message.metadata.gateways[i].latitude)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key_lat might be undefined here.

data[key_lon] = float(ttn_message.metadata.gateways[i].longitude)
data[key_alt] = float(ttn_message.metadata.gateways[i].altitude)

data['sf'] = int(ttn_message.metadata.data_rate.split('BW')[0][2:])
data['bw'] = int(ttn_message.metadata.data_rate.split('BW')[1])

data['gtw_count'] = int(num_gtws)

# Convert all numeric values to floats.
data = convert_floats(data)
data = convert_floats(data, integers=["gtw_count", "sf", "bw"])

# Add application and device id as tags.
tags = OrderedDict()
Expand All @@ -127,19 +166,38 @@ def store(self, ttn_message):


def run():
try:
ttn_app_id = os.getenv('TTN_APP_ID') or sys.argv[1]
ttn_access_key = os.getenv('TTN_ACCESS_KEY') or sys.argv[2]
influxdb_database = os.getenv('INFLUXDB_DATABASE') or sys.argv[3]
influxdb_measurement = os.getenv('INFLUXDB_MEASUREMENT') or sys.argv[4]

except IndexError:
print('ERROR: Missing arguments. Either provide them using '
'environment variables or as positional arguments.')
parser = argparse.ArgumentParser(description='Subscribe to TTN MQTT topic and write data to InfluxDB')
parser.add_argument('-i', '--ttn_app_id', dest='ttn_app_id', action='store', required=True)
parser.add_argument('-k', '--ttn_access_key', dest='ttn_access_key', action='store', required=True)
parser.add_argument('-s', '--split-topic', dest='split', action='store_true', default=False, help='Get InfluxDB database and measurement from MQTT topic split. If not set provide with --database and --measurement')
parser.add_argument('-d', '--database', dest='influxdb_database', action='store', help='InfluxDB database')
parser.add_argument('-m', '--measurement', dest='influxdb_measurement', action='store', help='InfluxDB measurement')
parser.add_argument('-n', '--no-gw-location', dest='nogeo', action='store_true', default=False, help='Discard geolocation of receiving gateways. Default is False')

options = parser.parse_args()

ttn_app_id = os.getenv('TTN_APP_ID') or options.ttn_app_id
ttn_access_key = os.getenv('TTN_ACCESS_KEY') or options.ttn_access_key
influxdb_database = os.getenv('INFLUXDB_DATABASE') or options.influxdb_database
influxdb_measurement = os.getenv('INFLUXDB_MEASUREMENT') or options.influxdb_measurement

# print('TTN_APP_ID : ', ttn_app_id)
# print('TTN_ACCESS_KEY : ', ttn_access_key)
# print('Split topic : ', options.split)
# print('InfluxDB DB : ', influxdb_database)
# print('InfluxDB MEAS : ', influxdb_measurement)
# print('GW LOC : ', options.nogeo)

if options.split is False and ( influxdb_database is None or influxdb_measurement is None ):
parser.error('Missing -s requires --database and --measurement options. See -h for help.')
sys.exit(1)

ttn = TTNClient(ttn_app_id, ttn_access_key)
influxdb = InfluxDatabase(database=influxdb_database, measurement=influxdb_measurement)

datenpumpe = TTNDatenpumpe(ttn, influxdb)
if options.split is True:
datenpumpe = TTNDatenpumpe(ttn, nogeo=options.nogeo)
else:
influxdb = InfluxDatabase(database=influxdb_database, measurement=influxdb_measurement, nogeo=options.nogeo)
datenpumpe = TTNDatenpumpe(ttn, influxdb)

datenpumpe.enable()
2 changes: 1 addition & 1 deletion ttnlogger/util.py
Expand Up @@ -11,7 +11,7 @@ def convert_floats(data, integers=None):
"""
integers = integers or []
delete_keys = []
for key, value in data.iteritems():
for key, value in data.items():
try:
if isinstance(value, datetime.datetime):
continue
Expand Down