forked from google/cloudprober
/
probeutils.go
167 lines (150 loc) · 5.83 KB
/
probeutils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package probeutils
import (
"bytes"
"context"
"fmt"
"net"
"time"
"github.com/google/cloudprober/logger"
"github.com/google/cloudprober/metrics"
)
// ProbeResult represents results of a probe run.
//
// Values of this type are what probes send to StatsKeeper over its results
// channel. StatsKeeper uses Target() as the key of its per-target map and
// Metrics() as the value to store (first result for a target) or to merge
// into the stored value via Update (subsequent results).
type ProbeResult interface {
// Metrics converts ProbeResult into a map of the metrics that is suitable for
// working with metrics.EventMetrics.
Metrics() *metrics.EventMetrics
// Target returns the target associated with the probe result.
Target() string
}
// StatsKeeper manages and outputs probe results.
//
// Typical StatsKeeper usage pattern is that the probes start a StatsKeeper
// goroutine in the beginning. StatsKeeper goroutine manages access to the
// per-target cumulative metrics. It listens on an input channel for probe
// results and updates the metrics whenever a new probe result is obtained.
// It exports aggregate probe statistics to the output channel, at intervals
// controlled by a Ticker. These two operations are mutually exclusive. This
// is the only goroutine that accesses the metrics. StatsKeeper runs
// indefinitely, across multiple probe runs, and should not stop during normal
// program execution.
//
// If we get a new result on resultsChan, update the probe statistics.
// If we get a timer tick on doExport, export probe data for all targets.
// If context is canceled, return (even if an export send is in progress).
//
// Note that StatsKeeper calls a function (targetsFunc) to get the list of the
// targets for exporting results, instead of getting a static list in the
// arguments. We do that as the list of targets is usually dynamic and is
// updated on a regular basis.
//
// Parameters:
//   - ctx: cancellation signal; StatsKeeper returns once it is done.
//   - ptype, name: values of the "ptype" and "probe" labels attached to every
//     exported EventMetrics.
//   - exportInterval: time between exports. A non-positive interval disables
//     exporting; results are still accumulated.
//   - targetsFunc: called on every export tick to get the targets to export.
//   - resultsChan: incoming probe results. If it is closed, StatsKeeper stops
//     reading from it but keeps exporting what it has accumulated.
//   - dataChan: receives a clone of each target's cumulative metrics.
//   - l: logger used for update errors and for logging exported metrics.
func StatsKeeper(ctx context.Context, ptype, name string, exportInterval time.Duration, targetsFunc func() []string, resultsChan <-chan ProbeResult, dataChan chan<- *metrics.EventMetrics, l *logger.Logger) {
	targetMetrics := make(map[string]*metrics.EventMetrics)

	// Use an explicit Ticker so it can be stopped when we return; the ticker
	// behind time.Tick cannot be stopped. A nil doExport channel (non-positive
	// interval) simply never fires, which matches what time.Tick returned for
	// such intervals, whereas time.NewTicker would panic.
	var doExport <-chan time.Time
	if exportInterval > 0 {
		ticker := time.NewTicker(exportInterval)
		defer ticker.Stop()
		doExport = ticker.C
	}

	for {
		select {
		case result, ok := <-resultsChan:
			if !ok {
				// A closed channel would otherwise deliver nil results
				// forever; a nil channel disables this case instead.
				resultsChan = nil
				continue
			}
			t := result.Target()
			em := targetMetrics[t]
			if em == nil {
				// First result for this target becomes the cumulative value.
				targetMetrics[t] = result.Metrics()
				continue
			}
			if err := em.Update(result.Metrics()); err != nil {
				l.Errorf("Error adding metrics from the probe result for the target: %s. Err: %v", t, err)
			}
		case ts := <-doExport:
			for _, t := range targetsFunc() {
				em := targetMetrics[t]
				if em == nil {
					// No results seen for this target yet.
					continue
				}
				em.AddLabel("ptype", ptype)
				em.AddLabel("probe", name)
				em.AddLabel("dst", t)
				em.Timestamp = ts
				l.Info(em.String())
				// Send a clone so the receiver never shares the cumulative
				// value we keep updating. Don't block past cancellation if
				// nobody is reading dataChan anymore.
				select {
				case dataChan <- em.Clone():
				case <-ctx.Done():
					return
				}
			}
		case <-ctx.Done():
			return
		}
	}
}
// PatternPayload builds a payload that can be verified using VerifyPayloadPattern.
// It repeats the pattern to fill a []byte slice of finalSize. Last remaining
// bytes (finalSize mod patternSize) are left unpopulated (hence set to 0
// bytes). If final payload size is smaller than or equal to the pattern size,
// we return the pattern unmodified (the very same slice, not a copy).
//
// An empty pattern with a positive finalSize yields finalSize zero bytes.
func PatternPayload(pattern []byte, finalSize int) []byte {
	patternSize := len(pattern)
	if patternSize >= finalSize {
		return pattern
	}

	b := make([]byte, finalSize)
	if patternSize == 0 {
		// Nothing to replicate; also avoids dividing by zero below.
		return b
	}

	// Fill only the whole replicas; the trailing finalSize%patternSize bytes
	// keep their zero value.
	filled := finalSize - finalSize%patternSize
	for off := 0; off < filled; off += patternSize {
		copy(b[off:], pattern)
	}
	return b
}
// VerifyPayloadPattern verifies the payload built using PatternPayload.
//
// The payload must consist of len(payload)/len(pattern) back-to-back copies of
// pattern, followed by zero bytes only. With an empty pattern there are no
// copies to check, so the whole payload must be zero bytes. It returns nil
// when the payload matches and a descriptive error otherwise.
func VerifyPayloadPattern(payload, pattern []byte) error {
	patternSize := len(pattern)

	// Guard against an empty pattern: dividing by its length would panic.
	if patternSize > 0 {
		nReplica := len(payload) / patternSize
		for i := 0; i < nReplica; i++ {
			bN := payload[:patternSize]     // Next pattern sized bytes
			payload = payload[patternSize:] // Shift payload for next iteration
			if !bytes.Equal(bN, pattern) {
				return fmt.Errorf("bytes are not in the expected format. payload[%d-Replica]=%v, pattern=%v", i, bN, pattern)
			}
		}
	}

	// Verify that remaining bytes in payload are all zeros.
	for _, c := range payload {
		if c != 0 {
			return fmt.Errorf("payload doesn't have 0s padding in the last 'payloadSize mod patternSize' bytes: %v", payload)
		}
	}
	return nil
}
// Addr is used for tests, allowing net.InterfaceByName to be mocked. It is the
// only part of a network interface that ResolveIntfAddr needs: the list of
// addresses assigned to it.
type Addr interface {
	Addrs() ([]net.Addr, error)
}

// InterfaceByName is a mocking point for net.InterfaceByName, used for tests.
//
// On failure it returns a nil Addr together with the error. The lookup result
// is deliberately not returned directly: a nil *net.Interface converted to
// Addr would be a non-nil interface value wrapping a nil pointer.
var InterfaceByName = func(s string) (Addr, error) {
	intf, err := net.InterfaceByName(s)
	if err != nil {
		return nil, err
	}
	return intf, nil
}
// ResolveIntfAddr takes the name of a network interface, and returns the first ip
// address listed for this interface. This is typically the IPv4 address.
//
// The interface is looked up through the InterfaceByName hook. An error is
// returned if the lookup fails, if the addresses cannot be listed, if the
// interface has no addresses, or if the first address is neither a *net.IPNet
// nor a *net.IPAddr.
func ResolveIntfAddr(intfName string) (string, error) {
	i, err := InterfaceByName(intfName)
	if err != nil {
		return "", fmt.Errorf("resolveIntfAddr(%v) got error getting interface: %v", intfName, err)
	}

	addrs, err := i.Addrs()
	if err != nil {
		return "", fmt.Errorf("resolveIntfAddr(%v) got error getting addresses for interface: %v", intfName, err)
	}
	if len(addrs) == 0 {
		return "", fmt.Errorf("resolveIntfAddr(%v) got 0 addrs for interface", intfName)
	}

	// i.Addrs() mostly returns network addresses of the form "172.17.90.252/23".
	// Only the IP part is returned; any network mask is dropped.
	switch v := addrs[0].(type) {
	case *net.IPNet:
		return v.IP.String(), nil
	case *net.IPAddr:
		return v.IP.String(), nil
	default:
		return "", fmt.Errorf("resolveIntfAddr(%v) found unknown type for first address: %T", intfName, v)
	}
}