Reduce resharding impact by redirecting data to new shards with replacement
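
Previously, resharding stopped the old shards (flushing all of their pending
samples to remote storage) before starting the new ones, so no samples could
be enqueued while that flush was in flight. With this change, reshard() blocks
Append via the new queueMux write lock only while the samples still sitting in
the old queues are handed over. In outline (simplified):

    reshardLoop:  numShards := <-t.reshardChan; t.reshard(numShards)
    reshard:      queueMux.Lock() blocks Append
                  signal the old shards to drain their queues into shards.buffer
                  newShards.start(numNewShards)
                  bufferizeOldSamples: collect the drained samples, sort them by
                      timestamp, enqueue them on the new shards
                  t.shards.stop(); t.shards = newShards; queueMux.Unlock()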

Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
Harkishen-Singh committed Apr 4, 2021
1 parent ae73a62 commit bb4b821
Showing 3 changed files with 106 additions and 24 deletions.
123 changes: 103 additions & 20 deletions storage/remote/queue_manager.go
@@ -15,7 +15,9 @@ package remote

import (
"context"
"fmt"
"math"
"sort"
"strconv"
"sync"
"time"
@@ -312,6 +314,8 @@ type QueueManager struct {
quit chan struct{}
wg sync.WaitGroup

queueMux sync.RWMutex

samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate

metrics *queueManagerMetrics
@@ -444,6 +448,10 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
func (t *QueueManager) Append(samples []record.RefSample) bool {
// Even though we are writing to the queues, we take the read lock here: this allows
// Appends to proceed concurrently, while the write lock is taken only when samples
// are being transferred to new queues during resharding.
t.queueMux.RLock()
defer t.queueMux.RUnlock()
outer:
for _, s := range samples {
t.seriesMtx.Lock()
@@ -469,6 +477,7 @@

if t.shards.enqueue(s.Ref, sample{
labels: lbls,
ref: s.Ref,
t: s.T,
v: s.V,
}) {
@@ -766,17 +775,42 @@ func (t *QueueManager) reshardLoop() {
for {
select {
case numShards := <-t.reshardChan:
// We start the newShards after we have stopped (and therefore completely
// flushed) the oldShards, to guarantee we only ever deliver samples in
// order.
t.shards.stop()
t.shards.start(numShards)
t.reshard(numShards)
case <-t.quit:
return
}
}
}

// reshard takes the write lock on queueMux and signals the queues to reshard. It buffers the old
// samples and enqueues them on the new shards before allowing Append() to resume, so that samples
// are still delivered in time order.
func (t *QueueManager) reshard(numNewShards int) {
// Block incoming samples and wait until the existing samples have been transferred
// to the newly created shards.
t.queueMux.Lock()
defer t.queueMux.Unlock() // Resume accepting samples only once the old samples have been transferred, to preserve ordering.
if numQueues := len(t.shards.queues); numQueues > 0 {
// Signal resharding only if old shards exist. Closing the channel broadcasts
// the signal to every running shard, so each one drains its queue into the buffer.
close(t.shards.reshard)
}
// We start the new shards in parallel with the old shards. The new shards
// are filled with the samples still sitting in the old shards, while
// incoming samples are held back in Append by queueMux.Lock(). Transferring
// the existing samples is quick, so the pause is negligible.
//
// Only once the existing samples have been transferred are the old shards
// replaced by the new ones and incoming samples consumed again. This
// guarantees that samples are delivered in order.
// Create new shards.
newShards := t.newShards()
newShards.start(numNewShards)
complete := t.shards.bufferizeOldSamples(newShards)
t.shards.stop()
<-complete
// Replace old shards with the new ones.
t.shards = newShards
}

func (t *QueueManager) newShards() *shards {
s := &shards{
qm: t,
@@ -787,19 +821,23 @@ func (t *QueueManager) newShards() *shards {

type sample struct {
labels labels.Labels
ref uint64
t int64
v float64
}

type shards struct {
mtx sync.RWMutex // With the WAL, this is never actually contended.
num int // Number of shards running.

qm *QueueManager
queues []chan sample
buffer chan []sample

// Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group.
done chan struct{}
reshard chan struct{}
running atomic.Int32

// Soft shutdown context will prevent new enqueues and deadlocks.
@@ -811,7 +849,9 @@ type shards struct {
droppedOnHardShutdown atomic.Uint32
}

// start the shards; must be called before any call to enqueue.
// start the shards' queues. If queues already exist, as during resharding, the new
// queues are created in parallel with the existing ones and later replace them.
// Must be called before any call to enqueue.
func (s *shards) start(n int) {
s.mtx.Lock()
defer s.mtx.Unlock()
@@ -823,38 +863,34 @@ func (s *shards) start(n int) {
for i := 0; i < n; i++ {
newQueues[i] = make(chan sample, s.qm.cfg.Capacity)
}

s.buffer = make(chan []sample)
s.queues = newQueues

var hardShutdownCtx context.Context
hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background())
s.softShutdown = make(chan struct{})
s.running.Store(int32(n))
s.reshard = make(chan struct{})
s.done = make(chan struct{})
s.droppedOnHardShutdown.Store(0)
s.num = n
for i := 0; i < n; i++ {
go s.runShard(hardShutdownCtx, i, newQueues[i])
}
}

// stop the shards; subsequent call to enqueue will return false.
// stop the existing shards and trigger flushing of any samples remaining in the queues.
func (s *shards) stop() {
// Attempt a clean shutdown, but only wait flushDeadline for all the shards
// to cleanly exit. As we're doing RPCs, enqueue can block indefinitely.
// We must be able to call stop concurrently, hence we can only take the
// RLock here.
s.mtx.RLock()
close(s.softShutdown)
s.mtx.RUnlock()

// Enqueue should now be unblocked, so we can take the write lock. This
// also ensures we don't race with writes to the queues, and get a panic:
// send on closed channel.
s.mtx.Lock()
defer s.mtx.Unlock()
for _, queue := range s.queues {
close(queue)

for i := range s.queues {
close(s.queues[i])
}

select {
case <-s.done:
return
@@ -869,8 +905,7 @@ func (s *shards) stop() {
}
}

// enqueue a sample. If we are currently in the process of shutting down or resharding,
// will return false; in this case, you should back off and retry.
// enqueue a sample; returns false if the shards are shutting down, in which
// case the caller should back off and retry.
func (s *shards) enqueue(ref uint64, sample sample) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()
@@ -891,10 +926,40 @@
}
}

// bufferizeOldSamples collects the samples drained from the old shards' queues and
// transfers them, sorted by timestamp, to the new shards passed as an argument.
func (s *shards) bufferizeOldSamples(newShards *shards) chan struct{} {
var (
buffer []sample
done = make(chan struct{})
)
go func() {
for i := 0; i < s.num; i++ {
buf := <-s.buffer
if len(buf) == 0 {
continue
}
buffer = append(buffer, buf...)
}
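// A stable sort keeps samples that share a timestamp in their received order,
// so the per-series ordering established by the old shards is preserved.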
sort.SliceStable(buffer, func(i, j int) bool {
return buffer[i].t < buffer[j].t
})
for _, sample := range buffer {
if !newShards.enqueue(sample.ref, sample) {
// enqueue fails only if the new shards are already shutting down;
// the sample is dropped in that case.
continue
}
}
}
close(done)
}()
return done
}

func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
defer func() {
if s.running.Dec() == 0 {
close(s.done)
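// Closing the buffer as well keeps bufferizeOldSamples from blocking forever
// if a shard exits without sending its drained samples (e.g. on hard
// shutdown): receives on the closed channel yield nil slices.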
close(s.buffer)
}
}()

@@ -932,6 +997,24 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
s.droppedOnHardShutdown.Add(uint32(droppedSamples))
return

case <-s.reshard:
// Drain the queue completely, accepting any samples that entered it between
// the reshard signal being sent and incoming samples being blocked while the
// new queues were created. The loop ends once stop() closes the queue.
var samples []sample
for {
sample, ok := <-queue
if !ok {
break
}
samples = append(samples, sample)
}
if samplesCount := len(samples); samplesCount > 0 {
level.Info(s.qm.logger).Log("msg", fmt.Sprintf("Transferring %d samples to the new queues.", samplesCount))
s.qm.metrics.pendingSamples.Add(float64(samplesCount))
}
s.buffer <- samples
return

case sample, ok := <-queue:
if !ok {
if nPending > 0 {
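For context, resharding is driven through reshardChan, which reshardLoop consumes (both shown above). A minimal sketch of a caller follows; the shard-calculation loop that performs the send is not part of this diff, so requestReshard and desiredShards are illustrative names only:

// Illustrative sketch, not part of this diff: how a caller would hand the
// desired shard count to reshardLoop.
func (t *QueueManager) requestReshard(desiredShards int) {
	select {
	case t.reshardChan <- desiredShards:
		// reshardLoop receives this and calls t.reshard(desiredShards), which
		// drains the old queues into the new shards instead of flushing them
		// to remote storage first.
	case <-t.quit:
		// Shutting down; drop the reshard request.
	}
}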
3 changes: 1 addition & 2 deletions storage/remote/queue_manager_test.go
@@ -317,8 +317,7 @@ func TestReshard(t *testing.T) {
}()

for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
m.shards.stop()
m.shards.start(i)
m.reshard(i)
time.Sleep(100 * time.Millisecond)
}

4 changes: 2 additions & 2 deletions tsdb/wal/watcher_test.go
@@ -54,8 +54,8 @@ type writeToMock struct {
seriesSegmentIndexes map[uint64]int
}

func (wtm *writeToMock) Append(s []record.RefSample) bool {
wtm.samplesAppended += len(s)
func (wtm *writeToMock) Append(samples []record.RefSample) bool {
wtm.samplesAppended += len(samples)
return true
}

