Skip to content

Commit

Permalink
add support for master heartbeat message
Browse files Browse the repository at this point in the history
  • Loading branch information
myzhan committed Nov 28, 2023
1 parent 7b9dbb9 commit 8013d9b
Showing 1 changed file with 24 additions and 9 deletions.
33 changes: 24 additions & 9 deletions runner.go
Expand Up @@ -21,8 +21,9 @@ const (
)

const (
slaveReportInterval = 3 * time.Second
heartbeatInterval = 1 * time.Second
slaveReportInterval = 3 * time.Second
heartbeatInterval = 1 * time.Second
masterHeartbeatTimeout = 60 * time.Second
)

type runner struct {
Expand Down Expand Up @@ -332,13 +333,14 @@ func (r *localRunner) sendCustomMessage(messageType string, data interface{}) {
type slaveRunner struct {
runner

nodeID string
masterHost string
masterPort int
waitForAck sync.WaitGroup
ackReceived int32
lastReceivedSpawnTimestamp int64
client client
nodeID string
masterHost string
masterPort int
waitForAck sync.WaitGroup
ackReceived int32
lastReceivedSpawnTimestamp int64
lastMasterHeartbeatTimestamp time.Time
client client
}

func newSlaveRunner(masterHost string, masterPort int, tasks []*Task, rateLimiter RateLimiter) (r *slaveRunner) {
Expand Down Expand Up @@ -482,6 +484,12 @@ func (r *slaveRunner) onMessage(msgInterface message) {
}
}

switch msgType {
case "heartbeat":
r.lastMasterHeartbeatTimestamp = time.Now()
return
}

switch r.state {
case stateInit:
switch msgType {
Expand Down Expand Up @@ -603,6 +611,13 @@ func (r *slaveRunner) run() {
for {
select {
case <-ticker.C:
// check for master heartbeat timeout
if !r.lastMasterHeartbeatTimestamp.IsZero() && time.Now().Sub(r.lastMasterHeartbeatTimestamp) > masterHeartbeatTimeout {
r.logger.Printf("Didn't get heartbeat from master in over %vs, shutting down.\n", masterHeartbeatTimeout.Seconds())
r.shutdown()
return
}
// send client heartbeat message
CPUUsage := GetCurrentCPUUsage()
data := map[string]interface{}{
"state": r.state,
Expand Down

0 comments on commit 8013d9b

Please sign in to comment.