forked from influxdata/telegraf
/
graphite.go
134 lines (122 loc) · 3 KB
/
graphite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package graphite
import (
"errors"
"fmt"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/plugins/outputs"
"log"
"math/rand"
"net"
"strings"
"time"
)
type Graphite struct {
// URL is only for backwards compatability
Servers []string
Prefix string
Timeout int
conns []net.Conn
}
var sampleConfig = `
# TCP endpoint for your graphite instance.
servers = ["localhost:2003"]
# Prefix metrics name
prefix = ""
# timeout in seconds for the write connection to graphite
timeout = 2
`
func (g *Graphite) Connect() error {
// Set default values
if g.Timeout <= 0 {
g.Timeout = 2
}
if len(g.Servers) == 0 {
g.Servers = append(g.Servers, "localhost:2003")
}
// Get Connections
var conns []net.Conn
for _, server := range g.Servers {
conn, err := net.DialTimeout("tcp", server, time.Duration(g.Timeout)*time.Second)
if err == nil {
conns = append(conns, conn)
}
}
g.conns = conns
return nil
}
func (g *Graphite) Close() error {
// Closing all connections
for _, conn := range g.conns {
conn.Close()
}
return nil
}
func (g *Graphite) SampleConfig() string {
return sampleConfig
}
func (g *Graphite) Description() string {
return "Configuration for Graphite server to send metrics to"
}
// Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (g *Graphite) Write(points []*client.Point) error {
// Prepare data
var bp []string
for _, point := range points {
// Get name
name := point.Name()
// Convert UnixNano to Unix timestamps
timestamp := point.UnixNano() / 1000000000
for field_name, value := range point.Fields() {
// Convert value
value_str := fmt.Sprintf("%#v", value)
// Write graphite point
var graphitePoint string
if name == field_name {
graphitePoint = fmt.Sprintf("%s.%s %s %d\n",
strings.Replace(point.Tags()["host"], ".", "_", -1),
strings.Replace(name, ".", "_", -1),
value_str,
timestamp)
} else {
graphitePoint = fmt.Sprintf("%s.%s.%s %s %d\n",
strings.Replace(point.Tags()["host"], ".", "_", -1),
strings.Replace(name, ".", "_", -1),
strings.Replace(field_name, ".", "_", -1),
value_str,
timestamp)
}
if g.Prefix != "" {
graphitePoint = fmt.Sprintf("%s.%s", g.Prefix, graphitePoint)
}
bp = append(bp, graphitePoint)
//fmt.Printf(graphitePoint)
}
}
graphitePoints := strings.Join(bp, "")
// This will get set to nil if a successful write occurs
err := errors.New("Could not write to any Graphite server in cluster\n")
// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
if _, e := fmt.Fprintf(g.conns[n], graphitePoints); e != nil {
// Error
log.Println("ERROR: " + err.Error())
// Let's try the next one
} else {
// Success
err = nil
break
}
}
// try to reconnect
if err != nil {
g.Connect()
}
return err
}
func init() {
outputs.Add("graphite", func() outputs.Output {
return &Graphite{}
})
}