-
Notifications
You must be signed in to change notification settings - Fork 91
/
store2localdiff.go
171 lines (137 loc) · 4.83 KB
/
store2localdiff.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store2localdiff
import (
"context"
"runtime/trace"
"strconv"
"sigs.k8s.io/kpng/api/localv1"
"sigs.k8s.io/kpng/client/lightdiffstore"
"sigs.k8s.io/kpng/client/localsink"
"sigs.k8s.io/kpng/server/jobs/store2diff"
"sigs.k8s.io/kpng/server/pkg/endpoints"
"sigs.k8s.io/kpng/server/pkg/server/watchstate"
"sigs.k8s.io/kpng/server/proxystore"
"sigs.k8s.io/kpng/server/serde"
)
type Job struct {
Store *proxystore.Store
Sink localsink.Sink
}
func (j *Job) Run(ctx context.Context) error {
run := &jobRun{
Sink: j.Sink,
}
job := &store2diff.Job{
Store: j.Store,
Sets: []localv1.Set{
// Each one of these Sets will be used in a diffstore below.
localv1.Set_ServicesSet, // setN 0
localv1.Set_EndpointsSet, // setN 0
localv1.Set_EndpointsSet, // setN 1
// 2nd endpoints set for endpoints which do not have a corresponding pod name
},
Sink: run,
}
j.Sink.Setup()
return job.Run(ctx)
}
type jobRun struct {
localsink.Sink
nodeName string
}
func (s *jobRun) Wait() (err error) {
s.nodeName, err = s.WaitRequest()
return
}
func (s *jobRun) Update(tx *proxystore.Tx, w *watchstate.WatchState) {
if !tx.AllSynced() {
return
}
nodeName := s.nodeName
ctx, task := trace.NewTask(context.Background(), "LocalState.Update")
defer task.End()
// Lookup the existing diffstores here, so we can update them...
// We have 2 different endpoint sets: named endpoints
// and unnamed "anonymous" endpoints.
svcs := w.StoreForN(localv1.Set_ServicesSet, 0)
seps := w.StoreForN(localv1.Set_EndpointsSet, 0)
sepsAnonymous := w.StoreForN(localv1.Set_EndpointsSet, 1)
// set all new values
tx.Each(proxystore.Services, func(kv *proxystore.KV) bool {
key := []byte(kv.Namespace + "/" + kv.Name)
if trace.IsEnabled() {
trace.Log(ctx, "service", string(key))
}
svcs.Set(key, kv.Service.Hash, kv.Service.Service)
// iterate through ONLY the endpoints which are valid for
// this node to loadbalance to (i.e. in cases of
// topology constraints or trafficPolicy=Local,
// some endpoints may not be available for
// node to route to).
for _, ei := range endpoints.ForNode(tx, kv.Service, nodeName) {
// endpoints are not hashed, so hash, but hash ONLY the endpoint.
// to avoid false diff triggering in cases where endpoint metadata
// not relevant for "local" decision making (i.e. an endpoint
// annotation or label that is non-consequential).
hash := serde.Hash(ei.Endpoint)
var epKey []byte
// set is a localv1.Set
var set *lightdiffstore.DiffStore
if ei.PodName == "" {
set = sepsAnonymous
// key is service key + endpoint hash (64 bits, in hex)
epKey = append(make([]byte, 0, len(key)+1+64/8*2), key...)
epKey = append(epKey, '/')
epKey = strconv.AppendUint(epKey, hash, 16)
} else {
set = seps
// key is service key + podName
epKey = append(make([]byte, 0, len(key)+1+len(ei.PodName)), key...)
epKey = append(epKey, '/')
epKey = append(epKey, []byte(ei.PodName)...)
}
if trace.IsEnabled() {
trace.Log(ctx, "endpoint", string(epKey))
}
// Insert or update this key in the diffstore
set.Set(epKey, hash, ei.Endpoint)
}
return true
})
}
// SendDiff implements the store2diff interface. Called whenever
// the store2diff implementation recieves an updated from the underlying store.
// See the store2diff impl for this logic.
func (*jobRun) SendDiff(w *watchstate.WatchState) (updated bool) {
_, task := trace.NewTask(context.Background(), "LocalState.SendDiff")
defer task.End()
count := 0
// Create any service first, to avoid orphan endpoints being sent.
count += w.SendUpdates(localv1.Set_ServicesSet)
// Now delete anonymous endpoints (n=1, see comments above)
count += w.SendDeletesN(localv1.Set_EndpointsSet, 1)
// Now send updates for regular endpoints
count += w.SendUpdates(localv1.Set_EndpointsSet)
// And delete the regular endpoints if any
count += w.SendDeletes(localv1.Set_EndpointsSet)
// New anonymous endpoints added
count += w.SendUpdatesN(localv1.Set_EndpointsSet, 1)
// last, we delete any services , so that no endpoints are orphaned
// prematurely.
count += w.SendDeletes(localv1.Set_ServicesSet)
// Tell the diffstore that every item is now in the previous
// window, so the store is empty.
w.Reset(lightdiffstore.ItemDeleted)
return count != 0
}