Skip to content

Commit

Permalink
Merge pull request #250 from damoon/19-PreferNoSchedule
Browse files Browse the repository at this point in the history
implement issue-19 add prefer no schedule taint to avoid double draining of pods
  • Loading branch information
Daniel Holbach committed Jan 12, 2021
2 parents 1320c5d + 5a4e197 commit fade706
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 41 deletions.
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,25 +80,26 @@ The following arguments can be passed to kured via the daemonset pod template:

```console
Flags:
--lock-ttl time force clean annotation after this amount of time (default 0, disabled)
--alert-filter-regexp regexp.Regexp alert names to ignore when checking for active alerts
--blocking-pod-selector stringArray label selector identifying pods whose presence should prevent reboots
--ds-name string name of daemonset on which to place lock (default "kured")
--ds-namespace string namespace containing daemonset on which to place lock (default "kube-system")
--end-time string only reboot before this time of day (default "23:59")
--end-time string schedule reboot only before this time of day (default "23:59:59")
-h, --help help for kured
--lock-annotation string annotation in which to record locking node (default "weave.works/kured-node-lock")
--lock-ttl duration expire lock annotation after this duration (default: 0, disabled)
--message-template-drain string message template used to notify about a node being drained (default "Draining node %s")
--message-template-reboot string message template used to notify about a node being rebooted (default "Rebooting node %s")
--period duration reboot check period (default 1h0m0s)
--prefer-no-schedule-taint string Taint name applied during pending node reboot (to prevent receiving additional pods from other rebooting nodes). Disabled by default. Set e.g. to "weave.works/kured-node-reboot" to enable tainting.
--prometheus-url string Prometheus instance to probe for active alerts
--reboot-days strings only reboot on these days (default [su,mo,tu,we,th,fr,sa])
--reboot-days strings schedule reboot on these days (default [su,mo,tu,we,th,fr,sa])
--reboot-sentinel string path to file whose existence signals need to reboot (default "/var/run/reboot-required")
--slack-channel string slack channel for reboot notifications
--slack-hook-url string slack hook URL for reboot notifications
--slack-username string slack username for reboot notifications (default "kured")
--message-template-drain string message template used to notify about a node being drained (default "Draining node %s")
--message-template-reboot string message template used to notify about a node being rebooted (default "Rebooting node %s")
--start-time string only reboot after this time of day (default "0:00")
--time-zone string use this timezone to calculate allowed reboot time (default "UTC")
--start-time string schedule reboot only after this time of day (default "0:00")
--time-zone string use this timezone for schedule inputs (default "UTC")
```

### Reboot Sentinel File & Period
Expand Down
97 changes: 63 additions & 34 deletions cmd/kured/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,29 @@ import (
"github.com/weaveworks/kured/pkg/daemonsetlock"
"github.com/weaveworks/kured/pkg/delaytick"
"github.com/weaveworks/kured/pkg/notifications/slack"
"github.com/weaveworks/kured/pkg/taints"
"github.com/weaveworks/kured/pkg/timewindow"
)

var (
version = "unreleased"

// Command line flags
period time.Duration
dsNamespace string
dsName string
lockAnnotation string
lockTTL time.Duration
prometheusURL string
alertFilter *regexp.Regexp
rebootSentinel string
slackHookURL string
slackUsername string
slackChannel string
messageTemplateDrain string
messageTemplateReboot string
podSelectors []string
period time.Duration
dsNamespace string
dsName string
lockAnnotation string
lockTTL time.Duration
prometheusURL string
alertFilter *regexp.Regexp
rebootSentinel string
preferNoScheduleTaintName string
slackHookURL string
slackUsername string
slackChannel string
messageTemplateDrain string
messageTemplateReboot string
podSelectors []string

rebootDays []string
rebootStart string
Expand Down Expand Up @@ -85,6 +87,8 @@ func main() {
"alert names to ignore when checking for active alerts")
rootCmd.PersistentFlags().StringVar(&rebootSentinel, "reboot-sentinel", "/var/run/reboot-required",
"path to file whose existence signals need to reboot")
rootCmd.PersistentFlags().StringVar(&preferNoScheduleTaintName, "prefer-no-schedule-taint", "",
"Taint name applied during pending node reboot (to prevent receiving additional pods from other rebooting nodes). Disabled by default. Set e.g. to \"weave.works/kured-node-reboot\" to enable tainting.")

rootCmd.PersistentFlags().StringVar(&slackHookURL, "slack-hook-url", "",
"slack hook URL for reboot notfications")
Expand Down Expand Up @@ -258,11 +262,11 @@ func drain(client *kubernetes.Clientset, node *v1.Node) {
Out: os.Stdout,
}
if err := kubectldrain.RunCordonOrUncordon(drainer, node, true); err != nil {
log.Fatal("Error cordonning %s: %v", nodename, err)
log.Fatalf("Error cordonning %s: %v", nodename, err)
}

if err := kubectldrain.RunNodeDrain(drainer, nodename); err != nil {
log.Fatal("Error draining %s: %v", nodename, err)
log.Fatalf("Error draining %s: %v", nodename, err)
}
}

Expand All @@ -275,7 +279,7 @@ func uncordon(client *kubernetes.Clientset, node *v1.Node) {
Out: os.Stdout,
}
if err := kubectldrain.RunCordonOrUncordon(drainer, node, false); err != nil {
log.Fatal("Error uncordonning %s: %v", nodename, err)
log.Fatalf("Error uncordonning %s: %v", nodename, err)
}
}

Expand Down Expand Up @@ -336,26 +340,50 @@ func rebootAsRequired(nodeID string, window *timewindow.TimeWindow, TTL time.Dur
release(lock)
}

preferNoScheduleTaint := taints.New(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule)

// Remove taint immediately during startup to quickly allow scheduling again.
if !rebootRequired() {
preferNoScheduleTaint.Disable()
}

source := rand.NewSource(time.Now().UnixNano())
tick := delaytick.New(source, period)
for range tick {
if window.Contains(time.Now()) && rebootRequired() && !rebootBlocked(client, nodeID) {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Fatal(err)
}
nodeMeta.Unschedulable = node.Spec.Unschedulable

if acquire(lock, &nodeMeta, TTL) {
if !nodeMeta.Unschedulable {
drain(client, node)
}
commandReboot(nodeID)
for {
log.Infof("Waiting for reboot")
time.Sleep(time.Minute)
}
}
if !window.Contains(time.Now()) {
// Remove taint outside the reboot time window to allow for normal operation.
preferNoScheduleTaint.Disable()
continue
}

if !rebootRequired() {
preferNoScheduleTaint.Disable()
continue
}

if rebootBlocked(client, nodeID) {
continue
}

node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Fatal(err)
}
nodeMeta.Unschedulable = node.Spec.Unschedulable

if !acquire(lock, &nodeMeta, TTL) {
// Prefer to not schedule pods onto this node to avoid draing the same pod multiple times.
preferNoScheduleTaint.Enable()
continue
}

if !nodeMeta.Unschedulable {
drain(client, node)
}
commandReboot(nodeID)
for {
log.Infof("Waiting for reboot")
time.Sleep(time.Minute)
}
}
}
Expand All @@ -380,6 +408,7 @@ func root(cmd *cobra.Command, args []string) {
} else {
log.Info("Lock TTL not set, lock will remain until being released")
}
log.Infof("PreferNoSchedule taint: %s", preferNoScheduleTaintName)
log.Infof("Reboot Sentinel: %s every %v", rebootSentinel, period)
log.Infof("Blocking Pod Selectors: %v", podSelectors)
log.Infof("Reboot on: %v", window)
Expand Down
166 changes: 166 additions & 0 deletions pkg/taints/taints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package taints

import (
"context"
"encoding/json"
"fmt"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)

// Taint manages a single taint on a single node. It caches whether the
// taint is currently present so Enable/Disable can skip redundant API
// round-trips; an empty taintName turns every operation into a no-op.
type Taint struct {
	client    *kubernetes.Clientset // API client used to read and patch the node
	nodeID    string                // name of the node this taint applies to
	taintName string                // taint key; "" disables Enable/Disable entirely
	effect    v1.TaintEffect        // effect applied with the taint (e.g. PreferNoSchedule)
	exists    bool                  // last observed presence of the taint on the node
}

// New builds a Taint handle for nodeID, probing the API server once to
// learn whether the taint is already present on the node.
func New(client *kubernetes.Clientset, nodeID, taintName string, effect v1.TaintEffect) *Taint {
	t := &Taint{
		client:    client,
		nodeID:    nodeID,
		taintName: taintName,
		effect:    effect,
	}
	// Seed the cached state from the node's current taint list.
	t.exists, _, _ = taintExists(client, nodeID, taintName)
	return t
}

// Enable creates the taint for a node. Creating an existing taint is a
// noop, as is calling Enable on a Taint with an empty name.
func (t *Taint) Enable() {
	// Skip disabled taints and taints already known to be present.
	if t.taintName == "" || t.exists {
		return
	}
	preferNoSchedule(t.client, t.nodeID, t.taintName, t.effect, true)
	t.exists = true
}

// Disable removes the taint for a node. Removing a missing taint is a
// noop, as is calling Disable on a Taint with an empty name.
func (t *Taint) Disable() {
	// Skip disabled taints and taints already known to be absent.
	if t.taintName == "" || !t.exists {
		return
	}
	preferNoSchedule(t.client, t.nodeID, t.taintName, t.effect, false)
	t.exists = false
}

// taintExists reports whether taintName is present on the node, together
// with the taint's index in Spec.Taints and the freshly fetched node.
// Any read error is fatal, so callers always receive a non-nil node.
func taintExists(client *kubernetes.Clientset, nodeID, taintName string) (bool, int, *v1.Node) {
	node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
	if err != nil || node == nil {
		log.Fatalf("Error reading node %s: %v", nodeID, err)
	}

	for i := range node.Spec.Taints {
		if node.Spec.Taints[i].Key == taintName {
			return true, i, node
		}
	}

	return false, 0, node
}

// preferNoSchedule reconciles the node so the given taint is present when
// shouldExist is true and absent when shouldExist is false. The node is
// modified with an RFC 6902 JSON patch whose "test" operations make the
// patch fail — rather than clobber concurrent changes — if the taint list
// was modified between our read and our write.
func preferNoSchedule(client *kubernetes.Clientset, nodeID, taintName string, effect v1.TaintEffect, shouldExist bool) {
	// Named "present" to avoid shadowing the taintExists function.
	present, offset, updatedNode := taintExists(client, nodeID, taintName)

	// Desired state already holds: nothing to patch.
	if present && shouldExist {
		log.Debugf("Taint %v exists already for node %v.", taintName, nodeID)
		return
	}

	if !present && !shouldExist {
		log.Debugf("Taint %v already missing for node %v.", taintName, nodeID)
		return
	}

	// patchTaints is one JSON-patch operation (op/path/value).
	type patchTaints struct {
		Op    string      `json:"op"`
		Path  string      `json:"path"`
		Value interface{} `json:"value,omitempty"`
	}

	// NOTE(review): the remove-path "test" below compares against a taint
	// with only Key and Effect set; a taint carrying a Value or TimeAdded
	// would fail the test. Fine for taints this package created itself.
	taint := v1.Taint{
		Key:    taintName,
		Effect: effect,
	}

	var patches []patchTaints

	if len(updatedNode.Spec.Taints) == 0 {
		// add first taint and ensure to keep current taints
		patches = []patchTaints{
			{
				Op:    "test",
				Path:  "/spec",
				Value: updatedNode.Spec,
			},
			{
				Op:    "add",
				Path:  "/spec/taints",
				Value: []v1.Taint{},
			},
			{
				Op:    "add",
				Path:  "/spec/taints/-",
				Value: taint,
			},
		}
	} else if present {
		// remove taint and ensure to test against race conditions
		patches = []patchTaints{
			{
				Op:    "test",
				Path:  fmt.Sprintf("/spec/taints/%d", offset),
				Value: taint,
			},
			{
				Op:   "remove",
				Path: fmt.Sprintf("/spec/taints/%d", offset),
			},
		}
	} else {
		// add missing taint to existing list
		patches = []patchTaints{
			{
				Op:    "add",
				Path:  "/spec/taints/-",
				Value: taint,
			},
		}
	}

	patchBytes, err := json.Marshal(patches)
	if err != nil {
		log.Fatalf("Error encoding taint patch for node %s: %v", nodeID, err)
	}

	_, err = client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patchBytes, metav1.PatchOptions{})
	if err != nil {
		log.Fatalf("Error patching taint for node %s: %v", nodeID, err)
	}

	if shouldExist {
		log.Info("Node taint added")
	} else {
		log.Info("Node taint removed")
	}
}

0 comments on commit fade706

Please sign in to comment.