Skip to content

Commit

Permalink
c2c: teach event stream to decode and send span config updates
Browse files Browse the repository at this point in the history
If the user creates a stream client for span config updates, they will now
receive span config updates in their subscription.

Informs cockroachdb#106823

Release note: None
  • Loading branch information
msbutler committed Jul 27, 2023
1 parent 5919aea commit b36580f
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 27 deletions.
66 changes: 66 additions & 0 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
// CheckpointEvent indicates that GetResolvedSpans will be meaningful. The resolved
// timestamp indicates that all KVs have been emitted up to this timestamp.
CheckpointEvent
// SpanConfigEvent indicates that the SpanConfig field of an enven holds an updated
// SpanConfigRecord.
SpanConfigEvent
)

// Event describes an event emitted by a cluster to cluster stream. Its Type
Expand All @@ -52,6 +55,9 @@ type Event interface {
// GetResolvedSpans returns a list of span-time pairs indicating the time for
// which all KV events within that span has been emitted.
GetResolvedSpans() []jobspb.ResolvedSpan

// GetSpanConfigEvent returns a SpanConfig event if the EventType is SpanConfigEvent
GetSpanConfigEvent() *roachpb.SpanConfigEntry
}

// kvEvent is a key value pair that needs to be ingested.
Expand Down Expand Up @@ -86,6 +92,11 @@ func (kve kvEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

// GetSpanConfigEvent implements the Event interface.
func (kve kvEvent) GetSpanConfigEvent() *roachpb.SpanConfigEntry {
return nil
}

// sstableEvent is a sstable that needs to be ingested.
type sstableEvent struct {
sst kvpb.RangeFeedSSTable
Expand Down Expand Up @@ -116,6 +127,11 @@ func (sste sstableEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

// GetSpanConfigEvent implements the Event interface.
func (sste sstableEvent) GetSpanConfigEvent() *roachpb.SpanConfigEntry {
return nil
}

var _ Event = sstableEvent{}

// delRangeEvent is a DeleteRange event that needs to be ingested.
Expand Down Expand Up @@ -148,6 +164,11 @@ func (dre delRangeEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

// GetSpanConfigEvent implements the Event interface.
func (dre delRangeEvent) GetSpanConfigEvent() *roachpb.SpanConfigEntry {
return nil
}

var _ Event = delRangeEvent{}

// checkpointEvent indicates that the stream has emitted every change for all
Expand Down Expand Up @@ -183,6 +204,47 @@ func (ce checkpointEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return ce.resolvedSpans
}

// GetSpanConfigEvent implements the Event interface.
func (ce checkpointEvent) GetSpanConfigEvent() *roachpb.SpanConfigEntry {
return nil
}

type spanConfigEvent struct {
spanConfigEvent roachpb.SpanConfigEntry
}

var _ Event = spanConfigEvent{}

// Type implements the Event interface.
func (spe spanConfigEvent) Type() EventType {
return SpanConfigEvent
}

// GetKV implements the Event interface.
func (spe spanConfigEvent) GetKV() *roachpb.KeyValue {
return nil
}

// GetSSTable implements the Event interface.
func (spe spanConfigEvent) GetSSTable() *kvpb.RangeFeedSSTable {
return nil
}

// GetDeleteRange implements the Event interface.
func (spe spanConfigEvent) GetDeleteRange() *kvpb.RangeFeedDeleteRange {
return nil
}

// GetResolvedSpans implements the Event interface.
func (spe spanConfigEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

// GetSpanConfigEvent implements the Event interface.
func (spe spanConfigEvent) GetSpanConfigEvent() *roachpb.SpanConfigEntry {
return &spe.spanConfigEvent
}

// MakeKVEvent creates an Event from a KV.
func MakeKVEvent(kv roachpb.KeyValue) Event {
return kvEvent{kv: kv}
Expand All @@ -202,3 +264,7 @@ func MakeDeleteRangeEvent(delRange kvpb.RangeFeedDeleteRange) Event {
func MakeCheckpointEvent(resolvedSpans []jobspb.ResolvedSpan) Event {
return checkpointEvent{resolvedSpans: resolvedSpans}
}

func MakeSpanConfigEvent(spanConfig roachpb.SpanConfigEntry) Event {
return spanConfigEvent{spanConfigEvent: spanConfig}
}
14 changes: 10 additions & 4 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,19 +311,25 @@ func parseEvent(streamEvent *streampb.StreamEvent) streamingccl.Event {

var event streamingccl.Event
if streamEvent.Batch != nil {
if len(streamEvent.Batch.Ssts) > 0 {
switch {
case len(streamEvent.Batch.Ssts) > 0:
event = streamingccl.MakeSSTableEvent(streamEvent.Batch.Ssts[0])
streamEvent.Batch.Ssts = streamEvent.Batch.Ssts[1:]
} else if len(streamEvent.Batch.KeyValues) > 0 {
case len(streamEvent.Batch.KeyValues) > 0:
event = streamingccl.MakeKVEvent(streamEvent.Batch.KeyValues[0])
streamEvent.Batch.KeyValues = streamEvent.Batch.KeyValues[1:]
} else if len(streamEvent.Batch.DelRanges) > 0 {
case len(streamEvent.Batch.DelRanges) > 0:
event = streamingccl.MakeDeleteRangeEvent(streamEvent.Batch.DelRanges[0])
streamEvent.Batch.DelRanges = streamEvent.Batch.DelRanges[1:]
case len(streamEvent.Batch.SpanConfigs) > 0:
event = streamingccl.MakeSpanConfigEvent(streamEvent.Batch.SpanConfigs[0])
streamEvent.Batch.SpanConfigs = streamEvent.Batch.SpanConfigs[1:]
}

if len(streamEvent.Batch.KeyValues) == 0 &&
len(streamEvent.Batch.Ssts) == 0 &&
len(streamEvent.Batch.DelRanges) == 0 {
len(streamEvent.Batch.DelRanges) == 0 &&
len(streamEvent.Batch.SpanConfigs) == 0 {
streamEvent.Batch = nil
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/replicationutils",
"//pkg/ccl/utilccl",
Expand All @@ -31,6 +33,7 @@ go_library(
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
Expand Down Expand Up @@ -97,6 +100,8 @@ go_test(
"//pkg/security/username",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigkvaccessor",
"//pkg/sql",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
Expand Down
70 changes: 55 additions & 15 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@ import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand All @@ -48,13 +52,14 @@ type eventStream struct {
data tree.Datums // Data to send to the consumer

// Fields below initialized when Start called.
rf *rangefeed.RangeFeed // Currently running rangefeed.
streamGroup ctxgroup.Group // Context group controlling stream execution.
doneChan chan struct{} // Channel signaled to close the stream loop.
eventsCh chan kvcoord.RangeFeedMessage // Channel receiving rangefeed events.
errCh chan error // Signaled when error occurs in rangefeed.
streamCh chan tree.Datums // Channel signaled to forward datums to consumer.
sp *tracing.Span // Span representing the lifetime of the eventStream.
rf *rangefeed.RangeFeed // Currently running rangefeed.
streamGroup ctxgroup.Group // Context group controlling stream execution.
doneChan chan struct{} // Channel signaled to close the stream loop.
eventsCh chan kvcoord.RangeFeedMessage // Channel receiving rangefeed events.
errCh chan error // Signaled when error occurs in rangefeed.
streamCh chan tree.Datums // Channel signaled to forward datums to consumer.
sp *tracing.Span // Span representing the lifetime of the eventStream.
spanConfigDecoder cdcevent.Decoder
}

var _ eval.ValueGenerator = (*eventStream)(nil)
Expand Down Expand Up @@ -94,6 +99,10 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error {

s.doneChan = make(chan struct{})

if err := s.maybeInitSpanConfigDecoder(ctx); err != nil {
return err
}

// Common rangefeed options.
opts := []rangefeed.Option{
rangefeed.WithOnCheckpoint(s.onCheckpoint),
Expand Down Expand Up @@ -160,6 +169,25 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error {
return nil
}

func (s *eventStream) maybeInitSpanConfigDecoder(ctx context.Context) error {
if !s.spec.Config.SpanConfigsForTenant.IsSet() {
return nil
}
_, tableID, err := keys.SystemSQLCodec.DecodeTablePrefix(s.spec.Spans[0].Key)
if err != nil {
return err
}
targets := changefeedbase.Targets{}
targets.Add(changefeedbase.Target{
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
TableID: descpb.ID(tableID),
FamilyName: "primary",
})

s.spanConfigDecoder, err = cdcevent.NewEventDecoder(ctx, s.execCfg, targets, false, false)
return err
}

func (s *eventStream) maybeSetError(err error) {
// Only send the error if the channel is empty, else it's ok to swallow the
// error because the first error in the channel will shut down the event
Expand Down Expand Up @@ -363,11 +391,14 @@ func (p *checkpointPacer) shouldCheckpoint(

// Add a RangeFeedSSTable into current batch.
func (s *eventStream) addSST(
sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span, seb *streamEventBatcher,
ctx context.Context,
sst *kvpb.RangeFeedSSTable,
registeredSpan roachpb.Span,
seb *streamEventBatcher,
) error {
// We send over the whole SSTable if the sst span is within
// the registered span boundaries.
if registeredSpan.Contains(sst.Span) {
if registeredSpan.Contains(sst.Span) && !s.spec.Config.SpanConfigsForTenant.IsSet() {
seb.addSST(sst)
return nil
}
Expand All @@ -381,11 +412,15 @@ func (s *eventStream) addSST(
// key value and each MVCCRangeKey value in the trimmed SSTable.
return replicationutils.ScanSST(sst, registeredSpan,
func(mvccKV storage.MVCCKeyValue) error {
seb.addKV(&roachpb.KeyValue{
kv := roachpb.KeyValue{
Key: mvccKV.Key.Key,
Value: roachpb.Value{
RawBytes: mvccKV.Value,
Timestamp: mvccKV.Key.Timestamp}})
Timestamp: mvccKV.Key.Timestamp}}
if s.spec.Config.SpanConfigsForTenant.IsSet() {
return seb.addKVAsSpanConfig(ctx, &kv)
}
seb.addKV(&kv)
return nil
}, func(rangeKeyVal storage.MVCCRangeKeyValue) error {
seb.addDelRange(&kvpb.RangeFeedDeleteRange{
Expand All @@ -403,7 +438,7 @@ func (s *eventStream) addSST(
// accumulating them in a batch, and sending those events to the ValueGenerator.
func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) error {
pacer := makeCheckpointPacer(s.spec.Config.MinCheckpointFrequency)
seb := makeStreamEventBatcher()
seb := makeStreamEventBatcher(s.spanConfigDecoder, s.spec.Config.SpanConfigsForTenant)

maybeFlushBatch := func(force bool) error {
if (force && seb.getSize() > 0) || seb.getSize() > int(s.spec.Config.BatchByteSize) {
Expand Down Expand Up @@ -431,10 +466,15 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e
case ev := <-s.eventsCh:
switch {
case ev.Val != nil:
seb.addKV(&roachpb.KeyValue{
keyValue := roachpb.KeyValue{
Key: ev.Val.Key,
Value: ev.Val.Value,
})
}
if !s.spec.Config.SpanConfigsForTenant.IsSet() {
seb.addKV(&keyValue)
} else if err := seb.addKVAsSpanConfig(ctx, &keyValue); err != nil {
return err
}
if err := maybeFlushBatch(flushIfNeeded); err != nil {
return err
}
Expand All @@ -454,7 +494,7 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e
}
}
case ev.SST != nil:
err := s.addSST(ev.SST, ev.RegisteredSpan, seb)
err := s.addSST(ctx, ev.SST, ev.RegisteredSpan, seb)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit b36580f

Please sign in to comment.