-
Notifications
You must be signed in to change notification settings - Fork 405
/
object_count_tracker.go
169 lines (139 loc) · 5.04 KB
/
object_count_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"errors"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
// pruneInterval is how often the background pruner sweeps the count cache.
// Deleting a type (which mostly happens for CRDs) is infrequent, so a long
// interval is affordable; it is still short enough for scalability tests to
// exercise the pruning code path.
const pruneInterval = 1 * time.Hour

// staleTolerationThreshold is how long a cached count may go without being
// refreshed before Get reports it as stale. The storage layer polls for the
// object count every minute, so this tolerates roughly 2-3 consecutive
// transient failures to fetch the latest count for a given resource.
const staleTolerationThreshold = 3 * time.Minute
// ObjectCountNotFoundErr is the sentinel error reported when no object count
// is being tracked for the requested resource.
var ObjectCountNotFoundErr = errors.New("object count not found for the given resource")

// ObjectCountStaleErr is the sentinel error reported when the tracked object
// count for a resource has gone stale, e.g. because refreshing it kept
// failing transiently.
var ObjectCountStaleErr = errors.New("object count has gone stale for the given resource")
// StorageObjectCountTracker keeps track of the total number of objects
// stored for each resource.
//
// "{group}.{resource}" is used as the key both to update and to retrieve
// the total number of objects for a given resource.
type StorageObjectCountTracker interface {
	// Set records count as the current total number of objects for
	// groupResource.
	Set(groupResource string, count int64)

	// Get returns the total number of objects for groupResource.
	// The following errors are returned:
	//   - ObjectCountStaleErr if the count for the resource has gone stale
	//     due to transient failures.
	//   - ObjectCountNotFoundErr if the resource is not being tracked.
	Get(groupResource string) (int64, error)

	// RunUntil starts all the necessary maintenance; stopCh is used to
	// signal it to stop.
	RunUntil(stopCh <-chan struct{})
}
// NewStorageObjectCountTracker constructs a StorageObjectCountTracker that
// timestamps entries with the real wall clock and starts with an empty
// per-resource count cache.
func NewStorageObjectCountTracker() StorageObjectCountTracker {
	tracker := &objectCountTracker{}
	tracker.clock = &clock.RealClock{}
	tracker.counts = make(map[string]*timestampedCount)
	return tracker
}
// timestampedCount pairs a resource's object count with the time it was last
// refreshed. The timestamp is what lets the tracker flag the count as stale
// and eventually prune it.
type timestampedCount struct {
	// count is the most recently reported number of objects.
	count int64
	// lastUpdatedAt is when count was last set.
	lastUpdatedAt time.Time
}
// objectCountTracker is the StorageObjectCountTracker implementation. All
// access to its cache is serialized through a reader/writer lock.
type objectCountTracker struct {
	// clock supplies "now" for timestamping, staleness checks and pruning.
	clock clock.PassiveClock
	// lock guards counts: readers take RLock, writers take Lock.
	lock sync.RWMutex
	// counts maps "{group}.{resource}" to its latest timestamped count.
	counts map[string]*timestampedCount
}
func (t *objectCountTracker) Set(groupResource string, count int64) {
if count <= -1 {
// a value of -1 indicates that the 'Count' call failed to contact
// the storage layer, in most cases this error can be transient.
// we will continue to work with the count that is in the cache
// up to a certain threshold defined by staleTolerationThreshold.
// in case this becomes a non transient error then the count for
// the given resource will will eventually be removed from
// the cache by the pruner.
return
}
now := t.clock.Now()
// lock for writing
t.lock.Lock()
defer t.lock.Unlock()
if item, ok := t.counts[groupResource]; ok {
item.count = count
item.lastUpdatedAt = now
return
}
t.counts[groupResource] = ×tampedCount{
count: count,
lastUpdatedAt: now,
}
}
// Get reports the cached object count for groupResource.
//
// An untracked resource yields (0, ObjectCountNotFoundErr). A tracked
// resource whose entry was last refreshed before now minus
// staleTolerationThreshold yields its cached count together with
// ObjectCountStaleErr. Otherwise the cached count is returned with a nil
// error.
func (t *objectCountTracker) Get(groupResource string) (int64, error) {
	cutoff := t.clock.Now().Add(-staleTolerationThreshold)

	t.lock.RLock()
	defer t.lock.RUnlock()

	entry, found := t.counts[groupResource]
	if !found {
		return 0, ObjectCountNotFoundErr
	}

	var err error
	if entry.lastUpdatedAt.Before(cutoff) {
		err = ObjectCountStaleErr
	}
	return entry.count, err
}
// RunUntil drives the tracker's maintenance: it prunes entries that have not
// been refreshed within pruneInterval, polling every pruneInterval via
// wait.PollUntil until stopCh is closed. It logs when the pruner exits.
func (t *objectCountTracker) RunUntil(stopCh <-chan struct{}) {
	pruneTick := func() (bool, error) {
		// Never report completion, so pruning repeats on every interval.
		return false, t.prune(pruneInterval)
	}
	// The result of PollUntil is deliberately not inspected; prune, as
	// written, always returns nil.
	wait.PollUntil(pruneInterval, pruneTick, stopCh)
	klog.InfoS("StorageObjectCountTracker pruner is exiting")
}
// prune deletes every cached count whose lastUpdatedAt is not strictly after
// now minus threshold, i.e. entries that were not refreshed within the last
// threshold. It always returns nil.
func (t *objectCountTracker) prune(threshold time.Duration) error {
	cutoff := t.clock.Now().Add(-threshold)

	// lock for writing
	t.lock.Lock()
	defer t.lock.Unlock()

	for key, entry := range t.counts {
		// Deleting the current key while ranging over a map is permitted.
		if !entry.lastUpdatedAt.After(cutoff) {
			delete(t.counts, key)
		}
	}
	return nil
}