Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Issue 7124 #2

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions storage/remote/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (t *QueueManager) updateShardsLoop() {
select {
case <-ticker.C:
desiredShards := t.calculateDesiredShards()
if desiredShards == t.numShards {
if !t.shouldReshard(desiredShards) {
continue
}
// Resharding can take some time, and we want this loop
Expand All @@ -562,6 +562,22 @@ func (t *QueueManager) updateShardsLoop() {
}
}

// shouldReshard returns if resharding should occur
func (t *QueueManager) shouldReshard(desiredShards int) bool {
if desiredShards == t.numShards {
return false
}
// We shouldn't reshard if Prometheus hasn't been able to send to the
// remote endpoint successfully within some period of time.
minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
lsts := atomic.LoadInt64(&t.lastSendTimestamp)
if lsts < minSendTimestamp {
level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
return false
}
return true
}

// calculateDesiredShards returns the number of desired shards, which will be
// the current QueueManager.numShards if resharding should not occur for reasons
// outlined in this functions implementation. It is up to the caller to reshard, or not,
Expand Down Expand Up @@ -591,15 +607,6 @@ func (t *QueueManager) calculateDesiredShards() int {
return t.numShards
}

// We shouldn't reshard if Prometheus hasn't been able to send to the
// remote endpoint successfully within some period of time.
minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
lsts := atomic.LoadInt64(&t.lastSendTimestamp)
if lsts < minSendTimestamp {
level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
return t.numShards
}

// When behind we will try to catch up on a proporation of samples per tick.
// This works similarly to an integral accumulator in that pending samples
// is the result of the error integral.
Expand Down
9 changes: 4 additions & 5 deletions storage/remote/queue_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,14 @@ func TestReleaseNoninternedString(t *testing.T) {
testutil.Assert(t, metric == 0, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
}

func TestCalculateDesiredsShards(t *testing.T) {
func TestShouldReshard(t *testing.T) {
type testcase struct {
startingShards int
samplesIn, samplesOut int64
reshard bool
samplesIn, samplesOut int64
}
cases := []testcase{
{
// Test that ensures that if we haven't successfully sent a
// sample recently the queue will not reshard.
startingShards: 10,
reshard: false,
samplesIn: 1000,
Expand Down Expand Up @@ -329,8 +327,9 @@ func TestCalculateDesiredsShards(t *testing.T) {
}
m.Start()
desiredShards := m.calculateDesiredShards()
shouldReshard := m.shouldReshard(desiredShards)
m.Stop()
if !c.reshard {
if !(c.reshard == shouldReshard) {
testutil.Assert(t, desiredShards == m.numShards, "expected calculateDesiredShards to not want to reshard, wants to change from %d to %d shards", m.numShards, desiredShards)
} else {
testutil.Assert(t, desiredShards != m.numShards, "expected calculateDesiredShards to want to reshard, wants to change from %d to %d shards", m.numShards, desiredShards)
Expand Down