diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index e9c4e6a530b..29be495c063 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -2541,114 +2541,152 @@ var _ = ginkgo.Describe("e2e ingress gateway traffic validation", func() { }) }) -// This test validates OVS exports NetFlow data from br-int to an external collector -var _ = ginkgo.Describe("e2e br-int NetFlow export validation", func() { +// This test validates that OVS exports flow monitoring data from br-int to an external collector +var _ = ginkgo.Describe("e2e br-int flow monitoring export validation", func() { + type flowMonitoringProtocol string + const ( - svcname string = "netflow-test" - ovnNs string = "ovn-kubernetes" - netFlowCollectorContainer string = "netflow-collector" - ciNetworkName string = "kind" + netflow_v5 flowMonitoringProtocol = "netflow" + ipfix flowMonitoringProtocol = "ipfix" + sflow flowMonitoringProtocol = "sflow" + + svcname string = "netflow-test" + ovnNs string = "ovn-kubernetes" + collectorContainer string = "netflow-collector" + ciNetworkName string = "kind" ) - f := framework.NewDefaultFramework(svcname) + keywordInLogs := map[flowMonitoringProtocol]string{ + netflow_v5: "NETFLOW_V5", ipfix: "IPFIX", sflow: "SFLOW_5"} + f := framework.NewDefaultFramework(svcname) ginkgo.AfterEach(func() { - // tear down the NetFlow container - if cid, _ := runCommand("docker", "ps", "-qaf", fmt.Sprintf("name=%s", netFlowCollectorContainer)); cid != "" { - if _, err := runCommand("docker", "rm", "-f", netFlowCollectorContainer); err != nil { - framework.Logf("failed to delete the netFlow collector test container %s %v", netFlowCollectorContainer, err) + // tear down the collector container + if cid, _ := runCommand("docker", "ps", "-qaf", fmt.Sprintf("name=%s", collectorContainer)); cid != "" { + if _, err := runCommand("docker", "rm", "-f", collectorContainer); err != nil { + framework.Logf("failed to delete the collector test container %s %v", + collectorContainer, err) } } }) - ginkgo.It("Should validate NetFlow data of br-int is sent to an external gateway and unset NetFlow Targets", func() { - var ( - ciNetworkFlag = "{{ .NetworkSettings.Networks.kind.IPAddress }}" - ) - ginkgo.By("Starting a netflow collector container") - // start the NetFlow collector container that will receive data - _, err := runCommand("docker", "run", "-itd", "--privileged", "--network", ciNetworkName, "--name", netFlowCollectorContainer, "cloudflare/goflow", "-kafka=false") - if err != nil { - framework.Failf("failed to start NetFlow collector test container %s: %v", netFlowCollectorContainer, err) - } - // retrieve the container ip of the NetFlow collector container - netFlowCollectorIp, err := runCommand("docker", "inspect", "-f", ciNetworkFlag, netFlowCollectorContainer) - if err != nil { - framework.Failf("failed to start NetFlow collector test container: %v", err) - } - // trim newline from the inspect output - netFlowCollectorIp = strings.TrimSuffix(netFlowCollectorIp, "\n") - if ip := net.ParseIP(netFlowCollectorIp); ip == nil { - framework.Failf("Unable to retrieve a valid address from container %s with inspect output of %s", netFlowCollectorContainer, netFlowCollectorIp) - } + table.DescribeTable("Should validate flow data of br-int is sent to an external gateway", + func(protocol flowMonitoringProtocol, collectorPort uint16) { + protocolStr := string(protocol) + ipField := "IPAddress" + isIpv6 := IsIPv6Cluster(f.ClientSet) + if isIpv6 { + ipField = "GlobalIPv6Address" + } + ciNetworkFlag := fmt.Sprintf("{{ .NetworkSettings.Networks.kind.%s }}", ipField) - ginkgo.By("Configuring ovnkube-node to use the new netflow collector target") - framework.Logf("Setting OVN_NETFLOW_TARGETS environment variable value to NetFlow collector IP %s", netFlowCollectorIp) - framework.RunKubectlOrDie(ovnNs, "set", "env", "daemonset/ovnkube-node", "-c", "ovnkube-node", "OVN_NETFLOW_TARGETS="+netFlowCollectorIp+":2056") + ginkgo.By("Starting a flow collector container") + // start the collector container that will receive data + _, err := runCommand("docker", "run", "-itd", "--privileged", "--network", ciNetworkName, + "--name", collectorContainer, "cloudflare/goflow", "-kafka=false") + if err != nil { + framework.Failf("failed to start flow collector container %s: %v", collectorContainer, err) + } + ovnEnvVar := fmt.Sprintf("OVN_%s_TARGETS", strings.ToUpper(protocolStr)) + // retrieve the ip of the collector container + collectorIP, err := runCommand("docker", "inspect", "-f", ciNetworkFlag, collectorContainer) + if err != nil { + framework.Failf("could not retrieve IP address of collector container: %v", err) + } + // trim newline from the inspect output + collectorIP = strings.TrimSpace(collectorIP) + if net.ParseIP(collectorIP) == nil { + framework.Failf("Unable to retrieve a valid address from container %s with inspect output of %s", + collectorContainer, collectorIP) + } + addressAndPort, err1 := formatAddressAndPort(collectorIP, collectorPort) + if err1 != nil { + framework.Failf("Unable to correctly format value for input env variable %s with "+ + "IP address %s and port %d", ovnEnvVar, collectorIP, collectorPort) + } + ginkgo.By(fmt.Sprintf("Configuring ovnkube-node to use the new %s collector target", protocolStr)) + framework.Logf("Setting %s environment variable to %s", + ovnEnvVar, addressAndPort) - // Make sure the updated daemonset has rolled out, verify it's completion 10 times - // TODO (Change this to use the exported upstream function) - err = waitForDaemonSetUpdate(f.ClientSet, ovnNs, "ovnkube-node", 0, dsRestartTimeout) - framework.ExpectNoError(err) + framework.RunKubectlOrDie(ovnNs, "set", "env", "daemonset/ovnkube-node", "-c", "ovnkube-node", + fmt.Sprintf("%s=%s", ovnEnvVar, addressAndPort)) - ginkgo.By("Checking that the collector container received netflow") - netFlowCollectorContainerLogsTest := func() wait.ConditionFunc { - return func() (bool, error) { - netFlowCollectorContainerLogs, err := runCommand("docker", "logs", netFlowCollectorContainer) - if err != nil { - framework.Logf("failed to inspect logs in test container: %v", err) + // Make sure the updated daemonset has rolled out + // TODO (Change this to use the exported upstream function) + err = waitForDaemonSetUpdate(f.ClientSet, ovnNs, "ovnkube-node", 0, dsRestartTimeout) + framework.ExpectNoError(err) + ginkgo.By(fmt.Sprintf("Checking that the collector container received %s data", protocolStr)) + keyword := keywordInLogs[protocol] + collectorContainerLogsTest := func() wait.ConditionFunc { + return func() (bool, error) { + collectorContainerLogs, err := runCommand("docker", "logs", collectorContainer) + if err != nil { + framework.Logf("failed to inspect logs in test container: %v", err) + return false, nil + } + collectorContainerLogs = strings.TrimSuffix(collectorContainerLogs, "\n") + logLines := strings.Split(collectorContainerLogs, "\n") + lastLine := logLines[len(logLines)-1] + // check that flow monitoring traffic has been logged + if strings.Contains(lastLine, keyword) { + framework.Logf("Successfully found string %s in last log line of"+ + " the collector: %s", keyword, lastLine) + return true, nil + } + framework.Logf("%s not found in last log line: %s", keyword, lastLine) return false, nil } - netFlowCollectorContainerLogs = strings.TrimSuffix(netFlowCollectorContainerLogs, "\n") - logLines := strings.Split(netFlowCollectorContainerLogs, "\n") - lastLine := logLines[len(logLines)-1] - // check that NetFlow traffic has been logged. - if strings.Contains(lastLine, "NETFLOW_V5") { - framework.Logf("the NetFlow collector received NetFlow data, last logs: %s", logLines[len(logLines)-1]) - return true, nil - } - return false, nil } - } - - err = wait.PollImmediate(retryInterval, retryTimeout, netFlowCollectorContainerLogsTest()) - framework.ExpectNoError(err, "failed to verify that NetFlow collector container received NetFlow data from br-int") + // retryTimeout of 40s is short sometimes, so let's increase it to 60s + err = wait.PollImmediate(retryInterval, retryTimeout, collectorContainerLogsTest()) + framework.ExpectNoError(err, fmt.Sprintf("failed to verify that collector container "+ + "received %s data from br-int: string %s not found in logs", + protocolStr, keyword)) - ginkgo.By("Unsetting the OVN_NETFLOW_TARGETS variable in the ovnkube-node daemonset") - framework.RunKubectlOrDie(ovnNs, "set", "env", "daemonset/ovnkube-node", "-c", "ovnkube-node", "OVN_NETFLOW_TARGETS-") + ginkgo.By(fmt.Sprintf("Unsetting %s variable in ovnkube-node daemonset", ovnEnvVar)) + framework.RunKubectlOrDie(ovnNs, "set", "env", "daemonset/ovnkube-node", "-c", "ovnkube-node", + fmt.Sprintf("%s-", ovnEnvVar)) - // Make sure the updated daemonset has rolled out, verify it's completion 10 times - // TODO (Change this to use the exported upstream function) - err = waitForDaemonSetUpdate(f.ClientSet, ovnNs, "ovnkube-node", 0, dsRestartTimeout) - framework.ExpectNoError(err) + // Make sure the updated daemonset has rolled out + // TODO (Change this to use the exported upstream function) + err = waitForDaemonSetUpdate(f.ClientSet, ovnNs, "ovnkube-node", 0, dsRestartTimeout) + framework.ExpectNoError(err) - ovnKubeNodePods, err := f.ClientSet.CoreV1().Pods(ovnNs).List(context.TODO(), metav1.ListOptions{ - LabelSelector: "name=ovnkube-node", - }) - if err != nil { - framework.Failf("could not get ovnkube-node pods: %v", err) - } + ovnKubeNodePods, err := f.ClientSet.CoreV1().Pods(ovnNs).List(context.TODO(), metav1.ListOptions{ + LabelSelector: "name=ovnkube-node", + }) + if err != nil { + framework.Failf("could not get ovnkube-node pods: %v", err) + } - for _, ovnKubeNodePod := range ovnKubeNodePods.Items { + for _, ovnKubeNodePod := range ovnKubeNodePods.Items { - execOptions := framework.ExecOptions{ - Command: []string{"ovs-vsctl", "find", "netflow"}, - Namespace: ovnNs, - PodName: ovnKubeNodePod.Name, - ContainerName: "ovnkube-node", - CaptureStdout: true, - CaptureStderr: true, - } + execOptions := framework.ExecOptions{ + Command: []string{"ovs-vsctl", "find", strings.ToLower(protocolStr)}, + Namespace: ovnNs, + PodName: ovnKubeNodePod.Name, + ContainerName: "ovnkube-node", + CaptureStdout: true, + CaptureStderr: true, + } - targets, stderr, _ := f.ExecWithOptions(execOptions) - framework.Logf("execOptions are %v", execOptions) - if err != nil { - framework.Failf("could not lookup ovs netflow targets: %v", stderr) + targets, stderr, _ := f.ExecWithOptions(execOptions) + framework.Logf("execOptions are %v", execOptions) + if err != nil { + framework.Failf("could not lookup ovs %s targets: %v", protocolStr, stderr) + } + framework.ExpectEmpty(targets) } - framework.ExpectEmpty(targets) - } + }, + // This is a long test (~5 minutes per run), so let's just validate netflow v5 + // in an IPv4 cluster and sflow in IPv6 cluster + table.Entry("with netflow v5", netflow_v5, uint16(2056)), + // goflow doesn't currently support OVS ipfix: + // https://github.com/cloudflare/goflow/issues/99 + // table.Entry("ipfix", ipfix, uint16(2055)), + table.Entry("with sflow", sflow, uint16(6343)), + ) - }) }) func getNodePodCIDR(nodeName string) (string, error) { diff --git a/test/e2e/util.go b/test/e2e/util.go index b3f00f3b50d..7d29d2fd081 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -463,6 +463,21 @@ func addressIsIP(address v1.NodeAddress) bool { return true } +func formatAddressAndPort(addr string, port uint16) (string, error) { + var ( + err error + addrAndPort string + ) + if utilnet.IsIPv4String(addr) { + addrAndPort = fmt.Sprintf("%s:%d", addr, port) + } else if utilnet.IsIPv6String(addr) { + addrAndPort = fmt.Sprintf("[%s]:%d", addr, port) + } else { + err = error(fmt.Errorf("IP address %s is not valid", addr)) + } + return addrAndPort, err +} + // Returns pod's ipv4 and ipv6 addresses IN ORDER func getPodAddresses(pod *v1.Pod) (string, string) { var ipv4Res, ipv6Res string diff --git a/test/scripts/e2e-cp.sh b/test/scripts/e2e-cp.sh index f5bcbf3628c..283f577a013 100755 --- a/test/scripts/e2e-cp.sh +++ b/test/scripts/e2e-cp.sh @@ -7,15 +7,23 @@ export KUBERNETES_CONFORMANCE_TEST=y export KUBECONFIG=${HOME}/admin.conf # Skip tests which are not IPv6 ready yet (see description of https://github.com/ovn-org/ovn-kubernetes/pull/2276) +# (Note that netflow v5 is IPv4 only) IPV6_SKIPPED_TESTS="Should be allowed by externalip services|\ should provide connection to external host by DNS name from a pod|\ -Should validate NetFlow data of br-int is sent to an external gateway|\ +Should validate flow data of br-int is sent to an external gateway with netflow v5|\ test tainting a node according to its defaults interface MTU size" SKIPPED_TESTS="" -if [ "$KIND_IPV4_SUPPORT" == true ] && [ "$KIND_IPV6_SUPPORT" == true ]; then - # No support for these features in dual-stack yet - SKIPPED_TESTS="hybrid.overlay|external.gateway" + +if [ "$KIND_IPV4_SUPPORT" == true ]; then + if [ "$KIND_IPV6_SUPPORT" == true ]; then + # No support for these features in dual-stack yet + SKIPPED_TESTS="hybrid.overlay|external.gateway" + else + # Skip sflow in IPv4 since it's a long test (~5 minutes) + # We're validating netflow v5 with an ipv4 cluster, sflow with an ipv6 cluster + SKIPPED_TESTS="Should validate flow data of br-int is sent to an external gateway with sflow" + fi fi if [ "$OVN_HA" == false ]; then