Skip to content

Commit

Permalink
consumer/protocol: add ring_buffer_size to ShardSpec
Browse files Browse the repository at this point in the history
Default to 8192 (existing value) if zero, but otherwise take the value
from the ShardSpec.

Issue gazette#304
  • Loading branch information
jgraettinger committed Sep 20, 2021
1 parent ae45b8a commit c167e91
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 120 deletions.
266 changes: 152 additions & 114 deletions consumer/protocol/protocol.pb.go

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions consumer/protocol/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,17 @@ message ShardSpec {
// produced -- an ACK which can't arrive until the transaction closes.
bool disable_wait_for_ack = 11
[ (gogoproto.moretags) = "yaml:\"disable_wait_for_ack,omitempty\"" ];

// Size of the ring buffer used to sequence read-uncommitted messages
// into consumed, read-committed ones. The ring buffer is a performance
// optimization only: applications will replay portions of journals as
// needed when messages aren't available in the buffer.
// It can remain small if source journal transactions are small,
// but larger transactions will achieve better performance with a
// larger ring.
// If zero, a reasonable default (currently 8192) is used.
uint32 ring_buffer_size = 12
[ (gogoproto.moretags) = "yaml:\"ring_buffer_size,omitempty\"" ];
}

// ConsumerSpec describes a Consumer process instance and its configuration.
Expand Down
13 changes: 11 additions & 2 deletions consumer/protocol/shard_spec_extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,20 @@ func UnionShardSpecs(a, b ShardSpec) ShardSpec {
if a.MinTxnDuration == 0 {
a.MinTxnDuration = b.MinTxnDuration
}
if a.Disable == false {
if !a.Disable {
a.Disable = b.Disable
}
if a.HotStandbys == 0 {
a.HotStandbys = b.HotStandbys
}
a.LabelSet = pb.UnionLabelSets(a.LabelSet, b.LabelSet, pb.LabelSet{})

if a.DisableWaitForAck == false {
if !a.DisableWaitForAck {
a.DisableWaitForAck = b.DisableWaitForAck
}
if a.RingBufferSize == 0 {
a.RingBufferSize = b.RingBufferSize
}
return a
}

Expand Down Expand Up @@ -189,6 +192,9 @@ func IntersectShardSpecs(a, b ShardSpec) ShardSpec {
if a.DisableWaitForAck != b.DisableWaitForAck {
a.DisableWaitForAck = false
}
if a.RingBufferSize != b.RingBufferSize {
a.RingBufferSize = 0
}
return a
}

Expand Down Expand Up @@ -224,6 +230,9 @@ func SubtractShardSpecs(a, b ShardSpec) ShardSpec {
if a.DisableWaitForAck == b.DisableWaitForAck {
a.DisableWaitForAck = false
}
if a.RingBufferSize == b.RingBufferSize {
a.RingBufferSize = 0
}
return a
}

Expand Down
2 changes: 2 additions & 0 deletions consumer/protocol/shard_spec_extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (s *SpecSuite) TestSetOperations(c *gc.C) {
},
},
DisableWaitForAck: true,
RingBufferSize: 123,
}
var other = ShardSpec{
Sources: []ShardSpec_Source{
Expand All @@ -148,6 +149,7 @@ func (s *SpecSuite) TestSetOperations(c *gc.C) {
},
},
DisableWaitForAck: false,
RingBufferSize: 456,
}

c.Check(UnionShardSpecs(ShardSpec{}, model), gc.DeepEquals, model)
Expand Down
10 changes: 7 additions & 3 deletions consumer/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ const (
// be rather large, to minimize processing stalls. The current value will
// tolerate a data delay of up to 82ms @ 100K messages / sec without stalling.
messageBufferSize = 1 << 13 // 8192.
// Size of the ring buffer used by message.Sequencer.
messageRingSize = messageBufferSize
// Size of the ring buffer used by message.Sequencer if the ShardSpec doesn't specify.
defaultMessageRingSize = messageBufferSize
// Maximum interval between the newest and an older producer within a journal,
// before the message sequencer will prune the older producer state.
messageSequencerPruneHorizon = time.Hour * 24
Expand Down Expand Up @@ -248,10 +248,14 @@ func servePrimary(s *shard) (err error) {
startReadingMessages(s, cp, msgCh)
}

var ringSize = s.Spec().RingBufferSize
if ringSize == 0 {
ringSize = defaultMessageRingSize
}
s.sequencer = message.NewSequencer(
pc.FlattenReadThrough(cp),
pc.FlattenProducerStates(cp),
messageRingSize,
int(ringSize),
)

if err = runTransactions(s, cp, msgCh, hintsCh); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion consumer/test_support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func playAndComplete(t require.TestingT, shard *shard) pc.Checkpoint {
shard.sequencer = message.NewSequencer(
pc.FlattenReadThrough(cp),
pc.FlattenProducerStates(cp),
messageRingSize,
defaultMessageRingSize,
)
return cp
}
Expand Down

0 comments on commit c167e91

Please sign in to comment.