Skip to content

Commit

Permalink
Add Sync Pool For Snappy (#6085)
Browse files Browse the repository at this point in the history
* add pool for snappy

* Update beacon-chain/p2p/encoder/ssz.go

* comment added

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
  • Loading branch information
nisdas and terencechain committed Jun 2, 2020
1 parent fd19fd1 commit 4694998
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 3 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/p2p/encoder/BUILD.bazel
Expand Up @@ -25,12 +25,14 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"snappy_test.go",
"ssz_test.go",
"varint_test.go",
],
embed = [":go_default_library"],
deps = [
"//proto/testing:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_golang_snappy//:go_default_library",
],
)
41 changes: 41 additions & 0 deletions beacon-chain/p2p/encoder/snappy_test.go
@@ -0,0 +1,41 @@
package encoder

import (
"bytes"
"reflect"
"testing"

"github.com/golang/snappy"
)

func TestSszNetworkEncoder_BufferedReader(t *testing.T) {
r := make([]byte, 10)
bufR := snappy.NewReader(bytes.NewBuffer(r))
ptr := reflect.ValueOf(bufR).Pointer()
bufReaderPool.Put(bufR)

r2 := make([]byte, 10)
rdr := newBufferedReader(bytes.NewBuffer(r2))

nPtr := reflect.ValueOf(rdr).Pointer()

if nPtr != ptr {
t.Errorf("wanted pointer value of %d but got %d", ptr, nPtr)
}
}

func TestSszNetworkEncoder_BufferedWriter(t *testing.T) {
r := make([]byte, 10)
bufR := snappy.NewBufferedWriter(bytes.NewBuffer(r))
ptr := reflect.ValueOf(bufR).Pointer()
bufWriterPool.Put(bufR)

r2 := make([]byte, 10)
rdr := newBufferedWriter(bytes.NewBuffer(r2))

nPtr := reflect.ValueOf(rdr).Pointer()

if nPtr != ptr {
t.Errorf("wanted pointer value of %d but got %d", ptr, nPtr)
}
}
49 changes: 46 additions & 3 deletions beacon-chain/p2p/encoder/ssz.go
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand All @@ -20,6 +21,14 @@ var MaxChunkSize = params.BeaconNetworkConfig().MaxChunkSize // 1Mib
// MaxGossipSize allowed for gossip messages.
var MaxGossipSize = params.BeaconNetworkConfig().GossipMaxSize // 1 Mib

// This pool defines the sync pool for our buffered snappy writers, so that they
// can be constantly reused.
var bufWriterPool = new(sync.Pool)

// This pool defines the sync pool for our buffered snappy readers, so that they
// can be constantly reused.
var bufReaderPool = new(sync.Pool)

// SszNetworkEncoder supports p2p networking encoding using SimpleSerialize
// with snappy compression (if enabled).
type SszNetworkEncoder struct {
Expand Down Expand Up @@ -116,7 +125,9 @@ func (e SszNetworkEncoder) doDecode(b []byte, to interface{}) error {
func (e SszNetworkEncoder) Decode(b []byte, to interface{}) error {
if e.UseSnappyCompression {
newBuffer := bytes.NewBuffer(b)
r := snappy.NewReader(newBuffer)
r := newBufferedReader(newBuffer)
defer bufReaderPool.Put(r)

newObj := make([]byte, len(b))
numOfBytes, err := r.Read(newObj)
if err != nil {
Expand Down Expand Up @@ -158,7 +169,8 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to interface{}, maxS
return err
}
if e.UseSnappyCompression {
r = snappy.NewReader(r)
r = newBufferedReader(r)
defer bufReaderPool.Put(r)
}
if msgLen > maxSize {
return fmt.Errorf("size of decoded message is %d which is larger than the provided max limit of %d", msgLen, maxSize)
Expand Down Expand Up @@ -190,10 +202,41 @@ func (e SszNetworkEncoder) MaxLength(length int) int {

// Writes a bytes value through a snappy buffered writer.
func writeSnappyBuffer(w io.Writer, b []byte) (int, error) {
bufWriter := snappy.NewBufferedWriter(w)
bufWriter := newBufferedWriter(w)
defer bufWriterPool.Put(bufWriter)
num, err := bufWriter.Write(b)
if err != nil {
return 0, err
}
return num, bufWriter.Close()
}

// Instantiates a new instance of the snappy buffered reader
// using our sync pool.
func newBufferedReader(r io.Reader) *snappy.Reader {
rawReader := bufReaderPool.Get()
if rawReader == nil {
return snappy.NewReader(r)
}
bufR, ok := rawReader.(*snappy.Reader)
if !ok {
return snappy.NewReader(r)
}
bufR.Reset(r)
return bufR
}

// Instantiates a new instance of the snappy buffered writer
// using our sync pool.
func newBufferedWriter(w io.Writer) *snappy.Writer {
rawBufWriter := bufWriterPool.Get()
if rawBufWriter == nil {
return snappy.NewBufferedWriter(w)
}
bufW, ok := rawBufWriter.(*snappy.Writer)
if !ok {
return snappy.NewBufferedWriter(w)
}
bufW.Reset(w)
return bufW
}

0 comments on commit 4694998

Please sign in to comment.