forked from hashicorp/nomad
/
metrics.go
205 lines (184 loc) · 6.17 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package metrics
import (
"fmt"
"os"
"testing"
"time"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// MetricsTest stands up fabio + prometheus once per suite and then runs
// OS-specific workload cases that verify client and allocation metrics
// are scraped correctly.
type MetricsTest struct {
	framework.TC
	jobIDs       []string // job IDs of workloads registered by the current case; cleared in AfterEach
	prometheusID string   // presumably the prometheus job ID set by setUpPrometheus — confirm (setup not in view)
	fabioID      string   // presumably the fabio system-job ID set by setUpPrometheus — confirm
	fabioAddress string   // presumably the address fabio exposes for prometheus queries — confirm
}
// init registers this test case with the e2e framework's suite registry.
func init() {
	suite := &framework.TestSuite{
		Component:   "Metrics",
		CanRunLocal: true,
		Cases:       []framework.TestCase{new(MetricsTest)},
	}
	framework.AddSuites(suite)
}
// BeforeAll stands up prometheus to collect metrics from all clients and
// allocs, fronted by fabio as a system job so that prometheus does not
// need host networking.
func (tc *MetricsTest) BeforeAll(f *framework.F) {
	t := f.T()
	nomad := tc.Nomad()
	e2eutil.WaitForLeader(t, nomad)
	e2eutil.WaitForNodesReady(t, nomad, 1)
	err := tc.setUpPrometheus(f)
	require.Nil(t, err)
}
// AfterEach deregisters the workload jobs created by the previous test
// case and triggers a GC, but keeps fabio/prometheus running for reuse
// between the two cases (Windows vs Linux). Setting
// NOMAD_TEST_SKIPCLEANUP=1 leaves everything running for debugging.
func (tc *MetricsTest) AfterEach(f *framework.F) {
	if os.Getenv("NOMAD_TEST_SKIPCLEANUP") == "1" {
		return
	}
	for _, jobID := range tc.jobIDs {
		// best-effort cleanup: log (don't fail the run on) deregister errors
		if _, _, err := tc.Nomad().Jobs().Deregister(jobID, true, nil); err != nil {
			f.T().Logf("error deregistering job %q: %v", jobID, err)
		}
	}
	tc.jobIDs = nil // nil slice is idiomatic "empty" and appends fine next case
	// best-effort GC; errors here are not actionable in a test teardown
	tc.Nomad().System().GarbageCollect()
}
// AfterAll tears down the shared fabio/prometheus jobs once all cases
// have run, unless cleanup is suppressed via NOMAD_TEST_SKIPCLEANUP.
func (tc *MetricsTest) AfterAll(f *framework.F) {
	if skip := os.Getenv("NOMAD_TEST_SKIPCLEANUP"); skip == "1" {
		return
	}
	tc.tearDownPrometheus(f)
}
// TestMetricsLinux exercises alloc metrics by running a set of Linux
// workloads, then queries prometheus to verify that client and alloc
// metrics are collected and presented to the scraper correctly.
func (tc *MetricsTest) TestMetricsLinux(f *framework.F) {
	t := f.T()

	nodes, err := e2eutil.ListLinuxClientNodes(tc.Nomad())
	require.Nil(t, err)
	if len(nodes) == 0 {
		t.Skip("no Linux clients")
	}

	// job name -> the alloc-level metric that workload is expected to emit
	workloads := map[string]string{
		"cpustress":  "nomad_client_allocs_cpu_user",
		"diskstress": "nomad_client_allocs_memory_rss", // TODO(tgross): do we have disk stats?
		"helloworld": "nomad_client_allocs_cpu_allocated",
		"memstress":  "nomad_client_allocs_memory_usage",
		"simpleweb":  "nomad_client_allocs_memory_rss",
	}

	tc.runWorkloads(t, workloads)
	tc.queryClientMetrics(t, nodes)
	tc.queryAllocMetrics(t, workloads)
}
// TestMetricsWindows exercises alloc metrics by running a set of Windows
// workloads, then queries prometheus to verify that client and alloc
// metrics are collected and presented to the scraper correctly.
func (tc *MetricsTest) TestMetricsWindows(f *framework.F) {
	t := f.T()

	nodes, err := e2eutil.ListWindowsClientNodes(tc.Nomad())
	require.Nil(t, err)
	if len(nodes) == 0 {
		t.Skip("no Windows clients")
	}

	// job name -> the alloc-level metric that workload is expected to emit
	workloads := map[string]string{
		"factorial_windows": "nomad_client_allocs_cpu_user",
		"mem_windows":       "nomad_client_allocs_memory_rss",
	}

	tc.runWorkloads(t, workloads)
	tc.queryClientMetrics(t, nodes)
	tc.queryAllocMetrics(t, workloads)
}
// runWorkloads registers one job per workload and waits for each to have
// at least one allocation placed, recording the job IDs for AfterEach
// cleanup. Each job ID gets a random suffix so repeated runs don't collide.
func (tc *MetricsTest) runWorkloads(t *testing.T, workloads map[string]string) {
	for jobName := range workloads {
		// named "id", not "uuid", to avoid shadowing the uuid package
		id := uuid.Generate()
		jobID := "metrics-" + jobName + "-" + id[:8]
		tc.jobIDs = append(tc.jobIDs, jobID)
		file := "metrics/input/" + jobName + ".nomad"
		allocs := e2eutil.RegisterAndWaitForAllocs(t, tc.Nomad(), file, jobID, "")
		if len(allocs) == 0 {
			t.Fatalf("failed to register %s", jobID)
		}
	}
}
// queryClientMetrics polls prometheus until each expected host-level
// metric has been reported by every one of the given client nodes.
func (tc *MetricsTest) queryClientMetrics(t *testing.T, clientNodes []string) {
	metrics := []string{
		"nomad_client_allocated_memory",
		"nomad_client_host_cpu_user",
		"nomad_client_host_disk_available",
		"nomad_client_host_memory_used",
		"nomad_client_uptime",
	}

	// The first wait is generous: prometheus has to come up and the jobs
	// have to register their initial metrics before anything is queryable.
	timeout := 60 * time.Second

	for _, metric := range metrics {
		var err error
		var results model.Vector

		check := func() bool {
			results, err = tc.promQuery(metric)
			if err != nil {
				return false
			}

			// set of node IDs that reported this metric
			seen := map[string]struct{}{}
			for _, r := range results {
				seen[string(r.Metric["node_id"])] = struct{}{}
			}

			// only clients for one OS are under test, so check for each
			// specific node_id rather than comparing counts
			for _, node := range clientNodes {
				if _, found := seen[node]; !found {
					err = fmt.Errorf("expected metric '%s' for all clients. got:\n%v",
						metric, results)
					return false
				}
			}
			return true
		}

		ok := assert.Eventually(t, check, timeout, 1*time.Second)
		require.Truef(t, ok, "prometheus query failed (%s): %v", metric, err)

		// once the first metric appears, subsequent ones should be fast;
		// the shorter timeout keeps one bad job from stalling the run
		timeout = 15 * time.Second
	}
}
// queryAllocMetrics polls prometheus until each workload's expected
// allocation metric is present with a non-zero latest value. workloads
// maps job name -> the metric name that job is expected to emit.
func (tc *MetricsTest) queryAllocMetrics(t *testing.T, workloads map[string]string) {
	// we start with a very long timeout here because it takes a while for
	// prometheus to be live and for jobs to initially register metrics.
	timeout := 60 * time.Second
	for jobName, metric := range workloads {
		query := fmt.Sprintf("%s{exported_job=\"%s\"}", metric, jobName)
		var results model.Vector
		var err error
		ok := assert.Eventually(t, func() bool {
			results, err = tc.promQuery(query)
			if err != nil {
				return false
			}
			// guard the empty vector: indexing results[len(results)-1]
			// below would otherwise panic before the metric first appears
			if len(results) == 0 {
				err = fmt.Errorf("no results for query %q", query)
				return false
			}
			// make sure we didn't just collect a bunch of zero metrics
			lastResult := results[len(results)-1]
			if !(float64(lastResult.Value) > 0.0) {
				err = fmt.Errorf("expected non-zero metrics, got: %v", results)
				return false
			}
			return true
		}, timeout, 1*time.Second)
		require.Truef(t, ok, "prometheus query failed (%s): %v", query, err)
		// shorten the timeout after the first workload is successfully
		// queried so that we don't hang the whole test run if something's
		// wrong with only one of the jobs
		timeout = 15 * time.Second
	}
}