forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
shared_informer.go
159 lines (127 loc) · 5.29 KB
/
shared_informer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package shared
import (
"reflect"
"sync"
"time"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
kclient "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
oclient "github.com/openshift/origin/pkg/client"
)
// InformerFactory starts informers and exposes one accessor per supported
// resource, each returning that resource's typed informer.
type InformerFactory interface {
// Start starts informers that can start AFTER the API server and controllers have started
Start(stopCh <-chan struct{})
// StartCore starts core informers that must initialize in order for the API server to start
StartCore(stopCh <-chan struct{})
// Typed informer accessors, one per resource.
Pods() PodInformer
Namespaces() NamespaceInformer
Nodes() NodeInformer
PersistentVolumes() PersistentVolumeInformer
PersistentVolumeClaims() PersistentVolumeClaimInformer
ReplicationControllers() ReplicationControllerInformer
ClusterPolicies() ClusterPolicyInformer
ClusterPolicyBindings() ClusterPolicyBindingInformer
Policies() PolicyInformer
PolicyBindings() PolicyBindingInformer
DeploymentConfigs() DeploymentConfigInformer
ImageStreams() ImageStreamInformer
SecurityContextConstraints() SecurityContextConstraintsInformer
ClusterResourceQuotas() ClusterResourceQuotaInformer
}
// ListerWatcherOverrides allows a caller to specify special behavior for particular ListerWatchers.
// For instance, authentication and authorization types need to go direct to etcd, not through an API server.
type ListerWatcherOverrides interface {
// GetListerWatcher returns the ListerWatcher specified for the given resource, or nil if
// no particular ListerWatcher was specified for that resource.
GetListerWatcher(resource unversioned.GroupResource) cache.ListerWatcher
}
// DefaultListerWatcherOverrides is a map-backed ListerWatcherOverrides: each
// GroupResource key maps directly to the ListerWatcher to use for it.
type DefaultListerWatcherOverrides map[unversioned.GroupResource]cache.ListerWatcher

// GetListerWatcher looks resource up in the map and returns the stored
// ListerWatcher. A resource with no entry (including when the map itself is
// nil) yields nil.
func (o DefaultListerWatcherOverrides) GetListerWatcher(resource unversioned.GroupResource) cache.ListerWatcher {
	override, found := o[resource]
	if !found {
		return nil
	}
	return override
}
// NewInformerFactory returns an InformerFactory that remembers the given
// clients, ListerWatcher overrides and default resync duration. The factory
// starts out with empty informer registries and nothing marked as started.
func NewInformerFactory(kubeClient kclient.Interface, originClient oclient.Interface, customListerWatchers ListerWatcherOverrides, defaultResync time.Duration) InformerFactory {
	factory := &sharedInformerFactory{
		kubeClient:           kubeClient,
		originClient:         originClient,
		customListerWatchers: customListerWatchers,
		defaultResync:        defaultResync,
	}

	// Allocate every map up front so later writes never hit a nil map.
	factory.informers = make(map[reflect.Type]framework.SharedIndexInformer)
	factory.coreInformers = make(map[reflect.Type]framework.SharedIndexInformer)
	factory.startedInformers = make(map[reflect.Type]bool)
	factory.startedCoreInformers = make(map[reflect.Type]bool)

	return factory
}
// sharedInformerFactory is the InformerFactory implementation returned by NewInformerFactory.
type sharedInformerFactory struct {
// kubeClient and originClient are the clients supplied to NewInformerFactory.
kubeClient kclient.Interface
originClient oclient.Interface
// customListerWatchers holds the caller-supplied per-resource ListerWatcher overrides.
customListerWatchers ListerWatcherOverrides
// defaultResync is the duration supplied to NewInformerFactory.
// NOTE(review): presumably the resync period handed to each informer — confirm against the typed informers.
defaultResync time.Duration
// informers holds the informers run by Start; coreInformers holds those run by StartCore.
// NOTE(review): nothing visible here adds entries to these maps; presumably the typed
// informers register themselves — confirm.
informers map[reflect.Type]framework.SharedIndexInformer
coreInformers map[reflect.Type]framework.SharedIndexInformer
// startedInformers and startedCoreInformers record which keys have already been run,
// so repeated Start/StartCore calls do not run the same informer twice.
startedInformers map[reflect.Type]bool
startedCoreInformers map[reflect.Type]bool
// lock is held by Start and StartCore while they walk the informer maps and update the started maps.
lock sync.Mutex
}
// Start runs, each on its own goroutine, every informer in f.informers that
// an earlier Start call has not already run, passing stopCh to its Run
// method. The factory lock is held for the whole call, and each informer is
// marked as started so that calling Start again only picks up new entries.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for key, inf := range f.informers {
		if f.startedInformers[key] {
			// Already running from a previous call.
			continue
		}
		go inf.Run(stopCh)
		f.startedInformers[key] = true
	}
}
// StartCore runs, each on its own goroutine, every informer in
// f.coreInformers that an earlier StartCore call has not already run,
// passing stopCh to its Run method. The factory lock is held for the whole
// call, and each informer is marked as started so that calling StartCore
// again only picks up new entries.
func (f *sharedInformerFactory) StartCore(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for key, inf := range f.coreInformers {
		if f.startedCoreInformers[key] {
			// Already running from a previous call.
			continue
		}
		go inf.Run(stopCh)
		f.startedCoreInformers[key] = true
	}
}
// Pods returns a newly allocated PodInformer that refers back to this factory.
func (f *sharedInformerFactory) Pods() PodInformer {
	informer := &podInformer{sharedInformerFactory: f}
	return informer
}
// Nodes returns a newly allocated NodeInformer that refers back to this factory.
func (f *sharedInformerFactory) Nodes() NodeInformer {
	informer := &nodeInformer{sharedInformerFactory: f}
	return informer
}
// PersistentVolumes returns a newly allocated PersistentVolumeInformer that
// refers back to this factory.
func (f *sharedInformerFactory) PersistentVolumes() PersistentVolumeInformer {
	informer := &persistentVolumeInformer{sharedInformerFactory: f}
	return informer
}
// PersistentVolumeClaims returns a newly allocated
// PersistentVolumeClaimInformer that refers back to this factory.
func (f *sharedInformerFactory) PersistentVolumeClaims() PersistentVolumeClaimInformer {
	informer := &persistentVolumeClaimInformer{sharedInformerFactory: f}
	return informer
}
// ReplicationControllers returns a newly allocated
// ReplicationControllerInformer that refers back to this factory.
func (f *sharedInformerFactory) ReplicationControllers() ReplicationControllerInformer {
	informer := &replicationControllerInformer{sharedInformerFactory: f}
	return informer
}
// Namespaces returns a newly allocated NamespaceInformer that refers back to
// this factory.
func (f *sharedInformerFactory) Namespaces() NamespaceInformer {
	informer := &namespaceInformer{sharedInformerFactory: f}
	return informer
}
// ClusterPolicies returns a newly allocated ClusterPolicyInformer that refers
// back to this factory.
func (f *sharedInformerFactory) ClusterPolicies() ClusterPolicyInformer {
	informer := &clusterPolicyInformer{sharedInformerFactory: f}
	return informer
}
// ClusterPolicyBindings returns a newly allocated ClusterPolicyBindingInformer
// that refers back to this factory.
func (f *sharedInformerFactory) ClusterPolicyBindings() ClusterPolicyBindingInformer {
	informer := &clusterPolicyBindingInformer{sharedInformerFactory: f}
	return informer
}
// Policies returns a newly allocated PolicyInformer that refers back to this
// factory.
func (f *sharedInformerFactory) Policies() PolicyInformer {
	informer := &policyInformer{sharedInformerFactory: f}
	return informer
}
// PolicyBindings returns a newly allocated PolicyBindingInformer that refers
// back to this factory.
func (f *sharedInformerFactory) PolicyBindings() PolicyBindingInformer {
	informer := &policyBindingInformer{sharedInformerFactory: f}
	return informer
}
// DeploymentConfigs returns a newly allocated DeploymentConfigInformer that
// refers back to this factory.
func (f *sharedInformerFactory) DeploymentConfigs() DeploymentConfigInformer {
	informer := &deploymentConfigInformer{sharedInformerFactory: f}
	return informer
}
// ImageStreams returns a newly allocated ImageStreamInformer that refers back
// to this factory.
func (f *sharedInformerFactory) ImageStreams() ImageStreamInformer {
	informer := &imageStreamInformer{sharedInformerFactory: f}
	return informer
}
// SecurityContextConstraints returns a newly allocated
// SecurityContextConstraintsInformer that refers back to this factory.
func (f *sharedInformerFactory) SecurityContextConstraints() SecurityContextConstraintsInformer {
	informer := &securityContextConstraintsInformer{sharedInformerFactory: f}
	return informer
}
// ClusterResourceQuotas returns a newly allocated ClusterResourceQuotaInformer
// that refers back to this factory.
func (f *sharedInformerFactory) ClusterResourceQuotas() ClusterResourceQuotaInformer {
	informer := &clusterResourceQuotaInformer{sharedInformerFactory: f}
	return informer
}