Fix Node scaling issue
This PR removes the nodestatus when a node is removed from the cluster and its last status was a success. It also addresses the failed nodestatus issue when adding a new node to the cluster.
Vincent056 committed Apr 12, 2023
1 parent 582e4fb commit 0cb7c6e
Showing 25 changed files with 5,712 additions and 23 deletions.
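To summarize the controller-side change described in the commit message above: mapActiveStatus now lists the cluster's Nodes and deletes any FileIntegrityNodeStatus whose node no longer exists. Below is a condensed, lightly simplified excerpt of the new hunk in pkg/controller/status/status_controller.go (the full diff further down keeps the existing error-phase handling around it); names follow the diff, not a standalone API:

	// Build the set of node names currently in the cluster.
	nodeList := corev1.NodeList{}
	if err := r.client.List(context.TODO(), &nodeList, &client.ListOptions{}); err != nil {
		return v1alpha1.PhaseError, err
	}
	nodeNameList := make(map[string]bool)
	for _, node := range nodeList.Items {
		nodeNameList[node.Name] = true
	}

	// Drop any node status whose node is no longer present.
	for _, nodeStatus := range nodeStatusList.Items {
		if _, ok := nodeNameList[nodeStatus.Name]; !ok {
			if err := r.client.Delete(context.TODO(), &nodeStatus); err != nil {
				return v1alpha1.PhaseError, err
			}
			continue
		}
		// ... existing per-node condition checks (Errored -> PhaseError) continue here.
	}

The new e2e test TestFileIntegrityNodeScaling in tests/e2e/e2e_test.go exercises this by scaling a worker MachineSet up and then back down.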
2 changes: 1 addition & 1 deletion go.mod
@@ -9,6 +9,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.27.4
github.com/openshift/api v0.0.0-20230120195050-6ba31fa438f2
github.com/openshift/library-go v0.0.0-20230228181805-0899dfdba7d2
github.com/openshift/machine-config-operator v0.0.1-0.20200913004441-7eba765c69c9
github.com/pborman/uuid v1.2.1
@@ -70,7 +71,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/openshift/api v0.0.0-20230120195050-6ba31fa438f2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.41.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
1 change: 1 addition & 0 deletions pkg/controller/fileintegrity/config_defaults.go
@@ -45,6 +45,7 @@ CONTENT_EX = sha512+ftype+p+u+g+n+acl+selinux+xattrs
!/hostroot/etc/docker/certs.d
!/hostroot/etc/selinux/targeted
!/hostroot/etc/openvswitch/conf.db
!/hostroot/etc/kubernetes/cni/net.d
!/hostroot/etc/kubernetes/cni/net.d/*
!/hostroot/etc/machine-config-daemon/currentconfig$
!/hostroot/etc/pki/ca-trust/extracted/java/cacerts$
21 changes: 20 additions & 1 deletion pkg/controller/status/status_controller.go
@@ -2,9 +2,10 @@ package status

import (
"context"
"time"

"github.com/openshift/file-integrity-operator/pkg/apis/fileintegrity/v1alpha1"
"github.com/openshift/file-integrity-operator/pkg/controller/metrics"
"time"

"github.com/go-logr/logr"

@@ -171,7 +172,25 @@ func (r *StatusReconciler) mapActiveStatus(integrity *v1alpha1.FileIntegrity) (v
return v1alpha1.PhaseError, err
}

nodeList := corev1.NodeList{}
if err := r.client.List(context.TODO(), &nodeList, &client.ListOptions{}); err != nil {
return v1alpha1.PhaseError, err
}
nodeNameList := make(map[string]bool)
for _, node := range nodeList.Items {
nodeNameList[node.Name] = true
}

for _, nodeStatus := range nodeStatusList.Items {
// Check if the node is still there, and remove the node status if it's not.
// This is to handle the case where the node is deleted, but the node status is not.
if _, ok := nodeNameList[nodeStatus.Name]; !ok {
// If the node is not there, and the node status is success, we can just delete it.
if err := r.client.Delete(context.TODO(), &nodeStatus); err != nil && nodeStatus.LastResult.Condition == v1alpha1.NodeConditionSucceeded {
return v1alpha1.PhaseError, err
}
continue
}
if nodeStatus.LastResult.Condition == v1alpha1.NodeConditionErrored {
return v1alpha1.PhaseError, nil
}
41 changes: 40 additions & 1 deletion tests/e2e/e2e_test.go
@@ -9,7 +9,6 @@ import (

"github.com/openshift/file-integrity-operator/pkg/apis/fileintegrity/v1alpha1"
fileintegrity2 "github.com/openshift/file-integrity-operator/pkg/controller/fileintegrity"

framework "github.com/openshift/file-integrity-operator/tests/framework"

"k8s.io/apimachinery/pkg/types"
@@ -918,3 +917,43 @@ func TestFileIntegrityAcceptsExpectedChange(t *testing.T) {
t.Log("Asserting that the FileIntegrity check is in a SUCCESS state after expected changes")
assertNodesConditionIsSuccess(t, f, testName, namespace, 5*time.Second, 10*time.Minute)
}

// This test covers adding a new node to the cluster and removing an existing one, making sure
// all the nodestatuses are in a success state and that the stale nodestatus for the removed node is cleaned up.
func TestFileIntegrityNodeScaling(t *testing.T) {
f, testctx, namespace := setupTest(t)
testName := testIntegrityNamePrefix + "-nodescale"
setupFileIntegrity(t, f, testctx, testName, namespace)
defer testctx.Cleanup()
defer func() {
if err := cleanNodes(f, namespace); err != nil {
t.Fatal(err)
}
if err := resetBundleTestMetrics(f, namespace); err != nil {
t.Fatal(err)
}
}()
defer logContainerOutput(t, f, namespace, testName)
// wait to go active.
err := waitForScanStatus(t, f, namespace, testName, v1alpha1.PhaseActive)
if err != nil {
t.Errorf("Timeout waiting for scan status")
}

t.Log("Asserting that the FileIntegrity check is in a SUCCESS state after deploying it")
assertNodesConditionIsSuccess(t, f, testName, namespace, 2*time.Second, 5*time.Minute)

t.Log("Adding a new worker node to the cluster through the machineset")
scaledUpMachineSetName, newNodeName := scaleUpWorkerMachineSet(t, f, 2*time.Second, 10*time.Minute)
if newNodeName == "" || scaledUpMachineSetName == "" {
t.Fatal("Failed to scale up worker machineset")
}
assertSingleNodeConditionIsSuccess(t, f, testName, namespace, newNodeName, 2*time.Second, 5*time.Minute)

t.Log("Scale down the worker machineset")
removedNodeName := scaleDownWorkerMachineSet(t, f, scaledUpMachineSetName, 2*time.Second, 10*time.Minute)
if removedNodeName == "" {
t.Fatal("Failed to scale down worker machineset")
}
assertNodeStatusForRemovedNode(t, f, testName, namespace, removedNodeName, 2*time.Second, 5*time.Minute)
}
199 changes: 179 additions & 20 deletions tests/e2e/helpers.go
@@ -3,6 +3,7 @@ package e2e
import (
"bufio"
"bytes"
"context"
goctx "context"
"encoding/json"
"fmt"
@@ -16,6 +17,7 @@ import (
"testing"
"time"

machinev1 "github.com/openshift/api/machine/v1beta1"
"github.com/openshift/file-integrity-operator/pkg/apis/fileintegrity/v1alpha1"
"github.com/openshift/file-integrity-operator/pkg/controller/metrics"
"github.com/pborman/uuid"
@@ -47,26 +49,29 @@
)

const (
pollInterval = time.Second * 2
pollTimeout = time.Minute * 5
retryInterval = time.Second * 5
timeout = time.Minute * 30
cleanupRetryInterval = time.Second * 1
cleanupTimeout = time.Minute * 5
testIntegrityNamePrefix = "e2e-test"
testConfName = "test-conf"
testConfDataKey = "conf"
nodeWorkerRoleLabelKey = "node-role.kubernetes.io/worker"
mcWorkerRoleLabelKey = "machineconfiguration.openshift.io/role"
defaultTestGracePeriod = 20
defaultTestInitialDelay = 0
testInitialDelay = 180
deamonsetWaitTimeout = 30 * time.Second
legacyReinitOnHost = "/hostroot/etc/kubernetes/aide.reinit"
metricsTestCRBName = "fio-metrics-client"
metricsTestSAName = "default"
metricsTestTokenName = "metrics-token"
compressionFileCmd = "for i in `seq 1 10000`; do mktemp \"/hostroot/etc/addedbytest$i.XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\"; done || true"
pollInterval = time.Second * 2
pollTimeout = time.Minute * 5
retryInterval = time.Second * 5
timeout = time.Minute * 30
cleanupRetryInterval = time.Second * 1
cleanupTimeout = time.Minute * 5
testIntegrityNamePrefix = "e2e-test"
testConfName = "test-conf"
testConfDataKey = "conf"
nodeWorkerRoleLabelKey = "node-role.kubernetes.io/worker"
mcWorkerRoleLabelKey = "machineconfiguration.openshift.io/role"
defaultTestGracePeriod = 20
defaultTestInitialDelay = 0
testInitialDelay = 180
deamonsetWaitTimeout = 30 * time.Second
legacyReinitOnHost = "/hostroot/etc/kubernetes/aide.reinit"
metricsTestCRBName = "fio-metrics-client"
metricsTestSAName = "default"
metricsTestTokenName = "metrics-token"
machineSetLabelKey = "machine.openshift.io/cluster-api-machine-role"
machineSetLabelWorkerValue = "worker"
machineSetNamespace = "openshift-machine-api"
compressionFileCmd = "for i in `seq 1 10000`; do mktemp \"/hostroot/etc/addedbytest$i.XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\"; done || true"
)

const (
@@ -115,6 +120,7 @@ CONTENT_EX = sha512+ftype+p+u+g+n+acl+selinux+xattrs
!/hostroot/etc/docker/certs.d
!/hostroot/etc/selinux/targeted
!/hostroot/etc/openvswitch/conf.db
!/hostroot/etc/kubernetes/cni/net.d
!/hostroot/etc/kubernetes/cni/net.d/*
!/hostroot/etc/machine-config-daemon/currentconfig$
!/hostroot/etc/pki/ca-trust/extracted/java/cacerts$
@@ -1005,6 +1011,159 @@ func assertNodeOKStatusEvents(t *testing.T, f *framework.Framework, namespace st
}
}

func scaleUpWorkerMachineSet(t *testing.T, f *framework.Framework, interval, timeout time.Duration) (string, string) {
// Add a new worker node to the cluster through the machineset
// Get the machineset
machineSets := &machinev1.MachineSetList{}
err := f.Client.List(context.TODO(), machineSets, &client.ListOptions{
Namespace: machineSetNamespace})
if err != nil {
t.Error(err)
}
if len(machineSets.Items) == 0 {
t.Error("No machinesets found")
}
machineSetName := ""
for _, ms := range machineSets.Items {
if ms.Spec.Replicas != nil && *ms.Spec.Replicas > 0 {
t.Logf("Found machineset %s with %d replicas", ms.Name, *ms.Spec.Replicas)
machineSetName = ms.Name
break
}
}

// Add one more replica to one of the machinesets
machineSet := &machinev1.MachineSet{}
err = f.Client.Get(context.TODO(), types.NamespacedName{Name: machineSetName, Namespace: machineSetNamespace}, machineSet)
if err != nil {
t.Error(err)
}
t.Logf("Scaling up machineset %s", machineSetName)

replicas := *machineSet.Spec.Replicas + 1
machineSet.Spec.Replicas = &replicas
err = f.Client.Update(context.TODO(), machineSet)
if err != nil {
t.Error(err)
}
t.Logf("Waiting for scaling up machineset %s", machineSetName)
provisionningMachineName := ""
err = wait.Poll(interval, timeout, func() (bool, error) {
err = f.Client.Get(context.TODO(), types.NamespacedName{Name: machineSetName, Namespace: machineSetNamespace}, machineSet)
if err != nil {
t.Error(err)
}
// get name of the new machine
if provisionningMachineName == "" {
machines := &machinev1.MachineList{}
err = f.Client.List(context.TODO(), machines, &client.ListOptions{
Namespace: machineSetNamespace})
if err != nil {
t.Error(err)
}
for _, machine := range machines.Items {
if *machine.Status.Phase == "Provisioning" {
provisionningMachineName = machine.Name
break
}
}
}
if &machineSet.Status.Replicas == &machineSet.Status.ReadyReplicas {
t.Logf("Machineset %s scaled up", machineSetName)
return true, nil
}
t.Logf("Waiting for machineset %s to scale up, current ready replicas: %d of %d", machineSetName, &machineSet.Status.ReadyReplicas, &machineSet.Status.Replicas)
return false, nil
})
if err != nil {
t.Error(err)
}

// get the new node name
newNodeName := ""
machine := &machinev1.Machine{}
err = f.Client.Get(context.TODO(), types.NamespacedName{Name: provisionningMachineName, Namespace: machineSetNamespace}, machine)
if err != nil {
t.Error(err)
}
newNodeName = machine.Status.NodeRef.Name
t.Logf("New node name is %s", newNodeName)

return machineSetName, newNodeName
}

func scaleDownWorkerMachineSet(t *testing.T, f *framework.Framework, machineSetName string, interval, timeout time.Duration) string {
// Remove the worker node from the cluster through the machineset
// Get the machineset
machineSet := &machinev1.MachineSet{}
err := f.Client.Get(context.TODO(), types.NamespacedName{Name: machineSetName, Namespace: machineSetNamespace}, machineSet)
if err != nil {
t.Error(err)
}

// Remove one replica from the machineset
t.Logf("Scaling down machineset %s", machineSetName)
replicas := *machineSet.Spec.Replicas - 1
machineSet.Spec.Replicas = &replicas
err = f.Client.Update(context.TODO(), machineSet)
if err != nil {
t.Error(err)
}

deletedNodeName := ""

t.Logf("Waiting for scaling down machineset %s", machineSetName)
err = wait.Poll(interval, timeout, func() (bool, error) {
err = f.Client.Get(context.TODO(), types.NamespacedName{Name: machineSetName, Namespace: machineSetNamespace}, machineSet)
if err != nil {
t.Error(err)
}
if &machineSet.Status.Replicas == &machineSet.Status.ReadyReplicas {
t.Logf("Machineset %s scaled down", machineSetName)
return true, nil
}
t.Logf("Waiting for machineset %s to scale down, current ready replicas: %d of %d", machineSetName, &machineSet.Status.ReadyReplicas, &machineSet.Status.Replicas)
if deletedNodeName == "" {
// Get the node that was deleted
machineList := &machinev1.MachineList{}
err = f.Client.List(context.TODO(), machineList, &client.ListOptions{
Namespace: machineSetNamespace})
if err != nil {
t.Error(err)
}
if len(machineList.Items) == 0 {
t.Error("No machines found")
}
for _, machine := range machineList.Items {
if machine.DeletionTimestamp != nil {
deletedNodeName = machine.Status.NodeRef.Name
t.Logf("Found deleted node %s", deletedNodeName)
break
}
}
}
return false, nil
})
if err != nil {
t.Error(err)
}
return deletedNodeName
}

func assertNodeStatusForRemovedNode(t *testing.T, f *framework.Framework, integrityName, namespace, deletedNodeName string, interval, timeout time.Duration) {
nodestatus := &v1alpha1.FileIntegrityNodeStatus{}
err := f.Client.Get(goctx.TODO(), types.NamespacedName{Name: integrityName + "-" + deletedNodeName, Namespace: namespace}, nodestatus)
if err != nil {
if kerr.IsNotFound(err) {
t.Logf("Node status for node %s not found, as expected", deletedNodeName)
} else {
t.Errorf("error getting node status for node %s: %v", deletedNodeName, err)
}
} else {
t.Errorf("Node status for node %s found, but should not have been", deletedNodeName)
}
}

func assertNodesConditionIsSuccess(t *testing.T, f *framework.Framework, integrityName, namespace string, interval, timeout time.Duration) {
var lastErr error
type nodeStatus struct {
