/
graphite.go
238 lines (211 loc) · 6.46 KB
/
graphite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
//go:generate ../../../tools/readme_config_includer/generator
package graphite
import (
"crypto/tls"
_ "embed"
"errors"
"fmt"
"io"
"math/rand"
"net"
"strings"
"time"
"github.com/influxdata/telegraf"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
// sampleConfig holds the contents of sample.conf, embedded into the binary at
// build time. It is what SampleConfig returns.
//
//go:embed sample.conf
var sampleConfig string
// Graphite is the Graphite output plugin. It serializes metrics with the
// Graphite serializer and writes each batch over TCP, or TLS when a TLS
// configuration is present, to one of the configured servers.
type Graphite struct {
	// Serializer options; handed unchanged to the Graphite serializer in Write.
	GraphiteTagSupport      bool   `toml:"graphite_tag_support"`
	GraphiteTagSanitizeMode string `toml:"graphite_tag_sanitize_mode"`
	GraphiteSeparator       string `toml:"graphite_separator"`
	// Servers lists the addresses dialed over TCP by Connect, which falls back
	// to "localhost:2003" when the list is empty.
	Servers []string `toml:"servers"`
	// Prefix, Template and Templates are further serializer options, handed
	// unchanged to the Graphite serializer in Write.
	Prefix    string   `toml:"prefix"`
	Template  string   `toml:"template"`
	Templates []string `toml:"templates"`
	// Timeout is expressed in seconds and bounds both dialing and each write.
	// Connect replaces a non-positive value with 2.
	Timeout int `toml:"timeout"`
	// Log receives the plugin's diagnostics; it is not read from the config.
	Log telegraf.Logger `toml:"-"`
	// conns holds the connections established by Connect.
	conns []net.Conn
	// ClientConfig supplies the TLS settings; connections are made with TLS
	// when it yields a non-nil TLS configuration.
	tlsint.ClientConfig
	// failedServers holds the addresses send marked as failed; when it is
	// non-empty Connect redials only these servers.
	failedServers []string
}
// SampleConfig returns the plugin's sample configuration, i.e. the contents of
// sample.conf embedded at build time.
func (*Graphite) SampleConfig() string {
	return sampleConfig
}
// Connect dials the configured Graphite servers and stores the resulting
// connections.
//
// A non-positive Timeout defaults to 2 seconds and an empty server list
// defaults to "localhost:2003". When servers are currently marked as failed,
// only those servers are redialed: their stale connections are closed and
// dropped while the connections to the other servers are kept. Every server
// that cannot be dialed is logged and remembered, so that the next call
// retries it; this includes servers that were unreachable on the very first
// call.
//
// An error is returned only when the TLS configuration cannot be built. Dial
// failures never fail the call.
func (g *Graphite) Connect() error {
	// Set default values
	if g.Timeout <= 0 {
		g.Timeout = 2
	}
	if len(g.Servers) == 0 {
		g.Servers = append(g.Servers, "localhost:2003")
	}

	// Set tls config
	tlsConfig, err := g.ClientConfig.TLSConfig()
	if err != nil {
		return err
	}

	// Only retry the failed servers
	servers := g.Servers
	retrying := len(g.failedServers) > 0
	if retrying {
		// The same server may have been recorded more than once; dial each
		// address a single time so no duplicate connections are created.
		failed := make(map[string]struct{}, len(g.failedServers))
		servers = make([]string, 0, len(g.failedServers))
		for _, server := range g.failedServers {
			if _, seen := failed[server]; seen {
				continue
			}
			failed[server] = struct{}{}
			servers = append(servers, server)
		}

		// Remove failed servers from the existing connections
		var workingConns []net.Conn
		for _, conn := range g.conns {
			if _, isFailed := failed[conn.RemoteAddr().String()]; !isFailed {
				workingConns = append(workingConns, conn)
				continue
			}
			// The connection is about to be replaced, so release it. It may
			// already have been closed by the sender, in which case the
			// returned error carries no information.
			_ = conn.Close()
		}
		g.conns = workingConns
	}

	// Get Connections
	dialer := net.Dialer{Timeout: time.Duration(g.Timeout) * time.Second}
	var conns []net.Conn
	var failedServers []string
	for _, server := range servers {
		// Get secure connection if tls config is set
		var conn net.Conn
		if tlsConfig != nil {
			conn, err = tls.DialWithDialer(&dialer, "tcp", server, tlsConfig)
		} else {
			conn, err = dialer.Dial("tcp", server)
		}
		if err != nil {
			g.Log.Debugf("Failed to establish connection: %v", err)
			failedServers = append(failedServers, server)
			continue
		}
		conns = append(conns, conn)
	}

	if retrying {
		// Keep the connections that were still healthy and add the new ones.
		g.conns = append(g.conns, conns...)
	} else {
		g.conns = conns
	}
	// Remember what could not be reached (possibly nothing) for the next call.
	g.failedServers = failedServers

	return nil
}
// Close shuts down every connection currently held by the plugin. Errors from
// the individual connections are deliberately discarded, so the method always
// returns nil.
func (g *Graphite) Close() error {
	for i := range g.conns {
		// Best effort: nothing useful can be done with a close error here.
		_ = g.conns[i].Close()
	}
	return nil
}
// checkEOF probes conn for a connection the remote side has already closed.
//
// Such a connection stays in CLOSE_WAIT on our side: Go happily writes and
// flushes to it without reporting an error while the peer answers with RST
// packets. Reading with a very short deadline exposes that state as io.EOF.
// Props to Tv via the authors of carbon-relay-ng for this trick.
//
// It returns nil when the connection looks usable, which means the read timed
// out or (unexpectedly) returned data without an error. In every other case
// the connection is closed and an error describing why it was given up is
// returned, so the caller never mistakes a dead connection for a healthy one.
func (g *Graphite) checkEOF(conn net.Conn) error {
	addr := conn.RemoteAddr().String()

	// closeConn releases the connection and reports only real close failures.
	closeConn := func() {
		if cerr := conn.Close(); cerr != nil {
			g.Log.Debugf("Failed to close the connection: %v", cerr)
		}
	}

	if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
		g.Log.Debugf(
			"Couldn't set read deadline for connection due to error %v with remote address %s. closing conn explicitly",
			err,
			addr,
		)
		closeConn()
		return fmt.Errorf("setting read deadline for %s: %w", addr, err)
	}

	buf := make([]byte, 1024)
	num, err := conn.Read(buf)
	if errors.Is(err, io.EOF) {
		g.Log.Debugf("Conn %s is closed. closing conn explicitly", addr)
		closeConn()
		return fmt.Errorf("connection to %s closed by remote: %w", addr, err)
	}

	// just in case i misunderstand something or the remote behaves badly
	if num != 0 {
		g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", addr, buf[:num])
	}

	// A read without an error proves the peer is still there.
	if err == nil {
		return nil
	}

	// Hitting the deadline is the expected outcome for an idle, open connection.
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return nil
	}

	// Log non-timeout errors and close.
	g.Log.Debugf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", addr, err)
	closeConn()
	return fmt.Errorf("checking connection to %s: %w", addr, err)
}
// Write serializes metrics into one Graphite batch and sends it to a randomly
// chosen server of the cluster.
//
// Metrics that fail to serialize are logged and left out of the batch. If the
// send marks any server as failed, those servers are reconnected; the batch is
// sent a second time only when the first attempt did not deliver it, so a
// batch that already reached a server is never written twice.
//
// It returns an error when the serializer cannot be created, when reconnecting
// fails, or when the batch could not be written to any server.
func (g *Graphite) Write(metrics []telegraf.Metric) error {
	// Prepare data
	s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport, g.GraphiteTagSanitizeMode, g.GraphiteSeparator, g.Templates)
	if err != nil {
		return err
	}

	var batch []byte
	for _, metric := range metrics {
		buf, err := s.Serialize(metric)
		if err != nil {
			g.Log.Errorf("Error serializing some metrics to graphite: %s", err.Error())
			// Do not append whatever came back alongside the error.
			continue
		}
		batch = append(batch, buf...)
	}

	sendErr := g.send(batch)

	// If a send failed for a server, try to reconnect to that server
	if len(g.failedServers) > 0 {
		g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(g.failedServers, ","))
		if err := g.Connect(); err != nil {
			return fmt.Errorf("failed to reconnect: %w", err)
		}
		// Retry only an undelivered batch; resending after a successful write
		// would duplicate every data point.
		if sendErr != nil {
			sendErr = g.send(batch)
		}
	}

	return sendErr
}
// send writes batch to the open connections, visited in random order, and
// stops at the first successful write.
//
// Every connection gets a write deadline (when Timeout is positive) and is
// probed with checkEOF before it is written to. A server whose connection
// fails any of these steps is recorded once in failedServers so that a later
// Connect replaces the connection. A failed probe ends the attempt; a failed
// write closes the connection and moves on to the next one.
//
// It returns nil after a successful write and an error when nothing could be
// written.
func (g *Graphite) send(batch []byte) error {
	// This will get set to nil if a successful write occurs
	globalErr := errors.New("could not write to any Graphite server in cluster")

	// markFailed records addr for reconnection, at most once per address, so a
	// connection failing several steps does not get redialed several times.
	markFailed := func(addr string) {
		for _, known := range g.failedServers {
			if known == addr {
				return
			}
		}
		g.failedServers = append(g.failedServers, addr)
	}

	// Send data to a random server
	for _, n := range rand.Perm(len(g.conns)) {
		conn := g.conns[n]
		addr := conn.RemoteAddr().String()

		if g.Timeout > 0 {
			deadline := time.Now().Add(time.Duration(g.Timeout) * time.Second)
			if err := conn.SetWriteDeadline(deadline); err != nil {
				g.Log.Errorf("failed to set write deadline for %s: %v", addr, err)
				// Mark server as failed so a new connection will be made
				markFailed(addr)
			}
		}

		if err := g.checkEOF(conn); err != nil {
			// Mark server as failed so a new connection will be made
			markFailed(addr)
			break
		}

		_, err := conn.Write(batch)
		if err == nil {
			globalErr = nil
			break
		}

		// The error text is passed as an argument, never as the format string,
		// so a '%' inside it cannot be misread as a formatting verb.
		g.Log.Debugf("Graphite Error: %v", err)

		// Close explicitly and let's try the next one
		if cerr := conn.Close(); cerr != nil {
			g.Log.Debugf("Failed to close the connection: %v", cerr)
		}
		// Mark server as failed so a new connection will be made
		markFailed(addr)
	}

	return globalErr
}
func init() {
outputs.Add("graphite", func() telegraf.Output {
return &Graphite{}
})
}