From 6c078b42046b4733f702b3fbb585578d6304dafb Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Tue, 20 Dec 2022 10:23:15 -0500 Subject: [PATCH] refactor: a prototype for enhancing E2E test framework (#424) This code change demonstrate a prototype to enhance current E2E testing framework. The prototype includes the following two changes and updates an existing test case - TestFiltering. 1. Send data to input http vertex through E2E api instead of http vertex port forwarding. 2. Replace log verification with checking output data in redis sink. Next step is to iteratively generalize the testing APIs to meet requirements from all existing and future test cases. e.g. supporting specifying message headers, support per-sink data validations etc. Signed-off-by: Keran Yang --- config/apps/redis/kustomization.yaml | 7 ++ config/apps/redis/redis-minimal.yaml | 80 ++++++++++++++++++ docs/user-guide/sources/http.md | 4 +- pkg/sinks/sink.go | 3 +- test/e2e-api/http.go | 55 +++++++++++++ test/e2e-api/redis.go | 63 +++++++++++++++ test/e2e/functional_test.go | 34 +++----- test/e2e/testdata/filtering.yaml | 4 +- test/fixtures/expect.go | 20 +++++ test/fixtures/http_send.go | 22 +++++ test/fixtures/redis.go | 33 ++++++++ test/fixtures/redis_check.go | 117 +++++++++++++++++++++++++++ test/fixtures/when.go | 28 +++++++ 13 files changed, 443 insertions(+), 27 deletions(-) create mode 100644 config/apps/redis/kustomization.yaml create mode 100644 config/apps/redis/redis-minimal.yaml create mode 100644 test/e2e-api/http.go create mode 100644 test/e2e-api/redis.go create mode 100644 test/fixtures/http_send.go create mode 100644 test/fixtures/redis.go create mode 100644 test/fixtures/redis_check.go diff --git a/config/apps/redis/kustomization.yaml b/config/apps/redis/kustomization.yaml new file mode 100644 index 0000000000..861423b93f --- /dev/null +++ b/config/apps/redis/kustomization.yaml @@ -0,0 +1,7 @@ +# redis-minimal.yaml is used in E2E testing to create a redis instance before we start a test pipeline which writes to redis. +resources: + - redis-minimal.yaml + +commonLabels: + "numaflow-e2e": "true" + diff --git a/config/apps/redis/redis-minimal.yaml b/config/apps/redis/redis-minimal.yaml new file mode 100644 index 0000000000..944942a3fd --- /dev/null +++ b/config/apps/redis/redis-minimal.yaml @@ -0,0 +1,80 @@ +--- +# +# Redis service +# +apiVersion: v1 +kind: Service +metadata: + name: redis + labels: + app: redis +spec: + ports: + - port: 6379 + targetPort: 6379 + name: client + clusterIP: None + selector: + app: redis +--- +# +# Redis configuration file +# +apiVersion: v1 +kind: ConfigMap +metadata: + name: redis-config + labels: + app: redis +data: + redis-config: | + maxmemory 10mb + maxmemory-policy allkeys-lru +--- +# +# Redis stateful set +# +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: redis +spec: + serviceName: redis + replicas: 1 + minReadySeconds: 10 # by default is 0 + selector: + matchLabels: + app: redis # has to match .spec.template.metadata.labels + template: + metadata: + labels: + app: redis + name: redis + spec: + terminationGracePeriodSeconds: 10 + containers: + - name: redis + image: redis:5.0.4 + ports: + - containerPort: 6379 + name: client + command: + - redis-server + - "/redis-master/redis.conf" + env: + - name: MASTER + value: "true" + volumeMounts: + - mountPath: /redis-master-data + name: data + - mountPath: /redis-master + name: config + volumes: + - name: data + emptyDir: {} + - name: config + configMap: + name: redis-config + items: + - key: redis-config + path: redis.conf diff --git a/docs/user-guide/sources/http.md b/docs/user-guide/sources/http.md index 249b74eab4..07d7b9a300 100644 --- a/docs/user-guide/sources/http.md +++ b/docs/user-guide/sources/http.md @@ -2,7 +2,7 @@ HTTP Source starts an HTTP service with TLS enabled to accept POST request in the Vertex Pod. It listens to port 8443, with request URI `/vertices/{vertexName}`. -An Pipeline with HTTP Source: +A Pipeline with HTTP Source: ```yaml apiVersion: numaflow.numaproj.io/v1alpha1 @@ -127,4 +127,4 @@ curl -kq -X POST -H "Authorization: $TOKEN" -d "hello world" https://http-pipeli ## Health Check -The HTTP Source also has an endpoint `/health` created automatically, which is useful for for LoadBalancer or Ingress configuration, where a health check endpoint is often required by the cloud provider. +The HTTP Source also has an endpoint `/health` created automatically, which is useful for LoadBalancer or Ingress configuration, where a health check endpoint is often required by the cloud provider. diff --git a/pkg/sinks/sink.go b/pkg/sinks/sink.go index f0d1983236..ba5b88ab76 100644 --- a/pkg/sinks/sink.go +++ b/pkg/sinks/sink.go @@ -23,8 +23,6 @@ import ( "go.uber.org/zap" - "github.com/numaproj/numaflow/pkg/sinks/blackhole" - dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" jetstreamisb "github.com/numaproj/numaflow/pkg/isb/stores/jetstream" @@ -34,6 +32,7 @@ import ( jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats" redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis" "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/numaproj/numaflow/pkg/sinks/blackhole" kafkasink "github.com/numaproj/numaflow/pkg/sinks/kafka" logsink "github.com/numaproj/numaflow/pkg/sinks/logger" "github.com/numaproj/numaflow/pkg/sinks/udsink" diff --git a/test/e2e-api/http.go b/test/e2e-api/http.go new file mode 100644 index 0000000000..04dca30697 --- /dev/null +++ b/test/e2e-api/http.go @@ -0,0 +1,55 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bytes" + "crypto/tls" + "fmt" + "io" + "net/http" +) + +var httpClient *http.Client + +func init() { + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } + + // send-message API is used to post data to a http source vertex pod. + // The API takes in two parameters(podIp and vertexName) and constructs the target url as + // https://{podIp}:8443/vertices/{vertexName}. + http.HandleFunc("/http/send-message", func(w http.ResponseWriter, r *http.Request) { + podIp := r.URL.Query().Get("podIp") + vertexName := r.URL.Query().Get("vertexName") + buf, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(err.Error())) + } + + _, err = httpClient.Post(fmt.Sprintf("https://%s:8443/vertices/%s", podIp, vertexName), "application/json", bytes.NewBuffer(buf)) + + if err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(err.Error())) + } + }) +} diff --git a/test/e2e-api/redis.go b/test/e2e-api/redis.go new file mode 100644 index 0000000000..1d017dd354 --- /dev/null +++ b/test/e2e-api/redis.go @@ -0,0 +1,63 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "log" + "net/http" + "net/url" +) + +var redisClient *redis.Client + +func init() { + + // When we use this API to validate e2e test result, we always assume a redis UDSink is used + // to persist data to a redis instance listening on port 6379. + redisClient = redis.NewClient(&redis.Options{ + Addr: "redis:6379", + }) + + // get-msg-count-contains takes a targetRegex and returns number of keys in redis + // which contain a substring matching the targetRegex. + http.HandleFunc("/redis/get-msg-count-contains", func(w http.ResponseWriter, r *http.Request) { + targetRegex, err := url.QueryUnescape(r.URL.Query().Get("targetRegex")) + + if err != nil { + log.Println(err) + w.WriteHeader(500) + _, _ = w.Write([]byte(err.Error())) + return + } + + // Redis Keys API uses scan to retrieve data, which is not best practice in terms of performance. + // TODO - Look into replacing it with a more efficient API or data structure. + keyList, err := redisClient.Keys(context.Background(), fmt.Sprintf("*%s*", targetRegex)).Result() + if err != nil { + log.Println(err) + w.WriteHeader(500) + _, _ = w.Write([]byte(err.Error())) + return + } + + w.WriteHeader(200) + _, _ = w.Write([]byte(fmt.Sprint(len(keyList)))) + }) +} diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index e17d509dca..671547c288 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -32,6 +32,8 @@ import ( . "github.com/numaproj/numaflow/test/fixtures" ) +//go:generate kubectl -n numaflow-system delete statefulset redis --ignore-not-found=true +//go:generate kubectl apply -k ../../config/apps/redis -n numaflow-system type FunctionalSuite struct { E2ESuite } @@ -41,7 +43,6 @@ func (s *FunctionalSuite) TestCreateSimplePipeline() { When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() - pipelineName := "simple-pipeline" w.Expect(). @@ -108,34 +109,22 @@ func (s *FunctionalSuite) TestFiltering() { When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() + pipelineName := "filtering" w.Expect(). VertexPodsRunning(). VertexPodLogContains("in", LogSourceVertexStarted). VertexPodLogContains("p1", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). - VertexPodLogContains("out", LogSinkVertexStarted) - - defer w.VertexPodPortForward("in", 8443, dfv1.VertexHTTPSPort). - TerminateAllPodPortForwards() - - HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithBytes([]byte(`{"id": 180, "msg": "hello", "expect0": "fail", "desc": "A bad example"}`)). - Expect(). - Status(204) - - HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithBytes([]byte(`{"id": 80, "msg": "hello1", "expect1": "fail", "desc": "A bad example"}`)). - Expect(). - Status(204) - - HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithBytes([]byte(`{"id": 80, "msg": "hello", "expect2": "fail", "desc": "A bad example"}`)). - Expect(). - Status(204) + VertexPodLogContains("out", LogSinkVertexStarted, PodLogCheckOptionWithContainer("numa")) - HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithBytes([]byte(`{"id": 80, "msg": "hello", "expect3": "succeed", "desc": "A good example"}`)). - Expect(). - Status(204) + w.SendMessageTo(pipelineName, "in", []byte(`{"id": 180, "msg": "hello", "expect0": "fail", "desc": "A bad example"}`)). + SendMessageTo(pipelineName, "in", []byte(`{"id": 80, "msg": "hello1", "expect1": "fail", "desc": "A bad example"}`)). + SendMessageTo(pipelineName, "in", []byte(`{"id": 80, "msg": "hello", "expect2": "fail", "desc": "A bad example"}`)). + SendMessageTo(pipelineName, "in", []byte(`{"id": 80, "msg": "hello", "expect3": "succeed", "desc": "A good example"}`)). + SendMessageTo(pipelineName, "in", []byte(`{"id": 80, "msg": "hello", "expect4": "succeed", "desc": "A good example"}`)) - w.Expect().VertexPodLogContains("out", "expect3") - w.Expect().VertexPodLogNotContains("out", "expect[0-2]", PodLogCheckOptionWithTimeout(2*time.Second)) + w.Expect().RedisContains("out", "expect[3-4]", RedisCheckOptionWithCount(2)) + w.Expect().RedisNotContains("out", "expect[0-2]") } func (s *FunctionalSuite) TestConditionalForwarding() { @@ -152,6 +141,7 @@ func (s *FunctionalSuite) TestConditionalForwarding() { VertexPodLogContains("even-sink", LogSinkVertexStarted). VertexPodLogContains("odd-sink", LogSinkVertexStarted). VertexPodLogContains("number-sink", LogSinkVertexStarted) + defer w.VertexPodPortForward("in", 8443, dfv1.VertexHTTPSPort). TerminateAllPodPortForwards() diff --git a/test/e2e/testdata/filtering.yaml b/test/e2e/testdata/filtering.yaml index e54887d9ce..353ea4215a 100644 --- a/test/e2e/testdata/filtering.yaml +++ b/test/e2e/testdata/filtering.yaml @@ -15,7 +15,9 @@ spec: expression: int(json(payload).id) < 100 && json(payload).msg == 'hello' && sprig.contains('good', string(json(payload).desc)) - name: out sink: - log: {} + udsink: + container: + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest edges: - from: in to: p1 diff --git a/test/fixtures/expect.go b/test/fixtures/expect.go index 5bfe70c2e4..7e5606a436 100644 --- a/test/fixtures/expect.go +++ b/test/fixtures/expect.go @@ -41,6 +41,26 @@ type Expect struct { kubeClient kubernetes.Interface } +func (t *Expect) RedisContains(sinkName string, targetRegex string, opts ...RedisCheckOption) *Expect { + t.t.Helper() + ctx := context.Background() + contains := RedisContains(ctx, sinkName, targetRegex, opts...) + if !contains { + t.t.Fatalf("Expected redis contains target regex %s populated by sink %s.", targetRegex, sinkName) + } + return t +} + +func (t *Expect) RedisNotContains(sinkName string, targetRegex string) *Expect { + t.t.Helper() + ctx := context.Background() + notContains := RedisNotContains(ctx, sinkName, targetRegex) + if !notContains { + t.t.Fatalf("Not expected redis contains target regex %s populated by sink %s.", targetRegex, sinkName) + } + return t +} + func (t *Expect) ISBSvcDeleted(timeout time.Duration) *Expect { t.t.Helper() ctx := context.Background() diff --git a/test/fixtures/http_send.go b/test/fixtures/http_send.go new file mode 100644 index 0000000000..5e71a1f96f --- /dev/null +++ b/test/fixtures/http_send.go @@ -0,0 +1,22 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fixtures + +// SendMessageTo sends msg to a pod in http source vertex. +func SendMessageTo(podIp string, vertexName string, msg []byte) { + InvokeE2EAPIPOST("/http/send-message?podIp=%s&vertexName=%s", string(msg[:]), podIp, vertexName) +} diff --git a/test/fixtures/redis.go b/test/fixtures/redis.go new file mode 100644 index 0000000000..1d9558ccad --- /dev/null +++ b/test/fixtures/redis.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fixtures + +import ( + "fmt" + "net/url" + "strconv" +) + +// GetMsgCountContains returns number of keys in redis which contain a substring matching the targetRegex. +func GetMsgCountContains(sinkName string, targetRegex string) int { + str := InvokeE2EAPI("/redis/get-msg-count-contains?sinkName=%s&targetRegex=%s", sinkName, url.QueryEscape(targetRegex)) + count, err := strconv.Atoi(str) + if err != nil { + panic(fmt.Sprintf("Can't parse string %s to an integer.", str)) + } + return count +} diff --git a/test/fixtures/redis_check.go b/test/fixtures/redis_check.go new file mode 100644 index 0000000000..fa4b8f27b7 --- /dev/null +++ b/test/fixtures/redis_check.go @@ -0,0 +1,117 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fixtures + +import ( + "context" + "time" +) + +// Retry checking redis every 5 seconds. +const retryInterval = time.Second * 5 + +// RedisNotContains verifies that there is no key in redis which contain a substring matching the targetRegex. +func RedisNotContains(ctx context.Context, sinkName string, regex string, opts ...RedisCheckOption) bool { + o := defaultRedisCheckOptions() + for _, opt := range opts { + if opt != nil { + opt(o) + } + } + ctx, cancel := context.WithTimeout(ctx, o.timeout) + defer cancel() + + return runChecks(ctx, func() bool { + return !redisContains(sinkName, regex, 1) + }) +} + +// RedisContains verifies that there are keys in redis which contain a substring matching the targetRegex. +func RedisContains(ctx context.Context, sinkName string, targetRegex string, opts ...RedisCheckOption) bool { + o := defaultRedisCheckOptions() + for _, opt := range opts { + if opt != nil { + opt(o) + } + } + ctx, cancel := context.WithTimeout(ctx, o.timeout) + defer cancel() + + return runChecks(ctx, func() bool { + return redisContains(sinkName, targetRegex, o.count) + }) +} + +func redisContains(sinkName string, targetRegex string, expectedCount int) bool { + // If number of matches is higher than expected, we treat it as passing the check. + return GetMsgCountContains(sinkName, targetRegex) >= expectedCount +} + +type redisCheckOptions struct { + count int + timeout time.Duration +} + +func defaultRedisCheckOptions() *redisCheckOptions { + return &redisCheckOptions{ + count: 1, + timeout: defaultTimeout, + } +} + +type RedisCheckOption func(*redisCheckOptions) + +// RedisCheckOptionWithCount updates the redisCheckOptions to specify count. +// The count is the expected number of matches for the check. +func RedisCheckOptionWithCount(c int) RedisCheckOption { + return func(o *redisCheckOptions) { + o.count = c + } +} + +// RedisCheckOptionWithTimeout updates the redisCheckOptions to specify timeout. +// The timeout specifies how long the redis check will wait for expected data to be ready in redis. +func RedisCheckOptionWithTimeout(t time.Duration) RedisCheckOption { + return func(o *redisCheckOptions) { + o.timeout = t + } +} + +type CheckFunc func() bool + +// runChecks executes a performChecks function with retry strategy (retryInterval with timeout). +// If performChecks doesn't pass within timeout, runChecks returns false indicating the checks have failed. +// This is to mitigate the problem that we don't know exactly when a numaflow pipeline finishes processing our test data. +// Please notice such approach is not strictly accurate as there can be case where runChecks passes before pipeline finishes processing data. +// Which could result in false positive test results. e.g. checking data doesn't exist can pass before data gets persisted to redis. +func runChecks(ctx context.Context, performChecks CheckFunc) bool { + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // Timeout reached, meaning checks did not pass within timeout, return false + return false + case <-ticker.C: + if performChecks() { + // All checks passed, return true + return true + } + } + } +} diff --git a/test/fixtures/when.go b/test/fixtures/when.go index 89abd971df..145e4b8943 100644 --- a/test/fixtures/when.go +++ b/test/fixtures/when.go @@ -40,6 +40,34 @@ type When struct { kubeClient kubernetes.Interface portForwarderStopChannels map[string]chan struct{} + // Key: vertex label selector + // Value: the ip of, one of the pods matching the label selector + vertexToPodIpMapping map[string]string +} + +// SendMessageTo sends msg to one of the pods in http source vertex. +func (w *When) SendMessageTo(pipelineName string, vertexName string, msg []byte) *When { + w.t.Helper() + if w.vertexToPodIpMapping == nil { + w.vertexToPodIpMapping = make(map[string]string) + } + labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyPipelineName, pipelineName, dfv1.KeyVertexName, vertexName) + if w.vertexToPodIpMapping[labelSelector] == "" { + ctx := context.Background() + podList, err := w.kubeClient.CoreV1().Pods(Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"}) + if err != nil { + w.t.Fatalf("Error getting vertex pod list: %v", err) + } + if len(podList.Items) == 0 { + w.t.Fatalf("No running pod found in pipeline %s, vertex: %s", pipelineName, vertexName) + } + w.vertexToPodIpMapping[labelSelector] = podList.Items[0].Status.PodIP + } + + // There could be a rare corner case when a previous added pod gets replaced by a new one, making the mapping entry no longer valid. + // Considering current e2e tests are all lightweight, we assume no such case for now. + SendMessageTo(w.vertexToPodIpMapping[labelSelector], vertexName, msg) + return w } func (w *When) CreateISBSvc() *When {