Skip to content

Commit

Permalink
[opt]: compress write data by re-use snappy streaming (#1027)
Browse files Browse the repository at this point in the history
  • Loading branch information
stone1100 committed May 8, 2024
1 parent bad9228 commit 870f210
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 223 deletions.
7 changes: 7 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ coverage:
# (default is 70..100)
round: down # up, down, or nearest
precision: 2 # Number of decimal places, between 0 and 5
status:
project:
default:
informational: true
patch:
default:
informational: true

# Ignoring Paths
# --------------
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/lind.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ jobs:
run: make gomock
- name: Test
run: make test-without-lint
- name: Upload
run: bash <(curl -s https://codecov.io/bash) -t ${{ secrets.CODECOV_TOKEN }}
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v4.2.0
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

darwin-test:
name: Unit Test(MacOS)
Expand Down
104 changes: 104 additions & 0 deletions pkg/compress/snappy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to LinDB under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. LinDB licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package compress

import (
"bytes"
"io"

"github.com/klauspost/compress/snappy"
)

// Writer is a compressing sink: raw bytes are pushed in through Write, the
// stream is finished with Close, and the compressed result is fetched with
// Bytes.
type Writer interface {
	// WriteCloser supplies Write (feed uncompressed data) and Close
	// (finish the compressed stream).
	io.WriteCloser
	// Bytes returns compressed binary data.
	Bytes() []byte
}

// Reader represents uncompress reader.
//
// It turns one compressed payload at a time back into its original bytes.
type Reader interface {
	// Uncompress de-compresses compressed binary data.
	//
	// It returns the uncompressed bytes, or a non-nil error if the payload
	// cannot be decoded.
	// NOTE(review): ownership/lifetime of the returned slice is up to the
	// implementation — check the concrete reader before retaining it.
	Uncompress(compressData []byte) ([]byte, error)
}

// snappyWriter implements Writer on top of a snappy stream writer whose
// output is collected in an in-memory buffer.
type snappyWriter struct {
	// writer is the snappy stream writer; it is wired to write into buffer.
	writer *snappy.Writer
	// buffer accumulates the compressed output until Bytes copies it out
	// and resets it.
	buffer bytes.Buffer
}

// NewSnappyWriter creates a snappy compress writer.
//
// The returned Writer compresses everything passed to Write into an internal
// buffer; the compressed bytes are retrieved with Bytes.
func NewSnappyWriter() Writer {
	w := &snappyWriter{}
	// The snappy writer targets the struct's own buffer, so it can only be
	// created once the struct (and therefore the buffer) exists.
	w.writer = snappy.NewBufferedWriter(&w.buffer)
	return w
}

// Write feeds row into the snappy stream writer and reports the byte count
// and error that the underlying writer returns.
func (w *snappyWriter) Write(row []byte) (n int, err error) {
	return w.writer.Write(row)
}

// Bytes returns a copy of the compressed data currently held in the internal
// buffer, then resets both the buffer and the snappy writer so the instance
// can be reused for the next payload.
//
// Bytes does not flush the snappy writer itself.
// NOTE(review): presumably callers must Close (or otherwise flush) the writer
// before calling Bytes, as the package test does — confirm against callers.
func (w *snappyWriter) Bytes() (r []byte) {
	// Copy out of the shared buffer, because it is cleared and reused below.
	// TODO: can this copy be avoided?
	r = make([]byte, w.buffer.Len())
	copy(r, w.buffer.Bytes())

	// Start from a clean compression context for the next payload.
	w.buffer.Reset()
	w.writer.Reset(&w.buffer)
	return r
}

// Close closes the underlying snappy stream writer and returns its error.
//
// The internal buffer is left untouched, so the compressed data written so
// far is still available through Bytes afterwards.
// NOTE(review): per the snappy package docs Close is expected to flush
// pending data into the buffer — confirm for the vendored version.
func (w *snappyWriter) Close() error {
	return w.writer.Close()
}

// snappyReader implements Reader with a reusable snappy stream reader and
// two reusable in-memory buffers.
type snappyReader struct {
	// reader is the snappy stream reader; it is wired to read from compressed.
	reader *snappy.Reader
	// compressed stages the incoming compressed payload for reader.
	compressed bytes.Buffer
	// decompressed receives the uncompressed output of reader.
	decompressed bytes.Buffer
}

// NewSnappyReader builds a Reader that decompresses snappy stream data.
//
// The snappy reader is bound to the instance's own staging buffer, which
// Uncompress fills with each incoming payload.
func NewSnappyReader() Reader {
	sr := new(snappyReader)
	sr.reader = snappy.NewReader(&sr.compressed)
	return sr
}

// Uncompress decodes one snappy-compressed payload and returns the original
// bytes, or nil and the error if staging or decoding fails.
//
// The returned slice is backed by the reader's internal buffer, which is
// reset (but keeps its storage) when this call returns. Its contents are
// therefore only valid until the next Uncompress call on the same reader;
// copy the data if it has to outlive that.
func (r *snappyReader) Uncompress(compressData []byte) ([]byte, error) {
	// Whatever the outcome, leave both buffers and the snappy reader clean
	// for the next payload.
	defer func() {
		r.compressed.Reset()
		r.decompressed.Reset()
		r.reader.Reset(&r.compressed)
	}()

	// Stage the payload where the snappy reader expects to find it.
	if _, err := r.compressed.Write(compressData); err != nil {
		return nil, err
	}
	// Drain the snappy reader into the output buffer.
	if _, err := io.Copy(&r.decompressed, r.reader); err != nil {
		return nil, err
	}
	return r.decompressed.Bytes(), nil
}
39 changes: 39 additions & 0 deletions pkg/compress/snappy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to LinDB under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. LinDB licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package compress

import (
"testing"

"github.com/stretchr/testify/assert"
)

// Test_Snappy checks the round trip: data compressed by the snappy writer
// must be restored unchanged by the snappy reader.
func Test_Snappy(t *testing.T) {
	payload := []byte("hello snappy")

	// Compress: write, close the stream, then take the compressed bytes.
	writer := NewSnappyWriter()
	_, writeErr := writer.Write(payload)
	assert.NoError(t, writeErr)
	closeErr := writer.Close()
	assert.NoError(t, closeErr)
	compressed := writer.Bytes()

	// Decompress and compare with the original input.
	reader := NewSnappyReader()
	restored, readErr := reader.Uncompress(compressed)
	assert.NoError(t, readErr)
	assert.Equal(t, payload, restored)
}
30 changes: 12 additions & 18 deletions replica/channel_family.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type familyChannel struct {
cancel context.CancelFunc
liveNodes map[models.NodeID]models.StatefulNode
statistics *metrics.BrokerFamilyWriteStatistics
ch chan *compressedChunk
ch chan []byte
leaderChangedSignal chan struct{}
lastFlushTime *atomic.Int64
stoppingSignal chan struct{}
Expand Down Expand Up @@ -104,7 +104,7 @@ func newFamilyChannel(
shardState: shardState,
liveNodes: liveNodes,
newWriteStreamFn: rpc.NewWriteStream,
ch: make(chan *compressedChunk, 2),
ch: make(chan []byte, 2),
notifyLeaderChange: atomic.NewBool(false),
leaderChangedSignal: make(chan struct{}, 1),
stoppedSignal: make(chan struct{}, 1),
Expand Down Expand Up @@ -203,8 +203,8 @@ func (fc *familyChannel) writeTask(_ context.Context) {
ticker := time.NewTicker(fc.checkFlushInterval)
defer ticker.Stop()

retryBuffers := make([]*compressedChunk, 0)
retry := func(compressed *compressedChunk) {
retryBuffers := make([][]byte, 0)
retry := func(compressed []byte) {
if len(retryBuffers) > fc.maxRetryBuf {
fc.logger.Error("too many retry messages, drop current message")
fc.statistics.RetryDrop.Incr()
Expand All @@ -214,12 +214,8 @@ func (fc *familyChannel) writeTask(_ context.Context) {
}
}
var stream rpc.WriteStream
send := func(compressed *compressedChunk) bool {
if compressed == nil {
return true
}
if len(*compressed) == 0 {
compressed.Release()
send := func(compressed []byte) bool {
if len(compressed) == 0 {
return true
}
if stream == nil {
Expand All @@ -237,7 +233,7 @@ func (fc *familyChannel) writeTask(_ context.Context) {
fc.statistics.CreateStream.Incr()
stream = s
}
if err := stream.Send(*compressed); err != nil {
if err := stream.Send(compressed); err != nil {
fc.statistics.SendFailure.Incr()
fc.logger.Error(
"failed writing compressed chunk to storage",
Expand All @@ -260,9 +256,8 @@ func (fc *familyChannel) writeTask(_ context.Context) {
return false
}
fc.statistics.SendSuccess.Incr()
fc.statistics.SendSize.Add(float64(len(*compressed)))
fc.statistics.SendSize.Add(float64(len(compressed)))
fc.statistics.PendingSend.Decr()
compressed.Release()
return true
}

Expand All @@ -282,7 +277,7 @@ func (fc *familyChannel) writeTask(_ context.Context) {
defer func() {
fc.stoppedSignal <- struct{}{}
}()
sendLastMsg := func(compressed *compressedChunk) {
sendLastMsg := func(compressed []byte) {
if !send(compressed) {
fc.logger.Error("send message failure before close channel, message lost")
}
Expand Down Expand Up @@ -324,7 +319,7 @@ func (fc *familyChannel) writeTask(_ context.Context) {
// if send ok, retry pending message
if len(retryBuffers) > 0 {
messages := retryBuffers
retryBuffers = make([]*compressedChunk, 0)
retryBuffers = make([][]byte, 0)
for _, msg := range messages {
if !send(msg) {
retry(msg)
Expand All @@ -335,7 +330,6 @@ func (fc *familyChannel) writeTask(_ context.Context) {
stream = nil
}
case <-ticker.C:
// check
fc.checkFlush()
case <-fc.ctx.Done():
sendBeforeStop()
Expand All @@ -345,7 +339,7 @@ func (fc *familyChannel) writeTask(_ context.Context) {
}

// sendPendingMessage sends pending message before close this channel.
func (fc *familyChannel) sendPendingMessage(sendLastMsg func(compressed *compressedChunk)) {
func (fc *familyChannel) sendPendingMessage(sendLastMsg func(compressed []byte)) {
// try to write pending data
for compressed := range fc.ch {
sendLastMsg(compressed)
Expand Down Expand Up @@ -388,7 +382,7 @@ func (fc *familyChannel) flushChunk() {
fc.logger.Error("compress chunk err", logger.Error(err))
return
}
if compressed == nil || len(*compressed) == 0 {
if len(compressed) == 0 {
return
}
select {
Expand Down

0 comments on commit 870f210

Please sign in to comment.