c2c: add region constraints replication test
This patch adds a test that ensures that a replicating tenant's regional
constraints are obeyed in the destination cluster. This test serves as an
end-to-end test of the span config replication work tracked in cockroachdb#106823.

Epic: none

Release note: None
msbutler committed Sep 11, 2023
1 parent 8b47bf4 commit a41ff46
Showing 3 changed files with 101 additions and 11 deletions.
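In outline, the new test pins a source-tenant database to a single region and then checks replica placement on both sides of the replication stream. A condensed sketch, with identifiers taken from the full test in the diff below (setup and error handling elided):

// Pin the test database to a single replica in the mars region.
c.SrcTenantSQL.Exec(t, `ALTER DATABASE test CONFIGURE ZONE USING constraints = '[+region=mars]', num_replicas = 1;`)

// Wait for the destination tenant to replicate past a recent source timestamp.
srcTime := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))

// Scan the table's range descriptors in each cluster and require that every
// replica lives on the node tagged region=mars.
testutils.SucceedsSoon(t,
	checkLocalities(tableDesc.PrimaryIndexSpan(destCodec), rangedesc.NewScanner(c.DestSysServer.DB())))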
48 changes: 37 additions & 11 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -53,17 +53,19 @@ type srcInitExecFunc func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *s
type destInitExecFunc func(t *testing.T, sysSQL *sqlutils.SQLRunner) // Tenant is created by the replication stream

type TenantStreamingClustersArgs struct {
SrcTenantName roachpb.TenantName
SrcTenantID roachpb.TenantID
SrcInitFunc srcInitExecFunc
SrcNumNodes int
SrcClusterSettings map[string]string
SrcTenantName roachpb.TenantName
SrcTenantID roachpb.TenantID
SrcInitFunc srcInitExecFunc
SrcNumNodes int
SrcClusterSettings map[string]string
SrcClusterTestRegions []string

DestTenantName roachpb.TenantName
DestTenantID roachpb.TenantID
DestInitFunc destInitExecFunc
DestNumNodes int
DestClusterSettings map[string]string
DestClusterTestRegions []string
RetentionTTLSeconds int
TestingKnobs *sql.StreamingTestingKnobs
TenantCapabilitiesTestingKnobs *tenantcapabilities.TestingKnobs
@@ -133,6 +135,7 @@ func (c *TenantStreamingClusters) init() {
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_split.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_scatter.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.SrcTenantName)
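// Grant the tenant access to the multi-region features this test exercises.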
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.multiregion.enabled=true`, c.Args.SrcTenantName)
if c.Args.SrcInitFunc != nil {
c.Args.SrcInitFunc(c.T, c.SrcSysSQL, c.SrcTenantSQL)
}
@@ -284,9 +287,28 @@ func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
}

func startC2CTestCluster(
ctx context.Context, t *testing.T, serverArgs base.TestServerArgs, numNodes int,
ctx context.Context, t *testing.T, serverArgs base.TestServerArgs, numNodes int, regions []string,
) (*testcluster.TestCluster, url.URL, func()) {

params := base.TestClusterArgs{ServerArgs: serverArgs}

makeLocality := func(locStr string) roachpb.Locality {
return roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: locStr}}}
}
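// A single region applies the same locality to every node; with multiple
// regions, each node gets exactly one region, so the counts must match.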
if len(regions) == 1 {
params.ServerArgs.Locality = makeLocality(regions[0])
}
if len(regions) > 1 {
require.Equal(t, len(regions), numNodes)
serverArgsPerNode := make(map[int]base.TestServerArgs)
for i, locality := range regions {
param := serverArgs
param.Locality = makeLocality(locality)
serverArgsPerNode[i] = param
}
params.ServerArgsPerNode = serverArgsPerNode
}

c := testcluster.StartTestCluster(t, numNodes, params)
c.Server(0).Clock().Now()
// TODO(casper): support adding splits when we have multiple nodes.
@@ -303,7 +325,7 @@ func CreateMultiTenantStreamingCluster(

serverArgs := CreateServerArgs(args)
cluster, url, cleanup := startC2CTestCluster(ctx, t, serverArgs,
args.MultitenantSingleClusterNumNodes)
args.MultitenantSingleClusterNumNodes, args.SrcClusterTestRegions)

destNodeIdx := args.MultitenantSingleClusterNumNodes - 1
tsc := &TenantStreamingClusters{
@@ -338,15 +360,15 @@ func CreateTenantStreamingClusters(
var srcCleanup func()
g.GoCtx(func(ctx context.Context) error {
// Start the source cluster.
srcCluster, srcURL, srcCleanup = startC2CTestCluster(ctx, t, serverArgs, args.SrcNumNodes)
srcCluster, srcURL, srcCleanup = startC2CTestCluster(ctx, t, serverArgs, args.SrcNumNodes, args.SrcClusterTestRegions)
return nil
})

var destCluster *testcluster.TestCluster
var destCleanup func()
g.GoCtx(func(ctx context.Context) error {
// Start the destination cluster.
destCluster, _, destCleanup = startC2CTestCluster(ctx, t, serverArgs, args.DestNumNodes)
destCluster, _, destCleanup = startC2CTestCluster(ctx, t, serverArgs, args.DestNumNodes, args.DestClusterTestRegions)
return nil
})

@@ -449,8 +471,12 @@ func CreateScatteredTable(t *testing.T, c *TenantStreamingClusters, numNodes int
}

var defaultSrcClusterSetting = map[string]string{
`kv.rangefeed.enabled`: `true`,
`kv.closed_timestamp.target_duration`: `'1s'`,
`kv.rangefeed.enabled`: `true`,
// Speed up the rangefeed. These were set by squinting at the settings set in
// the changefeed integration tests.
`kv.closed_timestamp.target_duration`: `'100ms'`,
`kv.rangefeed.closed_timestamp_refresh_interval`: `'200ms'`,
`kv.closed_timestamp.side_transport_interval`: `'50ms'`,
// A large timeout keeps the test from failing with spurious timeout errors.
`stream_replication.job_liveness.timeout`: `'3m'`,
`stream_replication.stream_liveness_track_frequency`: `'2s'`,
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -164,6 +164,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/rangedesc",
"//pkg/util/span",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
63 changes: 63 additions & 0 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -1135,3 +1136,65 @@ func TestLoadProducerAndIngestionProgress(t *testing.T) {
require.NoError(t, err)
require.Equal(t, jobspb.Replicating, ingestionProgress.ReplicationStatus)
}

// TestStreamingRegionalConstraint ensures that the replicating tenant's
// regional constraints are obeyed during replication. This test serves as an
// end-to-end test of span config replication.
func TestStreamingRegionalConstraint(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
regions := []string{"mars", "venus", "mercury"}
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.SrcNumNodes = 3
args.DestNumNodes = 3
args.SrcClusterTestRegions = regions
args.DestClusterTestRegions = regions
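// startC2CTestCluster assigns regions to nodes in order, so node 1 carries region=mars.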
marsNodeID := roachpb.NodeID(1)

c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.SrcTenantSQL.Exec(t, "CREATE DATABASE test")
c.SrcTenantSQL.Exec(t, `ALTER DATABASE test CONFIGURE ZONE USING constraints = '[+region=mars]', num_replicas = 1;`)
c.SrcTenantSQL.Exec(t, "CREATE TABLE test.x (id INT PRIMARY KEY, n INT)")
c.SrcTenantSQL.Exec(t, "INSERT INTO test.x VALUES (1, 1)")

srcTime := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))

checkLocalities := func(targetSpan roachpb.Span, scanner rangedesc.Scanner) func() error {
// Make pageSize large enough that paging does not affect the test.
pageSize := 10000
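// The scanner calls init before each scan attempt; this test keeps no
// per-attempt state to reset.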
init := func() {}

return func() error {
return scanner.Scan(ctx, pageSize, init, targetSpan, func(descriptors ...roachpb.RangeDescriptor) error {
for _, desc := range descriptors {
for _, replica := range desc.InternalReplicas {
if replica.NodeID != marsNodeID {
return errors.Newf("found table data on unexpected node %d", replica.NodeID)
}
}
}
return nil
})
}
}

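// Compute the table's primary index span in each tenant's keyspace.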
srcCodec := keys.MakeSQLCodec(c.Args.SrcTenantID)
tableDesc := desctestutils.TestingGetPublicTableDescriptor(
c.SrcSysServer.DB(), srcCodec, "test", "x")
destCodec := keys.MakeSQLCodec(c.Args.DestTenantID)

testutils.SucceedsSoon(t,
checkLocalities(tableDesc.PrimaryIndexSpan(srcCodec), rangedesc.NewScanner(c.SrcSysServer.DB())))

testutils.SucceedsSoon(t,
checkLocalities(tableDesc.PrimaryIndexSpan(destCodec), rangedesc.NewScanner(c.DestSysServer.DB())))
}
