diff --git a/CHANGELOG.md b/CHANGELOG.md index 410cb364b20..0a38750d329 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ 1. [20754](https://github.com/influxdata/influxdb/pull/20754): Update references to docs site to use current URLs. 1. [20773](https://github.com/influxdata/influxdb/pull/20773): Fix data race in TSM engine when inspecting tombstone stats. 1. [20797](https://github.com/influxdata/influxdb/pull/20797): Fix data race in TSM cache. Thanks @StoneYunZhao! +1. [20811](https://github.com/influxdata/influxdb/pull/20811): Fix TSM WAL segment size computing. Thanks @StoneYunZhao! 1. [20798](https://github.com/influxdata/influxdb/pull/20798): Deprecate misleading `retentionPeriodHrs` key in onboarding API. 1. [20819](https://github.com/influxdata/influxdb/pull/20819): Fix Single Stat graphs with thresholds crashing on negative values. 1. [20809](https://github.com/influxdata/influxdb/pull/20809): Fix InfluxDB port in Flux function UI examples. Thanks @sunjincheng121! diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 77403819625..dfd6cf4abb9 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -242,6 +242,11 @@ func (l *WAL) Open() error { } } } + + if l.currentSegmentWriter != nil { + totalOldDiskSize -= int64(l.currentSegmentWriter.size) + } + atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize) l.closing = make(chan struct{}) @@ -379,6 +384,11 @@ func (l *WAL) Remove(files []string) error { totalOldDiskSize += stat.Size() } + + if l.currentSegmentWriter != nil { + totalOldDiskSize -= int64(l.currentSegmentWriter.size) + } + atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize) return nil @@ -565,7 +575,8 @@ func (l *WAL) newSegmentFile() error { if err := l.currentSegmentWriter.close(); err != nil { return err } - atomic.StoreInt64(&l.stats.OldBytes, int64(l.currentSegmentWriter.size)) + + atomic.AddInt64(&l.stats.OldBytes, int64(l.currentSegmentWriter.size)) } fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension)) diff --git a/tsdb/engine/tsm1/wal_test.go b/tsdb/engine/tsm1/wal_test.go index d846dc8346b..2a9bacc56b8 100644 --- a/tsdb/engine/tsm1/wal_test.go +++ b/tsdb/engine/tsm1/wal_test.go @@ -6,11 +6,13 @@ import ( "io/ioutil" "os" "reflect" + "sort" "testing" "github.com/golang/snappy" "github.com/influxdata/influxdb/v2/pkg/slices" "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" + "github.com/stretchr/testify/require" ) func TestWALWriter_WriteMulti_Single(t *testing.T) { @@ -701,6 +703,93 @@ func TestWALRollSegment(t *testing.T) { } } +func TestWAL_DiskSize(t *testing.T) { + test := func(w *tsm1.WAL, oldZero, curZero bool) { + // get disk size by reading file + files, err := ioutil.ReadDir(w.Path()) + require.NoError(t, err) + + sort.Slice(files, func(i, j int) bool { + return files[i].Name() < files[j].Name() + }) + + var old, cur int64 + if len(files) > 0 { + cur = files[len(files)-1].Size() + for i := 0; i < len(files)-1; i++ { + old += files[i].Size() + } + } + + // test zero size condition + require.False(t, oldZero && old > 0) + require.False(t, !oldZero && old == 0) + require.False(t, curZero && cur > 0) + require.False(t, !curZero && cur == 0) + + // test method DiskSizeBytes + require.Equal(t, old+cur, w.DiskSizeBytes(), "total disk size") + + // test Statistics + ss := w.Statistics(nil) + require.Equal(t, 1, len(ss)) + + m := ss[0].Values + require.NotNil(t, m) + + require.Equal(t, m["oldSegmentsDiskBytes"].(int64), old, "old disk size") + require.Equal(t, m["currentSegmentDiskBytes"].(int64), cur, "current dist size") + } + + dir := MustTempDir() + defer os.RemoveAll(dir) + + w := tsm1.NewWAL(dir) + + const segSize = 1024 + w.SegmentSize = segSize + + // open + require.NoError(t, w.Open()) + + test(w, true, true) + + // write some values, the total size of these values does not exceed segSize(1024), + // so rollSegment will not be triggered + values := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": {tsm1.NewValue(1, 1.0)}, + "cpu,host=B#!~#value": {tsm1.NewValue(1, 1.0)}, + "cpu,host=C#!~#value": {tsm1.NewValue(1, 1.0)}, + } + + _, err := w.WriteMulti(values) + require.NoError(t, err) + + test(w, true, false) + + // write some values, the total size of these values exceeds segSize(1024), + // so rollSegment will be triggered + for i := 0; i < 100; i++ { + _, err := w.WriteMulti(values) + require.NoError(t, err) + } + + test(w, false, false) + + // reopen + require.NoError(t, w.Close()) + require.NoError(t, w.Open()) + + test(w, false, false) + + // remove + closedSegments, err := w.ClosedSegments() + require.NoError(t, err) + require.NoError(t, w.Remove(closedSegments)) + + test(w, true, false) +} + func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) { p1 := tsm1.NewValue(1, 1.1) p2 := tsm1.NewValue(1, int64(1))