Add BenchmarkSchedulingWaitForFirstConsumerPVs benchmark #88318

Merged
merged 1 commit · Feb 25, 2020
55 changes: 55 additions & 0 deletions test/integration/scheduler_perf/scheduler_bench_test.go
@@ -128,6 +128,25 @@ func BenchmarkSchedulingInTreePVs(b *testing.B) {
}
}

// BenchmarkSchedulingWaitForFirstConsumerPVs benchmarks the scheduling rate
// of pods whose volumes have VolumeBindingMode set to WaitForFirstConsumer.
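// A usage sketch, assuming the standard Go benchmark flags and this package path:
//   go test ./test/integration/scheduler_perf -run=^$ -bench=BenchmarkSchedulingWaitForFirstConsumerPVs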
func BenchmarkSchedulingWaitForFirstConsumerPVs(b *testing.B) {
tests := []struct{ nodes, existingPods, minPods int }{
{nodes: 500, existingPods: 500, minPods: 1000},
// the default of 5000 existingPods is way too much for now
}
basePod := makeBasePod()
testStrategy := testutils.NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy(gceVolumeFactory, basePod)
nodeStrategy := testutils.NewLabelNodePrepareStrategy(v1.LabelZoneFailureDomain, "zone1")
for _, test := range tests {
name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
b.Run(name, func(b *testing.B) {
nodeStrategies := []testutils.CountToStrategy{{Count: test.nodes, Strategy: nodeStrategy}}
benchmarkScheduling(test.existingPods, test.minPods, nodeStrategies, testStrategy, b)
})
}
}

// BenchmarkSchedulingMigratedInTreePVs benchmarks the scheduling rate of pods with
// in-tree volumes (used via PV/PVC) that are migrated to CSI. CSINode instances exist
// for all nodes and have proper annotation that AWS is migrated.
@@ -557,6 +576,42 @@ func awsVolumeFactory(id int) *v1.PersistentVolume {
}
}

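// gceVolumeFactory returns a GCE PD PersistentVolume with node affinity pinned to
// zone1, matching the zone label that the benchmark's node strategy applies to nodes.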
func gceVolumeFactory(id int) *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("vol-%d", id),
},
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
},
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain,
PersistentVolumeSource: v1.PersistentVolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
FSType: "ext4",
PDName: fmt.Sprintf("vol-%d-pvc", id),
},
},
NodeAffinity: &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: v1.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"zone1"},
},
},
},
},
},
},
},
}
}

func csiVolumeFactory(id int) *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
2 changes: 2 additions & 0 deletions test/integration/scheduler_perf/util.go
@@ -61,8 +61,10 @@ func mustSetupScheduler() (util.ShutdownFunc, coreinformers.PodInformer, clients
Burst: 5000,
})
_, podInformer, schedulerShutdown := util.StartScheduler(clientSet)
fakePVControllerShutdown := util.StartFakePVController(clientSet)

shutdownFunc := func() {
fakePVControllerShutdown()
schedulerShutdown()
apiShutdown()
}
3 changes: 3 additions & 0 deletions test/integration/util/BUILD
@@ -14,11 +14,14 @@ go_library(
importpath = "k8s.io/kubernetes/test/integration/util",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/scheduler:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/gce:go_default_library",
"//test/integration/framework:go_default_library",
45 changes: 45 additions & 0 deletions test/integration/util/util.go
@@ -22,12 +22,15 @@ import (
"net/http/httptest"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/api/legacyscheme"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/test/integration/framework"
)
@@ -87,6 +90,48 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, corein
return sched, podInformer, shutdownFunc
}

// StartFakePVController starts a simplified PV controller that, for every PV with a ClaimRef,
// completes the binding by setting the PVC's VolumeName and bind-completed annotation.
// TODO(mborsz): Use a real PV controller here.
func StartFakePVController(clientSet clientset.Interface) ShutdownFunc {
ctx, cancel := context.WithCancel(context.Background())

informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
pvInformer := informerFactory.Core().V1().PersistentVolumes()

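// syncPV mimics the real PV controller's bind path: once a PV references a claim,
// mark the referenced PVC as bound by setting its VolumeName and the bind-completed annotation.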
syncPV := func(obj *v1.PersistentVolume) {
if obj.Spec.ClaimRef != nil {
claimRef := obj.Spec.ClaimRef
pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
return
}

if pvc.Spec.VolumeName == "" {
pvc.Spec.VolumeName = obj.Name
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes")
_, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Update(ctx, pvc, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
return
}
}
}
}

pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
syncPV(obj.(*v1.PersistentVolume))
},
UpdateFunc: func(_, obj interface{}) {
syncPV(obj.(*v1.PersistentVolume))
},
})

informerFactory.Start(ctx.Done())
return ShutdownFunc(cancel)
}

// createScheduler creates a scheduler with the given informer factory and default name.
func createScheduler(
clientSet clientset.Interface,
1 change: 1 addition & 0 deletions test/utils/BUILD
@@ -39,6 +39,7 @@ go_library(
"//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/batch/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
19 changes: 19 additions & 0 deletions test/utils/create_resources.go
@@ -25,6 +25,8 @@ import (

apps "k8s.io/api/apps/v1"
batch "k8s.io/api/batch/v1"
storage "k8s.io/api/storage/v1"

"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -218,6 +220,23 @@ func CreateServiceWithRetries(c clientset.Interface, namespace string, obj *v1.S
return RetryWithExponentialBackOff(createFunc)
}

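// CreateStorageClassWithRetries creates the given StorageClass, retrying with exponential
// backoff on retryable API errors and treating AlreadyExists as success.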
func CreateStorageClassWithRetries(c clientset.Interface, obj *storage.StorageClass) error {
if obj == nil {
return fmt.Errorf("Object provided to create is empty")
}
createFunc := func() (bool, error) {
_, err := c.StorageV1().StorageClasses().Create(context.TODO(), obj, metav1.CreateOptions{})
if err == nil || apierrors.IsAlreadyExists(err) {
return true, nil
}
if IsRetryableAPIError(err) {
return false, nil
}
return false, fmt.Errorf("Failed to create object with non-retriable error: %v", err)
}
return RetryWithExponentialBackOff(createFunc)
}

func CreateResourceQuotaWithRetries(c clientset.Interface, namespace string, obj *v1.ResourceQuota) error {
if obj == nil {
return fmt.Errorf("Object provided to create is empty")
89 changes: 73 additions & 16 deletions test/utils/runners.go
@@ -28,6 +28,7 @@ import (
apps "k8s.io/api/apps/v1"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -1349,35 +1350,50 @@ func CreatePod(client clientset.Interface, namespace string, podCount int, podTe
return createError
}

func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int) error {
func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int, bindVolume bool) error {
var createError error
lock := sync.Mutex{}
createPodFunc := func(i int) {
pvcName := fmt.Sprintf("pvc-%d", i)

// pvc
pvc := claimTemplate.DeepCopy()
pvc.Name = pvcName
// pv
pv := factory(i)
// PVs are cluster-wide resources.
// Prepend a namespace to make the name globally unique.
pv.Name = fmt.Sprintf("%s-%s", namespace, pv.Name)
if bindVolume {
// bind pv to "pvc-$i"
pv.Spec.ClaimRef = &v1.ObjectReference{
Kind: "PersistentVolumeClaim",
Namespace: namespace,
Name: pvcName,
APIVersion: "v1",
}
pv.Status.Phase = v1.VolumeBound

// bind pvc to "pv-$i"
pvc.Spec.VolumeName = pv.Name
pvc.Status.Phase = v1.ClaimBound
} else {
pv.Status.Phase = v1.VolumeAvailable
}
if err := CreatePersistentVolumeWithRetries(client, pv); err != nil {
lock.Lock()
defer lock.Unlock()
createError = fmt.Errorf("error creating PV: %s", err)
return
}
// We need to update the status separately: creating a PersistentVolume resets its status
// to the default one, which would discard the phase set above.
if _, err := client.CoreV1().PersistentVolumes().UpdateStatus(context.TODO(), pv, metav1.UpdateOptions{}); err != nil {
lock.Lock()
defer lock.Unlock()
createError = fmt.Errorf("error creating PV: %s", err)
return
}

if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil {
lock.Lock()
defer lock.Unlock()
@@ -1446,9 +1462,50 @@ type volumeFactory func(uniqueID int) *v1.PersistentVolume

func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
return func(client clientset.Interface, namespace string, podCount int) error {
return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount)
return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount, true /* bindVolume */)
}
}

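// makeUnboundPersistentVolumeClaim returns a PVC template that requests 1Gi from the given
// storage class and leaves VolumeName unset, so the claim starts out unbound.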
func makeUnboundPersistentVolumeClaim(storageClass string) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
StorageClassName: &storageClass,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
},
},
},
}
}

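// NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy returns a TestPodCreateStrategy that
// creates a WaitForFirstConsumer storage class plus pods with unbound PVs and PVCs, leaving the
// actual binding to happen only once a pod is scheduled.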
func NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy(factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
return func(client clientset.Interface, namespace string, podCount int) error {
volumeBindingMode := storage.VolumeBindingWaitForFirstConsumer
storageClass := &storage.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "storage-class-1",
},
Provisioner: "kubernetes.io/gce-pd",
VolumeBindingMode: &volumeBindingMode,
}
claimTemplate := makeUnboundPersistentVolumeClaim(storageClass.Name)

if err := CreateStorageClassWithRetries(client, storageClass); err != nil {
return fmt.Errorf("failed to create storage class: %v", err)
}

factoryWithStorageClass := func(i int) *v1.PersistentVolume {
pv := factory(i)
pv.Spec.StorageClassName = storageClass.Name
return pv
}

return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factoryWithStorageClass, podTemplate, podCount, false /* bindVolume */)
}
}

func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
basePod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{