From 9bc4e7156a688ba989b22f59b0f405a2b7a47251 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 6 Jul 2023 12:26:09 -0700 Subject: [PATCH 1/5] index header: remove memWriter from fileWriter Signed-off-by: Ben Ye --- pkg/block/indexheader/binary_reader.go | 58 +++++++++++--------------- 1 file changed, 24 insertions(+), 34 deletions(-) diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 0c7d062c9c..1f73bfba84 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -250,13 +250,7 @@ type binaryWriter struct { } func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryWriter, err error) { - var memoryWriter *MemoryWriter - memoryWriter, err = NewMemoryWriter(id, len(buf)) - if err != nil { - return nil, err - } - var binWriter PosWriter = memoryWriter - + var binWriter PosWriter if cacheFilename != "" { dir := filepath.Dir(cacheFilename) @@ -279,7 +273,7 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW // We use file writer for buffers not larger than reused one. var fileWriter *FileWriter - fileWriter, err = NewFileWriter(cacheFilename, memoryWriter) + fileWriter, err = NewFileWriter(cacheFilename, len(buf)) if err != nil { return nil, err } @@ -287,6 +281,8 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW return nil, errors.Wrap(err, "sync dir") } binWriter = fileWriter + } else { + binWriter = NewMemoryWriter(id, len(buf)) } w = &binaryWriter{ @@ -315,18 +311,16 @@ type PosWriter interface { type MemoryWriter struct { id ulid.ULID - buf bytes.Buffer + buf *bytes.Buffer pos uint64 } -// TODO(bwplotka): Added size to method, upstream this. 
-func NewMemoryWriter(id ulid.ULID, size int) (*MemoryWriter, error) { - var buf bytes.Buffer +func NewMemoryWriter(id ulid.ULID, size int) *MemoryWriter { return &MemoryWriter{ id: id, - buf: buf, + buf: bytes.NewBuffer(make([]byte, size)), pos: 0, - }, nil + } } func (mw *MemoryWriter) Pos() uint64 { @@ -369,58 +363,57 @@ func (mw *MemoryWriter) Close() error { type FileWriter struct { f *os.File - memWriter *MemoryWriter fileWriter *bufio.Writer name string + pos uint64 } // TODO(bwplotka): Added size to method, upstream this. -func NewFileWriter(name string, memWriter *MemoryWriter) (*FileWriter, error) { +func NewFileWriter(name string, size int) (*FileWriter, error) { f, err := os.OpenFile(filepath.Clean(name), os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } return &FileWriter{ f: f, - memWriter: memWriter, - fileWriter: bufio.NewWriterSize(f, memWriter.buf.Len()), + fileWriter: bufio.NewWriterSize(f, size), name: name, + pos: 0, }, nil } func (fw *FileWriter) Pos() uint64 { - return fw.memWriter.Pos() + return fw.pos } func (fw *FileWriter) Write(bufs ...[]byte) error { - if err := fw.memWriter.Write(bufs...); err != nil { - return err - } for _, b := range bufs { - _, err := fw.fileWriter.Write(b) + n, err := fw.fileWriter.Write(b) + fw.pos += uint64(n) if err != nil { return err } + // For now the index file must not grow beyond 64GiB. Some of the fixed-sized + // offset references in v1 are only 4 bytes large. + // Once we move to compressed/varint representations in those areas, this limitation + // can be lifted. + if fw.pos > 16*math.MaxUint32 { + return errors.Errorf("%q exceeding max size of 64GiB", fw.name) + } } return nil } +// Buffer is not used at all for FileWriter. 
func (fw *FileWriter) Buffer() []byte { - return fw.memWriter.Buffer() + return nil } func (fw *FileWriter) Flush() error { - if err := fw.memWriter.Flush(); err != nil { - return err - } - return fw.fileWriter.Flush() } func (fw *FileWriter) Close() error { - if err := fw.memWriter.Close(); err != nil { - return err - } if err := fw.Flush(); err != nil { return err } @@ -431,9 +424,6 @@ func (fw *FileWriter) Close() error { } func (fw *FileWriter) Sync() error { - if err := fw.memWriter.Sync(); err != nil { - return err - } return fw.f.Sync() } From 6486b6ce3896e13af2525cb7a7e6d239142bb810 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 6 Jul 2023 12:36:45 -0700 Subject: [PATCH 2/5] update changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e3a45cee6a..0da5c25daf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6325](https://github.com/thanos-io/thanos/pull/6325) Store: return gRPC resource exhausted error for byte limiter. - [#6399](https://github.com/thanos-io/thanos/pull/6399) *: Fix double-counting bug in http_request_duration metric - [#6428](https://github.com/thanos-io/thanos/pull/6428) Report gRPC connnection errors in the logs. +- [#6509](https://github.com/thanos-io/thanos/pull/6509) Store Gateway: Remove `memWriter` from `fileWriter` to reduce memory usage when syncing index headers. ### Changed - [#6049](https://github.com/thanos-io/thanos/pull/6049) Compact: *breaking :warning:* Replace group with resolution in compact metrics to avoid cardinality explosion on compact metrics for large numbers of groups. 
From a10991b0cdb935d7f5958103032e73fd80296d87 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 6 Jul 2023 14:53:24 -0700 Subject: [PATCH 3/5] refactor Signed-off-by: Ben Ye --- pkg/block/indexheader/binary_reader.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 1f73bfba84..737d50aec8 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -300,10 +300,14 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW return w, w.writer.Write(w.buf.Get()) } +type PosWriterWithBuffer interface { + PosWriter + Buffer() []byte +} + type PosWriter interface { Pos() uint64 Write(bufs ...[]byte) error - Buffer() []byte Flush() error Sync() error Close() error @@ -404,11 +408,6 @@ func (fw *FileWriter) Write(bufs ...[]byte) error { return nil } -// Buffer is not used at all for FileWriter. -func (fw *FileWriter) Buffer() []byte { - return nil -} - func (fw *FileWriter) Flush() error { return fw.fileWriter.Flush() } @@ -466,7 +465,11 @@ func (w *binaryWriter) Write(p []byte) (int, error) { } func (w *binaryWriter) Buffer() []byte { - return w.writer.Buffer() + pwb, ok := w.writer.(PosWriterWithBuffer) + if ok { + return pwb.Buffer() + } + return nil } func (w *binaryWriter) Close() error { From 4c3266b646ce7f9cd4e8bf81d4cc8f15d0e05b23 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 6 Jul 2023 18:21:47 -0700 Subject: [PATCH 4/5] fix test Signed-off-by: Ben Ye --- pkg/block/indexheader/binary_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 737d50aec8..02e46005bc 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -322,7 +322,7 @@ type MemoryWriter struct { func NewMemoryWriter(id ulid.ULID, size int) *MemoryWriter { return 
&MemoryWriter{ id: id, - buf: bytes.NewBuffer(make([]byte, size)), + buf: bytes.NewBuffer(make([]byte, 0, size)), pos: 0, } } From 15f898cd0801f90f8532a7d78a09418cbef01735 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 10 Jul 2023 09:34:21 -0700 Subject: [PATCH 5/5] update comment Signed-off-by: Ben Ye --- pkg/block/indexheader/binary_reader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 02e46005bc..1befe63a7f 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -271,7 +271,6 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW return nil, errors.Wrap(err, "remove any existing index at path") } - // We use file writer for buffers not larger than reused one. var fileWriter *FileWriter fileWriter, err = NewFileWriter(cacheFilename, len(buf)) if err != nil {