forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
runtime_cache.go
145 lines (128 loc) · 4.3 KB
/
runtime_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"sync"
"time"
)
// Timing knobs for the runtime cache.
// TODO(yifan): Maybe make these parameters of NewRuntimeCache().
var (
	// defaultCachePeriod is the age beyond which GetPods refreshes the cached
	// pods synchronously. It is also how far each GetPods call pushes out the
	// stop time of the background updater.
	defaultCachePeriod = 2 * time.Second
	// defaultUpdateInterval is how long the background updater sleeps
	// between two refresh attempts.
	defaultUpdateInterval = 100 * time.Millisecond
)
// RuntimeCache is a cache of the pods reported by the container runtime.
type RuntimeCache interface {
	// GetPods returns the pods currently held by the cache.
	GetPods() ([]*Pod, error)
	// ForceUpdateIfOlder refreshes the cache when its contents were recorded
	// before minExpectedCacheTime.
	ForceUpdateIfOlder(minExpectedCacheTime time.Time) error
}
// podsGetter is the source the cache pulls pods from on every refresh.
// NOTE(review): the meaning of the bool argument is not visible in this file;
// the cache always passes false — confirm against the implementing runtime.
type podsGetter interface {
GetPods(bool) ([]*Pod, error)
}
// NewRuntimeCache builds a RuntimeCache that pulls its pods from getter. The
// cache starts out empty and with no background updater running. The returned
// error is always nil.
func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
	// Every other field is left at its zero value; in particular updating
	// starts out false.
	cache := &runtimeCache{getter: getter}
	return cache, nil
}
// runtimeCache caches a list of pods. It records a timestamp (cacheTime) right
// before fetching the pods, so the timestamp is at most as new as the pods
// (and can be slightly older). The timestamp only ever moves forward. Callers
// are expected not to modify the pods returned from GetPods.
//
// A refresh can be triggered by a request (GetPods or ForceUpdateIfOlder) when
// the cached pods are considered stale; such a request blocks until the
// refresh has completed. To reduce the cache-miss penalty, a successful
// GetPods also starts a separate goroutine (startUpdatingCache), if one is not
// already running, that periodically refreshes the cache. That goroutine stops
// after a period without GetPods calls to conserve resources.
type runtimeCache struct {
// Held while reading or writing cacheTime, pods, updating and
// updatingThreadStopTime.
sync.Mutex
// The underlying container runtime used to update the cache.
getter podsGetter
// Timestamp taken just before the fetch that produced pods.
cacheTime time.Time
// The content of the cache.
pods []*Pod
// Whether the background goroutine updating the cache is running.
updating bool
// Time after which the background goroutine should stop; pushed forward by
// every successful GetPods call.
updatingThreadStopTime time.Time
}
// GetPods returns the cached pods, first refreshing them synchronously when
// the cache is older than defaultCachePeriod. If that refresh fails, the error
// is returned and the background updater is left untouched. Otherwise the call
// pushes the updater's stop time out by defaultCachePeriod and starts the
// updater if it is not already running.
func (r *runtimeCache) GetPods() ([]*Pod, error) {
	r.Lock()
	defer r.Unlock()

	stale := time.Since(r.cacheTime) > defaultCachePeriod
	if stale {
		err := r.updateCache()
		if err != nil {
			return nil, err
		}
	}

	// Each successful request keeps the background updater alive for one more
	// cache period.
	r.updatingThreadStopTime = time.Now().Add(defaultCachePeriod)
	if r.updating {
		return r.pods, nil
	}

	// No updater is running: mark it as started while still holding the lock
	// so that concurrent callers do not launch a second one.
	r.updating = true
	go r.startUpdatingCache()
	return r.pods, nil
}
// ForceUpdateIfOlder refreshes the cache synchronously when the cached pods
// were recorded before minExpectedCacheTime, and returns any error from that
// refresh. It does nothing and returns nil when the cache is already at least
// that new.
func (r *runtimeCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
	r.Lock()
	defer r.Unlock()

	if !r.cacheTime.Before(minExpectedCacheTime) {
		// Cached pods already satisfy the requested freshness.
		return nil
	}
	return r.updateCache()
}
// updateCache fetches the pods from the getter and stores them if they are
// newer than what is cached. On a fetch error the cache is left unchanged and
// the error is returned. The callers in this file invoke it with the lock
// held.
func (r *runtimeCache) updateCache() error {
	fetched, takenAt, err := r.getPodsWithTimestamp()
	if err == nil {
		r.writePodsIfNewer(fetched, takenAt)
	}
	return err
}
// getPodsWithTimestamp asks the getter for the current pods and returns them
// together with a timestamp taken immediately before the request was issued.
func (r *runtimeCache) getPodsWithTimestamp() ([]*Pod, time.Time, error) {
	// Read the clock first: the timestamp must never be newer than the pods
	// it describes, otherwise stale pods could be mistaken for fresh ones.
	before := time.Now()
	fetched, err := r.getter.GetPods(false)
	return fetched, before, err
}
// writePodsIfNewer replaces the cached pods and cacheTime with the given pods
// and timestamp, but only when timestamp is strictly after the current
// cacheTime; otherwise the cache is left as it is.
func (r *runtimeCache) writePodsIfNewer(pods []*Pod, timestamp time.Time) {
	if !timestamp.After(r.cacheTime) {
		// The cache already holds data at least as recent as this.
		return
	}
	r.pods = pods
	r.cacheTime = timestamp
}
// startUpdatingCache refreshes the cache every defaultUpdateInterval until
// updatingThreadStopTime has passed, i.e. until no GetPods call has arrived
// within the default cache period. GetPods runs it in its own goroutine. On
// exit it clears r.updating so that a later GetPods call can start it again.
func (r *runtimeCache) startUpdatingCache() {
	for {
		time.Sleep(defaultUpdateInterval)
		// Fetch without holding the lock so that requests are not blocked
		// behind the getter.
		pods, timestamp, err := r.getPodsWithTimestamp()

		r.Lock()
		if err == nil {
			r.writePodsIfNewer(pods, timestamp)
		}
		// Check the stop time on every iteration, including failed fetches.
		// Skipping it on error would keep this goroutine polling a
		// persistently failing getter forever, with updating stuck at true.
		expired := time.Now().After(r.updatingThreadStopTime)
		if expired {
			r.updating = false
		}
		r.Unlock()

		if expired {
			return
		}
	}
}