// upgradeagentnode.go (forked from Azure/acs-engine)
package kubernetesupgrade

import (
	"fmt"
	"math/rand"
	"time"

	"k8s.io/client-go/pkg/api/v1/node"

	"github.com/Azure/acs-engine/pkg/api"
	"github.com/Azure/acs-engine/pkg/armhelpers"
	"github.com/Azure/acs-engine/pkg/i18n"
	"github.com/Azure/acs-engine/pkg/operations"
	"github.com/sirupsen/logrus"
)
const (
	// interval is the polling interval passed to the Kubernetes client.
	interval = time.Second * 1
	// retry is the delay between node-readiness checks in Validate.
	retry = time.Second * 5
)
// Compile-time check to verify that UpgradeAgentNode implements UpgradeNode.
var _ UpgradeNode = &UpgradeAgentNode{}
// UpgradeAgentNode upgrades a Kubernetes 1.5 agent node to 1.6
type UpgradeAgentNode struct {
	Translator              *i18n.Translator
	logger                  *logrus.Entry
	TemplateMap             map[string]interface{}
	ParametersMap           map[string]interface{}
	UpgradeContainerService *api.ContainerService
	ResourceGroup           string
	Client                  armhelpers.ACSEngineClient
	kubeConfig              string
	timeout                 time.Duration
}
// DeleteNode takes the state/resources of the master/agent node from ListNodeResources,
// backs up/preserves state as needed by a specific version of Kubernetes, and then
// deletes the node.
// The 'drain' flag invokes the 'cordon and drain' flow before deletion.
func (kan *UpgradeAgentNode) DeleteNode(vmName *string, drain bool) error {
	if drain {
		// Use the hosted (managed) master FQDN when present; otherwise fall
		// back to the self-managed master profile's FQDN.
		var kubeAPIServerURL string
		if kan.UpgradeContainerService.Properties.HostedMasterProfile != nil {
			kubeAPIServerURL = kan.UpgradeContainerService.Properties.HostedMasterProfile.FQDN
		} else {
			kubeAPIServerURL = kan.UpgradeContainerService.Properties.MasterProfile.FQDN
		}
		err := operations.SafelyDrainNode(kan.Client, kan.logger, kubeAPIServerURL, kan.kubeConfig, *vmName, time.Minute)
		if err != nil {
			// Proceed with deletion anyway; a failed drain should not block the upgrade.
			kan.logger.Warningf("Error draining agent VM %s. Proceeding with deletion. Error: %v", *vmName, err)
		}
	}
	return operations.CleanDeleteVirtualMachine(kan.Client, kan.logger, kan.ResourceGroup, *vmName)
}
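
// A minimal call sketch (hypothetical VM name), assuming a fully initialized
// UpgradeAgentNode:
//
//	vm := "k8s-agentpool1-12345678-0"
//	_ = kan.DeleteNode(&vm, true)  // cordon and drain, then delete
//	_ = kan.DeleteNode(&vm, false) // delete without draining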
// CreateNode creates a new master/agent node with the targeted version of Kubernetes.
func (kan *UpgradeAgentNode) CreateNode(poolName string, agentNo int) error {
	// Bump the pool count so the ARM deployment below provisions exactly one
	// replacement VM at index agentNo.
	poolCountParameter := kan.ParametersMap[poolName+"Count"].(map[string]interface{})
	poolCountParameter["value"] = agentNo + 1
	agentCount := poolCountParameter["value"]
	kan.logger.Infof("Agent pool: %s, set count to: %d temporarily during upgrade. Upgrading agent: %d",
		poolName, agentCount, agentNo)

	poolOffsetVarName := poolName + "Offset"
	templateVariables := kan.TemplateMap["variables"].(map[string]interface{})
	templateVariables[poolOffsetVarName] = agentNo

	// Debug function - keep commented out
	// WriteTemplate(kan.Translator, kan.UpgradeContainerService, kan.TemplateMap, kan.ParametersMap)

	// Use a random suffix so concurrent or retried deployments get unique names.
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	deploymentSuffix := random.Int31()
	deploymentName := fmt.Sprintf("agent-%s-%d", time.Now().Format("06-01-02T15.04.05"), deploymentSuffix)

	return armhelpers.DeployTemplateSync(kan.Client, kan.logger, kan.ResourceGroup, deploymentName, kan.TemplateMap, kan.ParametersMap)
}
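
// For illustration only (hypothetical pool name and count), the type
// assertions in CreateNode expect the generated ARM artifacts to carry
// this shape:
//
//	kan.ParametersMap = map[string]interface{}{
//		"agentpool1Count": map[string]interface{}{"value": 3},
//	}
//	kan.TemplateMap = map[string]interface{}{
//		"variables": map[string]interface{}{"agentpool1Offset": 0},
//	}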
// Validate will verify that the agent node has been upgraded as expected.
func (kan *UpgradeAgentNode) Validate(vmName *string) error {
	if vmName == nil || *vmName == "" {
		kan.logger.Warningf("VM name was empty. Skipping node condition check")
		return nil
	}
	kan.logger.Infof("Validating %s", *vmName)
	var masterURL string
	if kan.UpgradeContainerService.Properties.HostedMasterProfile != nil {
		masterURL = kan.UpgradeContainerService.Properties.HostedMasterProfile.FQDN
	} else {
		masterURL = kan.UpgradeContainerService.Properties.MasterProfile.FQDN
	}

	client, err := kan.Client.GetKubernetesClient(masterURL, kan.kubeConfig, interval, kan.timeout)
	if err != nil {
		return &armhelpers.DeploymentValidationError{Err: err}
	}

	// Poll the node until it reports Ready or the overall timeout fires.
	retryTimer := time.NewTimer(time.Millisecond)
	timeoutTimer := time.NewTimer(kan.timeout)
	for {
		select {
		case <-timeoutTimer.C:
			retryTimer.Stop()
			return &armhelpers.DeploymentValidationError{Err: kan.Translator.Errorf("Node was not ready within %v", kan.timeout)}
		case <-retryTimer.C:
			agentNode, err := client.GetNode(*vmName)
			if err != nil {
				kan.logger.Infof("Agent VM: %s status error: %v", *vmName, err)
				retryTimer.Reset(retry)
			} else if node.IsNodeReady(agentNode) {
				kan.logger.Infof("Agent VM: %s is ready", *vmName)
				timeoutTimer.Stop()
				return nil
			} else {
				kan.logger.Infof("Agent VM: %s not ready yet...", *vmName)
				retryTimer.Reset(retry)
			}
		}
	}
}
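
// A minimal end-to-end sketch of how a caller might drive a single-node
// upgrade (hypothetical driver; the real orchestration lives elsewhere in
// this package):
//
//	vm := "k8s-agentpool1-12345678-0"
//	if err := kan.DeleteNode(&vm, true); err != nil {
//		return err
//	}
//	if err := kan.CreateNode("agentpool1", 0); err != nil {
//		return err
//	}
//	if err := kan.Validate(&vm); err != nil {
//		return err
//	}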