From 195a69e778f31603767d533efcab1bcfab32a2a9 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Wed, 4 Oct 2023 20:06:51 -0400
Subject: [PATCH] streamingccl: verify backup schedule pauses after cutover

This patch adds an e2e c2c test to verify that replicated backup
schedules pause on the destination tenant after cutover. This
functionality was added in #111578.

Informs #108028

Release note: none
---
 .../replicationtestutils/testutils.go         | 68 ++++++++++++++-----
 .../alter_replication_job_test.go             |  2 +-
 .../streamingest/datadriven_test.go           |  6 +-
 .../replication_stream_e2e_test.go            |  2 +-
 .../backup_schedule_pauses_after_cutover      | 63 +++++++++++++++++
 5 files changed, 122 insertions(+), 19 deletions(-)
 create mode 100644 pkg/ccl/streamingccl/streamingest/testdata/backup_schedule_pauses_after_cutover

diff --git a/pkg/ccl/streamingccl/replicationtestutils/testutils.go b/pkg/ccl/streamingccl/replicationtestutils/testutils.go
index 506e653dda7d..a31987981494 100644
--- a/pkg/ccl/streamingccl/replicationtestutils/testutils.go
+++ b/pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -76,6 +76,7 @@ type TenantStreamingClustersArgs struct {
 	MultiTenantSingleClusterTestRegions []string
 
 	NoMetamorphicExternalConnection bool
+	ExternalIODir string
 }
 
 var DefaultTenantStreamingClustersArgs = TenantStreamingClustersArgs{
@@ -124,6 +125,7 @@ func (c *TenantStreamingClusters) setupSrcTenant() {
 	tenantArgs := base.TestSharedProcessTenantArgs{
 		TenantName: c.Args.SrcTenantName,
 		TenantID: c.Args.SrcTenantID,
+		Knobs: DefaultAppTenantTestingKnobs(),
 	}
 	srcTenantServer, srcTenantConn := serverutils.StartSharedProcessTenant(c.T,
 		c.SrcCluster.Server(0), tenantArgs)
@@ -137,12 +139,16 @@ func (c *TenantStreamingClusters) setupSrcTenant() {
 	c.SrcTenantSQL = sqlutils.MakeSQLRunner(srcTenantConn)
 }
 
-func (c *TenantStreamingClusters) init() {
+func (c *TenantStreamingClusters) init(ctx context.Context) {
 	c.SrcSysSQL.ExecMultiple(c.T, ConfigureClusterSettings(c.Args.SrcClusterSettings)...)
 	c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_split.enabled=true`, c.Args.SrcTenantName)
 	c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.manual_range_scatter.enabled=true`, c.Args.SrcTenantName)
 	c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.SrcTenantName)
 	c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.multiregion.enabled=true`, c.Args.SrcTenantName)
+	c.SrcSysSQL.Exec(c.T, `ALTER TENANT $1 GRANT CAPABILITY can_use_nodelocal_storage`, c.Args.SrcTenantName)
+	require.NoError(c.T, c.SrcCluster.Server(0).WaitForTenantCapabilities(ctx, c.Args.SrcTenantID, map[tenantcapabilities.ID]string{
+		tenantcapabilities.CanUseNodelocalStorage: "true",
+	}, ""))
 	if c.Args.SrcInitFunc != nil {
 		c.Args.SrcInitFunc(c.T, c.SrcSysSQL, c.SrcTenantSQL)
 	}
@@ -154,25 +160,40 @@ func (c *TenantStreamingClusters) init() {
 	c.DestSysSQL.Exec(c.T, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`)
 }
 
-// StartDestTenant starts the destination tenant and returns a cleanup
-// function that shuts tenant SQL instance and closes all sessions.
-// This function will fail the test if ran prior to the Replication stream
-// closing as the tenant will not yet be active
-func (c *TenantStreamingClusters) StartDestTenant(ctx context.Context) func() error {
-	c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)
-
-	destTenantConn := c.DestCluster.Server(0).SystemLayer().SQLConn(c.T, "cluster:"+string(c.Args.DestTenantName)+"/defaultdb")
+// StartDestTenant starts the destination tenant and returns a cleanup function
+// that shuts down the tenant SQL instance and closes all sessions. This
+// function will fail the test if run prior to the replication stream closing,
+// as the tenant will not yet be active. If the caller passes withTestingKnobs,
+// the destination tenant starts up via testServer.StartSharedProcessTenant().
+func (c *TenantStreamingClusters) StartDestTenant(
+	ctx context.Context, withTestingKnobs *base.TestingKnobs,
+) func() error {
+	if withTestingKnobs != nil {
+		var err error
+		_, c.DestTenantConn, err = c.DestCluster.Server(0).StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
+			TenantID: c.Args.DestTenantID,
+			TenantName: c.Args.DestTenantName,
+			Knobs: *withTestingKnobs,
+			UseDatabase: "defaultdb",
+		})
+		require.NoError(c.T, err)
+	} else {
+		c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)
+		c.DestTenantConn = c.DestCluster.Server(0).SystemLayer().SQLConn(c.T, "cluster:"+string(c.Args.DestTenantName)+"/defaultdb")
+	}
 
-	c.DestTenantConn = destTenantConn
-	c.DestTenantSQL = sqlutils.MakeSQLRunner(destTenantConn)
+	c.DestTenantSQL = sqlutils.MakeSQLRunner(c.DestTenantConn)
 	testutils.SucceedsSoon(c.T, func() error {
 		return c.DestTenantConn.Ping()
 	})
 
 	// TODO (msbutler): consider granting the new tenant some capabilities.
 	c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.DestTenantName)
-
+	c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 GRANT CAPABILITY can_use_nodelocal_storage`, c.Args.DestTenantName)
+	require.NoError(c.T, c.DestCluster.Server(0).WaitForTenantCapabilities(ctx, c.Args.DestTenantID, map[tenantcapabilities.ID]string{
+		tenantcapabilities.CanUseNodelocalStorage: "true",
+	}, ""))
 	return func() error {
-		return destTenantConn.Close()
+		return c.DestTenantConn.Close()
 	}
 }
@@ -277,6 +298,20 @@ func (c *TenantStreamingClusters) BuildCreateTenantQuery(externalConnection stri
 	return streamReplStmt
 }
 
+// DefaultAppTenantTestingKnobs returns the default testing knobs for an application tenant.
+func DefaultAppTenantTestingKnobs() base.TestingKnobs {
+	return base.TestingKnobs{
+		JobsTestingKnobs: defaultJobsTestingKnobs(),
+	}
+}
+
+func defaultJobsTestingKnobs() *jobs.TestingKnobs {
+	jobTestingKnobs := jobs.NewTestingKnobsWithShortIntervals()
+	jobTestingKnobs.SchedulerDaemonInitialScanDelay = func() time.Duration { return time.Second }
+	jobTestingKnobs.SchedulerDaemonScanDelay = func() time.Duration { return time.Second }
+	return jobTestingKnobs
+}
+
 func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
 	if args.TestingKnobs != nil && args.TestingKnobs.DistSQLRetryPolicy == nil {
 		args.TestingKnobs.DistSQLRetryPolicy = &retry.Options{
@@ -291,7 +326,7 @@ func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
 		// to system tenants. Tracked with #76378.
 		DefaultTestTenant: base.TODOTestTenantDisabled,
 		Knobs: base.TestingKnobs{
-			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+			JobsTestingKnobs: defaultJobsTestingKnobs(),
 			DistSQL: &execinfra.TestingKnobs{
 				StreamingTestingKnobs: args.TestingKnobs,
 			},
@@ -303,6 +338,7 @@ func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
 				EnableTenantIDReuse: true,
 			},
 		},
+		ExternalIODir: args.ExternalIODir,
 	}
 }
 
@@ -364,7 +400,7 @@ func CreateMultiTenantStreamingCluster(
 		Rng: rng,
 	}
 	tsc.setupSrcTenant()
-	tsc.init()
+	tsc.init(ctx)
 	return tsc, func() {
 		require.NoError(t, tsc.SrcTenantConn.Close())
 		cleanup()
@@ -412,7 +448,7 @@ func CreateTenantStreamingClusters(
 		Rng: rng,
 	}
 	tsc.setupSrcTenant()
-	tsc.init()
+	tsc.init(ctx)
 
 	return tsc, func() {
 		require.NoError(t, tsc.SrcTenantConn.Close())
diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
index 2507c33ef690..c55a7c41e5ff 100644
--- a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
+++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
@@ -114,7 +114,7 @@ func TestAlterTenantPauseResume(t *testing.T) {
 	cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
 	require.Equal(t, cutoverTime, cutoverOutput.GoTime())
 	jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
-	cleanupTenant := c.StartDestTenant(ctx)
+	cleanupTenant := c.StartDestTenant(ctx, nil)
 	defer func() {
 		require.NoError(t, cleanupTenant())
 	}()
diff --git a/pkg/ccl/streamingccl/streamingest/datadriven_test.go b/pkg/ccl/streamingccl/streamingest/datadriven_test.go
index 2e7a841ecaa3..5290961a5ed6 100644
--- a/pkg/ccl/streamingccl/streamingest/datadriven_test.go
+++ b/pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -131,10 +131,13 @@ func TestDataDriven(t *testing.T) {
 		case "create-replication-clusters":
 			args := replicationtestutils.DefaultTenantStreamingClustersArgs
 			args.NoMetamorphicExternalConnection = d.HasArg("no-external-conn")
+			tempDir, dirCleanup := testutils.TempDir(t)
+			args.ExternalIODir = tempDir
 			var cleanup func()
 			ds.replicationClusters, cleanup = replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
 			ds.cleanupFns = append(ds.cleanupFns, func() error {
 				cleanup()
+				dirCleanup()
 				return nil
 			})
 
@@ -151,7 +154,8 @@ func TestDataDriven(t *testing.T) {
 			ds.replicationClusters.WaitUntilReplicatedTime(stringToHLC(t, replicatedTimeTarget),
 				jobspb.JobID(ds.ingestionJobID))
 		case "start-replicated-tenant":
-			cleanupTenant := ds.replicationClusters.StartDestTenant(ctx)
+			testingKnobs := replicationtestutils.DefaultAppTenantTestingKnobs()
+			cleanupTenant := ds.replicationClusters.StartDestTenant(ctx, &testingKnobs)
 			ds.cleanupFns = append(ds.cleanupFns, cleanupTenant)
 		case "let":
 			if len(d.CmdArgs) == 0 {
diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
index 3afefdc43c73..3bfa20036c3d 100644
--- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -537,7 +537,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
 	jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
 
-	cleanUpTenant := c.StartDestTenant(ctx)
+	cleanUpTenant := c.StartDestTenant(ctx, nil)
 	defer func() {
 		require.NoError(t, cleanUpTenant())
 	}()
diff --git a/pkg/ccl/streamingccl/streamingest/testdata/backup_schedule_pauses_after_cutover b/pkg/ccl/streamingccl/streamingest/testdata/backup_schedule_pauses_after_cutover
new file mode 100644
index 000000000000..5f81c4f6354c
--- /dev/null
+++ b/pkg/ccl/streamingccl/streamingest/testdata/backup_schedule_pauses_after_cutover
@@ -0,0 +1,63 @@
+# This test ensures that backup schedules pause when the schedule realizes it
+# is being executed on a new cluster.
+
+create-replication-clusters
+----
+
+start-replication-stream
+----
+
+# Create a test schedule that will begin a backup immediately.
+exec-sql as=source-tenant
+CREATE SCHEDULE datatest
+FOR BACKUP INTO 'nodelocal://1/example-schedule'
+RECURRING '@weekly' FULL BACKUP ALWAYS
+WITH SCHEDULE OPTIONS first_run = 'now';
+----
+
+let $fullID as=source-tenant
+WITH SCHEDULES AS (SHOW SCHEDULES FOR BACKUP) SELECT id FROM schedules WHERE label='datatest';
+----
+
+# Wait for one scheduled backup to succeed.
+query-sql retry as=source-tenant
+SELECT count(job_id) FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND status = 'succeeded';
+----
+1
+
+let $ts as=source-system
+SELECT clock_timestamp()::timestamp::string
+----
+
+cutover ts=$ts
+----
+
+start-replicated-tenant
+----
+
+# Induce the replicated schedule to begin on the restored cluster, and
+# ensure the schedule pauses, since it will realize it is running on a new cluster.
+exec-sql as=destination-tenant
+UPDATE system.scheduled_jobs SET next_run = now() WHERE schedule_id = $fullID
+----
+
+# An empty next_run indicates the schedule is paused.
+query-sql retry as=destination-tenant
+SELECT next_run FROM system.scheduled_jobs WHERE schedule_id = $fullID
+----
+
+
+
+# Unpause the schedule and force it to run immediately. When we resumed the
+# schedule by setting next_run to now above, the schedule's clusterID was
+# updated, so the schedule should not pause again.
+exec-sql as=destination-tenant
+UPDATE system.scheduled_jobs SET next_run = now() WHERE schedule_id = $fullID
+----
+
+
+# Wait for the above backup schedule to succeed.
+query-sql retry as=destination-tenant
+SELECT count(job_id) FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND status = 'succeeded'
+----
+2