/
cache.go
237 lines (212 loc) · 7.61 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
// Cache stores the PodStatus for the pods. It represents *all* the visible
// pods/containers in the container runtime. All cache entries are at least as
// new as the global timestamp (set by UpdateTime()), while individual entries
// may be slightly newer than the global timestamp. If a pod has no states
// known by the runtime, Cache returns an empty PodStatus object with ID
// populated.
//
// Cache provides two methods to retrieve the PodStatus: the non-blocking Get()
// and the blocking GetNewerThan() method. The component responsible for
// populating the cache is expected to call Delete() to explicitly free the
// cache entries.
type Cache interface {
	// Get is the non-blocking lookup. It returns the cached PodStatus and
	// the error recorded for the pod.
	Get(types.UID) (*PodStatus, error)
	// Set updates the cache by setting the PodStatus for the pod only
	// if the data is newer than the cache based on the provided
	// time stamp. Returns whether the cache was updated.
	Set(types.UID, *PodStatus, error, time.Time) (updated bool)
	// GetNewerThan is a blocking call that only returns the status
	// when it is newer than the given time.
	GetNewerThan(types.UID, time.Time) (*PodStatus, error)
	// Delete removes the cache entry of the pod.
	Delete(types.UID)
	// UpdateTime sets the global timestamp of the cache.
	UpdateTime(time.Time)
}
// data is a single cache entry: the status and inspection error recorded for
// one pod, together with the time the entry was written.
type data struct {
	// Status of the pod.
	status *PodStatus
	// Error got when trying to inspect the pod.
	err error
	// Time when the data was last modified.
	modified time.Time
}
// subRecord is a pending subscription registered by subscribe() for a pod
// whose cached data was not yet new enough.
type subRecord struct {
	// Time requirement of the subscriber; notify() keeps the record while
	// the notification timestamp is before this time.
	time time.Time
	// Channel on which notify() sends the pod's data and which it closes
	// afterwards.
	ch chan *data
}
// cache implements Cache.
type cache struct {
	// Lock which guards all internal data structures.
	lock sync.RWMutex
	// Map that stores the pod statuses.
	pods map[types.UID]*data
	// A global timestamp that represents how fresh the cached data is. All
	// cache content is at least as new as this timestamp. Note that the
	// timestamp is nil after initialization, and will only become non-nil when
	// it is ready to serve the cached statuses.
	timestamp *time.Time
	// Map that stores the subscriber records.
	subscribers map[types.UID][]*subRecord
}
// NewCache returns an empty pod cache. Both internal maps are allocated up
// front; the global timestamp stays nil until UpdateTime is called.
func NewCache() Cache {
	c := &cache{
		pods:        make(map[types.UID]*data),
		subscribers: make(map[types.UID][]*subRecord),
	}
	return c
}
// Get looks up the pod under the read lock and returns its status and error
// without blocking. A pod that is not cached yields a default PodStatus that
// only has its ID set. Callers must treat the returned objects as read-only.
func (c *cache) Get(id types.UID) (*PodStatus, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	entry := c.get(id)
	status, err := entry.status, entry.err
	return status, err
}
// GetNewerThan blocks until data for the pod that satisfies minTime is
// available, then returns its status and error. It subscribes for the pod and
// waits on the subscription channel; the wait happens outside the cache lock.
func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) {
	result := <-c.subscribe(id, minTime)
	return result.status, result.err
}
// Set stores the status and error for the pod, stamped with the given
// timestamp, and notifies subscribers waiting on that pod. It reports whether
// the cache was written.
//
// When the EventedPLEG feature gate is enabled, an update is dropped (and
// false is returned) if the cached entry was modified after the supplied
// timestamp. With the gate disabled the entry is always overwritten.
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) (updated bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
		existing, found := c.pods[id]
		if found && existing.modified.After(timestamp) {
			// The cache already holds something more recent.
			return false
		}
	}

	entry := &data{
		status:   status,
		err:      err,
		modified: timestamp,
	}
	c.pods[id] = entry
	c.notify(id, timestamp)
	return true
}
// Delete drops the cached entry for the pod, if any, while holding the write
// lock. Subscriber records for the pod are left untouched.
func (c *cache) Delete(id types.UID) {
	c.lock.Lock()
	delete(c.pods, id)
	c.lock.Unlock()
}
// UpdateTime modifies the global timestamp of the cache and notify
// subscribers if needed.
func (c *cache) UpdateTime(timestamp time.Time) {
c.lock.Lock()
defer c.lock.Unlock()
c.timestamp = ×tamp
// Notify all the subscribers if the condition is met.
for id := range c.subscribers {
c.notify(id, *c.timestamp)
}
}
// makeDefaultData builds the placeholder entry used for pods that are not in
// the cache: a PodStatus carrying only the pod ID, a nil error, and a zero
// modification time. Every call allocates a fresh entry.
func makeDefaultData(id types.UID) *data {
	placeholder := &PodStatus{ID: id}
	return &data{status: placeholder}
}
// get returns the cached entry for the pod, or a default (mostly empty) entry
// when the pod is not cached. It does not take the lock itself.
//
// The cache is meant to hold *all* pod/container information known to the
// container runtime, so a miss means the runtime reported no state for this
// pod the last time it was queried, i.e. there are no visible pods/containers
// associated with it.
func (c *cache) get(id types.UID) *data {
	if entry, found := c.pods[id]; found {
		return entry
	}
	return makeDefaultData(id)
}
// getIfNewerThan returns the pod's data if it is newer than minTime, and nil
// when the data is not ready yet. The caller should acquire the lock.
//
// A cached entry is ready when it was modified after minTime or when the
// global timestamp of the cache is newer than minTime. An uncached pod yields
// the default entry when the global timestamp is newer than minTime or when
// the EventedPLEG feature gate is enabled.
//
// Why a cache miss is always ready under Evented PLEG: it has CREATED,
// STARTED, STOPPED and DELETED events, but if container creation fails the
// kubelet receives no CRI event and the pod would get stuck in a
// GetNewerThan call in the pod workers. This is reproducible with the node
// e2e test
// https://github.com/kubernetes/kubernetes/blob/83415e5c9e6e59a3d60a148160490560af2178a1/test/e2e_node/pod_hostnamefqdn_test.go#L161
// which forces a failure during pod creation. Generic PLEG has the same issue,
// but it updates the global timestamp periodically, which unblocks the call.
// Node e2e tests showed no adverse impact of this change on Generic PLEG.
func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
	entry, found := c.pods[id]
	evented := utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG)
	globalIsNewer := c.timestamp != nil && c.timestamp.After(minTime)

	if !found {
		if evented || globalIsNewer {
			// Nothing cached for the pod; hand out the default status.
			return makeDefaultData(id)
		}
		// The pod status is not ready.
		return nil
	}

	if entry.modified.After(minTime) || globalIsNewer {
		return entry
	}
	// The pod status is not ready.
	return nil
}
// notify delivers the pod's current data to every subscriber of the pod whose
// time requirement is met by timestamp, closing each notified channel.
// Subscribers whose required time is still after timestamp stay registered.
// Note that the caller should acquire the lock.
func (c *cache) notify(id types.UID, timestamp time.Time) {
	records, found := c.subscribers[id]
	if !found {
		// No one to notify.
		return
	}

	var pending []*subRecord
	for _, rec := range records {
		if !timestamp.Before(rec.time) {
			// Requirement satisfied: deliver the data and finish the
			// subscription.
			rec.ch <- c.get(id)
			close(rec.ch)
			continue
		}
		// Doesn't meet the time requirement; keep the record.
		pending = append(pending, rec)
	}

	if len(pending) > 0 {
		c.subscribers[id] = pending
		return
	}
	delete(c.subscribers, id)
}
// subscribe returns a channel that will carry the pod's data once it is newer
// than timestamp. If such data is already available it is placed on the
// channel right away; otherwise a subscription record is registered so that a
// later notify can deliver it. The channel has a buffer of one, so sending on
// it does not require a receiver to be waiting.
func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
	result := make(chan *data, 1)

	c.lock.Lock()
	defer c.lock.Unlock()

	if ready := c.getIfNewerThan(id, timestamp); ready != nil {
		// If the cache entry is ready, send the data and return immediately.
		result <- ready
		return result
	}

	// Add the subscription record.
	rec := &subRecord{time: timestamp, ch: result}
	c.subscribers[id] = append(c.subscribers[id], rec)
	return result
}