diff --git a/snow/networking/router/resource_manager.go b/snow/networking/router/resource_manager.go
index 39368102887..458d84a65b0 100644
--- a/snow/networking/router/resource_manager.go
+++ b/snow/networking/router/resource_manager.go
@@ -22,7 +22,7 @@ const (
 // ResourceManager defines the interface for the allocation
 // of resources from different pools
 type ResourceManager interface {
-	TakeMessage(message) bool
+	TakeMessage(*message) bool

 	Utilization(ids.ShortID) float64
 }
@@ -74,7 +74,7 @@ func NewResourceManager(
 // It tags the message with the ID of the resource pool it was taken
 // from and registers it with the message tracker if successful
 // Returns true if it finds a resource for the message.
-func (et *throttler) TakeMessage(msg message) bool {
+func (et *throttler) TakeMessage(msg *message) bool {
 	// Attempt to take the message from the pool
 	messageID := msg.validatorID
 	outstandingPoolMessages := et.msgTracker.OutstandingCount(ids.ShortEmpty)
@@ -90,12 +90,14 @@ func (et *throttler) TakeMessage(msg message) bool {
 	}

 	// Attempt to take the message from the individual allotment
-	weight, _ := et.vdrs.GetWeight(messageID)
+	weight, isStaker := et.vdrs.GetWeight(messageID)
 	totalWeight := et.vdrs.Weight()
 	stakerPortion := float64(weight) / float64(totalWeight)
 	messageAllotment := uint32(stakerPortion * float64(et.reservedMessages))
 	messageCount := et.msgTracker.OutstandingCount(messageID)
-	if messageCount < messageAllotment {
+	// Allow at least one message per staker, even when the staking
+	// portion rounds the message allotment down to 0
+	if messageCount <= messageAllotment && isStaker {
 		et.msgTracker.Add(messageID)
 		msg.SetDone(func() {
 			et.msgTracker.Remove(messageID)
diff --git a/snow/networking/router/resource_manager_test.go b/snow/networking/router/resource_manager_test.go
new file mode 100644
index 00000000000..d1f1fb438b1
--- /dev/null
+++ b/snow/networking/router/resource_manager_test.go
@@ -0,0 +1,103 @@
+// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package router
+
+import (
+	"testing"
+	"time"
+
+	"github.com/ava-labs/gecko/ids"
+	"github.com/ava-labs/gecko/snow/networking/tracker"
+
+	"github.com/ava-labs/gecko/snow/validators"
+	"github.com/ava-labs/gecko/utils/logging"
+)
+
+func TestTakeMessage(t *testing.T) {
+	bufferSize := 8
+	vdrList := make([]validators.Validator, 0, bufferSize)
+	messages := make([]*message, 0, bufferSize)
+	for i := 0; i < bufferSize; i++ {
+		vdr := validators.GenerateRandomValidator(2)
+		messages = append(messages, &message{
+			validatorID: vdr.ID(),
+		})
+		vdrList = append(vdrList, vdr)
+	}
+	nonStakerID := ids.NewShortID([20]byte{16})
+
+	cpuTracker := tracker.NewCPUTracker(time.Second)
+	msgTracker := tracker.NewMessageTracker()
+	vdrs := validators.NewSet()
+	vdrs.Set(vdrList)
+	resourceManager := NewResourceManager(
+		vdrs,
+		logging.NoLog{},
+		msgTracker,
+		cpuTracker,
+		uint32(bufferSize),
+		1,   // Allow each peer to take at most one message from pool
+		0.5, // Allot half of message queue to stakers
+		0.5, // Allot half of CPU time to stakers
+	)
+
+	for i, msg := range messages {
+		if success := resourceManager.TakeMessage(msg); !success {
+			t.Fatalf("Failed to take message %d.", i)
+		}
+	}
+
+	nonStakerMsg1 := &message{validatorID: nonStakerID}
+	if success := resourceManager.TakeMessage(nonStakerMsg1); success {
+		t.Fatal("Should have throttled message from non-staker when the message pool was empty")
+	}
+	nonStakerMsg1.Done()
+
+	for _, msg := range messages {
+		msg.Done()
+	}
+
+	nonStakerMsg2 := &message{validatorID: nonStakerID}
+	if success := resourceManager.TakeMessage(nonStakerMsg2); !success {
+		t.Fatal("Failed to take additional message after all previous messages were marked as done.")
+	}
+	nonStakerMsg2.Done()
+}
+
+func TestStakerGetsThrottled(t *testing.T) {
+	bufferSize := 8
+	vdrList := make([]validators.Validator, 0, bufferSize)
+	for i := 0; i < bufferSize; i++ {
+		vdr := validators.GenerateRandomValidator(2)
+		vdrList = append(vdrList, vdr)
+	}
+
+	cpuTracker := tracker.NewCPUTracker(time.Second)
+	msgTracker := tracker.NewMessageTracker()
+	vdrs := validators.NewSet()
+	vdrs.Set(vdrList)
+	resourceManager := NewResourceManager(
+		vdrs,
+		logging.NoLog{},
+		msgTracker,
+		cpuTracker,
+		uint32(bufferSize),
+		1,   // Allow each peer to take at most one message from pool
+		0.5, // Allot half of message queue to stakers
+		0.5, // Allot half of CPU time to stakers
+	)
+
+	// Ensure that a staker with only part of the stake
+	// cannot take up the entire message queue
+	vdrID := vdrList[0].ID()
+	for i := 0; i < bufferSize; i++ {
+		if success := resourceManager.TakeMessage(&message{
+			validatorID: vdrID,
+		}); !success {
+			// The staker was throttled before taking up the whole message queue
+			return
+		}
+	}
+	t.Fatal("Staker should have been throttled before taking up the entire message queue.")
+}
diff --git a/snow/networking/router/service_queue.go b/snow/networking/router/service_queue.go
index 87295e1a9fb..d57efb80595 100644
--- a/snow/networking/router/service_queue.go
+++ b/snow/networking/router/service_queue.go
@@ -136,6 +136,7 @@ func (ml *multiLevelQueue) PopMessage() (message, error) {
 	if err == nil {
 		ml.pendingMessages--
 		ml.msgTracker.Remove(msg.validatorID)
+		msg.Done()
 		ml.metrics.pending.Dec()
 	}
 	return msg, err
@@ -238,7 +239,7 @@ func (ml *multiLevelQueue) pushMessage(msg message) bool {
 		return false
 	}

-	success := ml.resourceManager.TakeMessage(msg)
+	success := ml.resourceManager.TakeMessage(&msg)
 	if !success {
 		ml.metrics.dropped.Inc()
 		ml.metrics.throttled.Inc()
diff --git a/snow/networking/router/service_queue_test.go b/snow/networking/router/service_queue_test.go
index 6fb3da7389d..eceb56ade4d 100644
--- a/snow/networking/router/service_queue_test.go
+++ b/snow/networking/router/service_queue_test.go
@@ -3,323 +3,333 @@

 package router

-// import (
-// 	"math"
-// 	"testing"
-// 	"time"
-
-// 	"github.com/prometheus/client_golang/prometheus"
-
-// 	"github.com/ava-labs/gecko/snow"
-// 	"github.com/ava-labs/gecko/snow/validators"
-// )
-
-// func setupMultiLevelQueue(t *testing.T, bufferSize int) (messageQueue, chan struct{}, validators.Set) {
-// 	vdrs := validators.NewSet()
-// 	metrics := &metrics{}
-// 	metrics.Initialize("", prometheus.NewRegistry())
-// 	consumptionRanges := []float64{
-// 		0.5,
-// 		0.75,
-// 		1.5,
-// 		math.MaxFloat64,
-// 	}
-
-// 	cpuInterval := defaultCPUInterval
-// 	// Defines the percentage of CPU time allotted to processing messages
-// 	// from the bucket at the corresponding index.
-// 	consumptionAllotments := []time.Duration{
-// 		cpuInterval / 4,
-// 		cpuInterval / 4,
-// 		cpuInterval / 4,
-// 		cpuInterval / 4,
-// 	}
-
-// 	ctx := snow.DefaultContextTest()
-// 	ctx.Bootstrapped()
-// 	queue, semaChan := newMultiLevelQueue(
-// 		vdrs,
-// 		ctx,
-// 		metrics,
-// 		consumptionRanges,
-// 		consumptionAllotments,
-// 		bufferSize,
-// 		DefaultMaxNonStakerPendingMsgs,
-// 		time.Second,
-// 		DefaultStakerPortion,
-// 		DefaultStakerPortion,
-// 	)
-
-// 	return queue, semaChan, vdrs
-// }
-
-// func TestMultiLevelQueueSendsMessages(t *testing.T) {
-// 	bufferSize := 8
-// 	queue, semaChan, vdrs := setupMultiLevelQueue(t, bufferSize)
-// 	vdrList := []validators.Validator{}
-// 	messages := []message{}
-// 	for i := 0; i < bufferSize; i++ {
-// 		vdr := validators.GenerateRandomValidator(2)
-// 		messages = append(messages, message{
-// 			validatorID: vdr.ID(),
-// 		})
-// 		vdrList = append(vdrList, vdr)
-// 	}
-
-// 	vdrs.Set(vdrList)
-// 	queue.EndInterval()
-
-// 	for _, msg := range messages {
-// 		queue.PushMessage(msg)
-// 	}
-
-// 	for count := 0; count < bufferSize; count++ {
-// 		select {
-// 		case _, ok := <-semaChan:
-// 			if !ok {
-// 				t.Fatal("Semaphore channel was closed early unexpectedly")
-// 			}
-// 			if _, err := queue.PopMessage(); err != nil {
-// 				t.Fatalf("Pop message failed with error: %s", err)
-// 			}
-// 		default:
-// 			t.Fatalf("Should have read message %d from queue", count)
-// 		}
-// 	}
-
-// 	// Ensure that the 6th message was never added to the queue
-// 	select {
-// 	case <-semaChan:
-// 		t.Fatal("Semaphore channel should have been empty after reading all messages from the queue")
-// 	default:
-// 	}
-// }
-
-// func TestExtraMessageDeadlock(t *testing.T) {
-// 	bufferSize := 8
-// 	oversizedBuffer := bufferSize * 2
-// 	queue, semaChan, vdrs := setupMultiLevelQueue(t, bufferSize)
-
-// 	vdrList := []validators.Validator{}
-// 	messages := []message{}
-// 	for i := 0; i < oversizedBuffer; i++ {
-// 		vdr := validators.GenerateRandomValidator(2)
-// 		messages = append(messages, message{
-// 			validatorID: vdr.ID(),
-// 		})
-// 		vdrList = append(vdrList, vdr)
-// 	}
-
-// 	vdrs.Set(vdrList)
-// 	queue.EndInterval()
-
-// 	// Test messages are dropped when full to avoid blocking when
-// 	// adding a message to a queue or to the counting semaphore channel
-// 	for _, msg := range messages {
-// 		queue.PushMessage(msg)
-// 	}
-
-// 	// There should now be [bufferSize] messages on the queue
-// 	// Note: this may not be the case where a message is dropped
-// 	// because there is less than [bufferSize] room on the multi-level
-// 	// queue as a result of rounding when calculating the size of the
-// 	// single-level queues.
-// 	for i := 0; i < bufferSize; i++ {
-// 		<-semaChan
-// 	}
-// 	select {
-// 	case <-semaChan:
-// 		t.Fatal("Semaphore channel should have been empty")
-// 	default:
-// 	}
-// }
-
-// func TestMultiLevelQueuePrioritizes(t *testing.T) {
-// 	bufferSize := 8
-// 	vdrs := validators.NewSet()
-// 	validator1 := validators.GenerateRandomValidator(2000)
-// 	validator2 := validators.GenerateRandomValidator(2000)
-// 	vdrs.Set([]validators.Validator{
-// 		validator1,
-// 		validator2,
-// 	})
-
-// 	metrics := &metrics{}
-// 	metrics.Initialize("", prometheus.NewRegistry())
-// 	// Set tier1 cutoff sufficiently low so that only messages from validators
-// 	// the message queue has not serviced will be placed on it for the test.
-// 	tier1 := 0.001
-// 	tier2 := 1.0
-// 	tier3 := math.MaxFloat64
-// 	consumptionRanges := []float64{
-// 		tier1,
-// 		tier2,
-// 		tier3,
-// 	}
-
-// 	perTier := time.Second
-// 	// Give each tier 1 second of processing time
-// 	consumptionAllotments := []time.Duration{
-// 		perTier,
-// 		perTier,
-// 		perTier,
-// 	}
-
-// 	ctx := snow.DefaultContextTest()
-// 	ctx.Bootstrapped()
-// 	queue, semaChan := newMultiLevelQueue(
-// 		vdrs,
-// 		ctx,
-// 		metrics,
-// 		consumptionRanges,
-// 		consumptionAllotments,
-// 		bufferSize,
-// 		DefaultMaxNonStakerPendingMsgs,
-// 		time.Second,
-// 		DefaultStakerPortion,
-// 		DefaultStakerPortion,
-// 	)
-
-// 	// Utilize CPU such that the next message from validator2 will be placed on a lower
-// 	// level queue (but be sure not to consume the entire CPU allotment for tier1)
-// 	queue.UtilizeCPU(validator2.ID(), perTier/2)
-
-// 	// Push two messages from from high priority validator and one from
-// 	// low priority validator
-// 	messages := []message{
-// 		{
-// 			validatorID: validator1.ID(),
-// 			requestID:   1,
-// 		},
-// 		{
-// 			validatorID: validator1.ID(),
-// 			requestID:   2,
-// 		},
-// 		{
-// 			validatorID: validator2.ID(),
-// 			requestID:   3,
-// 		},
-// 	}
-
-// 	for _, msg := range messages {
-// 		queue.PushMessage(msg)
-// 	}
-
-// 	<-semaChan
-// 	if msg1, err := queue.PopMessage(); err != nil {
-// 		t.Fatal(err)
-// 	} else if !msg1.validatorID.Equals(validator1.ID()) {
-// 		t.Fatal("Expected first message to come from the high priority validator")
-// 	}
-
-// 	// Utilize the remainder of the time that should be alloted to the highest priority
-// 	// queue.
-// 	queue.UtilizeCPU(validator1.ID(), perTier)
-
-// 	<-semaChan
-// 	if msg2, err := queue.PopMessage(); err != nil {
-// 		t.Fatal(err)
-// 	} else if !msg2.validatorID.Equals(validator2.ID()) {
-// 		t.Fatal("Expected second message to come from the low priority validator after moving on to the lower level queue")
-// 	}
-
-// 	<-semaChan
-// 	if msg3, err := queue.PopMessage(); err != nil {
-// 		t.Fatal(err)
-// 	} else if !msg3.validatorID.Equals(validator1.ID()) {
-// 		t.Fatal("Expected final message to come from validator1")
-// 	}
-// }
-
-// func TestMultiLevelQueuePushesDownOldMessages(t *testing.T) {
-// 	bufferSize := 16
-// 	vdrs := validators.NewSet()
-// 	vdr0 := validators.GenerateRandomValidator(2000)
-// 	vdr1 := validators.GenerateRandomValidator(2000)
-// 	vdrs.Set([]validators.Validator{
-// 		vdr0,
-// 		vdr1,
-// 	})
-
-// 	metrics := &metrics{}
-// 	metrics.Initialize("", prometheus.NewRegistry())
-// 	// Set tier1 cutoff sufficiently low so that only messages from validators
-// 	// the message queue has not serviced will be placed on it for the test.
-// 	tier1 := 0.001
-// 	tier2 := 1.0
-// 	tier3 := math.MaxFloat64
-// 	consumptionRanges := []float64{
-// 		tier1,
-// 		tier2,
-// 		tier3,
-// 	}
-
-// 	perTier := time.Second
-// 	// Give each tier 1 second of processing time
-// 	consumptionAllotments := []time.Duration{
-// 		perTier,
-// 		perTier,
-// 		perTier,
-// 	}
-
-// 	ctx := snow.DefaultContextTest()
-// 	ctx.Bootstrapped()
-// 	cpuTracker :=
-// 	queue, semaChan := newMultiLevelQueue(
-// 		vdrs,
-// 		ctx,
-// 		metrics,
-// 		consumptionRanges,
-// 		consumptionAllotments,
-// 		bufferSize,
-// 		DefaultMaxNonStakerPendingMsgs,
-// 		time.Second,
-// 		DefaultStakerPortion,
-// 		DefaultStakerPortion,
-// 	)
-
-// 	queue.PushMessage(message{
-// 		validatorID: vdr0.ID(),
-// 		requestID:   1,
-// 	})
-// 	queue.PushMessage(message{
-// 		validatorID: vdr0.ID(),
-// 		requestID:   2,
-// 	})
-// 	queue.PushMessage(message{
-// 		validatorID: vdr1.ID(),
-// 		requestID:   3,
-// 	})
-
-// 	<-semaChan
-// 	msg, err := queue.PopMessage()
-// 	if err != nil {
-// 		t.Fatalf("Popping first message errored: %s", err)
-// 	}
-// 	if !msg.validatorID.Equals(vdr0.ID()) {
-// 		t.Fatal("Expected first message to come from vdr0")
-// 	}
-
-// 	// Utilize enough CPU so that messages from vdr0 will be placed in a lower
-// 	// priority queue, but not exhaust the time spent processing messages from
-// 	// the highest priority queue
-// 	queue.UtilizeCPU(vdr0.ID(), time.Second/2)
-
-// 	<-semaChan
-// 	msg, err = queue.PopMessage()
-// 	if err != nil {
-// 		t.Fatalf("Popping second message errored: %s", err)
-// 	}
-// 	if !msg.validatorID.Equals(vdr1.ID()) {
-// 		t.Fatal("Expected second message to come from vdr1 after vdr0 dropped in priority")
-// 	}
-
-// 	<-semaChan
-// 	msg, err = queue.PopMessage()
-// 	if err != nil {
-// 		t.Fatalf("Popping third message errored: %s", err)
-// 	}
-// 	if !msg.validatorID.Equals(vdr0.ID()) {
-// 		t.Fatal("Expected third message to come from vdr0")
-// 	}
-// }
+import (
+	"math"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/ava-labs/gecko/snow"
+	"github.com/ava-labs/gecko/snow/networking/tracker"
+	"github.com/ava-labs/gecko/snow/validators"
+)
+
+func setupMultiLevelQueue(t *testing.T, bufferSize int) (messageQueue, chan struct{}, validators.Set) {
+	vdrs := validators.NewSet()
+	metrics := &metrics{}
+	metrics.Initialize("", prometheus.NewRegistry())
+	consumptionRanges := []float64{
+		0.5,
+		0.75,
+		1.5,
+		math.MaxFloat64,
+	}
+
+	cpuInterval := defaultCPUInterval
+	// Defines the percentage of CPU time allotted to processing messages
+	// from the bucket at the corresponding index.
+	consumptionAllotments := []time.Duration{
+		cpuInterval / 4,
+		cpuInterval / 4,
+		cpuInterval / 4,
+		cpuInterval / 4,
+	}
+
+	ctx := snow.DefaultContextTest()
+	ctx.Bootstrapped()
+	cpuTracker := tracker.NewCPUTracker(cpuInterval)
+	queue, semaChan := newMultiLevelQueue(
+		vdrs,
+		ctx,
+		metrics,
+		cpuTracker,
+		consumptionRanges,
+		consumptionAllotments,
+		bufferSize,
+		DefaultMaxNonStakerPendingMsgs,
+		DefaultStakerPortion,
+		DefaultStakerPortion,
+	)
+
+	return queue, semaChan, vdrs
+}
+
+func TestMultiLevelQueueSendsMessages(t *testing.T) {
+	bufferSize := 8
+	queue, semaChan, vdrs := setupMultiLevelQueue(t, bufferSize)
+	vdrList := []validators.Validator{}
+	messages := []message{}
+	for i := 0; i < bufferSize; i++ {
+		vdr := validators.GenerateRandomValidator(2)
+		messages = append(messages, message{
+			validatorID: vdr.ID(),
+		})
+		vdrList = append(vdrList, vdr)
+	}
+
+	vdrs.Set(vdrList)
+
+	for _, msg := range messages {
+		queue.PushMessage(msg)
+	}
+
+	for count := 0; count < bufferSize; count++ {
+		select {
+		case _, ok := <-semaChan:
+			if !ok {
+				t.Fatal("Semaphore channel was closed early unexpectedly")
+			}
+			if _, err := queue.PopMessage(); err != nil {
+				t.Fatalf("Pop message failed with error: %s", err)
+			}
+		default:
+			t.Fatalf("Should have read message %d from queue", count)
+		}
+	}
+
+	// Ensure that no additional message was ever added to the queue
+	select {
+	case <-semaChan:
+		t.Fatal("Semaphore channel should have been empty after reading all messages from the queue")
+	default:
+	}
+}
+
+func TestExtraMessageDeadlock(t *testing.T) {
+	bufferSize := 8
+	oversizedBuffer := bufferSize * 2
+	queue, semaChan, vdrs := setupMultiLevelQueue(t, bufferSize)
+
+	vdrList := []validators.Validator{}
+	messages := []message{}
+	for i := 0; i < oversizedBuffer; i++ {
+		vdr := validators.GenerateRandomValidator(2)
+		messages = append(messages, message{
+			validatorID: vdr.ID(),
+		})
+		vdrList = append(vdrList, vdr)
+	}
+
+	vdrs.Set(vdrList)
+
+	// Test that messages are dropped when the queue is full to avoid blocking when
+	// adding a message to a queue or to the counting semaphore channel
+	for _, msg := range messages {
+		queue.PushMessage(msg)
+	}
+
+	// There should now be [bufferSize] messages on the queue
+	// Note: this may not be the case if a message is dropped
+	// because there is less than [bufferSize] room on the multi-level
+	// queue as a result of rounding when calculating the size of the
+	// single-level queues.
+	for i := 0; i < bufferSize; i++ {
+		<-semaChan
+	}
+	select {
+	case <-semaChan:
+		t.Fatal("Semaphore channel should have been empty")
+	default:
+	}
+}
+
+func TestMultiLevelQueuePrioritizes(t *testing.T) {
+	bufferSize := 8
+	vdrs := validators.NewSet()
+	validator1 := validators.GenerateRandomValidator(2000)
+	validator2 := validators.GenerateRandomValidator(2000)
+	vdrs.Set([]validators.Validator{
+		validator1,
+		validator2,
+	})
+
+	metrics := &metrics{}
+	metrics.Initialize("", prometheus.NewRegistry())
+	// Set tier1 cutoff sufficiently low so that only messages from validators
+	// the message queue has not serviced will be placed on it for the test.
+	tier1 := 0.001
+	tier2 := 1.0
+	tier3 := math.MaxFloat64
+	consumptionRanges := []float64{
+		tier1,
+		tier2,
+		tier3,
+	}
+
+	perTier := time.Second
+	// Give each tier 1 second of processing time
+	consumptionAllotments := []time.Duration{
+		perTier,
+		perTier,
+		perTier,
+	}
+
+	ctx := snow.DefaultContextTest()
+	ctx.Bootstrapped()
+	cpuTracker := tracker.NewCPUTracker(time.Second)
+	queue, semaChan := newMultiLevelQueue(
+		vdrs,
+		ctx,
+		metrics,
+		cpuTracker,
+		consumptionRanges,
+		consumptionAllotments,
+		bufferSize,
+		DefaultMaxNonStakerPendingMsgs,
+		DefaultStakerPortion,
+		DefaultStakerPortion,
+	)
+
+	// Utilize CPU such that the next message from validator2 will be placed on a lower
+	// level queue (but be sure not to consume the entire CPU allotment for tier1)
+	startTime := time.Now()
+	duration := perTier / 2
+	endTime := startTime.Add(duration)
+	queue.UtilizeCPU(validator2.ID(), startTime, endTime, duration)
+
+	// Push two messages from the high priority validator and one from
+	// the low priority validator
+	messages := []message{
+		{
+			validatorID: validator1.ID(),
+			requestID:   1,
+		},
+		{
+			validatorID: validator1.ID(),
+			requestID:   2,
+		},
+		{
+			validatorID: validator2.ID(),
+			requestID:   3,
+		},
+	}
+
+	for _, msg := range messages {
+		queue.PushMessage(msg)
+	}
+
+	<-semaChan
+	if msg1, err := queue.PopMessage(); err != nil {
+		t.Fatal(err)
+	} else if !msg1.validatorID.Equals(validator1.ID()) {
+		t.Fatal("Expected first message to come from the high priority validator")
+	}
+
+	// Utilize the remainder of the time that should be allotted to the highest
+	// priority queue.
+	startTime = endTime
+	duration = perTier
+	endTime = startTime.Add(duration)
+	queue.UtilizeCPU(validator1.ID(), startTime, endTime, duration)
+
+	<-semaChan
+	if msg2, err := queue.PopMessage(); err != nil {
+		t.Fatal(err)
+	} else if !msg2.validatorID.Equals(validator2.ID()) {
+		t.Fatal("Expected second message to come from the low priority validator after moving on to the lower level queue")
+	}
+
+	<-semaChan
+	if msg3, err := queue.PopMessage(); err != nil {
+		t.Fatal(err)
+	} else if !msg3.validatorID.Equals(validator1.ID()) {
+		t.Fatal("Expected final message to come from validator1")
+	}
+}
+
+func TestMultiLevelQueuePushesDownOldMessages(t *testing.T) {
+	bufferSize := 16
+	vdrs := validators.NewSet()
+	vdr0 := validators.GenerateRandomValidator(2000)
+	vdr1 := validators.GenerateRandomValidator(2000)
+	vdrs.Set([]validators.Validator{
+		vdr0,
+		vdr1,
+	})
+
+	metrics := &metrics{}
+	metrics.Initialize("", prometheus.NewRegistry())
+	// Set tier1 cutoff sufficiently low so that only messages from validators
+	// the message queue has not serviced will be placed on it for the test.
+	tier1 := 0.001
+	tier2 := 1.0
+	tier3 := math.MaxFloat64
+	consumptionRanges := []float64{
+		tier1,
+		tier2,
+		tier3,
+	}
+
+	perTier := time.Second
+	// Give each tier 1 second of processing time
+	consumptionAllotments := []time.Duration{
+		perTier,
+		perTier,
+		perTier,
+	}
+
+	ctx := snow.DefaultContextTest()
+	ctx.Bootstrapped()
+	cpuTracker := tracker.NewCPUTracker(time.Second)
+	queue, semaChan := newMultiLevelQueue(
+		vdrs,
+		ctx,
+		metrics,
+		cpuTracker,
+		consumptionRanges,
+		consumptionAllotments,
+		bufferSize,
+		DefaultMaxNonStakerPendingMsgs,
+		DefaultStakerPortion,
+		DefaultStakerPortion,
+	)
+
+	queue.PushMessage(message{
+		validatorID: vdr0.ID(),
+		requestID:   1,
+	})
+	queue.PushMessage(message{
+		validatorID: vdr0.ID(),
+		requestID:   2,
+	})
+	queue.PushMessage(message{
+		validatorID: vdr1.ID(),
+		requestID:   3,
+	})
+
+	<-semaChan
+	msg, err := queue.PopMessage()
+	if err != nil {
+		t.Fatalf("Popping first message errored: %s", err)
+	}
+	if !msg.validatorID.Equals(vdr0.ID()) {
+		t.Fatal("Expected first message to come from vdr0")
+	}
+
+	// Utilize enough CPU so that messages from vdr0 will be placed in a lower
+	// priority queue, but not exhaust the time spent processing messages from
+	// the highest priority queue
+	startTime := time.Now()
+	duration := time.Second / 2
+	endTime := startTime.Add(duration)
+	queue.UtilizeCPU(vdr0.ID(), startTime, endTime, duration)
+
+	<-semaChan
+	msg, err = queue.PopMessage()
+	if err != nil {
+		t.Fatalf("Popping second message errored: %s", err)
+	}
+	if !msg.validatorID.Equals(vdr1.ID()) {
+		t.Fatal("Expected second message to come from vdr1 after vdr0 dropped in priority")
+	}
+
+	<-semaChan
+	msg, err = queue.PopMessage()
+	if err != nil {
+		t.Fatalf("Popping third message errored: %s", err)
+	}
+	if !msg.validatorID.Equals(vdr0.ID()) {
+		t.Fatal("Expected third message to come from vdr0")
+	}
+}
diff --git a/snow/networking/tracker/cpu_tracker_test.go b/snow/networking/tracker/cpu_tracker_test.go
new file mode 100644
index 00000000000..a740720c21f
--- /dev/null
+++ b/snow/networking/tracker/cpu_tracker_test.go
@@ -0,0 +1,55 @@
+// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package tracker
+
+import (
+	"testing"
+	"time"
+
+	"github.com/ava-labs/gecko/ids"
+)
+
+func TestCPUTracker(t *testing.T) {
+	halflife := time.Second
+	cpuTracker := NewCPUTracker(halflife)
+	vdr1 := ids.NewShortID([20]byte{1})
+	vdr2 := ids.NewShortID([20]byte{2})
+
+	startTime1 := time.Now()
+	endTime1 := startTime1.Add(halflife)
+
+	cpuTracker.UtilizeTime(vdr1, startTime1, endTime1)
+
+	startTime2 := endTime1
+	endTime2 := startTime2.Add(halflife)
+	cpuTracker.UtilizeTime(vdr2, startTime2, endTime2)
+
+	utilization1 := cpuTracker.Utilization(vdr1, endTime2)
+	utilization2 := cpuTracker.Utilization(vdr2, endTime2)
+
+	if utilization1 >= utilization2 {
+		t.Fatalf("Utilization should have been higher for the more recent spender")
+	}
+
+	cumulative := cpuTracker.CumulativeUtilization(endTime2)
+	sum := utilization1 + utilization2
+	if cumulative != sum {
+		t.Fatalf("Cumulative utilization: %f should have been equal to the sum of the spenders: %f", cumulative, sum)
+	}
+
+	expectedLen := 2
+	trackerLen := cpuTracker.Len()
+	if trackerLen != expectedLen {
+		t.Fatalf("Expected length to match number of spenders: %d, but found length: %d", expectedLen, trackerLen)
+	}
+
+	// Set the pruning time to 64 half-lives in the future, to guarantee that
+	// any counts should have decayed to 0
+	pruningTime := endTime2.Add(halflife * 64)
+	cpuTracker.EndInterval(pruningTime)
+	trackerLen = cpuTracker.Len()
+	if trackerLen != 0 {
+		t.Fatalf("Expected length to be 0 after pruning, but found length: %d", trackerLen)
+	}
+}
diff --git a/snow/networking/tracker/msg_tracker_test.go b/snow/networking/tracker/msg_tracker_test.go
new file mode 100644
index 00000000000..dd3f4ceadc6
--- /dev/null
+++ b/snow/networking/tracker/msg_tracker_test.go
@@ -0,0 +1,70 @@
+// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package tracker
+
+import (
+	"testing"
+
+	"github.com/ava-labs/gecko/ids"
+)
+
+func TestMessageTracker(t *testing.T) {
+	msgTracker := NewMessageTracker()
+
+	vdr1 := ids.NewShortID([20]byte{1})
+	vdr2 := ids.NewShortID([20]byte{2})
+	noMessagesVdr := ids.NewShortID([20]byte{3})
+
+	expectedVdr1Count := uint32(5)
+	expectedVdr2Count := uint32(2)
+
+	for i := uint32(0); i < expectedVdr1Count; i++ {
+		msgTracker.Add(vdr1)
+	}
+
+	for i := uint32(0); i < expectedVdr2Count; i++ {
+		msgTracker.Add(vdr2)
+	}
+
+	vdr1Count := msgTracker.OutstandingCount(vdr1)
+	vdr2Count := msgTracker.OutstandingCount(vdr2)
+	noMessagesCount := msgTracker.OutstandingCount(noMessagesVdr)
+
+	if vdr1Count != expectedVdr1Count {
+		t.Fatalf("Found unexpected count for validator1: %d, expected: %d", vdr1Count, expectedVdr1Count)
+	}
+
+	if vdr2Count != expectedVdr2Count {
+		t.Fatalf("Found unexpected count for validator2: %d, expected: %d", vdr2Count, expectedVdr2Count)
+	}
+
+	if noMessagesCount != 0 {
+		t.Fatalf("Found unexpected count for validator with no messages: %d, expected: %d", noMessagesCount, 0)
+	}
+
+	for i := uint32(0); i < expectedVdr1Count; i++ {
+		msgTracker.Remove(vdr1)
+	}
+
+	for i := uint32(0); i < expectedVdr2Count; i++ {
+		msgTracker.Remove(vdr2)
+	}
+
+	vdr1Count = msgTracker.OutstandingCount(vdr1)
+	vdr2Count = msgTracker.OutstandingCount(vdr2)
+	noMessagesCount = msgTracker.OutstandingCount(noMessagesVdr)
+
+	if vdr1Count != 0 {
+		t.Fatalf("Found unexpected count for validator1: %d, expected: %d", vdr1Count, 0)
+	}
+
+	if vdr2Count != 0 {
+		t.Fatalf("Found unexpected count for validator2: %d, expected: %d", vdr2Count, 0)
+	}
+
+	if noMessagesCount != 0 {
+		t.Fatalf("Found unexpected count for validator with no messages: %d, expected: %d", noMessagesCount, 0)
+	}
+
+}