/
obj_store.go
155 lines (124 loc) · 3.68 KB
/
obj_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
import (
"fmt"
"sync"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
)
// ObjStore implements the cache.Store interface:
// https://github.com/kubernetes/client-go/blob/release-1.20/tools/cache/store.go#L26-L71
// It is used by cache.Reflector to keep the updated information about resources
// https://github.com/kubernetes/client-go/blob/release-1.20/tools/cache/reflector.go#L48
type ObjStore struct {
mu sync.RWMutex
refreshed bool
objs map[types.UID]any
transformFunc func(any) (any, error)
logger *zap.Logger
}
func NewObjStore(transformFunc func(any) (any, error), logger *zap.Logger) *ObjStore {
return &ObjStore{
transformFunc: transformFunc,
objs: map[types.UID]any{},
logger: logger,
}
}
// GetResetRefreshStatus tracks whether the underlying data store is refreshed or not.
// Calling this func itself will reset the state to false.
func (s *ObjStore) GetResetRefreshStatus() bool {
s.mu.Lock()
defer s.mu.Unlock()
refreshed := s.refreshed
if refreshed {
s.refreshed = false
}
return refreshed
}
// Add implements the Add method of the store interface.
// Add adds an entry to the ObjStore.
func (s *ObjStore) Add(obj any) error {
o, err := meta.Accessor(obj)
if err != nil {
s.logger.Warn(fmt.Sprintf("Cannot find the metadata for %v.", obj))
return err
}
var toCacheObj any
if toCacheObj, err = s.transformFunc(obj); err != nil {
s.logger.Warn(fmt.Sprintf("Failed to update obj %v in the cached store.", obj))
return err
}
s.mu.Lock()
defer s.mu.Unlock()
s.objs[o.GetUID()] = toCacheObj
s.refreshed = true
return nil
}
// Update implements the Update method of the store interface.
// Update updates the existing entry in the ObjStore.
func (s *ObjStore) Update(obj any) error {
return s.Add(obj)
}
// Delete implements the Delete method of the store interface.
// Delete deletes an existing entry in the ObjStore.
func (s *ObjStore) Delete(obj any) error {
o, err := meta.Accessor(obj)
if err != nil {
return err
}
s.mu.Lock()
defer s.mu.Unlock()
delete(s.objs, o.GetUID())
s.refreshed = true
return nil
}
// List implements the List method of the store interface.
// List lists all the objects in the ObjStore
func (s *ObjStore) List() []any {
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]any, 0, len(s.objs))
for _, v := range s.objs {
result = append(result, v)
}
return result
}
// ListKeys implements the ListKeys method of the store interface.
// ListKeys lists the keys for all objects in the ObjStore
func (s *ObjStore) ListKeys() []string {
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]string, 0, len(s.objs))
for k := range s.objs {
result = append(result, string(k))
}
return result
}
// Get implements the Get method of the store interface.
func (s *ObjStore) Get(_ any) (item any, exists bool, err error) {
return nil, false, nil
}
// GetByKey implements the GetByKey method of the store interface.
func (s *ObjStore) GetByKey(_ string) (item any, exists bool, err error) {
return nil, false, nil
}
// Replace implements the Replace method of the store interface.
// Replace will delete the contents of the store, using instead the given list.
func (s *ObjStore) Replace(list []any, _ string) error {
s.mu.Lock()
s.objs = map[types.UID]any{}
s.mu.Unlock()
for _, o := range list {
err := s.Add(o)
if err != nil {
return err
}
}
return nil
}
// Resync implements the Resync method of the store interface.
func (s *ObjStore) Resync() error {
return nil
}