Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Merge b3724dc into f2ea40e
Browse files Browse the repository at this point in the history
  • Loading branch information
fishie9 committed Jul 19, 2018
2 parents f2ea40e + b3724dc commit 92ae7cc
Show file tree
Hide file tree
Showing 26 changed files with 172 additions and 468 deletions.
20 changes: 12 additions & 8 deletions consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,23 @@ import (

// Configuration configs the consumer options.
type Configuration struct {
EncodeDecoder *proto.EncodeDecoderConfiguration `yaml:"encodeDecoder"`
MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"`
AckFlushInterval *time.Duration `yaml:"ackFlushInterval"`
AckBufferSize *int `yaml:"ackBufferSize"`
ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"`
ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"`
Encoder *proto.BaseConfiguration `yaml:"encoder"`
Decoder *proto.BaseConfiguration `yaml:"decoder"`
MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"`
AckFlushInterval *time.Duration `yaml:"ackFlushInterval"`
AckBufferSize *int `yaml:"ackBufferSize"`
ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"`
ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"`
}

// NewOptions creates consumer options.
func (c *Configuration) NewOptions(iOpts instrument.Options) Options {
opts := NewOptions().SetInstrumentOptions(iOpts)
if c.EncodeDecoder != nil {
opts = opts.SetEncodeDecoderOptions(c.EncodeDecoder.NewEncodeDecoderOptions(iOpts))
if c.Encoder != nil {
opts = opts.SetEncoderOptions(c.Encoder.NewBaseOptions(iOpts))
}
if c.Decoder != nil {
opts = opts.SetDecoderOptions(c.Decoder.NewBaseOptions(iOpts))
}
if c.MessagePool != nil {
opts = opts.SetMessagePoolOptions(c.MessagePool.NewObjectPoolOptions(iOpts))
Expand Down
4 changes: 2 additions & 2 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ func newConsumer(
return &consumer{
opts: opts,
mPool: mPool,
encoder: proto.NewEncoder(opts.EncodeDecoderOptions().EncoderOptions()),
encoder: proto.NewEncoder(opts.EncoderOptions()),
decoder: proto.NewDecoder(
bufio.NewReaderSize(conn, opts.ConnectionReadBufferSize()),
opts.EncodeDecoderOptions().DecoderOptions(),
opts.DecoderOptions(),
),
w: bufio.NewWriterSize(conn, opts.ConnectionWriteBufferSize()),
conn: conn,
Expand Down
2 changes: 1 addition & 1 deletion consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func testProduceAndReceiveAck(t *testing.T, testMsg msgpb.Message, l Listener, o

m.Ack()
var ack msgpb.Ack
err = proto.NewDecoder(conn, opts.EncodeDecoderOptions().DecoderOptions()).Decode(&ack)
err = proto.NewDecoder(conn, opts.DecoderOptions()).Decode(&ack)
require.NoError(t, err)
require.Equal(t, 1, len(ack.Metadata))
require.Equal(t, testMsg.Metadata, ack.Metadata[0])
Expand Down
24 changes: 18 additions & 6 deletions consumer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ var (
)

type options struct {
encdecOptions proto.EncodeDecoderOptions
encOptions proto.BaseOptions
decOptions proto.BaseOptions
messagePoolOpts pool.ObjectPoolOptions
ackFlushInterval time.Duration
ackBufferSize int
Expand All @@ -48,7 +49,8 @@ type options struct {
// NewOptions creates a new options.
func NewOptions() Options {
return &options{
encdecOptions: proto.NewEncodeDecoderOptions(),
encOptions: proto.NewBaseOptions(),
decOptions: proto.NewBaseOptions(),
messagePoolOpts: pool.NewObjectPoolOptions(),
ackFlushInterval: defaultAckFlushInterval,
ackBufferSize: defaultAckBufferSize,
Expand All @@ -58,13 +60,23 @@ func NewOptions() Options {
}
}

func (opts *options) EncodeDecoderOptions() proto.EncodeDecoderOptions {
return opts.encdecOptions
func (opts *options) EncoderOptions() proto.BaseOptions {
return opts.encOptions
}

func (opts *options) SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options {
func (opts *options) SetEncoderOptions(value proto.BaseOptions) Options {
o := *opts
o.encdecOptions = value
o.encOptions = value
return &o
}

func (opts *options) DecoderOptions() proto.BaseOptions {
return opts.decOptions
}

func (opts *options) SetDecoderOptions(value proto.BaseOptions) Options {
o := *opts
o.decOptions = value
return &o
}

Expand Down
2 changes: 1 addition & 1 deletion consumer/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestConsumerServer(t *testing.T) {
require.Equal(t, testMsg1.Value, bytes)

var ack msgpb.Ack
testDecoder := proto.NewDecoder(conn, opts.ConsumerOptions().EncodeDecoderOptions().DecoderOptions())
testDecoder := proto.NewDecoder(conn, opts.ConsumerOptions().DecoderOptions())
err = testDecoder.Decode(&ack)
require.NoError(t, err)
require.Equal(t, 1, len(ack.Metadata))
Expand Down
14 changes: 10 additions & 4 deletions consumer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,17 @@ type Listener interface {

// Options configs the consumer listener.
type Options interface {
// EncodeDecoderOptions returns the options for EncodeDecoder.
EncodeDecoderOptions() proto.EncodeDecoderOptions
// EncoderOptions returns the options for Encoder.
EncoderOptions() proto.BaseOptions

// SetEncodeDecoderOptions sets the options for EncodeDecoder.
SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options
// SetEncoderOptions sets the options for Encoder.
SetEncoderOptions(value proto.BaseOptions) Options

// DecoderOptions returns the options for Decoder.
DecoderOptions() proto.BaseOptions

// SetDecoderOptions sets the options for Decoder.
SetDecoderOptions(value proto.BaseOptions) Options

// MessagePoolOptions returns the options for message pool.
MessagePoolOptions() pool.ObjectPoolOptions
Expand Down
16 changes: 4 additions & 12 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions producer/config/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type WriterConfiguration struct {
TopicName string `yaml:"topicName" validate:"nonzero"`
TopicServiceOverride kv.OverrideConfiguration `yaml:"topicServiceOverride"`
TopicWatchInitTimeout *time.Duration `yaml:"topicWatchInitTimeout"`
PlacementServiceOverride services.OverrideConfiguration `yaml:"placementServiceOverride"`
PlacementServiceOverride services.OverrideConfiguration `yaml:"placementServiceOverride"`
PlacementWatchInitTimeout *time.Duration `yaml:"placementWatchInitTimeout"`
MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"`
MessageRetry *retry.Configuration `yaml:"messageRetry"`
Expand All @@ -91,7 +91,8 @@ type WriterConfiguration struct {
InitialAckMapSize *int `yaml:"initialAckMapSize"`
CloseCheckInterval *time.Duration `yaml:"closeCheckInterval"`
AckErrorRetry *retry.Configuration `yaml:"ackErrorRetry"`
EncodeDecoder *proto.EncodeDecoderConfiguration `yaml:"encodeDecoder"`
Encoder *proto.BaseConfiguration `yaml:"encoder"`
Decoder *proto.BaseConfiguration `yaml:"decoder"`
Connection *ConnectionConfiguration `yaml:"connection"`
}

Expand Down Expand Up @@ -149,8 +150,11 @@ func (c *WriterConfiguration) NewOptions(
if c.AckErrorRetry != nil {
opts = opts.SetAckErrorRetryOptions(c.AckErrorRetry.NewOptions(iOpts.MetricsScope()))
}
if c.EncodeDecoder != nil {
opts = opts.SetEncodeDecoderOptions(c.EncodeDecoder.NewEncodeDecoderOptions(iOpts))
if c.Encoder != nil {
opts = opts.SetEncoderOptions(c.Encoder.NewBaseOptions(iOpts))
}
if c.Decoder != nil {
opts = opts.SetDecoderOptions(c.Decoder.NewBaseOptions(iOpts))
}
if c.Connection != nil {
opts = opts.SetConnectionOptions(c.Connection.NewOptions(iOpts))
Expand Down
21 changes: 9 additions & 12 deletions producer/writer/consumer_service_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestConsumerServiceWriterWithSharedConsumerWithNonShardedPlacement(t *testi

wg.Add(1)
go func() {
testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions())
testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions())
wg.Done()
}()

Expand Down Expand Up @@ -249,7 +249,7 @@ func TestConsumerServiceWriterWithSharedConsumerWithShardedPlacement(t *testing.

wg.Add(1)
go func() {
testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions())
testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions())
wg.Done()
}()

Expand Down Expand Up @@ -389,13 +389,13 @@ func TestConsumerServiceWriterWithReplicatedConsumerWithShardedPlacement(t *test
var wg sync.WaitGroup
wg.Add(1)
go func() {
testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions())
testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions())
wg.Done()
}()

wg.Add(1)
go func() {
testConsumeAndAckOnConnectionListener(t, lis2, opts.EncodeDecoderOptions())
testConsumeAndAckOnConnectionListener(t, lis2, opts.EncoderOptions(), opts.DecoderOptions())
wg.Done()
}()
wg.Wait()
Expand Down Expand Up @@ -443,24 +443,21 @@ func TestConsumerServiceWriterWithReplicatedConsumerWithShardedPlacement(t *test
if err != nil {
return
}

server := proto.NewEncodeDecoder(
conn,
opts.EncodeDecoderOptions(),
)
serverEncoder := proto.NewEncoder(opts.EncoderOptions())
serverDecoder := proto.NewDecoder(conn, opts.DecoderOptions())

var msg msgpb.Message
err = server.Decode(&msg)
err = serverDecoder.Decode(&msg)
if err != nil {
conn.Close()
continue
}
require.NoError(t, server.Encode(&msgpb.Ack{
require.NoError(t, serverEncoder.Encode(&msgpb.Ack{
Metadata: []msgpb.Metadata{
msg.Metadata,
},
}))
_, err = conn.Write(server.Bytes())
_, err = conn.Write(serverEncoder.Bytes())
require.NoError(t, err)
conn.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion producer/writer/consumer_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func newConsumerWriter(
)
)
w := &consumerWriterImpl{
decoder: proto.NewDecoder(rw, opts.EncodeDecoderOptions().DecoderOptions()),
decoder: proto.NewDecoder(rw, opts.DecoderOptions()),
addr: addr,
router: router,
opts: opts,
Expand Down
25 changes: 12 additions & 13 deletions producer/writer/consumer_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestNewConsumerWriter(t *testing.T) {

wg.Add(1)
go func() {
testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions())
testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions())
wg.Done()
}()

Expand Down Expand Up @@ -271,7 +271,7 @@ func TestAutoReset(t *testing.T) {
defer serverConn.Close()

go func() {
testConsumeAndAckOnConnection(t, serverConn, opts.EncodeDecoderOptions())
testConsumeAndAckOnConnection(t, serverConn, opts.EncoderOptions(), opts.DecoderOptions())
}()

w.connectFn = func(addr string) (io.ReadWriteCloser, error) {
Expand Down Expand Up @@ -382,36 +382,35 @@ func testConnectionOptions() ConnectionOptions {
func testConsumeAndAckOnConnection(
t *testing.T,
conn net.Conn,
opts proto.EncodeDecoderOptions,
encOpts proto.BaseOptions,
decOpts proto.BaseOptions,
) {
server := proto.NewEncodeDecoder(
conn,
opts,
)

serverEncoder := proto.NewEncoder(encOpts)
serverDecoder := proto.NewDecoder(conn, decOpts)
var msg msgpb.Message
assert.NoError(t, server.Decode(&msg))
assert.NoError(t, serverDecoder.Decode(&msg))

err := server.Encode(&msgpb.Ack{
err := serverEncoder.Encode(&msgpb.Ack{
Metadata: []msgpb.Metadata{
msg.Metadata,
},
})
assert.NoError(t, err)
_, err = conn.Write(server.Bytes())
_, err = conn.Write(serverEncoder.Bytes())
assert.NoError(t, err)
}

func testConsumeAndAckOnConnectionListener(
t *testing.T,
lis net.Listener,
opts proto.EncodeDecoderOptions,
encOpts proto.BaseOptions,
decOpts proto.BaseOptions,
) {
conn, err := lis.Accept()
require.NoError(t, err)
defer conn.Close()

testConsumeAndAckOnConnection(t, conn, opts)
testConsumeAndAckOnConnection(t, conn, encOpts, decOpts)
}

func testConsumerWriterMetrics() consumerWriterMetrics {
Expand Down
2 changes: 1 addition & 1 deletion producer/writer/message_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func newMessageWriter(
opts: opts,
retryOpts: opts.MessageRetryOptions(),
r: rand.New(rand.NewSource(nowFn().UnixNano())),
encoder: proto.NewEncoder(opts.EncodeDecoderOptions().EncoderOptions()),
encoder: proto.NewEncoder(opts.EncoderOptions()),
msgID: 0,
queue: list.New(),
acks: newAckHelper(opts.InitialAckMapSize()),
Expand Down
8 changes: 4 additions & 4 deletions producer/writer/message_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestMessageWriterWithPooling(t *testing.T) {

wg.Add(1)
go func() {
testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions())
testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions())
wg.Done()
}()

Expand Down Expand Up @@ -188,7 +188,7 @@ func TestMessageWriterWithoutPooling(t *testing.T) {

wg.Add(1)
go func() {
testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions())
testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions())
wg.Done()
}()

Expand Down Expand Up @@ -302,7 +302,7 @@ func TestMessageWriterRetryWithoutPooling(t *testing.T) {

w.AddConsumerWriter(cw)
go func() {
testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions())
testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions())
}()

for {
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestMessageWriterRetryWithPooling(t *testing.T) {

w.AddConsumerWriter(cw)
go func() {
testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions())
testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions())
}()

for {
Expand Down

0 comments on commit 92ae7cc

Please sign in to comment.