Skip to content

Commit

Permalink
Merge pull request #163 from m-lab/retry
Browse files Browse the repository at this point in the history
Add retry to external request API
  • Loading branch information
gfr10598 committed Jan 2, 2019
2 parents 3c1b434 + f49a079 commit a8d8180
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 20 deletions.
112 changes: 96 additions & 16 deletions api/v2/api-v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/m-lab/annotation-service/api"
)

Expand Down Expand Up @@ -46,40 +49,117 @@ type Response struct {
}

/*************************************************************************
* Local Annotator API *
* Remote Annotator API *
*************************************************************************/
var (
RequestTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "annotator_external_latency_hist_msec",
Help: "annotator latency distributions.",
Buckets: []float64{
1.0, 1.3, 1.6, 2.0, 2.5, 3.2, 4.0, 5.0, 6.3, 7.9,
10, 13, 16, 20, 25, 32, 40, 50, 63, 79,
100, 130, 160, 200, 250, 320, 400, 500, 630, 790,
1000, 1300, 1600, 2000, 2500, 3200, 4000, 5000, 6300, 7900,
},
},
[]string{"detail"})
)

// GetAnnotations takes a url, and Request, makes remote call, and returns parsed ResponseV2
// TODO(gfr) Should pass the annotator's request context through and use it here.
func GetAnnotations(ctx context.Context, url string, date time.Time, ips []string) (*Response, error) {
req := NewRequest(date, ips)
encodedData, err := json.Marshal(req)
func init() {
prometheus.MustRegister(RequestTimeHistogram)
}

func post(ctx context.Context, url string, encodedData []byte) (*http.Response, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

httpReq, err := http.NewRequest("POST", url, bytes.NewReader(encodedData))
if err != nil {
return nil, err
}

var netClient = &http.Client{
// Median response time is < 10 msec, but 99th percentile is 0.6 seconds.
Timeout: 2 * time.Second,
// Make the actual request
return http.DefaultClient.Do(httpReq.WithContext(ctx))
}

// ErrStatusNotOK is returned from GetAnnotation if http status is other than OK. Response body may have more info.
var ErrStatusNotOK = errors.New("http status not StatusOK")

func waitOneSecond(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
return nil
}
}

httpReq, err := http.NewRequest("POST", url, bytes.NewReader(encodedData))
// postWithRetry will retry for some error conditions, up to the deadline in the provided context.
// Returns if http status is OK, error is not nil, http status is not ServiceUnavailable or timeout.
func postWithRetry(ctx context.Context, url string, encodedData []byte) (*http.Response, error) {
for {
start := time.Now()
resp, err := post(ctx, url, encodedData)
if err != nil {
RequestTimeHistogram.WithLabelValues(err.Error()).Observe(float64(time.Since(start).Nanoseconds()) / 1e6)
return nil, err
}
if resp.StatusCode == http.StatusOK {
RequestTimeHistogram.WithLabelValues("success").Observe(float64(time.Since(start).Nanoseconds()) / 1e6)
return resp, err
}
if resp.StatusCode != http.StatusServiceUnavailable {
RequestTimeHistogram.WithLabelValues(resp.Status).Observe(float64(time.Since(start).Nanoseconds()) / 1e6)
return resp, ErrStatusNotOK
}
if ctx.Err() != nil {
RequestTimeHistogram.WithLabelValues("timeout").Observe(float64(time.Since(start).Nanoseconds()) / 1e6)
return nil, ctx.Err()
}
// This is a recoverable error, so we should retry.
RequestTimeHistogram.WithLabelValues("retry").Observe(float64(time.Since(start).Nanoseconds()) / 1e6)
err = waitOneSecond(ctx)
if err != nil {
return nil, err
}
}
}

// GetAnnotations takes a url, and Request, makes remote call, and returns parsed ResponseV2
// TODO(gfr) Should pass the annotator's request context through and use it here.
func GetAnnotations(ctx context.Context, url string, date time.Time, ips []string) (*Response, error) {
req := NewRequest(date, ips)
encodedData, err := json.Marshal(req)
if err != nil {
return nil, err
}

// Make the actual request
httpResp, err := netClient.Do(httpReq.WithContext(ctx))
httpResp, err := postWithRetry(ctx, url, encodedData)
if err != nil {
if httpResp == nil || httpResp.Body == nil {
return nil, err
}
defer httpResp.Body.Close()
if err == ErrStatusNotOK {
body, ioutilErr := ioutil.ReadAll(httpResp.Body)
if ioutilErr != nil {
return nil, ioutilErr
}
// To avoid some bug causing a gigantic error string...
if len(body) > 60 { // 60 is completely arbitrary.
body = body[0:60]
}
return nil, fmt.Errorf("%s : %s", httpResp.Status, string(body))
}
return nil, err
}
defer httpResp.Body.Close()

// Catch errors reported by the service
defer httpResp.Body.Close()
// Handle other status codes
if httpResp.StatusCode != http.StatusOK {
return nil, errors.New("URL:" + url + " gave response code " + httpResp.Status)
return nil, errors.New(httpResp.Status)
}

// Copy response into a byte slice
body, err := ioutil.ReadAll(httpResp.Body)
if err != nil {
Expand Down
53 changes: 51 additions & 2 deletions api/v2/api-v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

Expand All @@ -21,19 +22,36 @@ func init() {

func TestDoRequest(t *testing.T) {
expectedJson := `{"AnnotatorDate":"2018-12-05T00:00:00Z","Annotations":{"147.1.2.3":{"Geo":{"continent_code":"NA","country_code":"US","country_name":"United States","latitude":37.751,"longitude":-97.822},"ASN":{}},"8.8.8.8":{"Geo":{"continent_code":"NA","country_code":"US","country_name":"United States","latitude":37.751,"longitude":-97.822},"ASN":{}}}}`
callCount := 0

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, expectedJson)
if callCount < 3 {
w.WriteHeader(http.StatusServiceUnavailable)
} else {
fmt.Fprint(w, expectedJson)
}
callCount++
}))
url := ts.URL

//url = "https://annotator-dot-mlab-sandbox.appspot.com/batch_annotate"
ips := []string{"8.8.8.8", "147.1.2.3"}
resp, err := api.GetAnnotations(context.Background(), url, time.Now(), ips)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
resp, err := api.GetAnnotations(ctx, url, time.Now(), ips)
if err == nil {
t.Fatal("Should have timed out")
}
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err = api.GetAnnotations(ctx, url, time.Now(), ips)
if err != nil {
t.Fatal(err)
}

if callCount != 4 {
t.Error("Should have been two calls to server.")
}
expectedResponse := api.Response{}
err = json.Unmarshal([]byte(expectedJson), &expectedResponse)
if err != nil {
Expand All @@ -44,3 +62,34 @@ func TestDoRequest(t *testing.T) {
t.Error(diff)
}
}

func TestSomeErrors(t *testing.T) {
callCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if callCount == 0 {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, "body message")
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
callCount++
}))
url := ts.URL

ips := []string{"8.8.8.8", "147.1.2.3"}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, err := api.GetAnnotations(ctx, url, time.Now(), ips)
if callCount != 1 {
t.Error("Should have been two calls to server.")
}
if err == nil {
t.Fatal("Should have produced an error")
}
if !strings.Contains(err.Error(), "body message") {
t.Error("Expected err containing body message", err)
}
if !strings.Contains(err.Error(), "Internal Server Error") {
t.Error("Expected err containing Internal Server Error", err)
}
}
9 changes: 7 additions & 2 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,16 @@ func handleV2(w http.ResponseWriter, jsonBuffer []byte) {
// No need to validate IP addresses, as they are net.IP
response := v2.Response{}

// For now, use the date of the first item. In future the items will not have individual timestamps.
if len(request.IPs) > 0 {
// For old request format, we use the date of the first RequestData
response, err = AnnotateV2(request.Date, request.IPs)
if err != nil {
if err == manager.ErrPendingAnnotatorLoad {
// Encourage client to try again soon.
w.WriteHeader(http.StatusServiceUnavailable)
} else {
// If it isn't loading, client should probably give up instead of retrying.
w.WriteHeader(http.StatusInternalServerError)
}
fmt.Fprintf(w, err.Error())
return
}
Expand Down
3 changes: 3 additions & 0 deletions handler/handler_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package handler_test

// TODO 201901 - TestStatusServiceUnavailable()
// TODO 201901 - TestStatusInternalError()

import (
"bytes"
"errors"
Expand Down

0 comments on commit a8d8180

Please sign in to comment.