/
scheduler.go
279 lines (237 loc) · 7.75 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
// Copyright 2020 Palantir Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package githubapp
import (
"context"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/rcrowley/go-metrics"
"github.com/rs/zerolog"
)
// Metric names registered by WithSchedulingMetrics.
const (
	// MetricsKeyQueueLength gauges the number of events waiting in a
	// queue scheduler's channel.
	MetricsKeyQueueLength = "github.event.queued"
	// MetricsKeyActiveWorkers gauges the number of handlers currently
	// executing (see scheduler.activeWorkers).
	MetricsKeyActiveWorkers = "github.event.workers"
	// MetricsKeyEventAge is a histogram of queue wait times, reported in
	// milliseconds when a queued event starts executing.
	MetricsKeyEventAge = "github.event.age"
	// MetricsKeyDroppedEvents counts events rejected because the queue
	// was full (see ErrCapacityExceeded).
	MetricsKeyDroppedEvents = "github.event.dropped"
)
// Parameters for the exponentially-decaying sample backing the event age
// histogram.
const (
	// values from metrics.NewTimer, which match those used by UNIX load averages
	histogramReservoirSize = 1028
	histogramAlpha         = 0.015
)
var (
	// ErrCapacityExceeded is returned by Scheduler.Schedule when an event
	// cannot be scheduled or queued at the time of the call (e.g. a queue
	// scheduler's buffer is full).
	ErrCapacityExceeded = errors.New("scheduler: capacity exceeded")
)
// Dispatch is a webhook payload and the handler that handles it.
type Dispatch struct {
	// Handler receives the event when the dispatch executes.
	Handler EventHandler

	// EventType, DeliveryID, and Payload are passed verbatim to the
	// handler's Handle method by Execute.
	EventType  string
	DeliveryID string
	Payload    []byte
}
// Execute calls the Dispatch's handler with the stored arguments and returns
// whatever error the handler returns.
func (d Dispatch) Execute(ctx context.Context) error {
	return d.Handler.Handle(ctx, d.EventType, d.DeliveryID, d.Payload)
}
// AsyncErrorCallback is called by an asynchronous scheduler when an event
// handler returns an error or panics. The error from the handler is passed
// directly as the final argument.
//
// If the handler panics, err will be a HandlerPanicError.
//
// Callbacks run in the same goroutine as the handler, after it completes.
type AsyncErrorCallback func(ctx context.Context, d Dispatch, err error)
// DefaultAsyncErrorCallback logs errors.
func DefaultAsyncErrorCallback(ctx context.Context, d Dispatch, err error) {
	defaultAsyncErrorCallback(ctx, d, err)
}

// defaultAsyncErrorCallback is the shared implementation behind
// DefaultAsyncErrorCallback: MetricsAsyncErrorCallback with a nil registry,
// so errors are logged and counted via errorCounter.
var defaultAsyncErrorCallback = MetricsAsyncErrorCallback(nil)
// MetricsAsyncErrorCallback returns a callback that logs the handler error
// and increments a per-event-type error counter in the given registry.
func MetricsAsyncErrorCallback(reg metrics.Registry) AsyncErrorCallback {
	return func(ctx context.Context, d Dispatch, err error) {
		logger := zerolog.Ctx(ctx)
		logger.Error().Err(err).Msg("Unexpected error handling webhook")

		counter := errorCounter(reg, d.EventType)
		counter.Inc(1)
	}
}
// ContextDeriver creates a new independent context from a request's context.
// The new context must be based on context.Background(), not the input, so
// that cancellation of the request does not cancel asynchronous handlers.
type ContextDeriver func(context.Context) context.Context
// DefaultContextDeriver copies the logger from the request's context to a new
// context rooted at context.Background().
func DefaultContextDeriver(ctx context.Context) context.Context {
	// The responder value is always unused by async schedulers, but is set
	// for compatibility with existing handlers that call SetResponder.
	base := InitializeResponder(context.Background())
	return zerolog.Ctx(ctx).WithContext(base)
}
// Scheduler is a strategy for executing event handlers.
//
// The Schedule method takes a Dispatch and executes it by calling the handler
// for the payload. The execution may be asynchronous, but the scheduler must
// create a new context in this case. The dispatcher waits for Schedule to
// return before responding to GitHub, so asynchronous schedulers should only
// return errors that happen during scheduling, not during execution.
//
// Schedule may return ErrCapacityExceeded if it cannot schedule or queue new
// events at the time of the call.
//
// This file provides three implementations: DefaultScheduler (synchronous),
// AsyncScheduler (one goroutine per event), and QueueAsyncScheduler (fixed
// worker pool with a bounded queue).
type Scheduler interface {
	Schedule(ctx context.Context, d Dispatch) error
}
// SchedulerOption configures properties of a scheduler. Options are applied
// by AsyncScheduler and QueueAsyncScheduler after defaults are set.
type SchedulerOption func(*scheduler)
// WithAsyncErrorCallback sets the error callback for an asynchronous
// scheduler. A nil callback is ignored, leaving the scheduler's default
// (DefaultAsyncErrorCallback) in place.
func WithAsyncErrorCallback(onError AsyncErrorCallback) SchedulerOption {
	return func(s *scheduler) {
		if onError == nil {
			return
		}
		s.onError = onError
	}
}
// WithContextDeriver sets the context deriver for an asynchronous scheduler.
// A nil deriver is ignored, leaving the scheduler's default
// (DefaultContextDeriver) in place.
func WithContextDeriver(deriver ContextDeriver) SchedulerOption {
	return func(s *scheduler) {
		if deriver == nil {
			return
		}
		s.deriver = deriver
	}
}
// WithSchedulingMetrics enables metrics reporting for schedulers: gauges for
// queue length and active workers, a histogram of event queue ages, and a
// counter of dropped events.
func WithSchedulingMetrics(r metrics.Registry) SchedulerOption {
	return func(s *scheduler) {
		queueLength := func() int64 {
			return int64(len(s.queue))
		}
		activeWorkers := func() int64 {
			return atomic.LoadInt64(&s.activeWorkers)
		}
		metrics.NewRegisteredFunctionalGauge(MetricsKeyQueueLength, r, queueLength)
		metrics.NewRegisteredFunctionalGauge(MetricsKeyActiveWorkers, r, activeWorkers)

		reservoir := metrics.NewExpDecaySample(histogramReservoirSize, histogramAlpha)
		s.eventAge = metrics.NewRegisteredHistogram(MetricsKeyEventAge, r, reservoir)
		s.dropped = metrics.NewRegisteredCounter(MetricsKeyDroppedEvents, r)
	}
}
// queueDispatch is a Dispatch waiting in a queue scheduler's channel,
// together with the derived context to execute it in and the time it was
// enqueued (used to report event age when metrics are enabled). Storing the
// context here is a deliberate ownership transfer to the worker goroutine.
type queueDispatch struct {
	ctx context.Context
	t   time.Time
	d   Dispatch
}
// core functionality and options for (async) schedulers
type scheduler struct {
	onError AsyncErrorCallback
	deriver ContextDeriver

	// activeWorkers counts dispatches currently executing in safeExecute;
	// it is accessed only via sync/atomic.
	activeWorkers int64

	// queue holds pending dispatches for queue-based schedulers; it is nil
	// for AsyncScheduler, in which case len(queue) reports 0.
	queue chan queueDispatch

	// eventAge and dropped are nil unless WithSchedulingMetrics is applied.
	eventAge metrics.Histogram
	dropped  metrics.Counter
}
// safeExecute runs d in ctx, converting a handler panic into a
// HandlerPanicError and reporting any failure (returned error or panic) to
// the onError callback. It also maintains the activeWorkers counter around
// the execution.
func (s *scheduler) safeExecute(ctx context.Context, d Dispatch) {
	var err error
	defer func() {
		atomic.AddInt64(&s.activeWorkers, -1)
		// recover must be called directly in this deferred function to
		// catch a panic from d.Execute; it overwrites err so that panics
		// reach the callback as HandlerPanicError values.
		if r := recover(); r != nil {
			err = HandlerPanicError{
				value: r,
				stack: getStack(1),
			}
		}
		if err != nil && s.onError != nil {
			s.onError(ctx, d, err)
		}
	}()
	// Increment after registering the defer so the decrement always pairs
	// with it, even if Execute panics.
	atomic.AddInt64(&s.activeWorkers, 1)
	err = d.Execute(ctx)
}
// derive applies the scheduler's context deriver to ctx, returning ctx
// unchanged when no deriver is configured.
func (s *scheduler) derive(ctx context.Context) context.Context {
	if d := s.deriver; d != nil {
		return d(ctx)
	}
	return ctx
}
// DefaultScheduler returns a scheduler that executes handlers in the go
// routine of the caller and returns any error.
func DefaultScheduler() Scheduler {
	return &defaultScheduler{}
}

// defaultScheduler is the stateless synchronous Scheduler implementation.
type defaultScheduler struct{}

// Schedule runs the dispatch inline and returns the handler's error, if any.
func (s *defaultScheduler) Schedule(ctx context.Context, d Dispatch) error {
	return d.Execute(ctx)
}
// AsyncScheduler returns a scheduler that executes handlers in new
// goroutines. Goroutines are not reused and there is no limit on the number
// created.
func AsyncScheduler(opts ...SchedulerOption) Scheduler {
	base := scheduler{
		deriver: DefaultContextDeriver,
		onError: DefaultAsyncErrorCallback,
	}
	s := &asyncScheduler{scheduler: base}
	for _, opt := range opts {
		opt(&s.scheduler)
	}
	return s
}

// asyncScheduler spawns one goroutine per scheduled event.
type asyncScheduler struct {
	scheduler
}

// Schedule starts the dispatch in a fresh goroutine with a derived context
// and always returns nil: execution errors go to the error callback instead.
func (s *asyncScheduler) Schedule(ctx context.Context, d Dispatch) error {
	go s.safeExecute(s.derive(ctx), d)
	return nil
}
// QueueAsyncScheduler returns a scheduler that executes handlers in a fixed
// number of worker goroutines. If no workers are available, events queue
// until the queue is full, after which Schedule returns ErrCapacityExceeded.
//
// It panics if queueSize is negative or workers is less than one.
func QueueAsyncScheduler(queueSize int, workers int, opts ...SchedulerOption) Scheduler {
	switch {
	case queueSize < 0:
		panic("QueueAsyncScheduler: queue size must be non-negative")
	case workers < 1:
		panic("QueueAsyncScheduler: worker count must be positive")
	}

	s := &queueScheduler{
		scheduler: scheduler{
			deriver: DefaultContextDeriver,
			onError: DefaultAsyncErrorCallback,
			queue:   make(chan queueDispatch, queueSize),
		},
	}
	for _, opt := range opts {
		opt(&s.scheduler)
	}

	// worker drains the queue forever, recording queue wait time when
	// metrics are enabled; the queue channel is never closed.
	worker := func() {
		for item := range s.queue {
			if s.eventAge != nil {
				s.eventAge.Update(time.Since(item.t).Milliseconds())
			}
			s.safeExecute(item.ctx, item.d)
		}
	}
	for n := 0; n < workers; n++ {
		go worker()
	}
	return s
}
// queueScheduler feeds a fixed worker pool through a bounded channel.
type queueScheduler struct {
	scheduler
}

// Schedule enqueues the dispatch for the worker pool. It returns
// ErrCapacityExceeded (and increments the dropped counter, when metrics are
// enabled) if the queue is full.
func (s *queueScheduler) Schedule(ctx context.Context, d Dispatch) error {
	item := queueDispatch{ctx: s.derive(ctx), t: time.Now(), d: d}

	select {
	case s.queue <- item:
		return nil
	default:
	}

	if s.dropped != nil {
		s.dropped.Inc(1)
	}
	return ErrCapacityExceeded
}