Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions limiter_fixed_truncated_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type FixedTruncatedWindowRateLimiter struct {

mu sync.Mutex

rate time.Duration
rate Rate
window time.Time
capacity int64
rateLimitReached bool
Expand All @@ -63,14 +63,21 @@ func (l *FixedTruncatedWindowRateLimiter) Check(ctx context.Context) (Result, er
}

func (l *FixedTruncatedWindowRateLimiter) Dump(ctx context.Context) (r Result, err error) {
l.mu.Lock()
defer l.mu.Unlock()

now := l.clock.Now()
window := now.Truncate(l.rate)

c, err := l.db.Get(ctx, window)
if TimeGTE(l.window.Add(l.rate.Duration()), now) {
l.rateLimitReached = false
l.window = now.Truncate(l.rate.Unit)
}

c, err := l.db.Get(ctx, l.window)

if c >= l.capacity {
// rate limit exceeded
r = res(l.rate-now.Sub(window), l.capacity-c)
r = res(l.window.Add(l.rate.Duration()).Sub(now), l.capacity-c)
} else {
r = res(0, l.capacity-c)
}
Expand All @@ -89,21 +96,19 @@ func (l *FixedTruncatedWindowRateLimiter) try(ctx context.Context, tokens int64)

now := l.clock.Now()

window := now.Truncate(l.rate)

if !l.window.Equal(window) {
if TimeGTE(l.window.Add(l.rate.Duration()), now) {
l.rateLimitReached = false
l.window = window
l.window = now.Truncate(l.rate.Unit)
}

ttw := l.rate - now.Sub(window)
ttw := l.window.Add(l.rate.Duration()).Sub(now)

if l.rateLimitReached {
return res(ttw, 0), ErrRateLimitExceeded
}

c, err := l.db.Inc(ctx, FixedWindowIncArgs{
Window: window,
Window: l.window,
Tokens: tokens,
Capacity: l.capacity,
TTL: ttw,
Expand Down Expand Up @@ -134,22 +139,20 @@ func (l *FixedTruncatedWindowRateLimiter) check(ctx context.Context, tokens int6

now := l.clock.Now()

window := now.Truncate(l.rate)

if !l.window.Equal(window) {
if TimeGTE(l.window.Add(l.rate.Duration()), now) {
// new window so no rate Limit
l.rateLimitReached = false
l.window = window
l.window = now.Truncate(l.rate.Unit)
return res(0, l.capacity), nil
}

ttw := l.rate - now.Sub(window)
ttw := l.window.Add(l.rate.Duration()).Sub(now)

if l.rateLimitReached {
return res(ttw, 0), ErrRateLimitExceeded
}

c, err := l.db.Get(ctx, window)
c, err := l.db.Get(ctx, l.window)

if err != nil {
return nores, err
Expand Down Expand Up @@ -177,7 +180,7 @@ func NewFixedTruncatedWindowRateLimiter(
) *FixedTruncatedWindowRateLimiter {
return &FixedTruncatedWindowRateLimiter{
capacity: args.Capacity,
rate: args.Rate.Duration(),
rate: args.Rate,
clock: args.Clock,
db: args.DB,
rateLimitReached: false,
Expand Down
48 changes: 24 additions & 24 deletions limiter_fixed_truncated_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,15 @@ func TestNewFixedTruncatedWindowRateLimiter(t *testing.T) {
},
{
method: check,
passTime: 0,
passTime: time.Second,
expectedErr: ErrRateLimitExceeded,
expectedTtw: time.Second * 4,
expectedTtw: time.Second * 10,
expectedFreeSlots: 0,
},
{
method: try,
passTime: 0,
expectedTtw: time.Second * 4,
expectedTtw: time.Second * 9,
expectedErr: ErrRateLimitExceeded,
expectedFreeSlots: 0,
},
Expand Down Expand Up @@ -201,10 +201,10 @@ func TestNewFixedTruncatedWindowRateLimiter(t *testing.T) {
},
{
method: check,
passTime: 0,
expectedErr: nil,
expectedFreeSlots: 2,
expectedTtw: 0,
passTime: time.Second * 7,
expectedErr: ErrRateLimitExceeded,
expectedFreeSlots: 0,
expectedTtw: time.Second * 7,
},
{
method: try,
Expand Down Expand Up @@ -256,42 +256,42 @@ func TestNewFixedTruncatedWindowRateLimiter(t *testing.T) {
expectedFreeSlots: 0,
expectedErr: nil,
},
// Rate Limit is reached and 1 second passes...
{
method: check,
passTime: 0,
expectedErr: ErrRateLimitExceeded,
expectedFreeSlots: 0,
expectedTtw: time.Second * 2,
expectedTtw: time.Second * 10,
},
// Rate Limit is reached and 1 second passes...
{
method: try,
passTime: time.Second,
expectedTtw: time.Second * 2,
expectedTtw: time.Second * 10,
expectedErr: ErrRateLimitExceeded,
expectedFreeSlots: 0,
},
{
method: check,
passTime: 0,
expectedErr: ErrRateLimitExceeded,
expectedTtw: time.Second,
expectedTtw: time.Second * 9,
expectedFreeSlots: 0,
},
// Rate limit is still held. Moving 2 seconds and getting into next window
{
method: try,
passTime: time.Second * 2,
expectedTtw: time.Second,
expectedTtw: time.Second * 9,
expectedErr: ErrRateLimitExceeded,
expectedFreeSlots: 0,
},
{
method: check,
passTime: 0,
expectedErr: nil,
expectedTtw: 0,
expectedFreeSlots: 2,
passTime: time.Second * 7,
expectedErr: ErrRateLimitExceeded,
expectedTtw: time.Second * 7,
expectedFreeSlots: 0,
},
// Requests check be made again
{
Expand Down Expand Up @@ -346,24 +346,24 @@ func TestNewFixedTruncatedWindowRateLimiter(t *testing.T) {
},
{
method: dump,
passTime: 0,
passTime: time.Second * 2,
expectedErr: nil,
expectedFreeSlots: 0,
expectedTtw: time.Second * 2,
expectedTtw: time.Second * 10,
},
{
method: try,
passTime: 0,
expectedTtw: time.Second * 2,
passTime: time.Second * 8,
expectedTtw: time.Second * 8,
expectedFreeSlots: 0,
expectedErr: ErrRateLimitExceeded,
},
{
method: dump,
passTime: 0,
expectedErr: nil,
expectedFreeSlots: 0,
expectedTtw: time.Second * 2,
expectedFreeSlots: 2,
expectedTtw: 0,
},
},
},
Expand Down Expand Up @@ -403,10 +403,10 @@ func TestNewFixedTruncatedWindowRateLimiter(t *testing.T) {
},
{
method: dump,
passTime: time.Second * 2,
passTime: time.Second * 10,
expectedErr: nil,
expectedFreeSlots: 0,
expectedTtw: time.Second * 2,
expectedTtw: time.Second * 10,
},
{
method: dump,
Expand Down
27 changes: 12 additions & 15 deletions limiter_token_fixed_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestNewTokenFixedWindowRateLimiter_WindowTruncated(t *testing.T) {
method: check,
forwardAfter: 0,
expectedFreeSlots: 6,
expectedTtw: time.Second * 4,
expectedTtw: time.Second * 10,
expectedErr: ErrRateLimitExceeded,
requestTokens: 20,
},
Expand All @@ -182,14 +182,14 @@ func TestNewTokenFixedWindowRateLimiter_WindowTruncated(t *testing.T) {
requestTokens: 7, // 21 -> Rate Limit!
forwardAfter: 0,
expectedFreeSlots: 0,
expectedTtw: time.Second * 4,
expectedTtw: time.Second * 10,
expectedErr: ErrRateLimitExceeded,
},
{
method: check,
forwardAfter: 0,
expectedFreeSlots: 0,
expectedTtw: time.Second * 4,
expectedTtw: time.Second * 10,
expectedErr: ErrRateLimitExceeded,
requestTokens: 1,
},
Expand Down Expand Up @@ -227,8 +227,8 @@ func TestNewTokenFixedWindowRateLimiter_WindowTruncated(t *testing.T) {
},
{
method: try,
requestTokens: 1, // 2
forwardAfter: time.Second * 2, // 2022-02-05 00:00:11
requestTokens: 1,
forwardAfter: time.Second * 9, // now it's 2022-02-05 00:00:09
expectedFreeSlots: 0,
expectedTtw: 0, // TODO(@sonirico): Should this be zero?
expectedErr: nil,
Expand All @@ -252,7 +252,7 @@ func TestNewTokenFixedWindowRateLimiter_WindowTruncated(t *testing.T) {
method: check,
forwardAfter: 0,
expectedFreeSlots: 0,
expectedTtw: time.Second * 9,
expectedTtw: time.Second * 10,
expectedErr: ErrRateLimitExceeded,
requestTokens: 1,
},
Expand Down Expand Up @@ -301,33 +301,31 @@ func TestNewTokenFixedWindowRateLimiter_WindowTruncated(t *testing.T) {
forwardAfter: 0,
expectedFreeSlots: 2,
expectedErr: ErrRateLimitExceeded,
expectedTtw: time.Second * 2,
expectedTtw: time.Second * 10,
requestTokens: 3,
},
// Rate Limit is reached and 1 second passes...
{
method: try,
requestTokens: 3, // 11
forwardAfter: time.Second,
expectedFreeSlots: 0,
expectedTtw: time.Second * 2,
expectedTtw: time.Second * 10,
expectedErr: ErrRateLimitExceeded,
},
{
method: check,
forwardAfter: 0,
expectedFreeSlots: 0,
expectedTtw: time.Second,
expectedTtw: time.Second * 9,
expectedErr: ErrRateLimitExceeded,
requestTokens: 1,
},
// Rate limit is still held. Moving 2 seconds and getting into next window
{
method: try,
requestTokens: 3, // 11
forwardAfter: time.Second * 2,
forwardAfter: time.Second * 9,
expectedFreeSlots: 0,
expectedTtw: time.Second,
expectedTtw: time.Second * 9,
expectedErr: ErrRateLimitExceeded,
},
{
Expand All @@ -338,7 +336,6 @@ func TestNewTokenFixedWindowRateLimiter_WindowTruncated(t *testing.T) {
expectedErr: nil,
requestTokens: 1,
},
// Requests check be made again
{
method: try,
requestTokens: 3, // 3
Expand Down Expand Up @@ -392,7 +389,7 @@ func TestNewTokenFixedWindowRateLimiter_WindowTruncated(t *testing.T) {
{
method: dump,
expectedFreeSlots: 0,
expectedTtw: time.Second * 2,
expectedTtw: time.Second * 10,
expectedErr: nil,
},
},
Expand Down
33 changes: 22 additions & 11 deletions tests/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,46 @@ func main() {
redisCli := redis.NewClient(redisOpts)

rateLimit :=
pacemaker.NewFixedWindowRateLimiter(
pacemaker.FixedWindowArgs{
pacemaker.NewFixedTruncatedWindowRateLimiter(
pacemaker.FixedTruncatedWindowArgs{
Capacity: 1200,
Rate: pacemaker.Rate{
Unit: time.Hour,
Amount: 1,
Unit: time.Minute,
Amount: 3,
},
Clock: pacemaker.NewClock(),
DB: pacemaker.NewFixedWindowRedisStorage(
redisCli,
pacemaker.FixedWindowRedisStorageOpts{
Prefix: "pacemaker",
Prefix: "pacemaker-test-marcos-2",
},
),
},
)

result, err := rateLimit.Try(ctx)
result, err := rateLimit.Dump(ctx)

if err != nil {
log.Printf("error dump: '%v'", err)
}

log.Printf("Dump Result: '%v'", result)

result, err = rateLimit.Try(ctx)
if err != nil {
log.Printf("error try: '%v'", err)
}

log.Printf("Try Result: '%v'", result)

result, err = rateLimit.Dump(ctx)
for {
time.Sleep(time.Second)
result, err = rateLimit.Dump(ctx)

if err != nil {
log.Printf("error dump: '%v'", err)
}
if err != nil {
log.Printf("error dump: '%v'", err)
}

log.Printf("Dump Result: '%v'", result)
log.Printf("Dump Result: '%v'", result)
}
}
Loading