Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pkg/test/e2e/kafka/kafka.strimzi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
spec:
kafka:
version: 3.1.0
replicas: 3
replicas: 1
listeners:
- name: plain
port: 9092
Expand All @@ -20,16 +20,16 @@ spec:
type: nodeport
tls: false
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.1"
storage:
type: ephemeral
zookeeper:
replicas: 3
replicas: 1
storage:
type: ephemeral
entityOperator:
Expand Down
61 changes: 14 additions & 47 deletions pkg/test/e2e/kafka/kafka_end_to_end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@
*
*/

package test
package e2e

import (
"bufio"
"bytes"
"fmt"
"os"
"os/exec"
"strings"
"testing"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
kafkago "github.com/segmentio/kafka-go"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand Down Expand Up @@ -309,60 +308,28 @@ parameters:
}()
}

func runCommand(t *testing.T, command string) {
var cmd *exec.Cmd
cmdStrings := strings.Split(command, " ")
cmdBase := cmdStrings[0]
cmdStrings = cmdStrings[1:]
cmd = exec.Command(cmdBase, cmdStrings...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
assert.NoError(t, err)
if err != nil {
msg := fmt.Sprintf("error in running command: %v \n", err)
assert.Fail(t, msg)
}
}
/*
var TestEnv env.Environment

func runCommandGetOutput(t *testing.T, command string) string {
var cmd *exec.Cmd
var outBuf bytes.Buffer
var err error
cmdStrings := strings.Split(command, " ")
cmdBase := cmdStrings[0]
cmdStrings = cmdStrings[1:]
cmd = exec.Command(cmdBase, cmdStrings...)
cmd.Stdout = &outBuf
cmd.Stderr = os.Stderr
err = cmd.Run()
assert.NoError(t, err)
if err != nil {
msg := fmt.Sprintf("error in running command: %v \n", err)
assert.Fail(t, msg)
}
output := outBuf.Bytes()
// strip the line feed from the end of the output string
output = output[0 : len(output)-1]
fmt.Printf("output = %s\n", string(output))
return string(output)
func TestMain(m *testing.M) {
yamlFiles := []string{"strimzi.yaml", "kafka.strimzi.yaml"}
e2e.Main(m, yamlFiles, &TestEnv)
}

*/
func TestEnd2EndKafka(t *testing.T) {
var command string

pwd := runCommandGetOutput(t, "pwd")

pwd := test.RunCommand("pwd")
fmt.Printf("\nset up kind and kafka \n\n")
command = pwd + "/kafka_kind_start.sh"
runCommand(t, command)
test.RunCommand(command)

fmt.Printf("\nwait for kafka to be active \n\n")
command = "kubectl wait kafka/my-cluster --for=condition=Ready --timeout=1200s -n default"
runCommand(t, command)
test.RunCommand(command)

command = "kubectl get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.type==\"external\")].bootstrapServers}{\"\\n\"}'"
kafkaAddr := runCommandGetOutput(t, command)
command = "kubectl get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.type==\"external\")].bootstrapServers}'"
kafkaAddr := test.RunCommand(command)
// strip the quotation marks
kafkaAddr = kafkaAddr[1 : len(kafkaAddr)-1]
fmt.Printf("kafkaAddr = %s \n", kafkaAddr)
Expand Down Expand Up @@ -391,5 +358,5 @@ func TestEnd2EndKafka(t *testing.T) {

fmt.Printf("delete kind and kafka \n")
command = pwd + "/kafka_kind_stop.sh"
runCommand(t, command)
test.RunCommand(command)
}
26 changes: 26 additions & 0 deletions pkg/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ package test
import (
"bytes"
"fmt"
"os"
"os/exec"
"reflect"
"strings"
"testing"

jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -131,3 +134,26 @@ func CreateMockAgg(name, recordKey, by, agg, op string, value float64, count int
"recent_count": recentCount,
}
}

func RunCommand(command string) string {
var cmd *exec.Cmd
var outBuf bytes.Buffer
var err error
cmdStrings := strings.Split(command, " ")
cmdBase := cmdStrings[0]
cmdStrings = cmdStrings[1:]
cmd = exec.Command(cmdBase, cmdStrings...)
cmd.Stdout = &outBuf
cmd.Stderr = os.Stderr
err = cmd.Run()
if err != nil {
fmt.Printf("error in running command: %v \n", err)
}
output := outBuf.Bytes()
//strip newline from end of output
if len(output) > 0 && output[len(output)-1] == '\n' {
output = output[0 : len(output)-1]
}
fmt.Printf("output = %s\n", string(output))
return string(output)
}