From 4b3ef1fb57499728bd37f68940bb74adae791edd Mon Sep 17 00:00:00 2001 From: Will Hughes Date: Tue, 1 Nov 2016 17:28:21 -0700 Subject: [PATCH] [PeerList][Part 4] Refactor Http.outbound Summary: In order to put the Peer Logic into the HTTP.outbound as seamlessly as possible I ended up refactoring some of the outbound "call" code to be more modular, there should be no functional change in this PR. --- transport/http/outbound.go | 128 +++++++++++++++++++++---------------- 1 file changed, 73 insertions(+), 55 deletions(-) diff --git a/transport/http/outbound.go b/transport/http/outbound.go index 7f5f7a821..696cd1e54 100644 --- a/transport/http/outbound.go +++ b/transport/http/outbound.go @@ -98,7 +98,66 @@ func (o *outbound) Stop() error { return nil } -func (o *outbound) createSpan(ctx context.Context, req *http.Request, treq *transport.Request, start time.Time) (context.Context, opentracing.Span) { +func (o *outbound) Call(ctx context.Context, treq *transport.Request) (*transport.Response, error) { + if !o.started.Load() { + // panic because there's no recovery from this + panic(errOutboundNotStarted) + } + start := time.Now() + deadline, _ := ctx.Deadline() + ttl := deadline.Sub(start) + + req, err := o.createRequest(treq) + if err != nil { + return nil, err + } + + req.Header = applicationHeaders.ToHTTPHeaders(treq.Headers, nil) + ctx, req, span := o.withOpentracingSpan(ctx, req, treq, start) + defer span.Finish() + req = o.withCoreHeaders(req, treq, ttl) + + response, err := o.getHTTPClient().Do(req.WithContext(ctx)) + + if err != nil { + // Workaround borrowed from ctxhttp until + // https://github.com/golang/go/issues/17711 is resolved. + select { + case <-ctx.Done(): + err = ctx.Err() + default: + } + + span.SetTag("error", true) + span.LogEvent(err.Error()) + if err == context.DeadlineExceeded { + end := time.Now() + return nil, errors.ClientTimeoutError(treq.Service, treq.Procedure, end.Sub(start)) + } + + return nil, err + } + + span.SetTag("http.status_code", response.StatusCode) + + if response.StatusCode >= 200 && response.StatusCode < 300 { + appHeaders := applicationHeaders.FromHTTPHeaders( + response.Header, transport.NewHeaders()) + return &transport.Response{ + Headers: appHeaders, + Body: response.Body, + }, nil + } + + return nil, getErrFromResponse(response) +} + +func (o *outbound) createRequest(treq *transport.Request) (*http.Request, error) { + reqURL := o.URL + return http.NewRequest("POST", reqURL, treq.Body) +} + +func (o *outbound) withOpentracingSpan(ctx context.Context, req *http.Request, treq *transport.Request, start time.Time) (context.Context, *http.Request, opentracing.Span) { // Apply HTTP Context headers for tracing and baggage carried by tracing. tracer := o.Deps.Tracer() var parent opentracing.SpanContext // ok to be nil @@ -121,35 +180,16 @@ func (o *outbound) createSpan(ctx context.Context, req *http.Request, treq *tran ext.HTTPUrl.Set(span, req.URL.String()) ctx = opentracing.ContextWithSpan(ctx, span) - req.Header = applicationHeaders.ToHTTPHeaders(treq.Headers, nil) - tracer.Inject( span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header), ) - return ctx, span + return ctx, req, span } -func (o *outbound) Call(ctx context.Context, treq *transport.Request) (*transport.Response, error) { - if !o.started.Load() { - // panic because there's no recovery from this - panic(errOutboundNotStarted) - } - - start := time.Now() - deadline, _ := ctx.Deadline() - ttl := deadline.Sub(start) - - req, err := http.NewRequest("POST", o.URL, treq.Body) - if err != nil { - return nil, err - } - - ctx, span := o.createSpan(ctx, req, treq, start) - defer span.Finish() - +func (o *outbound) withCoreHeaders(req *http.Request, treq *transport.Request, ttl time.Duration) *http.Request { req.Header.Set(CallerHeader, treq.Caller) req.Header.Set(ServiceHeader, treq.Service) req.Header.Set(ProcedureHeader, treq.Procedure) @@ -169,56 +209,34 @@ func (o *outbound) Call(ctx context.Context, treq *transport.Request) (*transpor req.Header.Set(EncodingHeader, encoding) } - response, err := o.Client.Do(req.WithContext(ctx)) - if err != nil { - // Workaround borrowed from ctxhttp until - // https://github.com/golang/go/issues/17711 is resolved. - select { - case <-ctx.Done(): - err = ctx.Err() - default: - } - - span.SetTag("error", true) - span.LogEvent(err.Error()) - if err == context.DeadlineExceeded { - return nil, errors.ClientTimeoutError(treq.Service, treq.Procedure, deadline.Sub(start)) - } - - return nil, err - } - - span.SetTag("http.status_code", response.StatusCode) + return req +} - if response.StatusCode >= 200 && response.StatusCode < 300 { - appHeaders := applicationHeaders.FromHTTPHeaders( - response.Header, transport.NewHeaders()) - return &transport.Response{ - Headers: appHeaders, - Body: response.Body, - }, nil - } +func (o *outbound) getHTTPClient() *http.Client { + return o.Client +} +func getErrFromResponse(response *http.Response) error { // TODO Behavior for 300-range status codes is undefined contents, err := ioutil.ReadAll(response.Body) if err != nil { - return nil, err + return err } if err := response.Body.Close(); err != nil { - return nil, err + return err } // Trim the trailing newline from HTTP error messages message := strings.TrimSuffix(string(contents), "\n") if response.StatusCode >= 400 && response.StatusCode < 500 { - return nil, errors.RemoteBadRequestError(message) + return errors.RemoteBadRequestError(message) } if response.StatusCode == http.StatusGatewayTimeout { - return nil, errors.RemoteTimeoutError(message) + return errors.RemoteTimeoutError(message) } - return nil, errors.RemoteUnexpectedError(message) + return errors.RemoteUnexpectedError(message) }