Skip to content

Commit

Permalink
StoreGateway: Partition index-header download (#6747)
Browse files Browse the repository at this point in the history
* Partition index-header download

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Use int division instead of float

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Ignore errors in close()

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Fix e2e

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Use disk to buffer parts of index-header

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Fix lint

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Renaming variables

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Increase partition size

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Fix e2e failures

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Refactoring

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Fix e2e

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Fix lint

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Fix e2e

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Cosmetic changes

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

* Address review comments

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>

---------

Signed-off-by: 馃尣 Harry 馃寠 John 馃彅 <johrry@amazon.com>
  • Loading branch information
harry671003 committed Oct 5, 2023
1 parent 63ea2e2 commit 62d2753
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 2 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -191,7 +191,7 @@ require (
github.com/google/go-cmp v0.5.9
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.3.0
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
Expand Down
7 changes: 6 additions & 1 deletion pkg/block/indexheader/binary_reader.go
Expand Up @@ -74,7 +74,12 @@ type BinaryTOC struct {

// WriteBinary build index header from the pieces of index in object storage, and cached in file if necessary.
func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, filename string) ([]byte, error) {
ir, indexVersion, err := newChunkedIndexReader(ctx, bkt, id)
var tmpDir = ""
if filename != "" {
tmpDir = filepath.Dir(filename)
}
parallelBucket := WrapWithParallel(bkt, tmpDir)
ir, indexVersion, err := newChunkedIndexReader(ctx, parallelBucket, id)
if err != nil {
return nil, errors.Wrap(err, "new index reader")
}
Expand Down
231 changes: 231 additions & 0 deletions pkg/block/indexheader/parallel_bucket.go
@@ -0,0 +1,231 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package indexheader

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"

"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"golang.org/x/sync/errgroup"
)

// partitionSize is used for splitting range reads.
const partitionSize = 16 * 1024 * 1024 // 16 MiB

type parallelBucketReader struct {
objstore.BucketReader
tmpDir string
partitionSize int64
}

func WrapWithParallel(b objstore.BucketReader, tmpDir string) objstore.BucketReader {
return &parallelBucketReader{
BucketReader: b,
tmpDir: tmpDir,
partitionSize: partitionSize,
}
}

// GetRange reads the range in parallel.
func (b *parallelBucketReader) GetRange(ctx context.Context, name string, off int64, length int64) (io.ReadCloser, error) {
partFilePrefix := uuid.New().String()
g, gctx := errgroup.WithContext(ctx)

numParts := length / b.partitionSize
if length%b.partitionSize > 0 {
// A partial partition is remaining
numParts += 1
}

parts := make([]Part, 0, numParts)

partId := 0
for o := off; o < off+length; o += b.partitionSize {
l := b.partitionSize
if o+l > off+length {
// Partial partition
l = length - (int64(partId) * b.partitionSize)
}

partOff := o
partLength := l
part, err := b.createPart(partFilePrefix, partId, int(partLength))
if err != nil {
return nil, err
}
parts = append(parts, part)

g.Go(func() error {
rc, err := b.BucketReader.GetRange(gctx, name, partOff, partLength)
defer runutil.CloseWithErrCapture(&err, rc, "close object")
if err != nil {
return errors.Wrap(err, fmt.Sprintf("get range part %v", partId))
}
if _, err := io.Copy(part, rc); err != nil {
return errors.Wrap(err, fmt.Sprintf("get range part %v", partId))
}
return part.Flush()
})
partId += 1
}

if err := g.Wait(); err != nil {
return nil, err
}
return newPartMerger(parts), nil
}

func (b *parallelBucketReader) createPart(partFilePrefix string, partId int, size int) (Part, error) {
if b.tmpDir == "" {
// Parts stored in memory
return newPartBuffer(size), nil
}

partName := fmt.Sprintf("%s.part-%d", partFilePrefix, partId)
filename := filepath.Join(b.tmpDir, partName)
return newPartFile(filename)
}

type partMerger struct {
closers []io.Closer
multiReader io.Reader
}

func newPartMerger(parts []Part) *partMerger {
readers := make([]io.Reader, 0, len(parts))
closers := make([]io.Closer, 0, len(parts))
for _, p := range parts {
readers = append(readers, p.(io.Reader))
closers = append(closers, p.(io.Closer))
}
return &partMerger{
closers: closers,
multiReader: io.MultiReader(readers...),
}
}

func (m *partMerger) Read(b []byte) (n int, err error) {
n, err = m.multiReader.Read(b)
return
}

func (m *partMerger) Close() (err error) {
var firstErr error = nil
for _, c := range m.closers {
if err := c.Close(); err != nil {
if firstErr == nil {
firstErr = err
}
}
}
return firstErr
}

type Part interface {
Read(buf []byte) (int, error)
Write(buf []byte) (int, error)
Flush() error
}

// partFile stores parts in temporary files.
type partFile struct {
file *os.File
fileWriter *bufio.Writer
fileReader *bufio.Reader
}

func newPartFile(filename string) (*partFile, error) {
dir := filepath.Dir(filename)
df, err := fileutil.OpenDir(dir)
if os.IsNotExist(err) {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return nil, errors.Wrap(err, "create temp dir")
}
df, err = fileutil.OpenDir(dir)
}
if err != nil {
return nil, errors.Wrap(err, "open temp dir")
}

if err := df.Sync(); err != nil {
return nil, errors.Wrap(err, "sync dir")
}

if err := os.RemoveAll(filename); err != nil {
return nil, errors.Wrap(err, "remove existing file")
}
f, err := os.OpenFile(filepath.Clean(filename), os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, errors.Wrap(err, "open temp file")
}
return &partFile{
file: f,
fileWriter: bufio.NewWriterSize(f, 32*1024),
fileReader: bufio.NewReaderSize(f, 32*1024),
}, nil
}

func (p *partFile) Close() error {
if err := p.file.Close(); err != nil {
return err
}
return os.Remove(p.file.Name())
}

func (p *partFile) Flush() error {
if err := p.fileWriter.Flush(); err != nil {
return err
}
if err := p.file.Sync(); err != nil {
return err
}
// Seek is necessary because the part was just written to.
_, err := p.file.Seek(0, io.SeekStart)
return err
}

func (p *partFile) Read(buf []byte) (int, error) {
return p.fileReader.Read(buf)
}

func (p *partFile) Write(buf []byte) (int, error) {
return p.fileWriter.Write(buf)
}

// partBuffer stores parts in memory.
type partBuffer struct {
buf *bytes.Buffer
}

func newPartBuffer(size int) *partBuffer {
return &partBuffer{
buf: bytes.NewBuffer(make([]byte, 0, size)),
}
}

func (p *partBuffer) Close() error {
return nil
}

func (p *partBuffer) Read(b []byte) (int, error) {
return p.buf.Read(b)
}

func (p *partBuffer) Write(b []byte) (int, error) {
return p.buf.Write(b)
}

func (p *partBuffer) Flush() error {
return nil
}
87 changes: 87 additions & 0 deletions pkg/block/indexheader/parallel_bucket_test.go
@@ -0,0 +1,87 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package indexheader

import (
"bytes"
"context"
"crypto/rand"
"io"
"math/big"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/objstore"
)

func TestParallelBucket_InMemoryBuffering(t *testing.T) {
bkt := objstore.NewInMemBucket()
parallelBucket := &parallelBucketReader{
BucketReader: bkt,
tmpDir: "",
partitionSize: 100,
}
testParallelBucket(t, bkt, parallelBucket)
}

func TestParallelBucket_TmpFileBuffering(t *testing.T) {
bkt := objstore.NewInMemBucket()
parallelBucket := &parallelBucketReader{
BucketReader: bkt,
tmpDir: t.TempDir(),
partitionSize: 100,
}
testParallelBucket(t, bkt, parallelBucket)
}

func testParallelBucket(t *testing.T, bkt objstore.Bucket, parallelBucket *parallelBucketReader) {
name := "test/data"
ctx := context.Background()

var size int64 = 10 * 1024
o, err := rand.Int(rand.Reader, big.NewInt(size/2))
testutil.Ok(t, err)
offset := o.Int64()

l, err := rand.Int(rand.Reader, big.NewInt(size/2))
testutil.Ok(t, err)
length := l.Int64()

randBytes := uploadRandom(t, ctx, bkt, name, size)

r1, err := parallelBucket.GetRange(ctx, name, offset, length)
testutil.Ok(t, err)

parallelBytes, err := io.ReadAll(r1)
testutil.Ok(t, err)
testutil.Assert(t, length == int64(len(parallelBytes)))

expectedBytes := randBytes[offset : offset+length]
testutil.Assert(t, length == int64(len(expectedBytes)))
testutil.Equals(t, expectedBytes, parallelBytes)

r2, err := bkt.GetRange(ctx, name, offset, length)
testutil.Ok(t, err)
memoryBytes, err := io.ReadAll(r2)
testutil.Ok(t, err)
testutil.Assert(t, length == int64(len(memoryBytes)))
testutil.Equals(t, memoryBytes, parallelBytes)

err = r1.Close()
testutil.Ok(t, err)

err = r2.Close()
testutil.Ok(t, err)
}

func uploadRandom(t *testing.T, ctx context.Context, bkt objstore.Bucket, name string, size int64) []byte {
b := make([]byte, size)
_, err := rand.Read(b)
testutil.Ok(t, err)
r := bytes.NewReader(b)
err = bkt.Upload(ctx, name, r)
testutil.Ok(t, err)

return b
}

0 comments on commit 62d2753

Please sign in to comment.