Skip to content

Commit

Permalink
Reuse buffers and comment out logs to reduce memory allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
yutopp committed Oct 17, 2018
1 parent 0f018b5 commit 1e9de67
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -17,7 +17,7 @@ test:
go test -cover -v -race -timeout 10s ./...

bench:
go test -bench . -benchmem -gcflags="-m -l" ./...
go test -bench . -benchmem -gcflags="-m -m -l" ./...

dep-init:
dep ensure
Expand Down
35 changes: 16 additions & 19 deletions chunk_header.go
Expand Up @@ -18,27 +18,27 @@ type chunkBasicHeader struct {
chunkStreamID int /* [0, 65599] */
}

func decodeChunkBasicHeader(r io.Reader, bh *chunkBasicHeader) error {
buf := make([]byte, 3)
_, err := io.ReadAtLeast(r, buf[:1], 1)
if err != nil {
func decodeChunkBasicHeader(r io.Reader, buf []byte, bh *chunkBasicHeader) error {
if buf == nil || len(buf) < 3 {
buf = make([]byte, 3)
}

if _, err := io.ReadAtLeast(r, buf[:1], 1); err != nil {
return err
}

fmtTy := (buf[0] & 0xC0) >> 6 // 0b11000000 >> 6
fmtTy := (buf[0] & 0xc0) >> 6 // 0b11000000 >> 6
csID := int(buf[0] & 0x3f) // 0b00111111

switch csID {
case 0:
_, err := io.ReadAtLeast(r, buf[1:2], 1)
if err != nil {
if _, err := io.ReadAtLeast(r, buf[1:2], 1); err != nil {
return err
}
csID = int(buf[1]) + 64

case 1:
_, err := io.ReadAtLeast(r, buf[1:], 2)
if err != nil {
if _, err := io.ReadAtLeast(r, buf[1:], 2); err != nil {
return err
}
csID = int(buf[2])*256 + int(buf[1]) + 64
Expand Down Expand Up @@ -86,14 +86,15 @@ type chunkMessageHeader struct {
messageStreamID uint32 // fmt = 0
}

func decodeChunkMessageHeader(r io.Reader, fmt byte, mh *chunkMessageHeader) error {
func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessageHeader) error {
if buf == nil || len(buf) < 11 {
buf = make([]byte, 11)
}
cache32bits := make([]byte, 4)

switch fmt {
case 0:
buf := make([]byte, 11)
_, err := io.ReadAtLeast(r, buf, len(buf))
if err != nil {
if _, err := io.ReadAtLeast(r, buf[:11], 11); err != nil {
return err
}

Expand All @@ -113,9 +114,7 @@ func decodeChunkMessageHeader(r io.Reader, fmt byte, mh *chunkMessageHeader) err
}

case 1:
buf := make([]byte, 7)
_, err := io.ReadAtLeast(r, buf, len(buf))
if err != nil {
if _, err := io.ReadAtLeast(r, buf[:7], 7); err != nil {
return err
}

Expand All @@ -134,9 +133,7 @@ func decodeChunkMessageHeader(r io.Reader, fmt byte, mh *chunkMessageHeader) err
}

case 2:
buf := make([]byte, 3)
_, err := io.ReadAtLeast(r, buf, len(buf))
if err != nil {
if _, err := io.ReadAtLeast(r, buf[:3], 3); err != nil {
return err
}

Expand Down
14 changes: 4 additions & 10 deletions chunk_header_test.go
Expand Up @@ -128,12 +128,9 @@ func TestChunkBasicHeader(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

bin := make([]byte, len(tc.binary))
copy(bin, tc.binary) // copy ownership

buf := bytes.NewBuffer(bin)
r := bytes.NewReader(tc.binary)
var mh chunkBasicHeader
err := decodeChunkBasicHeader(buf, &mh)
err := decodeChunkBasicHeader(r, nil, &mh)
assert.Nil(t, err)
assert.Equal(t, tc.value, &mh)
})
Expand Down Expand Up @@ -382,12 +379,9 @@ func TestChunkMessageHeader(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

bin := make([]byte, len(tc.binary))
copy(bin, tc.binary) // copy ownership

buf := bytes.NewBuffer(bin)
r := bytes.NewReader(tc.binary)
var mh chunkMessageHeader
err := decodeChunkMessageHeader(buf, tc.fmt, &mh)
err := decodeChunkMessageHeader(r, tc.fmt, nil, &mh)
assert.Nil(t, err)
assert.Equal(t, tc.value, &mh)
})
Expand Down
1 change: 0 additions & 1 deletion chunk_stream_reader.go
Expand Up @@ -11,7 +11,6 @@ import (
"bytes"
)

// Must call Close after reading.
type ChunkStreamReader struct {
basicHeader chunkBasicHeader
messageHeader chunkMessageHeader
Expand Down
14 changes: 7 additions & 7 deletions chunk_streamer.go
Expand Up @@ -193,16 +193,16 @@ func (cs *ChunkStreamer) Close() error {
// returns nil reader when chunk is fragmented.
func (cs *ChunkStreamer) readChunk() (*ChunkStreamReader, error) {
var bh chunkBasicHeader
if err := decodeChunkBasicHeader(cs.r, &bh); err != nil {
if err := decodeChunkBasicHeader(cs.r, cs.cacheBuffer, &bh); err != nil {
return nil, err
}
cs.logger.Debugf("(READ) BasicHeader = %+v", bh)
//cs.logger.Debugf("(READ) BasicHeader = %+v", bh)

var mh chunkMessageHeader
if err := decodeChunkMessageHeader(cs.r, bh.fmt, &mh); err != nil {
if err := decodeChunkMessageHeader(cs.r, bh.fmt, cs.cacheBuffer, &mh); err != nil {
return nil, err
}
cs.logger.Debugf("(READ) MessageHeader = %+v", mh)
//cs.logger.Debugf("(READ) MessageHeader = %+v", mh)

reader, err := cs.prepareChunkReader(bh.chunkStreamID)
if err != nil {
Expand Down Expand Up @@ -239,7 +239,7 @@ func (cs *ChunkStreamer) readChunk() (*ChunkStreamReader, error) {
panic("unsupported chunk") // TODO: fix
}

cs.logger.Debugf("(READ) MessageLength = %d, Current = %d", reader.messageLength, reader.buf.Len())
//cs.logger.Debugf("(READ) MessageLength = %d, Current = %d", reader.messageLength, reader.buf.Len())

expectLen := int(reader.messageLength) - reader.buf.Len()
if expectLen <= 0 {
Expand All @@ -249,7 +249,7 @@ func (cs *ChunkStreamer) readChunk() (*ChunkStreamReader, error) {
if uint32(expectLen) > cs.peerState.chunkSize {
expectLen = int(cs.peerState.chunkSize)
}
cs.logger.Debugf("(READ) Length = %d", expectLen)
//cs.logger.Debugf("(READ) Length = %d", expectLen)

lr := io.LimitReader(cs.r, int64(expectLen))
if _, err := io.CopyBuffer(&reader.buf, lr, cs.cacheBuffer); err != nil {
Expand All @@ -272,7 +272,7 @@ func (cs *ChunkStreamer) readChunk() (*ChunkStreamReader, error) {
func (cs *ChunkStreamer) writeChunk(writer *ChunkStreamWriter) (bool, error) {
cs.updateWriterHeader(writer)

cs.logger.Debugf("(WRITE) Headers: Basic = %+v / Message = %+v", writer.basicHeader, writer.messageHeader)
//cs.logger.Debugf("(WRITE) Headers: Basic = %+v / Message = %+v", writer.basicHeader, writer.messageHeader)
//cs.logger.Debugf("(WRITE) Buffer: %+v", writer.buf.Bytes())

expectLen := writer.buf.Len()
Expand Down

0 comments on commit 1e9de67

Please sign in to comment.