Skip to content

Commit

Permalink
refactor: a prototype for enhancing E2E test framework (#424)
Browse files Browse the repository at this point in the history
This code change demonstrate a prototype to enhance current E2E testing framework. The prototype includes the following two changes and updates an existing test case - TestFiltering.

1. Send data to input http vertex through E2E api instead of http vertex port forwarding.
2. Replace log verification with checking output data in redis sink.

Next step is to iteratively generalize the testing APIs to meet requirements from all existing and future test cases. e.g. supporting specifying message headers, support per-sink data validations etc.

Signed-off-by: Keran Yang <yangkr920208@gmail.com>
  • Loading branch information
KeranYang authored and whynowy committed Jan 13, 2023
1 parent e7021c9 commit 6c078b4
Show file tree
Hide file tree
Showing 13 changed files with 443 additions and 27 deletions.
7 changes: 7 additions & 0 deletions config/apps/redis/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# redis-minimal.yaml is used in E2E testing to create a redis instance before we start a test pipeline which writes to redis.
resources:
- redis-minimal.yaml

commonLabels:
"numaflow-e2e": "true"

80 changes: 80 additions & 0 deletions config/apps/redis/redis-minimal.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
---
#
# Redis service
#
apiVersion: v1
kind: Service
metadata:
name: redis
labels:
app: redis
spec:
ports:
- port: 6379
targetPort: 6379
name: client
clusterIP: None
selector:
app: redis
---
#
# Redis configuration file
#
apiVersion: v1
kind: ConfigMap
metadata:
name: redis-config
labels:
app: redis
data:
redis-config: |
maxmemory 10mb
maxmemory-policy allkeys-lru
---
#
# Redis stateful set
#
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: redis
spec:
serviceName: redis
replicas: 1
minReadySeconds: 10 # by default is 0
selector:
matchLabels:
app: redis # has to match .spec.template.metadata.labels
template:
metadata:
labels:
app: redis
name: redis
spec:
terminationGracePeriodSeconds: 10
containers:
- name: redis
image: redis:5.0.4
ports:
- containerPort: 6379
name: client
command:
- redis-server
- "/redis-master/redis.conf"
env:
- name: MASTER
value: "true"
volumeMounts:
- mountPath: /redis-master-data
name: data
- mountPath: /redis-master
name: config
volumes:
- name: data
emptyDir: {}
- name: config
configMap:
name: redis-config
items:
- key: redis-config
path: redis.conf
4 changes: 2 additions & 2 deletions docs/user-guide/sources/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

HTTP Source starts an HTTP service with TLS enabled to accept POST request in the Vertex Pod. It listens to port 8443, with request URI `/vertices/{vertexName}`.

An Pipeline with HTTP Source:
A Pipeline with HTTP Source:

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
Expand Down Expand Up @@ -127,4 +127,4 @@ curl -kq -X POST -H "Authorization: $TOKEN" -d "hello world" https://http-pipeli

## Health Check

The HTTP Source also has an endpoint `/health` created automatically, which is useful for for LoadBalancer or Ingress configuration, where a health check endpoint is often required by the cloud provider.
The HTTP Source also has an endpoint `/health` created automatically, which is useful for LoadBalancer or Ingress configuration, where a health check endpoint is often required by the cloud provider.
3 changes: 1 addition & 2 deletions pkg/sinks/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (

"go.uber.org/zap"

"github.com/numaproj/numaflow/pkg/sinks/blackhole"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
jetstreamisb "github.com/numaproj/numaflow/pkg/isb/stores/jetstream"
Expand All @@ -34,6 +32,7 @@ import (
jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/sinks/blackhole"
kafkasink "github.com/numaproj/numaflow/pkg/sinks/kafka"
logsink "github.com/numaproj/numaflow/pkg/sinks/logger"
"github.com/numaproj/numaflow/pkg/sinks/udsink"
Expand Down
55 changes: 55 additions & 0 deletions test/e2e-api/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"bytes"
"crypto/tls"
"fmt"
"io"
"net/http"
)

var httpClient *http.Client

func init() {
httpClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}

// send-message API is used to post data to a http source vertex pod.
// The API takes in two parameters(podIp and vertexName) and constructs the target url as
// https://{podIp}:8443/vertices/{vertexName}.
http.HandleFunc("/http/send-message", func(w http.ResponseWriter, r *http.Request) {
podIp := r.URL.Query().Get("podIp")
vertexName := r.URL.Query().Get("vertexName")
buf, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
}

_, err = httpClient.Post(fmt.Sprintf("https://%s:8443/vertices/%s", podIp, vertexName), "application/json", bytes.NewBuffer(buf))

if err != nil {
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
}
})
}
63 changes: 63 additions & 0 deletions test/e2e-api/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"log"
"net/http"
"net/url"
)

var redisClient *redis.Client

func init() {

// When we use this API to validate e2e test result, we always assume a redis UDSink is used
// to persist data to a redis instance listening on port 6379.
redisClient = redis.NewClient(&redis.Options{
Addr: "redis:6379",
})

// get-msg-count-contains takes a targetRegex and returns number of keys in redis
// which contain a substring matching the targetRegex.
http.HandleFunc("/redis/get-msg-count-contains", func(w http.ResponseWriter, r *http.Request) {
targetRegex, err := url.QueryUnescape(r.URL.Query().Get("targetRegex"))

if err != nil {
log.Println(err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
}

// Redis Keys API uses scan to retrieve data, which is not best practice in terms of performance.
// TODO - Look into replacing it with a more efficient API or data structure.
keyList, err := redisClient.Keys(context.Background(), fmt.Sprintf("*%s*", targetRegex)).Result()
if err != nil {
log.Println(err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
}

w.WriteHeader(200)
_, _ = w.Write([]byte(fmt.Sprint(len(keyList))))
})
}
34 changes: 12 additions & 22 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
. "github.com/numaproj/numaflow/test/fixtures"
)

//go:generate kubectl -n numaflow-system delete statefulset redis --ignore-not-found=true
//go:generate kubectl apply -k ../../config/apps/redis -n numaflow-system
type FunctionalSuite struct {
E2ESuite
}
Expand All @@ -41,7 +43,6 @@ func (s *FunctionalSuite) TestCreateSimplePipeline() {
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()

pipelineName := "simple-pipeline"

w.Expect().
Expand Down Expand Up @@ -108,34 +109,22 @@ func (s *FunctionalSuite) TestFiltering() {
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "filtering"

w.Expect().
VertexPodsRunning().
VertexPodLogContains("in", LogSourceVertexStarted).
VertexPodLogContains("p1", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("out", LogSinkVertexStarted)

defer w.VertexPodPortForward("in", 8443, dfv1.VertexHTTPSPort).
TerminateAllPodPortForwards()

HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithBytes([]byte(`{"id": 180, "msg": "hello", "expect0": "fail", "desc": "A bad example"}`)).
Expect().
Status(204)

HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithBytes([]byte(`{"id": 80, "msg": "hello1", "expect1": "fail", "desc": "A bad example"}`)).
Expect().
Status(204)

HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithBytes([]byte(`{"id": 80, "msg": "hello", "expect2": "fail", "desc": "A bad example"}`)).
Expect().
Status(204)
VertexPodLogContains("out", LogSinkVertexStarted, PodLogCheckOptionWithContainer("numa"))

HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithBytes([]byte(`{"id": 80, "msg": "hello", "expect3": "succeed", "desc": "A good example"}`)).
Expect().
Status(204)
w.SendMessageTo(pipelineName, "in", []byte(`{"id": 180, "msg": "hello", "expect0": "fail", "desc": "A bad example"}`)).
SendMessageTo(pipelineName, "in", []byte(`{"id": 80, "msg": "hello1", "expect1": "fail", "desc": "A bad example"}`)).
SendMessageTo(pipelineName, "in", []byte(`{"id": 80, "msg": "hello", "expect2": "fail", "desc": "A bad example"}`)).
SendMessageTo(pipelineName, "in", []byte(`{"id": 80, "msg": "hello", "expect3": "succeed", "desc": "A good example"}`)).
SendMessageTo(pipelineName, "in", []byte(`{"id": 80, "msg": "hello", "expect4": "succeed", "desc": "A good example"}`))

w.Expect().VertexPodLogContains("out", "expect3")
w.Expect().VertexPodLogNotContains("out", "expect[0-2]", PodLogCheckOptionWithTimeout(2*time.Second))
w.Expect().RedisContains("out", "expect[3-4]", RedisCheckOptionWithCount(2))
w.Expect().RedisNotContains("out", "expect[0-2]")
}

func (s *FunctionalSuite) TestConditionalForwarding() {
Expand All @@ -152,6 +141,7 @@ func (s *FunctionalSuite) TestConditionalForwarding() {
VertexPodLogContains("even-sink", LogSinkVertexStarted).
VertexPodLogContains("odd-sink", LogSinkVertexStarted).
VertexPodLogContains("number-sink", LogSinkVertexStarted)

defer w.VertexPodPortForward("in", 8443, dfv1.VertexHTTPSPort).
TerminateAllPodPortForwards()

Expand Down
4 changes: 3 additions & 1 deletion test/e2e/testdata/filtering.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ spec:
expression: int(json(payload).id) < 100 && json(payload).msg == 'hello' && sprig.contains('good', string(json(payload).desc))
- name: out
sink:
log: {}
udsink:
container:
image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest
edges:
- from: in
to: p1
Expand Down
20 changes: 20 additions & 0 deletions test/fixtures/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@ type Expect struct {
kubeClient kubernetes.Interface
}

func (t *Expect) RedisContains(sinkName string, targetRegex string, opts ...RedisCheckOption) *Expect {
t.t.Helper()
ctx := context.Background()
contains := RedisContains(ctx, sinkName, targetRegex, opts...)
if !contains {
t.t.Fatalf("Expected redis contains target regex %s populated by sink %s.", targetRegex, sinkName)
}
return t
}

func (t *Expect) RedisNotContains(sinkName string, targetRegex string) *Expect {
t.t.Helper()
ctx := context.Background()
notContains := RedisNotContains(ctx, sinkName, targetRegex)
if !notContains {
t.t.Fatalf("Not expected redis contains target regex %s populated by sink %s.", targetRegex, sinkName)
}
return t
}

func (t *Expect) ISBSvcDeleted(timeout time.Duration) *Expect {
t.t.Helper()
ctx := context.Background()
Expand Down
22 changes: 22 additions & 0 deletions test/fixtures/http_send.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fixtures

// SendMessageTo sends msg to a pod in http source vertex.
func SendMessageTo(podIp string, vertexName string, msg []byte) {
InvokeE2EAPIPOST("/http/send-message?podIp=%s&vertexName=%s", string(msg[:]), podIp, vertexName)
}
Loading

0 comments on commit 6c078b4

Please sign in to comment.