Skip to content

Commit

Permalink
more testing incoming
Browse files Browse the repository at this point in the history
Signed-off-by: Won Jun Jang <wjang@uber.com>
  • Loading branch information
black-adder committed Nov 13, 2017
1 parent 24586f8 commit 2b7fda5
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 41 deletions.
8 changes: 8 additions & 0 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type options struct {
queueSize int
reportBusy bool
extraFormatTypes []string
getPriority func(item interface{}) int
}

// Option is a function that sets some option on StorageBuilder.
Expand Down Expand Up @@ -135,6 +136,13 @@ func (options) ExtraFormatTypes(extraFormatTypes []string) Option {
}
}

// GetPriority creates an Option that retrieves the priority of a span
func (options) GetPriority(getPriority func(item interface{}) int) Option {
return func(b *options) {
b.getPriority = getPriority
}
}

func (o options) apply(opts ...Option) options {
ret := options{}
for _, opt := range opts {
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcesso
droppedItemHandler := func(item interface{}) {
handlerMetrics.SpansDropped.Inc(1)
}
boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler)
boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler, queue.GetPriority(options.getPriority))

sp := spanProcessor{
queue: boundedQueue,
Expand Down
59 changes: 32 additions & 27 deletions pkg/queue/bounded_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,40 @@ import (
"github.com/uber/jaeger-lib/metrics"
)

const (
totalPriorities = 2
highPriority = 0
lowPriority = 1
)

// BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue,
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer force the earliest items to be dropped. The implementation is actually based on
// channels, with a special Reaper goroutine that wakes up when the queue is full and consumers
// the items from the top of the queue until its size drops back to maxSize
type BoundedQueue struct {
capacity int
size int32
onDroppedItem func(item interface{})
items []chan interface{}
stopCh chan struct{}
stopWG sync.WaitGroup
stopped int32
priorityLevels int
getPriority func(item interface{}) int
capacity int
size int32
onDroppedItem func(item interface{})
items []chan interface{}
stopCh chan struct{}
stopWG sync.WaitGroup
stopped int32
getPriority func(item interface{}) int
}

// NewBoundedQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedQueue(capacity int, onDroppedItem func(item interface{}), options ...Option) *BoundedQueue {
opts := applyOptions(options...)
bq := &BoundedQueue{
capacity: capacity,
onDroppedItem: onDroppedItem,
items: make([]chan interface{}, opts.priorityLevels),
stopCh: make(chan struct{}),
priorityLevels: opts.priorityLevels,
getPriority: opts.getPriority,
capacity: capacity,
onDroppedItem: onDroppedItem,
items: make([]chan interface{}, totalPriorities),
stopCh: make(chan struct{}),
getPriority: opts.getPriority,
}
for i := 0; i < opts.priorityLevels; i++ {
for i := 0; i < totalPriorities; i++ {
bq.items[i] = make(chan interface{}, capacity)
}
return bq
Expand All @@ -69,18 +73,19 @@ func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{}))
defer q.stopWG.Done()
for {
select {
case item := <-q.items[highPriority]:
atomic.AddInt32(&q.size, -1)
consumer(item)
continue
default:
}
select {
case item := <-q.items[lowPriority]:
atomic.AddInt32(&q.size, -1)
consumer(item)
continue
case <-q.stopCh:
return
default:
for _, items := range q.items {
select {
case item := <-items:
atomic.AddInt32(&q.size, -1)
consumer(item)
break
default:
}
}
}
}
}()
Expand Down Expand Up @@ -113,7 +118,7 @@ func (q *BoundedQueue) Stop() {
atomic.StoreInt32(&q.stopped, 1) // disable producer
close(q.stopCh)
q.stopWG.Wait()
for i := 0; i < q.priorityLevels; i++ {
for i := 0; i < totalPriorities; i++ {
close(q.items[i])
}
}
Expand Down
15 changes: 2 additions & 13 deletions pkg/queue/bounded_queue_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,7 @@ type Option func(c *Options)

// Options control behavior of the BoundedQueue.
type Options struct {
priorityLevels int
getPriority func(item interface{}) int
}

// PriorityLevels determines the number of different priority levels for the bounded queue.
func PriorityLevels(priorityLevels int) Option {
return func(o *Options) {
o.priorityLevels = priorityLevels
}
getPriority func(item interface{}) int
}

// GetPriority determines the priority level of a item.
Expand All @@ -42,12 +34,9 @@ func applyOptions(opts ...Option) Options {
for _, opt := range opts {
opt(&o)
}
if o.priorityLevels == 0 {
o.priorityLevels = 1
}
if o.getPriority == nil {
o.getPriority = func(item interface{}) int {
return 0
return lowPriority
}
}
return o
Expand Down
43 changes: 43 additions & 0 deletions pkg/queue/bounded_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,49 @@ func TestBoundedQueue(t *testing.T) {
assert.False(t, q.Produce("x"), "cannot push to closed queue")
}

func TestBoundedQueueWithPriority(t *testing.T) {
mFact := metrics.NewLocalFactory(0)
counter := mFact.Counter("dropped", nil)

q := NewBoundedQueue(
1,
func(item interface{}) {
counter.Inc(1)
},
GetPriority(func(item interface{}) int {
if item.(string) == "a" {
return highPriority
} else {
return lowPriority
}
}),
)
assert.Equal(t, 1, q.Capacity())

consumerState := newConsumerState(t)

q.StartConsumers(1, func(item interface{}) {
consumerState.record(item.(string))
})

assert.True(t, q.Produce("b"))
assert.True(t, q.Produce("a"))

// now that consumers are unblocked, we can add more items
expected := map[string]bool{
"a": true,
"b": true,
}
for _, item := range []string{"d", "e", "f"} {
assert.True(t, q.Produce(item))
expected[item] = true
consumerState.assertConsumed(expected)
}

q.Stop()
assert.False(t, q.Produce("x"), "cannot push to closed queue")
}

type consumerState struct {
sync.Mutex
t *testing.T
Expand Down

0 comments on commit 2b7fda5

Please sign in to comment.