Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/daemon: stash the node object #464

Merged
merged 1 commit into from
Feb 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ type Daemon struct {

// channel used to ensure all spawned goroutines exit when we exit.
stopCh <-chan struct{}

// node is the current instance of the node being processed through handleNodeUpdate
// or the very first instance grabbed when the daemon starts
node *corev1.Node
}

// pendingConfigState is stored as JSON at pathStateJSON; it is only
Expand Down Expand Up @@ -241,7 +245,11 @@ func NewClusterDrivenDaemon(
dn.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "machineconfigdaemon", Host: nodeName})

glog.Infof("Managing node: %s", nodeName)
if err = loadNodeAnnotations(dn.kubeClient.CoreV1().Nodes(), nodeName); err != nil {

if err := dn.setInitialNode(nodeName); err != nil {
return nil, err
}
if err = dn.loadNodeAnnotations(); err != nil {
return nil, err
}

Expand Down Expand Up @@ -469,19 +477,19 @@ func (dn *Daemon) getStateAndConfigs(pendingConfigName string) (*stateAndConfigs
glog.Info("In bootstrap mode")
}

currentConfigName, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, constants.CurrentMachineConfigAnnotationKey)
currentConfigName, err := getNodeAnnotation(dn.node, constants.CurrentMachineConfigAnnotationKey)
if err != nil {
return nil, err
}
desiredConfigName, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, constants.DesiredMachineConfigAnnotationKey)
desiredConfigName, err := getNodeAnnotation(dn.node, constants.DesiredMachineConfigAnnotationKey)
if err != nil {
return nil, err
}
currentConfig, err := getMachineConfig(dn.client.MachineconfigurationV1().MachineConfigs(), currentConfigName)
if err != nil {
return nil, err
}
state, err := getNodeAnnotationExt(dn.kubeClient.CoreV1().Nodes(), dn.name, constants.MachineConfigDaemonStateAnnotationKey, true)
state, err := getNodeAnnotationExt(dn.node, constants.MachineConfigDaemonStateAnnotationKey, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -647,7 +655,7 @@ func (dn *Daemon) CheckStateOnBoot() error {
// Great, we've successfully rebooted for the desired config,
// let's mark it done!
glog.Infof("Completing pending config %s", state.pendingConfig.GetName())
if err := dn.completeUpdate(state.pendingConfig.GetName()); err != nil {
if err := dn.completeUpdate(dn.node, state.pendingConfig.GetName()); err != nil {
return err
}
}
Expand Down Expand Up @@ -712,6 +720,8 @@ func (dn *Daemon) handleNodeUpdate(old, cur interface{}) {

// First check if the node that was updated is this daemon's node
if node.Name == dn.name {
// stash the current node being processed
dn.node = node
// Pass to the shared update prep method
needUpdate, err := dn.prepUpdateFromCluster()
if err != nil {
Expand All @@ -737,22 +747,27 @@ func (dn *Daemon) handleNodeUpdate(old, cur interface{}) {
// update is required, false otherwise.
func (dn *Daemon) prepUpdateFromCluster() (bool, error) {
// Then check we're not already in a degraded state.
state, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, constants.MachineConfigDaemonStateAnnotationKey)
state, err := getNodeAnnotation(dn.node, constants.MachineConfigDaemonStateAnnotationKey)
if err != nil {
return false, err
}
if state == constants.MachineConfigDaemonStateDegraded {
return false, fmt.Errorf("state is already degraded")
}

// Grab the node instance
node, err := GetNode(dn.kubeClient.CoreV1().Nodes(), dn.name)
desiredConfigName, err := getNodeAnnotationExt(dn.node, constants.DesiredMachineConfigAnnotationKey, true)
if err != nil {
return false, err
}
// currentConfig is always expected to be there as loadNodeAnnotations
// is one of the very first calls when the daemon starts.
currentConfigName, err := getNodeAnnotation(dn.node, constants.CurrentMachineConfigAnnotationKey)
if err != nil {
return false, err
}

// Detect if there is an update
if node.Annotations[constants.DesiredMachineConfigAnnotationKey] == node.Annotations[constants.CurrentMachineConfigAnnotationKey] {
if desiredConfigName == currentConfigName {
// No actual update to the config
glog.V(2).Info("No updating is required")
return false, nil
Expand Down Expand Up @@ -783,13 +798,8 @@ func (dn *Daemon) executeUpdateFromCluster() error {
// completeUpdate marks the node as schedulable again, then deletes the
// "transient state" file, which signifies that all of those prior steps have
// been completed.
func (dn *Daemon) completeUpdate(desiredConfigName string) error {
node, err := GetNode(dn.kubeClient.CoreV1().Nodes(), dn.name)
if err != nil {
return err
}
err = drain.Uncordon(dn.kubeClient.CoreV1().Nodes(), node, nil)
if err != nil {
func (dn *Daemon) completeUpdate(node *corev1.Node, desiredConfigName string) error {
if err := drain.Uncordon(dn.kubeClient.CoreV1().Nodes(), node, nil); err != nil {
return err
}

Expand All @@ -802,7 +812,7 @@ func (dn *Daemon) completeUpdate(desiredConfigName string) error {
// the current and desired config if they weren't passed.
func (dn *Daemon) triggerUpdateWithMachineConfig(currentConfig *mcfgv1.MachineConfig, desiredConfig *mcfgv1.MachineConfig) error {
if currentConfig == nil {
ccAnnotation, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, constants.CurrentMachineConfigAnnotationKey)
ccAnnotation, err := getNodeAnnotation(dn.node, constants.CurrentMachineConfigAnnotationKey)
if err != nil {
return err
}
Expand All @@ -813,7 +823,7 @@ func (dn *Daemon) triggerUpdateWithMachineConfig(currentConfig *mcfgv1.MachineCo
}

if desiredConfig == nil {
dcAnnotation, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, constants.DesiredMachineConfigAnnotationKey)
dcAnnotation, err := getNodeAnnotation(dn.node, constants.DesiredMachineConfigAnnotationKey)
if err != nil {
return err
}
Expand Down
38 changes: 22 additions & 16 deletions pkg/daemon/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ import (
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

func loadNodeAnnotations(client corev1.NodeInterface, node string) error {
ccAnnotation, err := getNodeAnnotation(client, node, constants.CurrentMachineConfigAnnotationKey)

func (dn *Daemon) loadNodeAnnotations() error {
ccAnnotation, err := getNodeAnnotation(dn.node, constants.CurrentMachineConfigAnnotationKey)
// we need to load the annotations from the file only for the
// first run.
// the initial annotations do not need to be set if the node
Expand All @@ -38,47 +37,54 @@ func loadNodeAnnotations(client corev1.NodeInterface, node string) error {
}

glog.Infof("Setting initial node config: %s", initial[constants.CurrentMachineConfigAnnotationKey])
err = setNodeAnnotations(client, node, initial)
node, err := setNodeAnnotations(dn.kubeClient.CoreV1().Nodes(), dn.node.Name, initial)
if err != nil {
return fmt.Errorf("Failed to set initial annotations: %v", err)
}
dn.node = node

return nil
}

// getNodeAnnotation gets the node annotation, unsurprisingly
func getNodeAnnotation(client corev1.NodeInterface, node string, k string) (string, error) {
return getNodeAnnotationExt(client, node, k, false)
func getNodeAnnotation(node *core_v1.Node, k string) (string, error) {
return getNodeAnnotationExt(node, k, false)
}

// GetNode gets the node object.
func GetNode(client corev1.NodeInterface, node string) (*core_v1.Node, error) {
// getNode queries the kube apiserver to get the node named nodeName
func getNode(client corev1.NodeInterface, nodeName string) (*core_v1.Node, error) {
var lastErr error
var n *core_v1.Node
err := wait.PollImmediate(10*time.Second, 5*time.Minute, func() (bool, error) {
n, lastErr = client.Get(node, metav1.GetOptions{})
n, lastErr = client.Get(nodeName, metav1.GetOptions{})
if lastErr == nil {
return true, nil
}
glog.Warningf("Failed to fetch node %s (%v); retrying...", node, lastErr)
glog.Warningf("Failed to fetch node %s (%v); retrying...", nodeName, lastErr)
return false, nil
})
if err != nil {
if err == wait.ErrWaitTimeout {
return nil, errors.Wrapf(lastErr, "Timed out trying to fetch node %s", node)
return nil, errors.Wrapf(lastErr, "Timed out trying to fetch node %s", nodeName)
}
return nil, err
}
return n, nil
}

// getNodeAnnotationExt is like getNodeAnnotation, but allows one to customize ENOENT handling
func getNodeAnnotationExt(client corev1.NodeInterface, node string, k string, allowNoent bool) (string, error) {
n, err := GetNode(client, node)
// setInitialNode fetches the node object this daemon manages from the
// apiserver when the daemon starts and stashes it on the Daemon so that
// subsequent annotation lookups do not need to re-query the apiserver.
func (dn *Daemon) setInitialNode(nodeName string) error {
	n, err := getNode(dn.kubeClient.CoreV1().Nodes(), nodeName)
	if err != nil {
		return err
	}
	dn.node = n
	return nil
}

v, ok := n.Annotations[k]
// getNodeAnnotationExt is like getNodeAnnotation, but allows one to customize ENOENT handling
func getNodeAnnotationExt(node *core_v1.Node, k string, allowNoent bool) (string, error) {
v, ok := node.Annotations[k]
if !ok {
if !allowNoent {
return "", fmt.Errorf("%s annotation not found in %s", k, node)
Expand Down
9 changes: 2 additions & 7 deletions pkg/daemon/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,7 @@ func (dn *Daemon) updateOSAndReboot(newConfig *mcfgv1.MachineConfig) error {
if dn.onceFrom == "" {
glog.Info("Update prepared; draining the node")

node, err := GetNode(dn.kubeClient.CoreV1().Nodes(), dn.name)
if err != nil {
return err
}

dn.recorder.Eventf(node, corev1.EventTypeNormal, "Drain", "Draining node to update config.")
dn.recorder.Eventf(dn.node, corev1.EventTypeNormal, "Drain", "Draining node to update config.")

backoff := wait.Backoff{
Steps: 5,
Expand All @@ -96,7 +91,7 @@ func (dn *Daemon) updateOSAndReboot(newConfig *mcfgv1.MachineConfig) error {
}
var lastErr error
if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
err := drain.Drain(dn.kubeClient, []*corev1.Node{node}, &drain.DrainOptions{
err := drain.Drain(dn.kubeClient, []*corev1.Node{dn.node}, &drain.DrainOptions{
DeleteLocalData: true,
Force: true,
GracePeriodSeconds: 600,
Expand Down
23 changes: 13 additions & 10 deletions pkg/daemon/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (nw *NodeWriter) Run(stop <-chan struct{}) {
case <-stop:
return
case msg := <-nw.writer:
msg.responseChannel <- setNodeAnnotations(msg.client, msg.node, msg.annos)
_, err := setNodeAnnotations(msg.client, msg.node, msg.annos)
msg.responseChannel <- err
}
}
}
Expand Down Expand Up @@ -142,32 +143,34 @@ func (nw *NodeWriter) SetSSHAccessed(client corev1.NodeInterface, node string) e
// number of times.
// f will be called each time since the node object will likely have changed if
// a retry is necessary.
func updateNodeRetry(client corev1.NodeInterface, node string, f func(*v1.Node)) error {
func updateNodeRetry(client corev1.NodeInterface, nodeName string, f func(*v1.Node)) (*v1.Node, error) {
var node *v1.Node
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
n, getErr := GetNode(client, node)
n, getErr := getNode(client, nodeName)
if getErr != nil {
return getErr
}

// Call the node modifier.
f(n)

_, err := client.Update(n)
var err error
node, err = client.Update(n)
return err
})
if err != nil {
// may be conflict if max retries were hit
return fmt.Errorf("Unable to update node %q: %v", node, err)
return nil, fmt.Errorf("Unable to update node %q: %v", node, err)
}

return nil
return node, nil
}

// setConfig sets the given annotation key, value pair.
func setNodeAnnotations(client corev1.NodeInterface, node string, m map[string]string) error {
return updateNodeRetry(client, node, func(node *v1.Node) {
func setNodeAnnotations(client corev1.NodeInterface, nodeName string, m map[string]string) (*v1.Node, error) {
node, err := updateNodeRetry(client, nodeName, func(node *v1.Node) {
for k, v := range m {
node.Annotations[k] = v
}
})
}
return node, err
}