Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions bucket/leaky.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package bucket

import (
"sync"
"time"
)

// LeakyLimiter implements a leaky bucket rate limiter. Requests fill the bucket,
// which drains at a constant rate. If the bucket is full, requests are rejected.
// Unlike TokenLimiter, this enforces a smooth output rate rather than allowing bursts.
type LeakyLimiter struct {
Copy link

Copilot AI Jan 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exported type LeakyLimiter and its exported method Allow lack documentation comments. According to Go conventions, all exported types and methods should have doc comments that begin with the name being declared.

Copilot uses AI. Check for mistakes.
mu sync.Mutex

capacity, level, rate float64
lastUpdatedAt time.Time
}

Copy link

Copilot AI Jan 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exported function NewLeakyLimiter lacks documentation. According to Go conventions, all exported functions should have doc comments that begin with the function name and describe what it does, including parameter and return value descriptions.

Suggested change
// NewLeakyLimiter returns a new LeakyLimiter with the given capacity and leak rate.
// capacity specifies the maximum number of tokens that can be accumulated in the
// limiter, and rate specifies how many tokens per second leak from the bucket.
// The returned *LeakyLimiter is initialized with an empty bucket and the current time.

Copilot uses AI. Check for mistakes.
// NewLeakyLimiter creates a new leaky bucket limiter.
// Capacity is the maximum bucket size. Rate is how many requests drain per second.
func NewLeakyLimiter(capacity, rate uint32) *LeakyLimiter {
return &LeakyLimiter{
capacity: float64(capacity),
rate: float64(rate),
lastUpdatedAt: time.Now(),
}
}

func (lim *LeakyLimiter) update() {
t := time.Now()
if t.Before(lim.lastUpdatedAt) {
return
}

lim.level = max(0, lim.level-t.Sub(lim.lastUpdatedAt).Seconds()*lim.rate)
lim.lastUpdatedAt = t
}

Copy link

Copilot AI Jan 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exported method Allow lacks documentation. According to Go conventions, all exported methods should have doc comments describing their behavior.

Suggested change
// Allow reports whether a new event is permitted under the current limiter
// state and updates the internal level accordingly.

Copilot uses AI. Check for mistakes.
// Allow reports whether a request is allowed. It adds one to the bucket level
// if there is room and returns true. If the bucket is full, it returns false
// without blocking.
func (lim *LeakyLimiter) Allow() bool {
lim.mu.Lock()
defer lim.mu.Unlock()

lim.update()

if lim.level+1 <= lim.capacity {
lim.level++

return true
}

return false
}
72 changes: 72 additions & 0 deletions bucket/leaky_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package bucket_test

import (
"sync"
"sync/atomic"
"testing"

"github.com/serroba/rate/bucket"
"github.com/stretchr/testify/assert"
)

func TestLeakyLimiter_Allow(t *testing.T) {
type args struct {
capacity uint32
rate uint32
}

tests := []struct {
name string
args args
previousAttempts int
want bool
}{
{name: "Test With No Capacity", args: args{capacity: 0, rate: 0}, want: false},
{name: "Test With Capacity 1", args: args{capacity: 1, rate: 0}, want: true},
{
name: "Test With Capacity 1 with previous attempt",
args: args{capacity: 1, rate: 0},
previousAttempts: 1,
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lim := bucket.NewLeakyLimiter(tt.args.capacity, tt.args.rate)
for range tt.previousAttempts {
lim.Allow()
}

assert.Equalf(t, tt.want, lim.Allow(), "Allow()")
})
}
}

func TestLeakyLimiter_Allow_Concurrent(t *testing.T) {
var (
allow atomic.Int64
deny atomic.Int64
wg sync.WaitGroup
)

lim := bucket.NewLeakyLimiter(10, 5)

for range 15 {
wg.Add(1)

go func() {
defer wg.Done()

if lim.Allow() {
allow.Add(1)
} else {
deny.Add(1)
}
}()
}

wg.Wait()

assert.Equal(t, int64(10), allow.Load())
assert.Equal(t, int64(5), deny.Load())
}
26 changes: 15 additions & 11 deletions bucket/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,28 @@ import (
type (
Identifier string
Registry struct {
mu sync.Mutex
limiters map[Identifier]*TokenLimiter
capacity, rate uint32
mu sync.Mutex
factory LimiterFactory
limiters map[Identifier]Limiter
}
)

func NewRegistry(capacity, rate uint32, users ...Identifier) (*Registry, error) {
limiters := make(map[Identifier]*TokenLimiter)
type Limiter interface {
Allow() bool
}

type LimiterFactory func() Limiter

func NewRegistry(factory LimiterFactory, keys ...Identifier) (*Registry, error) {
limiters := make(map[Identifier]Limiter)

for _, user := range users {
limiter := NewLimiter(capacity, rate)
limiters[user] = limiter
for _, key := range keys {
limiters[key] = factory()
}

return &Registry{
limiters: limiters,
capacity: capacity,
rate: rate,
factory: factory,
}, nil
}

Expand All @@ -34,7 +38,7 @@ func (r *Registry) Allow(key Identifier) bool {

lim, ok := r.limiters[key]
if !ok {
lim = NewLimiter(r.capacity, r.rate)
lim = r.factory()
r.limiters[key] = lim
}

Expand Down
Loading