Skip to content

Commit

Permalink
[PeerList][Part 4] Refactor Http.outbound
Browse files Browse the repository at this point in the history
Summary: In order to put the Peer Logic into the HTTP.outbound as
seamlessly as possible I ended up refactoring some of the outbound
"call" code to be more modular, there should be no functional change in
this PR.
  • Loading branch information
willhug committed Nov 10, 2016
1 parent e71d298 commit 4b3ef1f
Showing 1 changed file with 73 additions and 55 deletions.
128 changes: 73 additions & 55 deletions transport/http/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,66 @@ func (o *outbound) Stop() error {
return nil
}

func (o *outbound) createSpan(ctx context.Context, req *http.Request, treq *transport.Request, start time.Time) (context.Context, opentracing.Span) {
func (o *outbound) Call(ctx context.Context, treq *transport.Request) (*transport.Response, error) {
if !o.started.Load() {
// panic because there's no recovery from this
panic(errOutboundNotStarted)
}
start := time.Now()
deadline, _ := ctx.Deadline()
ttl := deadline.Sub(start)

req, err := o.createRequest(treq)
if err != nil {
return nil, err
}

req.Header = applicationHeaders.ToHTTPHeaders(treq.Headers, nil)
ctx, req, span := o.withOpentracingSpan(ctx, req, treq, start)
defer span.Finish()
req = o.withCoreHeaders(req, treq, ttl)

response, err := o.getHTTPClient().Do(req.WithContext(ctx))

if err != nil {
// Workaround borrowed from ctxhttp until
// https://github.com/golang/go/issues/17711 is resolved.
select {
case <-ctx.Done():
err = ctx.Err()
default:
}

span.SetTag("error", true)
span.LogEvent(err.Error())
if err == context.DeadlineExceeded {
end := time.Now()
return nil, errors.ClientTimeoutError(treq.Service, treq.Procedure, end.Sub(start))
}

return nil, err
}

span.SetTag("http.status_code", response.StatusCode)

if response.StatusCode >= 200 && response.StatusCode < 300 {
appHeaders := applicationHeaders.FromHTTPHeaders(
response.Header, transport.NewHeaders())
return &transport.Response{
Headers: appHeaders,
Body: response.Body,
}, nil
}

return nil, getErrFromResponse(response)
}

func (o *outbound) createRequest(treq *transport.Request) (*http.Request, error) {
reqURL := o.URL
return http.NewRequest("POST", reqURL, treq.Body)
}

func (o *outbound) withOpentracingSpan(ctx context.Context, req *http.Request, treq *transport.Request, start time.Time) (context.Context, *http.Request, opentracing.Span) {
// Apply HTTP Context headers for tracing and baggage carried by tracing.
tracer := o.Deps.Tracer()
var parent opentracing.SpanContext // ok to be nil
Expand All @@ -121,35 +180,16 @@ func (o *outbound) createSpan(ctx context.Context, req *http.Request, treq *tran
ext.HTTPUrl.Set(span, req.URL.String())
ctx = opentracing.ContextWithSpan(ctx, span)

req.Header = applicationHeaders.ToHTTPHeaders(treq.Headers, nil)

tracer.Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header),
)

return ctx, span
return ctx, req, span
}

func (o *outbound) Call(ctx context.Context, treq *transport.Request) (*transport.Response, error) {
if !o.started.Load() {
// panic because there's no recovery from this
panic(errOutboundNotStarted)
}

start := time.Now()
deadline, _ := ctx.Deadline()
ttl := deadline.Sub(start)

req, err := http.NewRequest("POST", o.URL, treq.Body)
if err != nil {
return nil, err
}

ctx, span := o.createSpan(ctx, req, treq, start)
defer span.Finish()

func (o *outbound) withCoreHeaders(req *http.Request, treq *transport.Request, ttl time.Duration) *http.Request {
req.Header.Set(CallerHeader, treq.Caller)
req.Header.Set(ServiceHeader, treq.Service)
req.Header.Set(ProcedureHeader, treq.Procedure)
Expand All @@ -169,56 +209,34 @@ func (o *outbound) Call(ctx context.Context, treq *transport.Request) (*transpor
req.Header.Set(EncodingHeader, encoding)
}

response, err := o.Client.Do(req.WithContext(ctx))
if err != nil {
// Workaround borrowed from ctxhttp until
// https://github.com/golang/go/issues/17711 is resolved.
select {
case <-ctx.Done():
err = ctx.Err()
default:
}

span.SetTag("error", true)
span.LogEvent(err.Error())
if err == context.DeadlineExceeded {
return nil, errors.ClientTimeoutError(treq.Service, treq.Procedure, deadline.Sub(start))
}

return nil, err
}

span.SetTag("http.status_code", response.StatusCode)
return req
}

if response.StatusCode >= 200 && response.StatusCode < 300 {
appHeaders := applicationHeaders.FromHTTPHeaders(
response.Header, transport.NewHeaders())
return &transport.Response{
Headers: appHeaders,
Body: response.Body,
}, nil
}
func (o *outbound) getHTTPClient() *http.Client {
return o.Client
}

func getErrFromResponse(response *http.Response) error {
// TODO Behavior for 300-range status codes is undefined
contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
return err
}

if err := response.Body.Close(); err != nil {
return nil, err
return err
}

// Trim the trailing newline from HTTP error messages
message := strings.TrimSuffix(string(contents), "\n")

if response.StatusCode >= 400 && response.StatusCode < 500 {
return nil, errors.RemoteBadRequestError(message)
return errors.RemoteBadRequestError(message)
}

if response.StatusCode == http.StatusGatewayTimeout {
return nil, errors.RemoteTimeoutError(message)
return errors.RemoteTimeoutError(message)
}

return nil, errors.RemoteUnexpectedError(message)
return errors.RemoteUnexpectedError(message)
}

0 comments on commit 4b3ef1f

Please sign in to comment.