Fix broken router tests
aaronbuchwald committed Sep 5, 2020
1 parent 216194c commit c963af9
Showing 6 changed files with 566 additions and 325 deletions.
10 changes: 6 additions & 4 deletions snow/networking/router/resource_manager.go
@@ -22,7 +22,7 @@ const (
// ResourceManager defines the interface for the allocation
// of resources from different pools
type ResourceManager interface {
-	TakeMessage(message) bool
+	TakeMessage(*message) bool
	Utilization(ids.ShortID) float64
}

@@ -74,7 +74,7 @@ func NewResourceManager(
// It tags the message with the ID of the resource pool it was taken
// from and registers it with the message tracker if successful
// Returns true if it finds a resource for the message.
-func (et *throttler) TakeMessage(msg message) bool {
+func (et *throttler) TakeMessage(msg *message) bool {
	// Attempt to take the message from the pool
	messageID := msg.validatorID
	outstandingPoolMessages := et.msgTracker.OutstandingCount(ids.ShortEmpty)
@@ -90,12 +90,14 @@ func (et *throttler) TakeMessage(msg message) bool {
	}

	// Attempt to take the message from the individual allotment
-	weight, _ := et.vdrs.GetWeight(messageID)
+	weight, isStaker := et.vdrs.GetWeight(messageID)
	totalWeight := et.vdrs.Weight()
	stakerPortion := float64(weight) / float64(totalWeight)
	messageAllotment := uint32(stakerPortion * float64(et.reservedMessages))
	messageCount := et.msgTracker.OutstandingCount(messageID)
-	if messageCount < messageAllotment {
+	// Allow at least one message per staker, even when staking
+	// portion rounds message allotment down to 0
+	if messageCount <= messageAllotment && isStaker {
		et.msgTracker.Add(messageID)
		msg.SetDone(func() {
			et.msgTracker.Remove(messageID)
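The crux of this fix is integer truncation in the allotment arithmetic: messageAllotment is the floored product of the staker's weight share and the reserved message count, so a small staker's allotment can round down to 0, and the old strict < check would then throttle every one of its messages. Below is a minimal, runnable sketch of that arithmetic using the parameters from the new tests (8 equal-weight stakers, 4 reserved messages); the admit helper is illustrative, not the throttler's actual code.

package main

import "fmt"

// admit mirrors the allotment check in TakeMessage: oldRule is the
// pre-fix strict comparison, newRule is the fixed one, gated on isStaker.
func admit(weight, totalWeight uint64, reservedMessages, outstanding uint32, isStaker bool) (oldRule, newRule bool) {
	stakerPortion := float64(weight) / float64(totalWeight)
	// The uint32 conversion truncates: a small stake share yields allotment 0.
	messageAllotment := uint32(stakerPortion * float64(reservedMessages))
	oldRule = outstanding < messageAllotment
	newRule = outstanding <= messageAllotment && isStaker
	return oldRule, newRule
}

func main() {
	// One of 8 equal stakers with 4 reserved messages (half of an
	// 8-message buffer): allotment = uint32(0.125 * 4) = 0.
	oldRule, newRule := admit(1, 8, 4, 0, true)
	fmt.Printf("old rule admits: %v, new rule admits: %v\n", oldRule, newRule)
	// Output: old rule admits: false, new rule admits: true
}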
103 changes: 103 additions & 0 deletions snow/networking/router/resource_manager_test.go
@@ -0,0 +1,103 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package router

import (
	"testing"
	"time"

	"github.com/ava-labs/gecko/ids"
	"github.com/ava-labs/gecko/snow/networking/tracker"

	"github.com/ava-labs/gecko/snow/validators"
	"github.com/ava-labs/gecko/utils/logging"
)

func TestTakeMessage(t *testing.T) {
	bufferSize := 8
	vdrList := make([]validators.Validator, 0, bufferSize)
	messages := make([]*message, 0, bufferSize)
	for i := 0; i < bufferSize; i++ {
		vdr := validators.GenerateRandomValidator(2)
		messages = append(messages, &message{
			validatorID: vdr.ID(),
		})
		vdrList = append(vdrList, vdr)
	}
	nonStakerID := ids.NewShortID([20]byte{16})

	cpuTracker := tracker.NewCPUTracker(time.Second)
	msgTracker := tracker.NewMessageTracker()
	vdrs := validators.NewSet()
	vdrs.Set(vdrList)
	resourceManager := NewResourceManager(
		vdrs,
		logging.NoLog{},
		msgTracker,
		cpuTracker,
		uint32(bufferSize),
		1,   // Allow each peer to take at most one message from pool
		0.5, // Allot half of message queue to stakers
		0.5, // Allot half of CPU time to stakers
	)

	for i, msg := range messages {
		if success := resourceManager.TakeMessage(msg); !success {
			t.Fatalf("Failed to take message %d.", i)
		}
	}

	nonStakerMsg1 := &message{validatorID: nonStakerID}
	if success := resourceManager.TakeMessage(nonStakerMsg1); success {
		t.Fatal("Should have throttled message from non-staker when the message pool was empty")
	}
	nonStakerMsg1.Done()

	for _, msg := range messages {
		msg.Done()
	}

	nonStakerMsg2 := &message{validatorID: nonStakerID}
	if success := resourceManager.TakeMessage(nonStakerMsg2); !success {
		t.Fatal("Failed to take additional message after all previous messages were marked as done.")
	}
	nonStakerMsg2.Done()
}

func TestStakerGetsThrottled(t *testing.T) {
	bufferSize := 8
	vdrList := make([]validators.Validator, 0, bufferSize)
	for i := 0; i < bufferSize; i++ {
		vdr := validators.GenerateRandomValidator(2)
		vdrList = append(vdrList, vdr)
	}

	cpuTracker := tracker.NewCPUTracker(time.Second)
	msgTracker := tracker.NewMessageTracker()
	vdrs := validators.NewSet()
	vdrs.Set(vdrList)
	resourceManager := NewResourceManager(
		vdrs,
		logging.NoLog{},
		msgTracker,
		cpuTracker,
		uint32(bufferSize),
		1,   // Allow each peer to take at most one message from pool
		0.5, // Allot half of message queue to stakers
		0.5, // Allot half of CPU time to stakers
	)

	// Ensure that a staker with only part of the stake
	// cannot take up the entire message queue
	vdrID := vdrList[0].ID()
	for i := 0; i < bufferSize; i++ {
		if success := resourceManager.TakeMessage(&message{
			validatorID: vdrID,
		}); !success {
			// The staker was throttled before taking up the whole message queue
			return
		}
	}
	t.Fatal("Staker should have been throttled before taking up the entire message queue.")
}
3 changes: 2 additions & 1 deletion snow/networking/router/service_queue.go
@@ -136,6 +136,7 @@ func (ml *multiLevelQueue) PopMessage() (message, error) {
	if err == nil {
		ml.pendingMessages--
		ml.msgTracker.Remove(msg.validatorID)
+		msg.Done()
		ml.metrics.pending.Dec()
	}
	return msg, err
@@ -238,7 +239,7 @@ func (ml *multiLevelQueue) pushMessage(msg message) bool {
		return false
	}

-	success := ml.resourceManager.TakeMessage(msg)
+	success := ml.resourceManager.TakeMessage(&msg)
	if !success {
		ml.metrics.dropped.Inc()
		ml.metrics.throttled.Inc()
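A note on why TakeMessage now takes a *message: it installs a cleanup callback on the message via SetDone, and that callback must land on the message the queue actually holds rather than on a copy, so that PopMessage (and the tests) can release the tracked resources by calling msg.Done(). Below is a rough sketch of what such a done-callback pattern could look like inside the router package; the onDone field and the at-most-once behavior are assumptions, not necessarily gecko's real message type.

// Sketch only: a message carrying a cleanup callback. The validatorID
// field appears in the diff above; onDone is hypothetical.
type message struct {
	validatorID ids.ShortID
	onDone      func()
}

// SetDone registers the cleanup TakeMessage wants run when the message
// leaves the queue, e.g. et.msgTracker.Remove(messageID).
func (m *message) SetDone(onDone func()) { m.onDone = onDone }

// Done runs the cleanup at most once; the nil check keeps it safe to
// call on a message that was never successfully taken.
func (m *message) Done() {
	if m.onDone != nil {
		m.onDone()
		m.onDone = nil
	}
}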
