Skip to content

Commit

Permalink
Ported to influxdb client v2 interface
Browse files Browse the repository at this point in the history
- Changed pointer to interface to interface in InfluxDBClient struct
- Now uses v2 HTTP client
  • Loading branch information
Christian Renz committed Oct 19, 2016
1 parent fc03d49 commit 5dd177f
Showing 1 changed file with 30 additions and 29 deletions.
59 changes: 30 additions & 29 deletions influxdb.go
Expand Up @@ -7,12 +7,13 @@ import (
"time"

log "github.com/Sirupsen/logrus"
influxdb "github.com/influxdata/influxdb/client"
influxdb "github.com/influxdata/influxdb/client/v2"
"github.com/oleiade/lane"
)

const (
DefaultTick = 1
PingTimeout = 500 * time.Millisecond
)

type InfluxDBConf struct {
Expand All @@ -27,7 +28,7 @@ type InfluxDBConf struct {
}

type InfluxDBClient struct {
Client *influxdb.Client
Client influxdb.Client
Config InfluxDBConf

Status string
Expand All @@ -43,22 +44,21 @@ func NewInfluxDBClient(conf InfluxDBConf, ifChan chan Message, commandChan chan
host := fmt.Sprintf("http://%s:%d", conf.Hostname, conf.Port)
log.Infof("influxdb host: %s", host)

u, err := url.Parse(host)
_, err := url.Parse(host)
if err != nil {
return nil, err
}
ifConf := influxdb.Config{
URL: *u,
// Make client
con, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: host,
Username: conf.UserName,
Password: conf.Password,
// IsUDP: conf.UDP,
}
con, err := influxdb.NewClient(ifConf)
})
if err != nil {
return nil, err
}
// Check connectivity
_, _, err = con.Ping()
_, _, err = con.Ping(PingTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -108,16 +108,11 @@ func (ifc *InfluxDBClient) Send() error {
}
buf[i] = m
}
bp := Msg2Series(buf)
bp.Database = ifc.Config.Db
bp := ifc.Msg2Series(buf)

var res *influxdb.Response
if res, err = ifc.Client.Write(bp); err != nil {
if err = ifc.Client.Write(bp); err != nil {
return err
}
if res != nil && res.Err != nil {
return res.Err
}
return nil
}

Expand Down Expand Up @@ -157,10 +152,21 @@ func (ifc *InfluxDBClient) Start() error {
return nil
}

func Msg2Series(msgs []Message) influxdb.BatchPoints {
pts := make([]influxdb.Point, 0, len(msgs))
func (ifc *InfluxDBClient) Msg2Series(msgs []Message) influxdb.BatchPoints {
now := time.Now()

// Create a new point batch
bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
Database: ifc.Config.Db,
Precision: "s",
})

if err != nil {
log.Warn(err)
return nil
}


for _, msg := range msgs {
if msg.Topic == "" && len(msg.Payload) == 0 {
break
Expand All @@ -170,22 +176,17 @@ func Msg2Series(msgs []Message) influxdb.BatchPoints {
log.Warn(err)
continue
}

name := strings.Replace(msg.Topic, "/", ".", -1)
tags := map[string]string{
"topic": msg.Topic,
}
pt := influxdb.Point{
Measurement: name,
Tags: tags,
Fields: j,
Time: now,
Precision: "s", // TODO
pt, err := influxdb.NewPoint(name, tags, j, now);
if err != nil {
log.Warn(err)
continue
}
pts = append(pts, pt)
}
bp := influxdb.BatchPoints{
RetentionPolicy: "default",
Points: pts,
bp.AddPoint(pt)
}

return bp
Expand Down

0 comments on commit 5dd177f

Please sign in to comment.