/
binding.go
271 lines (238 loc) · 9.81 KB
/
binding.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
package helper
import (
"context"
"sort"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)
// ClusterWeightInfo records the weight of a cluster
type ClusterWeightInfo struct {
ClusterName string
Weight int64
}
// ClusterWeightInfoList is a slice of ClusterWeightInfo that implements sort.Interface to sort by Value.
type ClusterWeightInfoList []ClusterWeightInfo
func (p ClusterWeightInfoList) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p ClusterWeightInfoList) Len() int { return len(p) }
func (p ClusterWeightInfoList) Less(i, j int) bool {
if p[i].Weight != p[j].Weight {
return p[i].Weight > p[j].Weight
}
return p[i].ClusterName < p[j].ClusterName
}
// SortClusterByWeight sort clusters by the weight
func SortClusterByWeight(m map[string]int64) ClusterWeightInfoList {
p := make(ClusterWeightInfoList, len(m))
i := 0
for k, v := range m {
p[i] = ClusterWeightInfo{k, v}
i++
}
sort.Sort(p)
return p
}
// IsBindingScheduled will check if resourceBinding/clusterResourceBinding is successfully scheduled.
func IsBindingScheduled(status *workv1alpha2.ResourceBindingStatus) bool {
return meta.IsStatusConditionTrue(status.Conditions, workv1alpha2.Scheduled)
}
// HasScheduledReplica checks if the scheduler has assigned replicas for each cluster.
func HasScheduledReplica(scheduleResult []workv1alpha2.TargetCluster) bool {
for _, clusterResult := range scheduleResult {
if clusterResult.Replicas > 0 {
return true
}
}
return false
}
// GetBindingClusterNames will get clusterName list from bind clusters field and requiredBy field.
func GetBindingClusterNames(targetClusters []workv1alpha2.TargetCluster, bindingSnapshot []workv1alpha2.BindingSnapshot) []string {
clusterNames := util.ConvertToClusterNames(targetClusters)
for _, binding := range bindingSnapshot {
for _, targetCluster := range binding.Clusters {
clusterNames.Insert(targetCluster.Name)
}
}
return clusterNames.List()
}
// FindOrphanWorks retrieves all works that labeled with current binding(ResourceBinding or ClusterResourceBinding) objects,
// then pick the works that not meet current binding declaration.
func FindOrphanWorks(c client.Client, bindingNamespace, bindingName string, clusterNames []string, scope apiextensionsv1.ResourceScope) ([]workv1alpha1.Work, error) {
var needJudgeWorks []workv1alpha1.Work
if scope == apiextensionsv1.NamespaceScoped {
workList, err := GetWorksByLabelsSet(c, labels.Set{
workv1alpha2.ResourceBindingReferenceKey: names.GenerateBindingReferenceKey(bindingNamespace, bindingName),
})
if err != nil {
klog.Errorf("Failed to get works by ResourceBinding(%s/%s): %v", bindingNamespace, bindingName, err)
return nil, err
}
needJudgeWorks = append(needJudgeWorks, workList.Items...)
} else {
workList, err := GetWorksByLabelsSet(c, labels.Set{
workv1alpha2.ClusterResourceBindingReferenceKey: names.GenerateBindingReferenceKey("", bindingName),
})
if err != nil {
klog.Errorf("Failed to get works by ClusterResourceBinding(%s): %v", bindingName, err)
return nil, err
}
needJudgeWorks = append(needJudgeWorks, workList.Items...)
}
var orphanWorks []workv1alpha1.Work
expectClusters := sets.NewString(clusterNames...)
for _, work := range needJudgeWorks {
workTargetCluster, err := names.GetClusterName(work.GetNamespace())
if err != nil {
klog.Errorf("Failed to get cluster name which Work %s/%s belongs to. Error: %v.",
work.GetNamespace(), work.GetName(), err)
return nil, err
}
if !expectClusters.Has(workTargetCluster) {
orphanWorks = append(orphanWorks, work)
}
}
return orphanWorks, nil
}
// RemoveOrphanWorks will remove orphan works.
func RemoveOrphanWorks(c client.Client, works []workv1alpha1.Work) error {
var errs []error
for workIndex, work := range works {
err := c.Delete(context.TODO(), &works[workIndex])
if err != nil {
klog.Errorf("Failed to delete orphan work %s/%s, err is %v", work.GetNamespace(), work.GetName(), err)
errs = append(errs, err)
continue
}
klog.Infof("Delete orphan work %s/%s successfully.", work.GetNamespace(), work.GetName())
}
if len(errs) > 0 {
return errors.NewAggregate(errs)
}
return nil
}
// FetchWorkload fetches the kubernetes resource to be propagated.
func FetchWorkload(dynamicClient dynamic.Interface, informerManager informermanager.SingleClusterInformerManager,
restMapper meta.RESTMapper, resource workv1alpha2.ObjectReference) (*unstructured.Unstructured, error) {
dynamicResource, err := restmapper.GetGroupVersionResource(restMapper,
schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind))
if err != nil {
klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion,
resource.Kind, err)
return nil, err
}
var workload runtime.Object
if len(resource.Namespace) == 0 {
// cluster-scoped resource
workload, err = informerManager.Lister(dynamicResource).Get(resource.Name)
} else {
workload, err = informerManager.Lister(dynamicResource).ByNamespace(resource.Namespace).Get(resource.Name)
}
if err != nil {
// fall back to call api server in case the cache has not been synchronized yet
klog.Warningf("Failed to get workload from cache, kind: %s, namespace: %s, name: %s. Error: %v. Fall back to call api server",
resource.Kind, resource.Namespace, resource.Name, err)
workload, err = dynamicClient.Resource(dynamicResource).Namespace(resource.Namespace).Get(context.TODO(),
resource.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get workload from api server, kind: %s, namespace: %s, name: %s. Error: %v",
resource.Kind, resource.Namespace, resource.Name, err)
return nil, err
}
}
unstructuredWorkLoad, err := ToUnstructured(workload)
if err != nil {
klog.Errorf("Failed to transform object(%s/%s): %v", resource.Namespace, resource.Name, err)
return nil, err
}
return unstructuredWorkLoad, nil
}
// GetClusterResourceBindings returns a ClusterResourceBindingList by labels.
func GetClusterResourceBindings(c client.Client, ls labels.Set) (*workv1alpha2.ClusterResourceBindingList, error) {
bindings := &workv1alpha2.ClusterResourceBindingList{}
listOpt := &client.ListOptions{LabelSelector: labels.SelectorFromSet(ls)}
return bindings, c.List(context.TODO(), bindings, listOpt)
}
// GetResourceBindings returns a ResourceBindingList by labels
func GetResourceBindings(c client.Client, ls labels.Set) (*workv1alpha2.ResourceBindingList, error) {
bindings := &workv1alpha2.ResourceBindingList{}
listOpt := &client.ListOptions{LabelSelector: labels.SelectorFromSet(ls)}
return bindings, c.List(context.TODO(), bindings, listOpt)
}
// DeleteWorkByRBNamespaceAndName will delete all Work objects by ResourceBinding namespace and name.
func DeleteWorkByRBNamespaceAndName(c client.Client, namespace, name string) error {
return DeleteWorks(c, labels.Set{
workv1alpha2.ResourceBindingReferenceKey: names.GenerateBindingReferenceKey(namespace, name),
})
}
// DeleteWorkByCRBName will delete all Work objects by ClusterResourceBinding name.
func DeleteWorkByCRBName(c client.Client, name string) error {
return DeleteWorks(c, labels.Set{
workv1alpha2.ClusterResourceBindingReferenceKey: names.GenerateBindingReferenceKey("", name),
})
}
// DeleteWorks will delete all Work objects by labels.
func DeleteWorks(c client.Client, selector labels.Set) error {
workList, err := GetWorksByLabelsSet(c, selector)
if err != nil {
klog.Errorf("Failed to get works by label %v: %v", selector, err)
return err
}
var errs []error
for index, work := range workList.Items {
if err := c.Delete(context.TODO(), &workList.Items[index]); err != nil {
if apierrors.IsNotFound(err) {
continue
}
klog.Errorf("Failed to delete work(%s/%s): %v", work.Namespace, work.Name, err)
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.NewAggregate(errs)
}
return nil
}
// GenerateNodeClaimByPodSpec will return a NodeClaim from PodSpec.
func GenerateNodeClaimByPodSpec(podSpec *corev1.PodSpec) *workv1alpha2.NodeClaim {
nodeClaim := &workv1alpha2.NodeClaim{
NodeSelector: podSpec.NodeSelector,
Tolerations: podSpec.Tolerations,
}
hasAffinity := podSpec.Affinity != nil && podSpec.Affinity.NodeAffinity != nil && podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil
if hasAffinity {
nodeClaim.HardNodeAffinity = podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
if nodeClaim.NodeSelector == nil && nodeClaim.HardNodeAffinity == nil && len(nodeClaim.Tolerations) == 0 {
return nil
}
return nodeClaim
}
// GenerateReplicaRequirements generates replica requirements for node and resources.
func GenerateReplicaRequirements(podTemplate *corev1.PodTemplateSpec) *workv1alpha2.ReplicaRequirements {
nodeClaim := GenerateNodeClaimByPodSpec(&podTemplate.Spec)
resourceRequest := util.EmptyResource().AddPodTemplateRequest(&podTemplate.Spec).ResourceList()
if nodeClaim != nil || resourceRequest != nil {
return &workv1alpha2.ReplicaRequirements{
NodeClaim: nodeClaim,
ResourceRequest: resourceRequest,
}
}
return nil
}