From df3a5f8087268d069a13a062a77d7f650eb6760a Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 11 Jul 2023 09:01:12 -0700 Subject: [PATCH] index header: Remove memWriter from fileWriter (#6509) * index header: remove memWriter from fileWriter Signed-off-by: Ben Ye * update changelog Signed-off-by: Ben Ye * refactor Signed-off-by: Ben Ye * fix test Signed-off-by: Ben Ye * update comment Signed-off-by: Ben Ye --------- Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + pkg/block/indexheader/binary_reader.go | 72 ++++++++++++-------------- 2 files changed, 33 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba7292b23a..045f045f0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6325](https://github.com/thanos-io/thanos/pull/6325) Store: return gRPC resource exhausted error for byte limiter. - [#6399](https://github.com/thanos-io/thanos/pull/6399) *: Fix double-counting bug in http_request_duration metric - [#6428](https://github.com/thanos-io/thanos/pull/6428) Report gRPC connnection errors in the logs. +- [#6509](https://github.com/thanos-io/thanos/pull/6509) Store Gateway: Remove `memWriter` from `fileWriter` to reduce memory usage when sync index headers. ### Changed - [#6049](https://github.com/thanos-io/thanos/pull/6049) Compact: *breaking :warning:* Replace group with resolution in compact metrics to avoid cardinality explosion on compact metrics for large numbers of groups. diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 0c7d062c9c..1befe63a7f 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -250,13 +250,7 @@ type binaryWriter struct { } func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryWriter, err error) { - var memoryWriter *MemoryWriter - memoryWriter, err = NewMemoryWriter(id, len(buf)) - if err != nil { - return nil, err - } - var binWriter PosWriter = memoryWriter - + var binWriter PosWriter if cacheFilename != "" { dir := filepath.Dir(cacheFilename) @@ -277,9 +271,8 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW return nil, errors.Wrap(err, "remove any existing index at path") } - // We use file writer for buffers not larger than reused one. var fileWriter *FileWriter - fileWriter, err = NewFileWriter(cacheFilename, memoryWriter) + fileWriter, err = NewFileWriter(cacheFilename, len(buf)) if err != nil { return nil, err } @@ -287,6 +280,8 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW return nil, errors.Wrap(err, "sync dir") } binWriter = fileWriter + } else { + binWriter = NewMemoryWriter(id, len(buf)) } w = &binaryWriter{ @@ -304,10 +299,14 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW return w, w.writer.Write(w.buf.Get()) } +type PosWriterWithBuffer interface { + PosWriter + Buffer() []byte +} + type PosWriter interface { Pos() uint64 Write(bufs ...[]byte) error - Buffer() []byte Flush() error Sync() error Close() error @@ -315,18 +314,16 @@ type PosWriter interface { type MemoryWriter struct { id ulid.ULID - buf bytes.Buffer + buf *bytes.Buffer pos uint64 } -// TODO(bwplotka): Added size to method, upstream this. -func NewMemoryWriter(id ulid.ULID, size int) (*MemoryWriter, error) { - var buf bytes.Buffer +func NewMemoryWriter(id ulid.ULID, size int) *MemoryWriter { return &MemoryWriter{ id: id, - buf: buf, + buf: bytes.NewBuffer(make([]byte, 0, size)), pos: 0, - }, nil + } } func (mw *MemoryWriter) Pos() uint64 { @@ -369,58 +366,52 @@ func (mw *MemoryWriter) Close() error { type FileWriter struct { f *os.File - memWriter *MemoryWriter fileWriter *bufio.Writer name string + pos uint64 } // TODO(bwplotka): Added size to method, upstream this. -func NewFileWriter(name string, memWriter *MemoryWriter) (*FileWriter, error) { +func NewFileWriter(name string, size int) (*FileWriter, error) { f, err := os.OpenFile(filepath.Clean(name), os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } return &FileWriter{ f: f, - memWriter: memWriter, - fileWriter: bufio.NewWriterSize(f, memWriter.buf.Len()), + fileWriter: bufio.NewWriterSize(f, size), name: name, + pos: 0, }, nil } func (fw *FileWriter) Pos() uint64 { - return fw.memWriter.Pos() + return fw.pos } func (fw *FileWriter) Write(bufs ...[]byte) error { - if err := fw.memWriter.Write(bufs...); err != nil { - return err - } for _, b := range bufs { - _, err := fw.fileWriter.Write(b) + n, err := fw.fileWriter.Write(b) + fw.pos += uint64(n) if err != nil { return err } + // For now the index file must not grow beyond 64GiB. Some of the fixed-sized + // offset references in v1 are only 4 bytes large. + // Once we move to compressed/varint representations in those areas, this limitation + // can be lifted. + if fw.pos > 16*math.MaxUint32 { + return errors.Errorf("%q exceeding max size of 64GiB", fw.name) + } } return nil } -func (fw *FileWriter) Buffer() []byte { - return fw.memWriter.Buffer() -} - func (fw *FileWriter) Flush() error { - if err := fw.memWriter.Flush(); err != nil { - return err - } - return fw.fileWriter.Flush() } func (fw *FileWriter) Close() error { - if err := fw.memWriter.Close(); err != nil { - return err - } if err := fw.Flush(); err != nil { return err } @@ -431,9 +422,6 @@ func (fw *FileWriter) Close() error { } func (fw *FileWriter) Sync() error { - if err := fw.memWriter.Sync(); err != nil { - return err - } return fw.f.Sync() } @@ -476,7 +464,11 @@ func (w *binaryWriter) Write(p []byte) (int, error) { } func (w *binaryWriter) Buffer() []byte { - return w.writer.Buffer() + pwb, ok := w.writer.(PosWriterWithBuffer) + if ok { + return pwb.Buffer() + } + return nil } func (w *binaryWriter) Close() error {