forked from open-lambda/open-lambda
/
influxManager.go
75 lines (63 loc) · 1.63 KB
/
influxManager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main
import (
"fmt"
"log"
"time"
influx "github.com/influxdata/influxdb/client/v2"
)
const (
// TODO: Do any of these need to be configurable?
precision = "ms"
database = "mydata"
flushThresh = 100
)
type InfluxManager struct {
client influx.Client
bp influx.BatchPoints
pointCount int
}
func NewInfluxManager(host string, port string) (mgr *InfluxManager) {
mgr = new(InfluxManager)
client, err := influx.NewHTTPClient(influx.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%s", host, port),
})
if err != nil {
log.Fatalf("failed to create new http client with err: %v\n", err)
}
q := influx.NewQuery(fmt.Sprintf("CREATE DATABASE mydata"), "", "")
// q := influx.NewQuery(fmt.Sprintf("CREATE DATABASE %s", database), "", "")
if _, err := client.Query(q); err != nil {
log.Fatalf("failed to create db: %v\n", err)
}
mgr.client = client
mgr.bp = newBP()
mgr.pointCount = 0
return mgr
}
func (mgr *InfluxManager) AddPointNow(name string, fields map[string]interface{}) {
if mgr.pointCount > flushThresh {
err := mgr.client.Write(mgr.bp)
if err != nil {
log.Fatalf("failed to write points with err %v\n", err)
}
log.Printf("good flush\n")
mgr.bp = newBP()
mgr.pointCount = 0
}
pt, err := influx.NewPoint(name, map[string]string{}, fields, time.Now())
if err != nil {
log.Fatalf("failed to create new point with err %v\n", err)
}
mgr.bp.AddPoint(pt)
mgr.pointCount++
}
func newBP() influx.BatchPoints {
bp, err := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: database,
Precision: precision,
})
if err != nil {
log.Fatalf("failed to make new bp err: %v\n")
}
return bp
}