Skip to content

Commit

Permalink
fix(tsm1): fix wal's totalOldDiskSize statistics (#20811)
Browse files Browse the repository at this point in the history
  • Loading branch information
StoneYunZhao committed Mar 3, 2021
1 parent 5fc9240 commit 265c1f3
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
1. [20754](https://github.com/influxdata/influxdb/pull/20754): Update references to docs site to use current URLs.
1. [20773](https://github.com/influxdata/influxdb/pull/20773): Fix data race in TSM engine when inspecting tombstone stats.
1. [20797](https://github.com/influxdata/influxdb/pull/20797): Fix data race in TSM cache. Thanks @StoneYunZhao!
1. [20811](https://github.com/influxdata/influxdb/pull/20811): Fix TSM WAL segment size computing. Thanks @StoneYunZhao!
1. [20798](https://github.com/influxdata/influxdb/pull/20798): Deprecate misleading `retentionPeriodHrs` key in onboarding API.
1. [20819](https://github.com/influxdata/influxdb/pull/20819): Fix Single Stat graphs with thresholds crashing on negative values.
1. [20809](https://github.com/influxdata/influxdb/pull/20809): Fix InfluxDB port in Flux function UI examples. Thanks @sunjincheng121!
Expand Down
13 changes: 12 additions & 1 deletion tsdb/engine/tsm1/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ func (l *WAL) Open() error {
}
}
}

if l.currentSegmentWriter != nil {
totalOldDiskSize -= int64(l.currentSegmentWriter.size)
}

atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)

l.closing = make(chan struct{})
Expand Down Expand Up @@ -379,6 +384,11 @@ func (l *WAL) Remove(files []string) error {

totalOldDiskSize += stat.Size()
}

if l.currentSegmentWriter != nil {
totalOldDiskSize -= int64(l.currentSegmentWriter.size)
}

atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)

return nil
Expand Down Expand Up @@ -565,7 +575,8 @@ func (l *WAL) newSegmentFile() error {
if err := l.currentSegmentWriter.close(); err != nil {
return err
}
atomic.StoreInt64(&l.stats.OldBytes, int64(l.currentSegmentWriter.size))

atomic.AddInt64(&l.stats.OldBytes, int64(l.currentSegmentWriter.size))
}

fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
Expand Down
89 changes: 89 additions & 0 deletions tsdb/engine/tsm1/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"io/ioutil"
"os"
"reflect"
"sort"
"testing"

"github.com/golang/snappy"
"github.com/influxdata/influxdb/v2/pkg/slices"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/require"
)

func TestWALWriter_WriteMulti_Single(t *testing.T) {
Expand Down Expand Up @@ -701,6 +703,93 @@ func TestWALRollSegment(t *testing.T) {
}
}

func TestWAL_DiskSize(t *testing.T) {
test := func(w *tsm1.WAL, oldZero, curZero bool) {
// get disk size by reading file
files, err := ioutil.ReadDir(w.Path())
require.NoError(t, err)

sort.Slice(files, func(i, j int) bool {
return files[i].Name() < files[j].Name()
})

var old, cur int64
if len(files) > 0 {
cur = files[len(files)-1].Size()
for i := 0; i < len(files)-1; i++ {
old += files[i].Size()
}
}

// test zero size condition
require.False(t, oldZero && old > 0)
require.False(t, !oldZero && old == 0)
require.False(t, curZero && cur > 0)
require.False(t, !curZero && cur == 0)

// test method DiskSizeBytes
require.Equal(t, old+cur, w.DiskSizeBytes(), "total disk size")

// test Statistics
ss := w.Statistics(nil)
require.Equal(t, 1, len(ss))

m := ss[0].Values
require.NotNil(t, m)

require.Equal(t, m["oldSegmentsDiskBytes"].(int64), old, "old disk size")
require.Equal(t, m["currentSegmentDiskBytes"].(int64), cur, "current dist size")
}

dir := MustTempDir()
defer os.RemoveAll(dir)

w := tsm1.NewWAL(dir)

const segSize = 1024
w.SegmentSize = segSize

// open
require.NoError(t, w.Open())

test(w, true, true)

// write some values, the total size of these values does not exceed segSize(1024),
// so rollSegment will not be triggered
values := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {tsm1.NewValue(1, 1.0)},
"cpu,host=B#!~#value": {tsm1.NewValue(1, 1.0)},
"cpu,host=C#!~#value": {tsm1.NewValue(1, 1.0)},
}

_, err := w.WriteMulti(values)
require.NoError(t, err)

test(w, true, false)

// write some values, the total size of these values exceeds segSize(1024),
// so rollSegment will be triggered
for i := 0; i < 100; i++ {
_, err := w.WriteMulti(values)
require.NoError(t, err)
}

test(w, false, false)

// reopen
require.NoError(t, w.Close())
require.NoError(t, w.Open())

test(w, false, false)

// remove
closedSegments, err := w.ClosedSegments()
require.NoError(t, err)
require.NoError(t, w.Remove(closedSegments))

test(w, true, false)
}

func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) {
p1 := tsm1.NewValue(1, 1.1)
p2 := tsm1.NewValue(1, int64(1))
Expand Down

0 comments on commit 265c1f3

Please sign in to comment.