Does go-disruptor support multiple producers? #9

Closed
ankisme opened this issue Oct 28, 2019 · 2 comments

Comments


ankisme commented Oct 28, 2019

The example/main.go shows how to use go-disruptor with only a single producer.

How can I use go-disruptor with multiple producers?

I tried the code below, but the test blocks. It seems that multiple producers need multiple writers?

func TestGoDisruptorMultipleProducer(t *testing.T) {
	writer, reader := disruptor.New(
		disruptor.WithCapacity(BufferSize),
		disruptor.WithConsumerGroup(MyConsumer{}))

	for i := 1; i <= 100; i++ {
		go publish(writer, reader)
	}

	reader.Read()
}

func publish(writer disruptor.Writer, closer io.Closer) {
	for sequence := int64(0); sequence <= Iterations; {
		sequence = writer.Reserve(Reservations)

		for lower := sequence - Reservations + 1; lower <= sequence; lower++ {
			ringBuffer[lower&BufferMask] = lower
		}

		writer.Commit(sequence-Reservations+1, sequence)
	}
}

// ////////////////////

type MyConsumer struct{}

func (this MyConsumer) Consume(lower, upper int64) {
	for ; lower <= upper; lower++ {
		message := ringBuffer[lower&BufferMask]
		if message != lower {
			panic(fmt.Errorf("race condition: %d %d", message, lower))
		}

		fmt.Println(message)
	}
}

const (
	BufferSize   = 16
	BufferMask   = BufferSize - 1
	Iterations   = 128 * 1024 * 32
	Reservations = 1
)

var ringBuffer = [BufferSize]int64{}

ankisme commented Oct 28, 2019

The way I use it causes a data race.

=== RUN   TestGoDisruptorMultipleProducer
==================
WARNING: DATA RACE
Read at 0x00c000070560 by goroutine 9:
  github.com/smartystreets-prototypes/go-disruptor.(*DefaultWriter).Reserve()
      D:/proj/mine/gopath/pkg/mod/github.com/smartystreets-prototypes/go-disruptor@v0.0.0-20191016010027-c1aa45f7f564/default_writer.go:28 +0x5c
  web/business/taskqueuepackage/test_test.publish()
      D:/proj/mine/gopath/src/gitee-server/business/taskqueuepackage/test/GoDisruptorMultipleProducer_test.go:29 +0xc6

Previous write at 0x00c000070560 by goroutine 8:
  github.com/smartystreets-prototypes/go-disruptor.(*DefaultWriter).Reserve()
      D:/proj/mine/gopath/pkg/mod/github.com/smartystreets-prototypes/go-disruptor@v0.0.0-20191016010027-c1aa45f7f564/default_writer.go:28 +0x78
  web/business/taskqueuepackage/test_test.publish()
      D:/proj/mine/gopath/src/gitee-server/business/taskqueuepackage/test/GoDisruptorMultipleProducer_test.go:29 +0xc6

Goroutine 9 (running) created at:
  web/business/taskqueuepackage/test_test.TestGoDisruptorMultipleProducer()
      D:/proj/mine/gopath/src/gitee-server/business/taskqueuepackage/test/GoDisruptorMultipleProducer_test.go:21 +0x18c
  testing.tRunner()
      D:/go/src/testing/testing.go:909 +0x1a0

Goroutine 8 (running) created at:
  web/business/taskqueuepackage/test_test.TestGoDisruptorMultipleProducer()
      D:/proj/mine/gopath/src/gitee-server/business/taskqueuepackage/test/GoDisruptorMultipleProducer_test.go:21 +0x18c
  testing.tRunner()
      D:/go/src/testing/testing.go:909 +0x1a0
==================
==================
WARNING: DATA RACE
Read at 0x00c000070568 by goroutine 9:
  github.com/smartystreets-prototypes/go-disruptor.(*DefaultWriter).Reserve()
      D:/proj/mine/gopath/pkg/mod/github.com/smartystreets-prototypes/go-disruptor@v0.0.0-20191016010027-c1aa45f7f564/default_writer.go:29 +0x137
  web/business/taskqueuepackage/test_test.publish()
      D:/proj/mine/gopath/src/gitee-server/business/taskqueuepackage/test/GoDisruptorMultipleProducer_test.go:29 +0xc6

Previous write at 0x00c000070568 by goroutine 8:
  github.com/smartystreets-prototypes/go-disruptor.(*DefaultWriter).Reserve()
      D:/proj/mine/gopath/pkg/mod/github.com/smartystreets-prototypes/go-disruptor@v0.0.0-20191016010027-c1aa45f7f564/default_writer.go:34 +0xce
  web/business/taskqueuepackage/test_test.publish()
      D:/proj/mine/gopath/src/gitee-server/business/taskqueuepackage/test/GoDisruptorMultipleProducer_test.go:29 +0xc6

Goroutine 9 (running) created at:
  web/business/taskqueuepackage/test_test.TestGoDisruptorMultipleProducer()
      D:/proj/mine/gopath/src/gitee-server/business/taskqueuepackage/test/GoDisruptorMultipleProducer_test.go:21 +0x18c
  testing.tRunner()
      D:/go/src/testing/testing.go:909 +0x1a0

Goroutine 8 (running) created at:
  web/business/taskqueuepackage/test_test.TestGoDisruptorMultipleProducer()
      D:/proj/mine/gopath/src/gitee-server/business/taskqueuepackage/test/GoDisruptorMultipleProducer_test.go:21 +0x18c
  testing.tRunner()
      D:/go/src/testing/testing.go:909 +0x1a0

It seems the DefaultWriter should only be used from a single goroutine?

func (this *DefaultWriter) Reserve(count int64) int64 {
	if count <= 0 {
		panic(ErrMinimumReservationSize)
	}

	this.previous += count
	for spin := int64(0); this.previous-this.capacity > this.gate; spin++ {
		if spin&SpinMask == 0 {
			runtime.Gosched() // LockSupport.parkNanos(1L); http://bit.ly/1xiDINZ
		}

		this.gate = this.upstream.Load()
	}
	return this.previous
}


joliver commented Nov 1, 2019

Right now it only supports a single producer.

If you need multiple producers, the only workaround is to guard the claim and the write with a mutex:

mutex.Lock()
// claim
// write
mutex.Unlock()

joliver closed this as completed Nov 1, 2019