/
utils.go
116 lines (99 loc) · 2.89 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package utils
import (
"context"
"encoding/json"
"hash/fnv"
"strings"
"sync/atomic"
"github.com/jackc/pgx/v4"
"github.com/influxdata/telegraf"
)
// TagListToJSON encodes tagList as a JSON object that maps each tag key to
// its value. When the same key appears more than once, the later entry
// overwrites the earlier one.
func TagListToJSON(tagList []*telegraf.Tag) []byte {
	asMap := make(map[string]string, len(tagList))
	for i := range tagList {
		asMap[tagList[i].Key] = tagList[i].Value
	}
	// The error is intentionally discarded: encoding a map[string]string is
	// not expected to fail, and this function has no error return.
	encoded, _ := json.Marshal(asMap)
	return encoded
}
// FieldListToJSON encodes fieldList as a JSON object that maps each field key
// to its value. When the same key appears more than once, the later entry
// overwrites the earlier one. Any error from json.Marshal is returned as-is.
func FieldListToJSON(fieldList []*telegraf.Field) ([]byte, error) {
	asMap := make(map[string]interface{}, len(fieldList))
	for i := range fieldList {
		entry := fieldList[i]
		asMap[entry.Key] = entry.Value
	}
	return json.Marshal(asMap)
}
// QuoteIdentifier treats name as a single-part pgx.Identifier and returns the
// result of its Sanitize method, for use in SQL as an identifier.
func QuoteIdentifier(name string) string {
	ident := pgx.Identifier{name}
	return ident.Sanitize()
}
// QuoteLiteral returns name as a SQL string literal: every single quote inside
// name is doubled and the result is wrapped in single quotes.
func QuoteLiteral(name string) string {
	escaped := strings.ReplaceAll(name, "'", "''")
	return "'" + escaped + "'"
}
// FullTableName returns the pgx.Identifier for table name, qualified with
// schema when schema is non-empty. The parts are stored unmodified; Sanitize
// is not called here.
func FullTableName(schema, name string) pgx.Identifier {
	if schema == "" {
		return pgx.Identifier{name}
	}
	return pgx.Identifier{schema, name}
}
// PGXLogger makes telegraf.Logger compatible with pgx.Logger by embedding the
// telegraf logger and adding the Log method.
type PGXLogger struct {
	telegraf.Logger
}

// Log writes a pgx log entry to the embedded telegraf.Logger, prefixing the
// message with "PG" and appending the data map.
//
// Level mapping: Error -> Errorf, Warn -> Warnf, Info and None -> Infof,
// everything else (including Debug and Trace) -> Debugf. The context argument
// is ignored.
func (l PGXLogger) Log(_ context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) {
	const format = "PG %s - %+v"

	switch level {
	case pgx.LogLevelError:
		l.Errorf(format, msg, data)
	case pgx.LogLevelWarn:
		l.Warnf(format, msg, data)
	case pgx.LogLevelInfo, pgx.LogLevelNone:
		l.Infof(format, msg, data)
	default:
		// Debug, Trace, and any unrecognized level all go to debug output.
		l.Debugf(format, msg, data)
	}
}
// GetTagID computes an ID for the metric's tag set using a 64-bit FNV-1a hash.
// Tags are hashed in the order metric.TagList() returns them; each key and each
// value is followed by a zero byte so that adjacent strings cannot run together.
//
// The unsigned hash is reinterpreted as int64, so the result may be negative.
func GetTagID(metric telegraf.Metric) int64 {
	hasher := fnv.New64a()
	separator := []byte{0}

	for _, tag := range metric.TagList() {
		for _, part := range [2]string{tag.Key, tag.Value} {
			// Write on an fnv hash never returns an error, so results are discarded.
			_, _ = hasher.Write([]byte(part))
			_, _ = hasher.Write(separator)
		}
	}

	// Convert to int64 as postgres does not support uint64
	return int64(hasher.Sum64())
}
// WaitGroup is similar to sync.WaitGroup, but allows interruptable waiting (e.g. a timeout).
//
// Instead of a blocking Wait method it exposes a channel (see C) that is
// closed once the counter drops to zero, so callers can wait on it inside a
// select. Create instances with NewWaitGroup.
type WaitGroup struct {
	// count is the number of outstanding tasks; it is modified via sync/atomic.
	count int32
	// done is closed by Done when count reaches zero.
	done chan struct{}
}

// NewWaitGroup returns a WaitGroup with a zero counter and an open done channel.
func NewWaitGroup() *WaitGroup {
	wg := new(WaitGroup)
	wg.done = make(chan struct{})
	return wg
}

// Add adds i to the counter. It panics if the done channel has already been
// closed.
func (wg *WaitGroup) Add(i int32) {
	select {
	case <-wg.done:
		panic("use of an already-done WaitGroup")
	default:
		atomic.AddInt32(&wg.count, i)
	}
}

// Done decrements the counter by one. When the counter reaches zero the done
// channel is closed; if it drops below zero, Done panics.
func (wg *WaitGroup) Done() {
	switch remaining := atomic.AddInt32(&wg.count, -1); {
	case remaining == 0:
		close(wg.done)
	case remaining < 0:
		panic("too many Done() calls")
	}
}

// C returns the channel that is closed once the counter reaches zero.
func (wg *WaitGroup) C() <-chan struct{} {
	return wg.done
}