diff --git a/pkg/test/e2e/kafka/kafka.strimzi.yaml b/pkg/test/e2e/kafka/kafka.strimzi.yaml index d0a779205..eb319dd9a 100644 --- a/pkg/test/e2e/kafka/kafka.strimzi.yaml +++ b/pkg/test/e2e/kafka/kafka.strimzi.yaml @@ -5,7 +5,7 @@ metadata: spec: kafka: version: 3.1.0 - replicas: 3 + replicas: 1 listeners: - name: plain port: 9092 @@ -20,16 +20,16 @@ spec: type: nodeport tls: false config: - offsets.topic.replication.factor: 3 - transaction.state.log.replication.factor: 3 - transaction.state.log.min.isr: 2 - default.replication.factor: 3 - min.insync.replicas: 2 + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + default.replication.factor: 1 + min.insync.replicas: 1 inter.broker.protocol.version: "3.1" storage: type: ephemeral zookeeper: - replicas: 3 + replicas: 1 storage: type: ephemeral entityOperator: diff --git a/pkg/test/e2e/kafka/kafka_end_to_end_test.go b/pkg/test/e2e/kafka/kafka_end_to_end_test.go index 6c027642f..23b44e07f 100644 --- a/pkg/test/e2e/kafka/kafka_end_to_end_test.go +++ b/pkg/test/e2e/kafka/kafka_end_to_end_test.go @@ -15,21 +15,20 @@ * */ -package test +package e2e import ( "bufio" "bytes" "fmt" "os" - "os/exec" - "strings" "testing" "time" jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline" + "github.com/netobserv/flowlogs-pipeline/pkg/test" kafkago "github.com/segmentio/kafka-go" log "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -309,60 +308,28 @@ parameters: }() } -func runCommand(t *testing.T, command string) { - var cmd *exec.Cmd - cmdStrings := strings.Split(command, " ") - cmdBase := cmdStrings[0] - cmdStrings = cmdStrings[1:] - cmd = exec.Command(cmdBase, cmdStrings...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err := cmd.Run() - assert.NoError(t, err) - if err != nil { - msg := fmt.Sprintf("error in running command: %v \n", err) - assert.Fail(t, msg) - } -} +/* +var TestEnv env.Environment -func runCommandGetOutput(t *testing.T, command string) string { - var cmd *exec.Cmd - var outBuf bytes.Buffer - var err error - cmdStrings := strings.Split(command, " ") - cmdBase := cmdStrings[0] - cmdStrings = cmdStrings[1:] - cmd = exec.Command(cmdBase, cmdStrings...) - cmd.Stdout = &outBuf - cmd.Stderr = os.Stderr - err = cmd.Run() - assert.NoError(t, err) - if err != nil { - msg := fmt.Sprintf("error in running command: %v \n", err) - assert.Fail(t, msg) - } - output := outBuf.Bytes() - // strip the line feed from the end of the output string - output = output[0 : len(output)-1] - fmt.Printf("output = %s\n", string(output)) - return string(output) +func TestMain(m *testing.M) { + yamlFiles := []string{"strimzi.yaml", "kafka.strimzi.yaml"} + e2e.Main(m, yamlFiles, &TestEnv) } - +*/ func TestEnd2EndKafka(t *testing.T) { var command string - pwd := runCommandGetOutput(t, "pwd") - + pwd := test.RunCommand("pwd") fmt.Printf("\nset up kind and kafka \n\n") command = pwd + "/kafka_kind_start.sh" - runCommand(t, command) + test.RunCommand(command) fmt.Printf("\nwait for kafka to be active \n\n") command = "kubectl wait kafka/my-cluster --for=condition=Ready --timeout=1200s -n default" - runCommand(t, command) + test.RunCommand(command) - command = "kubectl get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.type==\"external\")].bootstrapServers}{\"\\n\"}'" - kafkaAddr := runCommandGetOutput(t, command) + command = "kubectl get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.type==\"external\")].bootstrapServers}'" + kafkaAddr := test.RunCommand(command) // strip the quotation marks kafkaAddr = kafkaAddr[1 : len(kafkaAddr)-1] fmt.Printf("kafkaAddr = %s \n", kafkaAddr) @@ -391,5 +358,5 @@ func TestEnd2EndKafka(t *testing.T) { fmt.Printf("delete kind and kafka \n") command = pwd + "/kafka_kind_stop.sh" - runCommand(t, command) + test.RunCommand(command) } diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 8b8dc5878..ef4d9218f 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -20,7 +20,10 @@ package test import ( "bytes" "fmt" + "os" + "os/exec" "reflect" + "strings" "testing" jsoniter "github.com/json-iterator/go" @@ -131,3 +134,26 @@ func CreateMockAgg(name, recordKey, by, agg, op string, value float64, count int "recent_count": recentCount, } } + +func RunCommand(command string) string { + var cmd *exec.Cmd + var outBuf bytes.Buffer + var err error + cmdStrings := strings.Split(command, " ") + cmdBase := cmdStrings[0] + cmdStrings = cmdStrings[1:] + cmd = exec.Command(cmdBase, cmdStrings...) + cmd.Stdout = &outBuf + cmd.Stderr = os.Stderr + err = cmd.Run() + if err != nil { + fmt.Printf("error in running command: %v \n", err) + } + output := outBuf.Bytes() + //strip newline from end of output + if len(output) > 0 && output[len(output)-1] == '\n' { + output = output[0 : len(output)-1] + } + fmt.Printf("output = %s\n", string(output)) + return string(output) +}