Skip to content

Commit

Permalink
feature: multiple ratelimit filters in one route are now possible
Browse files Browse the repository at this point in the history
feature: cluster ratelimit instances can now grouped
fix: cluster ratelimit instances do not override shared data anymore, based on the group the ratelimit data is stored in the swarm
fix: X-Forwarded-For Header Lookuper in cluster ratelimits is now treated similar to the default

Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
  • Loading branch information
szuecs committed Jan 22, 2019
1 parent db37d10 commit b72a1c1
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 80 deletions.
71 changes: 48 additions & 23 deletions filters/ratelimit/ratelimit.go
Expand Up @@ -6,6 +6,7 @@ For detailed documentation of the ratelimit, see https://godoc.org/github.com/za
package ratelimit

import (
"net/http"
"time"

"github.com/zalando/skipper/filters"
Expand Down Expand Up @@ -62,26 +63,30 @@ func NewRatelimit() filters.Spec {
return &spec{typ: ratelimit.ServiceRatelimit, filterName: ratelimit.ServiceRatelimitName}
}

// NewClusterRatelimit creates a rate limiting that is aware of the other
// instances. The value given here should be the combined rate of all instances.
// NewClusterRatelimit creates a rate limiting that is aware of the
// other instances. The value given here should be the combined rate
// of all instances. The ratelimit group parameter can be used to
// select the same ratelimit group across one or more routes.
//
// Example:
//
// backendHealthcheck: Path("/healthcheck")
// -> clusterRatelimit(200, "1m")
// -> clusterRatelimit("groupA", 200, "1m")
// -> "https://foo.backend.net";
//
func NewClusterRateLimit() filters.Spec {
return &spec{typ: ratelimit.ClusterServiceRatelimit, filterName: ratelimit.ClusterServiceRatelimitName}
}

// NewClusterClientRatelimit creates a rate limiting that is aware of the other
// instances. The value given here should be the combined rate of all instances.
// NewClusterClientRatelimit creates a rate limiting that is aware of
// the other instances. The value given here should be the combined
// rate of all instances. The ratelimit group parameter can be used to
// select the same ratelimit group across one or more routes.
//
// Example:
//
// backendHealthcheck: Path("/login")
// -> clusterClientRatelimit(20, "1h")
// -> clusterClientRatelimit("groupB", 20, "1h")
// -> "https://foo.backend.net";
//
// The above example would limit access to "/login" if, the client did
Expand All @@ -95,7 +100,7 @@ func NewClusterRateLimit() filters.Spec {
// Example:
//
// backendHealthcheck: Path("/login")
// -> clusterClientRatelimit(20, "1h", "Authorization")
// -> clusterClientRatelimit("groupC", 20, "1h", "Authorization")
// -> "https://foo.backend.net";
//
func NewClusterClientRateLimit() filters.Spec {
Expand Down Expand Up @@ -143,22 +148,28 @@ func serviceRatelimitFilter(args []interface{}) (filters.Filter, error) {
}

func clusterRatelimitFilter(args []interface{}) (filters.Filter, error) {
if len(args) != 2 {
if len(args) != 3 {
return nil, filters.ErrInvalidFilterParameters
}

maxHits, err := getIntArg(args[0])
group, err := getStringArg(args[0])
if err != nil {
return nil, err
}

timeWindow, err := getDurationArg(args[1])
maxHits, err := getIntArg(args[1])
if err != nil {
return nil, err
}

timeWindow, err := getDurationArg(args[2])
if err != nil {
return nil, err
}

s := ratelimit.Settings{
Type: ratelimit.ClusterServiceRatelimit,
Group: group,
MaxHits: maxHits,
TimeWindow: timeWindow,
Lookuper: ratelimit.NewSameBucketLookuper(),
Expand All @@ -168,36 +179,46 @@ func clusterRatelimitFilter(args []interface{}) (filters.Filter, error) {
}

func clusterClientRatelimitFilter(args []interface{}) (filters.Filter, error) {
if !(len(args) == 2 || len(args) == 3) {
if !(len(args) == 3 || len(args) == 4) {
return nil, filters.ErrInvalidFilterParameters
}

maxHits, err := getIntArg(args[0])
group, err := getStringArg(args[0])
if err != nil {
return nil, err
}

timeWindow, err := getDurationArg(args[1])
maxHits, err := getIntArg(args[1])
if err != nil {
return nil, err
}

timeWindow, err := getDurationArg(args[2])
if err != nil {
return nil, err
}

s := ratelimit.Settings{
Type: ratelimit.ClusterClientRatelimit,
MaxHits: maxHits,
TimeWindow: timeWindow,
Type: ratelimit.ClusterClientRatelimit,
Group: group,
MaxHits: maxHits,
TimeWindow: timeWindow,
CleanInterval: 10 * timeWindow,
}

if len(args) > 2 {
headerName, err := getStringArg(args[2])
if len(args) > 3 {
headerName, err := getStringArg(args[3])
if err != nil {
return nil, err
}
s.Lookuper = ratelimit.NewHeaderLookuper(headerName)
s.CleanInterval = 10 * timeWindow
headerName = http.CanonicalHeaderKey(headerName)
if headerName == "X-Forwarded-For" {
s.Lookuper = ratelimit.NewXForwardedForLookuper()
} else {
s.Lookuper = ratelimit.NewHeaderLookuper(headerName)
}
} else {
s.Lookuper = ratelimit.NewXForwardedForLookuper()
s.CleanInterval = 10 * timeWindow
}

return &filter{settings: s}, nil
Expand Down Expand Up @@ -295,7 +316,11 @@ func getDurationArg(a interface{}) (time.Duration, error) {
// Request stores the configured ratelimit.Settings in the state bag,
// such that it can be used in the proxy to check ratelimit.
func (f *filter) Request(ctx filters.FilterContext) {
ctx.StateBag()[RouteSettingsKey] = f.settings
if settings, ok := ctx.StateBag()[RouteSettingsKey].([]ratelimit.Settings); ok {
ctx.StateBag()[RouteSettingsKey] = append(settings, f.settings)
} else {
ctx.StateBag()[RouteSettingsKey] = []ratelimit.Settings{f.settings}
}
}

func (f *filter) Response(filters.FilterContext) {}
func (*filter) Response(filters.FilterContext) {}
33 changes: 19 additions & 14 deletions filters/ratelimit/ratelimit_test.go
Expand Up @@ -2,6 +2,7 @@ package ratelimit

import (
"net/http"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -38,7 +39,7 @@ func TestArgs(t *testing.T) {
func TestRateLimit(t *testing.T) {
test := func(
s func() filters.Spec,
expect ratelimit.Settings,
expect []ratelimit.Settings,
args ...interface{},
) func(*testing.T) {
return func(t *testing.T) {
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestRateLimit(t *testing.T) {
t.Error("failed to set the ratelimit settings")
}

if settings != expect {
if !reflect.DeepEqual(settings, expect) {
t.Error("invalid settings")
t.Log("got ", settings)
t.Log("expected", expect)
Expand All @@ -76,31 +77,35 @@ func TestRateLimit(t *testing.T) {

t.Run("ratelimit service", test(
NewRatelimit,
ratelimit.Settings{
Type: ratelimit.ServiceRatelimit,
MaxHits: 3,
TimeWindow: 1 * time.Second,
Lookuper: ratelimit.NewSameBucketLookuper(),
[]ratelimit.Settings{
{
Type: ratelimit.ServiceRatelimit,
MaxHits: 3,
TimeWindow: 1 * time.Second,
Lookuper: ratelimit.NewSameBucketLookuper(),
},
},
3,
"1s",
))

t.Run("ratelimit local", test(
NewLocalRatelimit,
ratelimit.Settings{
Type: ratelimit.LocalRatelimit,
MaxHits: 3,
TimeWindow: 1 * time.Second,
CleanInterval: 10 * time.Second,
Lookuper: ratelimit.NewXForwardedForLookuper(),
[]ratelimit.Settings{
{
Type: ratelimit.LocalRatelimit,
MaxHits: 3,
TimeWindow: 1 * time.Second,
CleanInterval: 10 * time.Second,
Lookuper: ratelimit.NewXForwardedForLookuper(),
},
},
3,
"1s",
))

t.Run("ratelimit disable", test(
NewDisableRatelimit,
ratelimit.Settings{Type: ratelimit.DisableRatelimit},
[]ratelimit.Settings{{Type: ratelimit.DisableRatelimit}},
))
}
54 changes: 31 additions & 23 deletions proxy/proxy.go
Expand Up @@ -711,7 +711,7 @@ func (p *Proxy) makeBackendRequest(ctx *context) (*http.Response, *proxyError) {
}

if p.experimentalUpgrade && isUpgradeRequest(req) {
if err := p.makeUpgradeRequest(ctx, ctx.route, req); err != nil {
if err = p.makeUpgradeRequest(ctx, ctx.route, req); err != nil {
return nil, &proxyError{err: err}
}

Expand Down Expand Up @@ -802,30 +802,37 @@ func (p *Proxy) checkRatelimit(ctx *context) (ratelimit.Settings, int) {
return ratelimit.Settings{}, 0
}

settings, ok := ctx.stateBag[ratelimitfilters.RouteSettingsKey].(ratelimit.Settings)
if !ok {
settings, ok := ctx.stateBag[ratelimitfilters.RouteSettingsKey].([]ratelimit.Settings)
if !ok || len(settings) < 1 {
return ratelimit.Settings{}, 0
}

rl := p.limiters.Get(settings)
if rl == nil {
return settings, 0
}
allow := true
for _, setting := range settings {
rl := p.limiters.Get(setting)
if rl == nil {
p.log.Errorf("RateLimiter is nil for setting: %s", setting)
continue
}

if settings.Lookuper == nil {
p.log.Error("lookuper is nil")
return settings, 0
}
s := settings.Lookuper.Lookup(ctx.Request())
if setting.Lookuper == nil {
p.log.Errorf("Lookuper is nil for setting: %s", setting)
continue
}

if s == "" {
return settings, 0
}
s := setting.Lookuper.Lookup(ctx.Request())
if s == "" {
p.log.Errorf("Lookuper found no data in request for setting: %s and request: %v", setting, ctx.Request())
continue
}

if rl.Allow(s) {
return settings, 0
allow = allow && rl.Allow(s)
if !allow {
return setting, rl.RetryAfter(s)
}
}
return settings, rl.RetryAfter(s)

return ratelimit.Settings{}, 0
}

func (p *Proxy) checkBreaker(c *context) (func(bool), bool) {
Expand Down Expand Up @@ -885,11 +892,6 @@ func (p *Proxy) do(ctx *context) error {
ctx.applyRoute(route, params, p.flags.PreserveHost())

processedFilters := p.applyFiltersToRequest(ctx.route.Filters, ctx)
// per route rate limit
if settings, retryAfter := p.checkRatelimit(ctx); retryAfter > 0 {
rerr := ratelimitError(settings, ctx, retryAfter)
return rerr
}

if ctx.deprecatedShunted() {
p.log.Debug("deprecated shunting detected in route: %s", ctx.route.Id)
Expand All @@ -913,6 +915,12 @@ func (p *Proxy) do(ctx *context) error {
ctx.outgoingDebugRequest = debugReq
ctx.setResponse(&http.Response{Header: make(http.Header)}, p.flags.PreserveOriginal())
} else {
// per route rate limit
if settings, retryAfter := p.checkRatelimit(ctx); retryAfter > 0 {
rerr := ratelimitError(settings, ctx, retryAfter)
return rerr
}

done, allow := p.checkBreaker(ctx)
if !allow {
tracing.LogKV("circuit_breaker", "open", ctx.request.Context())
Expand Down

0 comments on commit b72a1c1

Please sign in to comment.