/
results_manager.go
74 lines (61 loc) · 2.04 KB
/
results_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podprobe
import (
	"sync"
	"time"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/workqueue"
)
// maxSyncProbeTime is the longest a cached probe result may go without being
// re-stored and re-queued while its state stays unchanged.
//
// It is deliberately a time.Duration: set compares it against the
// time.Duration returned by Time.Sub, so an untyped 600 would be read as
// 600 nanoseconds and the comparison would succeed on every probe.
const maxSyncProbeTime = 600 * time.Second
// Update is a single probe result recorded for one container. It is the value
// type that set stores in resultManager's cache and that listResults returns.
//
// The field order is significant: set constructs Update with an unkeyed
// composite literal.
type Update struct {
// ContainerID is the container the result belongs to; set also uses it as
// the cache key.
ContainerID string
// Key is the probeKey passed to set.
// NOTE(review): probeKey is declared outside this file — presumably it
// identifies the probe that produced the result; confirm.
Key probeKey
// State is the probe state reported for the container.
State appsv1alpha1.ProbeState
// Msg is the message passed to set together with State.
Msg string
// LastProbeTime is the time at which set stored this entry. It is not
// refreshed by calls to set that leave the cached entry in place.
LastProbeTime metav1.Time
}
// resultManager stores the latest probe result per container and requests a
// status update through its work queue whenever set stores a result.
type resultManager struct {
// cache maps container ID (string) -> Update.
cache *sync.Map
// queue receives the "updateStatus" item each time set stores a result.
queue workqueue.RateLimitingInterface
}
// newResultManager builds a resultManager with an empty result cache that
// reports changes on the supplied work queue.
func newResultManager(queue workqueue.RateLimitingInterface) *resultManager {
	m := new(resultManager)
	m.cache = new(sync.Map)
	m.queue = queue
	return m
}
// listResults returns every Update currently held in the cache. The order is
// whatever sync.Map.Range yields, and the result is nil when the cache is
// empty.
func (m *resultManager) listResults() []Update {
	var updates []Update
	m.cache.Range(func(_, entry any) bool {
		// Every cached value is stored by set as an Update.
		updates = append(updates, entry.(Update))
		return true
	})
	return updates
}
// set records the probe result for container id and enqueues an
// "updateStatus" item. The cached entry is left untouched, and nothing is
// enqueued, when an entry already exists with the same state and was stored
// less than maxSyncProbeTime ago.
func (m *resultManager) set(id string, key probeKey, result appsv1alpha1.ProbeState, msg string) {
	now := metav1.Now()

	if cached, ok := m.cache.Load(id); ok {
		last := cached.(Update)
		unchanged := last.State == result
		fresh := now.Sub(last.LastProbeTime.Time) < maxSyncProbeTime
		if unchanged && fresh {
			// Same state and recently synced: skip the redundant update.
			return
		}
	}

	m.cache.Store(id, Update{
		ContainerID:   id,
		Key:           key,
		State:         result,
		Msg:           msg,
		LastProbeTime: now,
	})
	m.queue.Add("updateStatus")
}
// remove deletes the cached probe result stored under container ID id.
// It does not enqueue anything on the work queue.
func (m *resultManager) remove(id string) {
m.cache.Delete(id)
}