Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sync Pool For Snappy #6085

Merged
merged 4 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions beacon-chain/p2p/encoder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"snappy_test.go",
"ssz_test.go",
"varint_test.go",
],
embed = [":go_default_library"],
deps = [
"//proto/testing:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_golang_snappy//:go_default_library",
],
)
41 changes: 41 additions & 0 deletions beacon-chain/p2p/encoder/snappy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package encoder

import (
"bytes"
"reflect"
"testing"

"github.com/golang/snappy"
)

func TestSszNetworkEncoder_BufferedReader(t *testing.T) {
r := make([]byte, 10)
bufR := snappy.NewReader(bytes.NewBuffer(r))
ptr := reflect.ValueOf(bufR).Pointer()
bufReaderPool.Put(bufR)

r2 := make([]byte, 10)
rdr := newBufferedReader(bytes.NewBuffer(r2))

nPtr := reflect.ValueOf(rdr).Pointer()

if nPtr != ptr {
t.Errorf("wanted pointer value of %d but got %d", ptr, nPtr)
}
}

func TestSszNetworkEncoder_BufferedWriter(t *testing.T) {
r := make([]byte, 10)
bufR := snappy.NewBufferedWriter(bytes.NewBuffer(r))
ptr := reflect.ValueOf(bufR).Pointer()
bufWriterPool.Put(bufR)

r2 := make([]byte, 10)
rdr := newBufferedWriter(bytes.NewBuffer(r2))

nPtr := reflect.ValueOf(rdr).Pointer()

if nPtr != ptr {
t.Errorf("wanted pointer value of %d but got %d", ptr, nPtr)
}
}
49 changes: 46 additions & 3 deletions beacon-chain/p2p/encoder/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand All @@ -20,6 +21,14 @@ var MaxChunkSize = params.BeaconNetworkConfig().MaxChunkSize // 1Mib
// MaxGossipSize allowed for gossip messages.
var MaxGossipSize = params.BeaconNetworkConfig().GossipMaxSize // 1 Mib

// This pool defines the sync pool for our buffered snappy writers, so that they
// can be constantly reused.
var bufWriterPool = new(sync.Pool)

terencechain marked this conversation as resolved.
Show resolved Hide resolved
// This pool defines the sync pool for our buffered snappy readers, so that they
// can be constantly reused.
var bufReaderPool = new(sync.Pool)

// SszNetworkEncoder supports p2p networking encoding using SimpleSerialize
// with snappy compression (if enabled).
type SszNetworkEncoder struct {
Expand Down Expand Up @@ -116,7 +125,9 @@ func (e SszNetworkEncoder) doDecode(b []byte, to interface{}) error {
func (e SszNetworkEncoder) Decode(b []byte, to interface{}) error {
if e.UseSnappyCompression {
newBuffer := bytes.NewBuffer(b)
r := snappy.NewReader(newBuffer)
r := newBufferedReader(newBuffer)
defer bufReaderPool.Put(r)

newObj := make([]byte, len(b))
numOfBytes, err := r.Read(newObj)
if err != nil {
Expand Down Expand Up @@ -158,7 +169,8 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to interface{}, maxS
return err
}
if e.UseSnappyCompression {
r = snappy.NewReader(r)
r = newBufferedReader(r)
defer bufReaderPool.Put(r)
}
if msgLen > maxSize {
return fmt.Errorf("size of decoded message is %d which is larger than the provided max limit of %d", msgLen, maxSize)
Expand Down Expand Up @@ -190,10 +202,41 @@ func (e SszNetworkEncoder) MaxLength(length int) int {

// Writes a bytes value through a snappy buffered writer.
func writeSnappyBuffer(w io.Writer, b []byte) (int, error) {
bufWriter := snappy.NewBufferedWriter(w)
bufWriter := newBufferedWriter(w)
defer bufWriterPool.Put(bufWriter)
num, err := bufWriter.Write(b)
if err != nil {
return 0, err
}
return num, bufWriter.Close()
}

// Instantiates a new instance of the snappy buffered reader
// using our sync pool.
func newBufferedReader(r io.Reader) *snappy.Reader {
rawReader := bufReaderPool.Get()
if rawReader == nil {
return snappy.NewReader(r)
}
bufR, ok := rawReader.(*snappy.Reader)
if !ok {
return snappy.NewReader(r)
}
bufR.Reset(r)
return bufR
}

// Instantiates a new instance of the snappy buffered writer
// using our sync pool.
func newBufferedWriter(w io.Writer) *snappy.Writer {
rawBufWriter := bufWriterPool.Get()
if rawBufWriter == nil {
return snappy.NewBufferedWriter(w)
}
bufW, ok := rawBufWriter.(*snappy.Writer)
if !ok {
return snappy.NewBufferedWriter(w)
}
bufW.Reset(w)
return bufW
}