diff --git a/influxdb.go b/influxdb.go index 93f8895..3c24c01 100644 --- a/influxdb.go +++ b/influxdb.go @@ -7,12 +7,13 @@ import ( "time" log "github.com/Sirupsen/logrus" - influxdb "github.com/influxdata/influxdb/client" + influxdb "github.com/influxdata/influxdb/client/v2" "github.com/oleiade/lane" ) const ( DefaultTick = 1 + PingTimeout = 500 * time.Millisecond ) type InfluxDBConf struct { @@ -27,7 +28,7 @@ type InfluxDBConf struct { } type InfluxDBClient struct { - Client *influxdb.Client + Client influxdb.Client Config InfluxDBConf Status string @@ -43,22 +44,21 @@ func NewInfluxDBClient(conf InfluxDBConf, ifChan chan Message, commandChan chan host := fmt.Sprintf("http://%s:%d", conf.Hostname, conf.Port) log.Infof("influxdb host: %s", host) - u, err := url.Parse(host) + _, err := url.Parse(host) if err != nil { return nil, err } - ifConf := influxdb.Config{ - URL: *u, + // Make client + con, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{ + Addr: host, Username: conf.UserName, Password: conf.Password, - // IsUDP: conf.UDP, - } - con, err := influxdb.NewClient(ifConf) + }) if err != nil { return nil, err } // Check connectivity - _, _, err = con.Ping() + _, _, err = con.Ping(PingTimeout) if err != nil { return nil, err } @@ -108,16 +108,11 @@ func (ifc *InfluxDBClient) Send() error { } buf[i] = m } - bp := Msg2Series(buf) - bp.Database = ifc.Config.Db + bp := ifc.Msg2Series(buf) - var res *influxdb.Response - if res, err = ifc.Client.Write(bp); err != nil { + if err = ifc.Client.Write(bp); err != nil { return err } - if res != nil && res.Err != nil { - return res.Err - } return nil } @@ -157,10 +152,21 @@ func (ifc *InfluxDBClient) Start() error { return nil } -func Msg2Series(msgs []Message) influxdb.BatchPoints { - pts := make([]influxdb.Point, 0, len(msgs)) +func (ifc *InfluxDBClient) Msg2Series(msgs []Message) influxdb.BatchPoints { now := time.Now() + // Create a new point batch + bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{ + Database: ifc.Config.Db, + Precision: "s", + }) + + if err != nil { + log.Warn(err) + return nil + } + + for _, msg := range msgs { if msg.Topic == "" && len(msg.Payload) == 0 { break @@ -170,22 +176,17 @@ func Msg2Series(msgs []Message) influxdb.BatchPoints { log.Warn(err) continue } + name := strings.Replace(msg.Topic, "/", ".", -1) tags := map[string]string{ "topic": msg.Topic, } - pt := influxdb.Point{ - Measurement: name, - Tags: tags, - Fields: j, - Time: now, - Precision: "s", // TODO + pt, err := influxdb.NewPoint(name, tags, j, now); + if err != nil { + log.Warn(err) + continue } - pts = append(pts, pt) - } - bp := influxdb.BatchPoints{ - RetentionPolicy: "default", - Points: pts, + bp.AddPoint(pt) } return bp