Skip to content

Commit

Permalink
Upload blocks with the lowest minTime first
Browse files Browse the repository at this point in the history
Fixes thanos-io#1670

Signed-off-by: Olivier Biesmans <olivier.biesmans@blablacar.com>
  • Loading branch information
Olivier Biesmans committed Oct 23, 2019
1 parent e1a99d8 commit 53e49b5
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 1 deletion.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878
github.com/cespare/xxhash v1.1.0
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/davecgh/go-spew v1.1.1
github.com/fatih/structtag v1.0.0
github.com/fortytw2/leaktest v1.3.0
github.com/fsnotify/fsnotify v1.4.7
Expand Down Expand Up @@ -65,3 +66,5 @@ replace (
k8s.io/klog => k8s.io/klog v0.3.1
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30
)

go 1.13
24 changes: 23 additions & 1 deletion pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
uploadErrs int
)
// Sync non compacted blocks first.
if err := s.iterBlockMetas(func(m *metadata.Meta) error {
if err := s.iterSortedBlockMetas(func(m *metadata.Meta) error {
// Do not sync a block if we already uploaded or ignored it. If it's no longer found in the bucket,
// it was generally removed by the compaction process.
if _, uploaded := hasUploaded[m.ULID]; uploaded {
Expand Down Expand Up @@ -414,6 +414,28 @@ func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error {
return nil
}

// iterSortedBlockMetas calls f with the block meta for each block found in dir
// sorted by minTime asc.
func (s *Shipper) iterSortedBlockMetas(f func(m *metadata.Meta) error) error {
var metas []*metadata.Meta

if err := s.iterBlockMetas(func(m *metadata.Meta) error {
metas = append(metas, m)
return nil
}); err != nil {
return err
}
sort.Slice(metas, func(i, j int) bool {
return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime
})
for _, m := range metas {
if err := f(m); err != nil {
return err
}
}
return nil
}

func hardlinkBlock(src, dst string) error {
chunkDir := filepath.Join(dst, block.ChunksDirname)

Expand Down
131 changes: 131 additions & 0 deletions pkg/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package shipper
import (
"io/ioutil"
"math"
"math/rand"
"os"
"path"
"sort"
"testing"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -77,3 +79,132 @@ func TestShipperTimestamps(t *testing.T) {
testutil.Equals(t, int64(1000), mint)
testutil.Equals(t, int64(2000), maxt)
}

func TestIterBlockMetas(t *testing.T) {
var metas []*metadata.Meta
dir, err := ioutil.TempDir("", "shipper-test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

id1 := ulid.MustNew(1, nil)
testutil.Ok(t, os.Mkdir(path.Join(dir, id1.String()), os.ModePerm))
testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id1.String()), &metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: id1,
MaxTime: 2000,
MinTime: 1000,
Version: 1,
},
}))
id2 := ulid.MustNew(2, nil)
testutil.Ok(t, os.Mkdir(path.Join(dir, id2.String()), os.ModePerm))
testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id2.String()), &metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: id2,
MaxTime: 4000,
MinTime: 3000,
Version: 1,
},
}))
id3 := ulid.MustNew(3, nil)
testutil.Ok(t, os.Mkdir(path.Join(dir, id3.String()), os.ModePerm))
testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id3.String()), &metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: id3,
MaxTime: 3000,
MinTime: 2000,
Version: 1,
},
}))
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource)
if err := shipper.iterSortedBlockMetas(func(m *metadata.Meta) error {
metas = append(metas, m)
return nil
}); err != nil {
testutil.Ok(t, err)
}
testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool {
return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime
}), true)
}

func BenchmarkIterBlockMetas(b *testing.B) {
var metas []*metadata.Meta
dir, err := ioutil.TempDir("", "shipper-test")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(dir))
}()

for i := 0; i < 50; i++ {
id := ulid.MustNew(uint64(i), nil)
testutil.Ok(b, os.Mkdir(path.Join(dir, id.String()), os.ModePerm))
testutil.Ok(b,
metadata.Write(
log.NewNopLogger(),
path.Join(dir, id.String()),
&metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: id,
MaxTime: int64((i + 1) * 1000),
MinTime: int64(i * 1000),
Version: 1,
},
},
),
)
}
rand.Shuffle(len(metas), func(i, j int) {
metas[i], metas[j] = metas[j], metas[i]
})
b.ResetTimer()

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource)
if err := shipper.iterBlockMetas(func(m *metadata.Meta) error {
metas = append(metas, m)
return nil
}); err != nil {
testutil.Ok(b, err)
}
}

func BenchmarkIterSortedBlockMetas(b *testing.B) {
var metas []*metadata.Meta
dir, err := ioutil.TempDir("", "shipper-test")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(dir))
}()

for i := 0; i < 50; i++ {
id := ulid.MustNew(uint64(i), nil)
testutil.Ok(b, os.Mkdir(path.Join(dir, id.String()), os.ModePerm))
testutil.Ok(b,
metadata.Write(
log.NewNopLogger(),
path.Join(dir, id.String()),
&metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: id,
MaxTime: int64((i + 1) * 1000),
MinTime: int64(i * 1000),
Version: 1,
},
},
),
)
}
rand.Shuffle(len(metas), func(i, j int) {
metas[i], metas[j] = metas[j], metas[i]
})

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource)
if err := shipper.iterSortedBlockMetas(func(m *metadata.Meta) error {
metas = append(metas, m)
return nil
}); err != nil {
testutil.Ok(b, err)
}
}

0 comments on commit 53e49b5

Please sign in to comment.