[release-4.12] OCPBUGS-23020: Introduce upgrading label to block concurrent upgrades #1932

Merged
Changes from 3 commits
50 changes: 50 additions & 0 deletions controllers/controllers.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"sync"

"github.com/go-logr/logr"
config "github.com/openshift/api/config/v1"
@@ -27,6 +28,17 @@ import (
"github.com/openshift/windows-machine-config-operator/version"
)

const (
// MaxParallelUpgrades is the maximum number of nodes allowed to upgrade in parallel.
// It must be a positive integer; it limits the number of concurrent upgrades and cannot be used to stop upgrades.
MaxParallelUpgrades = 1
)

var (
// controllerLocker is used to synchronize upgrades between controllers
controllerLocker sync.Mutex
)

// instanceReconciler contains everything needed to perform actions on a Windows instance
type instanceReconciler struct {
// Client is the cache client
@@ -76,6 +88,9 @@ func (r *instanceReconciler) ensureInstanceIsUpToDate(instanceInfo *instance.Inf
// Instance requiring an upgrade indicates that the node object is present with the version annotation
r.log.Info("instance requires upgrade", "node", instanceInfo.Node.GetName(), "version",
instanceInfo.Node.GetAnnotations()[metadata.VersionAnnotation], "expected version", version.Get())
if err := markNodeAsUpgrading(context.TODO(), r.client, instanceInfo.Node); err != nil {
return err
}
if err := nc.Deconfigure(); err != nil {
return err
}
@@ -288,3 +303,38 @@ func markAsFreeOnSuccess(c client.Client, watchNamespace string, recorder record
}
return err
}

// markNodeAsUpgrading marks the given node as upgrading by applying the upgrading label to it. If marking the node
// would exceed the maximum allowed number of parallel upgrades, an error is returned
func markNodeAsUpgrading(ctx context.Context, c client.Client, currentNode *core.Node) error {
controllerLocker.Lock()
defer controllerLocker.Unlock()
upgradingNodes, err := findUpgradingNodes(ctx, c)
if err != nil {
return err
}
// check if current node is already marked as upgrading
for _, node := range upgradingNodes.Items {
if node.Name == currentNode.Name {
// current node is upgrading, continue with it
return nil
}
}
if len(upgradingNodes.Items) >= MaxParallelUpgrades {
return fmt.Errorf("cannot mark node %s as upgrading, maximum number of parallel upgrading nodes reached (%d)",
currentNode.Name, MaxParallelUpgrades)
}
return metadata.ApplyUpgradingLabel(ctx, c, currentNode)
}

// findUpgradingNodes returns a pointer to the list of Windows nodes that are upgrading, i.e. have the
// upgrading label set to true
func findUpgradingNodes(ctx context.Context, c client.Client) (*core.NodeList, error) {
// get Windows nodes with the upgrading label
matchingLabels := client.MatchingLabels{core.LabelOSStable: "windows", metadata.UpgradingLabel: "true"}
nodeList := &core.NodeList{}
if err := c.List(ctx, nodeList, matchingLabels); err != nil {
return nil, fmt.Errorf("error listing Windows nodes with upgrading label: %w", err)
}
return nodeList, nil
}
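For context, the limit is enforced by returning an error rather than blocking on the mutex; the expectation is that the error surfaces as a failed reconcile, which controller-runtime then requeues with backoff so the node is retried once an upgrade slot frees up. A minimal sketch of that pattern, assuming controller-runtime's standard Reconcile signature (the reconciler and helper names below are illustrative, not part of this change):

package controllers

import (
	"context"
	"fmt"

	ctrl "sigs.k8s.io/controller-runtime"
)

// exampleReconciler is a hypothetical reconciler used only to illustrate the retry behavior.
type exampleReconciler struct{}

// upgradeInstance stands in for the deconfigure/reconfigure flow guarded by markNodeAsUpgrading.
func (r *exampleReconciler) upgradeInstance(ctx context.Context) error {
	// In the real controller this is where markNodeAsUpgrading is called; returning its
	// error propagates the "maximum parallel upgrading nodes reached" condition.
	return fmt.Errorf("cannot mark node as upgrading, maximum number of parallel upgrading nodes reached (%d)", 1)
}

// Reconcile returns the error unchanged, which causes controller-runtime to requeue the
// request with backoff until an upgrade slot becomes available.
func (r *exampleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	if err := r.upgradeInstance(ctx); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}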
25 changes: 25 additions & 0 deletions hack/common.sh
@@ -215,3 +215,28 @@ getWindowsInstanceCountFromConfigMap() {
-n "${WMCO_DEPLOY_NAMESPACE}" \
-o json | jq '.data | length'
}

# creates the job used to check the number of Windows nodes performing
# parallel upgrades in the test cluster
createParallelUpgradeCheckerResources() {
winNodesCount=$(oc get nodes -l kubernetes.io/os=windows -o jsonpath='{.items[*].metadata.name}' | wc -w)
if [[ "${winNodesCount}" -lt 2 ]]; then
echo "Skipping parallel upgrade checker job, requires 2 or more nodes. Found ${winNodesCount} nodes"
return
fi
# get the latest tools image from the image stream
export TOOLS_IMAGE=$(oc get imagestreamtag tools:latest -n openshift -o jsonpath='{.tag.from.name}')
# set the job's container image and create the job
JOB=$(sed -e "s|REPLACE_WITH_OPENSHIFT_TOOLS_IMAGE|${TOOLS_IMAGE}|g" hack/e2e/resources/parallel-upgrade-checker-job.yaml)
cat <<EOF | oc apply -f -
${JOB}
EOF
}

# deletes the parallel upgrade checker job and its associated resources from the test cluster
deleteParallelUpgradeCheckerResources() {
oc delete -f hack/e2e/resources/parallel-upgrade-checker-job.yaml || {
echo "error deleting parallel upgrade checker job"
}
}
52 changes: 52 additions & 0 deletions hack/e2e/resources/parallel-upgrade-checker-job.yaml
@@ -0,0 +1,52 @@
kind: Job
apiVersion: batch/v1
metadata:
name: parallel-upgrades-checker
namespace: wmco-test
labels:
batch.kubernetes.io/job-name: parallel-upgrades-checker
job-name: parallel-upgrades-checker
spec:
parallelism: 1
completions: 1
backoffLimit: 3
template:
metadata:
labels:
batch.kubernetes.io/job-name: parallel-upgrades-checker
job-name: parallel-upgrades-checker
spec:
nodeSelector:
kubernetes.io/os: linux
restartPolicy: Never
serviceAccountName: wmco-test
os:
name: linux
containers:
- name: parallel-upgrades-checker
image: 'REPLACE_WITH_OPENSHIFT_TOOLS_IMAGE'
command:
- bash
- '-c'
- |
#!/bin/bash
set -euo pipefail

# max number of parallel upgrades allowed, fixed to 1. Refer to controllers.MaxParallelUpgrades
export MAX_PARALLEL_UPGRADES=1

# loop indefinitely, failing as soon as the count is exceeded
while true; do
upgradingCount=$(oc get nodes -l kubernetes.io/os=windows -o jsonpath='{.items[*].metadata.labels.windowsmachineconfig\.openshift\.io/upgrading}' | wc -w)
if [[ $upgradingCount -gt $MAX_PARALLEL_UPGRADES ]]; then
echo "error: max upgrading count exceeded"
exit 1
fi
echo ""
echo "pass: upgrading count $upgradingCount/$MAX_PARALLEL_UPGRADES"
echo "waiting 5s for next check..."
sleep 5
done
imagePullPolicy: IfNotPresent
securityContext:
runAsNonRoot: true
2 changes: 2 additions & 0 deletions hack/run-ci-e2e-test.sh
@@ -174,9 +174,11 @@ fi
if [[ "$TEST" = "upgrade-setup" ]]; then
go test ./test/e2e/... -run=TestWMCO/create/Creation -v -timeout=90m -args $GO_TEST_ARGS
go test ./test/e2e/... -run=TestWMCO/create/Nodes_ready_and_schedulable -v -timeout=90m -args $GO_TEST_ARGS
createParallelUpgradeCheckerResources
fi

if [[ "$TEST" = "upgrade-test" ]]; then
trap deleteParallelUpgradeCheckerResources EXIT
go test ./test/e2e/... -run=TestUpgrade -v -timeout=20m -args $GO_TEST_ARGS
fi

32 changes: 32 additions & 0 deletions pkg/metadata/metadata.go
@@ -21,6 +21,8 @@ const (
VersionAnnotation = "windowsmachineconfig.openshift.io/version"
// DesiredVersionAnnotation is a Node annotation, indicating the Service ConfigMap that should be used to configure it
DesiredVersionAnnotation = "windowsmachineconfig.openshift.io/desired-version"
// UpgradingLabel indicates the node's underlying instance is performing an upgrade
UpgradingLabel = "windowsmachineconfig.openshift.io/upgrading"
)

// generatePatch creates a patch applying the given operation onto each given annotation key and value
@@ -42,6 +44,24 @@ func generatePatch(op string, labels, annotations map[string]string) ([]*patch.J
return patches, nil
}

// removeLabel removes the given label from the node object
func removeLabel(ctx context.Context, c client.Client, node *core.Node, label string) error {
_, present := node.GetLabels()[label]
if !present {
// label not present in node, nothing to remove
return nil
}
patchData, err := GenerateRemovePatch([]string{label}, []string{})
if err != nil {
return fmt.Errorf("error creating label remove patch: %w", err)
}
err = c.Patch(ctx, node, client.RawPatch(kubeTypes.JSONPatchType, patchData))
if err != nil {
return fmt.Errorf("error removing label from node %s: %w", node.GetName(), err)
}
return nil
}

// GenerateAddPatch creates a comma-separated list of operations to add all given labels and annotations to an object
// An "add" patch overwrites existing value if a label or annotation already exists
func GenerateAddPatch(labels, annotations map[string]string) ([]byte, error) {
@@ -135,3 +155,15 @@ func WaitForVersionAnnotation(ctx context.Context, c client.Client, nodeName str
}
return nil
}

// RemoveUpgradingLabel clears the upgrading label from the node reference, indicating the instance is
// no longer upgrading
func RemoveUpgradingLabel(ctx context.Context, c client.Client, node *core.Node) error {
return removeLabel(ctx, c, node, UpgradingLabel)
}

// ApplyUpgradingLabel applies the upgrading label to the given node reference indicating the instance
// is performing an upgrade
func ApplyUpgradingLabel(ctx context.Context, c client.Client, node *core.Node) error {
return ApplyLabelsAndAnnotations(ctx, c, *node, map[string]string{UpgradingLabel: "true"}, nil)
}
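One detail behind removeLabel above: the upgrading label key contains a "/", which has to be escaped as "~1" in a JSON pointer path (RFC 6901) before it can appear in an RFC 6902 patch. A minimal standalone sketch of the kind of remove patch involved, assuming GenerateRemovePatch emits a standard JSON patch document (the literal shown is an illustration, not necessarily the exact bytes produced):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Escape the label key for use in a JSON pointer path: "~" -> "~0", "/" -> "~1" (RFC 6901).
	label := "windowsmachineconfig.openshift.io/upgrading"
	escaped := strings.NewReplacer("~", "~0", "/", "~1").Replace(label)

	// An RFC 6902 "remove" operation targeting the label under /metadata/labels.
	patch := fmt.Sprintf(`[{"op":"remove","path":"/metadata/labels/%s"}]`, escaped)
	fmt.Println(patch)
	// Prints: [{"op":"remove","path":"/metadata/labels/windowsmachineconfig.openshift.io~1upgrading"}]
}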
4 changes: 4 additions & 0 deletions pkg/nodeconfig/nodeconfig.go
@@ -237,6 +237,10 @@ func (nc *nodeConfig) Configure() error {
return fmt.Errorf("error uncordoning the node %s: %w", nc.node.GetName(), err)
}

if err := metadata.RemoveUpgradingLabel(context.TODO(), nc.client, nc.node); err != nil {
return fmt.Errorf("error removing upgrading label from node %s: %w", nc.node.GetName(), err)
}

nc.log.Info("instance has been configured as a worker node", "version",
nc.node.Annotations[metadata.VersionAnnotation])
return nil
20 changes: 20 additions & 0 deletions test/e2e/upgrade_test.go
@@ -30,6 +30,8 @@ const (
windowsWorkloadTesterJob = "windows-workload-tester"
// outdatedVersion is the 'previous' version in the simulated upgrade that the operator is being upgraded from
outdatedVersion = "old-version"
// parallelUpgradesCheckerJobName is a fixed name for the job that checks for the number of parallel upgrades
parallelUpgradesCheckerJobName = "parallel-upgrades-checker"
)

// upgradeTestSuite tests behaviour of the operator when an upgrade takes place.
@@ -225,6 +227,24 @@ func TestUpgrade(t *testing.T) {
// test that any workloads deployed on the node have not been broken by the upgrade
t.Run("Workloads ready", tc.testWorkloadsAvailable)
t.Run("Node Logs", tc.testNodeLogs)
t.Run("Parallel Upgrades Checker", tc.testParallelUpgradesChecker)

Contributor: super nit: there's an extra newline here that also existed in the original PR
}

// testParallelUpgradesChecker tests that the number of parallel upgrades did not exceed the maximum allowed
// during the lifetime of the checker job. This test is run after the upgrade is complete.
func (tc *testContext) testParallelUpgradesChecker(t *testing.T) {
// get current Windows node state
require.NoError(t, tc.loadExistingNodes(), "error getting the current Windows nodes in the cluster")
if len(gc.allNodes()) < 2 {
t.Skipf("Requires 2 or more nodes to run. Found %d nodes", len(gc.allNodes()))
}
failedPods, err := tc.client.K8s.CoreV1().Pods(tc.workloadNamespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: "job-name=" + parallelUpgradesCheckerJobName, FieldSelector: "status.phase=Failed"})
require.NoError(t, err)
tc.writePodLogs("job-name=" + parallelUpgradesCheckerJobName)
require.Equal(t, 0, len(failedPods.Items), "parallel upgrades check failed, failed pod count: %d",
len(failedPods.Items))
}

// testWorkloadsAvailable tests that all workloads deployed on Windows nodes by the test suite are available
73 changes: 65 additions & 8 deletions test/e2e/validation_test.go
@@ -311,7 +311,7 @@ func (tc *testContext) ensureTestRunnerSA() error {

// ensureTestRunnerRole ensures the proper Role exists, a requirement for SSHing into a Windows node
// noop if the Role already exists.
func (tc *testContext) ensureTestRunnerRole() error {
func (tc *testContext) ensureTestRunnerRole(ctx context.Context) error {
role := rbac.Role{
TypeMeta: meta.TypeMeta{},
ObjectMeta: meta.ObjectMeta{Name: tc.workloadNamespace},
@@ -324,7 +324,31 @@ func (tc *testContext) ensureTestRunnerRole() error {
},
},
}
_, err := tc.client.K8s.RbacV1().Roles(tc.workloadNamespace).Create(context.TODO(), &role, meta.CreateOptions{})
_, err := tc.client.K8s.RbacV1().Roles(tc.workloadNamespace).Create(ctx, &role, meta.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("unable to create role: %w", err)
}
return nil
}

// ensureTestRunnerClusterRole ensures the proper ClusterRole exists, a requirement for listing Windows nodes,
// noop if the ClusterRole already exists. Nodes are cluster-scoped resources, and only ClusterRoles
// can grant permissions to cluster-scoped resources.
func (tc *testContext) ensureTestRunnerClusterRole(ctx context.Context) error {
clusterRole := rbac.ClusterRole{
TypeMeta: meta.TypeMeta{},
ObjectMeta: meta.ObjectMeta{
Name: tc.workloadNamespace,
},
Rules: []rbac.PolicyRule{
{
Resources: []string{"nodes"},
APIGroups: []string{""},
Verbs: []string{"get", "list"},
},
},
}
_, err := tc.client.K8s.RbacV1().ClusterRoles().Create(ctx, &clusterRole, meta.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("unable to create role: %w", err)
}
@@ -333,7 +357,7 @@ func (tc *testContext) ensureTestRunnerRole() error {

// ensureTestRunnerRoleBinding ensures the proper RoleBinding exists, a requirement for SSHing into a Windows node
// noop if the RoleBinding already exists.
func (tc *testContext) ensureTestRunnerRoleBinding() error {
func (tc *testContext) ensureTestRunnerRoleBinding(ctx context.Context) error {
rb := rbac.RoleBinding{
TypeMeta: meta.TypeMeta{},
ObjectMeta: meta.ObjectMeta{Name: tc.workloadNamespace},
@@ -349,22 +373,55 @@ func (tc *testContext) ensureTestRunnerRoleBinding() error {
Name: tc.workloadNamespace,
},
}
_, err := tc.client.K8s.RbacV1().RoleBindings(tc.workloadNamespace).Create(context.TODO(), &rb, meta.CreateOptions{})
_, err := tc.client.K8s.RbacV1().RoleBindings(tc.workloadNamespace).Create(ctx, &rb, meta.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("unable to create role: %w", err)
}
return nil
}

// sshSetup creates all the Kubernetes resources required to SSH into a Windows node
func (tc *testContext) sshSetup() error {
// ensureTestRunnerClusterRoleBinding ensures the proper ClusterRoleBinding exists, a requirement for listing
// Windows nodes, noop if the ClusterRoleBinding already exists.
func (tc *testContext) ensureTestRunnerClusterRoleBinding(ctx context.Context) error {
crb := rbac.ClusterRoleBinding{
TypeMeta: meta.TypeMeta{},
ObjectMeta: meta.ObjectMeta{
Name: tc.workloadNamespace,
},
Subjects: []rbac.Subject{{
Kind: rbac.ServiceAccountKind,
Name: tc.workloadNamespace,
Namespace: tc.workloadNamespace,
}},
RoleRef: rbac.RoleRef{
APIGroup: rbac.GroupName,
Kind: "ClusterRole",
Name: tc.workloadNamespace,
},
}
_, err := tc.client.K8s.RbacV1().ClusterRoleBindings().Create(ctx, &crb, meta.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("unable to create cluster role: %w", err)
}
return nil
}

// ensureTestRunnerRBAC creates the RBAC resources required for the test runner service account
func (tc *testContext) ensureTestRunnerRBAC() error {
ctx := context.TODO()
if err := tc.ensureTestRunnerSA(); err != nil {
return fmt.Errorf("error ensuring SA created: %w", err)
}
if err := tc.ensureTestRunnerRole(); err != nil {
if err := tc.ensureTestRunnerClusterRole(ctx); err != nil {
return fmt.Errorf("error ensuring Role created: %w", err)
}
if err := tc.ensureTestRunnerClusterRoleBinding(ctx); err != nil {
return fmt.Errorf("error ensuring RoleBinding created: %w", err)
}
if err := tc.ensureTestRunnerRole(ctx); err != nil {
return fmt.Errorf("error ensuring Role created: %w", err)
}
if err := tc.ensureTestRunnerRoleBinding(); err != nil {
if err := tc.ensureTestRunnerRoleBinding(ctx); err != nil {
return fmt.Errorf("error ensuring RoleBinding created: %w", err)
}
return nil
2 changes: 1 addition & 1 deletion test/e2e/wmco_test.go
@@ -30,7 +30,7 @@ func TestWMCO(t *testing.T) {
log.Printf("Testing against Windows Server %s\n", tc.windowsServerVersion)
// Create the namespace test resources can be deployed in, as well as required resources within said namespace.
require.NoError(t, tc.ensureNamespace(tc.workloadNamespace, tc.workloadNamespaceLabels), "error creating test namespace")
require.NoError(t, tc.sshSetup(), "unable to setup SSH requirements")
require.NoError(t, tc.ensureTestRunnerRBAC(), "error creating test runner RBAC")

// When the upgrade test is run from CI, the namespace that gets created does not have the required monitoring
// label, so we ensure that it gets applied and the WMCO deployment is restarted.