Skip to content

Commit

Permalink
fix: remove flaky tests that need to listen on addr
Browse files Browse the repository at this point in the history
  • Loading branch information
myzhan committed Aug 29, 2023
1 parent b0d0c4f commit 363d524
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 281 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unittest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [macos-latest, windows-latest]
os: [macos-latest, windows-latest, ubuntu-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Install Go
Expand Down
2 changes: 2 additions & 0 deletions boomer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ func (b *Boomer) Run(tasks ...*Task) {
switch b.mode {
case DistributedMode:
b.slaveRunner = newSlaveRunner(b.masterHost, b.masterPort, tasks, b.rateLimiter)
println("new slave runner")
for _, o := range b.outputs {
b.slaveRunner.addOutput(o)
}
b.slaveRunner.run()
case StandaloneMode:
b.localRunner = newLocalRunner(tasks, b.rateLimiter, b.spawnCount, b.spawnRate)
println("new local runner")
for _, o := range b.outputs {
b.localRunner.addOutput(o)
}
Expand Down
49 changes: 22 additions & 27 deletions boomer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package boomer

import (
"flag"
"log"
"math"
"math/rand"
"os"
"runtime"
"sync/atomic"
"time"

"github.com/myzhan/gomq/zmtp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
Expand Down Expand Up @@ -95,13 +94,8 @@ var _ = Describe("Test Boomer", func() {
})

It("test distributed run", func() {
masterHost := "0.0.0.0"
rand.Seed(Now())
masterPort := rand.Intn(1000) + 10240

server := newTestServer(masterHost, masterPort)
server.start()
defer server.close()
masterHost := "mock:0.0.0.0"
masterPort := 10240

b := NewBoomer(masterHost, masterPort)

Expand All @@ -116,15 +110,21 @@ var _ = Describe("Test Boomer", func() {
b.Run(taskA)
defer b.Quit()

server.toClient <- newGenericMessage("spawn", map[string]interface{}{
serverMessage := newGenericMessage("spawn", map[string]interface{}{
"user_classes_count": map[interface{}]interface{}{
"Dummy": int64(5),
"Dummy2": int64(5),
},
}, b.slaveRunner.nodeID)
serverMessageInBytes, _ := serverMessage.serialize()
serverZmtpMessage := &zmtp.Message{
MessageType: zmtp.UserMessage,
Body: [][]byte{serverMessageInBytes},
}
MockGomqDealerInstance.RecvChannel() <- serverZmtpMessage

time.Sleep(4 * time.Second)
Expect(count).To(BeEquivalentTo(10))
Expect(count).Should(BeEquivalentTo(10))
})

It("test run tasks for test", func() {
Expand Down Expand Up @@ -181,17 +181,8 @@ var _ = Describe("Test Boomer", func() {

It("test run", func() {
flag.Parse()

masterHost = "0.0.0.0"
rand.Seed(Now())
masterPort = rand.Intn(1000) + 10240

server := newTestServer(masterHost, masterPort)

log.Printf("Starting to serve on %s:%d\n", masterHost, masterPort)
server.start()
defer server.close()

masterHost = "mock:0.0.0.0"
masterPort = 1234
count := int64(0)
taskA := &Task{
Name: "increaseCount",
Expand All @@ -202,19 +193,23 @@ var _ = Describe("Test Boomer", func() {
}

go Run(taskA)
time.Sleep(20 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
defer defaultBoomer.Quit()

server.toClient <- newGenericMessage("spawn", map[string]interface{}{
serverMessage := newGenericMessage("spawn", map[string]interface{}{
"user_classes_count": map[interface{}]interface{}{
"Dummy": int64(5),
"Dummy2": int64(5),
},
}, defaultBoomer.slaveRunner.nodeID)
serverMessageInBytes, _ := serverMessage.serialize()
serverZmtpMessage := &zmtp.Message{
MessageType: zmtp.UserMessage,
Body: [][]byte{serverMessageInBytes},
}
MockGomqDealerInstance.RecvChannel() <- serverZmtpMessage

time.Sleep(4 * time.Second)

defaultBoomer.Quit()

Expect(count).To(BeEquivalentTo(10))
})

Expand Down
85 changes: 84 additions & 1 deletion client_gomq.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,92 @@
//go:build !goczmq
// +build !goczmq

package boomer

import (
"fmt"
"log"
"strings"
"time"

"github.com/myzhan/gomq"
"github.com/myzhan/gomq/zmtp"
)

type MockGomqDealer struct {
connectErr error
sendChannel chan []byte
receiveChannel chan *zmtp.Message
}

var MockGomqDealerInstance *MockGomqDealer = &MockGomqDealer{
sendChannel: make(chan []byte, 10),
receiveChannel: make(chan *zmtp.Message, 10),
}

func (m *MockGomqDealer) SetConnectError(err error) {
m.connectErr = err
}

func (m *MockGomqDealer) Connect(add string) (err error) {
if m.connectErr != nil {
return m.connectErr
}
return nil
}

func (m *MockGomqDealer) AddConnection(*gomq.Connection) {

}

func (m *MockGomqDealer) RemoveConnection(string) {
}

func (m *MockGomqDealer) SendChannel() chan []byte {
return m.sendChannel
}

func (m *MockGomqDealer) Send(payload []byte) (err error) {
m.sendChannel <- payload
return nil
}

func (m *MockGomqDealer) SendMultipart(payload [][]byte) (err error) {
return nil
}

func (m *MockGomqDealer) RecvChannel() chan *zmtp.Message {
return m.receiveChannel
}

func (m *MockGomqDealer) Recv() ([]byte, error) {
return nil, nil
}

func (m *MockGomqDealer) RecvMultipart() ([][]byte, error) {
return nil, nil
}

func (m *MockGomqDealer) Close() {

}

func (m *MockGomqDealer) RetryInterval() time.Duration {
return time.Second
}

func (m *MockGomqDealer) SocketType() zmtp.SocketType {
return zmtp.DealerSocketType
}

func (m *MockGomqDealer) SocketIdentity() zmtp.SocketIdentity {
return nil
}

func (m *MockGomqDealer) SecurityMechanism() zmtp.SecurityMechanism {
return nil
}

type gomqSocketClient struct {
masterHost string
masterPort int
Expand Down Expand Up @@ -39,7 +116,13 @@ func newClient(masterHost string, masterPort int, identity string) (client *gomq

func (c *gomqSocketClient) connect() (err error) {
addr := fmt.Sprintf("tcp://%s:%d", c.masterHost, c.masterPort)
c.dealerSocket = gomq.NewDealer(zmtp.NewSecurityNull(), c.identity)

if strings.HasPrefix(c.masterHost, "mock:") {
// for unittest
c.dealerSocket = MockGomqDealerInstance
} else {
c.dealerSocket = gomq.NewDealer(zmtp.NewSecurityNull(), c.identity)
}

if err = c.dealerSocket.Connect(addr); err != nil {
return err
Expand Down

0 comments on commit 363d524

Please sign in to comment.