Skip to content

Commit

Permalink
Merge pull request #620 from mhmtszr/compression-memory-leak
Browse files Browse the repository at this point in the history
Fix memory leak while compression
  • Loading branch information
iamemilio committed Jan 19, 2023
2 parents 9fa5deb + cb791a3 commit 02c2525
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 13 deletions.
12 changes: 7 additions & 5 deletions internal/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"regexp"
"strconv"
"sync"
"time"

"github.com/newrelic/go-agent/internal/logger"
Expand Down Expand Up @@ -50,10 +51,11 @@ type RpmCmd struct {
// RpmControls contains fields which will be the same for all calls made
// by the same application.
type RpmControls struct {
License string
Client *http.Client
Logger logger.Logger
AgentVersion string
License string
Client *http.Client
Logger logger.Logger
AgentVersion string
GzipWriterPool *sync.Pool
}

// RPMResponse contains a NR endpoint response.
Expand Down Expand Up @@ -131,7 +133,7 @@ func rpmURL(cmd RpmCmd, cs RpmControls) string {
}

func collectorRequestInternal(url string, cmd RpmCmd, cs RpmControls) RPMResponse {
compressed, err := compress(cmd.Data)
compressed, err := compress(cmd.Data, cs.GzipWriterPool)
if nil != err {
return RPMResponse{Err: err}
}
Expand Down
33 changes: 33 additions & 0 deletions internal/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
package internal

import (
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"testing"

"github.com/newrelic/go-agent/internal/crossagent"
Expand Down Expand Up @@ -111,6 +114,11 @@ func TestCollectorRequest(t *testing.T) {
},
Logger: logger.ShimLogger{IsDebugEnabled: true},
AgentVersion: "agent_version",
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}
resp := CollectorRequest(cmd, cs)
if nil != resp.Err {
Expand Down Expand Up @@ -138,6 +146,11 @@ func TestCollectorBadRequest(t *testing.T) {
},
Logger: logger.ShimLogger{IsDebugEnabled: true},
AgentVersion: "agent_version",
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}
u := ":" // bad url
resp := collectorRequestInternal(u, cmd, cs)
Expand All @@ -157,6 +170,11 @@ func TestUrl(t *testing.T) {
Client: nil,
Logger: nil,
AgentVersion: "1",
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}

out := rpmURL(cmd, cs)
Expand Down Expand Up @@ -236,6 +254,11 @@ func testConnectHelper(cm connectMock) (*ConnectReply, RPMResponse) {
Client: &http.Client{Transport: cm},
Logger: logger.ShimLogger{IsDebugEnabled: true},
AgentVersion: "1",
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}

return ConnectAttempt(config, "", false, cs)
Expand Down Expand Up @@ -479,6 +502,11 @@ func TestCollectorRequestRespectsMaxPayloadSize(t *testing.T) {
}),
},
Logger: logger.ShimLogger{IsDebugEnabled: true},
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}
resp := CollectorRequest(cmd, cs)
if nil == resp.Err {
Expand Down Expand Up @@ -515,6 +543,11 @@ func TestConnectReplyMaxPayloadSize(t *testing.T) {
}),
},
Logger: logger.ShimLogger{IsDebugEnabled: true},
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}
}

Expand Down
8 changes: 6 additions & 2 deletions internal/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ package internal
import (
"bytes"
"compress/gzip"
"sync"
)

func compress(b []byte) (*bytes.Buffer, error) {
func compress(b []byte, gzipWriterPool *sync.Pool) (*bytes.Buffer, error) {
w := gzipWriterPool.Get().(*gzip.Writer)
defer gzipWriterPool.Put(w)

var buf bytes.Buffer
w := gzip.NewWriter(&buf)
w.Reset(&buf)
_, err := w.Write(b)
w.Close()

Expand Down
17 changes: 11 additions & 6 deletions v3/newrelic/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net/http"
"net/url"
"strconv"
"sync"

"github.com/newrelic/go-agent/v3/internal"
"github.com/newrelic/go-agent/v3/internal/logger"
Expand Down Expand Up @@ -52,9 +53,10 @@ type rpmCmd struct {
// rpmControls contains fields which will be the same for all calls made
// by the same application.
type rpmControls struct {
License string
Client *http.Client
Logger logger.Logger
License string
Client *http.Client
Logger logger.Logger
GzipWriterPool *sync.Pool
}

// rpmResponse contains a NR endpoint response.
Expand Down Expand Up @@ -136,9 +138,12 @@ func rpmURL(cmd rpmCmd, cs rpmControls) string {
return u.String()
}

func compress(b []byte) (*bytes.Buffer, error) {
func compress(b []byte, gzipWriterPool *sync.Pool) (*bytes.Buffer, error) {
w := gzipWriterPool.Get().(*gzip.Writer)
defer gzipWriterPool.Put(w)

var buf bytes.Buffer
w := gzip.NewWriter(&buf)
w.Reset(&buf)
_, err := w.Write(b)
w.Close()

Expand All @@ -150,7 +155,7 @@ func compress(b []byte) (*bytes.Buffer, error) {
}

func collectorRequestInternal(url string, cmd rpmCmd, cs rpmControls) rpmResponse {
compressed, err := compress(cmd.Data)
compressed, err := compress(cmd.Data, cs.GzipWriterPool)
if nil != err {
return rpmResponse{Err: err}
}
Expand Down
38 changes: 38 additions & 0 deletions v3/newrelic/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
package newrelic

import (
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -105,6 +108,11 @@ func TestCollectorRequest(t *testing.T) {
}),
},
Logger: logger.ShimLogger{IsDebugEnabled: true},
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}
resp := collectorRequest(cmd, cs)
if nil != resp.Err {
Expand All @@ -131,6 +139,11 @@ func TestCollectorBadRequest(t *testing.T) {
}),
},
Logger: logger.ShimLogger{IsDebugEnabled: true},
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}
u := ":" // bad url
resp := collectorRequestInternal(u, cmd, cs)
Expand All @@ -154,6 +167,11 @@ func TestCollectorTimeout(t *testing.T) {
Timeout: time.Nanosecond, // force a timeout
},
Logger: logger.ShimLogger{IsDebugEnabled: true},
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}
u := "https://example.com"
resp := collectorRequestInternal(u, cmd, cs)
Expand All @@ -174,6 +192,11 @@ func TestUrl(t *testing.T) {
License: "123abc",
Client: nil,
Logger: nil,
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}

out := rpmURL(cmd, cs)
Expand Down Expand Up @@ -235,6 +258,11 @@ func testConnectHelper(cm connectMock) (*internal.ConnectReply, rpmResponse) {
License: "12345",
Client: &http.Client{Transport: cm},
Logger: logger.ShimLogger{IsDebugEnabled: true},
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}

return connectAttempt(cm.config, cs)
Expand Down Expand Up @@ -403,6 +431,11 @@ func TestCollectorRequestRespectsMaxPayloadSize(t *testing.T) {
}),
},
Logger: logger.ShimLogger{IsDebugEnabled: true},
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}
resp := collectorRequest(cmd, cs)
if nil == resp.Err {
Expand Down Expand Up @@ -439,6 +472,11 @@ func TestConnectReplyMaxPayloadSize(t *testing.T) {
}),
},
Logger: logger.ShimLogger{IsDebugEnabled: true},
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
}
}

Expand Down
6 changes: 6 additions & 0 deletions v3/newrelic/internal_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package newrelic

import (
"compress/gzip"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -433,6 +434,11 @@ func newApp(c config) *app {
Timeout: collectorTimeout,
},
Logger: c.Logger,
GzipWriterPool: &sync.Pool{
New: func() interface{} {
return gzip.NewWriter(io.Discard)
},
},
},
}

Expand Down

0 comments on commit 02c2525

Please sign in to comment.