Skip to content

Commit

Permalink
Added additional tests; removed obsolete implementation of the Compos…
Browse files Browse the repository at this point in the history
…iteBarrier.
  • Loading branch information
Oliver, Jonathan committed Jun 3, 2014
1 parent ccfc455 commit 7778e97
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
62 changes: 62 additions & 0 deletions benchmark-disruptor/writer_reservation_multiple_readers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package benchmarks

import (
"testing"

"github.com/smartystreets/go-disruptor"
)

func BenchmarkWriterReserveOneMultipleReaders(b *testing.B) {
ringBuffer := [RingBufferSize]int64{}
written, read1, read2 := disruptor.NewCursor(), disruptor.NewCursor(), disruptor.NewCursor()
reader1 := disruptor.NewReader(read1, written, written, SampleConsumer{&ringBuffer})
reader2 := disruptor.NewReader(read2, written, written, SampleConsumer{&ringBuffer})
barrier := disruptor.NewCompositeBarrier(read1, read2)
writer := disruptor.NewWriter(written, barrier, RingBufferSize)

reader1.Start()
reader2.Start()

iterations := int64(b.N)
b.ReportAllocs()
b.ResetTimer()

sequence := disruptor.InitialSequenceValue
for sequence < iterations {
sequence = writer.Reserve(ReserveOne)
ringBuffer[sequence&RingBufferMask] = sequence
writer.Commit(sequence, sequence)
}

reader1.Stop()
reader2.Stop()
}
func BenchmarkWriterReserveManyMultipleReaders(b *testing.B) {
ringBuffer := [RingBufferSize]int64{}
written, read1, read2 := disruptor.NewCursor(), disruptor.NewCursor(), disruptor.NewCursor()
reader1 := disruptor.NewReader(read1, written, written, SampleConsumer{&ringBuffer})
reader2 := disruptor.NewReader(read2, written, written, SampleConsumer{&ringBuffer})
barrier := disruptor.NewCompositeBarrier(read1, read2)
writer := disruptor.NewWriter(written, barrier, RingBufferSize)

reader1.Start()
reader2.Start()

iterations := int64(b.N)
b.ReportAllocs()
b.ResetTimer()

sequence := disruptor.InitialSequenceValue
for sequence < iterations {
sequence = writer.Reserve(ReserveMany)

for i := sequence - ReserveManyDelta; i <= sequence; i++ {
ringBuffer[i&RingBufferMask] = i
}

writer.Commit(sequence, sequence)
}

reader1.Stop()
reader2.Stop()
}
6 changes: 0 additions & 6 deletions composite_barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,18 @@ package disruptor

type CompositeBarrier []*Cursor

// type CompositeBarrier struct {
// cursors []*Cursor
// }

func NewCompositeBarrier(upstream ...*Cursor) CompositeBarrier {
if len(upstream) == 0 {
panic("At least one upstream cursor is required.")
}

cursors := make([]*Cursor, len(upstream))
copy(cursors, upstream)
// return &CompositeBarrier{cursors}
return CompositeBarrier(cursors)
}

func (this CompositeBarrier) Read(noop int64) int64 {
minimum := MaxSequenceValue
// for _, item := range this.cursors {
for _, item := range this {
sequence := item.Load()
if sequence < minimum {
Expand Down

0 comments on commit 7778e97

Please sign in to comment.