From 469499873b90e70ef3d506d19efa704f62aff786 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Wed, 3 Jun 2020 03:02:21 +0800 Subject: [PATCH] Add Sync Pool For Snappy (#6085) * add pool for snappy * Update beacon-chain/p2p/encoder/ssz.go * comment added Co-authored-by: terence tsao --- beacon-chain/p2p/encoder/BUILD.bazel | 2 + beacon-chain/p2p/encoder/snappy_test.go | 41 +++++++++++++++++++++ beacon-chain/p2p/encoder/ssz.go | 49 +++++++++++++++++++++++-- 3 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 beacon-chain/p2p/encoder/snappy_test.go diff --git a/beacon-chain/p2p/encoder/BUILD.bazel b/beacon-chain/p2p/encoder/BUILD.bazel index 18aecdf19e0..a353af76a21 100644 --- a/beacon-chain/p2p/encoder/BUILD.bazel +++ b/beacon-chain/p2p/encoder/BUILD.bazel @@ -25,6 +25,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "snappy_test.go", "ssz_test.go", "varint_test.go", ], @@ -32,5 +33,6 @@ go_test( deps = [ "//proto/testing:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_golang_snappy//:go_default_library", ], ) diff --git a/beacon-chain/p2p/encoder/snappy_test.go b/beacon-chain/p2p/encoder/snappy_test.go new file mode 100644 index 00000000000..2bace68ba4b --- /dev/null +++ b/beacon-chain/p2p/encoder/snappy_test.go @@ -0,0 +1,41 @@ +package encoder + +import ( + "bytes" + "reflect" + "testing" + + "github.com/golang/snappy" +) + +func TestSszNetworkEncoder_BufferedReader(t *testing.T) { + r := make([]byte, 10) + bufR := snappy.NewReader(bytes.NewBuffer(r)) + ptr := reflect.ValueOf(bufR).Pointer() + bufReaderPool.Put(bufR) + + r2 := make([]byte, 10) + rdr := newBufferedReader(bytes.NewBuffer(r2)) + + nPtr := reflect.ValueOf(rdr).Pointer() + + if nPtr != ptr { + t.Errorf("wanted pointer value of %d but got %d", ptr, nPtr) + } +} + +func TestSszNetworkEncoder_BufferedWriter(t *testing.T) { + r := make([]byte, 10) + bufR := snappy.NewBufferedWriter(bytes.NewBuffer(r)) + ptr := reflect.ValueOf(bufR).Pointer() + bufWriterPool.Put(bufR) + + r2 := make([]byte, 10) + rdr := newBufferedWriter(bytes.NewBuffer(r2)) + + nPtr := reflect.ValueOf(rdr).Pointer() + + if nPtr != ptr { + t.Errorf("wanted pointer value of %d but got %d", ptr, nPtr) + } +} diff --git a/beacon-chain/p2p/encoder/ssz.go b/beacon-chain/p2p/encoder/ssz.go index 81ef0eb0744..0ef9175f3f1 100644 --- a/beacon-chain/p2p/encoder/ssz.go +++ b/beacon-chain/p2p/encoder/ssz.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "sync" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -20,6 +21,14 @@ var MaxChunkSize = params.BeaconNetworkConfig().MaxChunkSize // 1Mib // MaxGossipSize allowed for gossip messages. var MaxGossipSize = params.BeaconNetworkConfig().GossipMaxSize // 1 Mib +// This pool defines the sync pool for our buffered snappy writers, so that they +// can be constantly reused. +var bufWriterPool = new(sync.Pool) + +// This pool defines the sync pool for our buffered snappy readers, so that they +// can be constantly reused. +var bufReaderPool = new(sync.Pool) + // SszNetworkEncoder supports p2p networking encoding using SimpleSerialize // with snappy compression (if enabled). type SszNetworkEncoder struct { @@ -116,7 +125,9 @@ func (e SszNetworkEncoder) doDecode(b []byte, to interface{}) error { func (e SszNetworkEncoder) Decode(b []byte, to interface{}) error { if e.UseSnappyCompression { newBuffer := bytes.NewBuffer(b) - r := snappy.NewReader(newBuffer) + r := newBufferedReader(newBuffer) + defer bufReaderPool.Put(r) + newObj := make([]byte, len(b)) numOfBytes, err := r.Read(newObj) if err != nil { @@ -158,7 +169,8 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to interface{}, maxS return err } if e.UseSnappyCompression { - r = snappy.NewReader(r) + r = newBufferedReader(r) + defer bufReaderPool.Put(r) } if msgLen > maxSize { return fmt.Errorf("size of decoded message is %d which is larger than the provided max limit of %d", msgLen, maxSize) @@ -190,10 +202,41 @@ func (e SszNetworkEncoder) MaxLength(length int) int { // Writes a bytes value through a snappy buffered writer. func writeSnappyBuffer(w io.Writer, b []byte) (int, error) { - bufWriter := snappy.NewBufferedWriter(w) + bufWriter := newBufferedWriter(w) + defer bufWriterPool.Put(bufWriter) num, err := bufWriter.Write(b) if err != nil { return 0, err } return num, bufWriter.Close() } + +// Instantiates a new instance of the snappy buffered reader +// using our sync pool. +func newBufferedReader(r io.Reader) *snappy.Reader { + rawReader := bufReaderPool.Get() + if rawReader == nil { + return snappy.NewReader(r) + } + bufR, ok := rawReader.(*snappy.Reader) + if !ok { + return snappy.NewReader(r) + } + bufR.Reset(r) + return bufR +} + +// Instantiates a new instance of the snappy buffered writer +// using our sync pool. +func newBufferedWriter(w io.Writer) *snappy.Writer { + rawBufWriter := bufWriterPool.Get() + if rawBufWriter == nil { + return snappy.NewBufferedWriter(w) + } + bufW, ok := rawBufWriter.(*snappy.Writer) + if !ok { + return snappy.NewBufferedWriter(w) + } + bufW.Reset(w) + return bufW +}