node: device-plugin: add node reboot test scenario
Add a test suite to simulate a node reboot (achieved by removing pods
using the CRI API before the kubelet is restarted).

Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
swatisehgal committed May 3, 2023
1 parent b50d53f commit 4ba6f87
Showing 1 changed file with 198 additions and 0 deletions.
198 changes: 198 additions & 0 deletions test/e2e_node/device_plugin_test.go
@@ -18,7 +18,9 @@ package e2enode

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
@@ -32,6 +34,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
admissionapi "k8s.io/pod-security-admission/api"

Expand All @@ -43,6 +47,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
)

var (
@@ -55,6 +60,7 @@ var _ = SIGDescribe("Device Plugin [Feature:DevicePluginProbe][NodeFeature:Devic
f := framework.NewDefaultFramework("device-plugin-errors")
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged
testDevicePlugin(f, kubeletdevicepluginv1beta1.DevicePluginPath)
testDevicePluginNodeReboot(f, kubeletdevicepluginv1beta1.DevicePluginPath)
})

// readDaemonSetV1OrDie reads daemonset object from bytes. Panics on error.
@@ -373,6 +379,198 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
})
}

func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) {
ginkgo.Context("DevicePlugin [Serial] [Disruptive]", func() {
var devicePluginPod *v1.Pod
var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse
var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse
var triggerPathFile, triggerPathDir string
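// triggerPathDir and triggerPathFile point at the control file the sample device plugin
// watches: its registration with the kubelet is triggered by deleting that file (see below).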
var err error
ginkgo.BeforeEach(func() {
ginkgo.By("Wait for node to be ready")
gomega.Eventually(func() bool {
nodes, err := e2enode.TotalReady(f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, time.Second).Should(gomega.BeTrue())

// Before we run the device plugin test, we need to ensure
// that the cluster is in a clean state and there are no
// pods running on this node.
// This is done in a gomega.Eventually with retries since a prior test in a different test suite could have run and the deletion of its resources may still be in progress.
// xref: https://issue.k8s.io/115381
gomega.Eventually(func() error {
v1alphaPodResources, err = getV1alpha1NodeDevices()
if err != nil {
return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %v", err)
}

v1PodResources, err = getV1NodeDevices()
if err != nil {
return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %v", err)
}

if len(v1alphaPodResources.PodResources) > 0 {
return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources)
}

if len(v1PodResources.PodResources) > 0 {
return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources)
}
return nil
}, f.Timeouts.PodDelete, 2*time.Second).Should(gomega.Succeed())

ginkgo.By("Setting up the directory and file for controlling registration")
triggerPathDir = filepath.Join(devicePluginDir, "sample")
if _, err := os.Stat(triggerPathDir); errors.Is(err, os.ErrNotExist) {
err := os.Mkdir(triggerPathDir, os.ModePerm)
if err != nil {
klog.Errorf("Directory creation %s failed: %v ", triggerPathDir, err)
panic(err)
}
klog.InfoS("Directory created successfully")

triggerPathFile = filepath.Join(triggerPathDir, "registration")
if _, err := os.Stat(triggerPathFile); errors.Is(err, os.ErrNotExist) {
_, err = os.Create(triggerPathFile)
if err != nil {
klog.Errorf("File creation %s failed: %v ", triggerPathFile, err)
panic(err)
}
}
}

ginkgo.By("Scheduling a sample device plugin pod")
data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML)
if err != nil {
framework.Fail(err.Error())
}
ds := readDaemonSetV1OrDie(data)

dp := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: SampleDevicePluginName,
},
Spec: ds.Spec.Template.Spec,
}

devicePluginPod = f.PodClient().CreateSync(dp)

go func() {
// Since autoregistration is disabled for the device plugin (as the REGISTER_CONTROL_FILE
// environment variable is specified), device plugin registration needs to be triggered
// manually.
// This is done by deleting the control file at the following path:
// `/var/lib/kubelet/device-plugins/sample/registration`.

defer ginkgo.GinkgoRecover()
framework.Logf("Deleting the control file: %q to trigger registration", triggerPathFile)
err := os.Remove(triggerPathFile)
framework.ExpectNoError(err)
}()

ginkgo.By("Waiting for devices to become available on the local node")
gomega.Eventually(func() bool {
node, ready := getLocalTestNode(f)
return ready && CountSampleDeviceCapacity(node) > 0
}, 5*time.Minute, framework.Poll).Should(gomega.BeTrue())
framework.Logf("Successfully created device plugin pod")

ginkgo.By(fmt.Sprintf("Waiting for the resource exported by the sample device plugin to become available on the local node (instances: %d)", expectedSampleDevsAmount))
gomega.Eventually(func() bool {
node, ready := getLocalTestNode(f)
return ready &&
CountSampleDeviceCapacity(node) == expectedSampleDevsAmount &&
CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount
}, 30*time.Second, framework.Poll).Should(gomega.BeTrue())
})

ginkgo.AfterEach(func() {
ginkgo.By("Deleting the device plugin pod")
f.PodClient().DeleteSync(devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute)

ginkgo.By("Deleting any Pods created by the test")
l, err := f.PodClient().List(context.TODO(), metav1.ListOptions{})
framework.ExpectNoError(err)
for _, p := range l.Items {
if p.Namespace != f.Namespace.Name {
continue
}

framework.Logf("Deleting pod: %s", p.Name)
f.PodClient().DeleteSync(p.Name, metav1.DeleteOptions{}, 2*time.Minute)
}

err = os.Remove(triggerPathDir)
framework.ExpectNoError(err)

ginkgo.By("Waiting for devices to become unavailable on the local node")
gomega.Eventually(func() bool {
node, ready := getLocalTestNode(f)
return ready && CountSampleDeviceCapacity(node) <= 0
}, 5*time.Minute, framework.Poll).Should(gomega.BeTrue())

ginkgo.By("devices now unavailable on the local node")
})

// Simulate a node reboot by removing pods using CRI before the kubelet is restarted. In addition,
// a scenario is intentionally created where, after the node reboot, application pods requesting devices
// appear before the device plugin pod exposing those devices as a resource has re-registered itself
// with the kubelet. The expected behavior is that the application pod fails at admission time.
ginkgo.It("Keeps device plugin assignments across node reboots (no pod restart, no device plugin re-registration)", func() {
podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever)
pod1 := f.PodClient().CreateSync(makeBusyboxPod(SampleDeviceResourceName, podRECMD))
deviceIDRE := "stub devices: (Dev-[0-9]+)"
devID1, err := parseLog(f, pod1.Name, pod1.Name, deviceIDRE)
framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name)

gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")))

pod1, err = f.PodClient().Get(context.TODO(), pod1.Name, metav1.GetOptions{})
framework.ExpectNoError(err)

ginkgo.By("stopping the kubelet")
startKubelet := stopKubelet()

ginkgo.By("stopping all the local containers - using CRI")
rs, _, err := getCRIClient()
framework.ExpectNoError(err)
sandboxes, err := rs.ListPodSandbox(&runtimeapi.PodSandboxFilter{})
framework.ExpectNoError(err)
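// With the kubelet stopped, removing every pod sandbox through CRI approximates a node
// reboot: the running containers disappear while the kubelet's on-disk state is left intact.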
for _, sandbox := range sandboxes {
gomega.Expect(sandbox.Metadata).ToNot(gomega.BeNil())
ginkgo.By(fmt.Sprintf("deleting pod using CRI: %s/%s -> %s", sandbox.Metadata.Namespace, sandbox.Metadata.Name, sandbox.Id))

err := rs.RemovePodSandbox(sandbox.Id)
framework.ExpectNoError(err)
}

ginkgo.By("restarting the kubelet")
startKubelet()

ginkgo.By("Wait for node to be ready again")
framework.WaitForAllNodesSchedulable(f.ClientSet, 5*time.Minute)

ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet")
ensurePodFailedWithAdmissionError(f, pod1.Name)

// crosscheck that the device assignment is preserved and stable from the perspective of the kubelet.
// note we don't check the container logs again: that check is done at startup, and the container is
// never restarted (it runs "forever" on this test's timescale), hence repeating the check here
// would be pointless.
ginkgo.By("Verifying the device assignment after kubelet restart using podresources API")
gomega.Eventually(func() error {
v1PodResources, err = getV1NodeDevices()
return err
}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")

err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
framework.ExpectNoError(err, "inconsistent device assignment after node reboot")

})
})
}

// makeBusyboxPod returns a simple Pod spec with a busybox container
// that requests SampleDeviceResourceName and runs the specified command.
func makeBusyboxPod(SampleDeviceResourceName, cmd string) *v1.Pod {