forked from projectcalico/felix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
calc_graph.go
217 lines (188 loc) · 7.54 KB
/
calc_graph.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package calc
import (
log "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/projectcalico/felix/dispatcher"
"github.com/projectcalico/felix/ip"
"github.com/projectcalico/felix/labelindex"
"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/model"
"github.com/projectcalico/libcalico-go/lib/net"
"github.com/projectcalico/libcalico-go/lib/selector"
)
// Prometheus gauges exported by the calculation graph.  Both are registered
// with Prometheus in this file's init function.
var (
	// gaugeNumActiveSelectors is incremented and decremented by the
	// selector active/inactive hooks that NewCalculationGraph installs on
	// the rule scanner.
	gaugeNumActiveSelectors = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "felix_active_local_selectors",
		Help: "Number of active selectors on this host.",
	})
	// gaugeNumActiveTags is registered but never updated by the code in
	// this file.
	// NOTE(review): presumably it is maintained elsewhere in the package;
	// confirm, otherwise it will always report zero.
	gaugeNumActiveTags = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "felix_active_local_tags",
		Help: "Number of active tags on this host.",
	})
)
// init registers this file's gauges with Prometheus.  The tags gauge is
// registered first, then the selectors gauge.
func init() {
	collectors := []prometheus.Collector{
		gaugeNumActiveTags,
		gaugeNumActiveSelectors,
	}
	for _, c := range collectors {
		prometheus.MustRegister(c)
	}
}
// ipSetUpdateCallbacks is the set of callbacks that report IP set lifecycle
// and membership changes.  Every method identifies the IP set by a string ID.
//
// In NewCalculationGraph, OnIPSetAdded and OnIPSetRemoved are invoked when a
// selector becomes active or inactive, using the selector's UniqueId() as the
// set ID.
// NOTE(review): OnIPAdded/OnIPRemoved are not called in this file; presumably
// the member calculator (which is handed the callbacks) invokes them - confirm.
type ipSetUpdateCallbacks interface {
	// OnIPSetAdded reports that the IP set with the given ID now exists.
	OnIPSetAdded(setID string)
	// OnIPAdded reports that ip was added to the IP set with the given ID.
	OnIPAdded(setID string, ip ip.Addr)
	// OnIPRemoved reports that ip was removed from the IP set with the
	// given ID.
	OnIPRemoved(setID string, ip ip.Addr)
	// OnIPSetRemoved reports that the IP set with the given ID is gone.
	OnIPSetRemoved(setID string)
}
// rulesUpdateCallbacks is the set of callbacks that report policies and
// profiles becoming active or inactive.  The "active" variants carry the
// parsed rules for the policy/profile; the "inactive" variants carry only the
// key.
//
// NewCalculationGraph hands an implementation of this interface to the rule
// scanner via its RulesUpdateCallbacks field.
type rulesUpdateCallbacks interface {
	OnPolicyActive(model.PolicyKey, *ParsedRules)
	OnPolicyInactive(model.PolicyKey)
	OnProfileActive(model.ProfileRulesKey, *ParsedRules)
	OnProfileInactive(model.ProfileRulesKey)
}
// endpointCallbacks is the callback interface for per-endpoint policy
// updates.  NewCalculationGraph hands an implementation to the policy
// resolver via its Callbacks field.
type endpointCallbacks interface {
	// OnEndpointTierUpdate reports the endpoint identified by endpointKey
	// together with its endpoint data and the tiers that apply to it.
	// NOTE(review): the concrete type of endpoint and the meaning of a nil
	// endpoint are not visible in this file - confirm against the policy
	// resolver before relying on them.
	OnEndpointTierUpdate(endpointKey model.Key,
		endpoint interface{},
		filteredTiers []tierInfo)
}
// configCallbacks is the callback interface for configuration-related
// events.  NewCalculationGraph passes an implementation to NewConfigBatcher.
type configCallbacks interface {
	// OnConfigUpdate delivers the global and per-host configuration as
	// string-to-string maps.
	OnConfigUpdate(globalConfig, hostConfig map[string]string)
	// OnDatastoreNotReady takes no arguments.
	// NOTE(review): presumably signals that the datastore is not (or no
	// longer) ready - confirm against the config batcher.
	OnDatastoreNotReady()
}
// passthruCallbacks is the callback interface for host IP and IP pool
// updates and removals.  NewCalculationGraph passes an implementation to
// NewDataplanePassthru.
// NOTE(review): the name suggests these updates are forwarded without
// further calculation - confirm against the passthru implementation.
type passthruCallbacks interface {
	// OnHostIPUpdate reports the IP recorded for the named host.
	OnHostIPUpdate(hostname string, ip *net.IP)
	// OnHostIPRemove reports that the named host's IP has been removed.
	OnHostIPRemove(hostname string)
	// OnIPPoolUpdate reports the current value of the IP pool with the
	// given key.
	OnIPPoolUpdate(model.IPPoolKey, *model.IPPool)
	// OnIPPoolRemove reports that the IP pool with the given key has been
	// removed.
	OnIPPoolRemove(model.IPPoolKey)
}
// PipelineCallbacks is the union of all the callback interfaces used by the
// calculation graph.  A single implementation is passed to
// NewCalculationGraph, which hands it to each stage of the graph.
type PipelineCallbacks interface {
	ipSetUpdateCallbacks
	rulesUpdateCallbacks
	endpointCallbacks
	configCallbacks
	passthruCallbacks
}
// NewCalculationGraph builds the calculation graph for the given hostname and
// returns the dispatcher that sits at its head.  The caller feeds datastore
// updates into the returned dispatcher; the stages wired up here report their
// results through callbacks.
//
// The order in which handlers are registered below is significant and must be
// preserved when editing this function.
func NewCalculationGraph(callbacks PipelineCallbacks, hostname string) (allUpdDispatcher *dispatcher.Dispatcher) {
	log.Infof("Creating calculation graph, filtered to hostname %v", hostname)

	// Head of the graph: receives every update from the datastore and fans
	// it out to whichever handlers have registered with it.
	allUpdDispatcher = dispatcher.NewDispatcher()

	// Several stages only care about endpoints on this host, so chain a
	// second dispatcher behind the first and give it a filter that drops
	// endpoints belonging to other hosts.
	localDisp := dispatcher.NewDispatcher()
	(*localEndpointDispatcherReg)(localDisp).RegisterWith(allUpdDispatcher)
	hostFilter := &endpointHostnameFilter{hostname: hostname}
	hostFilter.RegisterWith(localDisp)

	// Active rules calculator: compares local endpoints with policies and
	// profiles to work out which of them are in use on this host.
	rulesCalc := NewActiveRulesCalculator()
	rulesCalc.RegisterWith(localDisp, allUpdDispatcher)

	// Rule scanner: consumes the active rules calculator's output, looks
	// inside each rule for selectors and tags, and raises an event whenever
	// a selector/tag comes into or goes out of use.
	scanner := NewRuleScanner()
	scanner.RulesUpdateCallbacks = callbacks
	rulesCalc.RuleScanner = scanner

	// Active selector index: checks the selectors reported by the rule
	// scanner against *all* endpoints and raises an event whenever an
	// endpoint starts or stops matching one.  Those events go to the member
	// calculator, which pulls out the endpoints' IP addresses.  The member
	// calculator treats tags and selectors alike but wants a string ID, so
	// the closures below adapt the index's interface{} arguments.
	//
	// The member calculator is only created further down; the closures
	// capture the variable, which is assigned before the function returns.
	var members *MemberCalculator
	onMatchStarted := func(selID, labelID interface{}) {
		members.MatchStarted(labelID.(model.Key), selID.(string))
	}
	onMatchStopped := func(selID, labelID interface{}) {
		members.MatchStopped(labelID.(model.Key), selID.(string))
	}
	selIndex := labelindex.NewInheritIndex(onMatchStarted, onMatchStopped)

	scanner.OnSelectorActive = func(sel selector.Selector) {
		log.Infof("Selector %v now active", sel)
		// Announce the IP set before the index can start reporting
		// matches for it.
		callbacks.OnIPSetAdded(sel.UniqueId())
		selIndex.UpdateSelector(sel.UniqueId(), sel)
		gaugeNumActiveSelectors.Inc()
	}
	scanner.OnSelectorInactive = func(sel selector.Selector) {
		log.Infof("Selector %v now inactive", sel)
		// Mirror image of the above: drop the selector from the index
		// first, then announce removal of the IP set.
		selIndex.DeleteSelector(sel.UniqueId())
		callbacks.OnIPSetRemoved(sel.UniqueId())
		gaugeNumActiveSelectors.Dec()
	}
	selIndex.RegisterWith(allUpdDispatcher)

	// Member calculator: combines the IPs of the matching endpoints into
	// the final contents of each IP set, coping with corner cases such as
	// one IP appearing on several endpoints.  It has to see *all*
	// endpoints, so it listens to the head dispatcher, and its results go
	// straight to the callbacks.
	members = NewMemberCalculator()
	members.RegisterWith(allUpdDispatcher)
	members.callbacks = callbacks

	// Policy resolver: joins the active policies to the local endpoints to
	// produce the full, ordered list of policies for each endpoint.  Its
	// inputs are the active rules calculator and both dispatchers; its
	// output goes to the callbacks.
	resolver := NewPolicyResolver()
	rulesCalc.PolicyMatchListener = resolver
	resolver.RegisterWith(allUpdDispatcher, localDisp)
	resolver.Callbacks = callbacks

	// Host IP updates are handed to the dataplane passthru.
	passthru := NewDataplanePassthru(callbacks)
	passthru.RegisterWith(allUpdDispatcher)

	// Config updates are handed to the config batcher.
	batcher := NewConfigBatcher(hostname, callbacks)
	batcher.RegisterWith(allUpdDispatcher)

	return allUpdDispatcher
}
// localEndpointDispatcherReg is dispatcher.Dispatcher under another name.  It
// exists so that a RegisterWith method can be attached to a dispatcher,
// letting one dispatcher be chained behind another.
type localEndpointDispatcherReg dispatcher.Dispatcher

// RegisterWith subscribes the underlying dispatcher to disp so that it
// receives workload endpoint updates, host endpoint updates and datamodel
// status changes from disp.
func (reg *localEndpointDispatcherReg) RegisterWith(disp *dispatcher.Dispatcher) {
	// Convert back to the real dispatcher type to reach its methods.
	inner := (*dispatcher.Dispatcher)(reg)
	onUpdate := inner.OnUpdate

	disp.Register(model.WorkloadEndpointKey{}, onUpdate)
	disp.Register(model.HostEndpointKey{}, onUpdate)
	disp.RegisterStatusHandler(inner.OnDatamodelStatus)
}
// endpointHostnameFilter provides an UpdateHandler that filters out endpoints
// that are not on the given host.  Its OnUpdate method compares the Hostname
// field of workload and host endpoint keys against hostname.
type endpointHostnameFilter struct {
	// hostname is the name of the local host; endpoints whose key carries
	// a different hostname are filtered out.
	hostname string
}
// RegisterWith installs the filter's OnUpdate method on localEndpointDisp as
// the handler for both workload endpoint and host endpoint updates.
func (f *endpointHostnameFilter) RegisterWith(localEndpointDisp *dispatcher.Dispatcher) {
	handler := f.OnUpdate
	localEndpointDisp.Register(model.WorkloadEndpointKey{}, handler)
	localEndpointDisp.Register(model.HostEndpointKey{}, handler)
}
// OnUpdate decides whether an update should be filtered out.  It returns true
// when the update's key is a workload or host endpoint key whose Hostname
// differs from the filter's hostname.  For every other update (including keys
// of any other type) it returns false and logs at Info level whether the
// endpoint was deleted (nil Value) or updated.
func (f *endpointHostnameFilter) OnUpdate(update api.Update) (filterOut bool) {
	// Keys of any other type are left as "local", i.e. not filtered.
	local := true
	switch key := update.Key.(type) {
	case model.WorkloadEndpointKey:
		local = key.Hostname == f.hostname
	case model.HostEndpointKey:
		local = key.Hostname == f.hostname
	}
	if !local {
		// Remote endpoint: drop it silently to keep log spam down.
		return true
	}

	msg := "Local endpoint updated"
	if update.Value == nil {
		msg = "Local endpoint deleted"
	}
	log.WithField("id", update.Key).Info(msg)
	return false
}