Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sync Pool For Snappy #6085

Merged
merged 4 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions beacon-chain/p2p/encoder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"snappy_test.go",
"ssz_test.go",
"varint_test.go",
],
embed = [":go_default_library"],
deps = [
"//proto/testing:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_golang_snappy//:go_default_library",
],
)
41 changes: 41 additions & 0 deletions beacon-chain/p2p/encoder/snappy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package encoder

import (
"bytes"
"reflect"
"testing"

"github.com/golang/snappy"
)

func TestSszNetworkEncoder_BufferedReader(t *testing.T) {
r := make([]byte, 10)
bufR := snappy.NewReader(bytes.NewBuffer(r))
ptr := reflect.ValueOf(bufR).Pointer()
bufReaderPool.Put(bufR)

r2 := make([]byte, 10)
rdr := newBufferedReader(bytes.NewBuffer(r2))

nPtr := reflect.ValueOf(rdr).Pointer()

if nPtr != ptr {
t.Errorf("wanted pointer value of %d but got %d", ptr, nPtr)
}
}

func TestSszNetworkEncoder_BufferedWriter(t *testing.T) {
r := make([]byte, 10)
bufR := snappy.NewBufferedWriter(bytes.NewBuffer(r))
ptr := reflect.ValueOf(bufR).Pointer()
bufWriterPool.Put(bufR)

r2 := make([]byte, 10)
rdr := newBufferedWriter(bytes.NewBuffer(r2))

nPtr := reflect.ValueOf(rdr).Pointer()

if nPtr != ptr {
t.Errorf("wanted pointer value of %d but got %d", ptr, nPtr)
}
}
41 changes: 38 additions & 3 deletions beacon-chain/p2p/encoder/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand All @@ -20,6 +21,10 @@ var MaxChunkSize = params.BeaconNetworkConfig().MaxChunkSize // 1Mib
// MaxGossipSize allowed for gossip messages.
var MaxGossipSize = params.BeaconNetworkConfig().GossipMaxSize // 1 Mib

var bufWriterPool = new(sync.Pool)

terencechain marked this conversation as resolved.
Show resolved Hide resolved
var bufReaderPool = new(sync.Pool)

// SszNetworkEncoder supports p2p networking encoding using SimpleSerialize
// with snappy compression (if enabled).
type SszNetworkEncoder struct {
Expand Down Expand Up @@ -116,7 +121,9 @@ func (e SszNetworkEncoder) doDecode(b []byte, to interface{}) error {
func (e SszNetworkEncoder) Decode(b []byte, to interface{}) error {
if e.UseSnappyCompression {
newBuffer := bytes.NewBuffer(b)
r := snappy.NewReader(newBuffer)
r := newBufferedReader(newBuffer)
defer bufReaderPool.Put(r)

newObj := make([]byte, len(b))
numOfBytes, err := r.Read(newObj)
if err != nil {
Expand Down Expand Up @@ -158,7 +165,8 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to interface{}, maxS
return err
}
if e.UseSnappyCompression {
r = snappy.NewReader(r)
r = newBufferedReader(r)
defer bufReaderPool.Put(r)
}
if msgLen > maxSize {
return fmt.Errorf("size of decoded message is %d which is larger than the provided max limit of %d", msgLen, maxSize)
Expand Down Expand Up @@ -190,10 +198,37 @@ func (e SszNetworkEncoder) MaxLength(length int) int {

// Writes a bytes value through a snappy buffered writer.
func writeSnappyBuffer(w io.Writer, b []byte) (int, error) {
bufWriter := snappy.NewBufferedWriter(w)
bufWriter := newBufferedWriter(w)
defer bufWriterPool.Put(bufWriter)
num, err := bufWriter.Write(b)
if err != nil {
return 0, err
}
return num, bufWriter.Close()
}

func newBufferedReader(r io.Reader) *snappy.Reader {
rawReader := bufReaderPool.Get()
if rawReader == nil {
return snappy.NewReader(r)
}
bufR, ok := rawReader.(*snappy.Reader)
if !ok {
return snappy.NewReader(r)
}
bufR.Reset(r)
return bufR
}

func newBufferedWriter(w io.Writer) *snappy.Writer {
rawBufWriter := bufWriterPool.Get()
if rawBufWriter == nil {
return snappy.NewBufferedWriter(w)
}
bufW, ok := rawBufWriter.(*snappy.Writer)
if !ok {
return snappy.NewBufferedWriter(w)
}
bufW.Reset(w)
return bufW
}