From b2cc40269a01ea4bf8f8913b81b4aa6671ecc561 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Tue, 19 Dec 2023 21:11:27 -0800 Subject: [PATCH 1/4] update handlers --- pkg/handlers/handlers.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/pkg/handlers/handlers.go b/pkg/handlers/handlers.go index 6008f1aa1926..d52ba745d9e4 100644 --- a/pkg/handlers/handlers.go +++ b/pkg/handlers/handlers.go @@ -48,20 +48,11 @@ type Handler interface { // packages them in the provided chunk skeleton, and reports them to the chunk reporter. // The function returns true if processing was successful and false otherwise. // Context is used for cancellation, and the caller is responsible for canceling it if needed. -func HandleFile(ctx logContext.Context, file io.Reader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter, opts ...Option) bool { +func HandleFile(ctx logContext.Context, reReader *diskbufferreader.DiskBufferReader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter, opts ...Option) bool { for _, h := range DefaultHandlers() { h.New(opts...) - // The re-reader is used to reset the file reader after checking if the handler implements SpecializedHandler. - // This is necessary because the archive pkg doesn't correctly determine the file type when using - // an io.MultiReader, which is used by the SpecializedHandler. - reReader, err := diskbufferreader.New(file) - if err != nil { - ctx.Logger().Error(err, "error creating reusable reader") - return false - } - - if success := processHandler(ctx, h, reReader, chunkSkel, reporter); success { + if handled := processHandler(ctx, h, reReader, chunkSkel, reporter); handled { return true } } @@ -70,9 +61,6 @@ func HandleFile(ctx logContext.Context, file io.Reader, chunkSkel *sources.Chunk } func processHandler(ctx logContext.Context, h Handler, reReader *diskbufferreader.DiskBufferReader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter) bool { - defer reReader.Close() - defer reReader.Stop() - if specialHandler, ok := h.(SpecializedHandler); ok { file, isSpecial, err := specialHandler.HandleSpecialized(ctx, reReader) if isSpecial { From 051b8d0ba4a0e168e40824264ce7017980212e6b Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Tue, 19 Dec 2023 21:54:38 -0800 Subject: [PATCH 2/4] add benchmark --- pkg/handlers/archive_test.go | 45 ++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/pkg/handlers/archive_test.go b/pkg/handlers/archive_test.go index 6fe77f340ceb..8c864e93b165 100644 --- a/pkg/handlers/archive_test.go +++ b/pkg/handlers/archive_test.go @@ -115,7 +115,9 @@ func TestHandleFile(t *testing.T) { // Context cancels the operation. canceledCtx, cancel := logContext.WithCancel(logContext.Background()) cancel() - assert.False(t, HandleFile(canceledCtx, strings.NewReader("file"), &sources.Chunk{}, reporter)) + reader, err := diskbufferreader.New(strings.NewReader("file")) + assert.NoError(t, err) + assert.False(t, HandleFile(canceledCtx, reader, &sources.Chunk{}, reporter)) // Only one chunk is sent on the channel. // TODO: Embed a zip without making an HTTP request. @@ -124,7 +126,7 @@ func TestHandleFile(t *testing.T) { defer resp.Body.Close() archive := Archive{} archive.New() - reader, err := diskbufferreader.New(resp.Body) + reader, err = diskbufferreader.New(resp.Body) assert.NoError(t, err) assert.Equal(t, 0, len(reporter.Ch)) @@ -132,6 +134,30 @@ func TestHandleFile(t *testing.T) { assert.Equal(t, 1, len(reporter.Ch)) } +func BenchmarkHandleFile(b *testing.B) { + file, err := os.Open("testdata/dir-archive.zip") + assert.Nil(b, err) + defer file.Close() + + archive := Archive{} + archive.New() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + sourceChan := make(chan *sources.Chunk, 1) + reader, err := diskbufferreader.New(file) + assert.NoError(b, err) + + go func() { + defer close(sourceChan) + HandleFile(logContext.Background(), reader, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}) + }() + + for range sourceChan { + } + } +} + func TestHandleFileSkipBinaries(t *testing.T) { filename := createBinaryArchive(t) defer os.Remove(filename) @@ -139,13 +165,16 @@ func TestHandleFileSkipBinaries(t *testing.T) { file, err := os.Open(filename) assert.NoError(t, err) + reader, err := diskbufferreader.New(file) + assert.NoError(t, err) + ctx, cancel := logContext.WithTimeout(logContext.Background(), 5*time.Second) defer cancel() sourceChan := make(chan *sources.Chunk, 1) go func() { defer close(sourceChan) - HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}, WithSkipBinaries(true)) + HandleFile(ctx, reader, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}, WithSkipBinaries(true)) }() count := 0 @@ -283,12 +312,15 @@ func TestExtractTarContent(t *testing.T) { assert.Nil(t, err) defer file.Close() + reader, err := diskbufferreader.New(file) + assert.NoError(t, err) + ctx := logContext.Background() chunkCh := make(chan *sources.Chunk) go func() { defer close(chunkCh) - ok := HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: chunkCh}) + ok := HandleFile(ctx, reader, &sources.Chunk{}, sources.ChanReporter{Ch: chunkCh}) assert.True(t, ok) }() @@ -335,13 +367,16 @@ func TestNestedDirArchive(t *testing.T) { assert.Nil(t, err) defer file.Close() + reader, err := diskbufferreader.New(file) + assert.NoError(t, err) + ctx, cancel := logContext.WithTimeout(logContext.Background(), 5*time.Second) defer cancel() sourceChan := make(chan *sources.Chunk, 1) go func() { defer close(sourceChan) - HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}) + HandleFile(ctx, reader, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}) }() count := 0 From f8e310a46bc0e30c732630c73f975eef98b00326 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Tue, 19 Dec 2023 21:58:56 -0800 Subject: [PATCH 3/4] update benchmark. --- pkg/handlers/archive_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/handlers/archive_test.go b/pkg/handlers/archive_test.go index 8c864e93b165..3953f3cb9443 100644 --- a/pkg/handlers/archive_test.go +++ b/pkg/handlers/archive_test.go @@ -148,6 +148,8 @@ func BenchmarkHandleFile(b *testing.B) { reader, err := diskbufferreader.New(file) assert.NoError(b, err) + b.StartTimer() + go func() { defer close(sourceChan) HandleFile(logContext.Background(), reader, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}) @@ -155,6 +157,8 @@ func BenchmarkHandleFile(b *testing.B) { for range sourceChan { } + + b.StopTimer() } } From 985d3d4f926295ff01d2b232abfe1a19da2fd800 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Tue, 19 Dec 2023 22:27:54 -0800 Subject: [PATCH 4/4] update tests --- pkg/handlers/archive.go | 3 +++ pkg/handlers/archive_test.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/handlers/archive.go b/pkg/handlers/archive.go index 0b4a300fc01f..e8f571698039 100644 --- a/pkg/handlers/archive.go +++ b/pkg/handlers/archive.go @@ -111,6 +111,9 @@ func (a *Archive) openArchive(ctx logContext.Context, depth int, reader io.Reade if err != nil { return err } + + defer compReader.Close() + return a.openArchive(ctx, depth+1, compReader, archiveChan) case archiver.Extractor: return archive.Extract(logContext.WithValue(ctx, depthKey, depth+1), arReader, nil, a.extractorHandler(archiveChan)) diff --git a/pkg/handlers/archive_test.go b/pkg/handlers/archive_test.go index 3953f3cb9443..dfff4fb1afc2 100644 --- a/pkg/handlers/archive_test.go +++ b/pkg/handlers/archive_test.go @@ -135,7 +135,7 @@ func TestHandleFile(t *testing.T) { } func BenchmarkHandleFile(b *testing.B) { - file, err := os.Open("testdata/dir-archive.zip") + file, err := os.Open("testdata/test.tgz") assert.Nil(b, err) defer file.Close()