index header: Remove memWriter from fileWriter (#6509)
* index header: remove memWriter from fileWriter

Signed-off-by: Ben Ye <benye@amazon.com>

* update changelog

Signed-off-by: Ben Ye <benye@amazon.com>

* refactor

Signed-off-by: Ben Ye <benye@amazon.com>

* fix test

Signed-off-by: Ben Ye <benye@amazon.com>

* update comment

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
yeya24 committed Jul 11, 2023
1 parent 4dd2667 commit df3a5f8
Showing 2 changed files with 33 additions and 40 deletions.
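In short, the old `FileWriter` kept a full copy of the data in an embedded `MemoryWriter` (a `bytes.Buffer`) while also writing it to disk; after this commit it streams bytes through a `bufio.Writer` and tracks the write offset itself. The following is a minimal, self-contained sketch of that pattern, not the Thanos implementation: `posFileWriter` and `newPosFileWriter` are illustrative names, and the 64 GiB size check and error wrapping from the real code are omitted.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
)

// posFileWriter streams bytes through a bufio.Writer and tracks the write
// position itself, so no full in-memory copy of the data is retained.
type posFileWriter struct {
	f   *os.File
	w   *bufio.Writer
	pos uint64
}

func newPosFileWriter(name string, bufSize int) (*posFileWriter, error) {
	f, err := os.OpenFile(filepath.Clean(name), os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		return nil, err
	}
	return &posFileWriter{f: f, w: bufio.NewWriterSize(f, bufSize)}, nil
}

// Write appends all buffers and advances the tracked position.
func (p *posFileWriter) Write(bufs ...[]byte) error {
	for _, b := range bufs {
		n, err := p.w.Write(b)
		p.pos += uint64(n)
		if err != nil {
			return err
		}
	}
	return nil
}

// Pos returns the number of bytes written so far.
func (p *posFileWriter) Pos() uint64 { return p.pos }

// Close flushes the buffer, syncs the file to disk, and closes it.
func (p *posFileWriter) Close() error {
	if err := p.w.Flush(); err != nil {
		return err
	}
	if err := p.f.Sync(); err != nil {
		return err
	}
	return p.f.Close()
}

func main() {
	fw, err := newPosFileWriter("index-header.tmp", 32*1024)
	if err != nil {
		panic(err)
	}
	defer os.Remove("index-header.tmp")

	if err := fw.Write([]byte("magic"), []byte("symbols")); err != nil {
		panic(err)
	}
	fmt.Println("bytes written:", fw.Pos()) // 12
	if err := fw.Close(); err != nil {
		panic(err)
	}
}
```

The actual change is larger than this sketch because `Buffer()` callers previously relied on the in-memory copy; the diff below handles that by exposing a buffer only for memory-backed writers.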
1 change: 1 addition & 0 deletions CHANGELOG.md
```diff
@@ -46,6 +46,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#6325](https://github.com/thanos-io/thanos/pull/6325) Store: return gRPC resource exhausted error for byte limiter.
 - [#6399](https://github.com/thanos-io/thanos/pull/6399) *: Fix double-counting bug in http_request_duration metric
 - [#6428](https://github.com/thanos-io/thanos/pull/6428) Report gRPC connnection errors in the logs.
+- [#6509](https://github.com/thanos-io/thanos/pull/6509) Store Gateway: Remove `memWriter` from `fileWriter` to reduce memory usage when sync index headers.
 
 ### Changed
 - [#6049](https://github.com/thanos-io/thanos/pull/6049) Compact: *breaking :warning:* Replace group with resolution in compact metrics to avoid cardinality explosion on compact metrics for large numbers of groups.
```

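The changelog entry above claims a memory-usage reduction when index headers are synced to a cache file. The intuition: previously every byte written through `fileWriter` was also appended to `memWriter`'s `bytes.Buffer`, so the whole payload stayed resident; now only the `bufio.Writer`'s write buffer is retained between writes. A rough, hedged illustration of that difference (the 8 MiB payload and 32 KiB buffer size are arbitrary, and `io.Discard` stands in for the destination file):

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

func main() {
	payload := bytes.Repeat([]byte{0xAB}, 8<<20) // pretend this is an 8 MiB index header

	// Old shape: the whole payload accumulates in an in-memory buffer
	// in addition to being written to the file.
	var mem bytes.Buffer
	mem.Write(payload)
	fmt.Printf("memory-backed path retains ~%d MiB\n", mem.Cap()>>20)

	// New shape: bytes stream through a fixed-size bufio.Writer straight
	// to the file (io.Discard stands in for the *os.File here).
	bw := bufio.NewWriterSize(io.Discard, 32<<10)
	if _, err := bw.Write(payload); err != nil {
		panic(err)
	}
	if err := bw.Flush(); err != nil {
		panic(err)
	}
	fmt.Printf("file-backed path retains a ~%d KiB write buffer\n", bw.Size()>>10)
}
```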
72 changes: 32 additions & 40 deletions pkg/block/indexheader/binary_reader.go
```diff
@@ -250,13 +250,7 @@ type binaryWriter struct {
 }
 
 func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryWriter, err error) {
-    var memoryWriter *MemoryWriter
-    memoryWriter, err = NewMemoryWriter(id, len(buf))
-    if err != nil {
-        return nil, err
-    }
-    var binWriter PosWriter = memoryWriter
-
+    var binWriter PosWriter
     if cacheFilename != "" {
         dir := filepath.Dir(cacheFilename)
 
```

```diff
@@ -277,16 +271,17 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW
             return nil, errors.Wrap(err, "remove any existing index at path")
         }
 
-        // We use file writer for buffers not larger than reused one.
         var fileWriter *FileWriter
-        fileWriter, err = NewFileWriter(cacheFilename, memoryWriter)
+        fileWriter, err = NewFileWriter(cacheFilename, len(buf))
         if err != nil {
             return nil, err
         }
         if err := df.Sync(); err != nil {
             return nil, errors.Wrap(err, "sync dir")
         }
         binWriter = fileWriter
+    } else {
+        binWriter = NewMemoryWriter(id, len(buf))
     }
 
     w = &binaryWriter{
```

```diff
@@ -304,29 +299,31 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW
     return w, w.writer.Write(w.buf.Get())
 }
 
+type PosWriterWithBuffer interface {
+    PosWriter
+    Buffer() []byte
+}
+
 type PosWriter interface {
     Pos() uint64
     Write(bufs ...[]byte) error
-    Buffer() []byte
     Flush() error
     Sync() error
     Close() error
 }
 
 type MemoryWriter struct {
     id  ulid.ULID
-    buf bytes.Buffer
+    buf *bytes.Buffer
     pos uint64
 }
 
-// TODO(bwplotka): Added size to method, upstream this.
-func NewMemoryWriter(id ulid.ULID, size int) (*MemoryWriter, error) {
-    var buf bytes.Buffer
+func NewMemoryWriter(id ulid.ULID, size int) *MemoryWriter {
     return &MemoryWriter{
         id:  id,
-        buf: buf,
+        buf: bytes.NewBuffer(make([]byte, 0, size)),
         pos: 0,
-    }, nil
+    }
 }
 
 func (mw *MemoryWriter) Pos() uint64 {
```

```diff
@@ -369,58 +366,52 @@ func (mw *MemoryWriter) Close() error {
 
 type FileWriter struct {
     f          *os.File
-    memWriter  *MemoryWriter
     fileWriter *bufio.Writer
     name       string
+    pos        uint64
 }
 
 // TODO(bwplotka): Added size to method, upstream this.
-func NewFileWriter(name string, memWriter *MemoryWriter) (*FileWriter, error) {
+func NewFileWriter(name string, size int) (*FileWriter, error) {
     f, err := os.OpenFile(filepath.Clean(name), os.O_CREATE|os.O_RDWR, 0600)
     if err != nil {
         return nil, err
     }
     return &FileWriter{
         f:          f,
-        memWriter:  memWriter,
-        fileWriter: bufio.NewWriterSize(f, memWriter.buf.Len()),
+        fileWriter: bufio.NewWriterSize(f, size),
         name:       name,
+        pos:        0,
     }, nil
 }
 
 func (fw *FileWriter) Pos() uint64 {
-    return fw.memWriter.Pos()
+    return fw.pos
 }
 
 func (fw *FileWriter) Write(bufs ...[]byte) error {
-    if err := fw.memWriter.Write(bufs...); err != nil {
-        return err
-    }
     for _, b := range bufs {
-        _, err := fw.fileWriter.Write(b)
+        n, err := fw.fileWriter.Write(b)
+        fw.pos += uint64(n)
         if err != nil {
             return err
         }
+        // For now the index file must not grow beyond 64GiB. Some of the fixed-sized
+        // offset references in v1 are only 4 bytes large.
+        // Once we move to compressed/varint representations in those areas, this limitation
+        // can be lifted.
+        if fw.pos > 16*math.MaxUint32 {
+            return errors.Errorf("%q exceeding max size of 64GiB", fw.name)
+        }
     }
     return nil
 }
 
-func (fw *FileWriter) Buffer() []byte {
-    return fw.memWriter.Buffer()
-}
-
 func (fw *FileWriter) Flush() error {
-    if err := fw.memWriter.Flush(); err != nil {
-        return err
-    }
-
     return fw.fileWriter.Flush()
 }
 
 func (fw *FileWriter) Close() error {
-    if err := fw.memWriter.Close(); err != nil {
-        return err
-    }
     if err := fw.Flush(); err != nil {
         return err
     }
```

```diff
@@ -431,9 +422,6 @@ func (fw *FileWriter) Close() error {
 }
 
 func (fw *FileWriter) Sync() error {
-    if err := fw.memWriter.Sync(); err != nil {
-        return err
-    }
     return fw.f.Sync()
 }
 
```

```diff
@@ -476,7 +464,11 @@ func (w *binaryWriter) Write(p []byte) (int, error) {
 }
 
 func (w *binaryWriter) Buffer() []byte {
-    return w.writer.Buffer()
+    pwb, ok := w.writer.(PosWriterWithBuffer)
+    if ok {
+        return pwb.Buffer()
+    }
+    return nil
 }
 
 func (w *binaryWriter) Close() error {
```

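With `Buffer()` gone from `PosWriter`, `binaryWriter.Buffer()` (last hunk above) only returns bytes when the underlying writer is memory-backed, via a type assertion on the new `PosWriterWithBuffer` interface; for the file-backed writer it returns nil. A small standalone sketch of that call-site pattern, with illustrative writer types rather than the real Thanos ones:

```go
package main

import (
	"bytes"
	"fmt"
)

// PosWriter is the narrow interface every writer satisfies.
type PosWriter interface {
	Pos() uint64
	Write(bufs ...[]byte) error
}

// PosWriterWithBuffer is the optional extension only memory-backed writers satisfy.
type PosWriterWithBuffer interface {
	PosWriter
	Buffer() []byte
}

// memWriter keeps everything in memory and therefore can expose a Buffer.
type memWriter struct{ buf bytes.Buffer }

func (m *memWriter) Pos() uint64 { return uint64(m.buf.Len()) }
func (m *memWriter) Write(bufs ...[]byte) error {
	for _, b := range bufs {
		m.buf.Write(b)
	}
	return nil
}
func (m *memWriter) Buffer() []byte { return m.buf.Bytes() }

// fileWriter is a stand-in for the file-backed writer: it tracks a position
// but keeps no copy of the data, so it has no Buffer method.
type fileWriter struct{ pos uint64 }

func (f *fileWriter) Pos() uint64 { return f.pos }
func (f *fileWriter) Write(bufs ...[]byte) error {
	for _, b := range bufs {
		f.pos += uint64(len(b)) // pretend the bytes went to disk
	}
	return nil
}

// bufferOf mirrors the new binaryWriter.Buffer(): bytes only for memory-backed writers.
func bufferOf(w PosWriter) []byte {
	if pwb, ok := w.(PosWriterWithBuffer); ok {
		return pwb.Buffer()
	}
	return nil
}

func main() {
	for _, w := range []PosWriter{&memWriter{}, &fileWriter{}} {
		if err := w.Write([]byte("abc")); err != nil {
			panic(err)
		}
		fmt.Printf("%T buffer: %q\n", w, bufferOf(w)) // empty for the file-backed writer
	}
}
```

A practical consequence of this change is that any caller of `binaryWriter.Buffer()` has to tolerate a nil result when a cache filename was given.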
