Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature smart rate limiter #34

Merged
merged 7 commits into from
Mar 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion activity/ratelimiter/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# Rate Limiter

The `ratelimiter` service type creates a rate limiter with specified `limit`. When it is used in the `step`, it applies `limit` against supplied `token`.
The `ratelimiter` service type creates a rate limiter with specified `limit`. When it is used in the `step`, it applies `limit` against supplied `token`. If a `spikeThreshold` is specified then traffic will be blocked with a probability (that has an exponential decay) when a traffic spike occurs above the `spikeThreshold` multiple.

The available service `settings` are as follows:

| Name | Type | Description |
|:-----------|:--------|:--------------|
| limit | string | Limit can be specifed in the format of "limit-period". Valid periods are 'S', 'M' & 'H' to represent Second, Minute & Hour. Example: "10-S" represents 10 request/second |
| spikeThreshold | decimal | Multiple above base traffic load which triggers the spike block logic. Spike blocking is disabled by default. |
| decayRate | decimal | Exponential decay rate for the spike blocking probability. Default .01 |

The available `input` for the request are as follows:

Expand Down
85 changes: 83 additions & 2 deletions activity/ratelimiter/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@ package ratelimiter

import (
"context"
"math"
"math/rand"
"sync"
"time"

"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data/metadata"
"github.com/ulule/limiter"
"github.com/ulule/limiter/drivers/store/memory"
)

const (
// MemorySize is the size of the circular buffer holding the request times
MemorySize = 256
)

var (
activityMetadata = activity.ToMetadata(&Settings{}, &Input{}, &Output{})
)
Expand All @@ -17,6 +26,16 @@ func init() {
activity.Register(&Activity{}, New)
}

// Context is a token context
type Context struct {
sync.Mutex
rand *rand.Rand
index, prev, size int
lastSpike int64
filter, lastRatio float64
memory [MemorySize]int64
}

// Activity is a rate limiter service
// Limit can be specified in the format "<limit>-<period>"
//
Expand All @@ -31,8 +50,58 @@ func init() {
// * 5 requests / hour : "5-H"
type Activity struct {
limiter *limiter.Limiter

sync.RWMutex
context map[string]*Context
threshold, decay float64
}

func (a *Activity) filterRequests(token string) bool {
a.RLock()
context := a.context[token]
a.RUnlock()
if context == nil {
context = &Context{
prev: MemorySize - 1,
rand: rand.New(rand.NewSource(1)),
}
a.Lock()
a.context[token] = context
a.Unlock()
}

context.Lock()
defer context.Unlock()
time := time.Now().UnixNano()
previous := context.memory[context.prev]
context.memory[context.index] = time
context.index, context.prev = (context.index+1)%MemorySize, context.index
size, valid := context.size, true
if size < MemorySize {
size++
context.size, valid = size, false
}
oldest := context.memory[context.index]

alpha := float64(time-previous) / float64(time-oldest)
rate := float64(size) / float64(time-oldest)
context.filter = alpha*rate + (1-alpha)*context.filter
ratio := rate / context.filter
if valid {
if ratio > a.threshold {
context.lastSpike, context.lastRatio = time, ratio-1
}

probability := 1 / (1 + context.lastRatio*math.Exp(a.decay*float64(context.lastSpike-time)))
if context.rand.Float64() > probability {
return true
}
}

return false
}

// New creates a new rate limiter
func New(ctx activity.InitContext) (activity.Activity, error) {
settings := Settings{}
err := metadata.MapToStruct(ctx.Settings(), &settings, true)
Expand All @@ -50,8 +119,15 @@ func New(ctx activity.InitContext) (activity.Activity, error) {
store := memory.NewStore()
limiter := limiter.New(store, rate)

if settings.DecayRate == 0 {
settings.DecayRate = .01
}

act := Activity{
limiter: limiter,
limiter: limiter,
context: make(map[string]*Context, 256),
threshold: settings.SpikeThreshold,
decay: settings.DecayRate,
}

return &act, nil
Expand Down Expand Up @@ -90,9 +166,14 @@ func (a *Activity) Eval(ctx activity.Context) (done bool, err error) {
return true, nil
}

filter := false
if a.threshold != 0 {
filter = a.filterRequests(input.Token)
}

// check the ratelimit
output.LimitAvailable = limiterContext.Remaining
if limiterContext.Reached {
if limiterContext.Reached || filter {
output.LimitReached = true
} else {
output.LimitReached = false
Expand Down
51 changes: 51 additions & 0 deletions activity/ratelimiter/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,54 @@ func TestRatelimiter(t *testing.T) {
assert.Nil(t, err)
assert.False(t, ctx.output["limitReached"].(bool), "limit should not be reached")
}

func TestSmartRatelimiter(t *testing.T) {
activity, err := New(newInitContext(map[string]interface{}{
"limit": "1000-S",
"spikeThreshold": "2",
}))
assert.Nil(t, err)

for i := 0; i < 256; i++ {
time.Sleep(50 * time.Millisecond)
ctx := newActivityContext(map[string]interface{}{
"token": "abc123",
})
_, err = activity.Eval(ctx)
assert.Nil(t, err)
assert.False(t, ctx.output["limitReached"].(bool), "limit should not be reached")
}
blocked, notBlocked := 0, 0
for i := 0; i < 10; i++ {
time.Sleep(10 * time.Millisecond)
ctx := newActivityContext(map[string]interface{}{
"token": "abc123",
})
_, err = activity.Eval(ctx)
assert.Nil(t, err)
if ctx.output["limitReached"].(bool) {
blocked++
} else {
notBlocked++
}
}
for i := 0; i < 256; i++ {
time.Sleep(50 * time.Millisecond)
ctx := newActivityContext(map[string]interface{}{
"token": "abc123",
})
_, err = activity.Eval(ctx)
assert.Nil(t, err)
if ctx.output["limitReached"].(bool) {
blocked++
} else {
notBlocked++
}
}
assert.Condition(t, func() (success bool) {
return blocked > 0
}, "some requests should have been blocked")
assert.Condition(t, func() (success bool) {
return notBlocked > 0
}, "some requests should not have been blocked")
}
3 changes: 2 additions & 1 deletion activity/ratelimiter/examples/activity_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
)

// Example returns an API example
func Example(limit string) (engine.Engine, error) {
func Example(limit string, threshold float64) (engine.Engine, error) {
app := api.NewApp()

gateway := microapi.New("Pets")

serviceLimiter := gateway.NewService("RateLimiter", &ratelimiter.Activity{})
serviceLimiter.SetDescription("Rate limiter")
serviceLimiter.AddSetting("limit", limit)
serviceLimiter.AddSetting("spikeThreshold", threshold)

serviceStore := gateway.NewService("PetStorePets", &rest.Activity{})
serviceStore.SetDescription("Get pets by ID from the petstore")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This recipe is a gateway which applies rate limit on specified dispatches.
## Setup
```
git clone https://github.com/project-flogo/microgateway
cd microgateway/activity/ratelimiter/examples/api
cd microgateway/activity/ratelimiter/examples/api/basic
```

## Testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func main() {
e, err := examples.Example("3-M")
e, err := examples.Example("3-M", 0)
if err != nil {
panic(err)
}
Expand Down
45 changes: 45 additions & 0 deletions activity/ratelimiter/examples/api/smart/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Gateway with smart Rate Limiter
This recipe is a gateway which applies rate limit and traffic spike blocking on specified dispatches.

## Installation
* Install [Go](https://golang.org/)

## Setup
```
git clone https://github.com/project-flogo/microgateway
cd microgateway/activity/ratelimiter/examples/api/smart
```

## Testing

Start the gateway:
```
go run main.go
```

### Run the client

Run the following command:
```
go run main.go -client
```

You should see the following like output:
```
0 {"category":{"id":0,"name":"string"},"id":1,"name":"doggie","photoUrls":["string"],"status":"available","tags":[{"id":0,"name":"string"}]}

1 {"category":{"id":0,"name":"string"},"id":1,"name":"doggie","photoUrls":["string"],"status":"available","tags":[{"id":0,"name":"string"}]}

2 {"category":{"id":0,"name":"string"},"id":1,"name":"doggie","photoUrls":["string"],"status":"available","tags":[{"id":0,"name":"string"}]}
```

After 256 requests there will be a spike and traffic, and then requests will be blocked:
```
256 {"status":"Rate Limit Exceeded - The service you have requested is over the allowed limit."}

257 {"status":"Rate Limit Exceeded - The service you have requested is over the allowed limit."}

258 {"status":"Rate Limit Exceeded - The service you have requested is over the allowed limit."}
```

After some time the requests will no longer be blocked.
73 changes: 73 additions & 0 deletions activity/ratelimiter/examples/api/smart/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package main

import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"

"github.com/project-flogo/core/engine"
"github.com/project-flogo/microgateway/activity/ratelimiter/examples"
)

var client = flag.Bool("client", false, "run the client")

func main() {
flag.Parse()

if *client {
client := &http.Client{}
request := func() []byte {
req, err := http.NewRequest(http.MethodGet, "http://localhost:9096/pets/1", nil)
if err != nil {
panic(err)
}
req.Header.Add("Token", "ABC123")
response, err := client.Do(req)
if err != nil {
panic(err)
}
body, err := ioutil.ReadAll(response.Body)
if err != nil {
panic(err)
}
response.Body.Close()

return body
}
count := 0
for i := 0; i < 256; i++ {
time.Sleep(50 * time.Millisecond)
output := request()
fmt.Println(count, string(output))
count++
}
wait := sync.WaitGroup{}
wait.Add(10)
for i := 0; i < 10; i++ {
time.Sleep(10 * time.Millisecond)
go func(count int) {
output := request()
fmt.Println(count, string(output))
wait.Done()
}(count)
count++
}
wait.Wait()
for i := 0; i < 256; i++ {
time.Sleep(50 * time.Millisecond)
output := request()
fmt.Println(count, string(output))
count++
}
return
}

e, err := examples.Example("1000-S", 2)
if err != nil {
panic(err)
}
engine.RunEngine(e)
}
Loading