From 5e5a57a2f766308891e94effbe04d7ac8b87e2bd Mon Sep 17 00:00:00 2001
From: Ahrav Dutta
Date: Fri, 2 Feb 2024 15:15:28 -0800
Subject: [PATCH 01/24] correctly use the buffered file writer
--- pkg/gitparse/gitparse.go | 48 +++++++++++++++++++++++++--------------- pkg/sources/git/git.go | 7 +++--- 2 files changed, 33 insertions(+), 22 deletions(-)
diff --git a/pkg/gitparse/gitparse.go b/pkg/gitparse/gitparse.go
index 14792efb08c1..c90ae8804018 100644
--- a/pkg/gitparse/gitparse.go
+++ b/pkg/gitparse/gitparse.go
@@ -16,6 +16,7 @@ import ( "github.com/trufflesecurity/trufflehog/v3/pkg/common" "github.com/trufflesecurity/trufflehog/v3/pkg/context" + bufferedfilewriter "github.com/trufflesecurity/trufflehog/v3/pkg/writers/buffered_file_writer" ) const (
@@ -100,10 +101,11 @@ func (b *buffer) String() (string, error) { return b.Buffer.String(), nil } // The use of contentWriter enables the management of diff data either in memory or on disk, // based on its size, optimizing resource usage and performance. type Diff struct { - PathB string - LineStart int + PathB string + LineStart int + IsBinary bool + contentWriter contentWriter - IsBinary bool } type diffOption func(*Diff)
@@ -111,10 +113,14 @@ type diffOption func(*Diff) // withPathB sets the PathB option. func withPathB(pathB string) diffOption { return func(d *Diff) { d.PathB = pathB } } +// withCustomContentWriter sets the contentWriter option. +func withCustomContentWriter(cr contentWriter) diffOption { + return func(d *Diff) { d.contentWriter = cr } +} + // NewDiff creates a new Diff with a threshold. func NewDiff(opts ...diffOption) *Diff { diff := new(Diff) - diff.contentWriter = newBuffer() for _, opt := range opts { opt(diff) }
@@ -200,10 +206,10 @@ func (c1 *Commit) Equal(ctx context.Context, c2 *Commit) bool { // Parser sets values used in GitParse. type Parser struct { - maxDiffSize int - maxCommitSize int - dateFormat string - contentWriter contentWriter + maxDiffSize int + maxCommitSize int + dateFormat string + useCustomContentWriter bool } type ParseState int
@@ -250,11 +256,9 @@ func (state ParseState) String() string { }[state] } -// WithContentWriter sets the ContentWriter for the Parser. -func WithContentWriter(writer contentWriter) Option { - return func(parser *Parser) { - parser.contentWriter = writer - } +// UseCustomContentWriter sets useCustomContentWriter option. +func UseCustomContentWriter() Option { + return func(parser *Parser) { parser.useCustomContentWriter = true } } // WithMaxDiffSize sets maxDiffSize option. Diffs larger than maxDiffSize will
@@ -283,7 +287,6 @@ func NewParser(options ...Option) *Parser { dateFormat: defaultDateFormat, maxDiffSize: defaultMaxDiffSize, maxCommitSize: defaultMaxCommitSize, - contentWriter: newBuffer(), } for _, option := range options { option(parser) }
@@ -387,7 +390,9 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch totalLogSize int ) var latestState = Initial - currentDiff := NewDiff() + + writer := c.contentWriter() + currentDiff := NewDiff(withCustomContentWriter(writer)) defer common.RecoverWithExit(ctx) defer close(commitChan)
@@ -430,7 +435,7 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch totalLogSize += currentCommit.Size } // Create a new currentDiff and currentCommit - currentDiff = NewDiff() + currentDiff = NewDiff(withCustomContentWriter(c.contentWriter())) currentCommit = &Commit{Message: strings.Builder{}} // Check that the commit line contains a hash and set it.
if len(line) >= 47 { @@ -498,7 +503,7 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch currentCommit.Message.WriteString(oldCommit.Message.String()) } } - currentDiff = NewDiff() + currentDiff = NewDiff(withCustomContentWriter(c.contentWriter())) case isModeLine(isStaged, latestState, line): latestState = ModeLine // NoOp @@ -538,7 +543,7 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch } currentCommit.Diffs = append(currentCommit.Diffs, *currentDiff) } - currentDiff = NewDiff(withPathB(currentDiff.PathB)) + currentDiff = NewDiff(withCustomContentWriter(c.contentWriter()), withPathB(currentDiff.PathB)) words := bytes.Split(line, []byte(" ")) if len(words) >= 3 { @@ -606,6 +611,13 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch ctx.Logger().V(2).Info("finished parsing git log.", "total_log_size", totalLogSize) } +func (c *Parser) contentWriter() contentWriter { + if c.useCustomContentWriter { + return bufferedfilewriter.New() + } + return newBuffer() +} + func isMergeLine(isStaged bool, latestState ParseState, line []byte) bool { if isStaged || latestState != CommitLine { return false diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go index 9843b860cf6c..75a4849160c3 100644 --- a/pkg/sources/git/git.go +++ b/pkg/sources/git/git.go @@ -34,7 +34,6 @@ import ( "github.com/trufflesecurity/trufflehog/v3/pkg/pb/sourcespb" "github.com/trufflesecurity/trufflehog/v3/pkg/sanitizer" "github.com/trufflesecurity/trufflehog/v3/pkg/sources" - bufferedfilewriter "github.com/trufflesecurity/trufflehog/v3/pkg/writers/buffered_file_writer" ) const SourceType = sourcespb.SourceType_SOURCE_TYPE_GIT @@ -99,7 +98,7 @@ type Config struct { func NewGit(config *Config) *Git { var parser *gitparse.Parser if config.UseCustomContentWriter { - parser = gitparse.NewParser(gitparse.WithContentWriter(bufferedfilewriter.New())) + parser = gitparse.NewParser(gitparse.UseCustomContentWriter()) } else { parser = gitparse.NewParser() } @@ -224,7 +223,7 @@ func (s *Source) Init(aCtx context.Context, name string, jobId sources.JobID, so }, } }, - UseCustomContentWriter: s.useCustomContentWriter, + UseCustomContentWriter: true, } s.git = NewGit(cfg) return nil @@ -522,7 +521,7 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string repoCtx = context.WithValue(ctx, "repo", path) } - commitChan, err := gitparse.NewParser().RepoPath(repoCtx, path, scanOptions.HeadHash, scanOptions.BaseHash == "", scanOptions.ExcludeGlobs, scanOptions.Bare) + commitChan, err := s.parser.RepoPath(repoCtx, path, scanOptions.HeadHash, scanOptions.BaseHash == "", scanOptions.ExcludeGlobs, scanOptions.Bare) if err != nil { return err } From 3b21ea304b1d8f427875b0472466b2ea12192a96 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sat, 3 Feb 2024 11:04:10 -0800 Subject: [PATCH 02/24] use value from source --- pkg/sources/git/git.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go index 75a4849160c3..7a7de1ddb168 100644 --- a/pkg/sources/git/git.go +++ b/pkg/sources/git/git.go @@ -223,7 +223,7 @@ func (s *Source) Init(aCtx context.Context, name string, jobId sources.JobID, so }, } }, - UseCustomContentWriter: true, + UseCustomContentWriter: s.useCustomContentWriter, } s.git = NewGit(cfg) return nil From 9aaf0c86b71b1fd03af39925ccf13df3d25ab863 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sat, 3 Feb 2024 11:40:50 -0800 Subject: [PATCH 03/24] reorder 
fields --- pkg/gitparse/gitparse.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/gitparse/gitparse.go b/pkg/gitparse/gitparse.go index c90ae8804018..36b925a77ac5 100644 --- a/pkg/gitparse/gitparse.go +++ b/pkg/gitparse/gitparse.go @@ -206,9 +206,10 @@ func (c1 *Commit) Equal(ctx context.Context, c2 *Commit) bool { // Parser sets values used in GitParse. type Parser struct { - maxDiffSize int - maxCommitSize int - dateFormat string + maxDiffSize int + maxCommitSize int + dateFormat string + useCustomContentWriter bool } From 1edc0a3b2e623a8fe646ec4fbb6147308b2d84bc Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sat, 3 Feb 2024 14:25:51 -0800 Subject: [PATCH 04/24] use only the DetectorKey as a map field --- pkg/engine/ahocorasick/ahocorasickcore.go | 3 +++ pkg/engine/engine.go | 8 ++++---- pkg/engine/engine_test.go | 24 +++++++++++------------ 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/pkg/engine/ahocorasick/ahocorasickcore.go b/pkg/engine/ahocorasick/ahocorasickcore.go index b083715507db..e5eb3f60df59 100644 --- a/pkg/engine/ahocorasick/ahocorasickcore.go +++ b/pkg/engine/ahocorasick/ahocorasickcore.go @@ -23,6 +23,9 @@ type DetectorKey struct { customDetectorName string } +// Type returns the detector type of the key. +func (k DetectorKey) Type() detectorspb.DetectorType { return k.detectorType } + // AhoCorasickCore encapsulates the operations and data structures used for keyword matching via the // Aho-Corasick algorithm. It is responsible for constructing and managing the trie for efficient // substring searches, as well as mapping keywords to their associated detectors for rapid lookups. diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index eda8f3e4c0a4..3d822e4bb2e2 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -598,8 +598,8 @@ func (e *Engine) detectorWorker(ctx context.Context) { // by the same detector in the chunk. Exact matches on lookup indicate a duplicate secret for a detector // in that chunk - which is expected and not malicious. Those intra-detector dupes are still verified. type chunkSecretKey struct { - secret string - detectorInfo ahocorasick.DetectorInfo + secret string + detectorKey ahocorasick.DetectorKey } func likelyDuplicate(ctx context.Context, val chunkSecretKey, dupes map[chunkSecretKey]struct{}) bool { @@ -615,7 +615,7 @@ func likelyDuplicate(ctx context.Context, val chunkSecretKey, dupes map[chunkSec // If the detector type is the same, we don't need to compare the strings. // These are not duplicates, and should be verified. 
- if val.detectorKey.Type() == dupeKey.detectorKey.Type() { hold
- if val.detectorInfo.Type() == dupeKey.detectorInfo.Type() { + if val.detectorKey.Type() == dupeKey.detectorKey.Type() { continue }
@@ -674,7 +674,7 @@ func (e *Engine) verificationOverlapWorker(ctx context.Context) { // Ex: // - postman api key: PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r // - malicious detector "api key": qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r - key := chunkSecretKey{secret: string(val), detectorInfo: detector} + key := chunkSecretKey{secret: string(val), detectorKey: detector.Key} if _, ok := chunkSecrets[key]; ok { continue }
diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go
index ea4b5807b4f5..39120d18c1cb 100644
--- a/pkg/engine/engine_test.go
+++ b/pkg/engine/engine_test.go
@@ -562,47 +562,47 @@ func TestLikelyDuplicate(t *testing.T) { }{ { name: "exact duplicate different detector", - val: chunkSecretKey{"PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorA}, + val: chunkSecretKey{"PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorA.Key}, dupes: map[chunkSecretKey]struct{}{ - {"PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorB}: {}, + {"PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorB.Key}: {}, }, expected: true, }, { name: "non-duplicate length outside range", - val: chunkSecretKey{"short", detectorA}, + val: chunkSecretKey{"short", detectorA.Key}, dupes: map[chunkSecretKey]struct{}{ - {"muchlongerthanthevalstring", detectorB}: {}, + {"muchlongerthanthevalstring", detectorB.Key}: {}, }, expected: false, }, { name: "similar within threshold", - val: chunkSecretKey{"PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorA}, + val: chunkSecretKey{"PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorA.Key}, dupes: map[chunkSecretKey]struct{}{ - {"qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorB}: {}, + {"qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorB.Key}: {}, }, expected: true, }, { name: "similar outside threshold", - val: chunkSecretKey{"anotherkey", detectorA}, + val: chunkSecretKey{"anotherkey", detectorA.Key}, dupes: map[chunkSecretKey]struct{}{ - {"completelydifferent", detectorB}: {}, + {"completelydifferent", detectorB.Key}: {}, }, expected: false, }, { name: "empty strings", - val: chunkSecretKey{"", detectorA}, - dupes: map[chunkSecretKey]struct{}{{"", detectorB}: {}}, + val: chunkSecretKey{"", detectorA.Key}, + dupes: map[chunkSecretKey]struct{}{{"", detectorB.Key}: {}}, expected: true, }, { name: "similar within threshold same detector", - val: chunkSecretKey{"PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorA}, + val: chunkSecretKey{"PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorA.Key}, dupes: map[chunkSecretKey]struct{}{ - {"qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorA}: {}, + {"qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r", detectorA.Key}: {}, }, expected: false, },
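Patch 04 above narrows the engine's dedupe map key from the full detector info to the comparable DetectorKey value, exposing a Type() accessor so the duplicate check no longer needs the detector itself. Go map keys must be comparable, and a small value-type key keeps the per-chunk map cheap. A minimal sketch of that pattern, with assumed names (exampleKey and dedupe are illustrative, not identifiers from this PR):

    package dedupeexample

    // exampleKey stands in for chunkSecretKey: a small struct whose fields
    // are all comparable, so the struct value itself can index a map.
    type exampleKey struct {
        secret       string
        detectorType int // stands in for the key's detector type
    }

    // dedupe drops exact duplicates by probing a map keyed on the struct value.
    func dedupe(candidates []exampleKey) []exampleKey {
        seen := make(map[exampleKey]struct{}, len(candidates))
        out := make([]exampleKey, 0, len(candidates))
        for _, k := range candidates {
            if _, ok := seen[k]; ok {
                continue // same secret, same detector: an expected intra-chunk dupe
            }
            seen[k] = struct{}{}
            out = append(out, k)
        }
        return out
    }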
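Everything from here on tunes the BufferedFileWriter that patch 01 wired into gitparse: diff content is buffered in memory until a size threshold is crossed, then spilled to a temporary file, with the in-memory buffers recycled through a sync.Pool. As a reference point for the pattern the next patches iterate on, a minimal self-contained sketch (spillWriter and its simplified error handling are assumptions for illustration, not the PR's implementation):

    package spillexample

    import (
        "bytes"
        "os"
        "sync"
    )

    // pool recycles buffers so repeated writers avoid reallocating.
    var pool = sync.Pool{New: func() any { return new(bytes.Buffer) }}

    // spillWriter keeps data in memory up to threshold bytes, then moves
    // everything into a temp file so the data lives in exactly one place.
    type spillWriter struct {
        threshold int
        buf       *bytes.Buffer
        file      *os.File
    }

    func (w *spillWriter) Write(p []byte) (int, error) {
        if w.file != nil {
            return w.file.Write(p) // already spilled: all writes go to the file
        }
        if w.buf == nil {
            w.buf = pool.Get().(*bytes.Buffer)
            w.buf.Reset()
        }
        if w.buf.Len()+len(p) <= w.threshold {
            return w.buf.Write(p) // under threshold: stay in memory
        }
        f, err := os.CreateTemp("", "spill-*")
        if err != nil {
            return 0, err
        }
        w.file = f
        // Flush the buffered prefix first so file contents stay ordered,
        // then recycle the buffer.
        if _, err := w.buf.WriteTo(f); err != nil {
            return 0, err
        }
        pool.Put(w.buf)
        w.buf = nil
        return w.file.Write(p)
    }

The pool sizing, growth-on-demand, and return-to-pool details are exactly what patches 08 through 17 below work out.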
From 510ce0357fe9df74089d52ca588cc419767f9b57 Mon Sep 17 00:00:00 2001
From: Ahrav Dutta
Date: Sat, 3 Feb 2024 14:13:01 -0800
Subject: [PATCH 08/24] add tests and update
--- .../bufferedfilewriter.go | 101 +++++++++++++++--- .../bufferedfilewriter_test.go | 62 +++++++++++ 2 files changed, 149 insertions(+), 14 deletions(-)
diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
index e35c1ac3edb3..9d3839c59d4c 100644
--- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go
+++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
@@ -8,17 +8,38 @@ import ( "io" "os" "sync" + "sync/atomic" "github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp" "github.com/trufflesecurity/trufflehog/v3/pkg/context" ) -// bufferPool is used to store buffers for reuse. -var bufferPool = sync.Pool{ - // TODO: Consider growing the buffer before returning it if we can find an optimal size. - // Ideally the size would cover the majority of cases without being too large. - // This would avoid the need to grow the buffer when writing to it, reducing allocations. - New: func() any { return new(bytes.Buffer) }, +type bufPoolOpt func(*sync.Pool) + +func withBufPoolSize(size int) bufPoolOpt { + return func(pool *sync.Pool) { + pool.New = func() any { + buf := new(bytes.Buffer) + buf.Grow(size) + return buf + } + } +} + +func newBufferPool(opts ...bufPoolOpt) sync.Pool { + const defaultBufferSize = 10 * 1024 // 10KB + + pool := sync.Pool{ + New: func() any { + buf := new(bytes.Buffer) + buf.Grow(defaultBufferSize) + return buf + }, + } + for _, opt := range opts { + opt(&pool) + } + return pool } // state represents the current mode of BufferedFileWriter.
@@ -39,6 +60,7 @@ type BufferedFileWriter struct { state state // Current state of the writer. (writeOnly or readOnly) + bufPool sync.Pool // Pool for storing buffers for reuse. buf bytes.Buffer // Buffer for storing data under the threshold in memory. filename string // Name of the temporary file. file io.WriteCloser // File for storing data over the threshold. }
@@ -47,6 +69,11 @@ type BufferedFileWriter struct { // Option is a function that modifies a BufferedFileWriter. type Option func(*BufferedFileWriter) +// WithBufPoolSize sets the size of the buffer pool. +func WithBufPoolSize(size int) Option { + return func(w *BufferedFileWriter) { w.bufPool = newBufferPool(withBufPoolSize(size)) } +} + // WithThreshold sets the threshold for switching to file writing. func WithThreshold(threshold uint64) Option { return func(w *BufferedFileWriter) { w.threshold = threshold }
@@ -55,7 +82,11 @@ func WithThreshold(threshold uint64) Option { // New creates a new BufferedFileWriter with the given options.
func New(opts ...Option) *BufferedFileWriter { const defaultThreshold = 10 * 1024 * 1024 // 10MB - w := &BufferedFileWriter{threshold: defaultThreshold, state: writeOnly} + w := &BufferedFileWriter{ + threshold: defaultThreshold, + state: writeOnly, + bufPool: newBufferPool(), + } for _, opt := range opts { opt(w) } @@ -88,7 +119,9 @@ func (w *BufferedFileWriter) String() (string, error) { } // Append buffer data, if any, to the end of the file contents. - buf.Write(w.buf.Bytes()) + if _, err := buf.WriteTo(&w.buf); err != nil { + return "", err + } return buf.String(), nil } @@ -111,11 +144,12 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error }() if w.buf.Len() == 0 { - bufPtr, ok := bufferPool.Get().(*bytes.Buffer) + bufPtr, ok := w.bufPool.Get().(*bytes.Buffer) if !ok { ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer") bufPtr = new(bytes.Buffer) } + atomic.AddInt64(&activeBufferCount, 1) bufPtr.Reset() // Reset the buffer to clear any existing data w.buf = *bufPtr } @@ -145,12 +179,13 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error // This ensures all the data is in one place - either entirely in the buffer or the file. if w.buf.Len() > 0 { ctx.Logger().V(4).Info("writing buffer to file", "content_size", w.buf.Len()) - if _, err := w.file.Write(w.buf.Bytes()); err != nil { + if _, err := w.buf.WriteTo(w.file); err != nil { return 0, err } // Reset the buffer to clear any existing data and return it to the pool. - w.buf.Reset() - bufferPool.Put(&w.buf) + w.returnBufferToPool(&w.buf) + // w.buf.Reset() + // bufferPool.Put(&w.buf) } } ctx.Logger().V(4).Info("writing to file", "data_size", size) @@ -167,7 +202,7 @@ func (w *BufferedFileWriter) CloseForWriting() error { } if w.buf.Len() > 0 { - _, err := w.file.Write(w.buf.Bytes()) + _, err := w.buf.WriteTo(w.file) if err != nil { return err } @@ -199,10 +234,48 @@ func (w *BufferedFileWriter) ReadCloser() (io.ReadCloser, error) { // Data is in memory. return &bufferReadCloser{ Reader: bytes.NewReader(w.buf.Bytes()), - onClose: func() { bufferPool.Put(&w.buf) }, + onClose: func() { w.returnBufferToPool(&w.buf) }, }, nil } +var ( + // Track the number of active buffers not yet returned to the pool. 
+ activeBufferCount int64 + totalBufferLength uint64 // Tracks the total length of all buffers + totalBufferSize uint64 + bufferCount uint64 + logFrequency uint64 = 25000 // Log after every 25000 calls +) + +func (w *BufferedFileWriter) returnBufferToPool(buf *bytes.Buffer) { + atomic.AddInt64(&activeBufferCount, -1) + currentCapacity := buf.Cap() + currentLength := buf.Len() + + // Add to totals + atomic.AddUint64(&totalBufferSize, uint64(currentCapacity)) + atomic.AddUint64(&totalBufferLength, uint64(currentLength)) + count := atomic.AddUint64(&bufferCount, 1) + + if count%logFrequency == 0 { + avgBufferSize := atomic.LoadUint64(&totalBufferSize) / count + avgBufferLength := atomic.LoadUint64(&totalBufferLength) / count + fmt.Printf("Buffer pool update: count = %d, average size = %d bytes, average length = %d bytes\n", count, avgBufferSize, avgBufferLength) + + count := atomic.LoadInt64(&activeBufferCount) + fmt.Printf("Current active buffer count: %d\n", count) + + // Here you might decide to reset the counters to zero to get more recent trends + // atomic.StoreUint64(&totalBufferSize, 0) + // atomic.StoreUint64(&totalBufferLength, 0) + // atomic.StoreUint64(&bufferCount, 0) + } + + // Reset the buffer to clear any existing data and return it to the pool. + buf.Reset() + w.bufPool.Put(buf) +} + // autoDeletingFileReader wraps an *os.File and deletes the file on Close. type autoDeletingFileReader struct{ *os.File } diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go b/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go index 471b9389dd16..d45068983b6d 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go @@ -89,6 +89,8 @@ func TestBufferedFileWriterString(t *testing.T) { got, err := writer.String() assert.NoError(t, err) + err = writer.CloseForWriting() + assert.NoError(t, err) assert.Equal(t, tc.expectedStr, got, "String content mismatch") }) @@ -306,3 +308,63 @@ func TestBufferedFileWriterWriteInReadOnlyState(t *testing.T) { _, err := writer.Write(context.Background(), []byte("should fail")) assert.Error(t, err) } + +func BenchmarkBufferedFileWriterWriteLarge(b *testing.B) { + ctx := context.Background() + data := make([]byte, 1024*1024*10) // 10MB + for i := range data { + data[i] = byte(i % 256) // Simple pattern to avoid uniform zero data + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Threshold is smaller than the data size, data should get flushed to the file. + writer := New(WithThreshold(1024)) + + b.StartTimer() + { + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + } + b.StopTimer() + + // Ensure proper cleanup after each write operation, including closing the file + err := writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() + } +} + +func BenchmarkBufferedFileWriterWriteSmall(b *testing.B) { + ctx := context.Background() + data := make([]byte, 1024*1024) // 1MB + for i := range data { + data[i] = byte(i % 256) // Simple pattern to avoid uniform zero data + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Threshold is the same as the buffer size, data should always be written to the buffer. + writer := New(WithThreshold(1024 * 1024)) + + b.StartTimer() + { + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + } + b.StopTimer() + + // Ensure proper cleanup after each write operation, including closing the file. 
+ err := writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() + } +} From f6f1f4464efccb817f9a4c38afa78270020fad23 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sat, 3 Feb 2024 19:04:29 -0800 Subject: [PATCH 09/24] Fix issue with buffer slices growing --- .../bufferedfilewriter.go | 171 ++++++++++-------- .../bufferedfilewriter_test.go | 138 ++++++++++++++ 2 files changed, 231 insertions(+), 78 deletions(-) diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index 9d3839c59d4c..7921a5c4a9b7 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -8,38 +8,84 @@ import ( "io" "os" "sync" - "sync/atomic" "github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp" "github.com/trufflesecurity/trufflehog/v3/pkg/context" ) -type bufPoolOpt func(*sync.Pool) +type bufPoolOpt func(pool *bufferPool) -func withBufPoolSize(size int) bufPoolOpt { - return func(pool *sync.Pool) { - pool.New = func() any { - buf := new(bytes.Buffer) - buf.Grow(size) - return buf - } - } +func withBufPoolSize(size uint32) bufPoolOpt { + return func(pool *bufferPool) { pool.bufferSize = size } +} + +type bufferPool struct { + bufferSize uint32 + *sync.Pool } -func newBufferPool(opts ...bufPoolOpt) sync.Pool { - const defaultBufferSize = 10 * 1024 // 10KB +const defaultBufferSize = 2 << 12 // 8KB +func newBufferPool(opts ...bufPoolOpt) *bufferPool { + bp := &bufferPool{bufferSize: defaultBufferSize} - pool := sync.Pool{ + for _, opt := range opts { + opt(bp) + } + bp.Pool = &sync.Pool{ New: func() any { buf := new(bytes.Buffer) - buf.Grow(defaultBufferSize) + buf.Grow(int(bp.bufferSize)) return buf }, } - for _, opt := range opts { - opt(&pool) + + return bp +} + +func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { + buf, ok := bp.Pool.Get().(*bytes.Buffer) + if !ok { + ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer") + buf = bytes.NewBuffer(make([]byte, 0, bp.bufferSize)) + } + + return buf +} + +func (bp *bufferPool) getWithSize(ctx context.Context, dataSize uint64) *bytes.Buffer { + var buf *bytes.Buffer + + // Attempt to fetch a buffer from the pool. + if fetchedBuf := bp.get(ctx); fetchedBuf != nil { + // Check if the fetched buffer can accommodate the data size. + // If not, create a new buffer with enough capacity. + diff := int(dataSize) - fetchedBuf.Cap() + if diff > 0 { + // Since the fetched buffer is not suitable, put it back into the pool + // and allocate a new buffer of the required size. + bp.put(buf) + buf = bytes.NewBuffer(make([]byte, 0, dataSize)) + } } - return pool + + // Ensure the buffer is reset before use to avoid any old data remaining. + buf.Reset() + return buf +} + +func (bp *bufferPool) put(buf *bytes.Buffer) { + // If the buffer is more than twice the default size, replace it with a new, smaller one. + // This prevents us from returning very large buffers to the pool. + const maxAllowedCapacity = 2 * defaultBufferSize + if buf.Cap() > maxAllowedCapacity { + // Replace the buffer with a new, smaller one. No need to copy data since we're resetting it. + buf = bytes.NewBuffer(make([]byte, 0, defaultBufferSize)) + } else { + // Reset the buffer to clear any existing data. + buf.Reset() + } + + bp.Put(buf) } // state represents the current mode of BufferedFileWriter. 
@@ -60,8 +106,8 @@ type BufferedFileWriter struct { state state // Current state of the writer. (writeOnly or readOnly) - bufPool sync.Pool // Pool for storing buffers for reuse. - buf bytes.Buffer // Buffer for storing data under the threshold in memory. + bufPool *bufferPool // Pool for storing buffers for reuse. + buf *bytes.Buffer // Buffer for storing data under the threshold in memory. filename string // Name of the temporary file. file io.WriteCloser // File for storing data over the threshold. } @@ -70,7 +116,7 @@ type BufferedFileWriter struct { type Option func(*BufferedFileWriter) // WithBufPoolSize sets the size of the buffer pool. -func WithBufPoolSize(size int) Option { +func WithBufPoolSize(size uint32) Option { return func(w *BufferedFileWriter) { w.bufPool = newBufferPool(withBufPoolSize(size)) } } @@ -109,17 +155,14 @@ func (w *BufferedFileWriter) String() (string, error) { } defer file.Close() - // Create a buffer large enough to hold file data and additional buffer data, if any. - fileSize := w.size - buf := bytes.NewBuffer(make([]byte, 0, fileSize)) - + var buf bytes.Buffer // Read the file contents into the buffer. - if _, err := io.Copy(buf, file); err != nil { + if _, err := io.CopyBuffer(&buf, file, nil); err != nil { return "", fmt.Errorf("failed to read file contents: %w", err) } // Append buffer data, if any, to the end of the file contents. - if _, err := buf.WriteTo(&w.buf); err != nil { + if _, err := buf.WriteTo(w.buf); err != nil { return "", err } @@ -143,24 +186,36 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error ) }() - if w.buf.Len() == 0 { - bufPtr, ok := w.bufPool.Get().(*bytes.Buffer) - if !ok { - ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer") - bufPtr = new(bytes.Buffer) - } - atomic.AddInt64(&activeBufferCount, 1) - bufPtr.Reset() // Reset the buffer to clear any existing data - w.buf = *bufPtr + if w.buf == nil || w.buf.Len() == 0 { + w.buf = w.bufPool.get(ctx) } - if uint64(w.buf.Len())+size <= w.threshold { + totalSizeNeeded := uint64(w.buf.Len()) + uint64(len(data)) + if totalSizeNeeded <= w.threshold { // If the total size is within the threshold, write to the buffer. ctx.Logger().V(4).Info( "writing to buffer", "data_size", size, "content_size", w.buf.Len(), ) + + if totalSizeNeeded > uint64(w.buf.Cap()) { + ctx.Logger().V(4).Info( + "buffer size exceeded, getting new buffer", + "current_size", w.buf.Len(), + "new_size", totalSizeNeeded, + ) + // The current buffer cannot accommodate the new data; fetch a new, larger buffer. + newBuf := w.bufPool.getWithSize(ctx, totalSizeNeeded) + + // Copy the existing data to the new buffer and return the old buffer to the pool. + if _, err := w.buf.WriteTo(newBuf); err != nil { + return 0, err + } + w.bufPool.put(w.buf) + w.buf = newBuf + } + return w.buf.Write(data) } @@ -183,9 +238,7 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error return 0, err } // Reset the buffer to clear any existing data and return it to the pool. - w.returnBufferToPool(&w.buf) - // w.buf.Reset() - // bufferPool.Put(&w.buf) + w.bufPool.put(w.buf) } } ctx.Logger().V(4).Info("writing to file", "data_size", size) @@ -234,48 +287,10 @@ func (w *BufferedFileWriter) ReadCloser() (io.ReadCloser, error) { // Data is in memory. 
return &bufferReadCloser{ Reader: bytes.NewReader(w.buf.Bytes()), - onClose: func() { w.returnBufferToPool(&w.buf) }, + onClose: func() { w.bufPool.put(w.buf) }, }, nil } -var ( - // Track the number of active buffers not yet returned to the pool. - activeBufferCount int64 - totalBufferLength uint64 // Tracks the total length of all buffers - totalBufferSize uint64 - bufferCount uint64 - logFrequency uint64 = 25000 // Log after every 25000 calls -) - -func (w *BufferedFileWriter) returnBufferToPool(buf *bytes.Buffer) { - atomic.AddInt64(&activeBufferCount, -1) - currentCapacity := buf.Cap() - currentLength := buf.Len() - - // Add to totals - atomic.AddUint64(&totalBufferSize, uint64(currentCapacity)) - atomic.AddUint64(&totalBufferLength, uint64(currentLength)) - count := atomic.AddUint64(&bufferCount, 1) - - if count%logFrequency == 0 { - avgBufferSize := atomic.LoadUint64(&totalBufferSize) / count - avgBufferLength := atomic.LoadUint64(&totalBufferLength) / count - fmt.Printf("Buffer pool update: count = %d, average size = %d bytes, average length = %d bytes\n", count, avgBufferSize, avgBufferLength) - - count := atomic.LoadInt64(&activeBufferCount) - fmt.Printf("Current active buffer count: %d\n", count) - - // Here you might decide to reset the counters to zero to get more recent trends - // atomic.StoreUint64(&totalBufferSize, 0) - // atomic.StoreUint64(&totalBufferLength, 0) - // atomic.StoreUint64(&bufferCount, 0) - } - - // Reset the buffer to clear any existing data and return it to the pool. - buf.Reset() - w.bufPool.Put(buf) -} - // autoDeletingFileReader wraps an *os.File and deletes the file on Close. type autoDeletingFileReader struct{ *os.File }
diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go b/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go
index d45068983b6d..1dcc07e69bf8 100644
--- a/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go
+++ b/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go
@@ -1,6 +1,7 @@ package bufferedfilewriter import ( + "bytes" "os" "testing" "time"
@@ -97,6 +98,143 @@ func TestBufferedFileWriterString(t *testing.T) { } } +const ( + smallBuffer = 2 << 5 // 64B + mediumBuffer = 2 << 10 // 2KB + smallFile = 2 << 25 // 64MB + mediumFile = 2 << 28 // 512MB +) + +func BenchmarkBufferedFileWriterString_BufferOnly_Small(b *testing.B) { + data := bytes.Repeat([]byte("a"), smallBuffer) + + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func BenchmarkBufferedFileWriterString_BufferOnly_Medium(b *testing.B) { + data := bytes.Repeat([]byte("a"), mediumBuffer) + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func BenchmarkBufferedFileWriterString_OnlyFile_Small(b *testing.B) { + data := bytes.Repeat([]byte("a"), smallFile) + + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} +
+func BenchmarkBufferedFileWriterString_OnlyFile_Medium(b *testing.B) { + data := bytes.Repeat([]byte("a"), mediumFile) + + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func BenchmarkBufferedFileWriterString_BufferWithFile_Small(b *testing.B) { + data := bytes.Repeat([]byte("a"), smallFile) + + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + // Write again so we also fill up the buffer. + _, err = writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func BenchmarkBufferedFileWriterString_BufferWithFile_Medium(b *testing.B) { + data := bytes.Repeat([]byte("a"), mediumFile) + + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + // Write again so we also fill up the buffer. + _, err = writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func benchmarkBufferedFileWriterString(b *testing.B, w *BufferedFileWriter) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := w.String() + assert.NoError(b, err) + } + b.StopTimer() +} + func TestBufferedFileWriterLen(t *testing.T) { t.Parallel() tests := []struct { From 765998010375df835ca084f8bcc82e53b95e38b0 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sun, 4 Feb 2024 14:01:00 -0800 Subject: [PATCH 10/24] fix test --- pkg/writers/buffered_file_writer/bufferedfilewriter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index 7921a5c4a9b7..86260bb4aab6 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -162,7 +162,7 @@ func (w *BufferedFileWriter) String() (string, error) { } // Append buffer data, if any, to the end of the file contents. - if _, err := buf.WriteTo(w.buf); err != nil { + if _, err := w.buf.WriteTo(&buf); err != nil { return "", err } From a07c39babe2a33f79f5b1076b9e38ddcd804d80e Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sun, 4 Feb 2024 14:42:44 -0800 Subject: [PATCH 11/24] fix --- pkg/writers/buffered_file_writer/bufferedfilewriter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index 86260bb4aab6..d32601836470 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -63,7 +63,7 @@ func (bp *bufferPool) getWithSize(ctx context.Context, dataSize uint64) *bytes.B if diff > 0 { // Since the fetched buffer is not suitable, put it back into the pool // and allocate a new buffer of the required size. 
- bp.put(buf) + bp.put(fetchedBuf) buf = bytes.NewBuffer(make([]byte, 0, dataSize)) } } From e41cced7c1d4cfa1f6eea901101957f0cfb6074f Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sun, 4 Feb 2024 15:02:34 -0800 Subject: [PATCH 12/24] add singleton --- .../bufferedfilewriter.go | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index d32601836470..2a5dd1c0bbda 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -15,10 +15,6 @@ import ( type bufPoolOpt func(pool *bufferPool) -func withBufPoolSize(size uint32) bufPoolOpt { - return func(pool *bufferPool) { pool.bufferSize = size } -} - type bufferPool struct { bufferSize uint32 *sync.Pool @@ -26,22 +22,27 @@ type bufferPool struct { const defaultBufferSize = 2 << 12 // 8KB func newBufferPool(opts ...bufPoolOpt) *bufferPool { - bp := &bufferPool{bufferSize: defaultBufferSize} + pool := &bufferPool{bufferSize: defaultBufferSize} for _, opt := range opts { - opt(bp) + opt(pool) } - bp.Pool = &sync.Pool{ + pool.Pool = &sync.Pool{ New: func() any { buf := new(bytes.Buffer) - buf.Grow(int(bp.bufferSize)) + buf.Grow(int(pool.bufferSize)) return buf }, } - return bp + return pool } +var sharedBufferPool *bufferPool + +// init function to initialize the shared buffer pool +func init() { sharedBufferPool = newBufferPool() } + func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { buf, ok := bp.Pool.Get().(*bytes.Buffer) if !ok { @@ -115,11 +116,6 @@ type BufferedFileWriter struct { // Option is a function that modifies a BufferedFileWriter. type Option func(*BufferedFileWriter) -// WithBufPoolSize sets the size of the buffer pool. -func WithBufPoolSize(size uint32) Option { - return func(w *BufferedFileWriter) { w.bufPool = newBufferPool(withBufPoolSize(size)) } -} - // WithThreshold sets the threshold for switching to file writing. func WithThreshold(threshold uint64) Option { return func(w *BufferedFileWriter) { w.threshold = threshold } @@ -131,7 +127,7 @@ func New(opts ...Option) *BufferedFileWriter { w := &BufferedFileWriter{ threshold: defaultThreshold, state: writeOnly, - bufPool: newBufferPool(), + bufPool: sharedBufferPool, } for _, opt := range opts { opt(w) From 8e3d56c4193f646874d2ee9196ac7c1ea82f6488 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sun, 4 Feb 2024 15:07:34 -0800 Subject: [PATCH 13/24] use shared pool --- .../bufferedfilewriter.go | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index 2a5dd1c0bbda..9c0942d72a67 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -54,24 +54,20 @@ func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { } func (bp *bufferPool) getWithSize(ctx context.Context, dataSize uint64) *bytes.Buffer { - var buf *bytes.Buffer - - // Attempt to fetch a buffer from the pool. - if fetchedBuf := bp.get(ctx); fetchedBuf != nil { - // Check if the fetched buffer can accommodate the data size. - // If not, create a new buffer with enough capacity. - diff := int(dataSize) - fetchedBuf.Cap() - if diff > 0 { - // Since the fetched buffer is not suitable, put it back into the pool - // and allocate a new buffer of the required size. 
- bp.put(fetchedBuf) - buf = bytes.NewBuffer(make([]byte, 0, dataSize)) - } + fetchedBuf := bp.get(ctx) + + // Calculate if the fetched buffer's capacity is less than the required data size. + requiredCapacity := int(dataSize) + if fetchedBuf.Cap() >= requiredCapacity { + fetchedBuf.Reset() + return fetchedBuf } - // Ensure the buffer is reset before use to avoid any old data remaining. - buf.Reset() - return buf + // If the fetched buffer's capacity is insufficient, return the inadequate buffer to the pool, + // and create a new buffer of the required size. + bp.put(fetchedBuf) // Return the initially fetched buffer as it's not used. + + return bytes.NewBuffer(make([]byte, 0, requiredCapacity)) } func (bp *bufferPool) put(buf *bytes.Buffer) { @@ -183,7 +179,7 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error }() if w.buf == nil || w.buf.Len() == 0 { - w.buf = w.bufPool.get(ctx) + w.buf = w.bufPool.getWithSize(ctx, defaultBufferSize) } totalSizeNeeded := uint64(w.buf.Len()) + uint64(len(data)) From 3e6739267cf30f2cb7579d1efa1e28b5b67c3d95 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sun, 4 Feb 2024 16:01:24 -0800 Subject: [PATCH 14/24] optimize --- .../bufferedfilewriter.go | 56 +++++++------------ 1 file changed, 21 insertions(+), 35 deletions(-) diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index 9c0942d72a67..1ba65d3f424e 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -38,9 +38,10 @@ func newBufferPool(opts ...bufPoolOpt) *bufferPool { return pool } +// sharedBufferPool is the shared buffer pool used by all BufferedFileWriters. +// This allows for efficient reuse of buffers across multiple writers. var sharedBufferPool *bufferPool -// init function to initialize the shared buffer pool func init() { sharedBufferPool = newBufferPool() } func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { @@ -53,21 +54,10 @@ func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { return buf } -func (bp *bufferPool) getWithSize(ctx context.Context, dataSize uint64) *bytes.Buffer { - fetchedBuf := bp.get(ctx) - - // Calculate if the fetched buffer's capacity is less than the required data size. - requiredCapacity := int(dataSize) - if fetchedBuf.Cap() >= requiredCapacity { - fetchedBuf.Reset() - return fetchedBuf - } - - // If the fetched buffer's capacity is insufficient, return the inadequate buffer to the pool, - // and create a new buffer of the required size. - bp.put(fetchedBuf) // Return the initially fetched buffer as it's not used. - - return bytes.NewBuffer(make([]byte, 0, requiredCapacity)) +func (bp *bufferPool) growBufferWithSize(buf *bytes.Buffer, dataSize uint64) { + // Grow the buffer to accommodate the new data. 
+ requiredGrowth := int(dataSize - uint64(buf.Cap())) + buf.Grow(requiredGrowth) } func (bp *bufferPool) put(buf *bytes.Buffer) { @@ -168,44 +158,40 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error } size := uint64(len(data)) + + if w.buf == nil || w.buf.Len() == 0 { + w.buf = w.bufPool.get(ctx) + } + + bufferLength := w.buf.Len() + defer func() { w.size += size ctx.Logger().V(4).Info( "write complete", "data_size", size, - "content_size", w.buf.Len(), + "content_size", bufferLength, "total_size", w.size, ) }() - if w.buf == nil || w.buf.Len() == 0 { - w.buf = w.bufPool.getWithSize(ctx, defaultBufferSize) - } - - totalSizeNeeded := uint64(w.buf.Len()) + uint64(len(data)) + totalSizeNeeded := uint64(bufferLength) + uint64(len(data)) if totalSizeNeeded <= w.threshold { // If the total size is within the threshold, write to the buffer. ctx.Logger().V(4).Info( "writing to buffer", "data_size", size, - "content_size", w.buf.Len(), + "content_size", bufferLength, ) if totalSizeNeeded > uint64(w.buf.Cap()) { ctx.Logger().V(4).Info( "buffer size exceeded, getting new buffer", - "current_size", w.buf.Len(), + "current_size", bufferLength, "new_size", totalSizeNeeded, ) - // The current buffer cannot accommodate the new data; fetch a new, larger buffer. - newBuf := w.bufPool.getWithSize(ctx, totalSizeNeeded) - - // Copy the existing data to the new buffer and return the old buffer to the pool. - if _, err := w.buf.WriteTo(newBuf); err != nil { - return 0, err - } - w.bufPool.put(w.buf) - w.buf = newBuf + // The current buffer cannot accommodate the new data; grow it. + w.bufPool.growBufferWithSize(w.buf, totalSizeNeeded) } return w.buf.Write(data) @@ -224,8 +210,8 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error // Transfer existing data in buffer to the file, then clear the buffer. // This ensures all the data is in one place - either entirely in the buffer or the file. - if w.buf.Len() > 0 { - ctx.Logger().V(4).Info("writing buffer to file", "content_size", w.buf.Len()) + if bufferLength > 0 { + ctx.Logger().V(4).Info("writing buffer to file", "content_size", bufferLength) if _, err := w.buf.WriteTo(w.file); err != nil { return 0, err } From d8e5589b75f63db07dfca6960365ad72522e8d35 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sun, 4 Feb 2024 16:04:43 -0800 Subject: [PATCH 15/24] rename and cleanup --- .../buffered_file_writer/bufferedfilewriter.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index 1ba65d3f424e..9c7b20f68125 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -54,10 +54,9 @@ func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { return buf } -func (bp *bufferPool) growBufferWithSize(buf *bytes.Buffer, dataSize uint64) { +func (bp *bufferPool) growBufferWithSize(buf *bytes.Buffer, size int) { // Grow the buffer to accommodate the new data. 
- requiredGrowth := int(dataSize - uint64(buf.Cap())) - buf.Grow(requiredGrowth) + buf.Grow(size) } func (bp *bufferPool) put(buf *bytes.Buffer) { @@ -184,14 +183,15 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error "content_size", bufferLength, ) - if totalSizeNeeded > uint64(w.buf.Cap()) { + growSize := int(totalSizeNeeded - uint64(w.buf.Cap())) + if growSize > 0 { ctx.Logger().V(4).Info( - "buffer size exceeded, getting new buffer", + "buffer size exceeded, growing buffer", "current_size", bufferLength, "new_size", totalSizeNeeded, + "grow_size", growSize, ) - // The current buffer cannot accommodate the new data; grow it. - w.bufPool.growBufferWithSize(w.buf, totalSizeNeeded) + w.bufPool.growBufferWithSize(w.buf, growSize) } return w.buf.Write(data) From 389490025ba499328a204d1427a20a435d1a29c9 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sun, 4 Feb 2024 18:43:32 -0800 Subject: [PATCH 16/24] use correct calculation to grow buffer --- pkg/writers/buffered_file_writer/bufferedfilewriter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index 9c7b20f68125..6ca8d654dc0b 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -183,8 +183,8 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error "content_size", bufferLength, ) - growSize := int(totalSizeNeeded - uint64(w.buf.Cap())) - if growSize > 0 { + if totalSizeNeeded > uint64(w.buf.Cap()) { + growSize := int(totalSizeNeeded - uint64(w.buf.Cap())) ctx.Logger().V(4).Info( "buffer size exceeded, growing buffer", "current_size", bufferLength, From 999d6622a3e6cabd0a3da11b735618accc36d509 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sun, 4 Feb 2024 19:21:56 -0800 Subject: [PATCH 17/24] only grow if needed --- .../bufferedfilewriter.go | 90 ++++++++----------- 1 file changed, 35 insertions(+), 55 deletions(-) diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index 86260bb4aab6..847799624fbd 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -15,10 +15,6 @@ import ( type bufPoolOpt func(pool *bufferPool) -func withBufPoolSize(size uint32) bufPoolOpt { - return func(pool *bufferPool) { pool.bufferSize = size } -} - type bufferPool struct { bufferSize uint32 *sync.Pool @@ -26,22 +22,28 @@ type bufferPool struct { const defaultBufferSize = 2 << 12 // 8KB func newBufferPool(opts ...bufPoolOpt) *bufferPool { - bp := &bufferPool{bufferSize: defaultBufferSize} + pool := &bufferPool{bufferSize: defaultBufferSize} for _, opt := range opts { - opt(bp) + opt(pool) } - bp.Pool = &sync.Pool{ + pool.Pool = &sync.Pool{ New: func() any { buf := new(bytes.Buffer) - buf.Grow(int(bp.bufferSize)) + buf.Grow(int(pool.bufferSize)) return buf }, } - return bp + return pool } +// sharedBufferPool is the shared buffer pool used by all BufferedFileWriters. +// This allows for efficient reuse of buffers across multiple writers. 
+var sharedBufferPool *bufferPool + +func init() { sharedBufferPool = newBufferPool() } + func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { buf, ok := bp.Pool.Get().(*bytes.Buffer) if !ok { @@ -52,25 +54,9 @@ func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { return buf } -func (bp *bufferPool) getWithSize(ctx context.Context, dataSize uint64) *bytes.Buffer { - var buf *bytes.Buffer - - // Attempt to fetch a buffer from the pool. - if fetchedBuf := bp.get(ctx); fetchedBuf != nil { - // Check if the fetched buffer can accommodate the data size. - // If not, create a new buffer with enough capacity. - diff := int(dataSize) - fetchedBuf.Cap() - if diff > 0 { - // Since the fetched buffer is not suitable, put it back into the pool - // and allocate a new buffer of the required size. - bp.put(buf) - buf = bytes.NewBuffer(make([]byte, 0, dataSize)) - } - } - - // Ensure the buffer is reset before use to avoid any old data remaining. - buf.Reset() - return buf +func (bp *bufferPool) growBufferWithSize(buf *bytes.Buffer, size int) { + // Grow the buffer to accommodate the new data. + buf.Grow(size) } func (bp *bufferPool) put(buf *bytes.Buffer) { @@ -115,11 +101,6 @@ type BufferedFileWriter struct { // Option is a function that modifies a BufferedFileWriter. type Option func(*BufferedFileWriter) -// WithBufPoolSize sets the size of the buffer pool. -func WithBufPoolSize(size uint32) Option { - return func(w *BufferedFileWriter) { w.bufPool = newBufferPool(withBufPoolSize(size)) } -} - // WithThreshold sets the threshold for switching to file writing. func WithThreshold(threshold uint64) Option { return func(w *BufferedFileWriter) { w.threshold = threshold } @@ -131,7 +112,7 @@ func New(opts ...Option) *BufferedFileWriter { w := &BufferedFileWriter{ threshold: defaultThreshold, state: writeOnly, - bufPool: newBufferPool(), + bufPool: sharedBufferPool, } for _, opt := range opts { opt(w) @@ -176,44 +157,43 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error } size := uint64(len(data)) + + if w.buf == nil || w.buf.Len() == 0 { + w.buf = w.bufPool.get(ctx) + } + + bufferLength := w.buf.Len() + defer func() { w.size += size ctx.Logger().V(4).Info( "write complete", "data_size", size, - "content_size", w.buf.Len(), + "content_size", bufferLength, "total_size", w.size, ) }() - if w.buf == nil || w.buf.Len() == 0 { - w.buf = w.bufPool.get(ctx) - } - - totalSizeNeeded := uint64(w.buf.Len()) + uint64(len(data)) + totalSizeNeeded := uint64(bufferLength) + uint64(len(data)) if totalSizeNeeded <= w.threshold { // If the total size is within the threshold, write to the buffer. ctx.Logger().V(4).Info( "writing to buffer", "data_size", size, - "content_size", w.buf.Len(), + "content_size", bufferLength, ) - if totalSizeNeeded > uint64(w.buf.Cap()) { + availableSpace := w.buf.Cap() - bufferLength + growSize := int(totalSizeNeeded) - bufferLength + if growSize > availableSpace { ctx.Logger().V(4).Info( - "buffer size exceeded, getting new buffer", - "current_size", w.buf.Len(), + "buffer size exceeded, growing buffer", + "current_size", bufferLength, "new_size", totalSizeNeeded, + "available_space", availableSpace, + "grow_size", growSize, ) - // The current buffer cannot accommodate the new data; fetch a new, larger buffer. - newBuf := w.bufPool.getWithSize(ctx, totalSizeNeeded) - - // Copy the existing data to the new buffer and return the old buffer to the pool. 
-			if _, err := w.buf.WriteTo(newBuf); err != nil {
-				return 0, err
-			}
-			w.bufPool.put(w.buf)
-			w.buf = newBuf
+			w.bufPool.growBufferWithSize(w.buf, growSize)
 		}
 
 		return w.buf.Write(data)
@@ -232,8 +212,8 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
 
 	// Transfer existing data in buffer to the file, then clear the buffer.
 	// This ensures all the data is in one place - either entirely in the buffer or the file.
-	if w.buf.Len() > 0 {
-		ctx.Logger().V(4).Info("writing buffer to file", "content_size", w.buf.Len())
+	if bufferLength > 0 {
+		ctx.Logger().V(4).Info("writing buffer to file", "content_size", bufferLength)
 		if _, err := w.buf.WriteTo(w.file); err != nil {
 			return 0, err
 		}

From faa423b146d2375954d01a6f6c836b897cfbe97f Mon Sep 17 00:00:00 2001
From: Ahrav Dutta
Date: Mon, 5 Feb 2024 12:51:28 -0800
Subject: [PATCH 18/24] address comments

---
 pkg/writers/buffered_file_writer/bufferedfilewriter.go | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
index 847799624fbd..616a909499ad 100644
--- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go
+++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
@@ -54,11 +54,6 @@ func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer {
 	return buf
 }
 
-func (bp *bufferPool) growBufferWithSize(buf *bytes.Buffer, size int) {
-	// Grow the buffer to accommodate the new data.
-	buf.Grow(size)
-}
-
 func (bp *bufferPool) put(buf *bytes.Buffer) {
 	// If the buffer is more than twice the default size, replace it with a new, smaller one.
 	// This prevents us from returning very large buffers to the pool.
@@ -193,7 +188,7 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
 				"available_space", availableSpace,
 				"grow_size", growSize,
 			)
-			w.bufPool.growBufferWithSize(w.buf, growSize)
+			w.buf.Grow(growSize)
 		}
 
 		return w.buf.Write(data)

From c8b4b02464a69e40bdf951f7992d3b59ec0e7ee7 Mon Sep 17 00:00:00 2001
From: Ahrav Dutta
Date: Mon, 5 Feb 2024 12:55:22 -0800
Subject: [PATCH 19/24] remove unused

---
 pkg/gitparse/gitparse.go | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/pkg/gitparse/gitparse.go b/pkg/gitparse/gitparse.go
index 04561a84805d..d9a10a873f0d 100644
--- a/pkg/gitparse/gitparse.go
+++ b/pkg/gitparse/gitparse.go
@@ -623,13 +623,6 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch
 	ctx.Logger().V(2).Info("finished parsing git log.", "total_log_size", totalLogSize)
 }
 
-func (c *Parser) contentWriter() contentWriter {
-	if c.useCustomContentWriter {
-		return bufferedfilewriter.New()
-	}
-	return newBuffer()
-}
-
 func isMergeLine(isStaged bool, latestState ParseState, line []byte) bool {
 	if isStaged || latestState != CommitLine {
 		return false

From 8190669bc2d7c2f38fdd1f07414c0a62586cd44e Mon Sep 17 00:00:00 2001
From: Ahrav Dutta
Date: Mon, 5 Feb 2024 12:57:33 -0800
Subject: [PATCH 20/24] remove

---
 pkg/gitparse/gitparse.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pkg/gitparse/gitparse.go b/pkg/gitparse/gitparse.go
index d9a10a873f0d..525ff38e3dfd 100644
--- a/pkg/gitparse/gitparse.go
+++ b/pkg/gitparse/gitparse.go
@@ -445,7 +445,6 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch
 				totalLogSize += currentCommit.Size
 			}
 			// Create a new currentDiff and currentCommit
-			currentCommit = &Commit{}
 			currentDiff = diff()
 			currentCommit = &Commit{Message: strings.Builder{}}
 			// Check that the commit line contains a hash and set it.

From c1cf67cf6c06f8f116c7efeb33ce0df5ce4ab25a Mon Sep 17 00:00:00 2001
From: Ahrav Dutta
Date: Mon, 5 Feb 2024 14:51:09 -0800
Subject: [PATCH 21/24] rip out Grow

---
 pkg/writers/buffered_file_writer/bufferedfilewriter.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
index 616a909499ad..ee7a0a0dd298 100644
--- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go
+++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
@@ -188,7 +188,6 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
 				"available_space", availableSpace,
 				"grow_size", growSize,
 			)
-			w.buf.Grow(growSize)
 		}
 
 		return w.buf.Write(data)

From 2e74a4cdbf9b44349c205316acb2024f22bbef3d Mon Sep 17 00:00:00 2001
From: Ahrav Dutta
Date: Tue, 6 Feb 2024 07:18:23 -0800
Subject: [PATCH 22/24] address comment

---
 pkg/writers/buffered_file_writer/bufferedfilewriter.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
index ee7a0a0dd298..962d3cd4210d 100644
--- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go
+++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
@@ -211,7 +211,6 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
 		if _, err := w.buf.WriteTo(w.file); err != nil {
 			return 0, err
 		}
-		// Reset the buffer to clear any existing data and return it to the pool.
 		w.bufPool.put(w.buf)
 	}
 }

From a2d23ef3436cd8f00773ddcf58d7e298c8462f46 Mon Sep 17 00:00:00 2001
From: Ahrav Dutta
Date: Tue, 6 Feb 2024 07:19:11 -0800
Subject: [PATCH 23/24] use 2k default buffer

---
 pkg/writers/buffered_file_writer/bufferedfilewriter.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
index 962d3cd4210d..38b70bdee665 100644
--- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go
+++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
@@ -20,7 +20,7 @@ type bufferPool struct {
 	*sync.Pool
 }
 
-const defaultBufferSize = 2 << 12 // 8KB
+const defaultBufferSize = 2 << 10 // 2KB

From f1bd25df6c9b29ca4c690d96d3f854e8a62e135d Mon Sep 17 00:00:00 2001
From: Ahrav Dutta
Date: Tue, 6 Feb 2024 07:22:15 -0800
Subject: [PATCH 24/24] update comment: allow large buffers to be garbage
 collected

---
 pkg/writers/buffered_file_writer/bufferedfilewriter.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
index 38b70bdee665..09a3c0396c54 100644
--- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go
+++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go
@@ -55,12 +55,11 @@ func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer {
 }
 
 func (bp *bufferPool) put(buf *bytes.Buffer) {
-	// If the buffer is more than twice the default size, replace it with a new, smaller one.
+	// If the buffer is more than twice the default size, release it for garbage collection.
 	// This prevents us from returning very large buffers to the pool.
 	const maxAllowedCapacity = 2 * defaultBufferSize
 	if buf.Cap() > maxAllowedCapacity {
-		// Replace the buffer with a new, smaller one. No need to copy data since we're resetting it.
-		buf = bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
+		buf = nil // Release the large buffer for garbage collection.
 	} else {
 		// Reset the buffer to clear any existing data.
 		buf.Reset()
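
Editor's note: taken together, the later patches in this series converge on a simple scheme: every BufferedFileWriter borrows a 2KB buffer from one shared sync.Pool, grows it in place only when a write would not fit, and the pool refuses to keep any returned buffer that has grown beyond twice the default size. Below is a minimal, self-contained Go sketch of that scheme, assuming only the standard library; it is not the exact code from the diffs above, and the getBuffer/putBuffer helper names are hypothetical.

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// Mirrors the 2KB default the series settles on (2 << 10 = 2048).
const defaultBufferSize = 2 << 10

// One process-wide pool, as with sharedBufferPool in patch 17, so all
// writers draw from the same set of reusable buffers.
var sharedPool = sync.Pool{
	New: func() any {
		buf := new(bytes.Buffer)
		buf.Grow(defaultBufferSize) // pre-size fresh buffers to the default capacity
		return buf
	},
}

// getBuffer hands out a reusable buffer from the shared pool (hypothetical name).
func getBuffer() *bytes.Buffer { return sharedPool.Get().(*bytes.Buffer) }

// putBuffer returns a buffer for reuse, dropping oversized ones so the pool
// never retains very large allocations, which is the policy patch 24 documents.
func putBuffer(buf *bytes.Buffer) {
	const maxAllowedCapacity = 2 * defaultBufferSize
	if buf.Cap() > maxAllowedCapacity {
		return // drop it: the GC reclaims the large buffer instead of the pool keeping it
	}
	buf.Reset() // clear stale contents before the next borrower reuses the buffer
	sharedPool.Put(buf)
}

func main() {
	buf := getBuffer()
	buf.WriteString("diff --git a/README.md b/README.md")
	fmt.Println(buf.Len(), buf.Cap()) // e.g. 34 2048
	putBuffer(buf)
}

The capacity cap in putBuffer is the design point patches 23 and 24 circle around: without it, a single unusually large diff would keep cycling an oversized buffer through the shared pool long after it was needed.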