Skip to content

Commit

Permalink
Merge 26eb79a into 4d64383
Browse files Browse the repository at this point in the history
  • Loading branch information
jcorbin authored Jun 18, 2018
2 parents 4d64383 + 26eb79a commit f79e950
Showing 1 changed file with 77 additions and 8 deletions.
85 changes: 77 additions & 8 deletions forward/request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
package forward

import (
"bytes"
"encoding/json"
"errors"
"io"
"io/ioutil"
"sync"
"time"

"golang.org/x/net/context"
Expand All @@ -32,7 +36,6 @@ import (
"github.com/uber/ringpop-go/logging"
"github.com/uber/ringpop-go/shared"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/raw"
"github.com/uber/tchannel-go/thrift"
)

Expand Down Expand Up @@ -156,6 +159,26 @@ func (s *requestSender) Send() (res []byte, err error) {
}
}

type _bufPool struct {
sync.Pool
}

func (bp *_bufPool) Get() *bytes.Buffer {
const allocSize = 4 * 1024 * 1024 // 4 MiB
val := bp.Pool.Get()
if buf, ok := val.(*bytes.Buffer); ok {
return buf
}
return bytes.NewBuffer(make([]byte, 0, allocSize))
}

func (bp *_bufPool) Put(buf *bytes.Buffer) {
buf.Reset()
bp.Pool.Put(buf)
}

var bufPool = _bufPool{}

// calls remote service and writes response to s.response
func (s *requestSender) MakeCall(ctx context.Context, res *[]byte, fwdError *error, appError *error) <-chan bool {
done := make(chan bool, 1)
Expand All @@ -173,26 +196,24 @@ func (s *requestSender) MakeCall(ctx context.Context, res *[]byte, fwdError *err
return
}

var arg3 []byte
headers := s.headers
if s.format == tchannel.Thrift {
if headers == nil {
headers = []byte{0, 0}
}
_, arg3, _, err = raw.WriteArgs(call, headers, s.request)
err = forwardArgs(call, headers, s.request, res)
} else {
var resp *tchannel.OutboundCallResponse
_, arg3, resp, err = raw.WriteArgs(call, headers, s.request)
err = forwardArgs(call, headers, s.request, res)

// check if the response is an application level error
if err == nil && resp.ApplicationError() {
if err == nil && call.Response().ApplicationError() {
// parse the json from the application level error
errResp := struct {
Type string `json:"type"`
Message string `json:"message"`
}{}

err = json.Unmarshal(arg3, &errResp)
err = json.Unmarshal(*res, &errResp)

// if parsing succeeded return the error as an application error
if err == nil {
Expand All @@ -208,13 +229,61 @@ func (s *requestSender) MakeCall(ctx context.Context, res *[]byte, fwdError *err
return
}

*res = arg3
done <- true
}()

return done
}

var discarderFrom = ioutil.Discard.(io.ReaderFrom)

func forwardArgs(call *tchannel.OutboundCall, arg2, arg3 []byte, res *[]byte) error {
if err := writeArg(call.Arg2Writer, arg2); err != nil {
return err
}
if err := writeArg(call.Arg3Writer, arg3); err != nil {
return err
}

resp := call.Response()
if err := readArgInto(discarderFrom, resp.Arg2Reader); err != nil {
return err
}

buf := bufPool.Get()
defer bufPool.Put(buf)
if err := readArgInto(buf, resp.Arg3Reader); err != nil {
return err
}

*res = append(*res, buf.Bytes()...)
return nil
}

func writeArg(writable func() (tchannel.ArgWriter, error), bs []byte) error {
aw, err := writable()
if err != nil {
return err
}
_, err = aw.Write(bs)
if err == nil {
err = aw.Close()
}
return err
}

func readArgInto(rf io.ReaderFrom, readable func() (tchannel.ArgReader, error)) error {
ar, err := readable()
if err != nil {
return err
}
_, err = rf.ReadFrom(ar)
if err == nil {
err = ar.Close()
}
return err
}

func (s *requestSender) ScheduleRetry() ([]byte, error) {
if s.retries == 0 {
s.retryStartTime = time.Now()
Expand Down

0 comments on commit f79e950

Please sign in to comment.