Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[monitoring] Adding backend for influxdb2.x #274 #354

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
volumes:
- influxdb-data:/var/lib/influxdb
ports:
- "8086:8086"
- "8085:8085"
environment:
INFLUXDB_DB: openwisp2
INFLUXDB_USER: openwisp
Expand Down
18 changes: 8 additions & 10 deletions openwisp_monitoring/db/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,8 @@

TIMESERIES_DB = getattr(settings, 'TIMESERIES_DATABASE', None)
if not TIMESERIES_DB:
TIMESERIES_DB = {
'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
'USER': getattr(settings, 'INFLUXDB_USER', 'openwisp'),
'PASSWORD': getattr(settings, 'INFLUXDB_PASSWORD', 'openwisp'),
'NAME': getattr(settings, 'INFLUXDB_DATABASE', 'openwisp2'),
'HOST': getattr(settings, 'INFLUXDB_HOST', 'localhost'),
'PORT': getattr(settings, 'INFLUXDB_PORT', '8086'),
}
logger.warning(
'Timeseries database not set up in settings'
'The previous method to define Timeseries Database has been deprecated. Please refer to the docs:\n'
'https://github.com/openwisp/openwisp-monitoring#setup-integrate-in-an-existing-django-project'
)
Expand All @@ -30,11 +23,16 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
"""
try:
assert 'BACKEND' in TIMESERIES_DB, 'BACKEND'
assert 'USER' in TIMESERIES_DB, 'USER'
assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
assert 'NAME' in TIMESERIES_DB, 'NAME'
assert 'HOST' in TIMESERIES_DB, 'HOST'
assert 'PORT' in TIMESERIES_DB, 'PORT'
backend = TIMESERIES_DB['BACKEND']
if backend == 'openwisp_monitoring.db.backends.influxdb':
assert 'USER' in TIMESERIES_DB, 'USER'
assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
else if backend == 'openwisp_monitoring.db.backends.influxdb2':
assert 'ORG' in TIMESERIES_DB, 'ORG'
assert 'TOKEN' in TIMESERIES_DB, 'TOKEN'
if module:
return import_module(f'{backend_name}.{module}')
else:
Expand Down
8 changes: 8 additions & 0 deletions openwisp_monitoring/db/backends/influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ def write(self, name, values, **kwargs):
return
raise TimeseriesWriteException

def standardizeResult(self, resultObject):
"""
return the query result in the form of a list of dictionaries
to make all the back ends uniform
"""
#TODO
pass

def read(self, key, fields, tags, **kwargs):
extra_fields = kwargs.get('extra_fields')
since = kwargs.get('since')
Expand Down
Empty file.
135 changes: 135 additions & 0 deletions openwisp_monitoring/db/backends/influxdb2/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import logging
import operator
import re
from collections import OrderedDict
from datetime import datetime

from django.conf import settings
from django.core.exceptions import ValidationError
from django.utils.functional import cached_property
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError, BucketRetentionRules
from influxdb_client.client.write_api import SYNCHRONOUS

from openwisp_monitoring.utils import retry

from ...exceptions import TimeseriesWriteException
from .. import TIMESERIES_DB

logger = logging.getLogger(__name__)

class DatabaseClient(object):
backend_name = 'influxdb2'

def __init__(self, db_name=None):
self._db = None
self.client_error = InfluxDBError
self.write_api = None
self.query_api = None

@cached_property
def db(self):
"Returns an InfluxDBClient instance"
return InfluxDBClient{
url = f"http://{TIMESERIES_DB["HOST"]}:{TIMESERIES_DB["PORT"]}",
bucket = self.db_name,
token = TIMESERIES_DB["TOKEN"]
}

@retry
def create_database(self):
"initialize APIs required for writing and querying data from influxdb database"
logger.debug(f'quert_api and write_api for {str(self.db)} initiated')
self.write_api = self.db.write_api(write_options=SYNCHRONOUS)
self.query_api = self.db.query_api()

@retry
def drop_database(self):
"deletes all buckets"
pass

@retry
def query(self, query):
resultObject = self.query_api.query(query)
return resultObject

def standardizeResult(self, resultObject):
"""
return the query result in the form of a list of dictionaries
to make all the back ends uniform
"""
result = list()

for table in tables:
for row in table.records:
listObject = {
"time": row["_time"],
"field": row["_field"],
"value": row["_value"],
"mesurement": row["_measurement"]
}
result.append(listObject)

return result

def write(self, name, values, **kwargs):
"""
values can either be a list or a string
"""
timestamp = kwargs.get('timestamp') or now()
point = Point(name).time(timestamp.isoformat(sep='T', timespec='microseconds'))
tags = kwargs.get('tags')
values = kwargs.get('values')

for tag in tags:
if type(tag) == str:
tag = tag.split('=')
point.tag(tag[0], tag[1])

for value in values:
if type(value) == str:
value = tag.split('=')
point.field(value[0], value[1])

try:
self.write_api.write(bucket=self.db_name, record=point)
except Exception as e:
except Exception as exception:
logger.warning(f'got exception while writing to tsdb: {exception}')
if isinstance(exception, self.client_error):
exception_code = getattr(exception, 'code', None)
exception_message = getattr(exception, 'content')
if (
exception_code == 400
and 'points beyond retention policy dropped' in exception_message
):
return
raise TimeseriesWriteException

@retry
def get_list_retention_policies(self, name=None):
if name is None:
logger.warning(f'no bucket name provided')
return None
bucket = self.db.buckets_api().find_bucket_by_name(name)
if bucket is None:
logger.warning(f'Bucket with name - {name} not found')
return None
return bucket.retention_rules

@retry
def create_or_alter_retention_policy(self, name, duration):
"""alters the retention policy if bucket with the given name exists,
otherwise create a new bucket"""
bucket = self.db.buckets_api().find_bucket_by_name(name)
retention_rules = BucketRetentionRules(type="expire", every_seconds=duration)
if not bucket is None:
bucket.retention_rules = retention_rules
self.db.buckets_api().update_bucket(bucket=bucket)
logger.warning(f'Retention policies for bucket '{name}' have been updated')
else:
bucket = buckets_api.create_bucket(bucket_name=name, retention_rules=retention_rules, org=TIMESERIES_DB["ORG"])
logger.warning(f'Bucket '{name}' with specified retention polcies has been created')