streamingccl: stream span config checkpoints
This patch modifies the span config event stream to emit a checkpoint event
containing the rangefeed frontier after it processes each rangefeed cache
flush.

The span config client can then use this information while processing updates.
Specifically, a subscription.Next() call may return a checkpoint indicating
that all updates up to a given frontier have been emitted by the rangefeed.
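
To illustrate how a subscriber might consume this, here is a minimal sketch in
Go. The Subscription and Event types below are hypothetical stand-ins, not the
real streampb or client interfaces:

```go
// Hypothetical sketch of a client consuming the span config stream.
// Subscription, Event, and their fields are simplified stand-ins for the
// real streamingccl client API, not the actual interfaces.
package main

import "fmt"

type Event struct {
	Updates    []string // a batch of span config updates, if any
	Checkpoint int64    // rangefeed frontier wall time; 0 if absent
}

type Subscription struct{ events chan Event }

func (s *Subscription) Next() (Event, bool) {
	ev, ok := <-s.events
	return ev, ok
}

func consume(sub *Subscription) {
	var frontier int64
	for {
		ev, ok := sub.Next()
		if !ok {
			return
		}
		// Apply any buffered updates first.
		for _, u := range ev.Updates {
			fmt.Println("apply:", u)
		}
		if ev.Checkpoint > frontier {
			// All updates up to this frontier have been emitted, so it is
			// safe to record progress at this timestamp.
			frontier = ev.Checkpoint
			fmt.Println("frontier advanced to", frontier)
		}
	}
}

func main() {
	sub := &Subscription{events: make(chan Event, 2)}
	sub.events <- Event{Updates: []string{"cfg@10"}}
	sub.events <- Event{Checkpoint: 10}
	close(sub.events)
	consume(sub)
}
```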

This patch also fixes two bugs:
- prevents sending an empty batch of updates
- prevents sending system target span config updates

Informs cockroachdb#106823

Release note: None
msbutler committed Aug 25, 2023
1 parent fbd85a2 commit 9cc4973
Showing 4 changed files with 83 additions and 18 deletions.
29 changes: 20 additions & 9 deletions pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
```diff
@@ -938,17 +938,28 @@ USE d;`)
 	codec := source.mu.codec.(*partitionStreamDecoder)
 	updateCount := 0
 	for {
-		source.mu.Lock()
-		require.True(t, source.mu.rows.Next())
-		source.mu.codec.decode()
-		if codec.e.Batch != nil {
-			for _, cfg := range codec.e.Batch.SpanConfigs {
-				if receivedSpanConfigs.maybeAddNewRecord(cfg.SpanConfig, cfg.Timestamp.WallTime) {
-					updateCount++
+		func() {
+			// This code block is wrapped in an anonymous function to ensure the source
+			// gets unlocked if an assertion fails. Otherwise, the test can hang.
+			source.mu.Lock()
+			defer source.mu.Unlock()
+			require.True(t, source.mu.rows.Next())
+			source.mu.codec.decode()
+			if codec.e.Batch != nil {
+				require.Greater(t, len(codec.e.Batch.SpanConfigs), 0, "a non-empty batch had zero span config updates")
+				for _, cfg := range codec.e.Batch.SpanConfigs {
+					if receivedSpanConfigs.maybeAddNewRecord(cfg.SpanConfig, cfg.Timestamp.WallTime) {
+						updateCount++
+					}
 				}
 			}
-		}
-		source.mu.Unlock()
+			if codec.e.Checkpoint != nil {
+				require.Equal(t, 1, len(codec.e.Checkpoint.ResolvedSpans))
+				// The frontier in the checkpoint must be greater than or equal to the
+				// commit timestamp associated with the latest event.
+				require.LessOrEqual(t, receivedSpanConfigs.latestTime, codec.e.Checkpoint.ResolvedSpans[0].Timestamp.WallTime)
+			}
+		}()
 		if updateCount == len(expectedSpanConfigs.allUpdates) {
 			break
 		}
```
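
The anonymous-function wrapper in the hunk above is a general pattern: a failed
require assertion aborts the surrounding function (testify's FailNow runs
deferred calls on its way out), so a deferred Unlock inside a closure is what
guarantees the mutex is released on the failure path. A minimal standalone
sketch of the pattern, using panic in place of a test assertion and none of
the test's actual types:

```go
package main

import (
	"fmt"
	"sync"
)

// doStep wraps a critical section in a closure so that the deferred Unlock
// runs even when body aborts early. testify's require helpers abort via
// t.FailNow (runtime.Goexit), and deferred calls still run on that path;
// this sketch uses panic/recover to simulate the abort. Without the defer,
// an aborted iteration would leave mu locked and hang any goroutine
// waiting on it.
func doStep(mu *sync.Mutex, body func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("step aborted: %v", r)
		}
	}()
	func() {
		mu.Lock()
		defer mu.Unlock()
		body()
	}()
	return nil
}

func main() {
	var mu sync.Mutex
	_ = doStep(&mu, func() { panic("assertion failure") })
	mu.Lock() // does not hang: the closure released the lock while unwinding
	fmt.Println("mutex still usable after a failed step")
	mu.Unlock()
}
```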
49 changes: 45 additions & 4 deletions pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go
```diff
@@ -11,6 +11,7 @@ package streamproducer
 import (
 	"context"
 
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
@@ -22,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -207,6 +209,10 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 	pacer := makeCheckpointPacer(s.spec.Config.MinCheckpointFrequency)
 	bufferedEvents := make([]streampb.StreamedSpanConfigEntry, 0)
 	batcher := makeStreamEventBatcher()
+	frontier, err := makeSpanConfigFrontier(s.spec.Spans)
+	if err != nil {
+		return err
+	}
 
 	for {
 		select {
@@ -217,7 +223,13 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 		case update := <-s.updateCh:
 			for _, ev := range update.Events {
 				spcfgEvent := ev.(*spanconfigkvsubscriber.BufferEvent)
-				target := spcfgEvent.Update.GetTarget().ToProto()
+				target := spcfgEvent.Update.GetTarget()
+				if target.IsSystemTarget() {
+					// We skip replicating SystemTarget span configs as they are created
+					// via the internal span config machinery, not via a public-facing API
+					// (like the KVAccessor).
+					continue
+				}
 				_, tenantID, err := keys.DecodeTenantPrefix(target.GetSpan().Key)
 				if err != nil {
 					return err
@@ -228,17 +240,23 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 
 				streamedSpanCfgEntry := streampb.StreamedSpanConfigEntry{
 					SpanConfig: roachpb.SpanConfigEntry{
-						Target: target,
+						Target: target.ToProto(),
 						Config: spcfgEvent.Update.GetConfig(),
 					},
 					Timestamp: spcfgEvent.Timestamp(),
 				}
 				bufferedEvents = append(bufferedEvents, streamedSpanCfgEntry)
 			}
-			batcher.addSpanConfigs(bufferedEvents)
+			batcher.addSpanConfigs(bufferedEvents, update.Timestamp)
 			bufferedEvents = bufferedEvents[:0]
 			if pacer.shouldCheckpoint(update.Timestamp, true) {
-				if err := s.flushEvent(ctx, &streampb.StreamEvent{Batch: &batcher.batch}); err != nil {
+				if batcher.getSize() > 0 {
+					if err := s.flushEvent(ctx, &streampb.StreamEvent{Batch: &batcher.batch}); err != nil {
+						return err
+					}
+				}
+				frontier.update(update.Timestamp)
+				if err := s.flushEvent(ctx, &streampb.StreamEvent{Checkpoint: &frontier.checkpoint}); err != nil {
 					return err
 				}
 				batcher.reset()
@@ -247,6 +265,29 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 	}
 }
 
+func makeSpanConfigFrontier(spans roachpb.Spans) (*spanConfigFrontier, error) {
+	if len(spans) != 1 {
+		return nil, errors.AssertionFailedf("unexpected input span length %d", len(spans))
+	}
+	checkpoint := streampb.StreamEvent_StreamCheckpoint{
+		ResolvedSpans: []jobspb.ResolvedSpan{{
+			Span: spans[0],
+		},
+		},
+	}
+	return &spanConfigFrontier{
+		checkpoint: checkpoint,
+	}, nil
+}
+
+type spanConfigFrontier struct {
+	checkpoint streampb.StreamEvent_StreamCheckpoint
+}
+
+func (spf *spanConfigFrontier) update(frontier hlc.Timestamp) {
+	spf.checkpoint.ResolvedSpans[0].Timestamp = frontier
+}
+
 func streamSpanConfigPartition(
 	evalCtx *eval.Context, spec streampb.StreamPartitionSpec,
 ) (eval.ValueGenerator, error) {
```
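
Taken together, the flush path now follows a simple protocol: on each paced
checkpoint, flush the batch only if it is non-empty (the first bug fix), then
always emit a checkpoint carrying the frontier so subscribers can advance even
when no span configs changed. A toy simulation of that ordering, with stand-in
types rather than the real streampb events:

```go
package main

import "fmt"

// event is a stand-in for streampb.StreamEvent: exactly one of batch or
// checkpoint is meaningful per emitted event.
type event struct {
	batch      []string
	checkpoint int64
}

// flushCycle mirrors the shouldCheckpoint branch of streamLoop: the batch
// is flushed only when non-empty, but the frontier checkpoint is emitted
// unconditionally.
func flushCycle(out chan<- event, batch []string, frontier int64) {
	if len(batch) > 0 {
		out <- event{batch: batch}
	}
	out <- event{checkpoint: frontier}
}

func main() {
	out := make(chan event, 4)
	flushCycle(out, nil, 10)                // idle: checkpoint only
	flushCycle(out, []string{"cfg@19"}, 20) // batch, then checkpoint
	close(out)
	for ev := range out {
		fmt.Printf("%+v\n", ev)
	}
}
```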
8 changes: 4 additions & 4 deletions pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go
```diff
@@ -57,7 +57,9 @@ func (seb *streamEventBatcher) addDelRange(d *kvpb.RangeFeedDeleteRange) {
 // that an update is newer than the tracked frontier and by checking for
 // duplicates within the inputEvents. This function assumes that the input events are
 // timestamp ordered and that the largest timestamp in the batch is the rangefeed frontier.
-func (seb *streamEventBatcher) addSpanConfigs(inputEvents []streampb.StreamedSpanConfigEntry) {
+func (seb *streamEventBatcher) addSpanConfigs(
+	inputEvents []streampb.StreamedSpanConfigEntry, inputFrontier hlc.Timestamp,
+) {
 
 	// eventSeenAtCurrentTimestamp is used to track unique events within the
 	// inputEvents at the current timestamp.
@@ -82,9 +84,7 @@ func (seb *streamEventBatcher) addSpanConfigs(inputEvents []streampb.StreamedSpanConfigEntry) {
 			seb.size += event.Size()
 		}
 	}
-	if len(seb.batch.SpanConfigs) > 0 {
-		seb.spanConfigFrontier = seb.batch.SpanConfigs[len(seb.batch.SpanConfigs)-1].Timestamp
-	}
+	seb.spanConfigFrontier = inputFrontier
 }
 
 func (seb *streamEventBatcher) getSize() int {
```
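
Note what the new inputFrontier parameter fixes: previously the frontier was
inferred from the last update in the batch, so a flush that buffered nothing
could never advance it. A simplified sketch of the batching rule, with
hypothetical types in place of the real streamEventBatcher:

```go
package main

import "fmt"

type entry struct {
	span string
	ts   int64
}

type batcher struct {
	batch    []entry
	frontier int64
}

// addSpanConfigs buffers only updates strictly newer than the tracked
// frontier, elides duplicates, and then advances the frontier to the
// caller-supplied value, even when nothing was buffered.
func (b *batcher) addSpanConfigs(input []entry, inputFrontier int64) {
	seen := map[entry]bool{}
	for _, e := range input {
		if e.ts <= b.frontier || seen[e] {
			continue // already emitted at or below the frontier, or a duplicate
		}
		seen[e] = true
		b.batch = append(b.batch, e)
	}
	b.frontier = inputFrontier
}

func main() {
	b := &batcher{}
	b.addSpanConfigs([]entry{{"t1", 1}, {"t1", 2}, {"t1", 2}}, 2)
	fmt.Println(b.batch, b.frontier) // [{t1 1} {t1 2}] 2
	b.addSpanConfigs(nil, 3)         // empty input still advances the frontier
	fmt.Println(b.batch, b.frontier) // [{t1 1} {t1 2}] 3
}
```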
15 changes: 14 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go
```diff
@@ -96,42 +96,55 @@ func TestBatchSpanConfigs(t *testing.T) {
 
 	for idx, toAdd := range []struct {
 		updates          []streampb.StreamedSpanConfigEntry
+		frontier         int64
 		expected         []streampb.StreamedSpanConfigEntry
 		shouldFlushAfter bool
 	}{
 		{
 			// Both spans should buffer
 			updates:  events(makeEntry(t1Span, 3, 1), makeEntry(t1Span, 4, 2)),
+			frontier: 2,
 			expected: events(makeEntry(t1Span, 3, 1), makeEntry(t1Span, 4, 2)),
 		},
 		{
 			// An event at a lower timestamp should not get flushed
 			updates:  events(makeEntry(t1Span, 3, 1)),
+			frontier: 2,
 			expected: []streampb.StreamedSpanConfigEntry{},
 		},
 		{
 			// An empty update should not screw things up.
 			updates:  []streampb.StreamedSpanConfigEntry{},
+			frontier: 2,
 			expected: []streampb.StreamedSpanConfigEntry{},
 		},
 		{
 			// An event at the frontier should not get flushed
 			updates:  events(makeEntry(t1Span, 4, 2)),
+			frontier: 2,
 			expected: []streampb.StreamedSpanConfigEntry{},
 		},
 		{
 			// Only one of the duplicate events should flush
 			updates:  events(makeEntry(t1Span, 3, 3), makeEntry(t1Span, 3, 3)),
+			frontier: 3,
 			expected: events(makeEntry(t1Span, 3, 3)),
 		},
 		{
 			// Only one of the duplicate events should flush, even if there's an event in between the duplicate events.
 			updates:  events(makeEntry(t1Span, 3, 4), makeEntry(t2Span, 4, 4), makeEntry(t1Span, 3, 4)),
+			frontier: 4,
 			expected: events(makeEntry(t1Span, 3, 4), makeEntry(t2Span, 4, 4)),
 		},
+		// Duplicate events from previous addition get elided.
+		{
+			updates:  events(makeEntry(t1Span, 3, 4), makeEntry(t2Span, 4, 4), makeEntry(t1Span, 3, 4), makeEntry(t2Span, 3, 5)),
+			frontier: 5,
+			expected: []streampb.StreamedSpanConfigEntry{makeEntry(t2Span, 3, 5)},
+		},
 	} {
 		bufferedEvents = append(bufferedEvents, toAdd.updates...)
-		seb.addSpanConfigs(bufferedEvents)
+		seb.addSpanConfigs(bufferedEvents, hlc.Timestamp{WallTime: toAdd.frontier})
 		require.Equal(t, toAdd.expected, seb.batch.SpanConfigs[previousBatchIdx:], fmt.Sprintf("failed on step %d (indexed by 0)", idx))
 		bufferedEvents = bufferedEvents[:0]
 		if rng.Intn(2) == 0 {
```
