forked from kubermatic/kubermatic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.go
156 lines (133 loc) · 6.3 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/*
Copyright 2020 The Kubermatic Kubernetes Platform contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"fmt"
kubermaticv1 "github.com/kubermatic/kubermatic/api/pkg/crd/kubermatic/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// EnqueueClusterForNamespacedObject enqueues the cluster that owns a namespaced object, if any.
// It is used by various controllers to react to changes in the resources in the cluster namespace.
func EnqueueClusterForNamespacedObject(client ctrlruntimeclient.Client) *handler.EnqueueRequestsFromMapFunc {
	mapFn := handler.ToRequestsFunc(func(obj handler.MapObject) []reconcile.Request {
		clusters := &kubermaticv1.ClusterList{}
		if err := client.List(context.Background(), clusters); err != nil {
			utilruntime.HandleError(fmt.Errorf("failed to list Clusters: %v", err))
			return []reconcile.Request{}
		}
		// Match the object's namespace against each cluster's dedicated namespace.
		namespace := obj.Meta.GetNamespace()
		for idx := range clusters.Items {
			if clusters.Items[idx].Status.NamespaceName == namespace {
				return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: clusters.Items[idx].Name}}}
			}
		}
		// No owning cluster found; enqueue nothing.
		return []reconcile.Request{}
	})
	return &handler.EnqueueRequestsFromMapFunc{ToRequests: mapFn}
}
// EnqueueClusterForNamespacedObjectWithSeedName enqueues the cluster that owns a namespaced object,
// if any. The seedName is put into the namespace field.
// It is used by various controllers to react to changes in the resources in the cluster namespace.
func EnqueueClusterForNamespacedObjectWithSeedName(client ctrlruntimeclient.Client, seedName string, clusterSelector labels.Selector) *handler.EnqueueRequestsFromMapFunc {
	mapFn := handler.ToRequestsFunc(func(obj handler.MapObject) []reconcile.Request {
		clusters := &kubermaticv1.ClusterList{}
		// Only consider clusters matching the configured selector.
		if err := client.List(context.Background(), clusters, &ctrlruntimeclient.ListOptions{LabelSelector: clusterSelector}); err != nil {
			utilruntime.HandleError(fmt.Errorf("failed to list Clusters: %v", err))
			return []reconcile.Request{}
		}
		namespace := obj.Meta.GetNamespace()
		for idx := range clusters.Items {
			if clusters.Items[idx].Status.NamespaceName != namespace {
				continue
			}
			// Encode the seed name in the namespace field of the request.
			return []reconcile.Request{{NamespacedName: types.NamespacedName{
				Namespace: seedName,
				Name:      clusters.Items[idx].Name,
			}}}
		}
		return []reconcile.Request{}
	})
	return &handler.EnqueueRequestsFromMapFunc{ToRequests: mapFn}
}
// EnqueueClusterScopedObjectWithSeedName enqueues a cluster-scoped object with the seedName
// as namespace. If it gets an object with a non-empty namespace, it will log an error and not
// enqueue anything.
func EnqueueClusterScopedObjectWithSeedName(seedName string) *handler.EnqueueRequestsFromMapFunc {
	return &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
		if a.Meta.GetNamespace() != "" {
			utilruntime.HandleError(fmt.Errorf("EnqueueClusterScopedObjectWithSeedName was used with namespace scoped object %s/%s of type %T", a.Meta.GetNamespace(), a.Meta.GetName(), a.Object))
			// Bug fix: previously the object was enqueued anyway, contradicting the
			// documented contract. Skip enqueueing for namespaced objects.
			return nil
		}
		return []reconcile.Request{{NamespacedName: types.NamespacedName{
			Namespace: seedName,
			Name:      a.Meta.GetName(),
		}}}
	})}
}
// EnqueueConst enqueues a constant. It is meant for controllers that don't have a parent object
// they could enqueue and instead reconcile everything at once.
// The queueKey will be defaulted if empty.
func EnqueueConst(queueKey string) *handler.EnqueueRequestsFromMapFunc {
	key := queueKey
	if key == "" {
		key = "const"
	}
	// The request is constant, so build it once and return it from every invocation.
	request := reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      key,
		Namespace: "",
	}}
	return &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(func(_ handler.MapObject) []reconcile.Request {
		return []reconcile.Request{request}
	})}
}
// ClusterAvailableForReconciling returns true if the given cluster can be reconciled. This is true if
// the cluster does not yet have the SeedResourcesUpToDate condition or if the concurrency limit of the
// controller is not yet reached. This ensures that not too many cluster updates are running at the same
// time, but also makes sure that un-UpToDate clusters will continue to be reconciled.
func ClusterAvailableForReconciling(ctx context.Context, client ctrlruntimeclient.Client, cluster *kubermaticv1.Cluster, concurrencyLimit int) (bool, error) {
	// Clusters that are not yet up-to-date must always keep being reconciled.
	upToDate := cluster.Status.HasConditionValue(kubermaticv1.ClusterConditionSeedResourcesUpToDate, corev1.ConditionTrue)
	if !upToDate {
		return true, nil
	}
	reached, err := ConcurrencyLimitReached(ctx, client, concurrencyLimit)
	if err != nil {
		// ConcurrencyLimitReached reports true on error, so this matches the original return.
		return false, err
	}
	return !reached, nil
}
// ConcurrencyLimitReached checks all the clusters inside the seed cluster and checks for the
// SeedResourcesUpToDate condition. Returns true if the number of clusters without this condition
// is equal or larger than the given limit.
func ConcurrencyLimitReached(ctx context.Context, client ctrlruntimeclient.Client, limit int) (bool, error) {
	clusterList := &kubermaticv1.ClusterList{}
	if err := client.List(ctx, clusterList); err != nil {
		// Err on the safe side: report the limit as reached when listing fails.
		return true, fmt.Errorf("failed to list clusters: %v", err)
	}
	// Count the clusters whose seed resources are still being updated.
	inProgress := 0
	for idx := range clusterList.Items {
		if !clusterList.Items[idx].Status.HasConditionValue(kubermaticv1.ClusterConditionSeedResourcesUpToDate, corev1.ConditionTrue) {
			inProgress++
		}
	}
	return inProgress >= limit, nil
}
// IsCacheNotStarted returns true if the given error is not nil and an instance of
// cache.ErrCacheNotStarted.
func IsCacheNotStarted(err error) bool {
	// A type assertion on a nil interface yields ok == false, so no explicit
	// nil check is needed.
	_, isNotStarted := err.(*cache.ErrCacheNotStarted)
	return isNotStarted
}