-
Notifications
You must be signed in to change notification settings - Fork 50
/
executor.go
84 lines (65 loc) · 2.37 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package aws
import (
"context"
"net/http"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/justtrackio/gosoline/pkg/exec"
"github.com/justtrackio/gosoline/pkg/log"
)
type RequestFunction func() (*request.Request, interface{})
//go:generate mockery --name Executor
type Executor interface {
Execute(ctx context.Context, f RequestFunction) (interface{}, error)
}
func NewExecutor(logger log.Logger, res *exec.ExecutableResource, settings *exec.BackoffSettings, checks ...exec.ErrorChecker) Executor {
return NewBackoffExecutor(logger, res, settings, checks...)
}
type DefaultExecutor struct{}
func (e DefaultExecutor) Execute(ctx context.Context, f RequestFunction) (interface{}, error) {
req, out := f()
req.SetContext(ctx)
err := req.Send()
return out, err
}
type Sender func(req *request.Request) (*http.Response, error)
type BackoffExecutor struct {
executor exec.Executor
sender Sender
}
func NewBackoffExecutor(logger log.Logger, res *exec.ExecutableResource, settings *exec.BackoffSettings, checks ...exec.ErrorChecker) Executor {
return NewBackoffExecutorWithSender(logger, res, settings, func(req *request.Request) (*http.Response, error) {
err := req.Send()
return req.HTTPResponse, err
}, checks...)
}
func NewBackoffExecutorWithSender(logger log.Logger, res *exec.ExecutableResource, settings *exec.BackoffSettings, sender Sender, checks ...exec.ErrorChecker) Executor {
checks = append(checks, []exec.ErrorChecker{
exec.CheckRequestCanceled,
exec.CheckUsedClosedConnectionError,
exec.CheckTimeoutError,
exec.CheckClientAwaitHeaderTimeoutError,
exec.CheckTlsHandshakeTimeoutError,
CheckInvalidStatusError,
CheckConnectionError,
CheckErrorRetryable,
CheckErrorThrottle,
}...)
return &BackoffExecutor{
executor: exec.NewBackoffExecutor(logger, res, settings, checks...),
sender: sender,
}
}
func (b BackoffExecutor) Execute(ctx context.Context, f RequestFunction) (interface{}, error) {
return b.executor.Execute(ctx, func(ctx context.Context) (interface{}, error) {
req, out := f()
req.SetContext(ctx)
res, err := b.sender(req)
// ignore the error should we get a http internal server back, otherwise we do not retry correctly
if res != nil && res.StatusCode >= http.StatusInternalServerError && res.StatusCode != http.StatusNotImplemented {
return nil, &InvalidStatusError{
Status: res.StatusCode,
}
}
return out, err
})
}