[bug] archive handler memory leak #2247

Merged: 4 commits, Dec 20, 2023
3 changes: 3 additions & 0 deletions pkg/handlers/archive.go
@@ -111,6 +111,9 @@ func (a *Archive) openArchive(ctx logContext.Context, depth int, reader io.Reader
 		if err != nil {
 			return err
 		}
+
+		defer compReader.Close()
+
 		return a.openArchive(ctx, depth+1, compReader, archiveChan)
 	case archiver.Extractor:
 		return archive.Extract(logContext.WithValue(ctx, depthKey, depth+1), arReader, nil, a.extractorHandler(archiveChan))
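Note on the fix above: the decompression reader returned by the archiver package holds buffered state until it is closed, so every recursive descent through openArchive previously leaked one reader. A minimal sketch of the pattern, using the standard library's gzip reader as a stand-in for the archiver decompressor (the fixture name archive.gz is hypothetical):

package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"os"
)

// scanCompressed mirrors the shape of openArchive's compression branch:
// open a decompressor over the raw reader, defer its Close, then recurse
// (here, a plain copy stands in for the recursive scan).
func scanCompressed(r io.Reader) error {
	zr, err := gzip.NewReader(r)
	if err != nil {
		return err
	}
	// The one-line fix: without this, the decompressor's buffers are
	// retained at every nesting level, even when the scan below errors.
	defer zr.Close()

	_, err = io.Copy(io.Discard, zr)
	return err
}

func main() {
	f, err := os.Open("archive.gz") // hypothetical fixture
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer f.Close()
	if err := scanCompressed(f); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}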
49 changes: 44 additions & 5 deletions pkg/handlers/archive_test.go
@@ -115,7 +115,9 @@ func TestHandleFile(t *testing.T) {
 	// Context cancels the operation.
 	canceledCtx, cancel := logContext.WithCancel(logContext.Background())
 	cancel()
-	assert.False(t, HandleFile(canceledCtx, strings.NewReader("file"), &sources.Chunk{}, reporter))
+	reader, err := diskbufferreader.New(strings.NewReader("file"))
+	assert.NoError(t, err)
+	assert.False(t, HandleFile(canceledCtx, reader, &sources.Chunk{}, reporter))
 
 	// Only one chunk is sent on the channel.
 	// TODO: Embed a zip without making an HTTP request.
@@ -124,28 +126,59 @@
 	defer resp.Body.Close()
 	archive := Archive{}
 	archive.New()
-	reader, err := diskbufferreader.New(resp.Body)
+	reader, err = diskbufferreader.New(resp.Body)
 	assert.NoError(t, err)
 
 	assert.Equal(t, 0, len(reporter.Ch))
 	assert.True(t, HandleFile(logContext.Background(), reader, &sources.Chunk{}, reporter))
 	assert.Equal(t, 1, len(reporter.Ch))
 }
 
+func BenchmarkHandleFile(b *testing.B) {
+	file, err := os.Open("testdata/test.tgz")
+	assert.Nil(b, err)
+	defer file.Close()
+
+	archive := Archive{}
+	archive.New()
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		sourceChan := make(chan *sources.Chunk, 1)
+		reader, err := diskbufferreader.New(file)
+		assert.NoError(b, err)
+
+		b.StartTimer()
+
+		go func() {
+			defer close(sourceChan)
+			HandleFile(logContext.Background(), reader, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan})
+		}()
+
+		for range sourceChan {
+		}
+
+		b.StopTimer()
+	}
+}
+
 func TestHandleFileSkipBinaries(t *testing.T) {
 	filename := createBinaryArchive(t)
 	defer os.Remove(filename)
 
 	file, err := os.Open(filename)
 	assert.NoError(t, err)
 
+	reader, err := diskbufferreader.New(file)
+	assert.NoError(t, err)
+
 	ctx, cancel := logContext.WithTimeout(logContext.Background(), 5*time.Second)
 	defer cancel()
 	sourceChan := make(chan *sources.Chunk, 1)
 
 	go func() {
 		defer close(sourceChan)
-		HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}, WithSkipBinaries(true))
+		HandleFile(ctx, reader, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}, WithSkipBinaries(true))
 	}()
 
 	count := 0
@@ -283,12 +316,15 @@ func TestExtractTarContent(t *testing.T) {
 	assert.Nil(t, err)
 	defer file.Close()
 
+	reader, err := diskbufferreader.New(file)
+	assert.NoError(t, err)
+
 	ctx := logContext.Background()
 
 	chunkCh := make(chan *sources.Chunk)
 	go func() {
 		defer close(chunkCh)
-		ok := HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: chunkCh})
+		ok := HandleFile(ctx, reader, &sources.Chunk{}, sources.ChanReporter{Ch: chunkCh})
 		assert.True(t, ok)
 	}()
 
@@ -335,13 +371,16 @@ func TestNestedDirArchive(t *testing.T) {
 	assert.Nil(t, err)
 	defer file.Close()
 
+	reader, err := diskbufferreader.New(file)
+	assert.NoError(t, err)
+
 	ctx, cancel := logContext.WithTimeout(logContext.Background(), 5*time.Second)
 	defer cancel()
 	sourceChan := make(chan *sources.Chunk, 1)
 
 	go func() {
 		defer close(sourceChan)
-		HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan})
+		HandleFile(ctx, reader, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan})
 	}()
 
 	count := 0
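The new benchmark's timer handling is worth noting: the trailing b.StopTimer() means each subsequent iteration's reader construction is excluded, so the timings cover only HandleFile plus the channel drain. A minimal generic sketch of that discipline, with a trivial read loop standing in for HandleFile (the package name and fixture string are hypothetical):

package handlers_test

import (
	"strings"
	"testing"
)

// BenchmarkPattern sketches the per-iteration timer pattern: stop the timer
// around fixture setup so only the work under test is measured.
func BenchmarkPattern(b *testing.B) {
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		r := strings.NewReader("fixture data") // unmeasured setup
		buf := make([]byte, 4)
		b.StartTimer()

		for { // measured work; stands in for HandleFile + channel drain
			if _, err := r.Read(buf); err != nil {
				break
			}
		}
	}
}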
16 changes: 2 additions & 14 deletions pkg/handlers/handlers.go
@@ -48,20 +48,11 @@ type Handler interface {
 // packages them in the provided chunk skeleton, and reports them to the chunk reporter.
 // The function returns true if processing was successful and false otherwise.
 // Context is used for cancellation, and the caller is responsible for canceling it if needed.
-func HandleFile(ctx logContext.Context, file io.Reader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter, opts ...Option) bool {
+func HandleFile(ctx logContext.Context, reReader *diskbufferreader.DiskBufferReader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter, opts ...Option) bool {
 	for _, h := range DefaultHandlers() {
 		h.New(opts...)
 
-		// The re-reader is used to reset the file reader after checking if the handler implements SpecializedHandler.
-		// This is necessary because the archive pkg doesn't correctly determine the file type when using
-		// an io.MultiReader, which is used by the SpecializedHandler.
-		reReader, err := diskbufferreader.New(file)
-		if err != nil {
-			ctx.Logger().Error(err, "error creating reusable reader")
-			return false
-		}
-
-		if success := processHandler(ctx, h, reReader, chunkSkel, reporter); success {
+		if handled := processHandler(ctx, h, reReader, chunkSkel, reporter); handled {
 			return true
 		}
 	}
@@ -70,9 +61,6 @@ func HandleFile(ctx logContext.Context, file io.Reader, chunkSkel *sources.Chunk
 }
 
 func processHandler(ctx logContext.Context, h Handler, reReader *diskbufferreader.DiskBufferReader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter) bool {
-	defer reReader.Close()
-	defer reReader.Stop()
-
 	if specialHandler, ok := h.(SpecializedHandler); ok {
 		file, isSpecial, err := specialHandler.HandleSpecialized(ctx, reReader)
 		if isSpecial {
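With reader construction hoisted out of HandleFile, callers now own the reader's lifecycle: build the disk-buffered reader once per file and clean it up when scanning finishes, instead of HandleFile rebuilding and closing one for every handler in DefaultHandlers(). A sketch of the new call site, assuming the import paths suggested by this diff (the diskbufferreader module path in particular is an assumption, and scanFile/the fixture path are hypothetical):

package main

import (
	"os"

	diskbufferreader "github.com/trufflesecurity/disk-buffer-reader"
	logContext "github.com/trufflesecurity/trufflehog/v3/pkg/context"
	"github.com/trufflesecurity/trufflehog/v3/pkg/handlers"
	"github.com/trufflesecurity/trufflehog/v3/pkg/sources"
)

func scanFile(path string) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()

	// The caller builds the re-readable reader once per file and owns its
	// cleanup. (The removed code also called Stop(); where that happens
	// now is not visible in this diff.)
	reader, err := diskbufferreader.New(file)
	if err != nil {
		return err
	}
	defer reader.Close()

	chunkCh := make(chan *sources.Chunk)
	go func() {
		defer close(chunkCh)
		handlers.HandleFile(logContext.Background(), reader, &sources.Chunk{}, sources.ChanReporter{Ch: chunkCh})
	}()
	for range chunkCh {
		// Consume reported chunks; a real caller would scan them.
	}
	return nil
}

func main() {
	if err := scanFile("testdata/example.tgz"); err != nil { // hypothetical fixture
		os.Exit(1)
	}
}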