Skip to content

Commit

Permalink
deprecate lifo, fifo queue limiters
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoferrer committed Dec 28, 2021
1 parent f68c91e commit f45e8ae
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions limiter/queue_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/platinummonkey/go-concurrency-limits/core"
)

type EvictFunc func()

// QueueOrdering defines the pattern for ordering requests on Pool
type QueueOrdering string

Expand All @@ -34,7 +36,7 @@ func (e *queueElement) setListener(listener core.Listener) bool {
close(e.releaseChan)
return true
default:
// timeout has expired
// timeout has expired or context has been cancelled
return false
}
}
Expand Down Expand Up @@ -68,12 +70,16 @@ func (q *queue) len() uint64 {
return uint64(q.list.Len())
}

func (q *queue) push(ctx context.Context) (func(), <-chan core.Listener) {
func (q *queue) push(ctx context.Context) (EvictFunc, <-chan core.Listener) {
q.mu.Lock()
defer q.mu.Unlock()
releaseChan := make(chan core.Listener)

e := &queueElement{ctx: ctx, releaseChan: releaseChan}

// We always push to the front of the list regardless of
// queue order. As usage of the list will always assume
// Front == newest and Back == Oldest
listElement := q.list.PushFront(e)

return q.evictionFunc(listElement), releaseChan
Expand All @@ -88,7 +94,11 @@ func (q *queue) pop() *queueElement {
return ele
}

func (q *queue) peek() (func(), *queueElement) {
// peek will return the next queue element to process
// depending on the ordering configured in queue.
// The element returned is not evicted from the queue
// until EvictFunc is invoked
func (q *queue) peek() (EvictFunc, *queueElement) {
q.mu.RLock()
defer q.mu.RUnlock()
if q.list.Len() > 0 {
Expand All @@ -106,7 +116,7 @@ func (q *queue) peek() (func(), *queueElement) {
return nil, nil
}

// QueueBlockingListener implements a blocking listener for the QueueBlockingListener
// QueueBlockingListener implements a blocking listener for the QueueBlockingLimiter
type QueueBlockingListener struct {
delegateListener core.Listener
limiter *QueueBlockingLimiter
Expand Down Expand Up @@ -264,7 +274,8 @@ func (l *QueueBlockingLimiter) tryAcquire(ctx context.Context) core.Listener {
}

// Create a holder for a listener and block until a listener is released by another
// operation. Holders will be unblocked in LIFO order
// operation. Holders will be unblocked in LIFO or FIFO order depending on whatever
// ordering was configured when backlog was instantiated
evict, eventReleaseChan := l.backlog.push(ctx)

// We're using a nil chan so that we
Expand Down

0 comments on commit f45e8ae

Please sign in to comment.