Skip to content

Commit

Permalink
Merge pull request #64 from jsiembida/influxdb-support
Browse files Browse the repository at this point in the history
Influxdb support
  • Loading branch information
trbs authored Jun 25, 2017
2 parents 38c279d + d39eb22 commit a779aeb
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 5 deletions.
22 changes: 22 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ output::
--disable-statsd Disable the StatsD server
--graphite-ip=IP IP address of the Graphite/Carbon server [127.0.0.1]
--graphite-port=INT Port of the Graphite/Carbon server [2003]
--enable-influxdb Enable InfluxDB client
--full-trace Display full error if config file fails to load
--log-level=NAME Logging output verbosity [INFO]
--version show program's version number and exit
Expand Down Expand Up @@ -249,6 +250,11 @@ config file::
graphite_backoff_factor = 1.5
graphite_backoff_max = 60

influxdb_enabled = False
influxdb_hosts = [
"127.0.0.1:8089"
]

# Configuration for sending metrics to Graphite via the pickle
# interface. Be sure to edit graphite_port to match the settings
# on your Graphite cache/relay.
Expand Down Expand Up @@ -318,6 +324,22 @@ Configuring MetricsD
TODO


Configuring InfluxDB
--------------------

Make sure that your InfluxDB server(s) have a UDP listener enabled,
like so::

[[udp]]
enabled = true
bind-address = ":8089"
database = "mydatabase"

Bucky will periodically resolve all hostnames in the `influxdb_hosts`
list and fan out metrics to all resolved endpoints. Thus providing
replication as well as hot swapping.


A note on CollectD converters
-----------------------------

Expand Down
6 changes: 6 additions & 0 deletions bucky/cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
statsd_timer_median = True
statsd_timer_std = True

graphite_enabled = True
graphite_ip = "127.0.0.1"
graphite_port = 2003
graphite_max_reconnects = 60
Expand All @@ -75,6 +76,11 @@
graphite_pickle_enabled = False
graphite_pickle_buffer_size = 500

influxdb_enabled = False
influxdb_hosts = [
"127.0.0.1:8089"
]

full_trace = False

name_prefix = None
Expand Down
102 changes: 102 additions & 0 deletions bucky/influxdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
# Copyright 2011 Cloudant, Inc.

import six
import time
import socket
import logging

import bucky.client as client


if six.PY3:
xrange = range
long = int


log = logging.getLogger(__name__)


class InfluxDBClient(client.Client):
def __init__(self, cfg, pipe):
super(InfluxDBClient, self).__init__(pipe)
self.hosts = cfg.influxdb_hosts
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.flush_timestamp = time.time()
self.resolved_hosts = None
self.resolve_timestamp = 0
self.buffer = []

def parse_address(self, address, default_port=8089):
bits = address.split(":")
if len(bits) == 1:
host, port = address, default_port
elif len(bits) == 2:
host, port = bits[0], int(bits[1])
else:
raise ValueError("Address %s is invalid" % (address,))
hostname, aliaslist, ipaddrlist = socket.gethostbyname_ex(host)
for ip in ipaddrlist:
yield ip, port

def resolve_hosts(self):
now = time.time()
if self.resolved_hosts is None or (now - self.resolve_timestamp) > 180:
resolved_hosts = []
for host in self.hosts:
for ip, port in self.parse_address(host):
log.info("Resolved InfluxDB endpoint: %s:%d", ip, port)
resolved_hosts.append((ip, port))
self.resolved_hosts = resolved_hosts
self.resolve_timestamp = now

def close(self):
try:
self.sock.close()
except:
pass

def kv(self, k, v):
return str(k) + '=' + str(v)

def flush(self):
now = time.time()
if len(self.buffer) > 30 or (now - self.flush_timestamp) > 3:
payload = '\n'.join(self.buffer).encode()
self.resolve_hosts()
for ip, port in self.resolved_hosts:
self.sock.sendto(payload, (ip, port))
self.buffer = []
self.flush_timestamp = now

def send(self, host, name, value, mtime, metadata=None):
buf = [name]
if host:
if metadata is None:
metadata = {'host': host}
else:
if 'host' not in metadata:
metadata['host'] = host
if metadata:
for k in metadata.keys():
v = metadata[k]
# InfluxDB will drop insert with tags without values
if v is not None:
buf.append(self.kv(k, v))
# https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_tutorial/
line = ' '.join((','.join(buf), self.kv('value', value), str(long(mtime) * 1000000000)))
self.buffer.append(line)
self.flush()
26 changes: 21 additions & 5 deletions bucky/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import bucky.collectd as collectd
import bucky.metricsd as metricsd
import bucky.statsd as statsd
import bucky.influxdb as influxdb
import bucky.processor as processor
from bucky.errors import BuckyError

Expand Down Expand Up @@ -121,6 +122,16 @@ def options():
type="int", default=cfg.graphite_port,
help="Port of the Graphite/Carbon server [%default]"
),
op.make_option(
"--disable-graphite", dest="graphite_enabled",
default=cfg.graphite_enabled, action="store_false",
help="Disable the Graphite/Carbon client"
),
op.make_option(
"--enable-influxdb", dest="influxdb_enabled",
default=cfg.influxdb_enabled, action="store_true",
help="Enable the InfluxDB line protocol client"
),
op.make_option(
"--full-trace", dest="full_trace",
default=cfg.full_trace, action="store_true",
Expand Down Expand Up @@ -264,13 +275,18 @@ def __init__(self, cfg):
self.proc = None
self.psampleq = self.sampleq

if cfg.graphite_pickle_enabled:
carbon_client = carbon.PickleClient
else:
carbon_client = carbon.PlaintextClient
default_clients = []
if cfg.graphite_enabled:
if cfg.graphite_pickle_enabled:
carbon_client = carbon.PickleClient
else:
carbon_client = carbon.PlaintextClient
default_clients.append(carbon_client)
if cfg.influxdb_enabled:
default_clients.append(influxdb.InfluxDBClient)

self.clients = []
for client in cfg.custom_clients + [carbon_client]:
for client in cfg.custom_clients + default_clients:
send, recv = multiprocessing.Pipe()
instance = client(cfg, recv)
self.clients.append((instance, send))
Expand Down

0 comments on commit a779aeb

Please sign in to comment.