Skip to content

Commit

Permalink
Merge pull request #10212 from influxdata/er-retention-index
Browse files Browse the repository at this point in the history
Ensure orhpaned series cleaned up with shard drop
  • Loading branch information
e-dard authored Aug 21, 2018
2 parents ca1b780 + f52de2d commit 61a11c1
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 22 deletions.
2 changes: 1 addition & 1 deletion tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
DefaultEngine = "tsm1"

// DefaultIndex is the default index for new shards
DefaultIndex = "inmem"
DefaultIndex = InmemIndexName

// tsdb/engine/wal configuration options

Expand Down
4 changes: 2 additions & 2 deletions tsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func TestConfig_Validate_Error(t *testing.T) {
t.Errorf("unexpected error: %s", err)
}

c.Index = "inmem"
c.Index = tsdb.InmemIndexName
if err := c.Validate(); err != nil {
t.Error(err)
}

c.Index = "tsi1"
c.Index = tsdb.TSI1IndexName
if err := c.Validate(); err != nil {
t.Error(err)
}
Expand Down
7 changes: 3 additions & 4 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,6 @@ func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, predica
// does not update the index or disable compactions. This should mainly be called by DeleteSeriesRange
// and not directly.
func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
ts := time.Now().UTC().UnixNano()
if len(seriesKeys) == 0 {
return nil
}
Expand Down Expand Up @@ -1692,7 +1691,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// the global index (all shards).
if index, ok := e.index.(*inmem.ShardIndex); ok {
key := models.MakeKey(name, tags)
if e := index.Index.DropSeriesGlobal(key, ts); e != nil {
if e := index.Index.DropSeriesGlobal(key); e != nil {
err = e
}
}
Expand Down Expand Up @@ -2354,7 +2353,7 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal
tagSets []*query.TagSet
err error
)
if e.index.Type() == "inmem" {
if e.index.Type() == tsdb.InmemIndexName {
ts := e.index.(indexTagSets)
tagSets, err = ts.TagSets([]byte(measurement), opt)
} else {
Expand Down Expand Up @@ -2434,7 +2433,7 @@ func (e *Engine) createVarRefIterator(ctx context.Context, measurement string, o
tagSets []*query.TagSet
err error
)
if e.index.Type() == "inmem" {
if e.index.Type() == tsdb.InmemIndexName {
ts := e.index.(indexTagSets)
tagSets, err = ts.TagSets([]byte(measurement), opt)
} else {
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2352,7 +2352,7 @@ func NewEngine(index string) (*Engine, error) {

opt := tsdb.NewEngineOptions()
opt.IndexVersion = index
if index == "inmem" {
if index == tsdb.InmemIndexName {
opt.InmemIndex = inmem.NewIndex(db, sfile)
}
// Initialise series id sets. Need to do this as it's normally done at the
Expand Down
13 changes: 11 additions & 2 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ import (
"go.uber.org/zap"
)

// Available index types.
const (
InmemIndexName = "inmem"
TSI1IndexName = "tsi1"
)

type Index interface {
Open() error
Close() error
Expand All @@ -35,6 +41,9 @@ type Index interface {
DropSeries(seriesID uint64, key []byte, cascade bool) error
DropMeasurementIfSeriesNotExist(name []byte) error

// Used to clean up series in inmem index that were dropped with a shard.
DropSeriesGlobal(key []byte) error

MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
Expand Down Expand Up @@ -1208,7 +1217,7 @@ type IndexSet struct {
// HasInmemIndex returns true if any in-memory index is in use.
func (is IndexSet) HasInmemIndex() bool {
for _, idx := range is.Indexes {
if idx.Type() == "inmem" {
if idx.Type() == InmemIndexName {
return true
}
}
Expand Down Expand Up @@ -2673,7 +2682,7 @@ func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile
} else if err != nil {
return nil, err
} else if err == nil {
format = "tsi1"
format = TSI1IndexName
}

// Lookup index by format.
Expand Down
4 changes: 2 additions & 2 deletions tsdb/index/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

// IndexName is the name of this index.
const IndexName = "inmem"
const IndexName = tsdb.InmemIndexName

func init() {
tsdb.NewInmemIndex = func(name string, sfile *tsdb.SeriesFile) (interface{}, error) { return NewIndex(name, sfile), nil }
Expand Down Expand Up @@ -773,7 +773,7 @@ func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error {
}

// DropSeriesGlobal removes the series key and its tags from the index.
func (i *Index) DropSeriesGlobal(key []byte, ts int64) error {
func (i *Index) DropSeriesGlobal(key []byte) error {
if key == nil {
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// IndexName is the name of the index.
const IndexName = "tsi1"
const IndexName = tsdb.TSI1IndexName

// ErrCompactionInterrupted is returned if compactions are disabled or
// an index is closed while a compaction is occurring.
Expand Down Expand Up @@ -643,6 +643,9 @@ func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
return nil
}

// DropSeriesGlobal is a no-op on the tsi1 index.
func (i *Index) DropSeriesGlobal(key []byte) error { return nil }

// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
// series for the measurment.
func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error {
Expand Down
2 changes: 1 addition & 1 deletion tsdb/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func BenchmarkIndexSet_TagSets(b *testing.B) {
// TODO(edd): this is somewhat awkward. We should unify this difference somewhere higher
// up than the engine. I don't want to open an engine do a benchmark on
// different index implementations.
if indexType == "inmem" {
if indexType == tsdb.InmemIndexName {
ts = func() ([]*query.TagSet, error) {
return idx.Index.(indexTagSets).TagSets(name, opt)
}
Expand Down
2 changes: 1 addition & 1 deletion tsdb/shard_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func NewTempShard(index string) *TempShard {
opt := NewEngineOptions()
opt.IndexVersion = index
opt.Config.WALDir = filepath.Join(dir, "wal")
if index == "inmem" {
if index == InmemIndexName {
opt.InmemIndex, _ = NewInmemIndex(path.Base(dir), sfile)
}

Expand Down
2 changes: 1 addition & 1 deletion tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2060,7 +2060,7 @@ func NewShards(index string, n int) Shards {
opt := tsdb.NewEngineOptions()
opt.IndexVersion = index
opt.Config.WALDir = filepath.Join(dir, "wal")
if index == "inmem" {
if index == tsdb.InmemIndexName {
opt.InmemIndex = inmem.NewIndex(filepath.Base(dir), sfile.SeriesFile)
}

Expand Down
39 changes: 34 additions & 5 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (s *Store) loadShards() error {

// Existing shards should continue to use inmem index.
if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
opt.IndexVersion = "inmem"
opt.IndexVersion = InmemIndexName
}

// Open engine.
Expand Down Expand Up @@ -678,9 +678,6 @@ func (s *Store) DeleteShard(shardID uint64) error {
ss := index.SeriesIDSet()

db := sh.Database()
if err := sh.Close(); err != nil {
return err
}

// Determine if the shard contained any series that are not present in any
// other shards in the database.
Expand All @@ -701,10 +698,42 @@ func (s *Store) DeleteShard(shardID uint64) error {
if ss.Cardinality() > 0 {
sfile := s.seriesFile(db)
if sfile != nil {
// If the inmem index is in use, then the series being removed from the
// series file will also need to be removed from the index.
if index.Type() == InmemIndexName {
var keyBuf []byte // Series key buffer.
var name []byte
var tagsBuf models.Tags // Buffer for tags container.
var err error

ss.ForEach(func(id uint64) {
skey := sfile.SeriesKey(id) // Series File series key
if skey == nil {
return
}

name, tagsBuf = ParseSeriesKeyInto(skey, tagsBuf)
keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
if err = index.DropSeriesGlobal(keyBuf); err != nil {
return
}
})

if err != nil {
return err
}
}

ss.ForEach(func(id uint64) {
sfile.DeleteSeriesID(id)
})
}

}

// Close the shard.
if err := sh.Close(); err != nil {
return err
}

// Remove the on-disk shard data.
Expand Down Expand Up @@ -1784,7 +1813,7 @@ func (s *Store) monitorShards() {

s.mu.RLock()
shards := s.filterShards(func(sh *Shard) bool {
return sh.IndexType() == "inmem"
return sh.IndexType() == InmemIndexName
})
s.mu.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func TestStore_BackupRestoreShard(t *testing.T) {
}

for _, index := range tsdb.RegisteredIndexes() {
if index == "tsi1" {
if index == tsdb.TSI1IndexName {
t.Skip("Skipping failing test for tsi1")
}

Expand Down

0 comments on commit 61a11c1

Please sign in to comment.