Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print percentiles in benchmark #2699

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/reference/filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -2819,6 +2819,31 @@ Example:
fifo(100, 150, "10s")
```

### fifoWithBody

This Filter is similar to the [lifo](#lifo) filter in regards to
parameters and status codes.
Performance considerations are similar to [fifo](#fifo).

The difference between fifo and fifoWithBody is that fifo will decrement
the concurrency as soon as the backend sent response headers and
fifoWithBody will decrement the concurrency if the response body was
served. Normally both are very similar, but if you have a fully async
component that serves multiple website fragments, this would decrement
concurrency too early.

Parameters:

* MaxConcurrency specifies how many goroutines are allowed to work on this queue (int)
* MaxQueueSize sets the queue size (int)
* Timeout sets the timeout to get request scheduled (time)

Example:

```
fifoWithBody(100, 150, "10s")
```

### lifo

This Filter changes skipper to handle the route with a bounded last in
Expand Down
1 change: 1 addition & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func Filters() []filters.Spec {
auth.NewForwardToken(),
auth.NewForwardTokenField(),
scheduler.NewFifo(),
scheduler.NewFifoWithBody(),
scheduler.NewLIFO(),
scheduler.NewLIFOGroup(),
rfc.NewPath(),
Expand Down
1 change: 1 addition & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ const (
SetDynamicBackendUrl = "setDynamicBackendUrl"
ApiUsageMonitoringName = "apiUsageMonitoring"
FifoName = "fifo"
FifoWithBodyName = "fifoWithBody"
LifoName = "lifo"
LifoGroupName = "lifoGroup"
RfcPathName = "rfcPath"
Expand Down
85 changes: 85 additions & 0 deletions filters/scheduler/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package scheduler

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/proxy"
"github.com/zalando/skipper/routing"
"github.com/zalando/skipper/routing/testdataclient"
"github.com/zalando/skipper/scheduler"
)

func TestCleanupOnBackendErrors(t *testing.T) {
doc := `
aroute: *
-> lifo(1, 1, "100ms")
-> lifoGroup("foo", 1, 1, "100ms")
-> lifo(2, 2, "200ms")
-> lifoGroup("bar", 1, 1, "100ms")
-> fifo(1, 1, "200ms")
-> "http://test.invalid"
`

dc, err := testdataclient.NewDoc(doc)
require.NoError(t, err)
defer dc.Close()

reg := scheduler.RegistryWith(scheduler.Options{})
defer reg.Close()

fr := make(filters.Registry)
fr.Register(NewLIFO())
fr.Register(NewLIFOGroup())
fr.Register(NewFifo())

ro := routing.Options{
SignalFirstLoad: true,
FilterRegistry: fr,
DataClients: []routing.DataClient{dc},
PostProcessors: []routing.PostProcessor{reg},
}

rt := routing.New(ro)
defer rt.Close()

<-rt.FirstLoad()

pr := proxy.WithParams(proxy.Params{
Routing: rt,
})
defer pr.Close()

ts := httptest.NewServer(pr)
defer ts.Close()

rsp, err := http.Get(ts.URL)
require.NoError(t, err)
rsp.Body.Close()

var route *routing.Route
{
req, err := http.NewRequest("GET", ts.URL, nil)
require.NoError(t, err)

route, _ = rt.Get().Do(req)
require.NotNil(t, route, "failed to lookup route")
}

for _, f := range route.Filters {
if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.Queue }); ok {
status := qf.GetQueue().Status()
assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status)
} else if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.FifoQueue }); ok {
status := qf.GetQueue().Status()
assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status)
} else {
t.Fatal("filter does not implement GetQueue()")
}
}
}
58 changes: 39 additions & 19 deletions filters/scheduler/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,31 @@ import (
"github.com/zalando/skipper/scheduler"
)

const (
fifoKey string = "fifo"
)

type (
fifoSpec struct{}
fifoSpec struct {
typ string
}
fifoFilter struct {
config scheduler.Config
queue *scheduler.FifoQueue
typ string
}
)

func NewFifo() filters.Spec {
return &fifoSpec{}
return &fifoSpec{
typ: filters.FifoName,
}
}

func (*fifoSpec) Name() string {
return filters.FifoName
func NewFifoWithBody() filters.Spec {
return &fifoSpec{
typ: filters.FifoWithBodyName,
}
}

func (s *fifoSpec) Name() string {
return s.typ
}

// CreateFilter creates a fifoFilter, that will use a semaphore based
Expand Down Expand Up @@ -65,6 +72,7 @@ func (s *fifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
}

return &fifoFilter{
typ: s.typ,
config: scheduler.Config{
MaxConcurrency: cc,
MaxQueueSize: qs,
Expand Down Expand Up @@ -132,21 +140,33 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) {
}

// ok
pending, _ := ctx.StateBag()[fifoKey].([]func())
ctx.StateBag()[fifoKey] = append(pending, done)
pending, _ := ctx.StateBag()[f.typ].([]func())
ctx.StateBag()[f.typ] = append(pending, done)
}

// Response will decrease the number of inflight requests to release
// the concurrency reservation for the request.
func (f *fifoFilter) Response(ctx filters.FilterContext) {
pending, ok := ctx.StateBag()[fifoKey].([]func())
if !ok {
return
}
last := len(pending) - 1
if last < 0 {
return
switch f.typ {
case filters.FifoName:
pending, ok := ctx.StateBag()[f.typ].([]func())
if !ok {
return
}
last := len(pending) - 1
if last < 0 {
return
}
pending[last]()
ctx.StateBag()[f.typ] = pending[:last]

case filters.FifoWithBodyName:
// nothing to do here, handled in the proxy after copyStream()
}
pending[last]()
ctx.StateBag()[fifoKey] = pending[:last]
}

// HandleErrorResponse is to opt-in for filters to get called
// Response(ctx) in case of errors via proxy. It has to return true to opt-in.
func (f *fifoFilter) HandleErrorResponse() bool {
return true
}