-
Notifications
You must be signed in to change notification settings - Fork 23
/
collector.go
267 lines (225 loc) · 7.96 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package collector presents an interface that wraps the OTel Collector Core
package collector
import (
"context"
"errors"
"fmt"
"runtime/debug"
"sync"
"time"
"github.com/observiq/bindplane-agent/factories"
"go.opentelemetry.io/collector/otelcol"
"go.uber.org/zap"
)
// collectorRestartTimeout bounds how long Restart waits for the collector to shut down.
// Restart uses it as the timeout of the context it hands to Stop; once that context
// is done, Stop cancels the context the collector was started with.
var collectorRestartTimeout = 10 * time.Second
// Collector is an interface for running the open telemetry collector.
//
//go:generate mockery --name Collector --filename mock_collector.go --structname MockCollector
type Collector interface {
// Run starts the collector and returns an error if it was unable to start up.
Run(context.Context) error
// Stop stops the collector. In the implementation in this file, cancellation of the
// given context forces the shutdown by cancelling the context the collector runs with.
Stop(context.Context)
// Restart stops the collector and runs it again, returning any startup error.
Restart(context.Context) error
// SetLoggingOpts replaces the logging options used when the collector is next started.
SetLoggingOpts([]zap.Option)
// GetLoggingOpts returns the currently configured logging options.
GetLoggingOpts() []zap.Option
// Status returns a receive-only channel on which status updates are delivered.
Status() <-chan *Status
}
// collector is the standard implementation of the Collector interface.
type collector struct {
// configPaths, version and loggingOpts are passed to NewSettings on every Run.
configPaths []string
version string
loggingOpts []zap.Option
// factories for modifying in test
factories otelcol.Factories
// mux is held for the full duration of Run and Stop, serializing them.
mux sync.Mutex
// svc is the collector instance created by the most recent Run; Stop resets it to nil.
svc *otelcol.Collector
// statusChan receives status updates via sendStatus; Restart replaces it with a new channel.
statusChan chan *Status
// wg tracks the goroutine started by Run; Stop waits on it.
wg *sync.WaitGroup
// collectorCtx is the context that is fed into collector.Run
// Cancelling it will force shutdown
collectorCtx context.Context
// collectorCtxCancel cancels collectorCtx; Stop invokes it when its own context is done.
collectorCtxCancel func()
}
// New builds a Collector that will be configured from configPaths, report the given
// version, and log with loggingOpts. It returns an error if the default component
// factories cannot be created.
func New(configPaths []string, version string, loggingOpts []zap.Option) (Collector, error) {
	// Use a distinct local name so the imported factories package is not shadowed.
	defaults, err := factories.DefaultFactories()
	if err != nil {
		return nil, fmt.Errorf("error while setting up default factories: %w", err)
	}

	col := &collector{
		configPaths: configPaths,
		version:     version,
		loggingOpts: loggingOpts,
		factories:   defaults,
		statusChan:  make(chan *Status, 10),
		wg:          &sync.WaitGroup{},
	}
	return col, nil
}
// GetLoggingOpts returns the logging options currently stored on the collector.
func (c *collector) GetLoggingOpts() []zap.Option {
	opts := c.loggingOpts
	return opts
}
// SetLoggingOpts stores the logging options. They are only read when the collector
// is started, so a change takes effect on the next Run or Restart.
func (c *collector) SetLoggingOpts(opts []zap.Option) {
	c.loggingOpts = opts
}
// Run starts the collector in a background goroutine and blocks until it has either
// reached the running state or failed to start. It returns an error when a service
// from an earlier Run is still set, when setup fails, or when startup fails.
func (c *collector) Run(ctx context.Context) error {
	c.mux.Lock()
	defer c.mux.Unlock()

	if c.svc != nil {
		return errors.New("service already running")
	}

	// Component telemetry is registered up front so this run begins from a clean state.
	if regErr := factories.RegisterComponentTelemetry(); regErr != nil {
		return fmt.Errorf("register component telemetry: %w", regErr)
	}

	// The OT collector only supports using settings once during the lifetime of a
	// single collector instance, so they are rebuilt on every startup.
	settings, err := NewSettings(c.configPaths, c.version, c.loggingOpts, c.factories)
	if err != nil {
		return err
	}

	// Likewise, a collector instance can only be run once; create a fresh one per Run.
	service, err := otelcol.NewCollector(*settings)
	if err != nil {
		createErr := fmt.Errorf("failed to create service: %w", err)
		c.sendStatus(false, false, createErr)
		return createErr
	}

	// Note: This doesn't provide any timeout mechanism if the incoming context is cancelled.
	// If the context passed to Start is cancelled, shutdown could take a very long time
	// (due to e.g. pipeline draining), because the collector passes the background context
	// if the start context is cancelled before shutdown is called.
	runCtx, cancelRun := context.WithCancel(ctx)

	runWG := &sync.WaitGroup{}
	runWG.Add(1)

	c.svc = service
	c.wg = runWG
	c.collectorCtx = runCtx
	c.collectorCtxCancel = cancelRun

	// Buffered so the goroutine below can always deliver its single result without blocking.
	startupErr := make(chan error, 1)

	go func() {
		defer runWG.Done()

		// Make sure the run context is released once the service is done, even if Stop already cancelled it.
		defer cancelRun()

		// Convert a panic inside the collector into a status update and a startup error.
		defer func() {
			recovered := recover()
			if recovered == nil {
				return
			}

			stack := string(debug.Stack())

			var panicErr error
			switch cause := recovered.(type) {
			case error:
				panicErr = fmt.Errorf("collector panicked with error: %w. Panic stacktrace: %s", cause, stack)
			case string:
				panicErr = fmt.Errorf("collector panicked with error: %s. Panic stacktrace: %s", cause, stack)
			default:
				panicErr = fmt.Errorf("collector panicked with error: %v. Panic stacktrace: %s", cause, stack)
			}

			c.sendStatus(false, true, panicErr)

			// Unblock waitForStartup right away instead of leaving it polling after a panic.
			startupErr <- panicErr
		}()

		runErr := service.Run(runCtx)
		c.sendStatus(false, false, runErr)

		// Signal even when runErr is nil: otherwise waitForStartup could keep waiting for a
		// collector that will never start, which can happen if an asynchronous error occurs
		// quickly after collector startup.
		startupErr <- runErr
	}()

	// A race condition exists in the OT collector where the shutdown channel is not
	// guaranteed to be initialized before the shutdown function is called. Waiting for
	// startup to finish before the mutex is released protects against it.
	return c.waitForStartup(ctx, startupErr)
}
// Stop shuts the collector down and waits for its run goroutine to finish.
// It is a no-op when no service is set. If ctx is done before the shutdown
// completes, the context the collector was started with is cancelled to force
// the shutdown.
func (c *collector) Stop(ctx context.Context) {
	c.mux.Lock()
	defer c.mux.Unlock()

	if c.svc == nil {
		return
	}

	// Snapshot the cancel func while the lock is held. The watcher goroutine below can be
	// scheduled after Stop has returned; reading c.collectorCtxCancel at that point would
	// race with a later Run and could cancel the context of the newly started collector.
	cancelRun := c.collectorCtxCancel

	c.svc.Shutdown()

	shutdownComplete := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			// Cancel the start context if the stop context is cancelled.
			// Ideally, we'd be able to pass a context into shutdown, but the OTEL collector
			// doesn't support that.
			cancelRun()
		case <-shutdownComplete: // shutdown finished before the stop context was cancelled
		}
	}()

	c.wg.Wait()
	close(shutdownComplete)

	c.svc = nil
}
// Restart stops the collector and runs it again, returning the error from Run.
// The status channel is replaced in the process, so callers must call Status()
// again afterwards to obtain the new channel.
func (c *collector) Restart(ctx context.Context) error {
	// Bound the shutdown so a hanging collector cannot block the restart indefinitely.
	stopCtx, cancelStop := context.WithTimeout(ctx, collectorRestartTimeout)
	defer cancelStop()
	c.Stop(stopCtx)

	// Start from a fresh channel so statuses emitted during shutdown don't pollute the new run.
	c.statusChan = make(chan *Status, 10)

	return c.Run(ctx)
}
// waitForStartup blocks until the service reports the running state, ctx is done,
// or the run goroutine delivers a result on startupErr. It returns nil only when
// the running state was observed; in that case a running status is sent first.
func (c *collector) waitForStartup(ctx context.Context, startupErr chan error) error {
	poll := time.NewTicker(250 * time.Millisecond)
	defer poll.Stop()

	for c.svc.GetState() != otelcol.StateRunning {
		select {
		case <-poll.C:
			// Fall through to re-check the service state.
		case <-ctx.Done():
			c.svc.Shutdown()
			return ctx.Err()
		case runErr := <-startupErr:
			if runErr != nil {
				return runErr
			}
			// The run goroutine finished without an error, but the collector was never
			// observed running, so this is still reported as a failure.
			return errors.New("collector failed to start, and no error was returned")
		}
	}

	c.sendStatus(true, false, nil)
	return nil
}
// Status returns the receive-only view of the collector's current status channel.
func (c *collector) Status() <-chan *Status {
	var updates <-chan *Status = c.statusChan
	return updates
}
// sendStatus publishes a status update on the status channel without blocking;
// the update is dropped if the channel cannot accept it right now.
func (c *collector) sendStatus(running, panicked bool, err error) {
	update := &Status{Running: running, Panicked: panicked, Err: err}

	select {
	case c.statusChan <- update:
	default:
	}
}
// Status is the status of a collector.
type Status struct {
// Running is set when the collector was observed in the running state during startup.
Running bool
// Panicked is set when the status was produced by recovering from a panic in the collector goroutine.
Panicked bool
// Err is the error associated with the status; it may be nil.
Err error
}