[not-fixup] - Reduce memory consumption for Buffered File Writer #2377

Merged: 30 commits, Feb 6, 2024

Commits
5e5a57a
correctly use the buffered file writer
ahrav Feb 2, 2024
3b21ea3
use value from source
ahrav Feb 3, 2024
9aaf0c8
reorder fields
ahrav Feb 3, 2024
f7f890a
Merge branch 'main' into fixup-use-git-parser
ahrav Feb 3, 2024
1edc0a3
use only the DetectorKey as a map field
ahrav Feb 3, 2024
d530798
correctly use the buffered file writer
ahrav Feb 2, 2024
ea8bc7f
use value from source
ahrav Feb 3, 2024
3bc4482
reorder fields
ahrav Feb 3, 2024
510ce03
add tests and update
ahrav Feb 3, 2024
f6f1f44
Fix issue with buffer slices growing
ahrav Feb 4, 2024
7659980
fix test
ahrav Feb 4, 2024
b0dbbbf
Merge branch 'bug-unhashable-key' into fixup-use-git-parser
ahrav Feb 4, 2024
96ef60a
Merge branch 'fixup-use-git-parser' into optimize-buffered-file-writer
ahrav Feb 4, 2024
a07c39b
fix
ahrav Feb 4, 2024
e41cced
add singleton
ahrav Feb 4, 2024
8e3d56c
use shared pool
ahrav Feb 4, 2024
3e67392
optimize
ahrav Feb 5, 2024
d8e5589
rename and cleanup
ahrav Feb 5, 2024
3894900
use correct calculation to grow buffer
ahrav Feb 5, 2024
999d662
only grow if needed
ahrav Feb 5, 2024
54c8a65
merge
ahrav Feb 5, 2024
57a2305
merge main
ahrav Feb 5, 2024
faa423b
address comments
ahrav Feb 5, 2024
c8b4b02
remove unused
ahrav Feb 5, 2024
8190669
remove
ahrav Feb 5, 2024
c1cf67c
rip out Grow
ahrav Feb 5, 2024
2e74a4c
address coment
ahrav Feb 6, 2024
a2d23ef
use 2k default buffer
ahrav Feb 6, 2024
10532d5
Merge branch 'main' into optimize-buffered-file-writer
ahrav Feb 6, 2024
f1bd25d
update comment allow large buffers to be garbage collected
ahrav Feb 6, 2024
1 change: 0 additions & 1 deletion pkg/gitparse/gitparse.go
@@ -449,7 +449,6 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch
}
// Create a new currentDiff and currentCommit
currentDiff = diff()
-// currentDiff = NewDiff(withCustomContentWriter(c.contentWriter()))
Collaborator Author (ahrav):

Small cleanup here.

currentCommit = &Commit{Message: strings.Builder{}}
// Check that the commit line contains a hash and set it.
if len(line) >= 47 {
130 changes: 95 additions & 35 deletions pkg/writers/buffered_file_writer/bufferedfilewriter.go
@@ -13,12 +13,59 @@ import (
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
)

-// bufferPool is used to store buffers for reuse.
-var bufferPool = sync.Pool{
-// TODO: Consider growing the buffer before returning it if we can find an optimal size.
-// Ideally the size would cover the majority of cases without being too large.
-// This would avoid the need to grow the buffer when writing to it, reducing allocations.
-New: func() any { return new(bytes.Buffer) },
-}
+type bufPoolOpt func(pool *bufferPool)

+type bufferPool struct {
+bufferSize uint32
+*sync.Pool
+}

const defaultBufferSize = 2 << 10 // 2KB
func newBufferPool(opts ...bufPoolOpt) *bufferPool {
pool := &bufferPool{bufferSize: defaultBufferSize}

for _, opt := range opts {
opt(pool)
}
pool.Pool = &sync.Pool{
New: func() any {
buf := new(bytes.Buffer)
buf.Grow(int(pool.bufferSize))
return buf
},
}

return pool
}

// sharedBufferPool is the shared buffer pool used by all BufferedFileWriters.
// This allows for efficient reuse of buffers across multiple writers.
var sharedBufferPool *bufferPool

func init() { sharedBufferPool = newBufferPool() }

func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer {
buf, ok := bp.Pool.Get().(*bytes.Buffer)
if !ok {
ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer")
buf = bytes.NewBuffer(make([]byte, 0, bp.bufferSize))
}

return buf
}

func (bp *bufferPool) put(buf *bytes.Buffer) {
// If the buffer is more than twice the default size, release it for garbage collection.
// This prevents us from returning very large buffers to the pool.
const maxAllowedCapacity = 2 * defaultBufferSize
if buf.Cap() > maxAllowedCapacity {
buf = nil // Release the large buffer for garbage collection.
} else {
// Reset the buffer to clear any existing data.
buf.Reset()
}

bp.Put(buf)
}

// state represents the current mode of BufferedFileWriter.
@@ -39,7 +86,8 @@ type BufferedFileWriter struct {

state state // Current state of the writer. (writeOnly or readOnly)

-buf bytes.Buffer // Buffer for storing data under the threshold in memory.
+bufPool *bufferPool // Pool for storing buffers for reuse.
+buf *bytes.Buffer // Buffer for storing data under the threshold in memory.
filename string // Name of the temporary file.
file io.WriteCloser // File for storing data over the threshold.
}
@@ -55,7 +103,11 @@ func WithThreshold(threshold uint64) Option {
// New creates a new BufferedFileWriter with the given options.
func New(opts ...Option) *BufferedFileWriter {
const defaultThreshold = 10 * 1024 * 1024 // 10MB
-w := &BufferedFileWriter{threshold: defaultThreshold, state: writeOnly}
+w := &BufferedFileWriter{
+threshold: defaultThreshold,
+state: writeOnly,
+bufPool: sharedBufferPool,
+}
for _, opt := range opts {
opt(w)
}
@@ -78,17 +130,16 @@ func (w *BufferedFileWriter) String() (string, error) {
}
defer file.Close()

-// Create a buffer large enough to hold file data and additional buffer data, if any.
-fileSize := w.size
-buf := bytes.NewBuffer(make([]byte, 0, fileSize))

+var buf bytes.Buffer
// Read the file contents into the buffer.
-if _, err := io.Copy(buf, file); err != nil {
+if _, err := io.CopyBuffer(&buf, file, nil); err != nil {
Contributor:

What's the effect of this change?

Collaborator Author (ahrav):

Should be identical, except we use CopyBuffer elsewhere in the codebase, so I was trying to be consistent.

// CopyBuffer is identical to Copy except that it stages through the
// provided buffer (if one is required) rather than allocating a
// temporary one.
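
A minimal standalone sketch of that equivalence (the data.txt filename is hypothetical): passing nil makes CopyBuffer allocate the staging buffer itself, exactly like Copy, while passing an explicit buffer reuses it across calls.

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.Open("data.txt") // hypothetical input file
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Equivalent to io.Copy: with a nil staging buffer, CopyBuffer
	// allocates one internally.
	var out bytes.Buffer
	if _, err := io.CopyBuffer(&out, f, nil); err != nil {
		panic(err)
	}

	// Passing an explicit staging buffer avoids the internal allocation,
	// which matters when copying in a loop.
	stage := make([]byte, 32*1024)
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	out.Reset()
	if _, err := io.CopyBuffer(&out, f, stage); err != nil {
		panic(err)
	}
	fmt.Println(out.Len(), "bytes copied")
}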

return "", fmt.Errorf("failed to read file contents: %w", err)
}

// Append buffer data, if any, to the end of the file contents.
-buf.Write(w.buf.Bytes())
+if _, err := w.buf.WriteTo(&buf); err != nil {
+return "", err
+}

return buf.String(), nil
}
@@ -100,33 +151,44 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
}

size := uint64(len(data))

if w.buf == nil || w.buf.Len() == 0 {
w.buf = w.bufPool.get(ctx)
}

bufferLength := w.buf.Len()

defer func() {
w.size += size
ctx.Logger().V(4).Info(
"write complete",
"data_size", size,
"content_size", w.buf.Len(),
"content_size", bufferLength,
"total_size", w.size,
)
}()

-if w.buf.Len() == 0 {
-bufPtr, ok := bufferPool.Get().(*bytes.Buffer)
-if !ok {
-ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer")
-bufPtr = new(bytes.Buffer)
-}
-bufPtr.Reset() // Reset the buffer to clear any existing data
-w.buf = *bufPtr
-}

-if uint64(w.buf.Len())+size <= w.threshold {
+totalSizeNeeded := uint64(bufferLength) + uint64(len(data))
+if totalSizeNeeded <= w.threshold {
// If the total size is within the threshold, write to the buffer.
ctx.Logger().V(4).Info(
"writing to buffer",
"data_size", size,
"content_size", w.buf.Len(),
"content_size", bufferLength,
)

availableSpace := w.buf.Cap() - bufferLength
growSize := int(totalSizeNeeded) - bufferLength
Comment on lines +180 to +181
Collaborator:

totalSizeNeeded is bufferLength + len(data), so is growSize equivalent to len(data)?

That seems wrong, or I'm misunderstanding what growSize means.

Collaborator Author (ahrav):

No, the growSize is the length of the current buffer plus the amount of data that needs to be written to it: the size the buffer needs to be to hold all the existing data and any new data. If you call Grow with a size that is less than the buffer's remaining free space, nothing happens. This is why you can't do something like totalSizeNeeded - w.buf.Cap(); you need to use the length for it to grow correctly.

Contributor:

I had to read this part like four times as well. I was going to complain until I noticed that the old code was equally confusing, so I decided to just blame the buffer interface.

Collaborator Author (ahrav):

Yea I am ashamed to say I spent WAY too long trying to fully grok the buffer interface... there are definitely some gotchas that took me way way too long to comprehend. 😞

Contributor:

Don't be ashamed! Intuitive API design is difficult and sometimes impossible. Some stuff is just hard to understand.
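
To make the gotcha concrete, a minimal standalone sketch of the Grow semantics being discussed: Grow(n) reserves room for n additional bytes beyond the current length, and it is a no-op when the free capacity (Cap minus Len) already covers n.

package main

import (
	"bytes"
	"fmt"
)

func main() {
	var buf bytes.Buffer
	buf.WriteString("hello") // Len() = 5

	// Grow(n) guarantees room for n more bytes beyond Len(); it is not
	// an absolute capacity target.
	buf.Grow(64)
	fmt.Println(buf.Len(), buf.Cap()) // Cap() is now at least 69 (5+64)

	// If Cap()-Len() already covers n, Grow does nothing, so growing by
	// totalSizeNeeded - Cap() would under-reserve; the current length
	// has to be part of the calculation.
	before := buf.Cap()
	buf.Grow(8) // 8 <= Cap()-Len(), so this is a no-op
	fmt.Println(buf.Cap() == before) // true
}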

if growSize > availableSpace {
ctx.Logger().V(4).Info(
"buffer size exceeded, growing buffer",
"current_size", bufferLength,
"new_size", totalSizeNeeded,
"available_space", availableSpace,
"grow_size", growSize,
)
}

return w.buf.Write(data)
}

@@ -143,14 +205,12 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error

// Transfer existing data in buffer to the file, then clear the buffer.
// This ensures all the data is in one place - either entirely in the buffer or the file.
-if w.buf.Len() > 0 {
-ctx.Logger().V(4).Info("writing buffer to file", "content_size", w.buf.Len())
-if _, err := w.file.Write(w.buf.Bytes()); err != nil {
+if bufferLength > 0 {
+ctx.Logger().V(4).Info("writing buffer to file", "content_size", bufferLength)
+if _, err := w.buf.WriteTo(w.file); err != nil {
return 0, err
}
-// Reset the buffer to clear any existing data and return it to the pool.
-w.buf.Reset()
-bufferPool.Put(&w.buf)
+w.bufPool.put(w.buf)
}
}
ctx.Logger().V(4).Info("writing to file", "data_size", size)
@@ -167,7 +227,7 @@ func (w *BufferedFileWriter) CloseForWriting() error {
}

if w.buf.Len() > 0 {
-_, err := w.file.Write(w.buf.Bytes())
+_, err := w.buf.WriteTo(w.file)
if err != nil {
return err
}
@@ -199,7 +259,7 @@ func (w *BufferedFileWriter) ReadCloser() (io.ReadCloser, error) {
// Data is in memory.
return &bufferReadCloser{
Reader: bytes.NewReader(w.buf.Bytes()),
-onClose: func() { bufferPool.Put(&w.buf) },
+onClose: func() { w.bufPool.put(w.buf) },
}, nil
}
