diff --git a/docs/reference/filters.md b/docs/reference/filters.md
index 98fea702f5..fa2da89075 100644
--- a/docs/reference/filters.md
+++ b/docs/reference/filters.md
@@ -2819,6 +2819,31 @@ Example:
 fifo(100, 150, "10s")
 ```
 
+### fifoWithBody
+
+This Filter is similar to the [lifo](#lifo) filter with regard to
+parameters and status codes.
+Performance considerations are similar to those of [fifo](#fifo).
+
+The difference between fifo and fifoWithBody is the point at which the
+concurrency slot is released: fifo decrements the concurrency as soon
+as the backend has sent the response headers, while fifoWithBody
+decrements it only once the response body has been served. Normally
+both behave very similarly, but for a fully async component that
+serves multiple website fragments, fifo would decrement the
+concurrency too early.
+
+Parameters:
+
+* MaxConcurrency specifies how many goroutines are allowed to work on this queue (int)
+* MaxQueueSize sets the queue size (int)
+* Timeout sets the timeout for the request to be scheduled (time)
+
+Example:
+
+```
+fifoWithBody(100, 150, "10s")
+```
+
 ### lifo
 
 This Filter changes skipper to handle the route with a bounded last in
diff --git a/filters/builtin/builtin.go b/filters/builtin/builtin.go
index 0609d87412..d4e1895bf4 100644
--- a/filters/builtin/builtin.go
+++ b/filters/builtin/builtin.go
@@ -216,6 +216,7 @@ func Filters() []filters.Spec {
 		auth.NewForwardToken(),
 		auth.NewForwardTokenField(),
 		scheduler.NewFifo(),
+		scheduler.NewFifoWithBody(),
 		scheduler.NewLIFO(),
 		scheduler.NewLIFOGroup(),
 		rfc.NewPath(),
diff --git a/filters/filters.go b/filters/filters.go
index 6caaaa7a41..9f9a1f333a 100644
--- a/filters/filters.go
+++ b/filters/filters.go
@@ -327,6 +327,7 @@ const (
 	SetDynamicBackendUrl     = "setDynamicBackendUrl"
 	ApiUsageMonitoringName   = "apiUsageMonitoring"
 	FifoName                 = "fifo"
+	FifoWithBodyName         = "fifoWithBody"
 	LifoName                 = "lifo"
 	LifoGroupName            = "lifoGroup"
 	RfcPathName              = "rfcPath"
diff --git a/filters/scheduler/cleanup_test.go b/filters/scheduler/cleanup_test.go
new file mode 100644
index 0000000000..a479173561
--- /dev/null
+++ b/filters/scheduler/cleanup_test.go
@@ -0,0 +1,85 @@
+package scheduler
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/zalando/skipper/filters"
+	"github.com/zalando/skipper/proxy"
+	"github.com/zalando/skipper/routing"
+	"github.com/zalando/skipper/routing/testdataclient"
+	"github.com/zalando/skipper/scheduler"
+)
+
+func TestCleanupOnBackendErrors(t *testing.T) {
+	doc := `
+	aroute: *
+	-> lifo(1, 1, "100ms")
+	-> lifoGroup("foo", 1, 1, "100ms")
+	-> lifo(2, 2, "200ms")
+	-> lifoGroup("bar", 1, 1, "100ms")
+	-> fifo(1, 1, "200ms")
+	-> "http://test.invalid"
+	`
+
+	dc, err := testdataclient.NewDoc(doc)
+	require.NoError(t, err)
+	defer dc.Close()
+
+	reg := scheduler.RegistryWith(scheduler.Options{})
+	defer reg.Close()
+
+	fr := make(filters.Registry)
+	fr.Register(NewLIFO())
+	fr.Register(NewLIFOGroup())
+	fr.Register(NewFifo())
+
+	ro := routing.Options{
+		SignalFirstLoad: true,
+		FilterRegistry:  fr,
+		DataClients:     []routing.DataClient{dc},
+		PostProcessors:  []routing.PostProcessor{reg},
+	}
+
+	rt := routing.New(ro)
+	defer rt.Close()
+
+	<-rt.FirstLoad()
+
+	pr := proxy.WithParams(proxy.Params{
+		Routing: rt,
+	})
+	defer pr.Close()
+
+	ts := httptest.NewServer(pr)
+	defer ts.Close()
+
+	rsp, err := http.Get(ts.URL)
+	require.NoError(t, err)
+	rsp.Body.Close()
+
+	var route *routing.Route
+	{
+		req, err := http.NewRequest("GET", ts.URL, nil)
+		require.NoError(t, err)
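+
+		// resolve the route that matches the request above, so the
+		// queue status of its filter instances can be inspected below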
+		route, _ = rt.Get().Do(req)
+		require.NotNil(t, route, "failed to lookup route")
+	}
+
+	for _, f := range route.Filters {
+		if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.Queue }); ok {
+			status := qf.GetQueue().Status()
+			assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status)
+		} else if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.FifoQueue }); ok {
+			status := qf.GetQueue().Status()
+			assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status)
+		} else {
+			t.Fatal("filter does not implement GetQueue()")
+		}
+	}
+}
diff --git a/filters/scheduler/fifo.go b/filters/scheduler/fifo.go
index 040d92a3ad..50ceb95b65 100644
--- a/filters/scheduler/fifo.go
+++ b/filters/scheduler/fifo.go
@@ -11,24 +11,31 @@ import (
 	"github.com/zalando/skipper/scheduler"
 )
 
-const (
-	fifoKey string = "fifo"
-)
-
 type (
-	fifoSpec   struct{}
+	fifoSpec struct {
+		typ string
+	}
 	fifoFilter struct {
 		config scheduler.Config
 		queue  *scheduler.FifoQueue
+		typ    string
 	}
 )
 
 func NewFifo() filters.Spec {
-	return &fifoSpec{}
+	return &fifoSpec{
+		typ: filters.FifoName,
+	}
 }
 
-func (*fifoSpec) Name() string {
-	return filters.FifoName
+func NewFifoWithBody() filters.Spec {
+	return &fifoSpec{
+		typ: filters.FifoWithBodyName,
+	}
+}
+
+func (s *fifoSpec) Name() string {
+	return s.typ
 }
 
 // CreateFilter creates a fifoFilter, that will use a semaphore based
@@ -65,6 +72,7 @@ func (s *fifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
 	}
 
 	return &fifoFilter{
+		typ: s.typ,
 		config: scheduler.Config{
 			MaxConcurrency: cc,
 			MaxQueueSize:   qs,
@@ -132,21 +140,33 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) {
 	}
 
 	// ok
-	pending, _ := ctx.StateBag()[fifoKey].([]func())
-	ctx.StateBag()[fifoKey] = append(pending, done)
+	pending, _ := ctx.StateBag()[f.typ].([]func())
+	ctx.StateBag()[f.typ] = append(pending, done)
 }
 
 // Response will decrease the number of inflight requests to release
 // the concurrency reservation for the request.
func (f *fifoFilter) Response(ctx filters.FilterContext) {
-	pending, ok := ctx.StateBag()[fifoKey].([]func())
-	if !ok {
-		return
-	}
-	last := len(pending) - 1
-	if last < 0 {
-		return
+	switch f.typ {
+	case filters.FifoName:
+		pending, ok := ctx.StateBag()[f.typ].([]func())
+		if !ok {
+			return
+		}
+		last := len(pending) - 1
+		if last < 0 {
+			return
+		}
+		pending[last]()
+		ctx.StateBag()[f.typ] = pending[:last]
+
+	case filters.FifoWithBodyName:
+		// nothing to do here, handled in the proxy after copyStream()
 	}
-	pending[last]()
-	ctx.StateBag()[fifoKey] = pending[:last]
+}
+
+// HandleErrorResponse lets the filter opt in to having Response(ctx)
+// called by the proxy for error responses, too. It has to return true to opt in.
+func (f *fifoFilter) HandleErrorResponse() bool { + return true } diff --git a/filters/scheduler/fifo_test.go b/filters/scheduler/fifo_test.go index b0ed9bda38..c9f0093f20 100644 --- a/filters/scheduler/fifo_test.go +++ b/filters/scheduler/fifo_test.go @@ -1,15 +1,20 @@ package scheduler import ( + "bytes" "fmt" "io" "net/http" stdlibhttptest "net/http/httptest" - "net/url" + "strings" "testing" + "testing/iotest" "time" "github.com/opentracing/opentracing-go/mocktracer" + "github.com/sirupsen/logrus" + + "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters" "github.com/zalando/skipper/metrics/metricstest" "github.com/zalando/skipper/net/httptest" @@ -19,11 +24,34 @@ import ( "github.com/zalando/skipper/scheduler" ) +func TestCreateFifoName(t *testing.T) { + for _, tt := range []struct { + name string + filterFunc func() filters.Spec + }{ + { + name: filters.FifoName, + filterFunc: NewFifo, + }, + { + name: filters.FifoWithBodyName, + filterFunc: NewFifoWithBody, + }, + } { + t.Run(tt.name, func(t *testing.T) { + if tt.filterFunc().Name() != tt.name { + t.Fatalf("got %q, want %q", tt.filterFunc().Name(), tt.name) + } + }) + } +} + func TestCreateFifoFilter(t *testing.T) { for _, tt := range []struct { name string args []interface{} wantParseErr bool + wantConfig scheduler.Config }{ { name: "fifo no args", @@ -51,6 +79,38 @@ func TestCreateFifoFilter(t *testing.T) { 5, "1s", }, + wantConfig: scheduler.Config{ + MaxConcurrency: 3, + MaxQueueSize: 5, + Timeout: 1 * time.Second, + }, + }, + { + name: "fifo negative value arg1", + args: []interface{}{ + -3, + 5, + "1s", + }, + wantParseErr: true, + }, + { + name: "fifo negative value arg2", + args: []interface{}{ + 3, + -5, + "1s", + }, + wantParseErr: true, + }, + { + name: "fifo too small value arg3", + args: []interface{}{ + 3, + 5, + "1ns", + }, + wantParseErr: true, }, { name: "fifo wrong type arg1", @@ -100,18 +160,226 @@ func TestCreateFifoFilter(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - spec := &fifoSpec{} - ff, err := spec.CreateFilter(tt.args) - if err != nil && !tt.wantParseErr { - t.Fatalf("Failed to parse filter: %v", err) + for _, f := range []func() filters.Spec{NewFifo, NewFifoWithBody} { + spec := f() + ff, err := spec.CreateFilter(tt.args) + if err != nil && !tt.wantParseErr { + t.Fatalf("Failed to parse filter: %v", err) + } + if err == nil && tt.wantParseErr { + t.Fatal("Failed to get wanted error on create filter") + } + + if _, ok := ff.(*fifoFilter); !ok && err == nil { + t.Fatal("Failed to convert filter to *fifoFilter") + } + } + }) + } +} + +type flusher struct { + w http.ResponseWriter +} + +func (f *flusher) Flush() { + f.w.(http.Flusher).Flush() +} + +func (f *flusher) Unwrap() http.ResponseWriter { + return f.w +} + +func (f *flusher) Write(p []byte) (n int, err error) { + n, err = f.w.Write(p) + if err == nil { + f.Flush() + } + return +} + +type slowReader struct { + r io.Reader + d time.Duration +} + +func (sr *slowReader) Read(p []byte) (int, error) { + logrus.Infof("slowReader: %d", len(p)) + if len(p) == 0 { + return 0, nil + } + time.Sleep(sr.d) + n, err := sr.r.Read(p) + logrus.Infof("slowReader: %d %v", n, err) + return n, err +} + +func TestFifoWithBody(t *testing.T) { + for _, tt := range []struct { + name string + args []interface{} + backendTime time.Duration + responseSize int + wantErr bool + }{ + { + name: "fifoWithBody 1024B with 1 queue should be ok", + args: []interface{}{1, 1, "1s"}, + backendTime: 10 * time.Millisecond, + responseSize: 1024, + }, + { + 
name: "fifoWithBody 1024B with 0 queue should fail", + args: []interface{}{1, 0, "10ms"}, + backendTime: 50 * time.Millisecond, + responseSize: 1024, + wantErr: true, + }, + { + name: "fifoWithBody 2x 1024B with 1 queue should be ok", + args: []interface{}{1, 1, "1s"}, + backendTime: 10 * time.Millisecond, + responseSize: 2 * 1024, + }, + { + name: "fifoWithBody 2x 1024B with 0 queue should fail", + args: []interface{}{1, 0, "15ms"}, + backendTime: 10 * time.Millisecond, + responseSize: 2 * 1024, + wantErr: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + + backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("backend path: %s", r.URL.Path) + buf := bytes.NewBufferString(strings.Repeat("A", tt.responseSize)) + halfReader := iotest.HalfReader(buf) + sr := &slowReader{ + d: 100 * time.Millisecond, + r: halfReader, + } + + w.WriteHeader(http.StatusOK) + // sleep here to test the difference between streaming response and not + time.Sleep(tt.backendTime) + // TODO: maybe better to do slow body streaming? + b := make([]byte, 1024) + io.CopyBuffer(&flusher{w}, sr, b) + })) + defer backend.Close() + + // proxy + metrics := &metricstest.MockMetrics{} + defer metrics.Close() + reg := scheduler.RegistryWith(scheduler.Options{ + Metrics: metrics, + EnableRouteFIFOMetrics: true, + }) + defer reg.Close() + fr := make(filters.Registry) + fr.Register(NewFifoWithBody()) + args := append(tt.args, backend.URL) + doc := fmt.Sprintf(`r: * -> fifoWithBody(%v, %v, "%v") -> "%s"`, args...) + t.Logf("%s", doc) + dc, err := testdataclient.NewDoc(doc) + if err != nil { + t.Fatalf("Failed to create testdataclient: %v", err) + } + defer dc.Close() + ro := routing.Options{ + SignalFirstLoad: true, + FilterRegistry: fr, + DataClients: []routing.DataClient{dc}, + PostProcessors: []routing.PostProcessor{reg}, + } + rt := routing.New(ro) + defer rt.Close() + <-rt.FirstLoad() + tracer := &testTracer{MockTracer: mocktracer.New()} + pr := proxy.WithParams(proxy.Params{ + Routing: rt, + OpenTracing: &proxy.OpenTracingParams{Tracer: tracer}, + }) + defer pr.Close() + ts := stdlibhttptest.NewServer(pr) + defer ts.Close() + + // simple test + rsp, err := ts.Client().Get(ts.URL + "/test") + if err != nil { + t.Fatalf("Failed to get response from %s: %v", ts.URL, err) } - if err == nil && tt.wantParseErr { - t.Fatal("Failed to get wanted error on create filter") + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + t.Fatalf("Failed to get valid response from endpoint: %d", rsp.StatusCode) + } + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read response body from: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed to read the size, got: %v, want: %v", len(b), tt.responseSize) + } + + t.Log("the streaming test") + // the streaming test + rspCH := make(chan *http.Response) + errCH := make(chan error) + defer func() { + close(rspCH) + close(errCH) + }() + waithCH := make(chan struct{}) + go func() { + rsp, err := ts.Client().Get(ts.URL + "/1") + t.Logf("rsp1: %s", rsp.Status) + close(waithCH) + if err != nil { + errCH <- err + } else { + rspCH <- rsp + } + }() + + <-waithCH + rsp2, err2 := ts.Client().Get(ts.URL + "/2") + t.Logf("rsp2: %s", rsp.Status) + if tt.wantErr { + n, err := io.Copy(io.Discard, rsp2.Body) + if n != 0 { + t.Fatalf("Failed to get error copied %d bytes, err: %v", n, err) + } + rsp2.Body.Close() + } else { + if err2 != nil { + t.Errorf("Failed to do 2nd request: %v", err2) + } else { + b, err2 := 
io.ReadAll(rsp2.Body) + if err2 != nil { + t.Errorf("Failed 2nd request to read body: %v", err2) + } + if len(b) != tt.responseSize { + t.Errorf("Failed 2nd request to get response size: %d, want: %d", len(b), tt.responseSize) + } + } } - if _, ok := ff.(*fifoFilter); !ok && err == nil { - t.Fatal("Failed to convert filter to *fifoFilter") + // read body from first request + select { + case err := <-errCH: + t.Fatalf("Failed to do request: %v", err) + case rsp := <-rspCH: + t.Logf("client1 got %s", rsp.Status) + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read body: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed to get response size: %d, want: %d", len(b), tt.responseSize) + } } + }) } } @@ -119,111 +387,86 @@ func TestCreateFifoFilter(t *testing.T) { func TestFifo(t *testing.T) { for _, tt := range []struct { name string - args []interface{} + filter string freq int per time.Duration backendTime time.Duration clientTimeout time.Duration - wantConfig scheduler.Config - wantParseErr bool wantOkRate float64 - epsilon float64 }{ { - name: "fifo defaults", - args: []interface{}{}, - wantParseErr: true, + name: "fifo simple ok", + filter: `fifo(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + wantOkRate: 1.0, }, { - name: "fifo simple ok", - args: []interface{}{ - 3, - 5, - "1s", - }, + name: "fifoWithBody simple ok", + filter: `fifoWithBody(3, 5, "1s")`, freq: 20, per: 100 * time.Millisecond, backendTime: 1 * time.Millisecond, clientTimeout: time.Second, - wantConfig: scheduler.Config{ - MaxConcurrency: 3, - MaxQueueSize: 5, - Timeout: time.Second, - }, - wantParseErr: false, - wantOkRate: 1.0, - epsilon: 1, + wantOkRate: 1.0, }, { - name: "fifo with reaching max concurrency and queue timeouts", - args: []interface{}{ - 3, - 5, - "10ms", - }, + name: "fifo simple client canceled", + filter: `fifo(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Nanosecond, + wantOkRate: 0, + }, + { + name: "fifoWithBody simple client canceled", + filter: `fifoWithBody(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Nanosecond, + wantOkRate: 0, + }, + { + name: "fifo with reaching max concurrency and queue timeouts", + filter: `fifo(3, 5, "10ms")`, + freq: 20, + per: 10 * time.Millisecond, + backendTime: 11 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.005, + }, + { + name: "fifoWithBody with reaching max concurrency and queue timeouts", + filter: `fifoWithBody(3, 5, "10ms")`, + freq: 20, + per: 10 * time.Millisecond, + backendTime: 11 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.005, + }, + { + name: "fifo with reaching max concurrency and queue full", + filter: `fifo(1, 1, "250ms")`, freq: 200, per: 100 * time.Millisecond, - backendTime: 10 * time.Millisecond, + backendTime: 100 * time.Millisecond, clientTimeout: time.Second, - wantConfig: scheduler.Config{ - MaxConcurrency: 3, - MaxQueueSize: 5, - Timeout: 10 * time.Millisecond, - }, - wantParseErr: false, - wantOkRate: 0.1, - epsilon: 1, + wantOkRate: 0.0008, }, { - name: "fifo with reaching max concurrency and queue full", - args: []interface{}{ - 1, - 1, - "250ms", - }, + name: "fifoWithBody with reaching max concurrency and queue full", + filter: `fifoWithBody(1, 1, "250ms")`, freq: 200, per: 100 * time.Millisecond, backendTime: 100 * time.Millisecond, clientTimeout: time.Second, - 
wantConfig: scheduler.Config{ - MaxConcurrency: 1, - MaxQueueSize: 1, - Timeout: 250 * time.Millisecond, - }, - wantParseErr: false, - wantOkRate: 0.0008, - epsilon: 1, + wantOkRate: 0.0008, }, } { t.Run(tt.name, func(t *testing.T) { - fs := NewFifo() - if fs.Name() != filters.FifoName { - t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName) - } - - // no parse error - ff, err := fs.CreateFilter(tt.args) - if err != nil && !tt.wantParseErr { - t.Fatalf("Failed to parse filter: %v", err) - } - if err == nil && tt.wantParseErr { - t.Fatalf("want parse error but have no: %v", err) - } - if tt.wantParseErr { - return - } - - // validate config - if f, ok := ff.(*fifoFilter); ok { - config := f.Config() - if config != tt.wantConfig { - t.Fatalf("Failed to get Config, got: %v, want: %v", config, tt.wantConfig) - } - if f.queue != f.GetQueue() { - t.Fatal("Failed to get expected queue") - } - } - metrics := &metricstest.MockMetrics{} reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, @@ -232,7 +475,8 @@ func TestFifo(t *testing.T) { defer reg.Close() fr := make(filters.Registry) - fr.Register(fs) + fr.Register(NewFifo()) + fr.Register(NewFifoWithBody()) backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(tt.backendTime) @@ -241,22 +485,11 @@ func TestFifo(t *testing.T) { })) defer backend.Close() - var fmtStr string - switch len(tt.args) { - case 0: - fmtStr = `aroute: * -> fifo() -> "%s"` - case 1: - fmtStr = `aroute: * -> fifo(%v) -> "%s"` - case 2: - fmtStr = `aroute: * -> fifo(%v, %v) -> "%s"` - case 3: - fmtStr = `aroute: * -> fifo(%v, %v, "%v") -> "%s"` - default: - t.Fatalf("Test not possible %d >3", len(tt.args)) + if ff := eskip.MustParseFilters(tt.filter); len(ff) != 1 { + t.Fatalf("expected one filter, got %d", len(ff)) } - args := append(tt.args, backend.URL) - doc := fmt.Sprintf(fmtStr, args...) 
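+			// build the route document directly from the eskip filter expression under test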
+			doc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL)
 			t.Logf("%s", doc)
 
 			dc, err := testdataclient.NewDoc(doc)
@@ -285,14 +518,9 @@
 			ts := stdlibhttptest.NewServer(pr)
 			defer ts.Close()
 
-			reqURL, err := url.Parse(ts.URL)
+			rsp, err := ts.Client().Get(ts.URL)
 			if err != nil {
-				t.Fatalf("Failed to parse url %s: %v", ts.URL, err)
-			}
-
-			rsp, err := http.DefaultClient.Get(reqURL.String())
-			if err != nil {
-				t.Fatalf("Failed to get response from %s: %v", reqURL.String(), err)
+				t.Fatalf("Failed to get response from %s: %v", ts.URL, err)
 			}
 			defer rsp.Body.Close()
@@ -300,16 +528,26 @@
 				t.Fatalf("Failed to get valid response from endpoint: %d", rsp.StatusCode)
 			}
 
-			va := httptest.NewVegetaAttacker(reqURL.String(), tt.freq, tt.per, tt.clientTimeout)
+			va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, tt.clientTimeout)
 			va.Attack(io.Discard, 1*time.Second, tt.name)
 
 			t.Logf("Success [0..1]: %0.2f", va.Success())
 			t.Logf("requests: %d", va.TotalRequests())
+			count200, _ := va.CountStatus(200)
+			count499, _ := va.CountStatus(0)
+			count502, _ := va.CountStatus(502)
+			count503, _ := va.CountStatus(503)
+			t.Logf("status 200: %d", count200)
+			t.Logf("status 499: %d", count499)
+			t.Logf("status 502: %d", count502)
+			t.Logf("status 503: %d", count503)
+
 			got := va.TotalSuccess()
 			want := tt.wantOkRate * float64(va.TotalRequests())
 			if got < want {
 				t.Fatalf("OK rate too low got<want: %0.2f < %0.2f", got, want)
 			}
 			countOK, ok := va.CountStatus(http.StatusOK)
 			if !ok && tt.wantOkRate > 0 {
 				t.Fatal("no OK")
			}
@@ -327,71 +565,38 @@
 	}
 }
 
-func TestConstantRouteUpdatesFifo(t *testing.T) {
+func TestFifoConstantRouteUpdates(t *testing.T) {
 	for _, tt := range []struct {
 		name          string
-		args          []interface{}
+		filter        string
 		freq          int
 		per           time.Duration
 		updateRate    time.Duration
 		backendTime   time.Duration
 		clientTimeout time.Duration
-		wantConfig    scheduler.Config
-		wantParseErr  bool
 		wantOkRate    float64
-		epsilon       float64
 	}{
 		{
-			name: "fifo simple ok",
-			args: []interface{}{
-				3,
-				5,
-				"1s",
-			},
+			name:          "fifo simple ok",
+			filter:        `fifo(3, 5, "1s")`,
 			freq:          20,
 			per:           100 * time.Millisecond,
 			updateRate:    25 * time.Millisecond,
 			backendTime:   1 * time.Millisecond,
 			clientTimeout: time.Second,
-			wantConfig: scheduler.Config{
-				MaxConcurrency: 3,
-				MaxQueueSize:   5,
-				Timeout:        time.Second,
-			},
-			wantParseErr: false,
-			wantOkRate:   1.0,
-			epsilon:      1,
+			wantOkRate:    1.0,
+		}, {
+			name:          "fifoWithBody simple ok",
+			filter:        `fifoWithBody(3, 5, "1s")`,
+			freq:          20,
+			per:           100 * time.Millisecond,
+			updateRate:    25 * time.Millisecond,
+			backendTime:   1 * time.Millisecond,
+			clientTimeout: time.Second,
+			wantOkRate:    1.0,
 		},
 	} {
 		t.Run(tt.name, func(t *testing.T) {
-			fs := NewFifo()
-			if fs.Name() != filters.FifoName {
-				t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName)
-			}
-
-			// no parse error
-			ff, err := fs.CreateFilter(tt.args)
-			if err != nil && !tt.wantParseErr {
-				t.Fatalf("Failed to parse filter: %v", err)
-			}
-			if err == nil && tt.wantParseErr {
-				t.Fatalf("want parse error but have no: %v", err)
-			}
-			if tt.wantParseErr {
-				return
-			}
-
-			// validate config
-			if f, ok := ff.(*fifoFilter); ok {
-				config := f.Config()
-				if config != tt.wantConfig {
-					t.Fatalf("Failed to get Config, got: %v, want: %v", config, tt.wantConfig)
-				}
-				if f.queue != f.GetQueue() {
-					t.Fatal("Failed to get expected queue")
-				}
-			}
-
 			metrics := &metricstest.MockMetrics{}
 			reg := scheduler.RegistryWith(scheduler.Options{
 				Metrics:                metrics,
@@ -400,7 +605,8 @@
 			defer reg.Close()
 
 			fr := 
make(filters.Registry)
-			fr.Register(fs)
+			fr.Register(NewFifo())
+			fr.Register(NewFifoWithBody())
 
 			backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 				time.Sleep(tt.backendTime)
@@ -409,9 +615,11 @@
 			}))
 			defer backend.Close()
 
-			args := append(tt.args, backend.URL)
-			doc := fmt.Sprintf(`aroute: * -> fifo(%v, %v, "%v") -> "%s"`, args...)
+			if ff := eskip.MustParseFilters(tt.filter); len(ff) != 1 {
+				t.Fatalf("expected one filter, got %d", len(ff))
+			}
 
+			doc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL)
 			dc, err := testdataclient.NewDoc(doc)
 			if err != nil {
 				t.Fatalf("Failed to create testdataclient: %v", err)
@@ -438,14 +646,9 @@
 			ts := stdlibhttptest.NewServer(pr)
 			defer ts.Close()
 
-			reqURL, err := url.Parse(ts.URL)
-			if err != nil {
-				t.Fatalf("Failed to parse url %s: %v", ts.URL, err)
-			}
-
-			rsp, err := http.DefaultClient.Get(reqURL.String())
+			rsp, err := ts.Client().Get(ts.URL)
 			if err != nil {
-				t.Fatalf("Failed to get response from %s: %v", reqURL.String(), err)
+				t.Fatalf("Failed to get response from %s: %v", ts.URL, err)
 			}
 			defer rsp.Body.Close()
@@ -455,7 +658,7 @@
 
 			// run dataclient updates
 			quit := make(chan struct{})
-			newDoc := fmt.Sprintf(`aroute: * -> fifo(100, 200, "250ms") -> "%s"`, backend.URL)
+			newDoc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL)
 			go func(q chan<- struct{}, updateRate time.Duration, doc1, doc2 string) {
 				i := 0
 				for {
@@ -475,7 +678,7 @@
 			}(quit, tt.updateRate, doc, newDoc)
 
-			va := httptest.NewVegetaAttacker(reqURL.String(), tt.freq, tt.per, tt.clientTimeout)
+			va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, tt.clientTimeout)
 			va.Attack(io.Discard, 1*time.Second, tt.name)
 
 			quit <- struct{}{}
diff --git a/filters/scheduler/lifo.go b/filters/scheduler/lifo.go
index 60db279a58..717580bcd9 100644
--- a/filters/scheduler/lifo.go
+++ b/filters/scheduler/lifo.go
@@ -243,7 +243,7 @@ func (l *lifoFilter) Close() error {
 // increase the number of inflight requests and respond to the caller,
 // if the bounded queue returns an error. Status code by Error:
 //
-// - 503 if jobqueue.ErrQueueFull
+// - 503 if jobqueue.ErrStackFull
 // - 502 if jobqueue.ErrTimeout
 func (l *lifoFilter) Request(ctx filters.FilterContext) {
 	request(l.GetQueue(), scheduler.LIFOKey, ctx)
@@ -255,6 +255,12 @@ func (l *lifoFilter) Response(ctx filters.FilterContext) {
 	response(scheduler.LIFOKey, ctx)
 }
 
+// HandleErrorResponse lets the filter opt in to having Response(ctx)
+// called by the proxy for error responses, too. It has to return true to opt in.
+func (l *lifoFilter) HandleErrorResponse() bool {
+	return true
+}
+
 func (l *lifoGroupFilter) Group() string {
 	return l.name
 }
@@ -300,6 +306,12 @@ func (l *lifoGroupFilter) Response(ctx filters.FilterContext) {
 	response(scheduler.LIFOKey, ctx)
 }
 
+// HandleErrorResponse lets the filter opt in to having Response(ctx)
+// called by the proxy for error responses, too. It has to return true to opt in.
+func (l *lifoGroupFilter) HandleErrorResponse() bool { + return true +} + func request(q *scheduler.Queue, key string, ctx filters.FilterContext) { if q == nil { ctx.Logger().Warnf("Unexpected scheduler.Queue is nil for key %s", key) diff --git a/go.mod b/go.mod index c535c75840..967e658de5 100644 --- a/go.mod +++ b/go.mod @@ -72,6 +72,7 @@ require ( github.com/OneOfOne/xxhash v1.2.8 // indirect github.com/agnivade/levenshtein v1.1.1 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect + github.com/benburkert/pbench v0.0.0-20160623210926-4ec5821845ef // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect github.com/bytecodealliance/wasmtime-go/v3 v3.0.2 // indirect @@ -93,6 +94,7 @@ require ( github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index a32b959119..1fcfaa72f4 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/aryszka/jobqueue v0.0.2 h1:LYPhzklo0XFpVF+QtzfP9XRQPEsbJ2EW5Pur6pxxaS4= github.com/aryszka/jobqueue v0.0.2/go.mod h1:SdxqI6HZ4E1Lss94tey5OfjcAu3bdCDWS1AQzzIN4m4= +github.com/benburkert/pbench v0.0.0-20160623210926-4ec5821845ef h1:+7ZJvJGiV4hUBdjhEDhfGdjBCOmhVi0YQ5n+6g/ei+k= +github.com/benburkert/pbench v0.0.0-20160623210926-4ec5821845ef/go.mod h1:hrhDSsc41bBqGejYXbvMh6qexfcC2vXjodP5gufwWyI= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE= @@ -139,6 +141,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 h1:Vh7rylVZRZCj6W41lRlP17xPk4Nq260H4Xo/DDYmEZk= +github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424/go.mod h1:vmp8DIyckQMXOPl0AQVHt+7n5h7Gb7hS6CUydiV8QeA= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= diff --git a/net/httptest/client.go b/net/httptest/client.go index 9b6b44f06b..778913cc0c 100644 --- a/net/httptest/client.go +++ b/net/httptest/client.go @@ -2,7 +2,6 @@ package httptest import ( "io" - "log" "strconv" "time" @@ -64,13 +63,8 @@ func (atk *VegetaAttacker) Attack(w io.Writer, d time.Duration, name string) { continue } atk.metrics.Add(res) - //metrics.Latencies.Add(res.Latency) } atk.metrics.Close() - // logrus.Info("histogram reporter:") - // histReporter := 
vegeta.NewHistogramReporter(atk.metrics.Histogram)
-	// histReporter.Report(os.Stdout)
-	log.Print("text reporter:")
 	reporter := vegeta.NewTextReporter(atk.metrics)
 	reporter.Report(w)
 }
diff --git a/proxy/proxy.go b/proxy/proxy.go
index 2d2ef831d8..3498185683 100644
--- a/proxy/proxy.go
+++ b/proxy/proxy.go
@@ -40,7 +40,6 @@ import (
 	"github.com/zalando/skipper/ratelimit"
 	"github.com/zalando/skipper/rfc"
 	"github.com/zalando/skipper/routing"
-	"github.com/zalando/skipper/scheduler"
 	"github.com/zalando/skipper/tracing"
 )
 
@@ -1046,29 +1045,6 @@ func (p *Proxy) do(ctx *context) (err error) {
 		return errMaxLoopbacksReached
 	}
 
-	// this can be deleted after fixing
-	// https://github.com/zalando/skipper/issues/1238 problem
-	// happens if we get proxy errors, for example connect errors,
-	// which would block responses until fifo() timeouts.
-	defer func() {
-		stateBag := ctx.StateBag()
-
-		pendingFIFO, _ := stateBag[scheduler.FIFOKey].([]func())
-		for _, done := range pendingFIFO {
-			done()
-		}
-
-		pendingLIFO, _ := stateBag[scheduler.LIFOKey].([]func())
-		for _, done := range pendingLIFO {
-			done()
-		}
-
-		// Cleanup state bag to avoid double call of done()
-		// because do() could be called for loopback backend
-		delete(stateBag, scheduler.FIFOKey)
-		delete(stateBag, scheduler.LIFOKey)
-	}()
-
 	// proxy global setting
 	if !ctx.wasExecuted() {
 		if settings, retryAfter := p.limiters.Check(ctx.request); retryAfter > 0 {
@@ -1478,6 +1454,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	err := p.do(ctx)
 
+	// writeTimeout() filter
 	if d, ok := ctx.StateBag()[filters.WriteTimeout].(time.Duration); ok {
 		e := ctx.ResponseController().SetWriteDeadline(time.Now().Add(d))
 		if e != nil {
@@ -1485,12 +1462,22 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		}
 	}
 
+	// stream response body to client
 	if err != nil {
 		p.errorResponse(ctx, err)
 	} else {
 		p.serveResponse(ctx)
 	}
 
+	// fifoWithBody() filter: release the concurrency reservations only
+	// after the response body has been served
+	if sbf, ok := ctx.StateBag()[filters.FifoWithBodyName]; ok {
+		if fs, ok := sbf.([]func()); ok {
+			for i := len(fs) - 1; i >= 0; i-- {
+				fs[i]()
+			}
+		}
+	}
+
 	if ctx.cancelBackendContext != nil {
 		ctx.cancelBackendContext()
 	}
diff --git a/routing/endpointregistry_test.go b/routing/endpointregistry_test.go
index 1256da4d17..6d2b21bca8 100644
--- a/routing/endpointregistry_test.go
+++ b/routing/endpointregistry_test.go
@@ -3,10 +3,10 @@ package routing_test
 import (
 	"fmt"
 	"runtime/metrics"
-	"sync"
 	"testing"
 	"time"
 
+	"github.com/benburkert/pbench"
 	"github.com/stretchr/testify/assert"
 	"github.com/zalando/skipper/eskip"
 	"github.com/zalando/skipper/routing"
@@ -131,29 +131,27 @@ func benchmarkIncInflightRequests(b *testing.B, name string, goroutines int) {
 	const key string = "some key"
 	const mapSize int = 10000
 
-	b.Run(name, func(b *testing.B) {
+	percentileBench := pbench.New(b)
+	percentileBench.ReportPercentile(0.95)
+	percentileBench.ReportPercentile(0.99)
+
+	percentileBench.Run(name, func(b *pbench.B) {
 		r := routing.NewEndpointRegistry(routing.RegistryOptions{})
 		for i := 1; i < mapSize; i++ {
 			r.IncInflightRequest(fmt.Sprintf("foo-%d", i))
 		}
 		r.IncInflightRequest(key)
 		r.IncInflightRequest(key)
-
-		wg := sync.WaitGroup{}
 		b.ResetTimer()
-		for i := 0; i < goroutines; i++ {
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				for n := 0; n < b.N/goroutines; n++ {
-					r.IncInflightRequest(key)
-				}
-			}()
-		}
-		wg.Wait()
-		printTotalMutexWaitTime(b)
+		b.RunParallel(func(pb *pbench.PB) {
+			for pb.Next() {
+				r.IncInflightRequest(key)
+			}
+		})
 	})
+
+	printTotalMutexWaitTime(b)
 }
 
 func 
BenchmarkIncInflightRequests(b *testing.B) {
@@ -167,31 +165,29 @@ func benchmarkGetInflightRequests(b *testing.B, name string, goroutines int) {
 	const key string = "some key"
 	const mapSize int = 10000
 
-	b.Run(name, func(b *testing.B) {
+	percentileBench := pbench.New(b)
+	percentileBench.ReportPercentile(0.95)
+	percentileBench.ReportPercentile(0.99)
+
+	percentileBench.Run(name, func(b *pbench.B) {
 		r := routing.NewEndpointRegistry(routing.RegistryOptions{})
 		for i := 1; i < mapSize; i++ {
 			r.IncInflightRequest(fmt.Sprintf("foo-%d", i))
 		}
 		r.IncInflightRequest(key)
 		r.IncInflightRequest(key)
+		b.ResetTimer()
 
 		var dummy int64
-		wg := sync.WaitGroup{}
-		b.ResetTimer()
-		for i := 0; i < goroutines; i++ {
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				for n := 0; n < b.N/goroutines; n++ {
-					dummy = r.GetMetrics(key).InflightRequests()
-				}
-			}()
-		}
+		b.RunParallel(func(pb *pbench.PB) {
+			for pb.Next() {
+				dummy = r.GetMetrics(key).InflightRequests()
+			}
+		})
 		dummy++
-		wg.Wait()
-
-		printTotalMutexWaitTime(b)
 	})
+
+	printTotalMutexWaitTime(b)
 }
 
 func BenchmarkGetInflightRequests(b *testing.B) {
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 5da0dd803e..e2e7de35e4 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -155,6 +155,20 @@ func (fq *fifoQueue) wait(ctx context.Context) (func(), error) {
 	cnt := fq.counter
 	fq.mu.RUnlock()
 
+	// check whether the request context has already expired
+	// https://github.com/golang/go/issues/63615
+	if err := ctx.Err(); err != nil {
+		switch err {
+		case context.DeadlineExceeded:
+			return nil, ErrQueueTimeout
+		case context.Canceled:
+			return nil, ErrClientCanceled
+		default:
+			// other context error values do not exist in the Go stdlib as of Go 1.18.4
+			return nil, err
+		}
+	}
+
 	// handle queue
 	all := cnt.Add(1)
 	// queue full?
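
The error-response hook used by the fifo and lifo filters above is a plain interface assertion in the proxy: a filter that returns true from `HandleErrorResponse()` still gets `Response(ctx)` called when the proxy answers with an error response instead of a backend response. A minimal sketch of a custom filter opting in (the package, type, and no-op method bodies here are illustrative, not part of this change):

```go
package myfilter

import "github.com/zalando/skipper/filters"

// releasingFilter is a hypothetical filter that takes some reservation
// in Request and must release it in Response, even when the backend was
// never reached (e.g. on connect errors) and the proxy generated the
// response itself.
type releasingFilter struct{}

func (releasingFilter) Request(ctx filters.FilterContext) {
	// acquire the reservation here
}

func (releasingFilter) Response(ctx filters.FilterContext) {
	// release the reservation here
}

// HandleErrorResponse opts in: without it, Response is only called for
// responses coming from the backend, and the release above would be
// skipped on proxy errors.
func (releasingFilter) HandleErrorResponse() bool { return true }
```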