Skip to content

Commit

Permalink
Implement watch-based DaemonSet replica counter.
Browse files Browse the repository at this point in the history
  • Loading branch information
mborsz committed Apr 1, 2021
1 parent 54890c1 commit cfae9aa
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 74 deletions.
2 changes: 1 addition & 1 deletion clusterloader2/pkg/execservice/exec_service.go
Expand Up @@ -85,7 +85,7 @@ func SetUpExecService(f *framework.Framework) error {
}
options := &measurementutil.WaitForPodOptions{
Selector: selector,
DesiredPodCount: execPodReplicas,
DesiredPodCount: func() int { return execPodReplicas },
CallerName: execServiceName,
WaitForPodsInterval: execPodCheckInterval,
}
Expand Down
8 changes: 6 additions & 2 deletions clusterloader2/pkg/imagepreload/imagepreload.go
Expand Up @@ -126,10 +126,14 @@ func (c *controller) PreloadImages() error {
if err != nil {
return err
}
clusterSize := int(size)
if err := size.Start(); err != nil {
return err
}
defer size.Stop()

klog.V(2).Infof("Waiting for %d Node objects to be updated...", clusterSize)
klog.V(2).Infof("Waiting for %d Node objects to be updated...", size.Replicas())
if err := wait.Poll(pollingInterval, pollingTimeout, func() (bool, error) {
clusterSize := size.Replicas()
klog.V(3).Infof("%d out of %d nodes have pulled images", len(doneNodes), clusterSize)
return len(doneNodes) == clusterSize, nil
}); err != nil {
Expand Down
Expand Up @@ -232,7 +232,7 @@ func (npm *networkPerformanceMeasurement) createAndWaitForWorkerPods() error {
})
options := &measurementutil.WaitForPodOptions{
Selector: &measurementutil.ObjectSelector{Namespace: netperfNamespace},
DesiredPodCount: npm.numberOfClients + npm.numberOfServers,
DesiredPodCount: func() int { return npm.numberOfClients + npm.numberOfServers },
CallerName: networkPerformanceMetricsName,
WaitForPodsInterval: 2 * time.Second,
}
Expand Down
29 changes: 20 additions & 9 deletions clusterloader2/pkg/measurement/common/wait_for_controlled_pods.go
Expand Up @@ -19,11 +19,12 @@ package common
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -352,11 +353,19 @@ func (w *waitForControlledPodsRunningMeasurement) handleObject(oldObj, newObj in
}

func (w *waitForControlledPodsRunningMeasurement) checkScaledown(oldObj, newObj runtime.Object) (bool, error) {
oldReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), oldObj)
oldReplicasWatcher, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), oldObj)
if err != nil {
return false, err
}
oldReplicas, err := runtimeobjects.GetReplicasOnce(oldReplicasWatcher)
if err != nil {
return false, err
}
newReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), newObj)
newReplicasWatcher, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), newObj)
if err != nil {
return false, err
}
newReplicas, err := runtimeobjects.GetReplicasOnce(newReplicasWatcher)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -509,7 +518,7 @@ func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runti
}
}
if isDeleted {
runtimeObjectReplicas = 0
runtimeObjectReplicas = &runtimeobjects.ConstReplicas{0}
}
key, err := runtimeobjects.CreateMetaNamespaceKey(obj)
if err != nil {
Expand All @@ -520,18 +529,20 @@ func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runti
o.lock.Lock()
defer o.lock.Unlock()
w.handlingGroup.Start(func() {
var minDesiredPodCount int
if w.countErrorMargin > 0 {
minDesiredPodCount = int(runtimeObjectReplicas) - w.countErrorMargin
if err := runtimeObjectReplicas.Start(); err != nil {
klog.Errorf("%s: error while starting runtimeObjectReplicas: %v", key, err)
o.err = fmt.Errorf("failed to start runtimeObjectReplicas: %v", err)
return
}
defer runtimeObjectReplicas.Stop()
options := &measurementutil.WaitForPodOptions{
Selector: &measurementutil.ObjectSelector{
Namespace: runtimeObjectNamespace,
LabelSelector: runtimeObjectSelector.String(),
FieldSelector: "",
},
DesiredPodCount: int(runtimeObjectReplicas),
MinDesiredPodCount: minDesiredPodCount,
DesiredPodCount: runtimeObjectReplicas.Replicas,
CountErrorMargin: w.countErrorMargin,
CallerName: w.String(),
WaitForPodsInterval: defaultWaitForPodsInterval,
IsPodUpdated: isPodUpdated,
Expand Down
2 changes: 1 addition & 1 deletion clusterloader2/pkg/measurement/common/wait_for_pods.go
Expand Up @@ -66,7 +66,7 @@ func (w *waitForRunningPodsMeasurement) Execute(config *measurement.Config) ([]m
})
options := &measurementutil.WaitForPodOptions{
Selector: selector,
DesiredPodCount: desiredPodCount,
DesiredPodCount: func() int { return desiredPodCount },
CallerName: w.String(),
WaitForPodsInterval: defaultWaitForPodsInterval,
}
Expand Down
156 changes: 156 additions & 0 deletions clusterloader2/pkg/measurement/util/runtimeobjects/replicaswatcher.go
@@ -0,0 +1,156 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package runtimeobjects

import (
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/informer"
)

const (
	// informerSyncTimeout bounds how long Start waits for the node informer
	// to finish its initial sync before giving up.
	informerSyncTimeout = time.Minute
)

// ReplicasWatcher reports the number of replicas expected at a given time.
// Implementations may track the value dynamically (e.g. by watching Node
// objects), so the value returned by Replicas may change between calls.
//
// Usage:
//   var rw ReplicasWatcher = (...)
//   if err := rw.Start(); err != nil {
//       panic(err)
//   }
//   // Get number of replicas as needed.
//   val = rw.Replicas()
//   ...
//   val = rw.Replicas()
//
//   // Once you are done...
//   rw.Stop()
type ReplicasWatcher interface {
	// Replicas returns the current expected number of replicas.
	Replicas() int
	// Start must block until Replicas() starts returning correct value.
	Start() error
	// Stop releases any resources acquired by Start.
	Stop()
}

// ConstReplicas is a trivial ReplicasWatcher that always reports a fixed,
// precomputed replica count. Start and Stop are no-ops.
type ConstReplicas struct {
	ReplicasVal int
}

var _ ReplicasWatcher = &ConstReplicas{}

// Replicas returns the constant replica count.
func (c *ConstReplicas) Replicas() int {
	return c.ReplicasVal
}

// Start is a no-op: the value is known up front.
func (c *ConstReplicas) Start() error {
	return nil
}

// Stop is a no-op: there is nothing to release.
func (c *ConstReplicas) Stop() {}

// nodeCounter counts a number of node objects matching nodeSelector and affinity.
// It implements ReplicasWatcher: Start launches a node informer and the count
// is kept up to date as matching nodes appear, change, or disappear.
type nodeCounter struct {
	client       clientset.Interface
	nodeSelector labels.Selector
	affinity     *corev1.Affinity
	// stopCh signals the informer started in Start to terminate.
	stopCh chan struct{}

	// mu guards replicas, which is written from the informer callback.
	mu       sync.Mutex
	replicas int
}

var _ ReplicasWatcher = &nodeCounter{}

// NewNodeCounter returns a ReplicasWatcher implementation that tracks the
// number of schedulable nodes matching the given selector and affinity.
// Call Start before reading Replicas, and Stop when done.
func NewNodeCounter(client clientset.Interface, nodeSelector labels.Selector, affinity *corev1.Affinity) *nodeCounter {
	counter := &nodeCounter{
		client:       client,
		nodeSelector: nodeSelector,
		affinity:     affinity,
		stopCh:       make(chan struct{}),
	}
	return counter
}

// Start launches a node informer filtered by nodeSelector and blocks until
// the informer has synced (bounded by informerSyncTimeout), so that
// Replicas() is accurate once Start returns.
func (n *nodeCounter) Start() error {
	i := informer.NewInformer(
		n.client,
		"nodes",
		&util.ObjectSelector{
			// Nodes are cluster-scoped; watch across all namespaces.
			Namespace:     metav1.NamespaceAll,
			LabelSelector: n.nodeSelector.String(),
		},
		func(oldObj, newObj interface{}) {
			// Errors here are logged rather than propagated: the informer
			// callback has no error return path.
			if err := n.handleObject(oldObj, newObj); err != nil {
				klog.Errorf("Error while processing node: %v", err)
			}
		},
	)
	return informer.StartAndSync(i, n.stopCh, informerSyncTimeout)
}

// Stop terminates the informer started by Start. It is safe to call
// multiple times; only the first call closes the stop channel.
func (n *nodeCounter) Stop() {
	// The previous no-op implementation leaked the informer goroutine:
	// nothing ever closed n.stopCh. Close it exactly once (the guarded
	// select makes repeated Stop calls safe) so the watch actually ends.
	n.mu.Lock()
	defer n.mu.Unlock()
	select {
	case <-n.stopCh:
		// Already stopped.
	default:
		close(n.stopCh)
	}
}

// handleObject processes one informer event for a Node and adjusts the
// running replica count when the node's eligibility changed.
func (n *nodeCounter) handleObject(oldObj, newObj interface{}) error {
	// Note: avoid naming these `old`/`new` — `new` shadows the predeclared
	// identifier.
	wasEligible, err := n.shouldRun(oldObj)
	if err != nil {
		return err
	}
	isEligible, err := n.shouldRun(newObj)
	if err != nil {
		return err
	}
	// No transition, nothing to update.
	if isEligible == wasEligible {
		return nil
	}
	n.mu.Lock()
	defer n.mu.Unlock()
	// Exactly one of the two flags is set here, so this is +1 on
	// appearance and -1 on disappearance.
	if isEligible {
		n.replicas++
	} else {
		n.replicas--
	}
	klog.Infof("Updated expected replicas: %v", n.replicas)
	return nil
}

// Replicas returns the current number of matching, schedulable nodes.
func (n *nodeCounter) Replicas() int {
	// n.replicas is written by handleObject on the informer goroutine;
	// reading it without n.mu is a data race (caught by `go test -race`).
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.replicas
}

// shouldRun reports whether obj is a schedulable Node that matches the
// configured affinity. A nil obj (e.g. the old side of an add event, or the
// new side of a delete) counts as not running.
func (n *nodeCounter) shouldRun(obj interface{}) (bool, error) {
	if obj == nil {
		return false, nil
	}
	node, ok := obj.(*corev1.Node)
	if !ok {
		return false, fmt.Errorf("unexpected type of obj: %v. got %T, want *corev1.Node", obj, obj)
	}
	matches, err := podMatchesNodeAffinity(n.affinity, node)
	return matches && !node.Spec.Unschedulable, err
}

// GetReplicasOnce starts the watcher, reads a single point-in-time replica
// count, and stops the watcher again.
func GetReplicasOnce(rw ReplicasWatcher) (int, error) {
	err := rw.Start()
	if err != nil {
		return 0, err
	}
	defer rw.Stop()
	return rw.Replicas(), nil
}

0 comments on commit cfae9aa

Please sign in to comment.