Skip to content

Commit

Permalink
c2c: write all span config updates from intial scan in one transaction
Browse files Browse the repository at this point in the history
Previously, the spanConfigIngestor treated updates from the rangefeed initial
scan the same way as regular updates: the ingestor would incrementally update
the destination side span config table by flushing updates with same source
side transaction timestamp. To undestand why this is problematic, recall that
an initial scan contains all the latest span config records for the relevant
tenant. So, some of these updates may have already been written to the
destination side. Further, the initial scan does not replicate source side span
config updates that have since been written over by later updates. To
summarize, its not possible to replicate a consistent view of the span
config table by writing incremental updates surfaced by the initial scan.

In this patch, the ingestor now buffers all updates surfaced by the initial
scan and flushes these updates in one transaction which also deletes all
existing span config records for the replicating tenant.

Fixes cockroachdb#106823

Release note: None
  • Loading branch information
msbutler committed Sep 5, 2023
1 parent 17d685d commit 911460e
Show file tree
Hide file tree
Showing 9 changed files with 511 additions and 155 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ go_library(
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigkvaccessor",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/execinfra",
"//pkg/sql/isql",
"//pkg/sql/rowenc",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/tree",
"//pkg/storage",
"//pkg/testutils",
Expand Down
75 changes: 75 additions & 0 deletions pkg/ccl/streamingccl/replicationtestutils/span_config_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,21 @@
package replicationtestutils

import (
"context"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -42,3 +53,67 @@ func RecordToEntry(record spanconfig.Record) roachpb.SpanConfigEntry {
Config: c,
}
}

// PrettyRecords pretty prints the span config target and config ttl.
func PrettyRecords(records []spanconfig.Record) string {
var b strings.Builder
for _, update := range records {
b.WriteString(fmt.Sprintf(" %s: ttl %d,", update.GetTarget().GetSpan(), update.GetConfig().GCPolicy.TTLSeconds))
}
return b.String()
}

// NewReplicationHelperWithDummySpanConfigTable creates a new ReplicationHelper,
// a tenant, and a mock span config table that a spanConfigStreamClient can
// listen for updates on. To mimic tenant creation, this helper writes a
// spanConfig with the target [tenantPrefix,tenantPrefix.Next()). During tenant
// creation, this span config induces a range split on the tenant's start key.
func NewReplicationHelperWithDummySpanConfigTable(
ctx context.Context, t *testing.T, streamingTestKnobs *sql.StreamingTestingKnobs,
) (*ReplicationHelper, *spanconfigkvaccessor.KVAccessor, TenantState, func()) {
const dummySpanConfigurationsName = "dummy_span_configurations"
dummyFQN := tree.NewTableNameWithSchema("d", catconstants.PublicSchemaName, dummySpanConfigurationsName)

streamingTestKnobs.MockSpanConfigTableName = dummyFQN
h, cleanup := NewReplicationHelper(t, base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
Knobs: base.TestingKnobs{
Streaming: streamingTestKnobs,
},
})

h.SysSQL.Exec(t, `
CREATE DATABASE d;
USE d;`)
h.SysSQL.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyFQN))

sourceAccessor := spanconfigkvaccessor.New(
h.SysServer.DB(),
h.SysServer.InternalExecutor().(isql.Executor),
h.SysServer.ClusterSettings(),
h.SysServer.Clock(),
dummyFQN.String(),
nil, /* knobs */
)

sourceTenantID := roachpb.MustMakeTenantID(uint64(10))
sourceTenant, tenantCleanup := h.CreateTenant(t, sourceTenantID, "app")

// To mimic tenant creation, write the source tenant split key to the dummy
// span config table. For more info on this split key, read up on
// https://github.com/cockroachdb/cockroach/pull/104920
tenantPrefix := keys.MakeTenantPrefix(sourceTenantID)
tenantSplitSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.Next()}

require.NoError(t, sourceAccessor.UpdateSpanConfigRecords(
ctx,
[]spanconfig.Target{},
[]spanconfig.Record{MakeSpanConfigRecord(t, tenantSplitSpan, 14400)},
hlc.MinTimestamp,
hlc.MaxTimestamp))

return h, sourceAccessor, sourceTenant, func() {
tenantCleanup()
cleanup()
}
}
2 changes: 0 additions & 2 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,13 @@ go_test(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigkvaccessor",
"//pkg/sql",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/isql",
"//pkg/sql/physicalplan",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlliveness",
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamingest/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ func TestDataDriven(t *testing.T) {
jobutils.WaitForJobToFail(t, runner, jobPBID)
case "reverting":
jobutils.WaitForJobReverting(t, runner, jobPBID)
case "running":
jobutils.WaitForJobToRun(t, runner, jobPBID)
default:
t.Fatalf("unknown state %s", state)
}
Expand Down
116 changes: 88 additions & 28 deletions pkg/ccl/streamingccl/streamingest/ingest_span_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -48,12 +49,12 @@ import (
// applies to) never overlaps with any other span configuration target. Else,
// C2C would break the span config reconciliation system.
//
// TODO(msbutler): on an initial scan, we need to buffer up all updates and
// write them to the span config table in one transaction, along with a delete
// over the whole tenant key span. Since C2C does not lay a PTS on the source
// side span config table, the initial scan on resumption may miss updates;
// therefore, the only way to cleanly update the destination side span config
// table is to write the latest state of the source table and delete all
// During the rangefeed initial scan, the spanConfigIngestor buffers up all
// updates and writes them to the span config table in one transaction, along
// with a delete over the whole tenant key span. Since C2C does not lay a PTS on
// the source side span config table, the initial scan on resumption may miss
// updates; therefore, the only way to cleanly update the destination side span
// config table is to write the latest state of the source table and delete all
// existing state, in one transaction.
type spanConfigIngestor struct {
accessor spanconfig.KVAccessor
Expand All @@ -65,6 +66,9 @@ type spanConfigIngestor struct {
settings *cluster.Settings
client streamclient.Client
rekeyer *backupccl.KeyRewriter
initialScanComplete bool
destinationTenantKeySpan roachpb.Span
db *kv.DB
testingKnobs *sql.StreamingTestingKnobs
}

Expand Down Expand Up @@ -93,15 +97,20 @@ func makeSpanConfigIngestor(
if err != nil {
return nil, err
}

destTenantStartKey := keys.MakeTenantPrefix(details.DestinationTenantID)
destTenantSpan := roachpb.Span{Key: destTenantStartKey, EndKey: destTenantStartKey.PrefixEnd()}
log.Infof(ctx, "initialized span config ingestor")
return &spanConfigIngestor{
accessor: execCfg.SpanConfigKVAccessor,
settings: execCfg.Settings,
session: ingestionJob.Session(),
client: client,
rekeyer: rekeyer,
stopperCh: stopperCh,
testingKnobs: execCfg.StreamingTestingKnobs,
accessor: execCfg.SpanConfigKVAccessor,
settings: execCfg.Settings,
session: ingestionJob.Session(),
client: client,
rekeyer: rekeyer,
stopperCh: stopperCh,
destinationTenantKeySpan: destTenantSpan,
db: execCfg.DB,
testingKnobs: execCfg.StreamingTestingKnobs,
}, nil
}

Expand Down Expand Up @@ -147,7 +156,7 @@ func (sc *spanConfigIngestor) consumeEvent(ctx context.Context, event streamingc
case streamingccl.SpanConfigEvent:
return sc.bufferRecord(ctx, event.GetSpanConfigEvent())
case streamingccl.CheckpointEvent:
return sc.maybeFlushEvents(ctx)
return sc.maybeFlushOnCheckpoint(ctx)
default:
return errors.AssertionFailedf("received non span config update %s", event)
}
Expand Down Expand Up @@ -179,11 +188,8 @@ func (sc *spanConfigIngestor) bufferRecord(
return nil
}
targetSpan := roachpb.Span{Key: destStartKey, EndKey: destEndKey}
if sc.lastBufferedSourceTimestamp.Less(update.Timestamp) {
// If this event was originally written at a later timestamp than what's in the buffer, flush the buffer.
if err := sc.maybeFlushEvents(ctx); err != nil {
return err
}
if err := sc.maybeFlushOnUpdate(ctx, update.Timestamp); err != nil {
return err
}
target := spanconfig.MakeTargetFromSpan(targetSpan)
if update.SpanConfig.Config.IsEmpty() {
Expand All @@ -198,21 +204,35 @@ func (sc *spanConfigIngestor) bufferRecord(
sc.lastBufferedSourceTimestamp = update.Timestamp
return nil
}
func (sc *spanConfigIngestor) maybeFlushEvents(ctx context.Context) error {
if len(sc.bufferedUpdates) != 0 || len(sc.bufferedDeletes) != 0 {

func (sc *spanConfigIngestor) bufferIsEmpty() bool {
return len(sc.bufferedUpdates) == 0 && len(sc.bufferedDeletes) == 0
}
func (sc *spanConfigIngestor) maybeFlushOnUpdate(
ctx context.Context, updateTimestamp hlc.Timestamp,
) error {
// If this event was originally written at a later timestamp and the initial scan has complete, flush the current buffer.
if sc.initialScanComplete &&
sc.lastBufferedSourceTimestamp.Less(updateTimestamp) &&
!sc.bufferIsEmpty() {
return sc.flushEvents(ctx)
}
return nil
}

func (sc *spanConfigIngestor) maybeFlushOnCheckpoint(ctx context.Context) error {
if !sc.bufferIsEmpty() {
return sc.flushEvents(ctx)
} else if !sc.initialScanComplete {
return errors.AssertionFailedf("a flush after the initial scan checkpoint must have data in it")
}
return nil
}

// flushEvents writes all buffered events to the system span configuration table
// in one transaction via kvAccesor.UpdateSpanConfigRecords.
func (sc *spanConfigIngestor) flushEvents(ctx context.Context) error {
log.VEventf(ctx, 2, "flushing span config %d updates and %d deletes", len(sc.bufferedUpdates), len(sc.bufferedDeletes))
if sc.testingKnobs != nil && sc.testingKnobs.BeforeIngestSpanConfigFlush != nil {
sc.testingKnobs.BeforeIngestSpanConfigFlush(ctx, sc.bufferedUpdates, sc.bufferedDeletes)
}

retryOpts := retry.Options{
InitialBackoff: 1 * time.Second,
MaxBackoff: 5 * time.Second,
Expand All @@ -224,9 +244,15 @@ func (sc *spanConfigIngestor) flushEvents(ctx context.Context) error {
if sessionExpiration.IsEmpty() {
return errors.Errorf("sqlliveness session has expired")
}
err := sc.accessor.UpdateSpanConfigRecords(
ctx, sc.bufferedDeletes, sc.bufferedUpdates, sessionStart, sessionExpiration,
)
var err error
if !sc.initialScanComplete {
// The first flush will always contain all span configs found during the initial scan.
err = sc.flushInitialScan(ctx, sessionStart, sessionExpiration)
} else {
err = sc.accessor.UpdateSpanConfigRecords(
ctx, sc.bufferedDeletes, sc.bufferedUpdates, sessionStart, sessionExpiration,
)
}
if err != nil {
if spanconfig.IsCommitTimestampOutOfBoundsError(err) {
// We expect the underlying sqlliveness session's expiration to be
Expand All @@ -239,7 +265,41 @@ func (sc *spanConfigIngestor) flushEvents(ctx context.Context) error {
}
break
}
if sc.testingKnobs != nil && sc.testingKnobs.RightAfterSpanConfigFlush != nil {
sc.testingKnobs.RightAfterSpanConfigFlush(ctx, sc.bufferedUpdates, sc.bufferedDeletes)
}
sc.bufferedUpdates = sc.bufferedUpdates[:0]
sc.bufferedDeletes = sc.bufferedDeletes[:0]
return nil
}

// flushInitialScan flushes all contents from the source side rangefeed's
// initial scan. The function assumes the buffer contains only updates from the
// initial scan. To obey destination side span config invariants, the function
// deletes all existing span config records related to the replicating tenant in
// the same transaction that it writes all initial scan updates.
func (sc *spanConfigIngestor) flushInitialScan(
ctx context.Context, sessionStart, sessionExpiration hlc.Timestamp,
) error {
if len(sc.bufferedDeletes) != 0 {
return errors.AssertionFailedf("initial scan flush should not contain records to delete")
}
target := spanconfig.MakeTargetFromSpan(sc.destinationTenantKeySpan)
return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
accessor := sc.accessor.WithTxn(ctx, txn)
existingRecords, err := accessor.GetSpanConfigRecords(ctx, []spanconfig.Target{target})
if err != nil {
return err
}
for _, record := range existingRecords {
sc.bufferedDeletes = append(sc.bufferedDeletes, record.GetTarget())
}
if err := accessor.UpdateSpanConfigRecords(
ctx, sc.bufferedDeletes, sc.bufferedUpdates, sessionStart, sessionExpiration,
); err != nil {
return err
}
sc.initialScanComplete = true
return nil
})
}
Loading

0 comments on commit 911460e

Please sign in to comment.