OCPBUGS-8502: Fix node scaling issue #343

Merged 1 commit on Apr 24, 2023
2 changes: 1 addition & 1 deletion go.mod
@@ -9,6 +9,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.27.6
github.com/openshift/api v0.0.0-20230120195050-6ba31fa438f2
github.com/openshift/library-go v0.0.0-20230228181805-0899dfdba7d2
github.com/openshift/machine-config-operator v0.0.1-0.20200913004441-7eba765c69c9
github.com/pborman/uuid v1.2.1
@@ -70,7 +71,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/openshift/api v0.0.0-20230120195050-6ba31fa438f2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.41.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
1 change: 1 addition & 0 deletions pkg/controller/fileintegrity/config_defaults.go
@@ -45,6 +45,7 @@ CONTENT_EX = sha512+ftype+p+u+g+n+acl+selinux+xattrs
!/hostroot/etc/docker/certs.d
!/hostroot/etc/selinux/targeted
!/hostroot/etc/openvswitch/conf.db
!/hostroot/etc/kubernetes/cni/net.d
!/hostroot/etc/kubernetes/cni/net.d/*
!/hostroot/etc/machine-config-daemon/currentconfig$
!/hostroot/etc/pki/ca-trust/extracted/java/cacerts$
23 changes: 21 additions & 2 deletions pkg/controller/status/status_controller.go
@@ -2,9 +2,10 @@ package status

import (
"context"
"time"

"github.com/openshift/file-integrity-operator/pkg/apis/fileintegrity/v1alpha1"
"github.com/openshift/file-integrity-operator/pkg/controller/metrics"
"time"

"github.com/go-logr/logr"

@@ -171,12 +172,30 @@ func (r *StatusReconciler) mapActiveStatus(integrity *v1alpha1.FileIntegrity) (v
return v1alpha1.PhaseError, err
}

nodeList := corev1.NodeList{}
if err := r.client.List(context.TODO(), &nodeList, &client.ListOptions{}); err != nil {
return v1alpha1.PhaseError, err
}
nodeNameList := make(map[string]bool)
for _, node := range nodeList.Items {
nodeNameList[node.Name] = true
}

for _, nodeStatus := range nodeStatusList.Items {
// Check whether the node still exists, and remove the node status if it does not.
// This handles the case where a node was deleted but its node status was left behind.
if _, ok := nodeNameList[nodeStatus.NodeName]; !ok {
// If the node is not there, and the node status is success, we can just delete it.
if nodeStatus.LastResult.Condition == v1alpha1.NodeConditionSucceeded {
if err := r.client.Delete(context.TODO(), &nodeStatus); err != nil {
return v1alpha1.PhaseError, err
}
}
}
if nodeStatus.LastResult.Condition == v1alpha1.NodeConditionErrored {
return v1alpha1.PhaseError, nil
}
}

return v1alpha1.PhaseActive, nil
}

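
The cleanup added to mapActiveStatus above boils down to a set-membership check: list the nodes that currently exist, index them by name, and treat any FileIntegrityNodeStatus that points at a missing node (and whose last result was a success) as stale. A condensed, standalone sketch of that pattern follows; the types are simplified stand-ins for illustration, not the operator's real API.

package main

import "fmt"

// nodeStatus is a simplified stand-in for v1alpha1.FileIntegrityNodeStatus.
type nodeStatus struct {
	NodeName  string
	Succeeded bool
}

// staleStatuses returns the statuses whose node no longer exists and whose last
// result was a success; errored results are kept so the failure stays visible.
func staleStatuses(nodeNames []string, statuses []nodeStatus) []nodeStatus {
	present := make(map[string]bool, len(nodeNames))
	for _, n := range nodeNames {
		present[n] = true
	}
	var stale []nodeStatus
	for _, s := range statuses {
		if !present[s.NodeName] && s.Succeeded {
			stale = append(stale, s)
		}
	}
	return stale
}

func main() {
	nodes := []string{"worker-0", "worker-1"}
	statuses := []nodeStatus{
		{NodeName: "worker-0", Succeeded: true},
		{NodeName: "worker-2", Succeeded: true}, // worker-2 was scaled away
	}
	fmt.Println(staleStatuses(nodes, statuses)) // prints [{worker-2 true}]
}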
41 changes: 40 additions & 1 deletion tests/e2e/e2e_test.go
@@ -9,7 +9,6 @@ import (

"github.com/openshift/file-integrity-operator/pkg/apis/fileintegrity/v1alpha1"
fileintegrity2 "github.com/openshift/file-integrity-operator/pkg/controller/fileintegrity"

framework "github.com/openshift/file-integrity-operator/tests/framework"

"k8s.io/apimachinery/pkg/types"
@@ -918,3 +917,43 @@ func TestFileIntegrityAcceptsExpectedChange(t *testing.T) {
t.Log("Asserting that the FileIntegrity check is in a SUCCESS state after expected changes")
assertNodesConditionIsSuccess(t, f, testName, namespace, 5*time.Second, 10*time.Minute)
}

// This test adds a new node to the cluster and removes an existing node, making sure
// all node statuses are in a success state and that the stale node status for the removed
// node is cleaned up.
func TestFileIntegrityNodeScaling(t *testing.T) {
f, testctx, namespace := setupTest(t)
testName := testIntegrityNamePrefix + "-nodescale"
setupFileIntegrity(t, f, testctx, testName, namespace)
defer testctx.Cleanup()
defer func() {
if err := cleanNodes(f, namespace); err != nil {
t.Fatal(err)
}
if err := resetBundleTestMetrics(f, namespace); err != nil {
t.Fatal(err)
}
}()
defer logContainerOutput(t, f, namespace, testName)
// Wait for the FileIntegrity scan to go active.
err := waitForScanStatus(t, f, namespace, testName, v1alpha1.PhaseActive)
if err != nil {
t.Errorf("Timeout waiting for scan status")
}

t.Log("Asserting that the FileIntegrity check is in a SUCCESS state after deploying it")
assertNodesConditionIsSuccess(t, f, testName, namespace, 2*time.Second, 5*time.Minute)

t.Log("Adding a new worker node to the cluster through the machineset")
scaledUpMachineSetName, newNodeName := scaleUpWorkerMachineSet(t, f, 2*time.Second, 10*time.Minute)
if newNodeName == "" || scaledUpMachineSetName == "" {
t.Fatal("Failed to scale up worker machineset")
}
assertSingleNodeConditionIsSuccess(t, f, testName, namespace, newNodeName, 2*time.Second, 5*time.Minute)

t.Log("Scale down the worker machineset")
removedNodeName := scaleDownWorkerMachineSet(t, f, scaledUpMachineSetName, 2*time.Second, 10*time.Minute)
if removedNodeName == "" {
t.Fatal("Failed to scale down worker machineset")
}
assertNodeStatusForRemovedNode(t, f, testName, namespace, removedNodeName, 2*time.Second, 5*time.Minute)
}
162 changes: 162 additions & 0 deletions tests/e2e/helpers.go
@@ -3,6 +3,7 @@ package e2e
import (
"bufio"
"bytes"
"context"
goctx "context"
"encoding/json"
"fmt"
@@ -16,6 +17,7 @@ import (
"testing"
"time"

machinev1 "github.com/openshift/api/machine/v1beta1"
"github.com/openshift/file-integrity-operator/pkg/apis/fileintegrity/v1alpha1"
"github.com/openshift/file-integrity-operator/pkg/controller/metrics"
"github.com/pborman/uuid"
@@ -66,6 +68,7 @@ const (
metricsTestCRBName = "fio-metrics-client"
metricsTestSAName = "default"
metricsTestTokenName = "metrics-token"
machineSetNamespace = "openshift-machine-api"
compressionFileCmd = "for i in `seq 1 10000`; do mktemp \"/hostroot/etc/addedbytest$i.XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\"; done || true"
)

@@ -115,6 +118,7 @@ CONTENT_EX = sha512+ftype+p+u+g+n+acl+selinux+xattrs
!/hostroot/etc/docker/certs.d
!/hostroot/etc/selinux/targeted
!/hostroot/etc/openvswitch/conf.db
!/hostroot/etc/kubernetes/cni/net.d
!/hostroot/etc/kubernetes/cni/net.d/*
!/hostroot/etc/machine-config-daemon/currentconfig$
!/hostroot/etc/pki/ca-trust/extracted/java/cacerts$
@@ -1005,6 +1009,164 @@ func assertNodeOKStatusEvents(t *testing.T, f *framework.Framework, namespace st
}
}

func scaleUpWorkerMachineSet(t *testing.T, f *framework.Framework, interval, timeout time.Duration) (string, string) {
// Add a new worker node to the cluster through the machineset
// Get the machineset
machineSets := &machinev1.MachineSetList{}
err := f.Client.List(context.TODO(), machineSets, &client.ListOptions{
Namespace: machineSetNamespace})
if err != nil {
t.Error(err)
}
if len(machineSets.Items) == 0 {
t.Error("No machinesets found")
}
machineSetName := ""
for _, ms := range machineSets.Items {
if ms.Spec.Replicas != nil && *ms.Spec.Replicas > 0 {
t.Logf("Found machineset %s with %d replicas", ms.Name, *ms.Spec.Replicas)
machineSetName = ms.Name
break
}
}

// Add one more replica to one of the machinesets
machineSet := &machinev1.MachineSet{}
err = f.Client.Get(context.TODO(), types.NamespacedName{Name: machineSetName, Namespace: machineSetNamespace}, machineSet)
if err != nil {
t.Error(err)
}
t.Logf("Scaling up machineset %s", machineSetName)

replicas := *machineSet.Spec.Replicas + 1
machineSet.Spec.Replicas = &replicas
err = f.Client.Update(context.TODO(), machineSet)
if err != nil {
t.Error(err)
}
t.Logf("Waiting for scaling up machineset %s", machineSetName)
provisionningMachineName := ""
err = wait.Poll(interval, timeout, func() (bool, error) {
err = f.Client.Get(context.TODO(), types.NamespacedName{Name: machineSetName, Namespace: machineSetNamespace}, machineSet)
if err != nil {
t.Error(err)
}
// get name of the new machine
if provisionningMachineName == "" {
machines := &machinev1.MachineList{}
err = f.Client.List(context.TODO(), machines, &client.ListOptions{
Namespace: machineSetNamespace})
if err != nil {
t.Error(err)
}
for _, machine := range machines.Items {
if *machine.Status.Phase == "Provisioning" {
provisionningMachineName = machine.Name
break
}
}
}
if machineSet.Status.Replicas == machineSet.Status.ReadyReplicas {
t.Logf("Machineset %s scaled up", machineSetName)
return true, nil
}
t.Logf("Waiting for machineset %s to scale up, current ready replicas: %d of %d", machineSetName, machineSet.Status.ReadyReplicas, machineSet.Status.Replicas)
return false, nil
})
if err != nil {
t.Error(err)
}
// get the new node name
newNodeName := ""
machine := &machinev1.Machine{}
err = f.Client.Get(context.TODO(), types.NamespacedName{Name: provisioningMachineName, Namespace: machineSetNamespace}, machine)
if err != nil {
t.Error(err)
}
newNodeName = machine.Status.NodeRef.Name
t.Logf("New node name is %s", newNodeName)

return machineSetName, newNodeName
}

func scaleDownWorkerMachineSet(t *testing.T, f *framework.Framework, machineSetName string, interval, timeout time.Duration) string {
// Remove the worker node from the cluster through the machineset
// Get the machineset
machineSet := &machinev1.MachineSet{}
err := f.Client.Get(context.TODO(), types.NamespacedName{Name: machineSetName, Namespace: machineSetNamespace}, machineSet)
if err != nil {
t.Error(err)
}

// Remove one replica from the machineset
t.Logf("Scaling down machineset %s", machineSetName)
replicas := *machineSet.Spec.Replicas - 1
machineSet.Spec.Replicas = &replicas
err = f.Client.Update(context.TODO(), machineSet)
if err != nil {
t.Error(err)
}
deletedNodeName := ""
t.Logf("Waiting for scaling down machineset %s", machineSetName)
err = wait.Poll(interval, timeout, func() (bool, error) {
err = f.Client.Get(context.TODO(), types.NamespacedName{Name: machineSetName, Namespace: machineSetNamespace}, machineSet)
if err != nil {
t.Error(err)
}
if machineSet.Status.Replicas == machineSet.Status.ReadyReplicas {
t.Logf("Machineset %s scaled down", machineSetName)
return true, nil
}
t.Logf("Waiting for machineset %s to scale down, current ready replicas: %d of %d", machineSet.Name, machineSet.Status.ReadyReplicas, machineSet.Status.Replicas)
return false, nil
})
if err != nil {
t.Error(err)
}
if deletedNodeName == "" {
// Get the node that was deleted
machineList := &machinev1.MachineList{}
err = f.Client.List(context.TODO(), machineList, &client.ListOptions{
Namespace: machineSetNamespace})
if err != nil {
t.Error(err)
}
if len(machineList.Items) == 0 {
t.Error("No machines found")
}
for _, machine := range machineList.Items {
if machine.DeletionTimestamp != nil {
deletedNodeName = machine.Status.NodeRef.Name
t.Logf("Found deleted node %s", deletedNodeName)
return deletedNodeName
}
}
}
return deletedNodeName
}

func assertNodeStatusForRemovedNode(t *testing.T, f *framework.Framework, integrityName, namespace, deletedNodeName string, interval, timeout time.Duration) {
timeoutErr := wait.PollImmediate(interval, timeout, func() (bool, error) {
nodestatus := &v1alpha1.FileIntegrityNodeStatus{}
err := f.Client.Get(goctx.TODO(), types.NamespacedName{Name: integrityName + "-" + deletedNodeName, Namespace: namespace}, nodestatus)
if err != nil {
if kerr.IsNotFound(err) {
t.Logf("Node status for node %s not found, as expected", deletedNodeName)
return true, nil
} else {
t.Errorf("error getting node status for node %s: %v", deletedNodeName, err)
return true, err
}
} else {
t.Logf("Node status for node %s found, waiting for it to be deleted", deletedNodeName)
return false, nil
}
})
if timeoutErr != nil {
t.Errorf("timed out waiting for node status for node %s to be deleted", deletedNodeName)
}
}

func assertNodesConditionIsSuccess(t *testing.T, f *framework.Framework, integrityName, namespace string, interval, timeout time.Duration) {
var lastErr error
type nodeStatus struct {
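
Both machine-set helpers above rely on the same wait.Poll idiom: re-read the MachineSet on an interval until its replica counts converge. A minimal sketch of that loop in isolation is below; the helper name is illustrative, error handling is trimmed, and the extra comparison against Spec.Replicas is an assumption added here so the loop does not declare success before the status reflects the new desired count.

package sketch

import (
	"context"
	"time"

	machinev1 "github.com/openshift/api/machine/v1beta1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// waitForMachineSetSettled is an illustrative helper, not part of this PR.
func waitForMachineSetSettled(c client.Client, name, namespace string, interval, timeout time.Duration) error {
	return wait.Poll(interval, timeout, func() (bool, error) {
		ms := &machinev1.MachineSet{}
		if err := c.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, ms); err != nil {
			// Tolerate transient read errors; the next tick retries.
			return false, nil
		}
		if ms.Spec.Replicas == nil {
			return false, nil
		}
		// Settled once the desired, observed, and ready replica counts agree.
		return *ms.Spec.Replicas == ms.Status.Replicas && ms.Status.Replicas == ms.Status.ReadyReplicas, nil
	})
}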
7 changes: 5 additions & 2 deletions tests/framework/framework.go
@@ -15,15 +15,15 @@ import (
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
_ "k8s.io/client-go/plugin/pkg/client/auth"

machinev1 "github.com/openshift/api/machine/v1beta1"
log "github.com/sirupsen/logrus"
extscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
cached "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/kubernetes"
cgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
@@ -139,6 +139,9 @@ func newFramework(opts *frameworkOpts) (*Framework, error) {
if err := extscheme.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to add api extensions scheme to runtime scheme: %w", err)
}
if err := machinev1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to add machine api scheme to runtime scheme: %w", err)
}

cachedDiscoveryClient := cached.NewMemCacheClient(kubeclient.Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient)
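
The scheme registration added to newFramework is what allows the test client to work with Machine API objects at all: a controller-runtime client can only encode and decode kinds that its scheme knows about. A minimal sketch of the same idea, with illustrative names and a plain controller-runtime client in place of the framework wrapper:

package main

import (
	machinev1 "github.com/openshift/api/machine/v1beta1"
	"k8s.io/apimachinery/pkg/runtime"
	cgoscheme "k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/clientcmd"
	dynclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// buildClient is illustrative, not part of this PR.
func buildClient(kubeconfigPath string) (dynclient.Client, error) {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		return nil, err
	}
	scheme := runtime.NewScheme()
	if err := cgoscheme.AddToScheme(scheme); err != nil { // core Kubernetes kinds
		return nil, err
	}
	if err := machinev1.AddToScheme(scheme); err != nil { // MachineSet, Machine, ...
		return nil, err
	}
	// Without the machinev1 registration above, listing &machinev1.MachineSetList{}
	// through this client fails with a "no kind is registered" error.
	return dynclient.New(cfg, dynclient.Options{Scheme: scheme})
}

func main() {}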