Implement parallel staggering
This is achieved using multiple leases. Basically, nodes race to acquire
one of the timed-out leases.
Nuckal777 committed Oct 5, 2021
1 parent 06f0323 commit 93141f5
Showing 4 changed files with 82 additions and 42 deletions.
11 changes: 6 additions & 5 deletions README.md
@@ -134,20 +134,21 @@ config:
 __condition__ Checks if a node condition has the defined status.
 ```yaml
 config:
-  type: the node conditions type (usually one of Ready, MemoryPressure, DiskPressure, PIDPressure or NetworkUnavailable)
-  status: the expected condition status (usually one of True, False or Unknown)
+  type: the node conditions type (usually one of Ready, MemoryPressure, DiskPressure, PIDPressure or NetworkUnavailable), required
+  status: the expected condition status (usually one of True, False or Unknown), required
 ```
 __maxMaintenance:__ Checks that fewer than the specified number of nodes are in the in-maintenance state. Due to the optimistic concurrency control of the API server this check might return the wrong result if more than one node is reconciled at any given time.
 ```yaml
 config:
-  max: the limit of nodes that are in-maintenance
+  max: the limit of nodes that are in-maintenance, required
 ```
-__stagger__: Checks that a certain duration has passed since the previous node passed. This is implemented with a `coordination.k8s.io/Lease`, which needs to be manually removed when the maintenance controller is removed from a cluster.
+__stagger__: Checks that a certain duration has passed since a previous node passed. This is implemented with `coordination.k8s.io/Lease`s, which need to be manually removed when the maintenance controller is removed from a cluster.
 ```yaml
 config:
   duration: the duration to await, required
-  leaseName: name of the lease, required
+  leaseName: name prefix of the lease, required
   leaseNamespace: namespace of the lease, required
+  parallel: the number of leases to use, optional (defaults to 1)
 ```
 __timeWindow:__ Checks if the current system time is within the specified weekly UTC time window.
 ```yaml
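
For illustration, a complete `stagger` configuration using the new `parallel` option could look like the following sketch (the concrete values are made up for this example, not taken from the commit):

```yaml
config:
  duration: 10m
  leaseName: maintenance-stagger
  leaseNamespace: kube-system
  parallel: 2
```

With `parallel: 2`, two nodes can pass the check per `duration` window, each holding one of the leases `maintenance-stagger-0` and `maintenance-stagger-1`.
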
48 changes: 35 additions & 13 deletions controllers/node_controller_test.go
@@ -253,16 +253,14 @@ var _ = Describe("The stagger plugin", func() {
 	})
 
 	AfterEach(func() {
-		var lease coordinationv1.Lease
-		lease.Name = leaseName.Name
-		lease.Namespace = leaseName.Namespace
-		err := k8sClient.Delete(context.Background(), &lease)
-		Expect(err).To(Succeed())
-
-		err = k8sClient.Delete(context.Background(), firstNode)
-		Expect(err).To(Succeed())
-		err = k8sClient.Delete(context.Background(), secondNode)
-		Expect(err).To(Succeed())
+		var leaseList coordinationv1.LeaseList
+		Expect(k8sClient.List(context.Background(), &leaseList)).To(Succeed())
+		for i := range leaseList.Items {
+			err := k8sClient.Delete(context.Background(), &leaseList.Items[i])
+			Expect(err).To(Succeed())
+		}
+		Expect(k8sClient.Delete(context.Background(), firstNode)).To(Succeed())
+		Expect(k8sClient.Delete(context.Background(), secondNode)).To(Succeed())
 	})
 
 	checkNode := func(stagger *impl.Stagger, node *corev1.Node) bool {
@@ -274,7 +272,12 @@
 	}
 
 	It("blocks within the lease duration", func() {
-		stagger := impl.Stagger{Duration: 3 * time.Second, LeaseName: leaseName}
+		stagger := impl.Stagger{
+			Duration:       3 * time.Second,
+			LeaseName:      leaseName.Name,
+			LeaseNamespace: leaseName.Namespace,
+			Parallel:       1,
+		}
 		result := checkNode(&stagger, firstNode)
 		Expect(result).To(BeTrue())
 		result = checkNode(&stagger, firstNode)
@@ -284,17 +287,36 @@
 	})
 
 	It("grabs the lease after it timed out", func() {
-		stagger := impl.Stagger{Duration: 3 * time.Second, LeaseName: leaseName}
+		stagger := impl.Stagger{
+			Duration:       3 * time.Second,
+			LeaseName:      leaseName.Name,
+			LeaseNamespace: leaseName.Namespace,
+			Parallel:       1,
+		}
 		checkNode(&stagger, firstNode)
 		time.Sleep(4 * time.Second)
 		result := checkNode(&stagger, secondNode)
 		Expect(result).To(BeTrue())
 		lease := &coordinationv1.Lease{}
-		err := k8sClient.Get(context.Background(), leaseName, lease)
+		err := k8sClient.Get(context.Background(), types.NamespacedName{
+			Namespace: "default",
+			Name:      stagger.LeaseName + "-0",
+		}, lease)
 		Expect(err).To(Succeed())
 		Expect(*lease.Spec.HolderIdentity).To(Equal("secondnode"))
 	})
 
+	It("passes for two nodes if parallel is 2", func() {
+		stagger := impl.Stagger{
+			Duration:       3 * time.Second,
+			LeaseName:      leaseName.Name,
+			LeaseNamespace: leaseName.Namespace,
+			Parallel:       2,
+		}
+		Expect(checkNode(&stagger, firstNode)).To(BeTrue())
+		Expect(checkNode(&stagger, secondNode)).To(BeTrue())
+	})
+
 })
 
 var _ = Describe("The slack thread plugin", func() {
57 changes: 38 additions & 19 deletions plugin/impl/stagger.go
@@ -20,6 +20,7 @@
 package impl
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/elastic/go-ucfg"
@@ -34,8 +35,12 @@ import (
 // Stagger is a check plugin that checks that only one node
 // can pass every configurable period.
 type Stagger struct {
-	Duration  time.Duration
-	LeaseName types.NamespacedName
+	Duration       time.Duration
+	LeaseName      string
+	LeaseNamespace string
+	Parallel       int
+	// index of the available lease to grab in AfterEval()
+	grabIndex int
 }
 
 // New creates a new Stagger instance with the given config.
@@ -44,39 +49,46 @@ func (s *Stagger) New(config *ucfg.Config) (plugin.Checker, error) {
 		Duration       time.Duration `config:"duration" validate:"required"`
 		LeaseName      string        `config:"leaseName" validate:"required"`
 		LeaseNamespace string        `config:"leaseNamespace" validate:"required"`
-	}{}
+		Parallel       int           `config:"parallel"`
+	}{Parallel: 1}
 	if err := config.Unpack(&conf); err != nil {
 		return nil, err
 	}
-	return &Stagger{LeaseName: types.NamespacedName{
-		Namespace: conf.LeaseNamespace,
-		Name:      conf.LeaseName},
-		Duration: conf.Duration}, nil
+	return &Stagger{
+		LeaseName:      conf.LeaseName,
+		LeaseNamespace: conf.LeaseNamespace,
+		Duration:       conf.Duration,
+		Parallel:       conf.Parallel,
+	}, nil
 }
 
 // Check asserts that a certain time has passed since the last successful check.
 func (s *Stagger) Check(params plugin.Parameters) (bool, error) {
-	lease, err := s.getOrCreateLease(&params)
-	if err != nil {
-		return false, err
-	}
-	if time.Since(lease.Spec.RenewTime.Time) <= time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second {
-		return false, nil
+	for i := 0; i < s.Parallel; i++ {
+		lease, err := s.getOrCreateLease(i, &params)
+		if err != nil {
+			return false, err
+		}
+		if time.Since(lease.Spec.RenewTime.Time) > time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second {
+			s.grabIndex = i
+			return true, nil
+		}
 	}
-	return true, nil
+	return false, nil
 }
 
-func (s *Stagger) getOrCreateLease(params *plugin.Parameters) (coordinationv1.Lease, error) {
+func (s *Stagger) getOrCreateLease(idx int, params *plugin.Parameters) (coordinationv1.Lease, error) {
+	leaseKey := s.makeLeaseKey(idx)
 	var lease coordinationv1.Lease
-	err := params.Client.Get(params.Ctx, s.LeaseName, &lease)
+	err := params.Client.Get(params.Ctx, leaseKey, &lease)
 	if err == nil {
 		return lease, nil
 	}
 	if !errors.IsNotFound(err) {
 		return coordinationv1.Lease{}, err
 	}
-	lease.Name = s.LeaseName.Name
-	lease.Namespace = s.LeaseName.Namespace
+	lease.Name = leaseKey.Name
+	lease.Namespace = leaseKey.Namespace
 	lease.Spec.HolderIdentity = &params.Node.Name
 	// Create the lease in the past, so it can immediately pass the timeout check.
 	// In AfterEval() the lease will then also receive sensible values.
@@ -100,7 +112,7 @@ func (s *Stagger) AfterEval(chainResult bool, params plugin.Parameters) error {
 		return nil
 	}
 	lease := &coordinationv1.Lease{}
-	err := params.Client.Get(params.Ctx, s.LeaseName, lease)
+	err := params.Client.Get(params.Ctx, s.makeLeaseKey(s.grabIndex), lease)
 	if err != nil {
 		return err
 	}
@@ -122,3 +134,10 @@ func (s *Stagger) grabLease(params *plugin.Parameters, lease *coordinationv1.Lea
 	lease.Spec.LeaseTransitions = &transitions
 	return params.Client.Patch(params.Ctx, lease, client.MergeFrom(unmodified))
 }
+
+func (s *Stagger) makeLeaseKey(idx int) types.NamespacedName {
+	return types.NamespacedName{
+		Namespace: s.LeaseNamespace,
+		Name:      fmt.Sprintf("%v-%v", s.LeaseName, idx),
+	}
+}
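
Since each lease slot is a regular `coordination.k8s.io/v1` Lease named `<leaseName>-<index>` by `makeLeaseKey`, the state can be inspected with `kubectl`. As a hypothetical example, after a node named `firstnode` has passed the check with `leaseName: mc-lease`, `leaseNamespace: default` and `parallel: 2`, the first slot might look roughly like this (field values are illustrative, not taken from the commit):

```yaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  name: mc-lease-0    # <leaseName>-<index>, produced by makeLeaseKey
  namespace: default
spec:
  holderIdentity: firstnode    # the node that grabbed this slot
  leaseDurationSeconds: 180    # assumed to mirror the configured duration
  renewTime: "2021-10-05T12:00:00.000000Z"
```

A second node passing within the stagger duration would race for `mc-lease-1`; the check only blocks once every lease is held and unexpired.
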
8 changes: 3 additions & 5 deletions plugin/impl/stagger_test.go
@@ -25,7 +25,6 @@ import (
"github.com/elastic/go-ucfg/yaml"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/types"
)

var _ = Describe("The Stagger plugin", func() {
@@ -38,10 +37,9 @@
 		plugin, err := base.New(config)
 		Expect(err).To(Succeed())
 		Expect(plugin.(*Stagger).Duration).To(Equal(1 * time.Minute))
-		Expect(plugin.(*Stagger).LeaseName).To(Equal(types.NamespacedName{
-			Namespace: "default",
-			Name:      "mc-lease",
-		}))
+		Expect(plugin.(*Stagger).Parallel).To(Equal(1))
+		Expect(plugin.(*Stagger).LeaseName).To(Equal("mc-lease"))
+		Expect(plugin.(*Stagger).LeaseNamespace).To(Equal("default"))
 	})
 
 })
