Skip to content

Commit

Permalink
fix #402, #403 : retry was not wait for specified time
Browse files Browse the repository at this point in the history
  • Loading branch information
watermint committed Jul 6, 2020
1 parent b390570 commit b26b848
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 8 deletions.
22 changes: 20 additions & 2 deletions domain/dropbox/api/dbx_context_impl/context.go
Expand Up @@ -13,8 +13,10 @@ import (
"github.com/watermint/toolbox/essentials/network/nw_client"
"github.com/watermint/toolbox/essentials/network/nw_replay"
"github.com/watermint/toolbox/essentials/network/nw_rest"
"github.com/watermint/toolbox/essentials/network/nw_simulator"
"github.com/watermint/toolbox/infra/api/api_auth"
"github.com/watermint/toolbox/infra/api/api_request"
"github.com/watermint/toolbox/infra/app"
"github.com/watermint/toolbox/infra/control/app_control"
"github.com/watermint/toolbox/infra/ui/app_ui"
"net/http"
Expand Down Expand Up @@ -43,15 +45,31 @@ func NewReplayMock(ctl app_control.Control, rr []nw_replay.Response) dbx_context
}

func New(ctl app_control.Control, token api_auth.Context) dbx_context.Context {
client := nw_rest.New(
nw_rest.Assert(dbx_response_impl.AssertResponse))
l := ctl.Log()
opts := make([]nw_rest.ClientOpt, 0)
opts = append(opts, nw_rest.Assert(dbx_response_impl.AssertResponse))
if ctl.Feature().Experiment(app.ExperimentDbxClientConditionerNarrow20) {
l.Debug("Experiment: Network conditioner enabled: 20%")
opts = append(opts, nw_rest.Conditioner(20, nw_simulator.RetryAfterHeaderRetryAfter, decorateRateLimit))
} else if ctl.Feature().Experiment(app.ExperimentDbxClientConditionerNarrow40) {
l.Debug("Experiment: Network conditioner enabled: 40%")
opts = append(opts, nw_rest.Conditioner(40, nw_simulator.RetryAfterHeaderRetryAfter, decorateRateLimit))
} else if ctl.Feature().Experiment(app.ExperimentDbxClientConditionerNarrow100) {
l.Debug("Experiment: Network conditioner enabled: 100%")
opts = append(opts, nw_rest.Conditioner(100, nw_simulator.RetryAfterHeaderRetryAfter, decorateRateLimit))
}
client := nw_rest.New(opts...)
return &ctxImpl{
client: client,
ctl: ctl,
builder: dbx_request.NewBuilder(ctl, token),
}
}

func decorateRateLimit(res *http.Response) {

}

type ctxImpl struct {
client nw_client.Rest
ctl app_control.Control
Expand Down
2 changes: 1 addition & 1 deletion essentials/network/nw_ratelimit/ratelimit.go
Expand Up @@ -207,7 +207,7 @@ func (z *limitStateImpl) UpdateRetryAfter(hash, endpoint string, retryAfter time

key := z.keyHash(hash, endpoint)
ra, ok := z.retryAfter[key]
if !ok || ra.After(retryAfter) {
if !ok || ra.Before(retryAfter) {
z.retryAfter[key] = retryAfter
}
}
Expand Down
46 changes: 41 additions & 5 deletions essentials/network/nw_rest/client.go
Expand Up @@ -2,21 +2,27 @@ package nw_rest

import (
"github.com/watermint/toolbox/essentials/http/es_response"
"github.com/watermint/toolbox/essentials/log/esl"
"github.com/watermint/toolbox/essentials/network/nw_capture"
"github.com/watermint/toolbox/essentials/network/nw_client"
"github.com/watermint/toolbox/essentials/network/nw_http"
"github.com/watermint/toolbox/essentials/network/nw_replay"
"github.com/watermint/toolbox/essentials/network/nw_retry"
"github.com/watermint/toolbox/essentials/network/nw_simulator"
"github.com/watermint/toolbox/infra/api/api_context"
)

// Assert broken response or rate limit for retry
type AssertResponse func(res es_response.Response) es_response.Response

type ClientOpts struct {
Assert AssertResponse
Mock bool
ReplayMock []nw_replay.Response
Assert AssertResponse
Mock bool
ReplayMock []nw_replay.Response
ConditionerRate int
ConditionerDecorator nw_simulator.ResponseDecorator
ConditionerHeaderType nw_simulator.RetryAfterHeaderType
conditionerEnabled bool
}

func (z ClientOpts) Apply(opts ...ClientOpt) ClientOpts {
Expand Down Expand Up @@ -54,7 +60,19 @@ func Assert(ar AssertResponse) ClientOpt {
}
}

func Conditioner(rate int, headerType nw_simulator.RetryAfterHeaderType, decorator nw_simulator.ResponseDecorator) ClientOpt {
return func(o ClientOpts) ClientOpts {
o.ConditionerRate = rate
o.ConditionerHeaderType = headerType
o.ConditionerDecorator = decorator
o.conditionerEnabled = true
return o
}
}

func New(opts ...ClientOpt) nw_client.Rest {
l := esl.Default()

co := ClientOpts{}.Apply(opts...)
var hc nw_client.Http
switch {
Expand All @@ -66,8 +84,26 @@ func New(opts ...ClientOpt) nw_client.Rest {
hc = nw_http.NewClient()
}

c0 := NewAssert(co.Assert, nw_capture.New(hc))
return nw_retry.NewRetry(nw_retry.NewRatelimit(c0))
var c0, c1, c2 nw_client.Rest

// Layer 0: capture
c0 = nw_capture.New(hc)

// Layer 1: simulator
if co.conditionerEnabled {
l.Debug("Network conditioner enabled",
esl.Int("Rate", co.ConditionerRate),
esl.Int("HeaderType", int(co.ConditionerHeaderType)))
c1 = nw_simulator.New(c0, co.ConditionerRate, co.ConditionerHeaderType, co.ConditionerDecorator)
} else {
c1 = c0
}

// Layer 2: assert
c2 = NewAssert(co.Assert, c1)

// Layer 3: retry
return nw_retry.NewRetry(nw_retry.NewRatelimit(c2))
}

func NewAssert(assert AssertResponse, client nw_client.Rest) nw_client.Rest {
Expand Down
100 changes: 100 additions & 0 deletions essentials/network/nw_simulator/narrow.go
@@ -0,0 +1,100 @@
package nw_simulator

import (
"bytes"
"fmt"
"github.com/watermint/toolbox/essentials/http/es_response"
"github.com/watermint/toolbox/essentials/http/es_response_impl"
"github.com/watermint/toolbox/essentials/network/nw_client"
"github.com/watermint/toolbox/essentials/network/nw_concurrency"
"github.com/watermint/toolbox/essentials/network/nw_ratelimit"
"github.com/watermint/toolbox/essentials/network/nw_retry"
"github.com/watermint/toolbox/infra/api/api_context"
"io/ioutil"
"math/rand"
"net/http"
"time"
)

const (
maxRetryAfter = 4

RetryAfterHeaderRetryAfter = iota
RetryAfterHeaderGitHub
RetryAfterHeaderIetfDraftSecond
RetryAfterHeaderIetfDraftTimestamp
)

type RetryAfterHeaderType int
type ResponseDecorator func(res *http.Response)

func NoDecorator(res *http.Response) {
}

func New(client nw_client.Rest, rate int, headerType RetryAfterHeaderType, decorator ResponseDecorator) nw_client.Rest {
return &narrowClient{
rate: rate,
headerType: headerType,
decorator: decorator,
client: client,
}
}

type narrowClient struct {
// too many requests error rate in percent
rate int

// retry after header type
headerType RetryAfterHeaderType

// response decorator
decorator ResponseDecorator

// nested client
client nw_client.Rest
}

func (z narrowClient) Call(ctx api_context.Context, req nw_client.RequestBuilder) (res es_response.Response) {
if rand.Intn(100) >= z.rate {
return z.client.Call(ctx, req)
} else {
hr := &http.Response{}
hr.StatusCode = http.StatusTooManyRequests
hr.Header = make(map[string][]string)

retryAfterSec := rand.Intn(maxRetryAfter) + 1

switch z.headerType {
case RetryAfterHeaderGitHub:
hr.Header.Add(nw_retry.HeaderXRateLimitLimit, "100")
hr.Header.Add(nw_retry.HeaderXRateLimitRemaining, "0")
hr.Header.Add(nw_retry.HeaderXRateLimitReset, fmt.Sprintf("%d", time.Now().Add(time.Duration(retryAfterSec)*time.Second).Unix()))

case RetryAfterHeaderIetfDraftTimestamp:
hr.Header.Add(nw_retry.HeaderRateLimitLimit, "100")
hr.Header.Add(nw_retry.HeaderRateLimitRemaining, "0")
hr.Header.Add(nw_retry.HeaderRateLimitReset, time.Now().Add(time.Duration(retryAfterSec)*time.Second).Format(time.RFC1123))

case RetryAfterHeaderIetfDraftSecond:
hr.Header.Add(nw_retry.HeaderRateLimitLimit, "100")
hr.Header.Add(nw_retry.HeaderRateLimitRemaining, "0")
hr.Header.Add(nw_retry.HeaderRateLimitReset, fmt.Sprintf("%d", retryAfterSec))

default:
hr.Header.Add(nw_retry.HeaderRetryAfter, fmt.Sprintf("%d", retryAfterSec))
}

if z.decorator != nil {
z.decorator(hr)
}
if hr.Body == nil {
hr.Body = ioutil.NopCloser(&bytes.Buffer{})
}

nw_ratelimit.WaitIfRequired(ctx.ClientHash(), req.Endpoint())
nw_concurrency.Start()
res := es_response_impl.New(ctx, hr)
nw_concurrency.End()
return res
}
}
115 changes: 115 additions & 0 deletions essentials/network/nw_simulator/narrow_test.go
@@ -0,0 +1,115 @@
package nw_simulator

import (
"bytes"
"github.com/watermint/toolbox/essentials/http/es_response"
"github.com/watermint/toolbox/essentials/log/esl"
"github.com/watermint/toolbox/essentials/network/nw_client"
"github.com/watermint/toolbox/essentials/network/nw_retry"
"github.com/watermint/toolbox/infra/api/api_context"
"net/http"
"strconv"
"testing"
"time"
)

type PanicClient struct {
}

func (p PanicClient) Call(ctx api_context.Context, req nw_client.RequestBuilder) (res es_response.Response) {
panic("always panic!")
}

type MockApiContext struct {
}

func (z MockApiContext) ClientHash() string {
return ""
}

func (z MockApiContext) Log() esl.Logger {
return esl.Default()
}

func (z MockApiContext) Capture() esl.Logger {
return esl.Default()
}

type MockReqBuilder struct {
}

func (z MockReqBuilder) Build() (*http.Request, error) {
return http.NewRequest("POST", z.Endpoint(), &bytes.Buffer{})
}

func (z MockReqBuilder) Endpoint() string {
return "http://www.example.com"
}

func (z MockReqBuilder) Param() string {
return ""
}

func TestNarrowClient_Call(t *testing.T) {
{
nc := New(&PanicClient{}, 100, RetryAfterHeaderRetryAfter, NoDecorator)
res := nc.Call(&MockApiContext{}, &MockReqBuilder{})
if res.IsSuccess() {
t.Error(res.IsSuccess())
}
if res.Code() != http.StatusTooManyRequests {
t.Error(res.Code())
}
v := res.Header(nw_retry.HeaderRetryAfter)
if va, err := strconv.Atoi(v); err != nil || va < 1 {
t.Error(err, va)
}
}

{
nc := New(&PanicClient{}, 100, RetryAfterHeaderGitHub, NoDecorator)
now := time.Now().Unix()
res := nc.Call(&MockApiContext{}, &MockReqBuilder{})
if res.IsSuccess() {
t.Error(res.IsSuccess())
}
if res.Code() != http.StatusTooManyRequests {
t.Error(res.Code())
}
v := res.Header(nw_retry.HeaderXRateLimitReset)
if va, err := strconv.Atoi(v); err != nil || int64(va) < now {
t.Error(err, va)
}
}

{
nc := New(&PanicClient{}, 100, RetryAfterHeaderIetfDraftTimestamp, NoDecorator)
now := time.Now()
res := nc.Call(&MockApiContext{}, &MockReqBuilder{})
if res.IsSuccess() {
t.Error(res.IsSuccess())
}
if res.Code() != http.StatusTooManyRequests {
t.Error(res.Code())
}
v := res.Header(nw_retry.HeaderRateLimitReset)
if va, err := time.Parse(time.RFC1123, v); err != nil || va.Before(now) {
t.Error(err, va)
}
}

{
nc := New(&PanicClient{}, 100, RetryAfterHeaderIetfDraftSecond, NoDecorator)
res := nc.Call(&MockApiContext{}, &MockReqBuilder{})
if res.IsSuccess() {
t.Error(res.IsSuccess())
}
if res.Code() != http.StatusTooManyRequests {
t.Error(res.Code())
}
v := res.Header(nw_retry.HeaderRateLimitReset)
if va, err := strconv.Atoi(v); err != nil || va < 1 {
t.Error(err, va)
}
}
}
4 changes: 4 additions & 0 deletions infra/app/experiments.go
Expand Up @@ -4,4 +4,8 @@ const (
ExperimentKvsStorageUseInMemory = "kvs_use_inmemory"
ExperimentKvsStorageCompressionZstd = "kvs_compress_zstd"
ExperimentKvsStorageCompressionSnappy = "kvs_compress_snappy"

ExperimentDbxClientConditionerNarrow20 = "dbx_client_conditioner_narrow20" // 429 error for 20% traffic
ExperimentDbxClientConditionerNarrow40 = "dbx_client_conditioner_narrow40" // 429 error for 40% traffic
ExperimentDbxClientConditionerNarrow100 = "dbx_client_conditioner_narrow100" // 429 error for 100% traffic
)

0 comments on commit b26b848

Please sign in to comment.