Skip to content

Commit

Permalink
Merge add4546 into de2b514
Browse files Browse the repository at this point in the history
  • Loading branch information
willhug committed Mar 3, 2017
2 parents de2b514 + add4546 commit 53b4c34
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 6 deletions.
26 changes: 26 additions & 0 deletions internal/iopool/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package iopool

import (
"io"
"sync"
)

type buffer struct {
b []byte
}

var _copyBufSize = 1024 * 32
var _pool = sync.Pool{
New: func() interface{} {
return &buffer{make([]byte, _copyBufSize)}
},
}

// Copy wraps the io library's CopyBuffer func with a preallocated buffer from a
// sync.Pool we maintain.
func Copy(dst io.Writer, src io.Reader) (int64, error) {
buf := _pool.Get().(*buffer)
written, err := io.CopyBuffer(dst, src, buf.b)
_pool.Put(buf)
return written, err
}
32 changes: 32 additions & 0 deletions internal/iopool/copy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package iopool

import (
"bytes"
"math/rand"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestBuffers(t *testing.T) {
var wg sync.WaitGroup
for g := 0; g < 10; g++ {
wg.Add(1)
go func() {
for i := 0; i < 100; i++ {
inputBytes := make([]byte, rand.Intn(5000)+20)
_, err := rand.Read(inputBytes)
assert.NoError(t, err, "Unexpected error from rand.Read")
reader := bytes.NewReader(inputBytes)

outputBytes := make([]byte, 0, len(inputBytes))
writer := bytes.NewBuffer(outputBytes)

Copy(writer, reader)
}
wg.Done()
}()
}
wg.Wait()
}
4 changes: 2 additions & 2 deletions transport/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ package http
import (
"bytes"
"context"
"io"
"net/http"
"time"

"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/errors"
"go.uber.org/yarpc/internal/iopool"
"go.uber.org/yarpc/internal/request"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -130,7 +130,7 @@ func handleOnewayRequest(
// we will lose access to the body unless we read all the bytes before
// returning from the request
var buff bytes.Buffer
if _, err := io.Copy(&buff, treq.Body); err != nil {
if _, err := iopool.Copy(&buff, treq.Body); err != nil {
return err
}
treq.Body = &buff
Expand Down
4 changes: 2 additions & 2 deletions transport/http/inbound_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package http_test

import (
"fmt"
"io"
"log"
nethttp "net/http"
"os"

"go.uber.org/yarpc"
"go.uber.org/yarpc/internal/iopool"
"go.uber.org/yarpc/transport/http"
)

Expand Down Expand Up @@ -58,7 +58,7 @@ func ExampleMux() {
}
defer res.Body.Close()

if _, err := io.Copy(os.Stdout, res.Body); err != nil {
if _, err := iopool.Copy(os.Stdout, res.Body); err != nil {
log.Fatal(err)
}
// Output: hello from /health
Expand Down
5 changes: 3 additions & 2 deletions transport/tchannel/channel_outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (
"go.uber.org/yarpc/internal/encoding"
"go.uber.org/yarpc/internal/errors"
"go.uber.org/yarpc/internal/introspection"
"go.uber.org/yarpc/internal/iopool"
"go.uber.org/yarpc/internal/sync"

"github.com/uber/tchannel-go"
"go.uber.org/yarpc/internal/sync"
)

var (
Expand Down Expand Up @@ -207,7 +208,7 @@ func writeBody(body io.Reader, call *tchannel.OutboundCall) error {
return err
}

if _, err := io.Copy(w, body); err != nil {
if _, err := iopool.Copy(w, body); err != nil {
return err
}

Expand Down

0 comments on commit 53b4c34

Please sign in to comment.