Reduce resharding impact by redirecting data to new shards #8075

Closed
194 changes: 164 additions & 30 deletions storage/remote/queue_manager.go
@@ -15,7 +15,9 @@ package remote

import (
"context"
"fmt"
"math"
"sort"
"strconv"
"sync"
"time"
@@ -350,6 +352,7 @@ type QueueManager struct {
metadataWatcher *MetadataWatcher

clientMtx sync.RWMutex
reshardMtx sync.RWMutex
storeClient WriteClient

seriesMtx sync.Mutex
@@ -528,9 +531,12 @@ outer:
}
appendSample.Value = s.V
appendSample.Timestamp = s.T
if t.shards.enqueue(s.Ref, writeSample{lbls, appendSample}) {
t.reshardMtx.RLock()
if t.shards.enqueue(writeSample{s.Ref, lbls, appendSample}) {
t.reshardMtx.RUnlock()
continue outer
}
t.reshardMtx.RUnlock()

t.metrics.enqueueRetriesTotal.Inc()
time.Sleep(time.Duration(backoff))
@@ -575,9 +581,12 @@ outer:
appendExemplar.Labels = labelsToLabelsProto(e.Labels, nil)
appendExemplar.Timestamp = e.T
appendExemplar.Value = e.V
if t.shards.enqueue(e.Ref, writeExemplar{lbls, appendExemplar}) {
t.reshardMtx.RLock()
if t.shards.enqueue(writeExemplar{e.Ref, lbls, appendExemplar}) {
t.reshardMtx.RUnlock()
continue outer
}
t.reshardMtx.RUnlock()

t.metrics.enqueueRetriesTotal.Inc()
time.Sleep(time.Duration(backoff))
@@ -864,17 +873,40 @@ func (t *QueueManager) reshardLoop() {
for {
select {
case numShards := <-t.reshardChan:
// We start the newShards after we have stopped (and therefore completely
// flushed) the oldShards, to guarantee we only ever deliver samples in
// order.
t.shards.stop()
t.shards.start(numShards)
t.reshard(numShards)
case <-t.quit:
return
}
}
}

// reshard takes a write lock on reshardMtx and signals the queues for resharding. It buffers the old
// samples and enqueues them into the new shards before allowing Append() to proceed, so that delivery of samples in time order is guaranteed.
func (t *QueueManager) reshard(numNewShards int) {
if numQueues := len(t.shards.queues); numQueues > 0 {
// Signal reshard only if old shards exist.
close(t.shards.reshard)
}
// We start the new shards in parallel with the old shards. The new shards
// are filled with the samples still held by the old shards while incoming
// samples through Append are blocked using reshardMtx. Filling in the
// existing samples is quick, so the time lost is negligible.
//
// Once the existing samples have been transferred, the old shards are
// replaced by the new shards and incoming samples are consumed again. This
// is done to guarantee that samples are delivered in order.
// Create new shards.
newShards := t.newShards()
newShards.start(numNewShards)
complete := t.shards.transferToShards(newShards)
t.shards.stop()
<-complete
// Replace old shards with the new ones.
t.reshardMtx.Lock()
t.shards = newShards
t.reshardMtx.Unlock()
}
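A standalone sketch (not part of this change) of the locking pattern the swap above relies on: Append holds reshardMtx for read only around a single enqueue, so reshard can briefly take the write lock to replace t.shards without racing concurrent enqueues. The manager and shardSet types below are hypothetical stand-ins:

package main

import (
	"fmt"
	"sync"
)

type shardSet struct{ n int }

type manager struct {
	mtx    sync.RWMutex
	shards *shardSet
}

// enqueue mirrors the RLock/RUnlock bracket around shards.enqueue in Append:
// the read lock is held only for the duration of one enqueue attempt.
func (m *manager) enqueue(v int) int {
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	return v % m.shards.n // pick a shard using the current shard count
}

// reshard mirrors the swap at the end of QueueManager.reshard: build the new
// shard set first, then hold the write lock only for the pointer swap.
func (m *manager) reshard(n int) {
	newShards := &shardSet{n: n}
	m.mtx.Lock()
	m.shards = newShards
	m.mtx.Unlock()
}

func main() {
	m := &manager{shards: &shardSet{n: 2}}
	fmt.Println(m.enqueue(7)) // 1 (7 mod 2)
	m.reshard(4)
	fmt.Println(m.enqueue(7)) // 3 (7 mod 4)
}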

func (t *QueueManager) newShards() *shards {
s := &shards{
qm: t,
@@ -884,11 +916,13 @@ func (t *QueueManager) newShards() *shards {
}

type writeSample struct {
ref uint64
seriesLabels labels.Labels
sample prompb.Sample
}

type writeExemplar struct {
ref uint64
seriesLabels labels.Labels
exemplar prompb.Exemplar
}
@@ -898,13 +932,16 @@ type shards struct {

qm *QueueManager
queues []chan interface{}
buffer chan []interface{}

// So we can accurately track how many of each are lost during shard shutdowns.
enqueuedSamples atomic.Int64
enqueuedExemplars atomic.Int64

// Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group.
done chan struct{}
reshard chan struct{}
running atomic.Int32

// Soft shutdown context will prevent new enqueues and deadlocks.
@@ -917,7 +954,9 @@ type shards struct {
exemplarsDroppedOnHardShutdown atomic.Uint32
}

// start the shards; must be called before any call to enqueue.
// start the shards' queues. If queues already exist, as in the case of
// resharding, new queues are created in parallel with the existing ones and
// later replace them.
func (s *shards) start(n int) {
s.mtx.Lock()
defer s.mtx.Unlock()
@@ -931,37 +970,36 @@ func (s *shards) start(n int) {
}

s.queues = newQueues
s.buffer = make(chan []interface{})

var hardShutdownCtx context.Context
hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background())
s.softShutdown = make(chan struct{})
s.running.Store(int32(n))

s.softShutdown = make(chan struct{})
s.reshard = make(chan struct{})
s.done = make(chan struct{})

s.samplesDroppedOnHardShutdown.Store(0)
s.exemplarsDroppedOnHardShutdown.Store(0)

for i := 0; i < n; i++ {
go s.runShard(hardShutdownCtx, i, newQueues[i])
go s.runShard(hardShutdownCtx, i, newQueues[i], s.buffer)
}
}

// stop the shards; subsequent call to enqueue will return false.
// stop the existing shards and trigger flushing of the samples still held in their queues.
func (s *shards) stop() {
// Attempt a clean shutdown, but only wait flushDeadline for all the shards
// to cleanly exit. As we're doing RPCs, enqueue can block indefinitely.
// We must be able to call stop concurrently, hence we can only take the
// RLock here.
s.mtx.RLock()
close(s.softShutdown)
s.mtx.RUnlock()

// Enqueue should now be unblocked, so we can take the write lock. This
// also ensures we don't race with writes to the queues, and get a panic:
// send on closed channel.
s.mtx.Lock()
defer s.mtx.Unlock()
for _, queue := range s.queues {
close(queue)

for i := range s.queues {
close(s.queues[i])
}

select {
case <-s.done:
return
@@ -981,39 +1019,86 @@

// enqueue data (sample or exemplar). If we are currently in the process of shutting down or
// resharding, this will return false; in this case, you should back off and retry.
func (s *shards) enqueue(ref uint64, data interface{}) bool {
func (s *shards) enqueue(data interface{}) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()

select {
case <-s.reshard:
return false
case <-s.softShutdown:
return false
default:
}

var ref uint64
switch n := data.(type) {
case writeSample:
ref = n.ref
s.qm.metrics.pendingSamples.Inc()
s.enqueuedSamples.Inc()
case writeExemplar:
ref = n.ref
s.qm.metrics.pendingExemplars.Inc()
s.enqueuedExemplars.Inc()
default:
level.Warn(s.qm.logger).Log("msg", "Invalid object type in shards enqueue")
}

shard := uint64(ref) % uint64(len(s.queues))
select {
case <-s.reshard:
return false
case <-s.softShutdown:
return false
case s.queues[shard] <- data:
switch data.(type) {
return true
}
}
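As a standalone illustration (not part of this change) of why sending the reshard signal makes enqueue back off: once a channel is closed, its receive case in a select is always ready, so a caller that would otherwise block on a full queue returns false instead of waiting. A toy sketch with a hypothetical int queue:

package main

import "fmt"

func main() {
	reshard := make(chan struct{})
	queue := make(chan int, 1)

	// enqueue mimics the select above: return false once reshard is signalled,
	// otherwise deliver to the queue.
	enqueue := func(v int) bool {
		select {
		case <-reshard:
			return false
		case queue <- v:
			return true
		}
	}

	fmt.Println(enqueue(1)) // true: the queue has room
	close(reshard)          // the reshard signal, as sent by QueueManager.reshard
	fmt.Println(enqueue(2)) // false: the queue is full and reshard is closed
}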

// transferToShards transfers the data buffered by the old shards to the new
// shards passed as an argument.
func (s *shards) transferToShards(newShards *shards) <-chan struct{} {
done := make(chan struct{})
ts := func(data interface{}) int64 {
switch n := data.(type) {
case writeSample:
s.qm.metrics.pendingSamples.Inc()
s.enqueuedSamples.Inc()
return n.sample.Timestamp
case writeExemplar:
s.qm.metrics.pendingExemplars.Inc()
s.enqueuedExemplars.Inc()
return n.exemplar.Timestamp
default:
level.Warn(s.qm.logger).Log("msg", "Invalid object type in shards enqueue")
level.Warn(s.qm.logger).Log("msg", "Invalid object type while fetching timestamp when bufferizing data")
return 0
}
return true
}
go func() {
var buffer []interface{}
for i := 0; i < len(s.queues); i++ {
buf := <-s.buffer
buffer = append(buffer, buf...)
}
sort.SliceStable(buffer, func(i, j int) bool {
current := ts(buffer[i])
next := ts(buffer[j])
return current < next
})
level.Debug(s.qm.logger).Log("msg", fmt.Sprintf("Sending %d samples to the new queues", len(buffer)))
attempt:
for _, sample := range buffer {
if !newShards.enqueue(sample) {
continue attempt
}
}
close(done)
}()
return done
}
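The ordering guarantee comes from concatenating every shard's drained buffer and stable-sorting the result by timestamp. A self-contained sketch of that step (not part of this change), with a simplified point type standing in for writeSample/writeExemplar:

package main

import (
	"fmt"
	"sort"
)

type point struct {
	ts    int64
	value float64
}

func main() {
	// Buffers drained from two old shards on reshard (hypothetical data).
	shardBuffers := [][]point{
		{{ts: 3, value: 1}, {ts: 5, value: 2}},
		{{ts: 1, value: 3}, {ts: 4, value: 4}},
	}

	var merged []point
	for _, buf := range shardBuffers {
		merged = append(merged, buf...)
	}

	// A stable sort keeps the relative order of samples with equal timestamps,
	// matching the sort.SliceStable call in transferToShards.
	sort.SliceStable(merged, func(i, j int) bool { return merged[i].ts < merged[j].ts })

	fmt.Println(merged) // [{1 3} {3 1} {4 4} {5 2}]
}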

func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface{}) {
func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface{}, bufferCh chan []interface{}) {
defer func() {
if s.running.Dec() == 0 {
close(s.done)
close(s.buffer)
}
}()

@@ -1031,6 +1116,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface

buf []byte
pendingData []prompb.TimeSeries
refIndex []uint64 // Stores the ref of each pendingData entry (writeSample or writeExemplar) at the same index.
exemplarBuffer [][]prompb.Exemplar
)
totalPending := max
@@ -1040,6 +1126,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
}

pendingData = make([]prompb.TimeSeries, totalPending)
refIndex = make([]uint64, totalPending)

timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
Expand Down Expand Up @@ -1067,6 +1154,21 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars))
return

case <-s.reshard:
var data []interface{}
// Drain any samples that entered the queue between the reshard signal being sent
// and incoming samples being blocked while the new queues were created.
for point := range queue {
data = append(data, point)
}
pending := timeseriesToData(nPending, pendingData, refIndex)
data = append(data, pending...)
if num := len(data); num > 0 {
s.qm.metrics.pendingSamples.Add(float64(num))
}
bufferCh <- data
return

case sample, ok := <-queue:
if !ok {
if nPendingSamples > 0 || nPendingExemplars > 0 {
@@ -1088,6 +1190,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Samples = sampleBuffer[nPendingSamples]
pendingData[nPending].Exemplars = nil
refIndex[nPending] = d.ref
nPendingSamples++
nPending++

@@ -1096,6 +1199,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Samples = nil
pendingData[nPending].Exemplars = exemplarBuffer[nPendingExemplars]
refIndex[nPending] = d.ref
nPendingExemplars++
nPending++
}
Expand Down Expand Up @@ -1127,6 +1231,36 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
}
}

func timeseriesToData(max int, ts []prompb.TimeSeries, refIndex []uint64) []interface{} {
data := make([]interface{}, 0, len(ts))
for i, s := range ts {
if i >= max {
break
}
ref := refIndex[i]
labels := labelProtosToLabels(s.Labels)
if s.Samples != nil {
for j := range s.Samples {
data = append(data, writeSample{
ref: ref,
seriesLabels: labels,
sample: s.Samples[j],
})
}
}
if s.Exemplars != nil {
for j := range s.Exemplars {
data = append(data, writeExemplar{
ref: ref,
seriesLabels: labels,
exemplar: s.Exemplars[j],
})
}
}
}
return data
}

func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, buf *[]byte) {
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, buf)
3 changes: 1 addition & 2 deletions storage/remote/queue_manager_test.go
@@ -366,8 +366,7 @@ func TestReshard(t *testing.T) {
}()

for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
m.shards.stop()
m.shards.start(i)
m.reshard(i)
time.Sleep(100 * time.Millisecond)
}

4 changes: 2 additions & 2 deletions tsdb/wal/watcher_test.go
@@ -55,8 +55,8 @@ type writeToMock struct {
seriesSegmentIndexes map[uint64]int
}

func (wtm *writeToMock) Append(s []record.RefSample) bool {
wtm.samplesAppended += len(s)
func (wtm *writeToMock) Append(samples []record.RefSample) bool {
wtm.samplesAppended += len(samples)
return true
}
