From b3724dc319dd3dcb1694d25839db22c3a9af1c01 Mon Sep 17 00:00:00 2001 From: Siyu Yang Date: Wed, 18 Jul 2018 14:25:28 -0400 Subject: [PATCH 1/6] Removing EncodeDecoder and refactoring --- consumer/config.go | 20 +++--- consumer/consumer.go | 4 +- consumer/consumer_test.go | 2 +- consumer/options.go | 24 +++++-- consumer/server_test.go | 2 +- consumer/types.go | 14 ++-- glide.lock | 16 ++--- producer/config/writer.go | 12 ++-- .../writer/consumer_service_writer_test.go | 21 +++--- producer/writer/consumer_writer.go | 2 +- producer/writer/consumer_writer_test.go | 25 ++++--- producer/writer/message_writer.go | 2 +- producer/writer/message_writer_test.go | 8 +-- producer/writer/options.go | 34 +++++++--- producer/writer/shard_writer_test.go | 17 ++--- producer/writer/writer_test.go | 12 ++-- protocol/proto/benchmark_test.go | 26 ------- protocol/proto/config.go | 29 +++----- protocol/proto/config_test.go | 19 ++---- protocol/proto/encode_decoder.go | 64 ------------------ protocol/proto/encode_decoder_pool.go | 48 ------------- protocol/proto/encode_decoder_pool_test.go | 49 -------------- protocol/proto/encode_decoder_test.go | 67 ------------------- protocol/proto/options.go | 44 ------------ protocol/proto/roundtrip_test.go | 34 ++++++++++ protocol/proto/types.go | 45 ------------- 26 files changed, 172 insertions(+), 468 deletions(-) delete mode 100644 protocol/proto/encode_decoder.go delete mode 100644 protocol/proto/encode_decoder_pool.go delete mode 100644 protocol/proto/encode_decoder_pool_test.go delete mode 100644 protocol/proto/encode_decoder_test.go diff --git a/consumer/config.go b/consumer/config.go index 262f556..1a13530 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -30,19 +30,23 @@ import ( // Configuration configs the consumer options. type Configuration struct { - EncodeDecoder *proto.EncodeDecoderConfiguration `yaml:"encodeDecoder"` - MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"` - AckFlushInterval *time.Duration `yaml:"ackFlushInterval"` - AckBufferSize *int `yaml:"ackBufferSize"` - ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"` - ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"` + Encoder *proto.BaseConfiguration `yaml:"encoder"` + Decoder *proto.BaseConfiguration `yaml:"decoder"` + MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"` + AckFlushInterval *time.Duration `yaml:"ackFlushInterval"` + AckBufferSize *int `yaml:"ackBufferSize"` + ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"` + ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"` } // NewOptions creates consumer options. func (c *Configuration) NewOptions(iOpts instrument.Options) Options { opts := NewOptions().SetInstrumentOptions(iOpts) - if c.EncodeDecoder != nil { - opts = opts.SetEncodeDecoderOptions(c.EncodeDecoder.NewEncodeDecoderOptions(iOpts)) + if c.Encoder != nil { + opts = opts.SetEncoderOptions(c.Encoder.NewBaseOptions(iOpts)) + } + if c.Decoder != nil { + opts = opts.SetDecoderOptions(c.Decoder.NewBaseOptions(iOpts)) } if c.MessagePool != nil { opts = opts.SetMessagePoolOptions(c.MessagePool.NewObjectPoolOptions(iOpts)) diff --git a/consumer/consumer.go b/consumer/consumer.go index 20f272b..0f7817f 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -111,10 +111,10 @@ func newConsumer( return &consumer{ opts: opts, mPool: mPool, - encoder: proto.NewEncoder(opts.EncodeDecoderOptions().EncoderOptions()), + encoder: proto.NewEncoder(opts.EncoderOptions()), decoder: proto.NewDecoder( bufio.NewReaderSize(conn, opts.ConnectionReadBufferSize()), - opts.EncodeDecoderOptions().DecoderOptions(), + opts.DecoderOptions(), ), w: bufio.NewWriterSize(conn, opts.ConnectionWriteBufferSize()), conn: conn, diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 415ec53..e875896 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -405,7 +405,7 @@ func testProduceAndReceiveAck(t *testing.T, testMsg msgpb.Message, l Listener, o m.Ack() var ack msgpb.Ack - err = proto.NewDecoder(conn, opts.EncodeDecoderOptions().DecoderOptions()).Decode(&ack) + err = proto.NewDecoder(conn, opts.DecoderOptions()).Decode(&ack) require.NoError(t, err) require.Equal(t, 1, len(ack.Metadata)) require.Equal(t, testMsg.Metadata, ack.Metadata[0]) diff --git a/consumer/options.go b/consumer/options.go index 4ab6837..389122e 100644 --- a/consumer/options.go +++ b/consumer/options.go @@ -36,7 +36,8 @@ var ( ) type options struct { - encdecOptions proto.EncodeDecoderOptions + encOptions proto.BaseOptions + decOptions proto.BaseOptions messagePoolOpts pool.ObjectPoolOptions ackFlushInterval time.Duration ackBufferSize int @@ -48,7 +49,8 @@ type options struct { // NewOptions creates a new options. func NewOptions() Options { return &options{ - encdecOptions: proto.NewEncodeDecoderOptions(), + encOptions: proto.NewBaseOptions(), + decOptions: proto.NewBaseOptions(), messagePoolOpts: pool.NewObjectPoolOptions(), ackFlushInterval: defaultAckFlushInterval, ackBufferSize: defaultAckBufferSize, @@ -58,13 +60,23 @@ func NewOptions() Options { } } -func (opts *options) EncodeDecoderOptions() proto.EncodeDecoderOptions { - return opts.encdecOptions +func (opts *options) EncoderOptions() proto.BaseOptions { + return opts.encOptions } -func (opts *options) SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options { +func (opts *options) SetEncoderOptions(value proto.BaseOptions) Options { o := *opts - o.encdecOptions = value + o.encOptions = value + return &o +} + +func (opts *options) DecoderOptions() proto.BaseOptions { + return opts.decOptions +} + +func (opts *options) SetDecoderOptions(value proto.BaseOptions) Options { + o := *opts + o.decOptions = value return &o } diff --git a/consumer/server_test.go b/consumer/server_test.go index 5d93618..3df81d2 100644 --- a/consumer/server_test.go +++ b/consumer/server_test.go @@ -76,7 +76,7 @@ func TestConsumerServer(t *testing.T) { require.Equal(t, testMsg1.Value, bytes) var ack msgpb.Ack - testDecoder := proto.NewDecoder(conn, opts.ConsumerOptions().EncodeDecoderOptions().DecoderOptions()) + testDecoder := proto.NewDecoder(conn, opts.ConsumerOptions().DecoderOptions()) err = testDecoder.Decode(&ack) require.NoError(t, err) require.Equal(t, 1, len(ack.Metadata)) diff --git a/consumer/types.go b/consumer/types.go index 060a126..e8a80b9 100644 --- a/consumer/types.go +++ b/consumer/types.go @@ -66,11 +66,17 @@ type Listener interface { // Options configs the consumer listener. type Options interface { - // EncodeDecoderOptions returns the options for EncodeDecoder. - EncodeDecoderOptions() proto.EncodeDecoderOptions + // EncoderOptions returns the options for Encoder. + EncoderOptions() proto.BaseOptions - // SetEncodeDecoderOptions sets the options for EncodeDecoder. - SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options + // SetEncoderOptions sets the options for Encoder. + SetEncoderOptions(value proto.BaseOptions) Options + + // DecoderOptions returns the options for Decoder. + DecoderOptions() proto.BaseOptions + + // SetDecoderOptions sets the options for Decoder. + SetDecoderOptions(value proto.BaseOptions) Options // MessagePoolOptions returns the options for message pool. MessagePoolOptions() pool.ObjectPoolOptions diff --git a/glide.lock b/glide.lock index d4a5b41..25920c8 100644 --- a/glide.lock +++ b/glide.lock @@ -1,18 +1,14 @@ hash: 162187b91d0d35f8e950cff7bc8fb398d6d77f5231a82fbd6cdbd2ad55da1156 -updated: 2018-07-02T16:55:32.346834-04:00 +updated: 2018-07-19T12:14:46.451346-04:00 imports: - name: github.com/apache/thrift - version: 9549b25c77587b29be4e0b5c258221a4ed85d37a + version: b2a4d4ae21c789b689dd162deb819665567f481c subpackages: - lib/go/thrift - name: github.com/beorn7/perks version: 3ac7bf7a47d159a033b107610db8a1b6575507a4 subpackages: - quantile -- name: github.com/davecgh/go-spew - version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d - subpackages: - - spew - name: github.com/fortytw2/leaktest version: a5ef70473c97b71626b9abeda80ee92ba2a7de9e - name: github.com/gogo/protobuf @@ -92,17 +88,13 @@ imports: version: fc2b8d3a73c4867e51861bbdd5ae3c1f0869dd6a subpackages: - pbutil -- name: github.com/pmezard/go-difflib - version: d8ed2627bdf02c080bf22230dbb337003b7aba2d - subpackages: - - difflib - name: github.com/stretchr/testify - version: f35b8ab0b5a2cef36673838d662e249dd9c94686 + version: efa3c4c36479edac5af79cbcb4bc8cb525e09b13 subpackages: - assert - require - name: github.com/uber-go/tally - version: 79f2a33b0e55b1255ffbbaf824dcafb09ff34dda + version: ff17f3c43c065c3c2991f571e740eee43ea3a14a subpackages: - m3 - m3/customtransports diff --git a/producer/config/writer.go b/producer/config/writer.go index c7187ad..9db1b64 100644 --- a/producer/config/writer.go +++ b/producer/config/writer.go @@ -81,7 +81,7 @@ type WriterConfiguration struct { TopicName string `yaml:"topicName" validate:"nonzero"` TopicServiceOverride kv.OverrideConfiguration `yaml:"topicServiceOverride"` TopicWatchInitTimeout *time.Duration `yaml:"topicWatchInitTimeout"` - PlacementServiceOverride services.OverrideConfiguration `yaml:"placementServiceOverride"` + PlacementServiceOverride services.OverrideConfiguration `yaml:"placementServiceOverride"` PlacementWatchInitTimeout *time.Duration `yaml:"placementWatchInitTimeout"` MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"` MessageRetry *retry.Configuration `yaml:"messageRetry"` @@ -91,7 +91,8 @@ type WriterConfiguration struct { InitialAckMapSize *int `yaml:"initialAckMapSize"` CloseCheckInterval *time.Duration `yaml:"closeCheckInterval"` AckErrorRetry *retry.Configuration `yaml:"ackErrorRetry"` - EncodeDecoder *proto.EncodeDecoderConfiguration `yaml:"encodeDecoder"` + Encoder *proto.BaseConfiguration `yaml:"encoder"` + Decoder *proto.BaseConfiguration `yaml:"decoder"` Connection *ConnectionConfiguration `yaml:"connection"` } @@ -149,8 +150,11 @@ func (c *WriterConfiguration) NewOptions( if c.AckErrorRetry != nil { opts = opts.SetAckErrorRetryOptions(c.AckErrorRetry.NewOptions(iOpts.MetricsScope())) } - if c.EncodeDecoder != nil { - opts = opts.SetEncodeDecoderOptions(c.EncodeDecoder.NewEncodeDecoderOptions(iOpts)) + if c.Encoder != nil { + opts = opts.SetEncoderOptions(c.Encoder.NewBaseOptions(iOpts)) + } + if c.Decoder != nil { + opts = opts.SetDecoderOptions(c.Decoder.NewBaseOptions(iOpts)) } if c.Connection != nil { opts = opts.SetConnectionOptions(c.Connection.NewOptions(iOpts)) diff --git a/producer/writer/consumer_service_writer_test.go b/producer/writer/consumer_service_writer_test.go index 53fcd83..8deee72 100644 --- a/producer/writer/consumer_service_writer_test.go +++ b/producer/writer/consumer_service_writer_test.go @@ -113,7 +113,7 @@ func TestConsumerServiceWriterWithSharedConsumerWithNonShardedPlacement(t *testi wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -249,7 +249,7 @@ func TestConsumerServiceWriterWithSharedConsumerWithShardedPlacement(t *testing. wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -389,13 +389,13 @@ func TestConsumerServiceWriterWithReplicatedConsumerWithShardedPlacement(t *test var wg sync.WaitGroup wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis2, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis2, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Wait() @@ -443,24 +443,21 @@ func TestConsumerServiceWriterWithReplicatedConsumerWithShardedPlacement(t *test if err != nil { return } - - server := proto.NewEncodeDecoder( - conn, - opts.EncodeDecoderOptions(), - ) + serverEncoder := proto.NewEncoder(opts.EncoderOptions()) + serverDecoder := proto.NewDecoder(conn, opts.DecoderOptions()) var msg msgpb.Message - err = server.Decode(&msg) + err = serverDecoder.Decode(&msg) if err != nil { conn.Close() continue } - require.NoError(t, server.Encode(&msgpb.Ack{ + require.NoError(t, serverEncoder.Encode(&msgpb.Ack{ Metadata: []msgpb.Metadata{ msg.Metadata, }, })) - _, err = conn.Write(server.Bytes()) + _, err = conn.Write(serverEncoder.Bytes()) require.NoError(t, err) conn.Close() } diff --git a/producer/writer/consumer_writer.go b/producer/writer/consumer_writer.go index c6c1f62..b74ec28 100644 --- a/producer/writer/consumer_writer.go +++ b/producer/writer/consumer_writer.go @@ -139,7 +139,7 @@ func newConsumerWriter( ) ) w := &consumerWriterImpl{ - decoder: proto.NewDecoder(rw, opts.EncodeDecoderOptions().DecoderOptions()), + decoder: proto.NewDecoder(rw, opts.DecoderOptions()), addr: addr, router: router, opts: opts, diff --git a/producer/writer/consumer_writer_test.go b/producer/writer/consumer_writer_test.go index 824fd7f..508140e 100644 --- a/producer/writer/consumer_writer_test.go +++ b/producer/writer/consumer_writer_test.go @@ -70,7 +70,7 @@ func TestNewConsumerWriter(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -271,7 +271,7 @@ func TestAutoReset(t *testing.T) { defer serverConn.Close() go func() { - testConsumeAndAckOnConnection(t, serverConn, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnection(t, serverConn, opts.EncoderOptions(), opts.DecoderOptions()) }() w.connectFn = func(addr string) (io.ReadWriteCloser, error) { @@ -382,36 +382,35 @@ func testConnectionOptions() ConnectionOptions { func testConsumeAndAckOnConnection( t *testing.T, conn net.Conn, - opts proto.EncodeDecoderOptions, + encOpts proto.BaseOptions, + decOpts proto.BaseOptions, ) { - server := proto.NewEncodeDecoder( - conn, - opts, - ) - + serverEncoder := proto.NewEncoder(encOpts) + serverDecoder := proto.NewDecoder(conn, decOpts) var msg msgpb.Message - assert.NoError(t, server.Decode(&msg)) + assert.NoError(t, serverDecoder.Decode(&msg)) - err := server.Encode(&msgpb.Ack{ + err := serverEncoder.Encode(&msgpb.Ack{ Metadata: []msgpb.Metadata{ msg.Metadata, }, }) assert.NoError(t, err) - _, err = conn.Write(server.Bytes()) + _, err = conn.Write(serverEncoder.Bytes()) assert.NoError(t, err) } func testConsumeAndAckOnConnectionListener( t *testing.T, lis net.Listener, - opts proto.EncodeDecoderOptions, + encOpts proto.BaseOptions, + decOpts proto.BaseOptions, ) { conn, err := lis.Accept() require.NoError(t, err) defer conn.Close() - testConsumeAndAckOnConnection(t, conn, opts) + testConsumeAndAckOnConnection(t, conn, encOpts, decOpts) } func testConsumerWriterMetrics() consumerWriterMetrics { diff --git a/producer/writer/message_writer.go b/producer/writer/message_writer.go index 7ac1279..fbdb7ee 100644 --- a/producer/writer/message_writer.go +++ b/producer/writer/message_writer.go @@ -183,7 +183,7 @@ func newMessageWriter( opts: opts, retryOpts: opts.MessageRetryOptions(), r: rand.New(rand.NewSource(nowFn().UnixNano())), - encoder: proto.NewEncoder(opts.EncodeDecoderOptions().EncoderOptions()), + encoder: proto.NewEncoder(opts.EncoderOptions()), msgID: 0, queue: list.New(), acks: newAckHelper(opts.InitialAckMapSize()), diff --git a/producer/writer/message_writer_test.go b/producer/writer/message_writer_test.go index d45250a..ee11cd9 100644 --- a/producer/writer/message_writer_test.go +++ b/producer/writer/message_writer_test.go @@ -105,7 +105,7 @@ func TestMessageWriterWithPooling(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -188,7 +188,7 @@ func TestMessageWriterWithoutPooling(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -302,7 +302,7 @@ func TestMessageWriterRetryWithoutPooling(t *testing.T) { w.AddConsumerWriter(cw) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) }() for { @@ -362,7 +362,7 @@ func TestMessageWriterRetryWithPooling(t *testing.T) { w.AddConsumerWriter(cw) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) }() for { diff --git a/producer/writer/options.go b/producer/writer/options.go index 1a120e5..b155487 100644 --- a/producer/writer/options.go +++ b/producer/writer/options.go @@ -308,11 +308,13 @@ type Options interface { // SetAckErrorRetryOptions sets the retrier for ack errors. SetAckErrorRetryOptions(value retry.Options) Options - // EncodeDecoderOptions returns the options for EncodeDecoder. - EncodeDecoderOptions() proto.EncodeDecoderOptions + EncoderOptions() proto.BaseOptions - // SetEncodeDecoderOptions sets the options for EncodeDecoder. - SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options + SetEncoderOptions(value proto.BaseOptions) Options + + DecoderOptions() proto.BaseOptions + + SetDecoderOptions(value proto.BaseOptions) Options // ConnectionOptions returns the options for connections. ConnectionOptions() ConnectionOptions @@ -341,7 +343,8 @@ type writerOptions struct { initialAckMapSize int closeCheckInterval time.Duration ackErrRetryOpts retry.Options - encdecOpts proto.EncodeDecoderOptions + encOpts proto.BaseOptions + decOpts proto.BaseOptions cOpts ConnectionOptions iOpts instrument.Options } @@ -358,7 +361,8 @@ func NewOptions() Options { initialAckMapSize: defaultInitialAckMapSize, closeCheckInterval: defaultCloseCheckInterval, ackErrRetryOpts: retry.NewOptions(), - encdecOpts: proto.NewEncodeDecoderOptions(), + encOpts: proto.NewBaseOptions(), + decOpts: proto.NewBaseOptions(), cOpts: NewConnectionOptions(), iOpts: instrument.NewOptions(), } @@ -494,13 +498,23 @@ func (opts *writerOptions) SetAckErrorRetryOptions(value retry.Options) Options return &o } -func (opts *writerOptions) EncodeDecoderOptions() proto.EncodeDecoderOptions { - return opts.encdecOpts +func (opts *writerOptions) EncoderOptions() proto.BaseOptions { + return opts.encOpts +} + +func (opts *writerOptions) SetEncoderOptions(value proto.BaseOptions) Options { + o := *opts + o.encOpts = value + return &o +} + +func (opts *writerOptions) DecoderOptions() proto.BaseOptions { + return opts.decOpts } -func (opts *writerOptions) SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options { +func (opts *writerOptions) SetDecoderOptions(value proto.BaseOptions) Options { o := *opts - o.encdecOpts = value + o.decOpts = value return &o } diff --git a/producer/writer/shard_writer_test.go b/producer/writer/shard_writer_test.go index 05b451f..552a067 100644 --- a/producer/writer/shard_writer_test.go +++ b/producer/writer/shard_writer_test.go @@ -70,7 +70,7 @@ func TestSharedShardWriter(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -182,7 +182,7 @@ func TestReplicatedShardWriter(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -205,7 +205,7 @@ func TestReplicatedShardWriter(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis2, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis2, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -288,7 +288,7 @@ func TestReplicatedShardWriterRemoveMessageWriter(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -308,10 +308,11 @@ func TestReplicatedShardWriterRemoveMessageWriter(t *testing.T) { require.NoError(t, err) defer conn.Close() - server := proto.NewEncodeDecoder(conn, opts.EncodeDecoderOptions()) + serverEncoder := proto.NewEncoder(opts.EncoderOptions()) + serverDecoder := proto.NewDecoder(conn, opts.DecoderOptions()) var msg msgpb.Message - require.NoError(t, server.Decode(&msg)) + require.NoError(t, serverDecoder.Decode(&msg)) sw.UpdateInstances( []placement.Instance{i1}, cws, @@ -320,8 +321,8 @@ func TestReplicatedShardWriterRemoveMessageWriter(t *testing.T) { require.Equal(t, 1, len(sw.messageWriters)) mm.EXPECT().Finalize(producer.Consumed) - require.NoError(t, server.Encode(&msgpb.Ack{Metadata: []msgpb.Metadata{msg.Metadata}})) - _, err = conn.Write(server.Bytes()) + require.NoError(t, serverEncoder.Encode(&msgpb.Ack{Metadata: []msgpb.Metadata{msg.Metadata}})) + _, err = conn.Write(serverEncoder.Bytes()) require.NoError(t, err) // Make sure mw2 is closed and removed from router. for { diff --git a/producer/writer/writer_test.go b/producer/writer/writer_test.go index c555412..f6f2f20 100644 --- a/producer/writer/writer_test.go +++ b/producer/writer/writer_test.go @@ -558,19 +558,19 @@ func TestWriterWrite(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis2, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis2, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis3, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis3, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -731,7 +731,7 @@ func TestWriterSetMessageTTLNanosDropMetric(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Wait() @@ -779,7 +779,7 @@ func TestWriterSetMessageTTLNanosDropMetric(t *testing.T) { mm.EXPECT().Finalize(producer.Consumed).Do(func(interface{}) { called++; wg.Done() }) wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Wait() @@ -791,7 +791,7 @@ func TestWriterSetMessageTTLNanosDropMetric(t *testing.T) { // Wait for the consumer to trigger finalize because there is no more message ttl. wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis2, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis2, opts.EncoderOptions(), opts.DecoderOptions()) }() wg.Wait() require.Equal(t, 1, called) diff --git a/protocol/proto/benchmark_test.go b/protocol/proto/benchmark_test.go index 89b4c53..b4ca8dd 100644 --- a/protocol/proto/benchmark_test.go +++ b/protocol/proto/benchmark_test.go @@ -27,32 +27,6 @@ import ( "github.com/m3db/m3msg/generated/proto/msgpb" ) -func BenchmarkEncodeDecoderRoundTrip(b *testing.B) { - r := bytes.NewReader(nil) - c := NewEncodeDecoder(r, NewEncodeDecoderOptions()).(*encdec) - encodeMsg := msgpb.Message{ - Metadata: msgpb.Metadata{}, - Value: make([]byte, 200), - } - decodeMsg := msgpb.Message{} - b.ReportAllocs() - b.ResetTimer() - for n := 0; n < b.N; n++ { - encodeMsg.Metadata.Id = uint64(n) - err := c.Encode(&encodeMsg) - if err != nil { - b.FailNow() - } - r.Reset(c.Bytes()) - if err := c.Decode(&decodeMsg); err != nil { - b.FailNow() - } - if decodeMsg.Metadata.Id != uint64(n) { - b.FailNow() - } - } -} - func BenchmarkBaseEncodeDecodeRoundTrip(b *testing.B) { r := bytes.NewReader(nil) encoder := NewEncoder(NewBaseOptions()) diff --git a/protocol/proto/config.go b/protocol/proto/config.go index dafc609..fe3ab7a 100644 --- a/protocol/proto/config.go +++ b/protocol/proto/config.go @@ -25,41 +25,30 @@ import ( "github.com/m3db/m3x/pool" ) -// EncodeDecoderConfiguration configures an EncodeDecoder. -type EncodeDecoderConfiguration struct { +type BaseConfiguration struct { MaxMessageSize *int `yaml:"maxMessageSize"` BytesPool *pool.BucketizedPoolConfiguration `yaml:"bytesPool"` - EncodeDecoderPool *pool.ObjectPoolConfiguration `yaml:"encodeDecoderPool"` } -// NewEncodeDecoderOptions creates a new EncodeDecoderOptions. -func (c *EncodeDecoderConfiguration) NewEncodeDecoderOptions( +func (c *BaseConfiguration) NewBaseOptions ( iOpts instrument.Options, -) EncodeDecoderOptions { +) BaseOptions { var ( - encodeOpts = NewBaseOptions() - decodeOpts = NewBaseOptions() + baseOpts = NewBaseOptions() ) if c.MaxMessageSize != nil { - encodeOpts = encodeOpts.SetMaxMessageSize(*c.MaxMessageSize) - decodeOpts = decodeOpts.SetMaxMessageSize(*c.MaxMessageSize) + baseOpts = baseOpts.SetMaxMessageSize(*c.MaxMessageSize) } if c.BytesPool != nil { scope := iOpts.MetricsScope() p := pool.NewBytesPool( c.BytesPool.NewBuckets(), c.BytesPool.NewObjectPoolOptions(iOpts.SetMetricsScope(scope.Tagged( - map[string]string{"pool": "bytes"}, + map[string]string{"pool": "bytes"}, ))), ) p.Init() - encodeOpts = encodeOpts.SetBytesPool(p) - decodeOpts = decodeOpts.SetBytesPool(p) + baseOpts = baseOpts.SetBytesPool(p) } - opts := NewEncodeDecoderOptions() - if c.EncodeDecoderPool != nil { - encodeDecoderPool := c.EncodeDecoderPool.NewObjectPoolOptions(iOpts) - opts = opts.SetEncodeDecoderPool(NewEncodeDecoderPool(encodeDecoderPool)) - } - return opts.SetEncoderOptions(encodeOpts).SetDecoderOptions(decodeOpts) -} + return baseOpts +} \ No newline at end of file diff --git a/protocol/proto/config_test.go b/protocol/proto/config_test.go index e0e37c7..a06adcd 100644 --- a/protocol/proto/config_test.go +++ b/protocol/proto/config_test.go @@ -29,7 +29,7 @@ import ( yaml "gopkg.in/yaml.v2" ) -func TestEncodeDecoderConfig(t *testing.T) { +func TestBaseConfig(t *testing.T) { str := ` maxMessageSize: 1024 bytesPool: @@ -41,22 +41,17 @@ bytesPool: watermark: lowWatermark: 0.01 highWatermark: 0.02 -encodeDecoderPool: - size: 30000 ` - var cfg EncodeDecoderConfiguration + var cfg BaseConfiguration require.NoError(t, yaml.Unmarshal([]byte(str), &cfg)) - opts := cfg.NewEncodeDecoderOptions(instrument.NewOptions()) - require.Equal(t, 1024, opts.EncoderOptions().MaxMessageSize()) - require.Equal(t, 1024, opts.DecoderOptions().MaxMessageSize()) - require.Equal(t, opts.EncoderOptions().BytesPool(), opts.DecoderOptions().BytesPool()) - require.NotNil(t, opts.DecoderOptions().BytesPool()) - require.NotNil(t, opts.EncodeDecoderPool()) - b := opts.EncoderOptions().BytesPool().Get(2) + opts := cfg.NewBaseOptions(instrument.NewOptions()) + require.Equal(t, 1024, opts.MaxMessageSize()) + require.NotNil(t, opts.BytesPool()) + b := opts.BytesPool().Get(2) require.Equal(t, 0, len(b)) require.Equal(t, 4, cap(b)) - b = opts.DecoderOptions().BytesPool().Get(200) + b = opts.BytesPool().Get(200) require.Equal(t, 0, len(b)) require.Equal(t, 1024, cap(b)) } diff --git a/protocol/proto/encode_decoder.go b/protocol/proto/encode_decoder.go deleted file mode 100644 index 98ae3af..0000000 --- a/protocol/proto/encode_decoder.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package proto - -import ( - "io" -) - -type encdec struct { - Encoder - Decoder - - isClosed bool - pool EncodeDecoderPool -} - -// NewEncodeDecoder creates an EncodeDecoder, the implementation is not thread safe. -func NewEncodeDecoder( - r io.Reader, - opts EncodeDecoderOptions, -) EncodeDecoder { - if opts == nil { - opts = NewEncodeDecoderOptions() - } - return &encdec{ - Encoder: NewEncoder(opts.EncoderOptions()), - Decoder: NewDecoder(r, opts.DecoderOptions()), - isClosed: false, - pool: opts.EncodeDecoderPool(), - } -} - -func (c *encdec) Close() { - if c.isClosed { - return - } - c.isClosed = true - if c.pool != nil { - c.pool.Put(c) - } -} - -func (c *encdec) ResetReader(r io.Reader) { - c.Decoder.ResetReader(r) - c.isClosed = false -} diff --git a/protocol/proto/encode_decoder_pool.go b/protocol/proto/encode_decoder_pool.go deleted file mode 100644 index beaa563..0000000 --- a/protocol/proto/encode_decoder_pool.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package proto - -import "github.com/m3db/m3x/pool" - -type encdecPool struct { - pool pool.ObjectPool -} - -// NewEncodeDecoderPool creates a EncodeDecoderPool. -func NewEncodeDecoderPool(opts pool.ObjectPoolOptions) EncodeDecoderPool { - return &encdecPool{ - pool: pool.NewObjectPool(opts), - } -} - -func (p *encdecPool) Init(alloc EncodeDecoderAlloc) { - p.pool.Init(func() interface{} { - return alloc() - }) -} - -func (p *encdecPool) Get() EncodeDecoder { - return p.pool.Get().(EncodeDecoder) -} - -func (p *encdecPool) Put(c EncodeDecoder) { - p.pool.Put(c) -} diff --git a/protocol/proto/encode_decoder_pool_test.go b/protocol/proto/encode_decoder_pool_test.go deleted file mode 100644 index 087e1a6..0000000 --- a/protocol/proto/encode_decoder_pool_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package proto - -import ( - "testing" - - "github.com/m3db/m3x/pool" - - "github.com/stretchr/testify/require" -) - -func TestEncodeDecoderPool(t *testing.T) { - p := NewEncodeDecoderPool(pool.NewObjectPoolOptions().SetSize(1)) - opts := NewEncodeDecoderOptions().SetEncodeDecoderPool(p) - p.Init(func() EncodeDecoder { - return NewEncodeDecoder(nil, opts) - }) - - c := p.Get() - c.ResetReader(nil) - - c.Close() - require.True(t, c.(*encdec).isClosed) - - c = p.Get() - require.True(t, c.(*encdec).isClosed) - - c.ResetReader(nil) - require.False(t, c.(*encdec).isClosed) -} diff --git a/protocol/proto/encode_decoder_test.go b/protocol/proto/encode_decoder_test.go deleted file mode 100644 index 35d0586..0000000 --- a/protocol/proto/encode_decoder_test.go +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package proto - -import ( - "net" - "testing" - - "github.com/m3db/m3msg/generated/proto/msgpb" - - "github.com/stretchr/testify/require" -) - -func TestEncodeDecoderReset(t *testing.T) { - c := NewEncodeDecoder(nil, nil).(*encdec) - require.Nil(t, c.Decoder.(*decoder).r) - c.Close() - // Safe to close again. - c.Close() - require.True(t, c.isClosed) - - conn := new(net.TCPConn) - c.ResetReader(conn) - require.False(t, c.isClosed) - require.Equal(t, conn, c.Decoder.(*decoder).r) -} - -func TestEncodeDecodeRoundTrip(t *testing.T) { - c := NewEncodeDecoder(nil, nil).(*encdec) - - clientConn, serverConn := net.Pipe() - c.ResetReader(serverConn) - - testMsg := msgpb.Message{ - Metadata: msgpb.Metadata{ - Shard: 1, - Id: 2, - }, - Value: make([]byte, 10), - } - go func() { - require.NoError(t, c.Encode(&testMsg)) - _, err := clientConn.Write(c.Bytes()) - require.NoError(t, err) - }() - var msg msgpb.Message - require.NoError(t, c.Decode(&msg)) - require.Equal(t, testMsg, msg) -} diff --git a/protocol/proto/options.go b/protocol/proto/options.go index e0622e4..44cde37 100644 --- a/protocol/proto/options.go +++ b/protocol/proto/options.go @@ -68,47 +68,3 @@ func (opts *baseOptions) SetBytesPool(value pool.BytesPool) BaseOptions { o.bytesPool = value return &o } - -// NewEncodeDecoderOptions creates an EncodeDecoderOptions. -func NewEncodeDecoderOptions() EncodeDecoderOptions { - return &encdecOptions{ - encOpts: NewBaseOptions(), - decOpts: NewBaseOptions(), - } -} - -type encdecOptions struct { - encOpts BaseOptions - decOpts BaseOptions - pool EncodeDecoderPool -} - -func (opts *encdecOptions) EncoderOptions() BaseOptions { - return opts.encOpts -} - -func (opts *encdecOptions) SetEncoderOptions(value BaseOptions) EncodeDecoderOptions { - o := *opts - o.encOpts = value - return &o -} - -func (opts *encdecOptions) DecoderOptions() BaseOptions { - return opts.decOpts -} - -func (opts *encdecOptions) SetDecoderOptions(value BaseOptions) EncodeDecoderOptions { - o := *opts - o.decOpts = value - return &o -} - -func (opts *encdecOptions) EncodeDecoderPool() EncodeDecoderPool { - return opts.pool -} - -func (opts *encdecOptions) SetEncodeDecoderPool(pool EncodeDecoderPool) EncodeDecoderOptions { - o := *opts - o.pool = pool - return &o -} diff --git a/protocol/proto/roundtrip_test.go b/protocol/proto/roundtrip_test.go index b416ee8..fc828aa 100644 --- a/protocol/proto/roundtrip_test.go +++ b/protocol/proto/roundtrip_test.go @@ -22,6 +22,7 @@ package proto import ( "bytes" + "net" "testing" "github.com/m3db/m3msg/generated/proto/msgpb" @@ -149,6 +150,39 @@ func TestDecodeMessageLargerThanMaxSize(t *testing.T) { require.Contains(t, err.Error(), "larger than maximum supported size") } +func TestDecodeReset(t *testing.T) { + dec := NewDecoder(nil, nil) + require.Nil(t, dec.(*decoder).r) + + conn := new(net.TCPConn) + dec.ResetReader(conn) + require.Equal(t, conn, dec.(*decoder).r) +} + +func TestEncodeDecodeRoundTrip(t *testing.T) { + enc := NewEncoder(nil) + dec := NewDecoder(nil, nil) + + clientConn, serverConn := net.Pipe() + dec.ResetReader(serverConn) + + testMsg := msgpb.Message{ + Metadata: msgpb.Metadata{ + Shard: 1, + Id: 2, + }, + Value: make([]byte, 10), + } + go func() { + require.NoError(t, enc.Encode(&testMsg)) + _, err := clientConn.Write(enc.Bytes()) + require.NoError(t, err) + }() + var msg msgpb.Message + require.NoError(t, dec.Decode(&msg)) + require.Equal(t, testMsg, msg) +} + // nolint: unparam func getBytesPool(bucketSizes int, bucketCaps []int) pool.BytesPool { buckets := make([]pool.Bucket, len(bucketCaps)) diff --git a/protocol/proto/types.go b/protocol/proto/types.go index 837566d..c72c60c 100644 --- a/protocol/proto/types.go +++ b/protocol/proto/types.go @@ -60,30 +60,6 @@ type Decoder interface { ResetReader(r io.Reader) } -// EncodeDecoder can encode and decode. -type EncodeDecoder interface { - Encoder - Decoder - - // Close closes the EncodeDecoder. - Close() -} - -// EncodeDecoderPool is a pool of EncodeDecoders. -type EncodeDecoderPool interface { - // Init initializes the EncodeDecoder pool. - Init(alloc EncodeDecoderAlloc) - - // Get returns an EncodeDecoder from the pool. - Get() EncodeDecoder - - // Put puts an EncodeDecoder into the pool. - Put(c EncodeDecoder) -} - -// EncodeDecoderAlloc allocates an EncodeDecoder. -type EncodeDecoderAlloc func() EncodeDecoder - // BaseOptions configures a base encoder or decoder. type BaseOptions interface { // MaxMessageSize returns the maximum message size. @@ -98,24 +74,3 @@ type BaseOptions interface { // SetBytesPool sets the bytes pool. SetBytesPool(value pool.BytesPool) BaseOptions } - -// EncodeDecoderOptions configures an EncodeDecoder. -type EncodeDecoderOptions interface { - // EncoderOptions returns the options for encoder. - EncoderOptions() BaseOptions - - // SetEncoderOptions sets the options for encoder. - SetEncoderOptions(value BaseOptions) EncodeDecoderOptions - - // DecoderOptions returns the options for decoder. - DecoderOptions() BaseOptions - - // SetDecoderOptions sets the options for decoder. - SetDecoderOptions(value BaseOptions) EncodeDecoderOptions - - // EncodeDecoderPool returns the pool for EncodeDecoder. - EncodeDecoderPool() EncodeDecoderPool - - // SetEncodeDecoderPool sets the pool for EncodeDecoder. - SetEncodeDecoderPool(pool EncodeDecoderPool) EncodeDecoderOptions -} From 68f05ee5979c7a7614d71ec34275595a2b0a2613 Mon Sep 17 00:00:00 2001 From: Siyu Yang Date: Thu, 19 Jul 2018 15:54:52 -0400 Subject: [PATCH 2/6] fix lint --- protocol/proto/config.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocol/proto/config.go b/protocol/proto/config.go index fe3ab7a..10db64c 100644 --- a/protocol/proto/config.go +++ b/protocol/proto/config.go @@ -25,11 +25,13 @@ import ( "github.com/m3db/m3x/pool" ) +// BaseConfiguration configures an Encoder or a Decoder. type BaseConfiguration struct { MaxMessageSize *int `yaml:"maxMessageSize"` BytesPool *pool.BucketizedPoolConfiguration `yaml:"bytesPool"` } +// NewBaseOptions creates a new BaseOptions. func (c *BaseConfiguration) NewBaseOptions ( iOpts instrument.Options, ) BaseOptions { From 91e701879e4ac052175f568a7d7314df478e9a61 Mon Sep 17 00:00:00 2001 From: Siyu Yang Date: Wed, 18 Jul 2018 14:25:28 -0400 Subject: [PATCH 3/6] Removing EncodeDecoder and refactoring --- consumer/config.go | 20 +++--- consumer/consumer.go | 4 +- consumer/consumer_test.go | 2 +- consumer/options.go | 24 +++++-- consumer/server_test.go | 2 +- consumer/types.go | 14 ++-- glide.lock | 16 ++--- producer/config/writer.go | 12 ++-- .../writer/consumer_service_writer_test.go | 21 +++--- producer/writer/consumer_writer.go | 2 +- producer/writer/consumer_writer_test.go | 25 ++++--- producer/writer/message_writer.go | 2 +- producer/writer/message_writer_test.go | 8 +-- producer/writer/options.go | 34 +++++++--- producer/writer/shard_writer_test.go | 17 ++--- producer/writer/writer_test.go | 12 ++-- protocol/proto/benchmark_test.go | 26 ------- protocol/proto/config.go | 29 +++----- protocol/proto/config_test.go | 19 ++---- protocol/proto/encode_decoder.go | 64 ------------------ protocol/proto/encode_decoder_pool.go | 48 ------------- protocol/proto/encode_decoder_pool_test.go | 49 -------------- protocol/proto/encode_decoder_test.go | 67 ------------------- protocol/proto/options.go | 44 ------------ protocol/proto/roundtrip_test.go | 34 ++++++++++ protocol/proto/types.go | 45 ------------- 26 files changed, 172 insertions(+), 468 deletions(-) delete mode 100644 protocol/proto/encode_decoder.go delete mode 100644 protocol/proto/encode_decoder_pool.go delete mode 100644 protocol/proto/encode_decoder_pool_test.go delete mode 100644 protocol/proto/encode_decoder_test.go diff --git a/consumer/config.go b/consumer/config.go index 262f556..1a13530 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -30,19 +30,23 @@ import ( // Configuration configs the consumer options. type Configuration struct { - EncodeDecoder *proto.EncodeDecoderConfiguration `yaml:"encodeDecoder"` - MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"` - AckFlushInterval *time.Duration `yaml:"ackFlushInterval"` - AckBufferSize *int `yaml:"ackBufferSize"` - ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"` - ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"` + Encoder *proto.BaseConfiguration `yaml:"encoder"` + Decoder *proto.BaseConfiguration `yaml:"decoder"` + MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"` + AckFlushInterval *time.Duration `yaml:"ackFlushInterval"` + AckBufferSize *int `yaml:"ackBufferSize"` + ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"` + ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"` } // NewOptions creates consumer options. func (c *Configuration) NewOptions(iOpts instrument.Options) Options { opts := NewOptions().SetInstrumentOptions(iOpts) - if c.EncodeDecoder != nil { - opts = opts.SetEncodeDecoderOptions(c.EncodeDecoder.NewEncodeDecoderOptions(iOpts)) + if c.Encoder != nil { + opts = opts.SetEncoderOptions(c.Encoder.NewBaseOptions(iOpts)) + } + if c.Decoder != nil { + opts = opts.SetDecoderOptions(c.Decoder.NewBaseOptions(iOpts)) } if c.MessagePool != nil { opts = opts.SetMessagePoolOptions(c.MessagePool.NewObjectPoolOptions(iOpts)) diff --git a/consumer/consumer.go b/consumer/consumer.go index 20f272b..0f7817f 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -111,10 +111,10 @@ func newConsumer( return &consumer{ opts: opts, mPool: mPool, - encoder: proto.NewEncoder(opts.EncodeDecoderOptions().EncoderOptions()), + encoder: proto.NewEncoder(opts.EncoderOptions()), decoder: proto.NewDecoder( bufio.NewReaderSize(conn, opts.ConnectionReadBufferSize()), - opts.EncodeDecoderOptions().DecoderOptions(), + opts.DecoderOptions(), ), w: bufio.NewWriterSize(conn, opts.ConnectionWriteBufferSize()), conn: conn, diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 415ec53..e875896 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -405,7 +405,7 @@ func testProduceAndReceiveAck(t *testing.T, testMsg msgpb.Message, l Listener, o m.Ack() var ack msgpb.Ack - err = proto.NewDecoder(conn, opts.EncodeDecoderOptions().DecoderOptions()).Decode(&ack) + err = proto.NewDecoder(conn, opts.DecoderOptions()).Decode(&ack) require.NoError(t, err) require.Equal(t, 1, len(ack.Metadata)) require.Equal(t, testMsg.Metadata, ack.Metadata[0]) diff --git a/consumer/options.go b/consumer/options.go index 4ab6837..389122e 100644 --- a/consumer/options.go +++ b/consumer/options.go @@ -36,7 +36,8 @@ var ( ) type options struct { - encdecOptions proto.EncodeDecoderOptions + encOptions proto.BaseOptions + decOptions proto.BaseOptions messagePoolOpts pool.ObjectPoolOptions ackFlushInterval time.Duration ackBufferSize int @@ -48,7 +49,8 @@ type options struct { // NewOptions creates a new options. func NewOptions() Options { return &options{ - encdecOptions: proto.NewEncodeDecoderOptions(), + encOptions: proto.NewBaseOptions(), + decOptions: proto.NewBaseOptions(), messagePoolOpts: pool.NewObjectPoolOptions(), ackFlushInterval: defaultAckFlushInterval, ackBufferSize: defaultAckBufferSize, @@ -58,13 +60,23 @@ func NewOptions() Options { } } -func (opts *options) EncodeDecoderOptions() proto.EncodeDecoderOptions { - return opts.encdecOptions +func (opts *options) EncoderOptions() proto.BaseOptions { + return opts.encOptions } -func (opts *options) SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options { +func (opts *options) SetEncoderOptions(value proto.BaseOptions) Options { o := *opts - o.encdecOptions = value + o.encOptions = value + return &o +} + +func (opts *options) DecoderOptions() proto.BaseOptions { + return opts.decOptions +} + +func (opts *options) SetDecoderOptions(value proto.BaseOptions) Options { + o := *opts + o.decOptions = value return &o } diff --git a/consumer/server_test.go b/consumer/server_test.go index 5d93618..3df81d2 100644 --- a/consumer/server_test.go +++ b/consumer/server_test.go @@ -76,7 +76,7 @@ func TestConsumerServer(t *testing.T) { require.Equal(t, testMsg1.Value, bytes) var ack msgpb.Ack - testDecoder := proto.NewDecoder(conn, opts.ConsumerOptions().EncodeDecoderOptions().DecoderOptions()) + testDecoder := proto.NewDecoder(conn, opts.ConsumerOptions().DecoderOptions()) err = testDecoder.Decode(&ack) require.NoError(t, err) require.Equal(t, 1, len(ack.Metadata)) diff --git a/consumer/types.go b/consumer/types.go index 060a126..e8a80b9 100644 --- a/consumer/types.go +++ b/consumer/types.go @@ -66,11 +66,17 @@ type Listener interface { // Options configs the consumer listener. type Options interface { - // EncodeDecoderOptions returns the options for EncodeDecoder. - EncodeDecoderOptions() proto.EncodeDecoderOptions + // EncoderOptions returns the options for Encoder. + EncoderOptions() proto.BaseOptions - // SetEncodeDecoderOptions sets the options for EncodeDecoder. - SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options + // SetEncoderOptions sets the options for Encoder. + SetEncoderOptions(value proto.BaseOptions) Options + + // DecoderOptions returns the options for Decoder. + DecoderOptions() proto.BaseOptions + + // SetDecoderOptions sets the options for Decoder. + SetDecoderOptions(value proto.BaseOptions) Options // MessagePoolOptions returns the options for message pool. MessagePoolOptions() pool.ObjectPoolOptions diff --git a/glide.lock b/glide.lock index d4a5b41..25920c8 100644 --- a/glide.lock +++ b/glide.lock @@ -1,18 +1,14 @@ hash: 162187b91d0d35f8e950cff7bc8fb398d6d77f5231a82fbd6cdbd2ad55da1156 -updated: 2018-07-02T16:55:32.346834-04:00 +updated: 2018-07-19T12:14:46.451346-04:00 imports: - name: github.com/apache/thrift - version: 9549b25c77587b29be4e0b5c258221a4ed85d37a + version: b2a4d4ae21c789b689dd162deb819665567f481c subpackages: - lib/go/thrift - name: github.com/beorn7/perks version: 3ac7bf7a47d159a033b107610db8a1b6575507a4 subpackages: - quantile -- name: github.com/davecgh/go-spew - version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d - subpackages: - - spew - name: github.com/fortytw2/leaktest version: a5ef70473c97b71626b9abeda80ee92ba2a7de9e - name: github.com/gogo/protobuf @@ -92,17 +88,13 @@ imports: version: fc2b8d3a73c4867e51861bbdd5ae3c1f0869dd6a subpackages: - pbutil -- name: github.com/pmezard/go-difflib - version: d8ed2627bdf02c080bf22230dbb337003b7aba2d - subpackages: - - difflib - name: github.com/stretchr/testify - version: f35b8ab0b5a2cef36673838d662e249dd9c94686 + version: efa3c4c36479edac5af79cbcb4bc8cb525e09b13 subpackages: - assert - require - name: github.com/uber-go/tally - version: 79f2a33b0e55b1255ffbbaf824dcafb09ff34dda + version: ff17f3c43c065c3c2991f571e740eee43ea3a14a subpackages: - m3 - m3/customtransports diff --git a/producer/config/writer.go b/producer/config/writer.go index c7187ad..9db1b64 100644 --- a/producer/config/writer.go +++ b/producer/config/writer.go @@ -81,7 +81,7 @@ type WriterConfiguration struct { TopicName string `yaml:"topicName" validate:"nonzero"` TopicServiceOverride kv.OverrideConfiguration `yaml:"topicServiceOverride"` TopicWatchInitTimeout *time.Duration `yaml:"topicWatchInitTimeout"` - PlacementServiceOverride services.OverrideConfiguration `yaml:"placementServiceOverride"` + PlacementServiceOverride services.OverrideConfiguration `yaml:"placementServiceOverride"` PlacementWatchInitTimeout *time.Duration `yaml:"placementWatchInitTimeout"` MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"` MessageRetry *retry.Configuration `yaml:"messageRetry"` @@ -91,7 +91,8 @@ type WriterConfiguration struct { InitialAckMapSize *int `yaml:"initialAckMapSize"` CloseCheckInterval *time.Duration `yaml:"closeCheckInterval"` AckErrorRetry *retry.Configuration `yaml:"ackErrorRetry"` - EncodeDecoder *proto.EncodeDecoderConfiguration `yaml:"encodeDecoder"` + Encoder *proto.BaseConfiguration `yaml:"encoder"` + Decoder *proto.BaseConfiguration `yaml:"decoder"` Connection *ConnectionConfiguration `yaml:"connection"` } @@ -149,8 +150,11 @@ func (c *WriterConfiguration) NewOptions( if c.AckErrorRetry != nil { opts = opts.SetAckErrorRetryOptions(c.AckErrorRetry.NewOptions(iOpts.MetricsScope())) } - if c.EncodeDecoder != nil { - opts = opts.SetEncodeDecoderOptions(c.EncodeDecoder.NewEncodeDecoderOptions(iOpts)) + if c.Encoder != nil { + opts = opts.SetEncoderOptions(c.Encoder.NewBaseOptions(iOpts)) + } + if c.Decoder != nil { + opts = opts.SetDecoderOptions(c.Decoder.NewBaseOptions(iOpts)) } if c.Connection != nil { opts = opts.SetConnectionOptions(c.Connection.NewOptions(iOpts)) diff --git a/producer/writer/consumer_service_writer_test.go b/producer/writer/consumer_service_writer_test.go index 53fcd83..8deee72 100644 --- a/producer/writer/consumer_service_writer_test.go +++ b/producer/writer/consumer_service_writer_test.go @@ -113,7 +113,7 @@ func TestConsumerServiceWriterWithSharedConsumerWithNonShardedPlacement(t *testi wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -249,7 +249,7 @@ func TestConsumerServiceWriterWithSharedConsumerWithShardedPlacement(t *testing. wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -389,13 +389,13 @@ func TestConsumerServiceWriterWithReplicatedConsumerWithShardedPlacement(t *test var wg sync.WaitGroup wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis2, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis2, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Wait() @@ -443,24 +443,21 @@ func TestConsumerServiceWriterWithReplicatedConsumerWithShardedPlacement(t *test if err != nil { return } - - server := proto.NewEncodeDecoder( - conn, - opts.EncodeDecoderOptions(), - ) + serverEncoder := proto.NewEncoder(opts.EncoderOptions()) + serverDecoder := proto.NewDecoder(conn, opts.DecoderOptions()) var msg msgpb.Message - err = server.Decode(&msg) + err = serverDecoder.Decode(&msg) if err != nil { conn.Close() continue } - require.NoError(t, server.Encode(&msgpb.Ack{ + require.NoError(t, serverEncoder.Encode(&msgpb.Ack{ Metadata: []msgpb.Metadata{ msg.Metadata, }, })) - _, err = conn.Write(server.Bytes()) + _, err = conn.Write(serverEncoder.Bytes()) require.NoError(t, err) conn.Close() } diff --git a/producer/writer/consumer_writer.go b/producer/writer/consumer_writer.go index c6c1f62..b74ec28 100644 --- a/producer/writer/consumer_writer.go +++ b/producer/writer/consumer_writer.go @@ -139,7 +139,7 @@ func newConsumerWriter( ) ) w := &consumerWriterImpl{ - decoder: proto.NewDecoder(rw, opts.EncodeDecoderOptions().DecoderOptions()), + decoder: proto.NewDecoder(rw, opts.DecoderOptions()), addr: addr, router: router, opts: opts, diff --git a/producer/writer/consumer_writer_test.go b/producer/writer/consumer_writer_test.go index 824fd7f..508140e 100644 --- a/producer/writer/consumer_writer_test.go +++ b/producer/writer/consumer_writer_test.go @@ -70,7 +70,7 @@ func TestNewConsumerWriter(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -271,7 +271,7 @@ func TestAutoReset(t *testing.T) { defer serverConn.Close() go func() { - testConsumeAndAckOnConnection(t, serverConn, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnection(t, serverConn, opts.EncoderOptions(), opts.DecoderOptions()) }() w.connectFn = func(addr string) (io.ReadWriteCloser, error) { @@ -382,36 +382,35 @@ func testConnectionOptions() ConnectionOptions { func testConsumeAndAckOnConnection( t *testing.T, conn net.Conn, - opts proto.EncodeDecoderOptions, + encOpts proto.BaseOptions, + decOpts proto.BaseOptions, ) { - server := proto.NewEncodeDecoder( - conn, - opts, - ) - + serverEncoder := proto.NewEncoder(encOpts) + serverDecoder := proto.NewDecoder(conn, decOpts) var msg msgpb.Message - assert.NoError(t, server.Decode(&msg)) + assert.NoError(t, serverDecoder.Decode(&msg)) - err := server.Encode(&msgpb.Ack{ + err := serverEncoder.Encode(&msgpb.Ack{ Metadata: []msgpb.Metadata{ msg.Metadata, }, }) assert.NoError(t, err) - _, err = conn.Write(server.Bytes()) + _, err = conn.Write(serverEncoder.Bytes()) assert.NoError(t, err) } func testConsumeAndAckOnConnectionListener( t *testing.T, lis net.Listener, - opts proto.EncodeDecoderOptions, + encOpts proto.BaseOptions, + decOpts proto.BaseOptions, ) { conn, err := lis.Accept() require.NoError(t, err) defer conn.Close() - testConsumeAndAckOnConnection(t, conn, opts) + testConsumeAndAckOnConnection(t, conn, encOpts, decOpts) } func testConsumerWriterMetrics() consumerWriterMetrics { diff --git a/producer/writer/message_writer.go b/producer/writer/message_writer.go index 7ac1279..fbdb7ee 100644 --- a/producer/writer/message_writer.go +++ b/producer/writer/message_writer.go @@ -183,7 +183,7 @@ func newMessageWriter( opts: opts, retryOpts: opts.MessageRetryOptions(), r: rand.New(rand.NewSource(nowFn().UnixNano())), - encoder: proto.NewEncoder(opts.EncodeDecoderOptions().EncoderOptions()), + encoder: proto.NewEncoder(opts.EncoderOptions()), msgID: 0, queue: list.New(), acks: newAckHelper(opts.InitialAckMapSize()), diff --git a/producer/writer/message_writer_test.go b/producer/writer/message_writer_test.go index d45250a..ee11cd9 100644 --- a/producer/writer/message_writer_test.go +++ b/producer/writer/message_writer_test.go @@ -105,7 +105,7 @@ func TestMessageWriterWithPooling(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -188,7 +188,7 @@ func TestMessageWriterWithoutPooling(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -302,7 +302,7 @@ func TestMessageWriterRetryWithoutPooling(t *testing.T) { w.AddConsumerWriter(cw) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) }() for { @@ -362,7 +362,7 @@ func TestMessageWriterRetryWithPooling(t *testing.T) { w.AddConsumerWriter(cw) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) }() for { diff --git a/producer/writer/options.go b/producer/writer/options.go index 1a120e5..b155487 100644 --- a/producer/writer/options.go +++ b/producer/writer/options.go @@ -308,11 +308,13 @@ type Options interface { // SetAckErrorRetryOptions sets the retrier for ack errors. SetAckErrorRetryOptions(value retry.Options) Options - // EncodeDecoderOptions returns the options for EncodeDecoder. - EncodeDecoderOptions() proto.EncodeDecoderOptions + EncoderOptions() proto.BaseOptions - // SetEncodeDecoderOptions sets the options for EncodeDecoder. - SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options + SetEncoderOptions(value proto.BaseOptions) Options + + DecoderOptions() proto.BaseOptions + + SetDecoderOptions(value proto.BaseOptions) Options // ConnectionOptions returns the options for connections. ConnectionOptions() ConnectionOptions @@ -341,7 +343,8 @@ type writerOptions struct { initialAckMapSize int closeCheckInterval time.Duration ackErrRetryOpts retry.Options - encdecOpts proto.EncodeDecoderOptions + encOpts proto.BaseOptions + decOpts proto.BaseOptions cOpts ConnectionOptions iOpts instrument.Options } @@ -358,7 +361,8 @@ func NewOptions() Options { initialAckMapSize: defaultInitialAckMapSize, closeCheckInterval: defaultCloseCheckInterval, ackErrRetryOpts: retry.NewOptions(), - encdecOpts: proto.NewEncodeDecoderOptions(), + encOpts: proto.NewBaseOptions(), + decOpts: proto.NewBaseOptions(), cOpts: NewConnectionOptions(), iOpts: instrument.NewOptions(), } @@ -494,13 +498,23 @@ func (opts *writerOptions) SetAckErrorRetryOptions(value retry.Options) Options return &o } -func (opts *writerOptions) EncodeDecoderOptions() proto.EncodeDecoderOptions { - return opts.encdecOpts +func (opts *writerOptions) EncoderOptions() proto.BaseOptions { + return opts.encOpts +} + +func (opts *writerOptions) SetEncoderOptions(value proto.BaseOptions) Options { + o := *opts + o.encOpts = value + return &o +} + +func (opts *writerOptions) DecoderOptions() proto.BaseOptions { + return opts.decOpts } -func (opts *writerOptions) SetEncodeDecoderOptions(value proto.EncodeDecoderOptions) Options { +func (opts *writerOptions) SetDecoderOptions(value proto.BaseOptions) Options { o := *opts - o.encdecOpts = value + o.decOpts = value return &o } diff --git a/producer/writer/shard_writer_test.go b/producer/writer/shard_writer_test.go index 6205b05..43e941e 100644 --- a/producer/writer/shard_writer_test.go +++ b/producer/writer/shard_writer_test.go @@ -70,7 +70,7 @@ func TestSharedShardWriter(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -182,7 +182,7 @@ func TestReplicatedShardWriter(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -205,7 +205,7 @@ func TestReplicatedShardWriter(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis2, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis2, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -288,7 +288,7 @@ func TestReplicatedShardWriterRemoveMessageWriter(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -308,10 +308,11 @@ func TestReplicatedShardWriterRemoveMessageWriter(t *testing.T) { require.NoError(t, err) defer conn.Close() - server := proto.NewEncodeDecoder(conn, opts.EncodeDecoderOptions()) + serverEncoder := proto.NewEncoder(opts.EncoderOptions()) + serverDecoder := proto.NewDecoder(conn, opts.DecoderOptions()) var msg msgpb.Message - require.NoError(t, server.Decode(&msg)) + require.NoError(t, serverDecoder.Decode(&msg)) sw.UpdateInstances( []placement.Instance{i1}, cws, @@ -320,8 +321,8 @@ func TestReplicatedShardWriterRemoveMessageWriter(t *testing.T) { require.Equal(t, 1, len(sw.messageWriters)) mm.EXPECT().Finalize(producer.Consumed) - require.NoError(t, server.Encode(&msgpb.Ack{Metadata: []msgpb.Metadata{msg.Metadata}})) - _, err = conn.Write(server.Bytes()) + require.NoError(t, serverEncoder.Encode(&msgpb.Ack{Metadata: []msgpb.Metadata{msg.Metadata}})) + _, err = conn.Write(serverEncoder.Bytes()) require.NoError(t, err) // Make sure mw2 is closed and removed from router. for { diff --git a/producer/writer/writer_test.go b/producer/writer/writer_test.go index c555412..f6f2f20 100644 --- a/producer/writer/writer_test.go +++ b/producer/writer/writer_test.go @@ -558,19 +558,19 @@ func TestWriterWrite(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis2, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis2, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis3, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis3, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() @@ -731,7 +731,7 @@ func TestWriterSetMessageTTLNanosDropMetric(t *testing.T) { wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Wait() @@ -779,7 +779,7 @@ func TestWriterSetMessageTTLNanosDropMetric(t *testing.T) { mm.EXPECT().Finalize(producer.Consumed).Do(func(interface{}) { called++; wg.Done() }) wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis1, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis1, opts.EncoderOptions(), opts.DecoderOptions()) wg.Done() }() wg.Wait() @@ -791,7 +791,7 @@ func TestWriterSetMessageTTLNanosDropMetric(t *testing.T) { // Wait for the consumer to trigger finalize because there is no more message ttl. wg.Add(1) go func() { - testConsumeAndAckOnConnectionListener(t, lis2, opts.EncodeDecoderOptions()) + testConsumeAndAckOnConnectionListener(t, lis2, opts.EncoderOptions(), opts.DecoderOptions()) }() wg.Wait() require.Equal(t, 1, called) diff --git a/protocol/proto/benchmark_test.go b/protocol/proto/benchmark_test.go index 89b4c53..b4ca8dd 100644 --- a/protocol/proto/benchmark_test.go +++ b/protocol/proto/benchmark_test.go @@ -27,32 +27,6 @@ import ( "github.com/m3db/m3msg/generated/proto/msgpb" ) -func BenchmarkEncodeDecoderRoundTrip(b *testing.B) { - r := bytes.NewReader(nil) - c := NewEncodeDecoder(r, NewEncodeDecoderOptions()).(*encdec) - encodeMsg := msgpb.Message{ - Metadata: msgpb.Metadata{}, - Value: make([]byte, 200), - } - decodeMsg := msgpb.Message{} - b.ReportAllocs() - b.ResetTimer() - for n := 0; n < b.N; n++ { - encodeMsg.Metadata.Id = uint64(n) - err := c.Encode(&encodeMsg) - if err != nil { - b.FailNow() - } - r.Reset(c.Bytes()) - if err := c.Decode(&decodeMsg); err != nil { - b.FailNow() - } - if decodeMsg.Metadata.Id != uint64(n) { - b.FailNow() - } - } -} - func BenchmarkBaseEncodeDecodeRoundTrip(b *testing.B) { r := bytes.NewReader(nil) encoder := NewEncoder(NewBaseOptions()) diff --git a/protocol/proto/config.go b/protocol/proto/config.go index dafc609..fe3ab7a 100644 --- a/protocol/proto/config.go +++ b/protocol/proto/config.go @@ -25,41 +25,30 @@ import ( "github.com/m3db/m3x/pool" ) -// EncodeDecoderConfiguration configures an EncodeDecoder. -type EncodeDecoderConfiguration struct { +type BaseConfiguration struct { MaxMessageSize *int `yaml:"maxMessageSize"` BytesPool *pool.BucketizedPoolConfiguration `yaml:"bytesPool"` - EncodeDecoderPool *pool.ObjectPoolConfiguration `yaml:"encodeDecoderPool"` } -// NewEncodeDecoderOptions creates a new EncodeDecoderOptions. -func (c *EncodeDecoderConfiguration) NewEncodeDecoderOptions( +func (c *BaseConfiguration) NewBaseOptions ( iOpts instrument.Options, -) EncodeDecoderOptions { +) BaseOptions { var ( - encodeOpts = NewBaseOptions() - decodeOpts = NewBaseOptions() + baseOpts = NewBaseOptions() ) if c.MaxMessageSize != nil { - encodeOpts = encodeOpts.SetMaxMessageSize(*c.MaxMessageSize) - decodeOpts = decodeOpts.SetMaxMessageSize(*c.MaxMessageSize) + baseOpts = baseOpts.SetMaxMessageSize(*c.MaxMessageSize) } if c.BytesPool != nil { scope := iOpts.MetricsScope() p := pool.NewBytesPool( c.BytesPool.NewBuckets(), c.BytesPool.NewObjectPoolOptions(iOpts.SetMetricsScope(scope.Tagged( - map[string]string{"pool": "bytes"}, + map[string]string{"pool": "bytes"}, ))), ) p.Init() - encodeOpts = encodeOpts.SetBytesPool(p) - decodeOpts = decodeOpts.SetBytesPool(p) + baseOpts = baseOpts.SetBytesPool(p) } - opts := NewEncodeDecoderOptions() - if c.EncodeDecoderPool != nil { - encodeDecoderPool := c.EncodeDecoderPool.NewObjectPoolOptions(iOpts) - opts = opts.SetEncodeDecoderPool(NewEncodeDecoderPool(encodeDecoderPool)) - } - return opts.SetEncoderOptions(encodeOpts).SetDecoderOptions(decodeOpts) -} + return baseOpts +} \ No newline at end of file diff --git a/protocol/proto/config_test.go b/protocol/proto/config_test.go index e0e37c7..a06adcd 100644 --- a/protocol/proto/config_test.go +++ b/protocol/proto/config_test.go @@ -29,7 +29,7 @@ import ( yaml "gopkg.in/yaml.v2" ) -func TestEncodeDecoderConfig(t *testing.T) { +func TestBaseConfig(t *testing.T) { str := ` maxMessageSize: 1024 bytesPool: @@ -41,22 +41,17 @@ bytesPool: watermark: lowWatermark: 0.01 highWatermark: 0.02 -encodeDecoderPool: - size: 30000 ` - var cfg EncodeDecoderConfiguration + var cfg BaseConfiguration require.NoError(t, yaml.Unmarshal([]byte(str), &cfg)) - opts := cfg.NewEncodeDecoderOptions(instrument.NewOptions()) - require.Equal(t, 1024, opts.EncoderOptions().MaxMessageSize()) - require.Equal(t, 1024, opts.DecoderOptions().MaxMessageSize()) - require.Equal(t, opts.EncoderOptions().BytesPool(), opts.DecoderOptions().BytesPool()) - require.NotNil(t, opts.DecoderOptions().BytesPool()) - require.NotNil(t, opts.EncodeDecoderPool()) - b := opts.EncoderOptions().BytesPool().Get(2) + opts := cfg.NewBaseOptions(instrument.NewOptions()) + require.Equal(t, 1024, opts.MaxMessageSize()) + require.NotNil(t, opts.BytesPool()) + b := opts.BytesPool().Get(2) require.Equal(t, 0, len(b)) require.Equal(t, 4, cap(b)) - b = opts.DecoderOptions().BytesPool().Get(200) + b = opts.BytesPool().Get(200) require.Equal(t, 0, len(b)) require.Equal(t, 1024, cap(b)) } diff --git a/protocol/proto/encode_decoder.go b/protocol/proto/encode_decoder.go deleted file mode 100644 index 98ae3af..0000000 --- a/protocol/proto/encode_decoder.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package proto - -import ( - "io" -) - -type encdec struct { - Encoder - Decoder - - isClosed bool - pool EncodeDecoderPool -} - -// NewEncodeDecoder creates an EncodeDecoder, the implementation is not thread safe. -func NewEncodeDecoder( - r io.Reader, - opts EncodeDecoderOptions, -) EncodeDecoder { - if opts == nil { - opts = NewEncodeDecoderOptions() - } - return &encdec{ - Encoder: NewEncoder(opts.EncoderOptions()), - Decoder: NewDecoder(r, opts.DecoderOptions()), - isClosed: false, - pool: opts.EncodeDecoderPool(), - } -} - -func (c *encdec) Close() { - if c.isClosed { - return - } - c.isClosed = true - if c.pool != nil { - c.pool.Put(c) - } -} - -func (c *encdec) ResetReader(r io.Reader) { - c.Decoder.ResetReader(r) - c.isClosed = false -} diff --git a/protocol/proto/encode_decoder_pool.go b/protocol/proto/encode_decoder_pool.go deleted file mode 100644 index beaa563..0000000 --- a/protocol/proto/encode_decoder_pool.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package proto - -import "github.com/m3db/m3x/pool" - -type encdecPool struct { - pool pool.ObjectPool -} - -// NewEncodeDecoderPool creates a EncodeDecoderPool. -func NewEncodeDecoderPool(opts pool.ObjectPoolOptions) EncodeDecoderPool { - return &encdecPool{ - pool: pool.NewObjectPool(opts), - } -} - -func (p *encdecPool) Init(alloc EncodeDecoderAlloc) { - p.pool.Init(func() interface{} { - return alloc() - }) -} - -func (p *encdecPool) Get() EncodeDecoder { - return p.pool.Get().(EncodeDecoder) -} - -func (p *encdecPool) Put(c EncodeDecoder) { - p.pool.Put(c) -} diff --git a/protocol/proto/encode_decoder_pool_test.go b/protocol/proto/encode_decoder_pool_test.go deleted file mode 100644 index 087e1a6..0000000 --- a/protocol/proto/encode_decoder_pool_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package proto - -import ( - "testing" - - "github.com/m3db/m3x/pool" - - "github.com/stretchr/testify/require" -) - -func TestEncodeDecoderPool(t *testing.T) { - p := NewEncodeDecoderPool(pool.NewObjectPoolOptions().SetSize(1)) - opts := NewEncodeDecoderOptions().SetEncodeDecoderPool(p) - p.Init(func() EncodeDecoder { - return NewEncodeDecoder(nil, opts) - }) - - c := p.Get() - c.ResetReader(nil) - - c.Close() - require.True(t, c.(*encdec).isClosed) - - c = p.Get() - require.True(t, c.(*encdec).isClosed) - - c.ResetReader(nil) - require.False(t, c.(*encdec).isClosed) -} diff --git a/protocol/proto/encode_decoder_test.go b/protocol/proto/encode_decoder_test.go deleted file mode 100644 index 35d0586..0000000 --- a/protocol/proto/encode_decoder_test.go +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package proto - -import ( - "net" - "testing" - - "github.com/m3db/m3msg/generated/proto/msgpb" - - "github.com/stretchr/testify/require" -) - -func TestEncodeDecoderReset(t *testing.T) { - c := NewEncodeDecoder(nil, nil).(*encdec) - require.Nil(t, c.Decoder.(*decoder).r) - c.Close() - // Safe to close again. - c.Close() - require.True(t, c.isClosed) - - conn := new(net.TCPConn) - c.ResetReader(conn) - require.False(t, c.isClosed) - require.Equal(t, conn, c.Decoder.(*decoder).r) -} - -func TestEncodeDecodeRoundTrip(t *testing.T) { - c := NewEncodeDecoder(nil, nil).(*encdec) - - clientConn, serverConn := net.Pipe() - c.ResetReader(serverConn) - - testMsg := msgpb.Message{ - Metadata: msgpb.Metadata{ - Shard: 1, - Id: 2, - }, - Value: make([]byte, 10), - } - go func() { - require.NoError(t, c.Encode(&testMsg)) - _, err := clientConn.Write(c.Bytes()) - require.NoError(t, err) - }() - var msg msgpb.Message - require.NoError(t, c.Decode(&msg)) - require.Equal(t, testMsg, msg) -} diff --git a/protocol/proto/options.go b/protocol/proto/options.go index e0622e4..44cde37 100644 --- a/protocol/proto/options.go +++ b/protocol/proto/options.go @@ -68,47 +68,3 @@ func (opts *baseOptions) SetBytesPool(value pool.BytesPool) BaseOptions { o.bytesPool = value return &o } - -// NewEncodeDecoderOptions creates an EncodeDecoderOptions. -func NewEncodeDecoderOptions() EncodeDecoderOptions { - return &encdecOptions{ - encOpts: NewBaseOptions(), - decOpts: NewBaseOptions(), - } -} - -type encdecOptions struct { - encOpts BaseOptions - decOpts BaseOptions - pool EncodeDecoderPool -} - -func (opts *encdecOptions) EncoderOptions() BaseOptions { - return opts.encOpts -} - -func (opts *encdecOptions) SetEncoderOptions(value BaseOptions) EncodeDecoderOptions { - o := *opts - o.encOpts = value - return &o -} - -func (opts *encdecOptions) DecoderOptions() BaseOptions { - return opts.decOpts -} - -func (opts *encdecOptions) SetDecoderOptions(value BaseOptions) EncodeDecoderOptions { - o := *opts - o.decOpts = value - return &o -} - -func (opts *encdecOptions) EncodeDecoderPool() EncodeDecoderPool { - return opts.pool -} - -func (opts *encdecOptions) SetEncodeDecoderPool(pool EncodeDecoderPool) EncodeDecoderOptions { - o := *opts - o.pool = pool - return &o -} diff --git a/protocol/proto/roundtrip_test.go b/protocol/proto/roundtrip_test.go index b416ee8..fc828aa 100644 --- a/protocol/proto/roundtrip_test.go +++ b/protocol/proto/roundtrip_test.go @@ -22,6 +22,7 @@ package proto import ( "bytes" + "net" "testing" "github.com/m3db/m3msg/generated/proto/msgpb" @@ -149,6 +150,39 @@ func TestDecodeMessageLargerThanMaxSize(t *testing.T) { require.Contains(t, err.Error(), "larger than maximum supported size") } +func TestDecodeReset(t *testing.T) { + dec := NewDecoder(nil, nil) + require.Nil(t, dec.(*decoder).r) + + conn := new(net.TCPConn) + dec.ResetReader(conn) + require.Equal(t, conn, dec.(*decoder).r) +} + +func TestEncodeDecodeRoundTrip(t *testing.T) { + enc := NewEncoder(nil) + dec := NewDecoder(nil, nil) + + clientConn, serverConn := net.Pipe() + dec.ResetReader(serverConn) + + testMsg := msgpb.Message{ + Metadata: msgpb.Metadata{ + Shard: 1, + Id: 2, + }, + Value: make([]byte, 10), + } + go func() { + require.NoError(t, enc.Encode(&testMsg)) + _, err := clientConn.Write(enc.Bytes()) + require.NoError(t, err) + }() + var msg msgpb.Message + require.NoError(t, dec.Decode(&msg)) + require.Equal(t, testMsg, msg) +} + // nolint: unparam func getBytesPool(bucketSizes int, bucketCaps []int) pool.BytesPool { buckets := make([]pool.Bucket, len(bucketCaps)) diff --git a/protocol/proto/types.go b/protocol/proto/types.go index 837566d..c72c60c 100644 --- a/protocol/proto/types.go +++ b/protocol/proto/types.go @@ -60,30 +60,6 @@ type Decoder interface { ResetReader(r io.Reader) } -// EncodeDecoder can encode and decode. -type EncodeDecoder interface { - Encoder - Decoder - - // Close closes the EncodeDecoder. - Close() -} - -// EncodeDecoderPool is a pool of EncodeDecoders. -type EncodeDecoderPool interface { - // Init initializes the EncodeDecoder pool. - Init(alloc EncodeDecoderAlloc) - - // Get returns an EncodeDecoder from the pool. - Get() EncodeDecoder - - // Put puts an EncodeDecoder into the pool. - Put(c EncodeDecoder) -} - -// EncodeDecoderAlloc allocates an EncodeDecoder. -type EncodeDecoderAlloc func() EncodeDecoder - // BaseOptions configures a base encoder or decoder. type BaseOptions interface { // MaxMessageSize returns the maximum message size. @@ -98,24 +74,3 @@ type BaseOptions interface { // SetBytesPool sets the bytes pool. SetBytesPool(value pool.BytesPool) BaseOptions } - -// EncodeDecoderOptions configures an EncodeDecoder. -type EncodeDecoderOptions interface { - // EncoderOptions returns the options for encoder. - EncoderOptions() BaseOptions - - // SetEncoderOptions sets the options for encoder. - SetEncoderOptions(value BaseOptions) EncodeDecoderOptions - - // DecoderOptions returns the options for decoder. - DecoderOptions() BaseOptions - - // SetDecoderOptions sets the options for decoder. - SetDecoderOptions(value BaseOptions) EncodeDecoderOptions - - // EncodeDecoderPool returns the pool for EncodeDecoder. - EncodeDecoderPool() EncodeDecoderPool - - // SetEncodeDecoderPool sets the pool for EncodeDecoder. - SetEncodeDecoderPool(pool EncodeDecoderPool) EncodeDecoderOptions -} From c90d54576aeb41d79264ca012e07cbc1e3cf1e2d Mon Sep 17 00:00:00 2001 From: Siyu Yang Date: Thu, 19 Jul 2018 15:54:52 -0400 Subject: [PATCH 4/6] fix lint --- protocol/proto/config.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocol/proto/config.go b/protocol/proto/config.go index fe3ab7a..10db64c 100644 --- a/protocol/proto/config.go +++ b/protocol/proto/config.go @@ -25,11 +25,13 @@ import ( "github.com/m3db/m3x/pool" ) +// BaseConfiguration configures an Encoder or a Decoder. type BaseConfiguration struct { MaxMessageSize *int `yaml:"maxMessageSize"` BytesPool *pool.BucketizedPoolConfiguration `yaml:"bytesPool"` } +// NewBaseOptions creates a new BaseOptions. func (c *BaseConfiguration) NewBaseOptions ( iOpts instrument.Options, ) BaseOptions { From de1ba539dd6714a9f368d72828976dc516c2d616 Mon Sep 17 00:00:00 2001 From: Siyu Yang Date: Fri, 20 Jul 2018 13:03:44 -0400 Subject: [PATCH 5/6] address CR --- consumer/config.go | 8 +++--- consumer/config_test.go | 8 ++++++ consumer/options.go | 16 +++++------ consumer/types.go | 8 +++--- producer/config/writer.go | 36 ++++++++++++------------- producer/config/writer_test.go | 8 ++++++ producer/writer/consumer_writer_test.go | 8 +++--- producer/writer/options.go | 28 ++++++++++--------- protocol/proto/benchmark_test.go | 4 +-- protocol/proto/config.go | 20 +++++++------- protocol/proto/config_test.go | 6 ++--- protocol/proto/decoder.go | 4 +-- protocol/proto/encoder.go | 4 +-- protocol/proto/options.go | 16 +++++------ protocol/proto/roundtrip_test.go | 12 ++++----- protocol/proto/types.go | 8 +++--- 16 files changed, 107 insertions(+), 87 deletions(-) diff --git a/consumer/config.go b/consumer/config.go index 1a13530..69fb40e 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -30,8 +30,8 @@ import ( // Configuration configs the consumer options. type Configuration struct { - Encoder *proto.BaseConfiguration `yaml:"encoder"` - Decoder *proto.BaseConfiguration `yaml:"decoder"` + Encoder *proto.Configuration `yaml:"encoder"` + Decoder *proto.Configuration `yaml:"decoder"` MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"` AckFlushInterval *time.Duration `yaml:"ackFlushInterval"` AckBufferSize *int `yaml:"ackBufferSize"` @@ -43,10 +43,10 @@ type Configuration struct { func (c *Configuration) NewOptions(iOpts instrument.Options) Options { opts := NewOptions().SetInstrumentOptions(iOpts) if c.Encoder != nil { - opts = opts.SetEncoderOptions(c.Encoder.NewBaseOptions(iOpts)) + opts = opts.SetEncoderOptions(c.Encoder.NewOptions(iOpts)) } if c.Decoder != nil { - opts = opts.SetDecoderOptions(c.Decoder.NewBaseOptions(iOpts)) + opts = opts.SetDecoderOptions(c.Decoder.NewOptions(iOpts)) } if c.MessagePool != nil { opts = opts.SetMessagePoolOptions(c.MessagePool.NewObjectPoolOptions(iOpts)) diff --git a/consumer/config_test.go b/consumer/config_test.go index 128ee54..71416e5 100644 --- a/consumer/config_test.go +++ b/consumer/config_test.go @@ -36,6 +36,10 @@ ackFlushInterval: 100ms ackBufferSize: 100 connectionWriteBufferSize: 200 connectionReadBufferSize: 300 +encoder: + maxMessageSize: 100 +decoder: + maxMessageSize: 200 ` var cfg Configuration @@ -47,5 +51,9 @@ connectionReadBufferSize: 300 require.Equal(t, 100, opts.AckBufferSize()) require.Equal(t, 200, opts.ConnectionWriteBufferSize()) require.Equal(t, 300, opts.ConnectionReadBufferSize()) + require.Equal(t, 100, opts.EncoderOptions().MaxMessageSize()) + require.NotNil(t, opts.EncoderOptions().BytesPool()) + require.Equal(t, 200, opts.DecoderOptions().MaxMessageSize()) + require.NotNil(t, opts.EncoderOptions().BytesPool()) require.Nil(t, opts.InstrumentOptions()) } diff --git a/consumer/options.go b/consumer/options.go index 389122e..492c265 100644 --- a/consumer/options.go +++ b/consumer/options.go @@ -36,8 +36,8 @@ var ( ) type options struct { - encOptions proto.BaseOptions - decOptions proto.BaseOptions + encOptions proto.Options + decOptions proto.Options messagePoolOpts pool.ObjectPoolOptions ackFlushInterval time.Duration ackBufferSize int @@ -49,8 +49,8 @@ type options struct { // NewOptions creates a new options. func NewOptions() Options { return &options{ - encOptions: proto.NewBaseOptions(), - decOptions: proto.NewBaseOptions(), + encOptions: proto.NewOptions(), + decOptions: proto.NewOptions(), messagePoolOpts: pool.NewObjectPoolOptions(), ackFlushInterval: defaultAckFlushInterval, ackBufferSize: defaultAckBufferSize, @@ -60,21 +60,21 @@ func NewOptions() Options { } } -func (opts *options) EncoderOptions() proto.BaseOptions { +func (opts *options) EncoderOptions() proto.Options { return opts.encOptions } -func (opts *options) SetEncoderOptions(value proto.BaseOptions) Options { +func (opts *options) SetEncoderOptions(value proto.Options) Options { o := *opts o.encOptions = value return &o } -func (opts *options) DecoderOptions() proto.BaseOptions { +func (opts *options) DecoderOptions() proto.Options { return opts.decOptions } -func (opts *options) SetDecoderOptions(value proto.BaseOptions) Options { +func (opts *options) SetDecoderOptions(value proto.Options) Options { o := *opts o.decOptions = value return &o diff --git a/consumer/types.go b/consumer/types.go index e8a80b9..623ca0c 100644 --- a/consumer/types.go +++ b/consumer/types.go @@ -67,16 +67,16 @@ type Listener interface { // Options configs the consumer listener. type Options interface { // EncoderOptions returns the options for Encoder. - EncoderOptions() proto.BaseOptions + EncoderOptions() proto.Options // SetEncoderOptions sets the options for Encoder. - SetEncoderOptions(value proto.BaseOptions) Options + SetEncoderOptions(value proto.Options) Options // DecoderOptions returns the options for Decoder. - DecoderOptions() proto.BaseOptions + DecoderOptions() proto.Options // SetDecoderOptions sets the options for Decoder. - SetDecoderOptions(value proto.BaseOptions) Options + SetDecoderOptions(value proto.Options) Options // MessagePoolOptions returns the options for message pool. MessagePoolOptions() pool.ObjectPoolOptions diff --git a/producer/config/writer.go b/producer/config/writer.go index 9db1b64..94c094d 100644 --- a/producer/config/writer.go +++ b/producer/config/writer.go @@ -78,22 +78,22 @@ func (c *ConnectionConfiguration) NewOptions(iOpts instrument.Options) writer.Co // WriterConfiguration configs the writer options. type WriterConfiguration struct { - TopicName string `yaml:"topicName" validate:"nonzero"` - TopicServiceOverride kv.OverrideConfiguration `yaml:"topicServiceOverride"` - TopicWatchInitTimeout *time.Duration `yaml:"topicWatchInitTimeout"` - PlacementServiceOverride services.OverrideConfiguration `yaml:"placementServiceOverride"` - PlacementWatchInitTimeout *time.Duration `yaml:"placementWatchInitTimeout"` - MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"` - MessageRetry *retry.Configuration `yaml:"messageRetry"` - MessageQueueNewWritesScanInterval *time.Duration `yaml:"messageQueueNewWritesScanInterval"` - MessageQueueFullScanInterval *time.Duration `yaml:"messageQueueFullScanInterval"` - MessageQueueScanBatchSize *int `yaml:"messageQueueScanBatchSize"` - InitialAckMapSize *int `yaml:"initialAckMapSize"` - CloseCheckInterval *time.Duration `yaml:"closeCheckInterval"` - AckErrorRetry *retry.Configuration `yaml:"ackErrorRetry"` - Encoder *proto.BaseConfiguration `yaml:"encoder"` - Decoder *proto.BaseConfiguration `yaml:"decoder"` - Connection *ConnectionConfiguration `yaml:"connection"` + TopicName string `yaml:"topicName" validate:"nonzero"` + TopicServiceOverride kv.OverrideConfiguration `yaml:"topicServiceOverride"` + TopicWatchInitTimeout *time.Duration `yaml:"topicWatchInitTimeout"` + PlacementServiceOverride services.OverrideConfiguration `yaml:"placementServiceOverride"` + PlacementWatchInitTimeout *time.Duration `yaml:"placementWatchInitTimeout"` + MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"` + MessageRetry *retry.Configuration `yaml:"messageRetry"` + MessageQueueNewWritesScanInterval *time.Duration `yaml:"messageQueueNewWritesScanInterval"` + MessageQueueFullScanInterval *time.Duration `yaml:"messageQueueFullScanInterval"` + MessageQueueScanBatchSize *int `yaml:"messageQueueScanBatchSize"` + InitialAckMapSize *int `yaml:"initialAckMapSize"` + CloseCheckInterval *time.Duration `yaml:"closeCheckInterval"` + AckErrorRetry *retry.Configuration `yaml:"ackErrorRetry"` + Encoder *proto.Configuration `yaml:"encoder"` + Decoder *proto.Configuration `yaml:"decoder"` + Connection *ConnectionConfiguration `yaml:"connection"` } // NewOptions creates writer options. @@ -151,10 +151,10 @@ func (c *WriterConfiguration) NewOptions( opts = opts.SetAckErrorRetryOptions(c.AckErrorRetry.NewOptions(iOpts.MetricsScope())) } if c.Encoder != nil { - opts = opts.SetEncoderOptions(c.Encoder.NewBaseOptions(iOpts)) + opts = opts.SetEncoderOptions(c.Encoder.NewOptions(iOpts)) } if c.Decoder != nil { - opts = opts.SetDecoderOptions(c.Decoder.NewBaseOptions(iOpts)) + opts = opts.SetDecoderOptions(c.Decoder.NewOptions(iOpts)) } if c.Connection != nil { opts = opts.SetConnectionOptions(c.Connection.NewOptions(iOpts)) diff --git a/producer/config/writer_test.go b/producer/config/writer_test.go index e04d73f..41c3cde 100644 --- a/producer/config/writer_test.go +++ b/producer/config/writer_test.go @@ -87,6 +87,10 @@ ackErrorRetry: initialBackoff: 2ms connection: dialTimeout: 5s +encoder: + maxMessageSize: 100 +decoder: + maxMessageSize: 200 ` var cfg WriterConfiguration require.NoError(t, yaml.Unmarshal([]byte(str), &cfg)) @@ -116,4 +120,8 @@ connection: require.Equal(t, 2*time.Second, wOpts.CloseCheckInterval()) require.Equal(t, 2*time.Millisecond, wOpts.AckErrorRetryOptions().InitialBackoff()) require.Equal(t, 5*time.Second, wOpts.ConnectionOptions().DialTimeout()) + require.Equal(t, 100, wOpts.EncoderOptions().MaxMessageSize()) + require.NotNil(t, wOpts.EncoderOptions().BytesPool()) + require.Equal(t, 200, wOpts.DecoderOptions().MaxMessageSize()) + require.NotNil(t, wOpts.DecoderOptions().BytesPool()) } diff --git a/producer/writer/consumer_writer_test.go b/producer/writer/consumer_writer_test.go index 508140e..15102d2 100644 --- a/producer/writer/consumer_writer_test.go +++ b/producer/writer/consumer_writer_test.go @@ -382,8 +382,8 @@ func testConnectionOptions() ConnectionOptions { func testConsumeAndAckOnConnection( t *testing.T, conn net.Conn, - encOpts proto.BaseOptions, - decOpts proto.BaseOptions, + encOpts proto.Options, + decOpts proto.Options, ) { serverEncoder := proto.NewEncoder(encOpts) serverDecoder := proto.NewDecoder(conn, decOpts) @@ -403,8 +403,8 @@ func testConsumeAndAckOnConnection( func testConsumeAndAckOnConnectionListener( t *testing.T, lis net.Listener, - encOpts proto.BaseOptions, - decOpts proto.BaseOptions, + encOpts proto.Options, + decOpts proto.Options, ) { conn, err := lis.Accept() require.NoError(t, err) diff --git a/producer/writer/options.go b/producer/writer/options.go index b155487..d4aa4f2 100644 --- a/producer/writer/options.go +++ b/producer/writer/options.go @@ -308,13 +308,17 @@ type Options interface { // SetAckErrorRetryOptions sets the retrier for ack errors. SetAckErrorRetryOptions(value retry.Options) Options - EncoderOptions() proto.BaseOptions + // EncoderOptions returns the encoder's options. + EncoderOptions() proto.Options - SetEncoderOptions(value proto.BaseOptions) Options + // SetEncoderOptions sets the encoder's options. + SetEncoderOptions(value proto.Options) Options - DecoderOptions() proto.BaseOptions + // EncoderOptions returns the decoder's options. + DecoderOptions() proto.Options - SetDecoderOptions(value proto.BaseOptions) Options + // SetEncoderOptions sets the decoder's options. + SetDecoderOptions(value proto.Options) Options // ConnectionOptions returns the options for connections. ConnectionOptions() ConnectionOptions @@ -343,8 +347,8 @@ type writerOptions struct { initialAckMapSize int closeCheckInterval time.Duration ackErrRetryOpts retry.Options - encOpts proto.BaseOptions - decOpts proto.BaseOptions + encOpts proto.Options + decOpts proto.Options cOpts ConnectionOptions iOpts instrument.Options } @@ -361,8 +365,8 @@ func NewOptions() Options { initialAckMapSize: defaultInitialAckMapSize, closeCheckInterval: defaultCloseCheckInterval, ackErrRetryOpts: retry.NewOptions(), - encOpts: proto.NewBaseOptions(), - decOpts: proto.NewBaseOptions(), + encOpts: proto.NewOptions(), + decOpts: proto.NewOptions(), cOpts: NewConnectionOptions(), iOpts: instrument.NewOptions(), } @@ -498,21 +502,21 @@ func (opts *writerOptions) SetAckErrorRetryOptions(value retry.Options) Options return &o } -func (opts *writerOptions) EncoderOptions() proto.BaseOptions { +func (opts *writerOptions) EncoderOptions() proto.Options { return opts.encOpts } -func (opts *writerOptions) SetEncoderOptions(value proto.BaseOptions) Options { +func (opts *writerOptions) SetEncoderOptions(value proto.Options) Options { o := *opts o.encOpts = value return &o } -func (opts *writerOptions) DecoderOptions() proto.BaseOptions { +func (opts *writerOptions) DecoderOptions() proto.Options { return opts.decOpts } -func (opts *writerOptions) SetDecoderOptions(value proto.BaseOptions) Options { +func (opts *writerOptions) SetDecoderOptions(value proto.Options) Options { o := *opts o.decOpts = value return &o diff --git a/protocol/proto/benchmark_test.go b/protocol/proto/benchmark_test.go index b4ca8dd..14543ac 100644 --- a/protocol/proto/benchmark_test.go +++ b/protocol/proto/benchmark_test.go @@ -29,8 +29,8 @@ import ( func BenchmarkBaseEncodeDecodeRoundTrip(b *testing.B) { r := bytes.NewReader(nil) - encoder := NewEncoder(NewBaseOptions()) - decoder := NewDecoder(r, NewBaseOptions()) + encoder := NewEncoder(NewOptions()) + decoder := NewDecoder(r, NewOptions()) encodeMsg := msgpb.Message{ Metadata: msgpb.Metadata{}, Value: make([]byte, 200), diff --git a/protocol/proto/config.go b/protocol/proto/config.go index 10db64c..414e434 100644 --- a/protocol/proto/config.go +++ b/protocol/proto/config.go @@ -25,21 +25,21 @@ import ( "github.com/m3db/m3x/pool" ) -// BaseConfiguration configures an Encoder or a Decoder. -type BaseConfiguration struct { +// Configuration configures an Encoder or a Decoder. +type Configuration struct { MaxMessageSize *int `yaml:"maxMessageSize"` BytesPool *pool.BucketizedPoolConfiguration `yaml:"bytesPool"` } -// NewBaseOptions creates a new BaseOptions. -func (c *BaseConfiguration) NewBaseOptions ( +// NewOptions creates a new Options. +func (c *Configuration) NewOptions ( iOpts instrument.Options, -) BaseOptions { +) Options { var ( - baseOpts = NewBaseOptions() + opts = NewOptions() ) if c.MaxMessageSize != nil { - baseOpts = baseOpts.SetMaxMessageSize(*c.MaxMessageSize) + opts = opts.SetMaxMessageSize(*c.MaxMessageSize) } if c.BytesPool != nil { scope := iOpts.MetricsScope() @@ -50,7 +50,7 @@ func (c *BaseConfiguration) NewBaseOptions ( ))), ) p.Init() - baseOpts = baseOpts.SetBytesPool(p) + opts = opts.SetBytesPool(p) } - return baseOpts -} \ No newline at end of file + return opts +} diff --git a/protocol/proto/config_test.go b/protocol/proto/config_test.go index a06adcd..1d4a344 100644 --- a/protocol/proto/config_test.go +++ b/protocol/proto/config_test.go @@ -29,7 +29,7 @@ import ( yaml "gopkg.in/yaml.v2" ) -func TestBaseConfig(t *testing.T) { +func TestConfiguration(t *testing.T) { str := ` maxMessageSize: 1024 bytesPool: @@ -43,9 +43,9 @@ bytesPool: highWatermark: 0.02 ` - var cfg BaseConfiguration + var cfg Configuration require.NoError(t, yaml.Unmarshal([]byte(str), &cfg)) - opts := cfg.NewBaseOptions(instrument.NewOptions()) + opts := cfg.NewOptions(instrument.NewOptions()) require.Equal(t, 1024, opts.MaxMessageSize()) require.NotNil(t, opts.BytesPool()) b := opts.BytesPool().Get(2) diff --git a/protocol/proto/decoder.go b/protocol/proto/decoder.go index 382ed4e..f123ea1 100644 --- a/protocol/proto/decoder.go +++ b/protocol/proto/decoder.go @@ -35,9 +35,9 @@ type decoder struct { } // NewDecoder decodes a new decoder, the implementation is not thread safe. -func NewDecoder(r io.Reader, opts BaseOptions) Decoder { +func NewDecoder(r io.Reader, opts Options) Decoder { if opts == nil { - opts = NewBaseOptions() + opts = NewOptions() } pool := opts.BytesPool() return &decoder{ diff --git a/protocol/proto/encoder.go b/protocol/proto/encoder.go index 4528c77..927a294 100644 --- a/protocol/proto/encoder.go +++ b/protocol/proto/encoder.go @@ -34,9 +34,9 @@ type encoder struct { } // NewEncoder creates a new encoder, the implementation is not thread safe. -func NewEncoder(opts BaseOptions) Encoder { +func NewEncoder(opts Options) Encoder { if opts == nil { - opts = NewBaseOptions() + opts = NewOptions() } pool := opts.BytesPool() return &encoder{ diff --git a/protocol/proto/options.go b/protocol/proto/options.go index 44cde37..9a93789 100644 --- a/protocol/proto/options.go +++ b/protocol/proto/options.go @@ -34,36 +34,36 @@ var ( } ) -// NewBaseOptions creates a new BaseOptions. -func NewBaseOptions() BaseOptions { +// NewOptions creates a new Options. +func NewOptions() Options { pool := pool.NewBytesPool(defaultByteBuckets, nil) pool.Init() - return &baseOptions{ + return &options{ bytesPool: pool, maxMessageSize: defaultMaxMessageSize, } } -type baseOptions struct { +type options struct { maxMessageSize int bytesPool pool.BytesPool } -func (opts *baseOptions) MaxMessageSize() int { +func (opts *options) MaxMessageSize() int { return opts.maxMessageSize } -func (opts *baseOptions) SetMaxMessageSize(value int) BaseOptions { +func (opts *options) SetMaxMessageSize(value int) Options { o := *opts o.maxMessageSize = value return &o } -func (opts *baseOptions) BytesPool() pool.BytesPool { +func (opts *options) BytesPool() pool.BytesPool { return opts.bytesPool } -func (opts *baseOptions) SetBytesPool(value pool.BytesPool) BaseOptions { +func (opts *options) SetBytesPool(value pool.BytesPool) Options { o := *opts o.bytesPool = value return &o diff --git a/protocol/proto/roundtrip_test.go b/protocol/proto/roundtrip_test.go index fc828aa..2738375 100644 --- a/protocol/proto/roundtrip_test.go +++ b/protocol/proto/roundtrip_test.go @@ -32,12 +32,12 @@ import ( ) func TestBaseEncodeDecodeRoundTripWithoutPool(t *testing.T) { - enc := NewEncoder(NewBaseOptions().SetBytesPool(nil)).(*encoder) + enc := NewEncoder(NewOptions().SetBytesPool(nil)).(*encoder) require.Equal(t, 4, len(enc.buffer)) require.Equal(t, 4, cap(enc.buffer)) require.Empty(t, enc.Bytes()) r := bytes.NewReader(nil) - dec := NewDecoder(r, NewBaseOptions().SetBytesPool(nil)).(*decoder) + dec := NewDecoder(r, NewOptions().SetBytesPool(nil)).(*decoder) require.Equal(t, 4, len(dec.buffer)) require.Equal(t, 4, cap(dec.buffer)) encodeMsg := msgpb.Message{ @@ -64,12 +64,12 @@ func TestBaseEncodeDecodeRoundTripWithPool(t *testing.T) { p := getBytesPool(2, []int{2, 8, 100}) p.Init() - enc := NewEncoder(NewBaseOptions().SetBytesPool(p)).(*encoder) + enc := NewEncoder(NewOptions().SetBytesPool(p)).(*encoder) require.Equal(t, 8, len(enc.buffer)) require.Equal(t, 8, cap(enc.buffer)) r := bytes.NewReader(nil) - dec := NewDecoder(r, NewBaseOptions().SetBytesPool(p)).(*decoder) + dec := NewDecoder(r, NewOptions().SetBytesPool(p)).(*decoder) require.Equal(t, 8, len(dec.buffer)) require.Equal(t, 8, cap(dec.buffer)) encodeMsg := msgpb.Message{ @@ -114,7 +114,7 @@ func TestResetReader(t *testing.T) { } func TestEncodeMessageLargerThanMaxSize(t *testing.T) { - opts := NewBaseOptions().SetMaxMessageSize(4) + opts := NewOptions().SetMaxMessageSize(4) enc := NewEncoder(opts) encodeMsg := msgpb.Message{ Metadata: msgpb.Metadata{ @@ -143,7 +143,7 @@ func TestDecodeMessageLargerThanMaxSize(t *testing.T) { require.NoError(t, err) decodeMsg := msgpb.Message{} - opts := NewBaseOptions().SetMaxMessageSize(4) + opts := NewOptions().SetMaxMessageSize(4) dec := NewDecoder(bytes.NewReader(enc.Bytes()), opts) err = dec.Decode(&decodeMsg) require.Error(t, err) diff --git a/protocol/proto/types.go b/protocol/proto/types.go index c72c60c..4be8bec 100644 --- a/protocol/proto/types.go +++ b/protocol/proto/types.go @@ -60,17 +60,17 @@ type Decoder interface { ResetReader(r io.Reader) } -// BaseOptions configures a base encoder or decoder. -type BaseOptions interface { +// Options configures a encoder or decoder. +type Options interface { // MaxMessageSize returns the maximum message size. MaxMessageSize() int // SetMaxMessageSize sets the maximum message size. - SetMaxMessageSize(value int) BaseOptions + SetMaxMessageSize(value int) Options // BytesPool returns the bytes pool. BytesPool() pool.BytesPool // SetBytesPool sets the bytes pool. - SetBytesPool(value pool.BytesPool) BaseOptions + SetBytesPool(value pool.BytesPool) Options } From 7dd2a4f0d7a71ee89cc62e7007dcd0a826f883bc Mon Sep 17 00:00:00 2001 From: Siyu Yang Date: Fri, 20 Jul 2018 15:12:03 -0400 Subject: [PATCH 6/6] gofmt, remove redundant test --- consumer/config.go | 2 +- consumer/options.go | 8 ++-- consumer/types.go | 4 +- integration/integration_test.go | 74 ++++++++++++++++---------------- producer/config/writer.go | 4 +- producer/writer/options.go | 10 ++--- protocol/proto/config.go | 8 ++-- protocol/proto/roundtrip_test.go | 9 ---- 8 files changed, 55 insertions(+), 64 deletions(-) diff --git a/consumer/config.go b/consumer/config.go index 69fb40e..15c206b 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -30,7 +30,7 @@ import ( // Configuration configs the consumer options. type Configuration struct { - Encoder *proto.Configuration `yaml:"encoder"` + Encoder *proto.Configuration `yaml:"encoder"` Decoder *proto.Configuration `yaml:"decoder"` MessagePool *pool.ObjectPoolConfiguration `yaml:"messagePool"` AckFlushInterval *time.Duration `yaml:"ackFlushInterval"` diff --git a/consumer/options.go b/consumer/options.go index 492c265..d740afd 100644 --- a/consumer/options.go +++ b/consumer/options.go @@ -36,8 +36,8 @@ var ( ) type options struct { - encOptions proto.Options - decOptions proto.Options + encOptions proto.Options + decOptions proto.Options messagePoolOpts pool.ObjectPoolOptions ackFlushInterval time.Duration ackBufferSize int @@ -49,8 +49,8 @@ type options struct { // NewOptions creates a new options. func NewOptions() Options { return &options{ - encOptions: proto.NewOptions(), - decOptions: proto.NewOptions(), + encOptions: proto.NewOptions(), + decOptions: proto.NewOptions(), messagePoolOpts: pool.NewObjectPoolOptions(), ackFlushInterval: defaultAckFlushInterval, ackBufferSize: defaultAckBufferSize, diff --git a/consumer/types.go b/consumer/types.go index 623ca0c..79dd041 100644 --- a/consumer/types.go +++ b/consumer/types.go @@ -70,8 +70,8 @@ type Options interface { EncoderOptions() proto.Options // SetEncoderOptions sets the options for Encoder. - SetEncoderOptions(value proto.Options) Options - + SetEncoderOptions(value proto.Options) Options + // DecoderOptions returns the options for Decoder. DecoderOptions() proto.Options diff --git a/integration/integration_test.go b/integration/integration_test.go index a5afc87..e0c4a0a 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -50,8 +50,8 @@ func TestSharedConsumer(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, }) s.Run(t, ctrl) @@ -71,7 +71,7 @@ func TestReplicatedConsumer(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, }) s.Run(t, ctrl) @@ -92,9 +92,9 @@ func TestSharedAndReplicatedConsumers(t *testing.T) { for i := 1; i <= maxProducers; i++ { for j := 1; j <= maxRF; j++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: j}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: j}, - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: j}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: j}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: j}, }) s.Run(t, ctrl) @@ -115,8 +115,8 @@ func TestSharedConsumerWithDeadInstance(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, }) s.ScheduleOperations( @@ -149,8 +149,8 @@ func TestSharedConsumerWithDeadConnection(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, }) s.ScheduleOperations( @@ -178,7 +178,7 @@ func TestReplicatedConsumerWithDeadConnection(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, }) s.ScheduleOperations( @@ -207,9 +207,9 @@ func TestSharedAndReplicatedConsumerWithDeadConnection(t *testing.T) { for i := 1; i <= maxProducers; i++ { for j := 1; j <= maxRF; j++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: j}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: j}, - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: j}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: j}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: j}, }) s.ScheduleOperations( @@ -246,8 +246,8 @@ func TestSharedConsumerAddInstances(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, }) s.ScheduleOperations( @@ -275,7 +275,7 @@ func TestReplicatedConsumerAddInstances(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, }) s.ScheduleOperations( @@ -304,9 +304,9 @@ func TestSharedAndReplicatedConsumerAddInstances(t *testing.T) { for i := 1; i <= maxProducers; i++ { for j := 1; j <= maxRF; j++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: j}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: j}, - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: j}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: j}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: j}, }) s.ScheduleOperations( @@ -343,8 +343,8 @@ func TestSharedConsumerRemoveInstances(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, }) s.ScheduleOperations( @@ -372,7 +372,7 @@ func TestReplicatedConsumerRemoveInstances(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, }) s.ScheduleOperations( @@ -401,9 +401,9 @@ func TestSharedAndReplicatedConsumerRemoveInstances(t *testing.T) { for i := 1; i <= maxProducers; i++ { for j := 1; j <= maxRF; j++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: j}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: j}, - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: j}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: j}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: j}, }) s.ScheduleOperations( @@ -440,8 +440,8 @@ func TestSharedConsumerReplaceInstances(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: 2}, }) s.ScheduleOperations( @@ -469,7 +469,7 @@ func TestReplicatedConsumerReplaceInstances(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, }) s.ScheduleOperations( @@ -498,9 +498,9 @@ func TestSharedAndReplicatedConsumerReplaceInstances(t *testing.T) { for i := 1; i <= maxProducers; i++ { for j := 1; j <= maxRF; j++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: j}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 5, replicas: j}, - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: j}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: j}, + {ct: topic.Shared, isSharded: false, instances: 5, replicas: j}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: j}, }) s.ScheduleOperations( @@ -537,9 +537,9 @@ func TestRemoveConsumerService(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, - consumerServiceConfig{ct: topic.Shared, isSharded: false, instances: 1, replicas: 1}, - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: false, instances: 1, replicas: 1}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, }) s.ScheduleOperations( @@ -565,8 +565,8 @@ func TestAddConsumerService(t *testing.T) { for i := 1; i <= maxProducers; i++ { s := newTestSetup(t, ctrl, i, []consumerServiceConfig{ - consumerServiceConfig{ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, - consumerServiceConfig{ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Shared, isSharded: true, instances: 5, replicas: 2}, + {ct: topic.Replicated, isSharded: true, instances: 5, replicas: 2}, }) s.ScheduleOperations( diff --git a/producer/config/writer.go b/producer/config/writer.go index 94c094d..f1651d3 100644 --- a/producer/config/writer.go +++ b/producer/config/writer.go @@ -91,8 +91,8 @@ type WriterConfiguration struct { InitialAckMapSize *int `yaml:"initialAckMapSize"` CloseCheckInterval *time.Duration `yaml:"closeCheckInterval"` AckErrorRetry *retry.Configuration `yaml:"ackErrorRetry"` - Encoder *proto.Configuration `yaml:"encoder"` - Decoder *proto.Configuration `yaml:"decoder"` + Encoder *proto.Configuration `yaml:"encoder"` + Decoder *proto.Configuration `yaml:"decoder"` Connection *ConnectionConfiguration `yaml:"connection"` } diff --git a/producer/writer/options.go b/producer/writer/options.go index d4aa4f2..76eade3 100644 --- a/producer/writer/options.go +++ b/producer/writer/options.go @@ -313,7 +313,7 @@ type Options interface { // SetEncoderOptions sets the encoder's options. SetEncoderOptions(value proto.Options) Options - + // EncoderOptions returns the decoder's options. DecoderOptions() proto.Options @@ -347,8 +347,8 @@ type writerOptions struct { initialAckMapSize int closeCheckInterval time.Duration ackErrRetryOpts retry.Options - encOpts proto.Options - decOpts proto.Options + encOpts proto.Options + decOpts proto.Options cOpts ConnectionOptions iOpts instrument.Options } @@ -365,8 +365,8 @@ func NewOptions() Options { initialAckMapSize: defaultInitialAckMapSize, closeCheckInterval: defaultCloseCheckInterval, ackErrRetryOpts: retry.NewOptions(), - encOpts: proto.NewOptions(), - decOpts: proto.NewOptions(), + encOpts: proto.NewOptions(), + decOpts: proto.NewOptions(), cOpts: NewConnectionOptions(), iOpts: instrument.NewOptions(), } diff --git a/protocol/proto/config.go b/protocol/proto/config.go index 414e434..e60c61b 100644 --- a/protocol/proto/config.go +++ b/protocol/proto/config.go @@ -27,12 +27,12 @@ import ( // Configuration configures an Encoder or a Decoder. type Configuration struct { - MaxMessageSize *int `yaml:"maxMessageSize"` - BytesPool *pool.BucketizedPoolConfiguration `yaml:"bytesPool"` + MaxMessageSize *int `yaml:"maxMessageSize"` + BytesPool *pool.BucketizedPoolConfiguration `yaml:"bytesPool"` } // NewOptions creates a new Options. -func (c *Configuration) NewOptions ( +func (c *Configuration) NewOptions( iOpts instrument.Options, ) Options { var ( @@ -46,7 +46,7 @@ func (c *Configuration) NewOptions ( p := pool.NewBytesPool( c.BytesPool.NewBuckets(), c.BytesPool.NewObjectPoolOptions(iOpts.SetMetricsScope(scope.Tagged( - map[string]string{"pool": "bytes"}, + map[string]string{"pool": "bytes"}, ))), ) p.Init() diff --git a/protocol/proto/roundtrip_test.go b/protocol/proto/roundtrip_test.go index 2738375..e1472bd 100644 --- a/protocol/proto/roundtrip_test.go +++ b/protocol/proto/roundtrip_test.go @@ -150,15 +150,6 @@ func TestDecodeMessageLargerThanMaxSize(t *testing.T) { require.Contains(t, err.Error(), "larger than maximum supported size") } -func TestDecodeReset(t *testing.T) { - dec := NewDecoder(nil, nil) - require.Nil(t, dec.(*decoder).r) - - conn := new(net.TCPConn) - dec.ResetReader(conn) - require.Equal(t, conn, dec.(*decoder).r) -} - func TestEncodeDecodeRoundTrip(t *testing.T) { enc := NewEncoder(nil) dec := NewDecoder(nil, nil)