Skip to content

Commit

Permalink
Prefer close channels than sending nil.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Nov 5, 2020
1 parent 91cef84 commit 21e6211
Showing 1 changed file with 51 additions and 44 deletions.
95 changes: 51 additions & 44 deletions storage/remote/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ outer:
return true
}

// RedirectPrevSamples redirects the old samples in the queue to the new queues.
func (t *QueueManager) RedirectPrevSamples(samples []sample) {
// redirectPrevSamples redirects samples in the old queues to new queues.
func (t *QueueManager) redirectPrevSamples(samples []sample) {
outer:
for _, s := range samples {
backoff := t.cfg.MinBackoff
Expand Down Expand Up @@ -700,9 +700,9 @@ type shards struct {

qm *QueueManager
queues []chan sample
queueBuf, newQueueBuf *[]chan []sample
queueBuf, newQueueBuf []chan []sample

newQueuesAvailable atomic.Bool
blockIncomingSamples atomic.Bool // Used to block incoming samples in Append since the older samples need to be redirected to new queues in order to maintain sortedness.

// Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group.
Expand Down Expand Up @@ -733,14 +733,11 @@ func (s *shards) start(n int) {
buffer[i] = make(chan []sample, s.qm.cfg.Capacity)
}

if s.queues != nil {
// newQueues consume samples in the existing queues followed by the. Once
// the samples from existing queues are consumed, they will be replaced by
// these queues which will then ingest the incoming samples.
s.newQueuesAvailable.Store(true)
s.newQueueBuf = &buffer
if len(s.queues) != 0 {
s.blockIncomingSamples.Store(true)
s.newQueueBuf = buffer
} else {
s.queueBuf = &buffer
s.queueBuf = buffer
}
s.queues = newQueues

Expand All @@ -761,7 +758,7 @@ func (s *shards) stop() {
for _, queue := range s.queues {
close(queue)
}
s.flushRemaining(*s.queueBuf)
s.flushRemaining(s.queueBuf)
}

// flushRemaining flushes the remaining samples from the buffer that were in pending state
Expand Down Expand Up @@ -820,55 +817,55 @@ func (s *shards) bufferPendingSamples() {
wg sync.WaitGroup
bufLock sync.Mutex
buffer []sample
oldShardsCount = len(*s.queueBuf)
oldShardsCount = len(s.queueBuf)
)
if oldShardsCount == 0 {
return
}
wg.Add(oldShardsCount)
for i := 0; i < oldShardsCount; i++ {
go func(i int) {
buf := *s.queueBuf
temp := <-buf[i]
if temp == nil {
wg.Done()
defer wg.Done()
temp, ok := <-s.queueBuf[i]
if !ok {
return
}
bufLock.Lock()
buffer = append(buffer, temp...)
bufLock.Unlock()
wg.Done()
}(i)
}
wg.Wait()
sort.SliceStable(buffer, func(i, j int) bool {
return buffer[i].t < buffer[j].t
})
s.qm.RedirectPrevSamples(buffer)
s.qm.redirectPrevSamples(buffer)
}

// updateQueues replaces the old queues with the new ones. Post replacement,
// updateBuffers replaces the old queue buffers with the new ones. Post replacement,
// the blocking of incoming samples is released by storing false in newQueuesAvailable.
func (s *shards) updateBuffers() {
s.mtx.Lock()
defer s.mtx.Unlock()
s.queueBuf = s.newQueueBuf
s.newQueueBuf = nil
s.newQueuesAvailable.Store(false)
s.blockIncomingSamples.Store(false)
}

// enqueue a sample. If we are currently in the process of shutting down or resharding,
// will return false; in this case, you should back off and retry.
func (s *shards) enqueue(ref uint64, sample sample, isPending bool) bool {
// will return false; in this case, you should back off and retry. The force signifies
// that the incoming samples should be accepted irrespective of blocking is enabled.
// This is used in case of redirecting samples as we just want to block the new samples,
// not the ones that were already in the queues.
func (s *shards) enqueue(ref uint64, sample sample, force bool) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()

if isPending {
if force {
shard := uint64(ref) % uint64(len(s.queues))
s.queues[shard] <- sample
return true
}
if s.newQueuesAvailable.Load() {
if s.blockIncomingSamples.Load() {
return false
}

Expand Down Expand Up @@ -920,26 +917,39 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
return

case <-s.reshard:
select {
case sample := <-queue:
// Capture the remaining (in case) sample from the queue.
pendingSamples[nPending].Labels = labelsToLabelsProto(sample.labels, pendingSamples[nPending].Labels)
pendingSamples[nPending].Samples[0].Timestamp = sample.t
pendingSamples[nPending].Samples[0].Value = sample.v
default:
for {
if s.blockIncomingSamples.Load() {
// The incoming samples will now be blocked for enty into the new shards,
// so its sample to stop listening samples.
break
}
select {
case sample, ok := <-queue:
// Accept those samples that might have entered the queue between sending the resharding
// signal and blocking of incoming samples at the time of creation of newer queues.
if !ok {
break
}
pendingSamples[nPending].Labels = labelsToLabelsProto(sample.labels, pendingSamples[nPending].Labels)
pendingSamples[nPending].Samples[0].Timestamp = sample.t
pendingSamples[nPending].Samples[0].Value = sample.v
nPending++
default:
}
}
if nPending > 0 {
level.Info(s.qm.logger).Log("msg", fmt.Sprintf("Sending %d samples to newer queues.", nPending))
s.qm.metrics.pendingSamples.Add(float64(nPending))
s.queueBuf[shardID] <- getSamples(nPending, pendingSamples)
}
close(queue)
level.Info(s.qm.logger).Log("msg", fmt.Sprintf("Sending %d samples to newer queues.", nPending))
s.qm.metrics.pendingSamples.Add(float64(nPending))
buffers := *s.queueBuf
buffers[shardID] <- getSamples(nPending, pendingSamples)
return

case sample, ok := <-queue:
if !ok {
level.Info(s.qm.logger).Log("msg", fmt.Sprintf("Sending %d samples to newer queues.", nPending))
buffers := *s.queueBuf
buffers[shardID] <- getSamples(nPending, pendingSamples)
if nPending > 0 {
level.Info(s.qm.logger).Log("msg", fmt.Sprintf("Sending %d samples to newer queues.", nPending))
s.queueBuf[shardID] <- getSamples(nPending, pendingSamples)
}
return
}

Expand Down Expand Up @@ -1101,9 +1111,6 @@ func allocateTimeSeries(capacity int) []prompb.TimeSeries {
}

func getSamples(nPending int, ts []prompb.TimeSeries) []sample {
if nPending == 0 {
return nil
}
samples := make([]sample, len(ts))
for i := range ts {
samples[i].labels = labelProtosToLabels(ts[i].Labels)
Expand Down

0 comments on commit 21e6211

Please sign in to comment.