Skip to content

Commit

Permalink
feature: retry() filter
Browse files Browse the repository at this point in the history
feature: net.Client.Retry()

Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
  • Loading branch information
szuecs committed Mar 13, 2024
1 parent fd81a7a commit a834d90
Show file tree
Hide file tree
Showing 9 changed files with 473 additions and 50 deletions.
2 changes: 2 additions & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/zalando/skipper/filters/fadein"
"github.com/zalando/skipper/filters/flowid"
logfilter "github.com/zalando/skipper/filters/log"
"github.com/zalando/skipper/filters/retry"
"github.com/zalando/skipper/filters/rfc"
"github.com/zalando/skipper/filters/scheduler"
"github.com/zalando/skipper/filters/sed"
Expand Down Expand Up @@ -229,6 +230,7 @@ func Filters() []filters.Spec {
fadein.NewEndpointCreated(),
consistenthash.NewConsistentHashKey(),
consistenthash.NewConsistentHashBalanceFactor(),
retry.NewRetry(),
tls.New(),
}
}
Expand Down
1 change: 1 addition & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ const (
FifoWithBodyName = "fifoWithBody"
LifoName = "lifo"
LifoGroupName = "lifoGroup"
RetryName = "retry"
RfcPathName = "rfcPath"
RfcHostName = "rfcHost"
BearerInjectorName = "bearerinjector"
Expand Down
18 changes: 18 additions & 0 deletions filters/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package retry

import (
"github.com/zalando/skipper/filters"
)

type retry struct{}

// NewRetry creates a filter specification for the retry() filter
func NewRetry() filters.Spec { return retry{} }

func (retry) Name() string { return filters.RetryName }
func (retry) CreateFilter([]interface{}) (filters.Filter, error) { return retry{}, nil }
func (retry) Response(filters.FilterContext) {}

func (retry) Request(ctx filters.FilterContext) {
ctx.StateBag()[filters.RetryName] = struct{}{}
}
95 changes: 95 additions & 0 deletions filters/retry/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package retry

import (
"bytes"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/AlexanderYastrebov/noleak"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/proxy/proxytest"
)

func TestRetry(t *testing.T) {
for _, tt := range []struct {
name string
method string
body string
}{
{
name: "test GET",
method: "GET",
},
{
name: "test POST",
method: "POST",
body: "hello POST",
},
{
name: "test PATCH",
method: "PATCH",
body: "hello PATCH",
},
{
name: "test PUT",
method: "PUT",
body: "hello PUT",
}} {
t.Run(tt.name, func(t *testing.T) {
i := 0
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if i == 0 {
i++
w.WriteHeader(http.StatusBadGateway)
return
}

got, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("got no data")
}
s := string(got)
if tt.body != s {
t.Fatalf("Failed to get the right data want: %q, got: %q", tt.body, s)
}

w.WriteHeader(http.StatusOK)
}))
defer backend.Close()

noleak.Check(t)

fr := make(filters.Registry)
retry := NewRetry()
fr.Register(retry)
r := &eskip.Route{
Filters: []*eskip.Filter{
{Name: retry.Name()},
},
Backend: backend.URL,
}

proxy := proxytest.New(fr, r)
defer proxy.Close()

buf := bytes.NewBufferString(tt.body)
req, err := http.NewRequest(tt.method, proxy.URL, buf)
if err != nil {
t.Fatal(err)
}

rsp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to execute retry: %v", err)
}

if rsp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code: %s", rsp.Status)
}
rsp.Body.Close()
})
}
}
48 changes: 48 additions & 0 deletions io/copy_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io

import (
"bytes"
"io"
)

type bodyBuffer struct{ *bytes.Buffer }

func (buf *bodyBuffer) Close() error {
return nil
}

type CopyBodyStream struct {
left int
buf *bodyBuffer
input io.ReadCloser
}

func NewCopyBodyStream(left int, buf *bytes.Buffer, rc io.ReadCloser) *CopyBodyStream {
return &CopyBodyStream{
left: left,
buf: &bodyBuffer{Buffer: buf},
input: rc,
}
}

func (cb *CopyBodyStream) Len() int {
return cb.buf.Len()
}

func (cb *CopyBodyStream) Read(p []byte) (n int, err error) {
n, err = cb.input.Read(p)
if cb.left > 0 && n > 0 {
m := min(n, cb.left)
cb.buf.Write(p[:m])
cb.left -= m
}
return n, err
}

func (cb *CopyBodyStream) Close() error {
return cb.input.Close()
}

func (cb *CopyBodyStream) GetBody() io.ReadCloser {
return cb.buf
}
41 changes: 41 additions & 0 deletions io/copy_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io

import (
"bytes"
"io"
"testing"
)

type tbuf struct{ *bytes.Buffer }

func (tb *tbuf) Read(p []byte) (int, error) {
return tb.Buffer.Read(p)
}
func (tb *tbuf) Close() error {
return nil
}

func TestCopyBodyStream(t *testing.T) {
s := "content"
bbuf := &tbuf{bytes.NewBufferString(s)}
cbs := NewCopyBodyStream(bbuf.Len(), &bytes.Buffer{}, bbuf)

buf := make([]byte, len(s))
cbs.Read(buf)

if cbs.Len() != len(buf) {
t.Fatalf("Failed to have the same buf buffer size want: %d, got: %d", cbs.Len(), len(buf))
}

got, err := io.ReadAll(cbs.GetBody())
if err != nil {
t.Fatalf("Failed to read: %v", err)
}
if gotStr := string(got); s != gotStr {
t.Fatalf("Failed to get the right content: %s != %s", s, gotStr)
}

if err = cbs.Close(); err != nil {
t.Fatal(err)
}
}
62 changes: 14 additions & 48 deletions net/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package net
import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -15,6 +16,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
skpio "github.com/zalando/skipper/io"
"github.com/zalando/skipper/logging"
"github.com/zalando/skipper/secrets"
)
Expand All @@ -24,43 +26,7 @@ const (
defaultRefreshInterval = 5 * time.Minute
)

type mybuf struct{ *bytes.Buffer }

func (buf *mybuf) Close() error {
return nil
}

type copyBodyStream struct {
left int
buf *mybuf
input io.ReadCloser
}

func newCopyBodyStream(left int, buf *bytes.Buffer, rc io.ReadCloser) *copyBodyStream {
return &copyBodyStream{
left: left,
buf: &mybuf{Buffer: buf},
input: rc,
}
}

func (cb *copyBodyStream) Read(p []byte) (n int, err error) {
n, err = cb.input.Read(p)
if cb.left > 0 && n > 0 {
m := min(n, cb.left)
cb.buf.Write(p[:m])
cb.left -= m
}
return n, err
}

func (cb *copyBodyStream) Close() error {
return cb.input.Close()
}

func (cb *copyBodyStream) GetBody() io.ReadCloser {
return cb.buf
}
var errRequestNotFound = errors.New("request not found")

// Client adds additional features like Bearer token injection, and
// opentracing to the wrapped http.Client with the same interface as
Expand Down Expand Up @@ -166,8 +132,8 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
req.Header.Set("Authorization", "Bearer "+string(b))
}
}
if req.Body != nil && req.Body != http.NoBody {
retryBuffer := newCopyBodyStream(int(req.ContentLength), &bytes.Buffer{}, req.Body)
if req.Body != nil && req.Body != http.NoBody && req.ContentLength > 0 {
retryBuffer := skpio.NewCopyBodyStream(int(req.ContentLength), &bytes.Buffer{}, req.Body)
c.retryBuffers.Store(req, retryBuffer)
req.Body = retryBuffer
}
Expand All @@ -179,20 +145,20 @@ func (c *Client) Retry(req *http.Request) (*http.Response, error) {
return c.Do(req)
}

if rc, err := req.GetBody(); err == nil {
println("req.GetBody() case")
c.retryBuffers.Delete(req)
req.Body = rc
return c.Do(req)
}
// Next line panics on TestClientRetryBodyHalfReader
// if rc, err := req.GetBody(); err == nil {
// c.retryBuffers.Delete(req)
// req.Body = rc
// return c.Do(req)
// }
// return nil, fmt.Errorf("failed to retry")

println("our own retry buffer impl")
buf, ok := c.retryBuffers.Load(req)
if !ok {
return nil, fmt.Errorf("no retry possible, request not found: %s %s", req.Method, req.URL)
return nil, fmt.Errorf("no retry possible, %w: %s %s", errRequestNotFound, req.Method, req.URL)
}

retryBuffer, ok := buf.(*copyBodyStream)
retryBuffer, ok := buf.(*skpio.CopyBodyStream)
if !ok {
return nil, fmt.Errorf("no retry possible, no retry buffer for request: %s %s", req.Method, req.URL)
}
Expand Down

0 comments on commit a834d90

Please sign in to comment.