index header: Remove memWriter from fileWriter #6509

Merged: 5 commits, Jul 11, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -45,6 +45,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6325](https://github.com/thanos-io/thanos/pull/6325) Store: return gRPC resource exhausted error for byte limiter.
- [#6399](https://github.com/thanos-io/thanos/pull/6399) *: Fix double-counting bug in http_request_duration metric
- [#6428](https://github.com/thanos-io/thanos/pull/6428) Report gRPC connnection errors in the logs.
- [#6509](https://github.com/thanos-io/thanos/pull/6509) Store Gateway: Remove `memWriter` from `fileWriter` to reduce memory usage when syncing index headers.

### Changed
- [#6049](https://github.com/thanos-io/thanos/pull/6049) Compact: *breaking :warning:* Replace group with resolution in compact metrics to avoid cardinality explosion on compact metrics for large numbers of groups.
72 changes: 32 additions & 40 deletions pkg/block/indexheader/binary_reader.go
@@ -250,13 +250,7 @@ type binaryWriter struct {
}

func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryWriter, err error) {
var memoryWriter *MemoryWriter
memoryWriter, err = NewMemoryWriter(id, len(buf))
if err != nil {
return nil, err
}
var binWriter PosWriter = memoryWriter

var binWriter PosWriter
if cacheFilename != "" {
dir := filepath.Dir(cacheFilename)

@@ -277,16 +271,17 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW
return nil, errors.Wrap(err, "remove any existing index at path")
}

// We use file writer for buffers not larger than reused one.
var fileWriter *FileWriter
fileWriter, err = NewFileWriter(cacheFilename, memoryWriter)
fileWriter, err = NewFileWriter(cacheFilename, len(buf))
if err != nil {
return nil, err
}
if err := df.Sync(); err != nil {
return nil, errors.Wrap(err, "sync dir")
}
binWriter = fileWriter
} else {
binWriter = NewMemoryWriter(id, len(buf))
}

w = &binaryWriter{
@@ -304,29 +299,31 @@ func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryW
return w, w.writer.Write(w.buf.Get())
}

type PosWriterWithBuffer interface {
PosWriter
Buffer() []byte
}

type PosWriter interface {
Pos() uint64
Write(bufs ...[]byte) error
Buffer() []byte
Flush() error
Sync() error
Close() error
}

type MemoryWriter struct {
id ulid.ULID
buf bytes.Buffer
buf *bytes.Buffer
pos uint64
}

// TODO(bwplotka): Added size to method, upstream this.
func NewMemoryWriter(id ulid.ULID, size int) (*MemoryWriter, error) {
var buf bytes.Buffer
func NewMemoryWriter(id ulid.ULID, size int) *MemoryWriter {
return &MemoryWriter{
id: id,
buf: buf,
buf: bytes.NewBuffer(make([]byte, 0, size)),
pos: 0,
}, nil
}
}

func (mw *MemoryWriter) Pos() uint64 {
@@ -369,58 +366,52 @@ func (mw *MemoryWriter) Close() error {

type FileWriter struct {
f *os.File
memWriter *MemoryWriter
fileWriter *bufio.Writer
name string
pos uint64
}

// TODO(bwplotka): Added size to method, upstream this.
func NewFileWriter(name string, memWriter *MemoryWriter) (*FileWriter, error) {
func NewFileWriter(name string, size int) (*FileWriter, error) {
f, err := os.OpenFile(filepath.Clean(name), os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, err
}
return &FileWriter{
f: f,
memWriter: memWriter,
fileWriter: bufio.NewWriterSize(f, memWriter.buf.Len()),
fileWriter: bufio.NewWriterSize(f, size),
name: name,
pos: 0,
}, nil
}

func (fw *FileWriter) Pos() uint64 {
return fw.memWriter.Pos()
return fw.pos
}

func (fw *FileWriter) Write(bufs ...[]byte) error {
if err := fw.memWriter.Write(bufs...); err != nil {
return err
}
for _, b := range bufs {
_, err := fw.fileWriter.Write(b)
n, err := fw.fileWriter.Write(b)
fw.pos += uint64(n)
if err != nil {
return err
}
// For now the index file must not grow beyond 64GiB. Some of the fixed-sized
// offset references in v1 are only 4 bytes large.
// Once we move to compressed/varint representations in those areas, this limitation
// can be lifted.
if fw.pos > 16*math.MaxUint32 {
return errors.Errorf("%q exceeding max size of 64GiB", fw.name)
}
}
return nil
}

func (fw *FileWriter) Buffer() []byte {
return fw.memWriter.Buffer()
}

func (fw *FileWriter) Flush() error {
if err := fw.memWriter.Flush(); err != nil {
return err
}

return fw.fileWriter.Flush()
}

func (fw *FileWriter) Close() error {
if err := fw.memWriter.Close(); err != nil {
return err
}
if err := fw.Flush(); err != nil {
return err
}
@@ -431,9 +422,6 @@ func (fw *FileWriter) Close() error {
}

func (fw *FileWriter) Sync() error {
if err := fw.memWriter.Sync(); err != nil {
return err
}
return fw.f.Sync()
}
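
One side effect of the `NewFileWriter` change is worth spelling out. As far as this diff shows, the old constructor sized its `bufio.Writer` from `memWriter.buf.Len()`, and the memory writer had only just been created, so that length was presumably 0. `bufio.NewWriterSize` falls back to the package default buffer size for non-positive sizes, whereas the new code passes `len(buf)` directly. The sketch below only demonstrates that standard-library behavior; `writerSizes` is a hypothetical helper and not part of Thanos.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// writerSizes is a hypothetical helper. It reports the internal buffer size
// bufio chooses when sized from an empty bytes.Buffer (Len() == 0) and when
// sized from an explicit request.
func writerSizes(requested int) (fromEmptyBuffer, fromRequested int) {
	var empty bytes.Buffer // nothing written yet, so Len() is 0

	// A non-positive size makes bufio fall back to its default buffer size.
	fromEmptyBuffer = bufio.NewWriterSize(io.Discard, empty.Len()).Size()

	// A positive size is honored as given.
	fromRequested = bufio.NewWriterSize(io.Discard, requested).Size()
	return fromEmptyBuffer, fromRequested
}

func main() {
	a, b := writerSizes(1 << 20)
	fmt.Println(a, b) // default size (4096 in current Go releases), then 1048576
}
```

If that reading of the old code is right, the write buffer now follows the size of the reused buffer rather than the `bufio` default, which is a behavior change separate from dropping the in-memory copy.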

@@ -476,7 +464,11 @@ func (w *binaryWriter) Write(p []byte) (int, error) {
}

func (w *binaryWriter) Buffer() []byte {
return w.writer.Buffer()
pwb, ok := w.writer.(PosWriterWithBuffer)
Contributor:
This looks a bit like a hack. Where is the Buffer method called? Maybe we can just use a PosWriterWithBuffer there instead.

Contributor Author:
It is used here https://github.com/thanos-io/thanos/blob/main/pkg/block/indexheader/binary_reader.go#L130.

It is only called when the writer is a MemoryWriter. In file writer mode, the function returns before reaching that call. I don't think the design is ideal, though; it needs more refactoring.

If we think this kind of hack is not OK, I can add the Buffer method back to FileWriter and have it return nil.

Contributor:
Is it possible to write the bytes to both the file writer and the memory writer? So we would need some sort of a composite Writer that writes to multiple Writers.

This way the Buffer would also go away.

Contributor Author (yeya24, Jul 10, 2023):
> Is it possible to write the bytes to both the file writer and the memory writer?

That is what the current implementation does, and I don't think it is correct. We don't have to write to the memory writer if we have the file writer, IIUC. It just holds extra memory for nothing. (We read from the file anyway.)

In the current implementation, the file writer holds a memory writer, but the memory writer does nothing except provide the Pos() method, because it keeps track of the position. We can do the same in the file writer itself.

Contributor:
Hm then why do we need the buffer? Does it hold the meta file temporarily?

Contributor Author:
The buffer is used only by the memory writer, because it holds the whole index header in memory.
The file writer does not use it at all. But the current PosWriter interface has a Buffer method, so both the file writer and the memory writer implement it.

This PR basically removes that buffer from the file writer. I don't have a good way to refactor the PosWriter interface to drop the Buffer method, so I added PosWriterWithBuffer.
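
For the memory writer side, the diff switches to `bytes.NewBuffer(make([]byte, 0, size))`, which preallocates capacity while keeping the length at zero. The sketch below shows that allocation pattern in isolation. `memWriter` and its methods are hypothetical and only assume what the diff shows; the real `MemoryWriter.Write` is not visible in this hunk and may differ.

```go
package main

import (
	"bytes"
	"fmt"
)

// memWriter is a hypothetical in-memory writer that keeps everything it is
// given, so the full contents can be returned later via Buffer.
type memWriter struct {
	buf *bytes.Buffer
	pos uint64
}

// newMemWriter preallocates size bytes of capacity. The length stays 0, so
// the first write starts at offset 0 without reallocating.
func newMemWriter(size int) *memWriter {
	return &memWriter{buf: bytes.NewBuffer(make([]byte, 0, size))}
}

// Write appends each chunk and advances the tracked position.
func (w *memWriter) Write(bufs ...[]byte) error {
	for _, b := range bufs {
		n, err := w.buf.Write(b)
		w.pos += uint64(n)
		if err != nil {
			return err
		}
	}
	return nil
}

func (w *memWriter) Pos() uint64    { return w.pos }
func (w *memWriter) Buffer() []byte { return w.buf.Bytes() }

func main() {
	w := newMemWriter(64)
	fmt.Println(w.buf.Len(), w.buf.Cap() >= 64) // empty, but capacity is reserved

	if err := w.Write([]byte("abc"), []byte("de")); err != nil {
		panic(err)
	}
	fmt.Println(w.Pos(), string(w.Buffer()))
}
```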

Contributor Author:
https://github.com/thanos-io/thanos/blob/main/pkg/block/indexheader/binary_reader.go#L547 As you can see, the first returned value is the buffer, but it is ignored in file mode. Otherwise, the returned buffer is used at line 554.

Contributor Author:
@fpetkovski Does it make sense overall? TL;DR: the buffer is not used at all by the file writer.

Contributor:
I think this is fine for now; it's more important to remove the memory footprint than to address the design.

if ok {
return pwb.Buffer()
}
return nil
}
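
The type assertion discussed in this thread is the usual Go way to treat a method as optional: the base interface stays small, and callers probe for the extended interface at runtime. Below is a minimal sketch of that pattern. The lower-case types and `bufferOf` are hypothetical illustrations, not the Thanos definitions.

```go
package main

import "fmt"

// posWriter is the small base interface every writer satisfies.
type posWriter interface {
	Pos() uint64
	Write(bufs ...[]byte) error
}

// posWriterWithBuffer adds the optional Buffer method.
type posWriterWithBuffer interface {
	posWriter
	Buffer() []byte
}

// inMemory keeps all written bytes, so it can offer Buffer.
type inMemory struct{ data []byte }

func (m *inMemory) Pos() uint64 { return uint64(len(m.data)) }
func (m *inMemory) Write(bufs ...[]byte) error {
	for _, b := range bufs {
		m.data = append(m.data, b...)
	}
	return nil
}
func (m *inMemory) Buffer() []byte { return m.data }

// countingOnly only tracks how much was written, like a file-backed writer
// that keeps no in-memory copy. It has no Buffer method.
type countingOnly struct{ pos uint64 }

func (c *countingOnly) Pos() uint64 { return c.pos }
func (c *countingOnly) Write(bufs ...[]byte) error {
	for _, b := range bufs {
		c.pos += uint64(len(b))
	}
	return nil
}

// bufferOf returns the writer's buffer when it has one, and nil otherwise.
func bufferOf(w posWriter) []byte {
	if pwb, ok := w.(posWriterWithBuffer); ok {
		return pwb.Buffer()
	}
	return nil
}

func main() {
	m := &inMemory{}
	if err := m.Write([]byte("hi")); err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", bufferOf(m))                  // "hi"
	fmt.Println(bufferOf(&countingOnly{}) == nil) // true
}
```

The trade-off raised in the review still applies: callers must handle a nil result, so the assertion works best when, as here, the buffer is only ever consumed on the in-memory path.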

func (w *binaryWriter) Close() error {