diff --git a/pkg/handlers/archive.go b/pkg/handlers/archive.go index 0b4a300fc01f..e8f571698039 100644 --- a/pkg/handlers/archive.go +++ b/pkg/handlers/archive.go @@ -111,6 +111,9 @@ func (a *Archive) openArchive(ctx logContext.Context, depth int, reader io.Reade if err != nil { return err } + + defer compReader.Close() + return a.openArchive(ctx, depth+1, compReader, archiveChan) case archiver.Extractor: return archive.Extract(logContext.WithValue(ctx, depthKey, depth+1), arReader, nil, a.extractorHandler(archiveChan)) diff --git a/pkg/handlers/archive_test.go b/pkg/handlers/archive_test.go index 6fe77f340ceb..dfff4fb1afc2 100644 --- a/pkg/handlers/archive_test.go +++ b/pkg/handlers/archive_test.go @@ -115,7 +115,9 @@ func TestHandleFile(t *testing.T) { // Context cancels the operation. canceledCtx, cancel := logContext.WithCancel(logContext.Background()) cancel() - assert.False(t, HandleFile(canceledCtx, strings.NewReader("file"), &sources.Chunk{}, reporter)) + reader, err := diskbufferreader.New(strings.NewReader("file")) + assert.NoError(t, err) + assert.False(t, HandleFile(canceledCtx, reader, &sources.Chunk{}, reporter)) // Only one chunk is sent on the channel. // TODO: Embed a zip without making an HTTP request. @@ -124,7 +126,7 @@ func TestHandleFile(t *testing.T) { defer resp.Body.Close() archive := Archive{} archive.New() - reader, err := diskbufferreader.New(resp.Body) + reader, err = diskbufferreader.New(resp.Body) assert.NoError(t, err) assert.Equal(t, 0, len(reporter.Ch)) @@ -132,6 +134,34 @@ func TestHandleFile(t *testing.T) { assert.Equal(t, 1, len(reporter.Ch)) } +func BenchmarkHandleFile(b *testing.B) { + file, err := os.Open("testdata/test.tgz") + assert.Nil(b, err) + defer file.Close() + + archive := Archive{} + archive.New() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + sourceChan := make(chan *sources.Chunk, 1) + reader, err := diskbufferreader.New(file) + assert.NoError(b, err) + + b.StartTimer() + + go func() { + defer close(sourceChan) + HandleFile(logContext.Background(), reader, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}) + }() + + for range sourceChan { + } + + b.StopTimer() + } +} + func TestHandleFileSkipBinaries(t *testing.T) { filename := createBinaryArchive(t) defer os.Remove(filename) @@ -139,13 +169,16 @@ func TestHandleFileSkipBinaries(t *testing.T) { file, err := os.Open(filename) assert.NoError(t, err) + reader, err := diskbufferreader.New(file) + assert.NoError(t, err) + ctx, cancel := logContext.WithTimeout(logContext.Background(), 5*time.Second) defer cancel() sourceChan := make(chan *sources.Chunk, 1) go func() { defer close(sourceChan) - HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}, WithSkipBinaries(true)) + HandleFile(ctx, reader, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}, WithSkipBinaries(true)) }() count := 0 @@ -283,12 +316,15 @@ func TestExtractTarContent(t *testing.T) { assert.Nil(t, err) defer file.Close() + reader, err := diskbufferreader.New(file) + assert.NoError(t, err) + ctx := logContext.Background() chunkCh := make(chan *sources.Chunk) go func() { defer close(chunkCh) - ok := HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: chunkCh}) + ok := HandleFile(ctx, reader, &sources.Chunk{}, sources.ChanReporter{Ch: chunkCh}) assert.True(t, ok) }() @@ -335,13 +371,16 @@ func TestNestedDirArchive(t *testing.T) { assert.Nil(t, err) defer file.Close() + reader, err := diskbufferreader.New(file) + assert.NoError(t, err) + ctx, cancel := logContext.WithTimeout(logContext.Background(), 5*time.Second) defer cancel() sourceChan := make(chan *sources.Chunk, 1) go func() { defer close(sourceChan) - HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}) + HandleFile(ctx, reader, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}) }() count := 0 diff --git a/pkg/handlers/handlers.go b/pkg/handlers/handlers.go index 6008f1aa1926..d52ba745d9e4 100644 --- a/pkg/handlers/handlers.go +++ b/pkg/handlers/handlers.go @@ -48,20 +48,11 @@ type Handler interface { // packages them in the provided chunk skeleton, and reports them to the chunk reporter. // The function returns true if processing was successful and false otherwise. // Context is used for cancellation, and the caller is responsible for canceling it if needed. -func HandleFile(ctx logContext.Context, file io.Reader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter, opts ...Option) bool { +func HandleFile(ctx logContext.Context, reReader *diskbufferreader.DiskBufferReader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter, opts ...Option) bool { for _, h := range DefaultHandlers() { h.New(opts...) - // The re-reader is used to reset the file reader after checking if the handler implements SpecializedHandler. - // This is necessary because the archive pkg doesn't correctly determine the file type when using - // an io.MultiReader, which is used by the SpecializedHandler. - reReader, err := diskbufferreader.New(file) - if err != nil { - ctx.Logger().Error(err, "error creating reusable reader") - return false - } - - if success := processHandler(ctx, h, reReader, chunkSkel, reporter); success { + if handled := processHandler(ctx, h, reReader, chunkSkel, reporter); handled { return true } } @@ -70,9 +61,6 @@ func HandleFile(ctx logContext.Context, file io.Reader, chunkSkel *sources.Chunk } func processHandler(ctx logContext.Context, h Handler, reReader *diskbufferreader.DiskBufferReader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter) bool { - defer reReader.Close() - defer reReader.Stop() - if specialHandler, ok := h.(SpecializedHandler); ok { file, isSpecial, err := specialHandler.HandleSpecialized(ctx, reReader) if isSpecial {