Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Merge 7dd2a4f into 42fa97e
Browse files Browse the repository at this point in the history
  • Loading branch information
fishie9 committed Jul 20, 2018
2 parents 42fa97e + 7dd2a4f commit 2fb44bc
Show file tree
Hide file tree
Showing 31 changed files with 259 additions and 542 deletions.
20 changes: 12 additions & 8 deletions consumer/config.go
Expand Up @@ -30,19 +30,23 @@ import (

// Configuration configs the consumer options.
type Configuration struct {
EncodeDecoder *proto.EncodeDecoderConfiguration `yaml:"encodeDecoder"`
MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"`
AckFlushInterval *time.Duration `yaml:"ackFlushInterval"`
AckBufferSize *int `yaml:"ackBufferSize"`
ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"`
ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"`
Encoder *proto.Configuration `yaml:"encoder"`
Decoder *proto.Configuration `yaml:"decoder"`
MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"`
AckFlushInterval *time.Duration `yaml:"ackFlushInterval"`
AckBufferSize *int `yaml:"ackBufferSize"`
ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"`
ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"`
}

// NewOptions creates consumer options.
func (c *Configuration) NewOptions(iOpts instrument.Options) Options {
opts := NewOptions().SetInstrumentOptions(iOpts)
if c.EncodeDecoder != nil {
opts = opts.SetEncodeDecoderOptions(c.EncodeDecoder.NewEncodeDecoderOptions(iOpts))
if c.Encoder != nil {
opts = opts.SetEncoderOptions(c.Encoder.NewOptions(iOpts))
}
if c.Decoder != nil {
opts = opts.SetDecoderOptions(c.Decoder.NewOptions(iOpts))
}
if c.MessagePool != nil {
opts = opts.SetMessagePoolOptions(c.MessagePool.NewObjectPoolOptions(iOpts))
Expand Down
8 changes: 8 additions & 0 deletions consumer/config_test.go
Expand Up @@ -36,6 +36,10 @@ ackFlushInterval: 100ms
ackBufferSize: 100
connectionWriteBufferSize: 200
connectionReadBufferSize: 300
encoder:
maxMessageSize: 100
decoder:
maxMessageSize: 200
`

var cfg Configuration
Expand All @@ -47,5 +51,9 @@ connectionReadBufferSize: 300
require.Equal(t, 100, opts.AckBufferSize())
require.Equal(t, 200, opts.ConnectionWriteBufferSize())
require.Equal(t, 300, opts.ConnectionReadBufferSize())
require.Equal(t, 100, opts.EncoderOptions().MaxMessageSize())
require.NotNil(t, opts.EncoderOptions().BytesPool())
require.Equal(t, 200, opts.DecoderOptions().MaxMessageSize())
require.NotNil(t, opts.EncoderOptions().BytesPool())
require.Nil(t, opts.InstrumentOptions())
}
4 changes: 2 additions & 2 deletions consumer/consumer.go
Expand Up @@ -111,10 +111,10 @@ func newConsumer(
return &consumer{
opts: opts,
mPool: mPool,
encoder: proto.NewEncoder(opts.EncodeDecoderOptions().EncoderOptions()),
encoder: proto.NewEncoder(opts.EncoderOptions()),
decoder: proto.NewDecoder(
bufio.NewReaderSize(conn, opts.ConnectionReadBufferSize()),
opts.EncodeDecoderOptions().DecoderOptions(),
opts.DecoderOptions(),
),
w: bufio.NewWriterSize(conn, opts.ConnectionWriteBufferSize()),
conn: conn,
Expand Down
2 changes: 1 addition & 1 deletion consumer/consumer_test.go
Expand Up @@ -405,7 +405,7 @@ func testProduceAndReceiveAck(t *testing.T, testMsg msgpb.Message, l Listener, o

m.Ack()
var ack msgpb.Ack
err = proto.NewDecoder(conn, opts.EncodeDecoderOptions().DecoderOptions()).Decode(&ack)
err = proto.NewDecoder(conn, opts.DecoderOptions()).Decode(&ack)
require.NoError(t, err)
require.Equal(t, 1, len(ack.Metadata))
require.Equal(t, testMsg.Metadata, ack.Metadata[0])
Expand Down
24 changes: 18 additions & 6 deletions consumer/options.go
Expand Up @@ -36,7 +36,8 @@ var (
)

type options struct {
encdecOptions proto.EncodeDecoderOptions
encOptions proto.Options
decOptions proto.Options
messagePoolOpts pool.ObjectPoolOptions
ackFlushInterval time.Duration
ackBufferSize int
Expand All @@ -48,7 +49,8 @@ type options struct {
// NewOptions creates a new options.
func NewOptions() Options {
return &options{
encdecOptions: proto.NewEncodeDecoderOptions(),
encOptions: proto.NewOptions(),
decOptions: proto.NewOptions(),
messagePoolOpts: pool.NewObjectPoolOptions(),
ackFlushInterval: defaultAckFlushInterval,
ackBufferSize: defaultAckBufferSize,
Expand All @@ -58,13 +60,23 @@ func NewOptions() Options {
}
}

func (opts *options) EncodeDecoderOptions() proto.EncodeDecoderOptions {
return opts.encdecOptions
func (opts *options) EncoderOptions() proto.Options {
return opts.encOptions
}

func (opts *options) SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options {
func (opts *options) SetEncoderOptions(value proto.Options) Options {
o := *opts
o.encdecOptions = value
o.encOptions = value
return &o
}

func (opts *options) DecoderOptions() proto.Options {
return opts.decOptions
}

func (opts *options) SetDecoderOptions(value proto.Options) Options {
o := *opts
o.decOptions = value
return &o
}

Expand Down
2 changes: 1 addition & 1 deletion consumer/server_test.go
Expand Up @@ -76,7 +76,7 @@ func TestConsumerServer(t *testing.T) {
require.Equal(t, testMsg1.Value, bytes)

var ack msgpb.Ack
testDecoder := proto.NewDecoder(conn, opts.ConsumerOptions().EncodeDecoderOptions().DecoderOptions())
testDecoder := proto.NewDecoder(conn, opts.ConsumerOptions().DecoderOptions())
err = testDecoder.Decode(&ack)
require.NoError(t, err)
require.Equal(t, 1, len(ack.Metadata))
Expand Down
14 changes: 10 additions & 4 deletions consumer/types.go
Expand Up @@ -66,11 +66,17 @@ type Listener interface {

// Options configs the consumer listener.
type Options interface {
// EncodeDecoderOptions returns the options for EncodeDecoder.
EncodeDecoderOptions() proto.EncodeDecoderOptions
// EncoderOptions returns the options for Encoder.
EncoderOptions() proto.Options

// SetEncodeDecoderOptions sets the options for EncodeDecoder.
SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options
// SetEncoderOptions sets the options for Encoder.
SetEncoderOptions(value proto.Options) Options

// DecoderOptions returns the options for Decoder.
DecoderOptions() proto.Options

// SetDecoderOptions sets the options for Decoder.
SetDecoderOptions(value proto.Options) Options

// MessagePoolOptions returns the options for message pool.
MessagePoolOptions() pool.ObjectPoolOptions
Expand Down
16 changes: 4 additions & 12 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 37 additions & 37 deletions integration/integration_test.go
Expand Up @@ -50,8 +50,8 @@ func TestSharedConsumer(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
})

s.Run(t, ctrl)
Expand All @@ -71,7 +71,7 @@ func TestReplicatedConsumer(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
})

s.Run(t, ctrl)
Expand All @@ -92,9 +92,9 @@ func TestSharedAndReplicatedConsumers(t *testing.T) {
for i := 1; i <= maxProducers; i++ {
for j := 1; j <= maxRF; j++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: j},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: j},
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: j},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: j},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j},
})

s.Run(t, ctrl)
Expand All @@ -115,8 +115,8 @@ func TestSharedConsumerWithDeadInstance(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -149,8 +149,8 @@ func TestSharedConsumerWithDeadConnection(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestReplicatedConsumerWithDeadConnection(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -207,9 +207,9 @@ func TestSharedAndReplicatedConsumerWithDeadConnection(t *testing.T) {
for i := 1; i <= maxProducers; i++ {
for j := 1; j <= maxRF; j++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: j},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: j},
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: j},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: j},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -246,8 +246,8 @@ func TestSharedConsumerAddInstances(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestReplicatedConsumerAddInstances(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -304,9 +304,9 @@ func TestSharedAndReplicatedConsumerAddInstances(t *testing.T) {
for i := 1; i <= maxProducers; i++ {
for j := 1; j <= maxRF; j++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: j},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: j},
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: j},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: j},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -343,8 +343,8 @@ func TestSharedConsumerRemoveInstances(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestReplicatedConsumerRemoveInstances(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -401,9 +401,9 @@ func TestSharedAndReplicatedConsumerRemoveInstances(t *testing.T) {
for i := 1; i <= maxProducers; i++ {
for j := 1; j <= maxRF; j++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: j},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: j},
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: j},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: j},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -440,8 +440,8 @@ func TestSharedConsumerReplaceInstances(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -469,7 +469,7 @@ func TestReplicatedConsumerReplaceInstances(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -498,9 +498,9 @@ func TestSharedAndReplicatedConsumerReplaceInstances(t *testing.T) {
for i := 1; i <= maxProducers; i++ {
for j := 1; j <= maxRF; j++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: j},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: j},
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: j},
{ct: topic.Shared, isSharded: false, instances: 5, replicas: j},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j},
})

s.ScheduleOperations(
Expand Down Expand Up @@ -537,9 +537,9 @@ func TestRemoveConsumerService(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 1, replicas: 1},
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: false, instances: 1, replicas: 1},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand All @@ -565,8 +565,8 @@ func TestAddConsumerService(t *testing.T) {

for i := 1; i <= maxProducers; i++ {
s := newTestSetup(t, ctrl, i, []consumerServiceConfig{
consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2},
{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2},
})

s.ScheduleOperations(
Expand Down

0 comments on commit 2fb44bc

Please sign in to comment.