Skip to content

Commit

Permalink
Merge pull request #4 from vetinari/resizing-buffer
Browse files Browse the repository at this point in the history
add Resize(string, int)
  • Loading branch information
szuecs committed Mar 21, 2018
2 parents 61dbec8 + 21ae2c8 commit 74fd0df
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 0 deletions.
29 changes: 29 additions & 0 deletions circularbuffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package circularbuffer

import (
"fmt"
"sync"
"time"
)
Expand Down Expand Up @@ -106,3 +107,31 @@ func (cb *CircularBuffer) delta() time.Duration {
cb.RUnlock()
return cur.Sub(next)
}

// needs to be called with Lock() held by caller
func (cb *CircularBuffer) resize(n int) {
if n <= 0 {
fmt.Printf("resize(): refusing to resize circular buffer to %d", n)
return
}
cur := len(cb.slots)
if cur == n {
return
}
newSlots := make([]time.Time, n, n)
if cur < n {
copy(newSlots, cb.slots)
cb.slots = newSlots
} else {
cb.offset = cb.offset - n
if cb.offset < 0 {
cb.offset += cur
}
for i := 0; i < n; i++ {
off := (cb.offset + i) % cur
newSlots[i] = cb.slots[off]
}
cb.slots = newSlots
cb.offset = n - 1
}
}
46 changes: 46 additions & 0 deletions circularbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,49 @@ func TestCicularBufferMassiveConcurrentUse(t *testing.T) {
wg.Wait()

}

func TestResizeBufferIncrease(t *testing.T) {
l := 4
window := 1 * time.Second
cb := NewCircularBuffer(l, window)
start := time.Now()
for i := 0; i < l; i++ {
cb.Add(start)
}
cb.resize(2 * l)
for i := 0; i < l; i++ {
if !cb.slots[i].Equal(start) {
t.Errorf("invalid value found in slot %d: %s", i, cb.slots[i])
}
}
for i := l; i < 2*l; i++ {
if !cb.slots[i].IsZero() {
t.Errorf("invalid value found in slot %d: %s", i, cb.slots[i])
}
}
}

func TestResizeBufferDecrease(t *testing.T) {
l := 8
window := 1 * time.Second

for off := 0; off < l; off++ {
for newSize := 1; newSize < l; newSize++ {
cb := NewCircularBuffer(l, window)
cb.offset = off
start := time.Time{}
for i := 0; i < l; i++ {
cb.Add(start.Add(time.Duration(i) * window))
}
cb.resize(newSize)
for i := 0; i < newSize; i++ {
if !cb.slots[i].Equal(start.Add(window * time.Duration(l-newSize+i))) {
t.Errorf("invalid value found for new size %d in slot %d: %s", newSize, i, cb.slots[i])
}
}
if !cb.slots[cb.offset].Equal(start.Add(window * time.Duration(l-1))) {
t.Errorf("invalid value found for new size %d at offset %d: %s", newSize, cb.offset, cb.slots[cb.offset])
}
}
}
}
27 changes: 27 additions & 0 deletions ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type RateLimiter interface {
// Close cleans up the RateLimiter implementation.
Close()
Delta(string) time.Duration
Resize(string, int)
}

// NewRateLimiter returns a new initialized RateLimitter with maxHits
Expand All @@ -35,10 +36,20 @@ func (cb *CircularBuffer) Allow(s string) bool {
func (cb *CircularBuffer) Close() {
}

// Delta returns the diffence between the current and the oldest value in
// the buffer, i.e. maxHits / Delta() => rate
func (cb *CircularBuffer) Delta(s string) time.Duration {
return cb.delta()
}

// Resize resizes the circular buffer to the given size. Resizing to a size
// <= 0 is not performed
func (cb *CircularBuffer) Resize(s string, n int) {
cb.Lock()
cb.resize(n)
cb.Unlock()
}

// ClientRateLimiter implements the RateLimiter interface and does
// rate limiting based on the the String passed to Allow(). This can
// be used to limit per client calls to the backend. For example you
Expand Down Expand Up @@ -86,6 +97,8 @@ func (rl *ClientRateLimiter) Allow(s string) bool {
return present
}

// Delta returns the diffence between the current and the oldest value in
// the buffer, i.e. maxHits / Delta() => rate
func (rl *ClientRateLimiter) Delta(s string) time.Duration {
rl.RLock()
if _, present := rl.bag[s]; !present {
Expand All @@ -97,6 +110,20 @@ func (rl *ClientRateLimiter) Delta(s string) time.Duration {
return delta
}

// Resize resizes the given circular buffer to the given size. Resizing to a size
// <= 0 is not performed
func (rl *ClientRateLimiter) Resize(s string, n int) {
rl.RLock()
if _, present := rl.bag[s]; !present {
rl.RUnlock()
return
}
rl.RUnlock()
rl.Lock()
rl.bag[s].resize(n)
rl.Unlock()
}

// DeleteOld removes old entries from state bag
func (rl *ClientRateLimiter) DeleteOld() {
rl.Lock()
Expand Down

0 comments on commit 74fd0df

Please sign in to comment.