-
Notifications
You must be signed in to change notification settings - Fork 2
/
suite.go
171 lines (153 loc) · 4.9 KB
/
suite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package test
import (
_ "embed"
"errors"
"fmt"
"net"
"os/exec"
"net/http"
"time"
"encoding/json"
"github.com/kubescape/messaging/pulsar/config"
"github.com/kubescape/messaging/pulsar/connector"
"github.com/stretchr/testify/suite"
)
const (
pulsarKAURL = "%s/admin/v2/brokers/ready"
)
//go:embed scripts/pulsar.sh
var startPulsarScript string
//go:embed scripts/pulsar_stop.sh
var pulsarStopCommand string
type PulsarTestSuite struct {
suite.Suite
DefaultTestConfig config.PulsarConfig
Client connector.Client
AppPortStart int
AdminPortStart int
shutdownFunc func()
}
func (suite *PulsarTestSuite) SetupSuite() {
suite.T().Log("setup suite")
suite.DefaultTestConfig = config.PulsarConfig{
Tenant: "ca-messaging",
Namespace: "test-namespace",
Clusters: []string{"standalone"},
MaxDeliveryAttempts: 2,
RedeliveryDelaySeconds: 0,
}
randomContainerName := fmt.Sprintf("pulsar-test-%d-%d", suite.AdminPortStart, time.Now().UnixNano())
if suite.AppPortStart == 0 {
suite.AppPortStart = 6650
}
if suite.AdminPortStart == 0 {
suite.AdminPortStart = 8080
}
//start pulsar
suite.startPulsar(randomContainerName)
x, _ := json.Marshal(suite.DefaultTestConfig)
fmt.Println(string(x))
var err error
//ensure pulsar connection
suite.Client, err = connector.NewClient(connector.WithConfig(&suite.DefaultTestConfig))
if err != nil {
suite.FailNow("failed to create pulsar client", err.Error())
}
suite.shutdownFunc = func() {
defer func() {
suite.Client.Close()
formmatedScript := fmt.Sprintf(pulsarStopCommand, randomContainerName)
outbytes, err := exec.Command("/bin/sh", "-c", formmatedScript).CombinedOutput()
if err != nil {
suite.FailNow("failed to stop pulsar", err.Error(), string(outbytes))
}
}()
}
}
func (suite *PulsarTestSuite) checkPulsarIsAlive() bool {
kaURL := fmt.Sprintf(pulsarKAURL, suite.DefaultTestConfig.AdminUrl)
fmt.Println("pulsar admin", kaURL)
req, err := http.NewRequest(http.MethodGet, kaURL, nil)
if err != nil {
suite.FailNow("failed to create request", err.Error())
}
client := http.Client{Timeout: 5 * time.Second}
resp, err := client.Do(req)
if err == nil && resp.StatusCode == http.StatusOK {
suite.T().Log("pulsar started")
resp.Body.Close()
return true
}
return false
}
func (suite *PulsarTestSuite) TearDownSuite() {
suite.T().Log("tear down suite")
suite.shutdownFunc()
suite.Assert().NoError(killPortProcess(suite.AppPortStart))
suite.Assert().NoError(killPortProcess(suite.AdminPortStart))
}
func (suite *PulsarTestSuite) SetupTest() {
suite.T().Log("setup test")
}
func (suite *PulsarTestSuite) TearDownTest() {
suite.T().Log("tear down test")
// clear all pulsar topics messages
if err := suite.clearAllMessages(); err != nil {
suite.FailNow("failed to clear all messages", err.Error())
}
}
func findFreePort(rangeStart, rangeEnd int) (int, error) {
for port := rangeStart; port <= rangeEnd; port++ {
address := fmt.Sprintf("localhost:%d", port)
conn, err := net.DialTimeout("tcp", address, 1*time.Second)
if conn != nil {
conn.Close()
}
if err != nil { // port is available since we got no response
return port, nil
}
conn.Close()
}
return 0, errors.New("no free port found")
}
func (suite *PulsarTestSuite) startPulsar(contName string) {
suite.T().Log("stopping existing pulsar container")
exec.Command("/bin/sh", "-c", pulsarStopCommand).Run()
suite.T().Log("starting pulsar")
pulsarAppPort, err := findFreePort(suite.AppPortStart, suite.AppPortStart+100)
if err != nil {
suite.FailNow("failed to find free port", err.Error())
}
suite.DefaultTestConfig.URL = fmt.Sprintf("pulsar://localhost:%d", pulsarAppPort)
suite.AppPortStart = pulsarAppPort
pulsarAdminPort, err := findFreePort(suite.AdminPortStart, suite.AdminPortStart+100)
if err != nil {
suite.FailNow("failed to find free port for pulsar admin", err.Error())
}
suite.DefaultTestConfig.AdminUrl = fmt.Sprintf("http://localhost:%d", pulsarAdminPort)
suite.AdminPortStart = pulsarAdminPort
formattedScript := fmt.Sprintf(startPulsarScript, pulsarAppPort, pulsarAdminPort, contName)
out, err := exec.Command("/bin/sh", "-c", formattedScript).CombinedOutput()
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
suite.FailNow("failed to start pulsar", err.Error(), string(exitErr.Stderr), string(out))
}
suite.FailNow("failed to start pulsar", err.Error(), string(out))
}
suite.T().Log("waiting for pulsar to start")
for i := 0; i < 30; i++ {
isAlive := suite.checkPulsarIsAlive()
if isAlive {
return
}
time.Sleep(2 * time.Second)
}
formmatedScript := fmt.Sprintf(pulsarStopCommand, contName)
outbytes, err := exec.Command("/bin/sh", "-c", formmatedScript).CombinedOutput()
if err != nil {
fmt.Println(string(outbytes), err.Error())
}
killPortProcess(suite.AppPortStart)
killPortProcess(suite.AdminPortStart)
suite.FailNow("failed to start pulsar")
}