streamingccl: verify backup schedule pauses after cutover
This patch adds an e2e c2c test to verify that replicated backup schedules
pause on the destination tenant after cutover. This functionality was added
in cockroachdb#111578.

Informs cockroachdb#108028

Release note: none
msbutler committed Oct 5, 2023
1 parent ace81b6 commit 195a69e
Showing 5 changed files with 122 additions and 19 deletions.
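The flow the new test exercises is: create a source/destination cluster pair, create a backup schedule on the source tenant that fires immediately, replicate and cut over, start the destination tenant with fast scheduler knobs, then assert that the replicated schedule pauses itself. The datadriven test added at the end of this commit is the authoritative version; below is only a rough Go sketch of the same sequence using the replicationtestutils helpers touched in this commit. The replication-stream startup and cutover steps are elided, and the test name, package placement, and schedule_name lookup are illustrative, not part of this commit.

// Sketch only (not part of this commit): mirrors the flow of the new
// datadriven test. The replication-stream startup and cutover steps are
// elided; see the datadriven file at the end of this commit for the real test.
package streamingest_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
	"github.com/stretchr/testify/require"
)

func TestBackupSchedulePausesAfterCutover(t *testing.T) {
	ctx := context.Background()

	args := replicationtestutils.DefaultTenantStreamingClustersArgs
	args.ExternalIODir = t.TempDir() // scheduled backups write to nodelocal storage

	c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
	defer cleanup()

	// Create a schedule on the source tenant that fires immediately.
	c.SrcTenantSQL.Exec(t, `CREATE SCHEDULE datatest
		FOR BACKUP INTO 'nodelocal://1/example-schedule'
		RECURRING '@weekly' FULL BACKUP ALWAYS
		WITH SCHEDULE OPTIONS first_run = 'now'`)

	// ... start the replication stream, wait for one scheduled backup to
	// succeed on the source, then cut over to the destination (elided).

	// Start the destination tenant with the fast scheduler-daemon knobs so
	// the replicated schedule is picked up promptly after cutover.
	knobs := replicationtestutils.DefaultAppTenantTestingKnobs()
	cleanupTenant := c.StartDestTenant(ctx, &knobs)
	defer func() { require.NoError(t, cleanupTenant()) }()

	// Force the replicated schedule to fire now; it should notice the new
	// cluster ID and pause itself.
	c.DestTenantSQL.Exec(t,
		`UPDATE system.scheduled_jobs SET next_run = now() WHERE schedule_name = 'datatest'`)

	// A NULL next_run in system.scheduled_jobs indicates a paused schedule.
	c.DestTenantSQL.CheckQueryResultsRetry(t,
		`SELECT next_run IS NULL FROM system.scheduled_jobs WHERE schedule_name = 'datatest'`,
		[][]string{{"true"}})
}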
68 changes: 52 additions & 16 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -76,6 +76,7 @@ type TenantStreamingClustersArgs struct {
MultiTenantSingleClusterTestRegions []string

NoMetamorphicExternalConnection bool
ExternalIODir string
}

var DefaultTenantStreamingClustersArgs = TenantStreamingClustersArgs{
@@ -124,6 +125,7 @@ func (c *TenantStreamingClusters) setupSrcTenant() {
tenantArgs := base.TestSharedProcessTenantArgs{
TenantName: c.Args.SrcTenantName,
TenantID: c.Args.SrcTenantID,
Knobs: DefaultAppTenantTestingKnobs(),
}
srcTenantServer, srcTenantConn := serverutils.StartSharedProcessTenant(c.T, c.SrcCluster.Server(0),
tenantArgs)
@@ -137,12 +139,16 @@ func (c *TenantStreamingClusters) setupSrcTenant() {
c.SrcTenantSQL = sqlutils.MakeSQLRunner(srcTenantConn)
}

func (c *TenantStreamingClusters) init() {
func (c *TenantStreamingClusters) init(ctx context.Context) {
c.SrcSysSQL.ExecMultiple(c.T, ConfigureClusterSettings(c.Args.SrcClusterSettings)...)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_split.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_scatter.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.multiregion.enabled=true`, c.Args.SrcTenantName)
c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 GRANT CAPABILITY can_use_nodelocal_storage`, c.Args.SrcTenantName)
require.NoError(c.T, c.SrcCluster.Server(0).WaitForTenantCapabilities(ctx, c.Args.SrcTenantID, map[tenantcapabilities.ID]string{
tenantcapabilities.CanUseNodelocalStorage: "true",
}, ""))
if c.Args.SrcInitFunc != nil {
c.Args.SrcInitFunc(c.T, c.SrcSysSQL, c.SrcTenantSQL)
}
@@ -154,25 +160,40 @@ func (c *TenantStreamingClusters) init() {
c.DestSysSQL.Exec(c.T, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`)
}

// StartDestTenant starts the destination tenant and returns a cleanup
// function that shuts tenant SQL instance and closes all sessions.
// This function will fail the test if ran prior to the Replication stream
// closing as the tenant will not yet be active
func (c *TenantStreamingClusters) StartDestTenant(ctx context.Context) func() error {
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)

destTenantConn := c.DestCluster.Server(0).SystemLayer().SQLConn(c.T, "cluster:"+string(c.Args.DestTenantName)+"/defaultdb")
// StartDestTenant starts the destination tenant and returns a cleanup function
// that shuts down the tenant SQL instance and closes all sessions. This
// function will fail the test if run before the replication stream closes, as
// the tenant will not yet be active. If the caller passes withTestingKnobs,
// the destination tenant starts up via testServer.StartSharedProcessTenant().
func (c *TenantStreamingClusters) StartDestTenant(
ctx context.Context, withTestingKnobs *base.TestingKnobs,
) func() error {
if withTestingKnobs != nil {
var err error
_, c.DestTenantConn, err = c.DestCluster.Server(0).StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
TenantID: c.Args.DestTenantID,
TenantName: c.Args.DestTenantName,
Knobs: *withTestingKnobs,
UseDatabase: "defaultdb",
})
require.NoError(c.T, err)
} else {
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)
c.DestTenantConn = c.DestCluster.Server(0).SystemLayer().SQLConn(c.T, "cluster:"+string(c.Args.DestTenantName)+"/defaultdb")
}

c.DestTenantConn = destTenantConn
c.DestTenantSQL = sqlutils.MakeSQLRunner(destTenantConn)
c.DestTenantSQL = sqlutils.MakeSQLRunner(c.DestTenantConn)
testutils.SucceedsSoon(c.T, func() error {
return c.DestTenantConn.Ping()
})
// TODO (msbutler): consider granting the new tenant some capabilities.
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.DestTenantName)

c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 GRANT CAPABILITY can_use_nodelocal_storage`, c.Args.DestTenantName)
require.NoError(c.T, c.DestCluster.Server(0).WaitForTenantCapabilities(ctx, c.Args.DestTenantID, map[tenantcapabilities.ID]string{
tenantcapabilities.CanUseNodelocalStorage: "true",
}, ""))
return func() error {
return destTenantConn.Close()
return c.DestTenantConn.Close()
}
}
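For reference, the call sites updated later in this commit exercise both modes of this helper. A minimal usage sketch, assuming a *TenantStreamingClusters named c and the usual testify/require import (nothing here beyond the helpers shown above):

// With knobs, the destination tenant starts through StartSharedProcessTenant
// and inherits the shortened scheduler-daemon scan intervals.
knobs := replicationtestutils.DefaultAppTenantTestingKnobs()
cleanupTenant := c.StartDestTenant(ctx, &knobs)
// Passing nil keeps the pre-existing ALTER TENANT ... START SERVICE SHARED path:
// cleanupTenant := c.StartDestTenant(ctx, nil)
defer func() { require.NoError(t, cleanupTenant()) }()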

@@ -277,6 +298,20 @@ func (c *TenantStreamingClusters) BuildCreateTenantQuery(externalConnection stri
return streamReplStmt
}

// DefaultAppTenantTestingKnobs returns the default testing knobs for an application tenant.
func DefaultAppTenantTestingKnobs() base.TestingKnobs {
return base.TestingKnobs{
JobsTestingKnobs: defaultJobsTestingKnobs(),
}
}

func defaultJobsTestingKnobs() *jobs.TestingKnobs {
jobTestingKnobs := jobs.NewTestingKnobsWithShortIntervals()
jobTestingKnobs.SchedulerDaemonInitialScanDelay = func() time.Duration { return time.Second }
jobTestingKnobs.SchedulerDaemonScanDelay = func() time.Duration { return time.Second }
return jobTestingKnobs
}

func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
if args.TestingKnobs != nil && args.TestingKnobs.DistSQLRetryPolicy == nil {
args.TestingKnobs.DistSQLRetryPolicy = &retry.Options{
@@ -291,7 +326,7 @@ func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
// to system tenants. Tracked with #76378.
DefaultTestTenant: base.TODOTestTenantDisabled,
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
JobsTestingKnobs: defaultJobsTestingKnobs(),
DistSQL: &execinfra.TestingKnobs{
StreamingTestingKnobs: args.TestingKnobs,
},
@@ -303,6 +338,7 @@ func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
EnableTenantIDReuse: true,
},
},
ExternalIODir: args.ExternalIODir,
}
}

@@ -364,7 +400,7 @@ func CreateMultiTenantStreamingCluster(
Rng: rng,
}
tsc.setupSrcTenant()
tsc.init()
tsc.init(ctx)
return tsc, func() {
require.NoError(t, tsc.SrcTenantConn.Close())
cleanup()
@@ -412,7 +448,7 @@ func CreateTenantStreamingClusters(
Rng: rng,
}
tsc.setupSrcTenant()
tsc.init()
tsc.init(ctx)

return tsc, func() {
require.NoError(t, tsc.SrcTenantConn.Close())
@@ -114,7 +114,7 @@ func TestAlterTenantPauseResume(t *testing.T) {
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(t, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
cleanupTenant := c.StartDestTenant(ctx)
cleanupTenant := c.StartDestTenant(ctx, nil)
defer func() {
require.NoError(t, cleanupTenant())
}()
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -131,10 +131,13 @@ func TestDataDriven(t *testing.T) {
case "create-replication-clusters":
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.NoMetamorphicExternalConnection = d.HasArg("no-external-conn")
tempDir, dirCleanup := testutils.TempDir(t)
args.ExternalIODir = tempDir
var cleanup func()
ds.replicationClusters, cleanup = replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
ds.cleanupFns = append(ds.cleanupFns, func() error {
cleanup()
dirCleanup()
return nil
})

@@ -151,7 +154,8 @@ func TestDataDriven(t *testing.T) {
ds.replicationClusters.WaitUntilReplicatedTime(stringToHLC(t, replicatedTimeTarget),
jobspb.JobID(ds.ingestionJobID))
case "start-replicated-tenant":
cleanupTenant := ds.replicationClusters.StartDestTenant(ctx)
testingKnobs := replicationtestutils.DefaultAppTenantTestingKnobs()
cleanupTenant := ds.replicationClusters.StartDestTenant(ctx, &testingKnobs)
ds.cleanupFns = append(ds.cleanupFns, cleanupTenant)
case "let":
if len(d.CmdArgs) == 0 {
@@ -537,7 +537,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

cleanUpTenant := c.StartDestTenant(ctx)
cleanUpTenant := c.StartDestTenant(ctx, nil)
defer func() {
require.NoError(t, cleanUpTenant())
}()
@@ -0,0 +1,63 @@
# This test ensures that backup schedules pause when the schedule realizes it
# is being executed on a new cluster.

create-replication-clusters
----

start-replication-stream
----

# Create a test schedule that will begin a backup immediately
exec-sql as=source-tenant
CREATE SCHEDULE datatest
FOR BACKUP INTO 'nodelocal://1/example-schedule'
RECURRING '@weekly' FULL BACKUP ALWAYS
WITH SCHEDULE OPTIONS first_run = 'now';
----

let $fullID as=source-tenant
WITH SCHEDULES AS (SHOW SCHEDULES FOR BACKUP) SELECT id FROM schedules WHERE label='datatest';
----

# Wait for one scheduled backup to succeed
query-sql retry as=source-tenant
SELECT count(job_id) FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND status = 'succeeded';
----
1

let $ts as=source-system
SELECT clock_timestamp()::timestamp::string
----

cutover ts=$ts
----

start-replicated-tenant
----

# Induce the replicated schedule to begin on the restored cluster, and
# ensure the schedule pauses, since it will realize it's running on a new cluster.
exec-sql as=destination-tenant
UPDATE system.scheduled_jobs SET next_run = now() WHERE schedule_id = $fullID
----

# An empty next run indicates the schedule is paused.
query-sql retry as=destination-tenant
SELECT next_run FROM system.scheduled_jobs WHERE schedule_id = $fullID
----
<nil>


# Unpause the schedule and force it to run immediately. When we resumed the
# schedule by setting next_run to now above, the schedule's clusterID was
# updated, so the schedule should not pause again.
exec-sql as=destination-tenant
UPDATE system.scheduled_jobs SET next_run = now() WHERE schedule_id = $fullID
----


# Wait for the backup schedule above to succeed
query-sql retry as=destination-tenant
SELECT count(job_id) FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND status = 'succeeded'
----
2
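
The behavior exercised above comes from cockroachdb#111578: a backup schedule records the ID of the cluster it was created on, and when it next fires on a different cluster (such as a tenant restored via replication cutover) it pauses itself and records the new ID, so a manual resume lets it run normally. The following is a hypothetical, self-contained Go illustration of that idea, not CockroachDB's actual implementation:

// Hypothetical sketch of pause-on-new-cluster; types and fields here are
// invented for illustration and do not match CockroachDB's jobs package.
package main

import "fmt"

type schedule struct {
	clusterID string
	nextRun   string // empty string stands in for a NULL next_run (paused)
}

// onFire models what happens when the scheduler daemon picks up the schedule.
func (s *schedule) onFire(currentClusterID string) {
	if s.clusterID != currentClusterID {
		s.nextRun = ""                 // pause: clear next_run
		s.clusterID = currentClusterID // remember the cluster we now run on
		return
	}
	s.nextRun = "next @weekly tick" // normal rescheduling
}

func main() {
	s := &schedule{clusterID: "source-cluster", nextRun: "now"}

	// First run after cutover: cluster IDs differ, so the schedule pauses.
	s.onFire("destination-cluster")
	fmt.Println(s.nextRun == "") // true

	// An operator resumes it by setting next_run again; the IDs now match,
	// so the schedule runs and reschedules normally.
	s.nextRun = "now"
	s.onFire("destination-cluster")
	fmt.Println(s.nextRun) // "next @weekly tick"
}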
