Skip to content

Commit

Permalink
FIX: panic on dup ack #187
Browse files Browse the repository at this point in the history
  • Loading branch information
myzhan committed Mar 21, 2023
1 parent 7f2007b commit 41a131e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
8 changes: 8 additions & 0 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ type slaveRunner struct {
masterHost string
masterPort int
waitForAck sync.WaitGroup
ackReceived int32
lastReceivedSpawnTimestamp int64
client client
}
Expand Down Expand Up @@ -396,13 +397,20 @@ func (r *slaveRunner) onCustomMessage(msg *CustomMessage) {
}

func (r *slaveRunner) onAckMessage(msg *genericMessage) {
// Maybe we should add a state for waiting?
if !atomic.CompareAndSwapInt32(&r.ackReceived, 0, 1) {
log.Println("Receive duplicate ack message, ignored")
return
}
r.waitForAck.Done()
Events.Publish(EVENT_CONNECTED)
}

func (r *slaveRunner) sendClientReadyAndWaitForAck() {
r.waitForAck = sync.WaitGroup{}
r.waitForAck.Add(1)

atomic.StoreInt32(&r.ackReceived, 0)
// locust allows workers to bypass version check by sending -1 as version
r.client.sendChannel() <- newClientReadyMessage("client_ready", -1, r.nodeID)

Expand Down
15 changes: 15 additions & 0 deletions runner_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package boomer

import (
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -341,6 +342,20 @@ func TestOnQuitMessage(t *testing.T) {
assert.Equal(t, stateInit, runner.state)
}

func TestOnAckMessage(t *testing.T) {
eventCount := 0
Events.Subscribe(EVENT_CONNECTED, func() {
eventCount++
})
runner := newSlaveRunner("localhost", 5557, []*Task{}, nil)
runner.waitForAck = sync.WaitGroup{}
runner.waitForAck.Add(1)

runner.onAckMessage(nil)
runner.onAckMessage(nil)
assert.Equal(t, 1, eventCount)
}

func TestOnMessage(t *testing.T) {
taskA := &Task{
Fn: func() {
Expand Down

0 comments on commit 41a131e

Please sign in to comment.