Skip to content

Commit

Permalink
Merge pull request #154 from marcoferrer/remove-time-after-usage
Browse files Browse the repository at this point in the history
Fixes for blocking limiter and perf improvements
  • Loading branch information
platinummonkey committed May 13, 2024
2 parents 10abbc5 + 36db35d commit be996e0
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 103 deletions.
16 changes: 16 additions & 0 deletions core/metric_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ type CommonMetricSampler struct {
InFlightListener MetricSampleListener
}

// NewCommonMetricSamplerOrNil will only create a new CommonMetricSampler if a valid registry is supplied
func NewCommonMetricSamplerOrNil(registry MetricRegistry, limit Limit, name string, tags ...string) *CommonMetricSampler {
if registry == nil {
return nil
}
if _, ok := registry.(*EmptyMetricRegistry); ok {
return nil
}
return NewCommonMetricSampler(registry, limit, name, tags...)
}

// NewCommonMetricSampler will create a new CommonMetricSampler that will auto-instrument metrics
func NewCommonMetricSampler(registry MetricRegistry, limit Limit, name string, tags ...string) *CommonMetricSampler {
if registry == nil {
Expand All @@ -124,6 +135,11 @@ func NewCommonMetricSampler(registry MetricRegistry, limit Limit, name string, t

// Sample will sample the current sample for metric reporting.
func (s *CommonMetricSampler) Sample(rtt int64, inFlight int, didDrop bool) {
// from noop metrics registry
if s == nil {
return
}

if didDrop {
s.DropCounterListener.AddSample(1.0)
}
Expand Down
5 changes: 3 additions & 2 deletions limit/aimd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package limit

import (
"fmt"
"github.com/platinummonkey/go-concurrency-limits/core"
"math"
"sync"

"github.com/platinummonkey/go-concurrency-limits/core"
)

// AIMDLimit implements a Loss based dynamic Limit that does an additive increment as long as there are no errors and a
Expand Down Expand Up @@ -55,7 +56,7 @@ func NewAIMDLimit(
listeners: make([]core.LimitChangeListener, 0),
registry: registry,
}
l.commonSampler = core.NewCommonMetricSampler(registry, l, name, tags...)
l.commonSampler = core.NewCommonMetricSamplerOrNil(registry, l, name, tags...)
return l
}

Expand Down
3 changes: 2 additions & 1 deletion limit/fixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package limit

import (
"fmt"

"github.com/platinummonkey/go-concurrency-limits/core"
)

Expand All @@ -26,7 +27,7 @@ func NewFixedLimit(name string, limit int, registry core.MetricRegistry, tags ..
limit: limit,
registry: registry,
}
l.commonSampler = core.NewCommonMetricSampler(registry, l, name, tags...)
l.commonSampler = core.NewCommonMetricSamplerOrNil(registry, l, name, tags...)
return l
}

Expand Down
2 changes: 1 addition & 1 deletion limit/gradient.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewGradientLimitWithRegistry(
queueSizeSampleListener: registry.RegisterDistribution(core.PrefixMetricWithName(core.MetricWindowQueueSize, name), tags...),
}

l.commonSampler = core.NewCommonMetricSampler(registry, l, name, tags...)
l.commonSampler = core.NewCommonMetricSamplerOrNil(registry, l, name, tags...)
return l
}

Expand Down
2 changes: 1 addition & 1 deletion limit/gradient2.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func NewGradient2Limit(
registry: registry,
}

l.commonSampler = core.NewCommonMetricSampler(registry, l, name, tags...)
l.commonSampler = core.NewCommonMetricSamplerOrNil(registry, l, name, tags...)

return l, nil
}
Expand Down
17 changes: 7 additions & 10 deletions limit/settable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package limit
import (
"fmt"
"sync"
"sync/atomic"

"github.com/platinummonkey/go-concurrency-limits/core"
)
Expand Down Expand Up @@ -30,15 +31,13 @@ func NewSettableLimit(name string, limit int, registry core.MetricRegistry, tags
limit: int32(limit),
listeners: make([]core.LimitChangeListener, 0),
}
l.commonSampler = core.NewCommonMetricSampler(registry, l, name, tags...)
l.commonSampler = core.NewCommonMetricSamplerOrNil(registry, l, name, tags...)
return l
}

// EstimatedLimit will return the estimated limit.
func (l *SettableLimit) EstimatedLimit() int {
l.mu.RLock()
defer l.mu.RUnlock()
return int(l.limit)
return int(atomic.LoadInt32(&l.limit))
}

// NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
Expand All @@ -50,6 +49,8 @@ func (l *SettableLimit) NotifyOnChange(consumer core.LimitChangeListener) {

// notifyListeners will call the callbacks on limit changes
func (l *SettableLimit) notifyListeners(newLimit int) {
l.mu.Lock()
defer l.mu.Unlock()
for _, listener := range l.listeners {
listener(newLimit)
}
Expand All @@ -63,14 +64,10 @@ func (l *SettableLimit) OnSample(startTime int64, rtt int64, inFlight int, didDr

// SetLimit will update the current limit.
func (l *SettableLimit) SetLimit(limit int) {
l.mu.Lock()
l.limit = int32(limit)
atomic.StoreInt32(&l.limit, int32(limit))
l.notifyListeners(limit)
l.mu.Unlock()
}

func (l *SettableLimit) String() string {
l.mu.RLock()
defer l.mu.RUnlock()
return fmt.Sprintf("SettableLimit{limit=%d}", l.limit)
return fmt.Sprintf("SettableLimit{limit=%d}", atomic.LoadInt32(&l.limit))
}
2 changes: 1 addition & 1 deletion limit/traced.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ func (l *TracedLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop
l.limit.OnSample(startTime, rtt, inFlight, didDrop)
}

func (l TracedLimit) String() string {
func (l *TracedLimit) String() string {
return fmt.Sprintf("TracedLimit{limit=%v, logger=%v}", l.limit, l.logger)
}
2 changes: 1 addition & 1 deletion limit/vegas.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func NewVegasLimitWithRegistry(
logger: logger,
}

l.commonSampler = core.NewCommonMetricSampler(registry, l, name, tags...)
l.commonSampler = core.NewCommonMetricSamplerOrNil(registry, l, name, tags...)
return l
}

Expand Down
2 changes: 1 addition & 1 deletion limit/windowed.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewWindowedLimit(
listeners: make([]core.LimitChangeListener, 0),
registry: registry,
}
l.commonSampler = core.NewCommonMetricSampler(registry, l, name, tags...)
l.commonSampler = core.NewCommonMetricSamplerOrNil(registry, l, name, tags...)
return l, nil

}
Expand Down
106 changes: 35 additions & 71 deletions limiter/blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,74 +10,41 @@ import (
"github.com/platinummonkey/go-concurrency-limits/limit"
)

const longBlockingTimeout = time.Hour * 24 * 30 * 12 * 100 // 100 years
// blockUntilSignaled will wait for context cancellation, an unblock signal or timeout
// This method will return true if we were successfully signalled.
func blockUntilSignaled(ctx context.Context, c *sync.Cond, timeout time.Duration) bool {
ready := make(chan struct{})

// timeoutWaiter will wait for a timeout or unblock signal
type timeoutWaiter struct {
timeoutSig chan struct{}
closerSig chan struct{}
c *sync.Cond
once sync.Once
timeout time.Duration
}

func newTimeoutWaiter(c *sync.Cond, timeout time.Duration) *timeoutWaiter {
return &timeoutWaiter{
timeoutSig: make(chan struct{}),
closerSig: make(chan struct{}),
c: c,
timeout: timeout,
}
}

func (w *timeoutWaiter) start() {
// start two routines, one runner to signal, another blocking to wait and call unblock
var wg sync.WaitGroup
wg.Add(2)
go func() {
wg.Done()
w.run()
c.L.Lock()
defer c.L.Unlock()
close(ready)
}()
go func() {
wg.Done()
w.c.L.Lock()
w.c.Wait()
w.c.L.Unlock()
w.unblock()
}()
wg.Wait()
}

func (w *timeoutWaiter) run() {
if w.timeout > 0 {
if timeout > 0 {
// use NewTimer over time.After so that we don't have to
// wait for the timeout to elapse in order to release memory
timer := time.NewTimer(timeout)
defer timer.Stop()

select {
case <-w.closerSig:
close(w.timeoutSig)
return
case <-time.After(w.timeout):
// call unblock
close(w.timeoutSig)
return
case <-ctx.Done():
return false
case <-ready:
return true
case <-timer.C:
return false
}
}

select {
case <-w.closerSig:
close(w.timeoutSig)
return
case <-ctx.Done():
return false
case <-ready:
return true
}
}

func (w *timeoutWaiter) unblock() {
w.once.Do(func() {
close(w.closerSig)
})
}

// wait blocks until we've timed out
func (w *timeoutWaiter) wait() <-chan struct{} {
return w.timeoutSig
}

// BlockingLimiter implements a Limiter that blocks the caller when the limit has been reached. The caller is
// blocked until the limiter has been released. This limiter is commonly used in batch clients that use the limiter
// as a back-pressure mechanism.
Expand All @@ -95,8 +62,8 @@ func NewBlockingLimiter(
logger limit.Logger,
) *BlockingLimiter {
mu := sync.Mutex{}
if timeout <= 0 {
timeout = longBlockingTimeout
if timeout < 0 {
timeout = 0
}
if logger == nil {
logger = limit.NoopLimitLogger{}
Expand All @@ -111,37 +78,34 @@ func NewBlockingLimiter(

// tryAcquire will block when attempting to acquire a token
func (l *BlockingLimiter) tryAcquire(ctx context.Context) (core.Listener, bool) {

for {
l.c.L.Lock()
// if the context has already been cancelled, fail quickly
if err := ctx.Err(); err != nil {
l.logger.Debugf("context cancelled ctx=%v", ctx)
l.c.L.Unlock()
return nil, false
}

// try to acquire a new token and return immediately if successful
listener, ok := l.delegate.Acquire(ctx)
if ok && listener != nil {
l.logger.Debugf("delegate returned a listener ctx=%v", ctx)
l.c.L.Unlock()
return listener, true
}
l.c.L.Unlock()

// We have reached the limit so block until:
// - A token is released
// - A timeout
// - The context is cancelled
timeoutWaiter := newTimeoutWaiter(l.c, l.timeout)
timeoutWaiter.start()
select {
case <-timeoutWaiter.wait():
l.logger.Debugf("blocking released, trying again to acquire ctx=%v", ctx)
case <-ctx.Done():
l.logger.Debugf("blocking released, context's has been cancelled ctx=%v", ctx)
return nil, false
l.logger.Debugf("Blocking waiting for release or timeout ctx=%v", ctx)
if shouldAcquire := blockUntilSignaled(ctx, l.c, l.timeout); shouldAcquire {
listener, ok := l.delegate.Acquire(ctx)
if ok && listener != nil {
l.logger.Debugf("delegate returned a listener ctx=%v", ctx)
return listener, true
}
}
l.logger.Debugf("blocking released, trying again to acquire ctx=%v", ctx)
}
}

Expand Down
Loading

0 comments on commit be996e0

Please sign in to comment.