Reduce reshard impact by redirecting data to newer shards.
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
Harkishen-Singh committed Jul 30, 2021
1 parent 79d354a commit b650da3
Showing 3 changed files with 167 additions and 34 deletions.
194 changes: 164 additions & 30 deletions storage/remote/queue_manager.go
@@ -15,7 +15,9 @@ package remote

import (
"context"
"fmt"
"math"
"sort"
"strconv"
"sync"
"time"
@@ -350,6 +352,7 @@ type QueueManager struct {
metadataWatcher *MetadataWatcher

clientMtx sync.RWMutex
reshardMtx sync.RWMutex
storeClient WriteClient

seriesMtx sync.Mutex
@@ -528,9 +531,12 @@ outer:
}
appendSample.Value = s.V
appendSample.Timestamp = s.T
if t.shards.enqueue(s.Ref, writeSample{lbls, appendSample}) {
t.reshardMtx.RLock()
if t.shards.enqueue(writeSample{s.Ref, lbls, appendSample}) {
t.reshardMtx.RUnlock()
continue outer
}
t.reshardMtx.RUnlock()

t.metrics.enqueueRetriesTotal.Inc()
time.Sleep(time.Duration(backoff))
@@ -575,9 +581,12 @@ outer:
appendExemplar.Labels = labelsToLabelsProto(e.Labels, nil)
appendExemplar.Timestamp = e.T
appendExemplar.Value = e.V
if t.shards.enqueue(e.Ref, writeExemplar{lbls, appendExemplar}) {
t.reshardMtx.RLock()
if t.shards.enqueue(writeExemplar{e.Ref, lbls, appendExemplar}) {
t.reshardMtx.RUnlock()
continue outer
}
t.reshardMtx.RUnlock()

t.metrics.enqueueRetriesTotal.Inc()
time.Sleep(time.Duration(backoff))
@@ -864,17 +873,40 @@ func (t *QueueManager) reshardLoop() {
for {
select {
case numShards := <-t.reshardChan:
// We start the newShards after we have stopped (and therefore completely
// flushed) the oldShards, to guarantee we only ever deliver samples in
// order.
t.shards.stop()
t.shards.start(numShards)
t.reshard(numShards)
case <-t.quit:
return
}
}
}

// reshard takes the write side of reshardMtx and signals the queues to reshard. It buffers the old
// samples and enqueues them to the new shards before Append() may proceed, so that delivery in timestamp order is guaranteed.
func (t *QueueManager) reshard(numNewShards int) {
if numQueues := len(t.shards.queues); numQueues > 0 {
// Signal reshard only if old shards exist.
close(t.shards.reshard)
}
// We start the new shards in parallel with the old shards. The new shards
// are filled with the samples still held by the old shards while incoming
// samples are blocked in Append via reshardMtx.Lock(). Transferring the
// existing samples is quick, so the pause is negligible.
//
// Once the existing samples have been transferred, the old shards are
// replaced by the new ones and incoming samples are consumed again. This
// guarantees that samples are only ever delivered in order.
// Create new shards.
newShards := t.newShards()
newShards.start(numNewShards)
complete := t.shards.transferToShards(newShards)
t.shards.stop()
<-complete
// Replace old shards with the new ones.
t.reshardMtx.Lock()
t.shards = newShards
t.reshardMtx.Unlock()
}
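
An aside on the locking pattern above: enqueuers take the read side of reshardMtx, so many Appends proceed concurrently, while the reshard path takes the write side only for the brief pointer swap. A minimal, self-contained sketch of that pattern (pool, manager, and swap are hypothetical names, not part of this commit):

package main

import (
	"fmt"
	"sync"
)

type pool struct{ queues []chan int }

func newPool(n int) *pool {
	p := &pool{queues: make([]chan int, n)}
	for i := range p.queues {
		p.queues[i] = make(chan int, 8) // buffered so the sketch never blocks
	}
	return p
}

type manager struct {
	mu   sync.RWMutex
	pool *pool
}

// enqueue holds the read lock: many enqueuers may run concurrently, but
// none can observe a half-swapped pool.
func (m *manager) enqueue(v int) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	m.pool.queues[v%len(m.pool.queues)] <- v
}

// swap takes the write lock, blocking enqueuers only for the pointer swap.
func (m *manager) swap(p *pool) {
	m.mu.Lock()
	m.pool = p
	m.mu.Unlock()
}

func main() {
	m := &manager{pool: newPool(2)}
	m.enqueue(1)
	m.swap(newPool(4)) // the "reshard": replace the queue set atomically
	m.enqueue(2)
	fmt.Println("queues after swap:", len(m.pool.queues))
}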

func (t *QueueManager) newShards() *shards {
s := &shards{
qm: t,
@@ -884,11 +916,13 @@ func (t *QueueManager) newShards() *shards {
}

type writeSample struct {
ref uint64
seriesLabels labels.Labels
sample prompb.Sample
}

type writeExemplar struct {
ref uint64
seriesLabels labels.Labels
exemplar prompb.Exemplar
}
@@ -898,13 +932,16 @@ type shards struct {

qm *QueueManager
queues []chan interface{}
buffer chan []interface{}

// So we can accurately track how many of each are lost during shard shutdowns.
enqueuedSamples atomic.Int64
enqueuedExemplars atomic.Int64

// Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group.
done chan struct{}
reshard chan struct{}
running atomic.Int32

// Soft shutdown context will prevent new enqueues and deadlocks.
@@ -917,7 +954,9 @@ type shards struct {
exemplarsDroppedOnHardShutdown atomic.Uint32
}
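
The done channel and running counter above implement the "selectable wait group" the comment describes. A minimal sketch of the same idea using only the standard library (sync/atomic.Int32 from Go 1.19+, where this package uses go.uber.org/atomic):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// selectableWG mimics sync.WaitGroup, but exposes a channel so callers can
// select on completion alongside timeouts or shutdown signals.
type selectableWG struct {
	running atomic.Int32
	done    chan struct{}
}

func newSelectableWG(n int32) *selectableWG {
	wg := &selectableWG{done: make(chan struct{})}
	wg.running.Store(n)
	return wg
}

// finish is the Done() equivalent: the last worker closes the channel.
func (wg *selectableWG) finish() {
	if wg.running.Add(-1) == 0 {
		close(wg.done)
	}
}

func main() {
	wg := newSelectableWG(3)
	for i := 0; i < 3; i++ {
		go wg.finish()
	}
	select {
	case <-wg.done:
		fmt.Println("all workers exited")
	case <-time.After(time.Second): // a deadline sync.WaitGroup.Wait cannot express directly
		fmt.Println("timed out waiting")
	}
}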

// start the shards; must be called before any call to enqueue.
// start the shards and their new queues. If queues already exist, as during
// a resharding, the new queues are created alongside the existing ones and
// replace them later.
func (s *shards) start(n int) {
s.mtx.Lock()
defer s.mtx.Unlock()
@@ -931,37 +970,36 @@ func (s *shards) start(n int) {
}

s.queues = newQueues
s.buffer = make(chan []interface{})

var hardShutdownCtx context.Context
hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background())
s.softShutdown = make(chan struct{})
s.running.Store(int32(n))

s.softShutdown = make(chan struct{})
s.reshard = make(chan struct{})
s.done = make(chan struct{})

s.samplesDroppedOnHardShutdown.Store(0)
s.exemplarsDroppedOnHardShutdown.Store(0)

for i := 0; i < n; i++ {
go s.runShard(hardShutdownCtx, i, newQueues[i])
go s.runShard(hardShutdownCtx, i, newQueues[i], s.buffer)
}
}

// stop the shards; subsequent call to enqueue will return false.
// stop the existing shards and trigger a flush of the samples remaining in the queues.
func (s *shards) stop() {
// Attempt a clean shutdown, but only wait flushDeadline for all the shards
// to cleanly exit. As we're doing RPCs, enqueue can block indefinitely.
// We must be able to call stop concurrently, hence we can only take the
// RLock here.
s.mtx.RLock()
close(s.softShutdown)
s.mtx.RUnlock()

// Enqueue should now be unblocked, so we can take the write lock. This
// also ensures we don't race with writes to the queues, and get a panic:
// send on closed channel.
s.mtx.Lock()
defer s.mtx.Unlock()
for _, queue := range s.queues {
close(queue)

for i := range s.queues {
close(s.queues[i])
}

select {
case <-s.done:
return
@@ -981,39 +1019,86 @@

// enqueue data (a sample or an exemplar). If we are currently shutting down or resharding,
// it will return false; in that case, back off and retry.
func (s *shards) enqueue(ref uint64, data interface{}) bool {
func (s *shards) enqueue(data interface{}) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()

select {
case <-s.reshard:
return false
case <-s.softShutdown:
return false
default:
}

var ref uint64
switch n := data.(type) {
case writeSample:
ref = n.ref
s.qm.metrics.pendingSamples.Inc()
s.enqueuedSamples.Inc()
case writeExemplar:
ref = n.ref
s.qm.metrics.pendingExemplars.Inc()
s.enqueuedExemplars.Inc()
default:
level.Warn(s.qm.logger).Log("msg", "Invalid object type in shards enqueue")
}

shard := uint64(ref) % uint64(len(s.queues))
select {
case <-s.reshard:
return false
case <-s.softShutdown:
return false
case s.queues[shard] <- data:
switch data.(type) {
return true
}
}
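
Since enqueue sheds load instead of blocking during a shutdown or reshard, callers such as Append retry with exponential backoff, as in the loops earlier in this diff. A self-contained sketch of that caller-side contract; makeFlaky is a hypothetical stand-in for an enqueue that refuses while a reshard is in flight:

package main

import (
	"fmt"
	"time"
)

// makeFlaky returns a stand-in for shards.enqueue that refuses the first
// two attempts, as it would while a reshard is in progress.
func makeFlaky() func(int) bool {
	attempts := 0
	return func(v int) bool {
		attempts++
		return attempts > 2
	}
}

func main() {
	enqueue := makeFlaky()
	backoff := 30 * time.Millisecond // assumption: a MinBackoff-style floor
	const maxBackoff = 5 * time.Second

	for !enqueue(42) {
		fmt.Println("enqueue refused (shutdown or reshard), backing off", backoff)
		time.Sleep(backoff)
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	fmt.Println("enqueued after retries")
}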

// transferToShards transfers the data in the old shards' buffers to the new
// shards passed as the argument.
func (s *shards) transferToShards(newShards *shards) <-chan struct{} {
done := make(chan struct{})
ts := func(data interface{}) int64 {
switch n := data.(type) {
case writeSample:
s.qm.metrics.pendingSamples.Inc()
s.enqueuedSamples.Inc()
return n.sample.Timestamp
case writeExemplar:
s.qm.metrics.pendingExemplars.Inc()
s.enqueuedExemplars.Inc()
return n.exemplar.Timestamp
default:
level.Warn(s.qm.logger).Log("msg", "Invalid object type in shards enqueue")
level.Warn(s.qm.logger).Log("msg", "Invalid object type while fetching timestamp when bufferizing data")
return 0
}
return true
}
go func() {
var buffer []interface{}
for i := 0; i < len(s.queues); i++ {
buf := <-s.buffer
buffer = append(buffer, buf...)
}
sort.SliceStable(buffer, func(i, j int) bool {
current := ts(buffer[i])
next := ts(buffer[j])
return current < next
})
level.Debug(s.qm.logger).Log("msg", fmt.Sprintf("Sending %d samples to newer queues...", len(buffer)))
for _, sample := range buffer {
if !newShards.enqueue(sample) {
// enqueue fails only when the new shards are already shutting down;
// skip the sample rather than blocking the transfer.
continue
}
}
close(done)
}()
return done
}
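
transferToShards concatenates per-shard buffers whose relative order across shards is unknown, then re-sorts globally by timestamp. sort.SliceStable, rather than sort.Slice, keeps arrival order for samples with equal timestamps. A tiny illustration of why the stable variant matters:

package main

import (
	"fmt"
	"sort"
)

// A pared-down stand-in for the mixed sample/exemplar buffer: each item
// carries a timestamp, and equal timestamps must keep their arrival order.
type item struct {
	ts    int64
	label string
}

func main() {
	buffer := []item{{3, "c"}, {1, "a2"}, {2, "b"}, {1, "a1"}}
	sort.SliceStable(buffer, func(i, j int) bool {
		return buffer[i].ts < buffer[j].ts
	})
	fmt.Println(buffer) // [{1 a2} {1 a1} {2 b} {3 c}] — ties keep arrival order
}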

func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface{}) {
func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface{}, bufferCh chan []interface{}) {
defer func() {
if s.running.Dec() == 0 {
close(s.done)
close(s.buffer)
}
}()

@@ -1031,6 +1116,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface

buf []byte
pendingData []prompb.TimeSeries
refIndex []uint64 // Stores the ref of the pendingData entry (writeSample or writeExemplar) at the same index.
exemplarBuffer [][]prompb.Exemplar
)
totalPending := max
@@ -1040,6 +1126,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
}

pendingData = make([]prompb.TimeSeries, totalPending)
refIndex = make([]uint64, totalPending)

timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
Expand Down Expand Up @@ -1067,6 +1154,21 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars))
return

case <-s.reshard:
var data []interface{}
// Drain any samples that entered the queue between the reshard signal
// being sent and incoming samples being blocked while the new queues were created.
for point := range queue {
data = append(data, point)
}
if num := len(data); num > 0 {
s.qm.metrics.pendingSamples.Add(float64(num))
}
pending := timeseriesToData(nPending, pendingData, refIndex)
data = append(data, pending...)
bufferCh <- data
return

case sample, ok := <-queue:
if !ok {
if nPendingSamples > 0 || nPendingExemplars > 0 {
@@ -1088,6 +1190,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Samples = sampleBuffer[nPendingSamples]
pendingData[nPending].Exemplars = nil
refIndex[nPending] = d.ref
nPendingSamples++
nPending++

@@ -1096,6 +1199,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Samples = nil
pendingData[nPending].Exemplars = exemplarBuffer[nPendingExemplars]
refIndex[nPending] = d.ref
nPendingExemplars++
nPending++
}
@@ -1127,6 +1231,36 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
}
}

func timeseriesToData(max int, ts []prompb.TimeSeries, refIndex []uint64) []interface{} {
data := make([]interface{}, 0, len(ts))
for i, s := range ts {
if i >= max {
break
}
ref := refIndex[i]
labels := labelProtosToLabels(s.Labels)
if s.Samples != nil {
for j := range s.Samples {
data = append(data, writeSample{
ref: ref,
seriesLabels: labels,
sample: s.Samples[j],
})
}
}
if s.Exemplars != nil {
for j := range s.Exemplars {
data = append(data, writeExemplar{
ref: ref,
seriesLabels: labels,
exemplar: s.Exemplars[j],
})
}
}
}
return data
}
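
timeseriesToData inverts the packing done in runShard: refIndex is a parallel array, so entry i of pendingData and entry i of refIndex describe the same series. A stripped-down sketch of that parallel-array reconstruction, using stand-in types rather than prompb:

package main

import "fmt"

// Stand-ins for prompb.Sample/TimeSeries; only the fields needed here.
type sample struct {
	t int64
	v float64
}

type series struct {
	samples []sample
}

type refSample struct {
	ref uint64
	s   sample
}

// flatten pairs each pending series back up with its ref from the
// parallel refIndex slice, mirroring timeseriesToData above.
func flatten(max int, pending []series, refIndex []uint64) []refSample {
	out := make([]refSample, 0, len(pending))
	for i, sr := range pending {
		if i >= max {
			break
		}
		for _, smp := range sr.samples {
			out = append(out, refSample{ref: refIndex[i], s: smp})
		}
	}
	return out
}

func main() {
	pending := []series{
		{samples: []sample{{t: 1, v: 0.5}}},
		{samples: []sample{{t: 2, v: 0.7}}},
	}
	refIndex := []uint64{101, 102}
	fmt.Println(flatten(len(pending), pending, refIndex))
}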

func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, buf *[]byte) {
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, buf)
3 changes: 1 addition & 2 deletions storage/remote/queue_manager_test.go
@@ -366,8 +366,7 @@ func TestReshard(t *testing.T) {
}()

for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
m.shards.stop()
m.shards.start(i)
m.reshard(i)
time.Sleep(100 * time.Millisecond)
}

4 changes: 2 additions & 2 deletions tsdb/wal/watcher_test.go
@@ -55,8 +55,8 @@ type writeToMock struct {
seriesSegmentIndexes map[uint64]int
}

func (wtm *writeToMock) Append(s []record.RefSample) bool {
wtm.samplesAppended += len(s)
func (wtm *writeToMock) Append(samples []record.RefSample) bool {
wtm.samplesAppended += len(samples)
return true
}

