Skip to content

Commit

Permalink
Merge branch 'main' into hotfix/qfe-query-split-handle-parse-errors
Browse files Browse the repository at this point in the history
  • Loading branch information
matej-g committed Mar 2, 2023
2 parents fa3546b + 90a967d commit 1dd9597
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 10 deletions.
44 changes: 37 additions & 7 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,6 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
return nil
}

t.mtx.Lock()
defer t.mtx.Unlock()

var (
wg sync.WaitGroup
merr errutil.SyncMultiError
Expand All @@ -284,6 +281,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
pmtx sync.Mutex
)

t.mtx.RLock()
for tenantID, tenantInstance := range t.tenants {
wg.Add(1)
go func(tenantID string, tenantInstance *tenant) {
Expand All @@ -303,8 +301,16 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
}(tenantID, tenantInstance)
}
wg.Wait()
t.mtx.RUnlock()

t.mtx.Lock()
defer t.mtx.Unlock()
for _, tenantID := range prunedTenants {
// Check that the tenant hasn't been reinitialized in-between locks.
if t.tenants[tenantID].readyStorage().get() != nil {
continue
}

level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
delete(t.tenants, tenantID)
}
Expand All @@ -315,18 +321,36 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
// pruneTSDB removes a TSDB if its past the retention period.
// It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk.
func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (bool, error) {
tenantTSDB := tenantInstance.readyStorage().get()
tenantTSDB := tenantInstance.readyStorage()
if tenantTSDB == nil {
return false, nil
}
tdb := tenantTSDB.db
tenantTSDB.mtx.RLock()
if tenantTSDB.a == nil || tenantTSDB.a.db == nil {
tenantTSDB.mtx.RUnlock()
return false, nil
}

tdb := tenantTSDB.a.db
head := tdb.Head()
if head.MaxTime() < 0 {
tenantTSDB.mtx.RUnlock()
return false, nil
}

sinceLastAppendMillis := time.Since(time.UnixMilli(head.MaxTime())).Milliseconds()
compactThreshold := int64(1.5 * float64(t.tsdbOpts.MaxBlockDuration))
if sinceLastAppendMillis <= compactThreshold {
tenantTSDB.mtx.RUnlock()
return false, nil
}
tenantTSDB.mtx.RUnlock()

// Acquire a write lock and check that no writes have occurred in-between locks.
tenantTSDB.mtx.Lock()
defer tenantTSDB.mtx.Unlock()

sinceLastAppendMillis = time.Since(time.UnixMilli(head.MaxTime())).Milliseconds()
if sinceLastAppendMillis <= compactThreshold {
return false, nil
}
Expand Down Expand Up @@ -356,10 +380,12 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
return false, err
}

if err := os.RemoveAll(tenantTSDB.db.Dir()); err != nil {
if err := os.RemoveAll(tdb.Dir()); err != nil {
return false, err
}

tenantInstance.readyS.set(nil)

return true, nil
}

Expand Down Expand Up @@ -604,7 +630,11 @@ func (s *ReadyStorage) Set(db *tsdb.DB) {
s.mtx.Lock()
defer s.mtx.Unlock()

s.a = &adapter{db: db}
s.set(&adapter{db: db})
}

func (s *ReadyStorage) set(a *adapter) {
s.a = a
}

// Get the storage.
Expand Down
6 changes: 3 additions & 3 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,9 @@ func TestMultiTSDBPrune(t *testing.T) {
defer func() { testutil.Ok(t, m.Close()) }()

for i := 0; i < 100; i++ {
testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10+i))))
testutil.Ok(t, appendSample(m, "bar", time.Now().Add(-4*time.Hour)))
testutil.Ok(t, appendSample(m, "baz", time.Now().Add(time.Duration(i)*time.Second)))
testutil.Ok(t, appendSample(m, "deleted-tenant", time.UnixMilli(int64(10+i))))
testutil.Ok(t, appendSample(m, "compacted-tenant", time.Now().Add(-4*time.Hour)))
testutil.Ok(t, appendSample(m, "active-tenant", time.Now().Add(time.Duration(i)*time.Second)))
}
testutil.Equals(t, 3, len(m.TSDBLocalClients()))

Expand Down

0 comments on commit 1dd9597

Please sign in to comment.