/
swappable.go
110 lines (92 loc) · 3.86 KB
/
swappable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package swappable provides an http.Handler that delegates all HTTP requests to an underlying
// multichannelfanout.Handler. When a new configuration is available, a new
// multichannelfanout.Handler is created and swapped in. All subsequent requests go to the new
// handler.
// It is often used in conjunction with something that notices changes to ConfigMaps, such as
// configmap.watcher or configmap.filesystem.
package swappable
import (
"context"
"errors"
"sync"
"sync/atomic"
cloudevents "github.com/cloudevents/sdk-go/v1"
"go.uber.org/zap"
"knative.dev/eventing/pkg/channel/multichannelfanout"
)
// Handler is an http.Handler that atomically swaps between underlying handlers.
type Handler struct {
// The current multichannelfanout.Handler to delegate HTTP requests to. Never use this directly,
// instead use {get,set}MultiChannelFanoutHandler, which enforces the type we expect.
fanout atomic.Value
updateLock sync.Mutex
logger *zap.Logger
}
// UpdateConfig updates the configuration to use the new config, returning an error if it can't.
type UpdateConfig func(config *multichannelfanout.Config) error
var _ UpdateConfig = (&Handler{}).UpdateConfig
// NewHandler creates a new swappable.Handler.
func NewHandler(handler *multichannelfanout.Handler, logger *zap.Logger) *Handler {
h := &Handler{
logger: logger.With(zap.String("httpHandler", "swappable")),
}
h.setMultiChannelFanoutHandler(handler)
return h
}
// NewEmptyHandler creates a new swappable.Handler with an empty configuration.
func NewEmptyHandler(logger *zap.Logger) (*Handler, error) {
h, err := multichannelfanout.NewHandler(logger, multichannelfanout.Config{})
if err != nil {
return nil, err
}
return NewHandler(h, logger), nil
}
// getMultiChannelFanoutHandler gets the current multichannelfanout.Handler to delegate all HTTP
// requests to.
func (h *Handler) getMultiChannelFanoutHandler() *multichannelfanout.Handler {
return h.fanout.Load().(*multichannelfanout.Handler)
}
// setMultiChannelFanoutHandler sets a new multichannelfanout.Handler to delegate all subsequent
// HTTP requests to.
func (h *Handler) setMultiChannelFanoutHandler(nh *multichannelfanout.Handler) {
h.fanout.Store(nh)
}
// UpdateConfig copies the current inner multichannelfanout.Handler with the new configuration. If
// the new configuration is valid, then the new inner handler is swapped in and will start serving
// HTTP traffic.
func (h *Handler) UpdateConfig(config *multichannelfanout.Config) error {
if config == nil {
return errors.New("nil config")
}
h.updateLock.Lock()
defer h.updateLock.Unlock()
ih := h.getMultiChannelFanoutHandler()
if diff := ih.ConfigDiff(*config); diff != "" {
h.logger.Info("Updating config (-old +new)", zap.String("diff", diff))
newIh, err := ih.CopyWithNewConfig(*config)
if err != nil {
h.logger.Info("Unable to update config", zap.Error(err), zap.Any("config", config))
return err
}
h.setMultiChannelFanoutHandler(newIh)
}
return nil
}
// ServeHTTP delegates all HTTP requests to the current multichannelfanout.Handler.
func (h *Handler) ServeHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
// Hand work off to the current multi channel fanout handler.
h.logger.Debug("ServeHTTP request received")
return h.getMultiChannelFanoutHandler().ServeHTTP(ctx, event, resp)
}