Skip to content

Commit

Permalink
streamingest: replicate span configs relevant to replicating tenant
Browse files Browse the repository at this point in the history
This patch spawns a new goroutine on the stream ingestion job coordinator which
in turn creates a special span config client and subscription for replicating
span config updates. As updates come in, the coordinator will buffer updates
and deletes with the same source side commit timestamp and write them in a
transaction to the destination side system span configuration table when it
observes a new update with a newer timestamp.

The ingestion side assumes each replicated update is unique and in timestamp
order, which is enforced by the producer side logic built in cockroachdb#108356. This
assumption simplifies the destination side ingestion logic which must write
updates with the same source side transaction commit timestamp at the
same new timestamp on the destination side. This invariant ensures a span
configuration's target (i.e. the span that a configuration applies to) never
overlaps with any other span configuration target. Else, c2c would break the
span config reconciliation system.

Note that cutover will not revert any ingested span config updates, as we
wouldn't not want to issue revert range requests on an online system table in
the system tenant. That being said, a future PR will need to teach the
destination side application tenant to conduct a full span reconcilation job
immediately after cutover, which will safely revert any span config updates
that committed after the cutover timestamp.

Informs cockroachdb#106823

Release note: None
  • Loading branch information
msbutler committed Aug 25, 2023
1 parent 9cc4973 commit 348c5a9
Show file tree
Hide file tree
Showing 10 changed files with 480 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (c *TenantStreamingClusters) init() {
c.SrcSysSQL.ExecMultiple(c.T, ConfigureClusterSettings(c.Args.SrcClusterSettings)...)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_split.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_scatter.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.SrcTenantName)
if c.Args.SrcInitFunc != nil {
c.Args.SrcInitFunc(c.T, c.SrcSysSQL, c.SrcTenantSQL)
}
Expand All @@ -158,6 +159,8 @@ func (c *TenantStreamingClusters) StartDestTenant(ctx context.Context) func() er
return c.DestTenantConn.Ping()
})
// TODO (msbutler): consider granting the new tenant some capabilities.
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.DestTenantName)

return func() error {
return destTenantConn.Close()
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func NewStreamClient(
case "postgres", "postgresql":
// The canonical PostgreSQL URL scheme is "postgresql", however our
// own client commands also accept "postgres".
if processOptions(opts).forSpanConfigs {
return NewSpanConfigStreamClient(ctx, streamURL, opts...)
}
return NewPartitionedStreamClient(ctx, streamURL, opts...)
case "external":
if db == nil {
Expand Down Expand Up @@ -226,7 +229,8 @@ func GetFirstActiveClient(
}

type options struct {
streamID streampb.StreamID
streamID streampb.StreamID
forSpanConfigs bool
}

func (o *options) appName() string {
Expand All @@ -249,6 +253,13 @@ func WithStreamID(id streampb.StreamID) Option {
}
}

// ForSpanConfigs will create a client for replicating span configs.
func ForSpanConfigs() Option {
return func(o *options) {
o.forSpanConfigs = true
}
}

func processOptions(opts []Option) *options {
ret := &options{}
for _, o := range opts {
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"alter_replication_job.go",
"external_connection.go",
"ingest_span_configs.go",
"merged_subscription.go",
"metrics.go",
"stream_ingest_manager.go",
Expand Down Expand Up @@ -42,8 +43,11 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigkvaccessor",
"//pkg/sql",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
Expand All @@ -57,6 +61,7 @@ go_library(
"//pkg/sql/sem/asof",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlliveness",
"//pkg/sql/syntheticprivilege",
"//pkg/sql/types",
"//pkg/storage",
Expand Down
55 changes: 55 additions & 0 deletions pkg/ccl/streamingccl/streamingest/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand All @@ -29,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -195,6 +198,9 @@ func TestDataDriven(t *testing.T) {
case "query-sql":
var as string
d.ScanArgs(t, "as", &as)
if d.HasArg("retry") {
ds.queryAsWithRetry(t, as, d.Input, d.Expected)
}
return ds.queryAs(t, as, d.Input)

case "compare-replication-results":
Expand Down Expand Up @@ -291,6 +297,43 @@ func TestDataDriven(t *testing.T) {
}
}
return ""
case "list-ttls":
var (
as string
)
d.ScanArgs(t, "as", &as)
var codec keys.SQLCodec
switch {
case strings.HasPrefix(as, "source"):
codec = keys.MakeSQLCodec(ds.replicationClusters.Args.SrcTenantID)
case strings.HasPrefix(as, "destination"):
codec = keys.MakeSQLCodec(ds.replicationClusters.Args.DestTenantID)
default:
t.Fatalf("%s does not begin with 'source' or 'destination'", as)
}

getTableID := func(arg string) uint32 {
var tableID string
d.ScanArgs(t, arg, &tableID)
varValue, ok := ds.vars[tableID]
if ok {
tableID = varValue
}
parsedID, err := strconv.Atoi(tableID)
if err != nil {
t.Fatalf("could not convert table ID %s", tableID)
}
return uint32(parsedID)
}

startKey := codec.TablePrefix(getTableID("min_table_id"))
endKey := codec.TablePrefix(getTableID("max_table_id"))

listQuery := fmt.Sprintf(
`SELECT crdb_internal.pb_to_json('cockroach.roachpb.SpanConfig', config)->'gcPolicy'->'ttlSeconds'
FROM system.span_configurations
WHERE start_key >= '\x%x' AND start_key <= '\x%x';`, startKey, endKey)
return ds.queryAsWithRetry(t, as, listQuery, d.Expected)

default:
t.Fatalf("unsupported instruction: %s", d.Cmd)
Expand Down Expand Up @@ -342,6 +385,18 @@ func (d *datadrivenTestState) queryAs(t *testing.T, as, query string) string {
return output
}

func (d *datadrivenTestState) queryAsWithRetry(t *testing.T, as, query, expected string) string {
var output string
testutils.SucceedsSoon(t, func() error {
output = d.queryAs(t, as, query)
if output != expected {
return errors.Newf("latest output: %s\n expected: %s", output, expected)
}
return nil
})
return output
}

func (d *datadrivenTestState) execAs(t *testing.T, as, query string) {
switch as {
case "source-system":
Expand Down
Loading

0 comments on commit 348c5a9

Please sign in to comment.