-
Notifications
You must be signed in to change notification settings - Fork 402
/
client.go
124 lines (106 loc) · 3.41 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package telemetry
import (
"context"
"os"
"time"
"github.com/zeebo/admission/admmonkit"
"github.com/zeebo/admission/admproto"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
)
const (
// DefaultInterval is the default amount of time between metric payload sends.
DefaultInterval = time.Minute
// DefaultPacketSize sets the target packet size. MTUs are often 1500,
// though a good argument could be made for 512.
DefaultPacketSize = 1000
// DefaultApplication is the default value for the application name. It is
// used when ClientOpts.Application is not set and len(os.Args) == 0.
DefaultApplication = "unknown"
)
// ClientOpts allows you to set Client options. NewClient substitutes a
// default for a zero-valued Interval, Application, Instance, PacketSize or
// Registry; FloatEncoding is passed through unchanged.
type ClientOpts struct {
// Interval is how frequently stats from the provided Registry will be
// sent up. Note that this interval is "jittered", so the actual interval
// is taken from a normal distribution with a mean of Interval and a
// variance of Interval/4. Defaults to DefaultInterval.
Interval time.Duration
// Application is the application name, usually prepended to metric names.
// By default it will be os.Args[0], or DefaultApplication when os.Args is
// empty.
Application string
// Instance is a string that identifies this particular server. Could be a
// node id, but defaults to the result of DefaultInstanceID().
Instance string
// PacketSize controls how we fragment the data as it goes out in UDP
// packets. Defaults to DefaultPacketSize.
PacketSize int
// Registry is where to get stats from. Defaults to monkit.Default.
Registry *monkit.Registry
// FloatEncoding is how floats should be encoded on the wire.
// Default is float16.
FloatEncoding admproto.FloatEncoding
}
// Client is a telemetry client for sending UDP packets at a regular interval
// from a monkit.Registry.
type Client struct {
// interval is the base delay between reports; Run waits for
// jitter(interval) before each one.
interval time.Duration
// opts holds the admmonkit options passed to send on every Report.
opts admmonkit.Options
// send transmits a single report. NewClient sets it to admmonkit.Send.
send func(context.Context, admmonkit.Options) error
}
// NewClient builds a telemetry client that reports to remoteAddr over UDP.
//
// A zero-valued Interval, Application, Instance, Registry or PacketSize in
// opts is replaced by its default before the client is assembled;
// FloatEncoding is forwarded as given. The returned error is always nil.
func NewClient(remoteAddr string, opts ClientOpts) (rv *Client, err error) {
	interval := opts.Interval
	if interval == 0 {
		interval = DefaultInterval
	}

	application := opts.Application
	if application == "" {
		// os.Args may be empty in unusual environments; start from the
		// fixed fallback name and prefer the program name when present.
		application = DefaultApplication
		if len(os.Args) > 0 {
			application = os.Args[0]
		}
	}

	instance := opts.Instance
	if instance == "" {
		instance = DefaultInstanceID()
	}

	registry := opts.Registry
	if registry == nil {
		registry = monkit.Default
	}

	packetSize := opts.PacketSize
	if packetSize == 0 {
		packetSize = DefaultPacketSize
	}

	sendOpts := admmonkit.Options{
		Application: application,
		InstanceId:  []byte(instance),
		Address:     remoteAddr,
		PacketSize:  packetSize,
		Registry:    registry,
		ProtoOpts:   admproto.Options{FloatEncoding: opts.FloatEncoding},
	}

	return &Client{
		interval: interval,
		opts:     sendOpts,
		send:     admmonkit.Send,
	}, nil
}
// Run calls Report roughly every Interval until ctx is cancelled.
//
// Run blocks the calling goroutine. The wait before each report is
// jitter(c.interval). Cancelling ctx interrupts that wait, so Run returns
// promptly rather than sleeping out the rest of the interval. Errors
// returned by Report are logged and do not stop the loop.
func (c *Client) Run(ctx context.Context) {
	zap.S().Debugf("Initialized telemetry batcher with id = %q", c.opts.InstanceId)
	for {
		// A timer is used instead of time.Sleep so the wait can be
		// abandoned as soon as ctx is done.
		timer := time.NewTimer(jitter(c.interval))
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
		// select picks at random when both channels are ready, so re-check
		// to guarantee no report is sent after cancellation.
		if ctx.Err() != nil {
			return
		}
		if err := c.Report(ctx); err != nil {
			zap.S().Errorf("failed sending telemetry report: %v", err)
		}
	}
}
// Report bundles up the current stats and writes them out as UDP packets by
// invoking the client's send function with its stored options. The call is
// instrumented through mon.Task, and any error from send is returned.
func (c *Client) Report(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	err = c.send(ctx, c.opts)
	return err
}