Skip to content

Commit

Permalink
object: fixed compression bug where we were not clearing the buffer
Browse files Browse the repository at this point in the history
this effectively defeated the purpose of compression and caused high
memory usage and other kinds of bad behavior.

refactored the code to prevent this issue by resetting the buffer
in the caller rather than in the callee.

fixed previous e2e test to catch the issue mentioned in #166,
verified it fails against master and passes with this change.
  • Loading branch information
jkowalski committed Jan 10, 2020
1 parent de71f0f commit 0b8c4d0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 20 deletions.
32 changes: 26 additions & 6 deletions repo/object/object_manager_test.go
Expand Up @@ -302,7 +302,7 @@ func TestEndToEndReadAndSeek(t *testing.T) {
ctx := context.Background()
_, om := setupTest(t)

for _, size := range []int{1, 199, 200, 201, 9999, 512434} {
for _, size := range []int{1, 199, 200, 201, 9999, 512434, 5012434} {
// Create some random data sample of the specified size.
randomData := make([]byte, size)
cryptorand.Read(randomData) //nolint:errcheck
Expand All @@ -328,20 +328,23 @@ func TestEndToEndReadAndSeek(t *testing.T) {

func TestEndToEndReadAndSeekWithCompression(t *testing.T) {
ctx := context.Background()
_, om := setupTest(t)

for compressorName := range compression.ByName {
for _, size := range []int{1, 199, 200, 201, 9999, 512434} {
// Create some random data sample of the specified size.
randomData := make([]byte, size)
totalBytesWritten := 0
data, om := setupTest(t)

for _, size := range []int{1, 199, 200, 201, 9999, 512434, 5012434} {
// Create some compressible data sample of the specified size.
randomData := makeCompressibleData(size)

writer := om.NewWriter(ctx, WriterOptions{Compressor: compressorName})
if _, err := writer.Write(randomData); err != nil {
t.Errorf("write error: %v", err)
}

totalBytesWritten += size

objectID, err := writer.Result()
t.Logf("oid: %v", objectID)

writer.Close()

Expand All @@ -352,8 +355,25 @@ func TestEndToEndReadAndSeekWithCompression(t *testing.T) {

verify(ctx, t, om, objectID, randomData, fmt.Sprintf("%v %v", objectID, size))
}

compressedBytes := 0
for _, d := range data {
compressedBytes += len(d)
}

// data is highly compressible, should easily compress to 1% of original size or less
ratio := float64(compressedBytes) / float64(totalBytesWritten)
if ratio > 0.01 {
t.Errorf("compression not effective for %v wrote %v, compressed %v, ratio %v", compressorName, totalBytesWritten, compressedBytes, ratio)
}
}
}

// makeCompressibleData returns exactly size bytes of highly repetitive text
// built from the phrase "quick brown fox".
//
// The result starts with the first size%len(phrase) bytes of the phrase,
// followed by size/len(phrase) complete copies of it, so the total length
// always equals size.
func makeCompressibleData(size int) []byte {
	phrase := []byte("quick brown fox")

	// Partial leading chunk that pads the output up to the requested size.
	prefixLen := size % len(phrase)
	// Number of whole phrase repetitions following the partial chunk.
	repeats := size / len(phrase)

	var out []byte
	out = append(out, phrase[:prefixLen]...)
	out = append(out, bytes.Repeat(phrase, repeats)...)

	return out
}

func verify(ctx context.Context, t *testing.T, om *Manager, objectID ID, expectedData []byte, testCaseID string) {
t.Helper()

Expand Down
22 changes: 8 additions & 14 deletions repo/object/object_writer.go
Expand Up @@ -98,11 +98,13 @@ func (w *objectWriter) flushBuffer() error {
w.indirectIndex[chunkID].Length = int64(length)
w.currentPosition += int64(length)

contentBytes, isCompressed, err := w.maybeCompressedContentBytes()
contentBytes, isCompressed, err := maybeCompressedContentBytes(w.compressor, w.buffer.Bytes())
if err != nil {
return errors.Wrap(err, "unable to prepare content bytes")
}

w.buffer.Reset()

contentID, err := w.repo.contentMgr.WriteContent(w.ctx, contentBytes, w.prefix)
w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, contentID, length)

Expand All @@ -121,27 +123,19 @@ func (w *objectWriter) flushBuffer() error {
return nil
}

func (w *objectWriter) maybeCompressedContentBytes() (data []byte, isCompressed bool, err error) {
if w.compressor != nil {
compressedBytes, err := w.compressor.Compress(w.buffer.Bytes())
func maybeCompressedContentBytes(comp compression.Compressor, b []byte) (data []byte, isCompressed bool, err error) {
if comp != nil {
compressedBytes, err := comp.Compress(b)
if err != nil {
return nil, false, errors.Wrap(err, "compression error")
}

if len(compressedBytes) < w.buffer.Len() {
if len(compressedBytes) < len(b) {
return compressedBytes, true, nil
}
}

var b2 bytes.Buffer

if _, err := w.buffer.WriteTo(&b2); err != nil {
return nil, false, err
}

w.buffer.Reset()

return b2.Bytes(), false, nil
return append([]byte{}, b...), false, nil
}

func (w *objectWriter) Result() (ID, error) {
Expand Down

0 comments on commit 0b8c4d0

Please sign in to comment.