Skip to content

Commit

Permalink
Optimize deletes in tsi
Browse files Browse the repository at this point in the history
The DropSeries code path ended up creating a MeasurementSeriesIterator
for each dropped series, which was too expensive just to see if a
series exists.

This adds a HasSeries func and fixes an issue where TSI files were
compacted while an iterator was still in use, causing a panic.
  • Loading branch information
jwilder committed Nov 9, 2017
1 parent ab8284e commit a27f6d0
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 18 deletions.
2 changes: 2 additions & 0 deletions tsdb/engine/tsm1/engine_test.go
Expand Up @@ -1109,3 +1109,5 @@ func (itr *seriesIterator) Next() tsdb.SeriesElem {
itr.keys = itr.keys[1:]
return s
}

// Close is a no-op. It exists so that this test iterator provides the Close
// method required of series iterators; there is nothing here to release.
func (itr *seriesIterator) Close() {}
1 change: 1 addition & 0 deletions tsdb/index.go
Expand Up @@ -76,6 +76,7 @@ type SeriesElem interface {
// SeriesIterator represents an iterator over a list of series.
type SeriesIterator interface {
	// Next returns the next series element. Callers treat a nil result as
	// the end of the iteration.
	Next() SeriesElem
	// Close is called once the caller is finished with the iterator so that
	// an implementation can give back anything it is holding on to (for
	// example a retained index file). Implementations with nothing to
	// release may make it a no-op.
	Close()
}

// IndexFormat represents the format for an index.
Expand Down
2 changes: 2 additions & 0 deletions tsdb/index/inmem/inmem.go
Expand Up @@ -1060,3 +1060,5 @@ func (itr *seriesIterator) Next() tsdb.SeriesElem {
itr.keys = itr.keys[1:]
return &itr.elem
}

// Close is a no-op; this iterator does not release anything when closed.
func (itr *seriesIterator) Close() {}
4 changes: 4 additions & 0 deletions tsdb/index/tsi1/file_set_test.go
Expand Up @@ -30,6 +30,7 @@ func TestFileSet_SeriesIterator(t *testing.T) {
if itr == nil {
t.Fatal("expected iterator")
}
defer itr.Close()

if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` {
t.Fatalf("unexpected series: %s/%s", e.Name(), e.Tags().String())
Expand Down Expand Up @@ -60,6 +61,7 @@ func TestFileSet_SeriesIterator(t *testing.T) {
if itr == nil {
t.Fatal("expected iterator")
}
defer itr.Close()

if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` {
t.Fatalf("unexpected series: %s/%s", e.Name(), e.Tags().String())
Expand Down Expand Up @@ -100,6 +102,7 @@ func TestFileSet_MeasurementSeriesIterator(t *testing.T) {
if itr == nil {
t.Fatal("expected iterator")
}
defer itr.Close()

if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` {
t.Fatalf("unexpected series: %s/%s", e.Name(), e.Tags().String())
Expand Down Expand Up @@ -127,6 +130,7 @@ func TestFileSet_MeasurementSeriesIterator(t *testing.T) {
if itr == nil {
t.Fatalf("expected iterator")
}
defer itr.Close()

if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` {
t.Fatalf("unexpected series: %s/%s", e.Name(), e.Tags().String())
Expand Down
9 changes: 4 additions & 5 deletions tsdb/index/tsi1/index.go
Expand Up @@ -467,6 +467,8 @@ func (i *Index) DropMeasurement(name []byte) error {

// Delete all series in measurement.
if sitr := fs.MeasurementSeriesIterator(name); sitr != nil {
defer sitr.Close()

for s := sitr.Next(); s != nil; s = sitr.Next() {
if !s.Deleted() {
if err := func() error {
Expand Down Expand Up @@ -575,11 +577,8 @@ func (i *Index) DropSeries(key []byte, ts int64) error {
fs := i.retainFileSet()
defer fs.Release()

// Check if that was the last series for the measurement in the entire index.
itr := fs.MeasurementSeriesIterator(mname)
if itr == nil {
return nil
} else if e := itr.Next(); e != nil {
mm := fs.Measurement(mname)
if mm == nil || mm.HasSeries() {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion tsdb/index/tsi1/index_file.go
Expand Up @@ -251,7 +251,7 @@ func (f *IndexFile) TagKeySeriesIterator(name, key []byte) tsdb.SeriesIterator {
var itrs []tsdb.SeriesIterator
for ve := vitr.Next(); ve != nil; ve = vitr.Next() {
sitr := &rawSeriesIDIterator{data: ve.(*TagBlockValueElem).series.data}
itrs = append(itrs, newSeriesDecodeIterator(&f.sblk, sitr))
itrs = append(itrs, newSeriesDecodeIterator(f, &f.sblk, sitr))
}

return MergeSeriesIterators(itrs...)
Expand All @@ -273,6 +273,7 @@ func (f *IndexFile) TagValueSeriesIterator(name, key, value []byte) tsdb.SeriesI

// Create an iterator over value's series.
return newSeriesDecodeIterator(
f,
&f.sblk,
&rawSeriesIDIterator{
n: ve.(*TagBlockValueElem).series.n,
Expand Down
30 changes: 24 additions & 6 deletions tsdb/index/tsi1/log_file.go
Expand Up @@ -267,7 +267,7 @@ func (f *LogFile) TagKeySeriesIterator(name, key []byte) tsdb.SeriesIterator {
if len(tv.series) == 0 {
continue
}
itrs = append(itrs, newLogSeriesIterator(tv.series))
itrs = append(itrs, newLogSeriesIterator(f, tv.series))
}

return MergeSeriesIterators(itrs...)
Expand Down Expand Up @@ -383,7 +383,7 @@ func (f *LogFile) TagValueSeriesIterator(name, key, value []byte) tsdb.SeriesIte
return nil
}

return newLogSeriesIterator(tv.series)
return newLogSeriesIterator(f, tv.series)
}

// MeasurementN returns the total number of measurements.
Expand Down Expand Up @@ -753,7 +753,8 @@ func (f *LogFile) SeriesIterator() tsdb.SeriesIterator {
if len(series) == 0 {
return nil
}
return &logSeriesIterator{series: series}
f.Retain()
return &logSeriesIterator{f: f, series: series}
}

// createMeasurementIfNotExists returns a measurement by name.
Expand Down Expand Up @@ -792,7 +793,7 @@ func (f *LogFile) MeasurementSeriesIterator(name []byte) tsdb.SeriesIterator {
if mm == nil || len(mm.series) == 0 {
return nil
}
return newLogSeriesIterator(mm.series)
return newLogSeriesIterator(f, mm.series)
}

// CompactTo compacts the log file and writes it to w.
Expand Down Expand Up @@ -1290,6 +1291,17 @@ type logMeasurement struct {

// Name returns the measurement's name.
func (m *logMeasurement) Name() []byte { return m.name }
// Deleted reports whether the measurement has been flagged as deleted.
func (m *logMeasurement) Deleted() bool { return m.deleted }
// HasSeries reports whether the measurement still owns at least one live
// series. A measurement that is itself flagged as deleted never has series,
// and series flagged as deleted are not counted.
func (m *logMeasurement) HasSeries() bool {
	if !m.deleted {
		// Stop at the first series that has not been deleted.
		for key := range m.series {
			if !m.series[key].deleted {
				return true
			}
		}
	}
	return false
}

func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey {
ts, ok := m.tagSet[string(key)]
Expand Down Expand Up @@ -1432,17 +1444,19 @@ func (itr *logTagValueIterator) Next() (e TagValueElem) {

// logSeriesIterator represents an iterator over a slice of series.
type logSeriesIterator struct {
	// f is the log file the series were taken from. It is retained when the
	// iterator is built and released again by Close.
	f *LogFile
	// series holds the iterator's own copy of the series.
	series logSeries
}

// newLogSeriesIterator returns a new instance of logSeriesIterator.
// All series are copied to the iterator.
func newLogSeriesIterator(m map[string]*logSerie) *logSeriesIterator {
func newLogSeriesIterator(f *LogFile, m map[string]*logSerie) *logSeriesIterator {
if len(m) == 0 {
return nil
}

itr := logSeriesIterator{series: make(logSeries, 0, len(m))}
f.Retain()
itr := logSeriesIterator{f: f, series: make(logSeries, 0, len(m))}
for _, s := range m {
itr.series = append(itr.series, *s)
}
Expand All @@ -1460,6 +1474,10 @@ func (itr *logSeriesIterator) Next() (e tsdb.SeriesElem) {
return e
}

// Close releases the reference on the log file that was retained when the
// iterator was created.
//
// Close is safe to call on a nil iterator: newLogSeriesIterator returns a nil
// *logSeriesIterator for an empty series map, and once that value is stored in
// a tsdb.SeriesIterator interface a caller's "itr != nil" check no longer
// catches it. Without the guard a deferred Close on such a value would panic.
func (itr *logSeriesIterator) Close() {
	if itr == nil || itr.f == nil {
		return
	}
	itr.f.Release()
}

// FormatLogFileName generates a log filename for the given index.
func FormatLogFileName(id int) string {
return fmt.Sprintf("L0-%08d%s", id, LogFileExt)
Expand Down
1 change: 1 addition & 0 deletions tsdb/index/tsi1/log_file_test.go
Expand Up @@ -93,6 +93,7 @@ func TestLogFile_SeriesStoredInOrder(t *testing.T) {
if itr == nil {
t.Fatal("nil iterator")
}
defer itr.Close()

mname := []string{"cpu", "mem"}
var j int
Expand Down
2 changes: 2 additions & 0 deletions tsdb/index/tsi1/measurement_block.go
Expand Up @@ -337,6 +337,8 @@ func (e *MeasurementBlockElem) SeriesID(i int) uint32 {
return binary.BigEndian.Uint32(e.series.data[i*SeriesIDSize:])
}

// HasSeries reports whether e.series.n is greater than zero.
// NOTE(review): presumably n is the number of series ids stored for the
// measurement; this check does not look at any deleted flags — confirm that
// is the intended meaning for on-disk measurement blocks.
func (e *MeasurementBlockElem) HasSeries() bool { return e.series.n > 0 }

// SeriesIDs returns a list of decoded series ids.
//
// NOTE: This should be used for testing and diagnostics purposes only.
Expand Down
12 changes: 10 additions & 2 deletions tsdb/index/tsi1/series_block.go
Expand Up @@ -283,16 +283,20 @@ func (itr *seriesBlockIterator) Next() tsdb.SeriesElem {
}
}

// Close is a no-op; this iterator releases nothing when closed.
func (itr *seriesBlockIterator) Close() {}

// seriesDecodeIterator decodes a series id iterator into unmarshaled elements.
type seriesDecodeIterator struct {
	// f is the index file the iterator reads from. It is retained by
	// newSeriesDecodeIterator and released by Close.
	f *IndexFile
	// itr supplies the series ids to decode.
	itr seriesIDIterator
	// sblk is the series block associated with the ids.
	sblk *SeriesBlock
	e SeriesBlockElem // buffer
}

// newSeriesDecodeIterator returns a new instance of seriesDecodeIterator.
func newSeriesDecodeIterator(sblk *SeriesBlock, itr seriesIDIterator) *seriesDecodeIterator {
return &seriesDecodeIterator{sblk: sblk, itr: itr}
func newSeriesDecodeIterator(f *IndexFile, sblk *SeriesBlock, itr seriesIDIterator) *seriesDecodeIterator {
	// Take a reference on the index file before handing out the iterator;
	// the iterator's Close method gives that reference back.
	f.Retain()

	decoder := new(seriesDecodeIterator)
	decoder.f = f
	decoder.sblk = sblk
	decoder.itr = itr
	return decoder
}

// Next returns the next series element.
Expand All @@ -308,6 +312,10 @@ func (itr *seriesDecodeIterator) Next() tsdb.SeriesElem {
return &itr.e
}

// Close releases the reference on the index file that was retained when the
// iterator was created by newSeriesDecodeIterator.
func (itr *seriesDecodeIterator) Close() {
	itr.f.Release()
}

// SeriesBlockElem represents a series element in the series list.
type SeriesBlockElem struct {
flag byte
Expand Down
42 changes: 42 additions & 0 deletions tsdb/index/tsi1/tsi1.go
Expand Up @@ -21,6 +21,7 @@ const LoadFactor = 80
// MeasurementElem represents a single measurement in the index.
type MeasurementElem interface {
	// Name returns the measurement's name.
	Name() []byte
	// Deleted reports whether the measurement is flagged as deleted.
	Deleted() bool
	// HasSeries reports whether the measurement has any series. Callers use
	// it as a cheap existence check instead of building a series iterator.
	HasSeries() bool
}

// MeasurementElems represents a list of MeasurementElem.
Expand Down Expand Up @@ -115,6 +116,15 @@ func (p measurementMergeElem) Deleted() bool {
return p[0].Deleted()
}

// HasSeries reports whether any of the merged measurement elements has
// series. It stops at the first element that does.
func (p measurementMergeElem) HasSeries() bool {
	for i := 0; i < len(p); i++ {
		if p[i].HasSeries() {
			return true
		}
	}
	return false
}

// filterUndeletedMeasurementIterator returns all measurements which are not deleted.
type filterUndeletedMeasurementIterator struct {
itr MeasurementIterator
Expand Down Expand Up @@ -455,6 +465,12 @@ func (itr *seriesMergeIterator) Next() tsdb.SeriesElem {
return e
}

// Close closes every underlying iterator, in order.
func (itr *seriesMergeIterator) Close() {
	for i := 0; i < len(itr.itrs); i++ {
		itr.itrs[i].Close()
	}
}

// IntersectSeriesIterators returns an iterator that only returns series which
// occur in both iterators. If both series have associated expressions then
// they are combined together.
Expand Down Expand Up @@ -521,6 +537,12 @@ func (itr *seriesIntersectIterator) Next() (e tsdb.SeriesElem) {
}
}

// Close closes each of the iterators being intersected.
func (itr *seriesIntersectIterator) Close() {
	for i := 0; i < len(itr.itrs); i++ {
		itr.itrs[i].Close()
	}
}

// UnionSeriesIterators returns an iterator that returns series from
// both iterators. If both series have associated expressions then they are
// combined together.
Expand Down Expand Up @@ -590,6 +612,12 @@ func (itr *seriesUnionIterator) Next() (e tsdb.SeriesElem) {
return &itr.e
}

// Close closes each of the iterators being unioned.
func (itr *seriesUnionIterator) Close() {
	for i := 0; i < len(itr.itrs); i++ {
		itr.itrs[i].Close()
	}
}

// DifferenceSeriesIterators returns an iterator that only returns series which
// occur in the first iterator but not the second iterator.
func DifferenceSeriesIterators(itr0, itr1 tsdb.SeriesIterator) tsdb.SeriesIterator {
Expand Down Expand Up @@ -642,6 +670,12 @@ func (itr *seriesDifferenceIterator) Next() (e tsdb.SeriesElem) {
}
}

// Close closes both iterators taking part in the difference.
func (itr *seriesDifferenceIterator) Close() {
	for i := 0; i < len(itr.itrs); i++ {
		itr.itrs[i].Close()
	}
}

// filterUndeletedSeriesIterator returns all series which are not deleted.
type filterUndeletedSeriesIterator struct {
itr tsdb.SeriesIterator
Expand All @@ -667,6 +701,10 @@ func (itr *filterUndeletedSeriesIterator) Next() tsdb.SeriesElem {
}
}

// Close closes the wrapped iterator; the filter itself holds nothing else.
func (itr *filterUndeletedSeriesIterator) Close() {
	itr.itr.Close()
}

// seriesExprElem holds a series and its associated filter expression.
type seriesExprElem struct {
tsdb.SeriesElem
Expand Down Expand Up @@ -705,6 +743,10 @@ func (itr *seriesExprIterator) Next() tsdb.SeriesElem {
return &itr.e
}

// Close closes the wrapped iterator.
func (itr *seriesExprIterator) Close() {
	itr.itr.Close()
}

// seriesIDIterator represents an iterator over a list of series ids.
type seriesIDIterator interface {
next() uint32
Expand Down
13 changes: 9 additions & 4 deletions tsdb/index/tsi1/tsi1_test.go
Expand Up @@ -202,12 +202,15 @@ func TestMergeSeriesIterators(t *testing.T) {

// MeasurementElem represents a test implementation of tsi1.MeasurementElem.
type MeasurementElem struct {
name []byte
deleted bool
name []byte
deleted bool
hasSeries bool
}

func (e *MeasurementElem) Name() []byte { return e.name }
func (e *MeasurementElem) Deleted() bool { return e.deleted }
func (e *MeasurementElem) Name() []byte { return e.name }
func (e *MeasurementElem) Deleted() bool { return e.deleted }
func (e *MeasurementElem) HasSeries() bool { return e.hasSeries }

// TagKeyIterator always returns nil; the test element carries no tag keys.
func (e *MeasurementElem) TagKeyIterator() tsi1.TagKeyIterator { return nil }

// MeasurementIterator represents an iterator over a slice of measurements.
Expand Down Expand Up @@ -299,6 +302,8 @@ func (itr *SeriesIterator) Next() (e tsdb.SeriesElem) {
return e
}

// Close is a no-op; the test iterator has nothing to release.
func (itr *SeriesIterator) Close() {}

// MustTempDir returns a temporary directory. Panic on error.
func MustTempDir() string {
path, err := ioutil.TempDir("", "tsi-")
Expand Down

0 comments on commit a27f6d0

Please sign in to comment.