Skip to content

Commit

Permalink
chore: major refactor of E2E testing components (#1725)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 committed May 9, 2024
1 parent cd0a2c6 commit bfd73e8
Show file tree
Hide file tree
Showing 16 changed files with 471 additions and 407 deletions.
25 changes: 25 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ test-diamond-e2e:
test-sideinputs-e2e:
test-%:
$(MAKE) cleanup-e2e
$(MAKE) deploy-nats
$(MAKE) deploy-kafka
$(MAKE) deploy-redis
$(MAKE) image e2eapi-image
$(MAKE) restart-control-plane-components
cat test/manifests/e2e-api-pod.yaml | sed 's@quay.io/numaproj/@$(IMAGE_NAMESPACE)/@' | sed 's/:latest/:$(VERSION)/' | kubectl -n numaflow-system apply -f -
Expand All @@ -136,6 +139,25 @@ restart-control-plane-components:
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=numaflow-ux,app.kubernetes.io/part-of=numaflow --ignore-not-found=true
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=numaflow-webhook,app.kubernetes.io/part-of=numaflow --ignore-not-found=true

.PHONY: deploy-nats
deploy-nats:
kubectl -n numaflow-system delete statefulset nats --ignore-not-found=true
kubectl -n numaflow-system apply -k config/apps/nats
kubectl wait --for=condition=ready pod -l app=nats -n numaflow-system --timeout=120s

.PHONY: deploy-kafka
deploy-kafka:
kubectl -n numaflow-system delete statefulset zookeeper kafka-broker --ignore-not-found=true
kubectl -n numaflow-system apply -k config/apps/kafka
kubectl wait --for=condition=ready pod -l app=kafka-broker -n numaflow-system --timeout=120s
kubectl wait --for=condition=ready pod -l app=zookeeper -n numaflow-system --timeout=120s

.PHONY: deploy-redis
deploy-redis:
kubectl -n numaflow-system delete statefulset redis --ignore-not-found=true
kubectl apply -k config/apps/redis -n numaflow-system
kubectl wait --for=condition=ready pod -l app=redis -n numaflow-system --timeout=120s

.PHONY: cleanup-e2e
cleanup-e2e:
kubectl -n numaflow-system delete svc -lnumaflow-e2e=true --ignore-not-found=true
Expand All @@ -148,6 +170,9 @@ cleanup-e2e:
# To run just one of the e2e tests by name (i.e. 'make TestCreateSimplePipeline'):
Test%:
$(MAKE) cleanup-e2e
$(MAKE) deploy-nats
$(MAKE) deploy-kafka
$(MAKE) deploy-redis
$(MAKE) image e2eapi-image
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=controller-manager,app.kubernetes.io/part-of=numaflow
kubectl -n numaflow-system delete po e2e-api-pod --ignore-not-found=true
Expand Down
36 changes: 19 additions & 17 deletions config/apps/kafka/kafka-minimal.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# http://www.smartjava.org/content/minimal-kafka-instance-for-k8s/
---
apiVersion: v1
kind: Service
Expand All @@ -8,10 +7,6 @@ spec:
ports:
- name: client
port: 2181
- name: follower
port: 2888
- name: leader
port: 3888
selector:
app: zookeeper

Expand All @@ -31,7 +26,7 @@ spec:
spec:
containers:
- name: main
image: wurstmeister/zookeeper
image: bitnami/zookeeper
imagePullPolicy: IfNotPresent
ports:
- containerPort: 2181
Expand All @@ -40,6 +35,8 @@ spec:
value: "1"
- name: ZOOKEEPER_SERVER_1
value: zoo
- name: ALLOW_ANONYMOUS_LOGIN
value: "yes"
readinessProbe:
tcpSocket:
port: 2181
Expand Down Expand Up @@ -67,26 +64,31 @@ spec:
spec:
containers:
- name: main
image: wurstmeister/kafka:2.13-2.8.1
image: bitnami/kafka:3.7
imagePullPolicy: IfNotPresent
ports:
- containerPort: 9092
env:
- name: KAFKA_ADVERTISED_PORT
value: "9092"
- name: KAFKA_ADVERTISED_HOST_NAME
value: kafka-broker
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper:2181
- name: KAFKA_BROKER_ID
- name: KAFKA_CFG_BROKER_ID
value: "0"
- name: KAFKA_CREATE_TOPICS
value: "kafka-topic:2:1,input-topic:1:1,middle-topic:1:1,output-topic:1:1"
- name: KAFKA_CFG_ZOOKEEPER_CONNECT
value: zookeeper:2181
- name: KAFKA_CFG_LISTENERS
value: PLAINTEXT://:9092
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: PLAINTEXT://kafka-broker:9092
- name: ALLOW_PLAINTEXT_LISTENER
value: "yes"
- name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
value: "true"
- name: KAFKA_CFG_DELETE_TOPIC_ENABLE
value: "true"
- name: KAFKA_CFG_NUM_PARTITIONS
value: "2"
readinessProbe:
tcpSocket:
port: 9092
selector:
matchLabels:
app: kafka-broker

serviceName: kafka-broker
3 changes: 3 additions & 0 deletions config/apps/kafka/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

resources:
- kafka-minimal.yaml

Expand Down
4 changes: 4 additions & 0 deletions config/apps/nats/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

resources:
- nats.yaml
- nats-auth-fake-token.yaml

commonLabels:
"numaflow-e2e": "true"
Expand Down
File renamed without changes.
3 changes: 3 additions & 0 deletions config/apps/redis/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

# redis-minimal.yaml is used in E2E testing to create a redis instance before we start a test pipeline which writes to redis.
resources:
- redis-minimal.yaml
Expand Down
86 changes: 46 additions & 40 deletions test/e2e-api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,51 +28,57 @@ import (
"github.com/numaproj/numaflow/test/fixtures"
)

var httpClient *http.Client
type HttpController struct {
client *http.Client
}

func init() {
httpClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
func NewHttpController() *HttpController {
return &HttpController{
client: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
},
}
}

// send-message API is used to post data to a http source vertex pod.
// The API takes in two parameters(host and vertexName) and constructs the target url as
// https://{host}:8443/vertices/{vertexName}.
http.HandleFunc("/http/send-message", func(w http.ResponseWriter, r *http.Request) {
host := r.URL.Query().Get("host")
vertexName := r.URL.Query().Get("vertexName")
reqBytes, err := io.ReadAll(r.Body)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
func (h *HttpController) SendMessage(w http.ResponseWriter, r *http.Request) {
host := r.URL.Query().Get("host")
vertexName := r.URL.Query().Get("vertexName")
reqBytes, err := io.ReadAll(r.Body)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

var req fixtures.HttpPostRequest
err = json.Unmarshal(reqBytes, &req)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var req fixtures.HttpPostRequest
err = json.Unmarshal(reqBytes, &req)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

postReq, err := http.NewRequest("POST", fmt.Sprintf("https://%s:8443/vertices/%s", host, vertexName), bytes.NewBuffer(req.Body))
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
postReq, err := http.NewRequest("POST", fmt.Sprintf("https://%s:8443/vertices/%s", host, vertexName), bytes.NewBuffer(req.Body))
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

for k, v := range req.Header {
postReq.Header.Add(k, v)
}
_, err = h.client.Do(postReq)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

for k, v := range req.Header {
postReq.Header.Add(k, v)
}
_, err = httpClient.Do(postReq)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
})
// Close closes the http client
func (h *HttpController) Close() {
h.client.CloseIdleConnections()
}
Loading

0 comments on commit bfd73e8

Please sign in to comment.