diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
index bdf7071deea8..a1eb8224d1eb 100644
--- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
@@ -192,7 +192,11 @@ func alterChangefeedPlanHook(
 	newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
 	newPayload.Description = jobRecord.Description
 	newPayload.DescriptorIDs = jobRecord.DescriptorIDs
-
+	newExpiration, err := newOptions.GetPTSExpiration()
+	if err != nil {
+		return err
+	}
+	newPayload.MaximumPTSAge = newExpiration
 	j, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.InternalSQLTxn())
 	if err != nil {
 		return err
@@ -204,6 +208,7 @@ func alterChangefeedPlanHook(
 		if newProgress != nil {
 			ju.UpdateProgress(newProgress)
 		}
+
 		return nil
 	}); err != nil {
 		return err
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 5b1e487e0984..72986eb23996 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1369,7 +1369,9 @@ func (cf *changeFrontier) manageProtectedTimestamps(
 
 	recordID := progress.ProtectedTimestampRecord
 	if recordID == uuid.Nil {
-		ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
+		ptr := createProtectedTimestampRecord(
+			ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress,
+		)
 		if err := pts.Protect(ctx, ptr); err != nil {
 			return err
 		}
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index a456d8ce8855..2280769fd370 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -650,9 +650,28 @@ func createChangefeedJobRecord(
 	// TODO: Ideally those option validations would happen in validateDetails()
 	// earlier, like the others.
 	err = validateSink(ctx, p, jobID, details, opts)
-
+	if err != nil {
+		return nil, err
+	}
 	details.Opts = opts.AsMap()
 
+	ptsExpiration, err := opts.GetPTSExpiration()
+	if err != nil {
+		return nil, err
+	}
+
+	if ptsExpiration > 0 && ptsExpiration < time.Hour {
+		// This threshold is rather arbitrary, but we want to warn users about
+		// the potential impact of keeping this setting too low.
+		p.BufferClientNotice(ctx, pgnotice.Newf(
+			`the value of %s for changefeed option %s might be too low.
+Having a low value for this option should not have an adverse effect while the changefeed is running. However,
+should the changefeed be paused, it will need to be resumed before the expiration time.
+The value of this setting should reflect how long the changefeed may remain paused
+before it is canceled. A few hours to a few days is an appropriate range for this option.
+`, ptsExpiration, changefeedbase.OptExpirePTSAfter))
+	}
+
 	jr := &jobs.Record{
 		Description: jobDescription,
 		Username:    p.User(),
@@ -662,11 +681,12 @@ func createChangefeedJobRecord(
 			}
 			return sqlDescIDs
 		}(),
-		Details:   details,
-		CreatedBy: changefeedStmt.CreatedByInfo,
+		Details:       details,
+		CreatedBy:     changefeedStmt.CreatedByInfo,
+		MaximumPTSAge: ptsExpiration,
 	}
 
-	return jr, err
+	return jr, nil
 }
 
 func validateSettings(ctx context.Context, p sql.PlanHookState) error {
@@ -1206,7 +1226,9 @@ func (b *changefeedResumer) OnPauseRequest(
 		return nil
 	}
 	pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
-	ptr := createProtectedTimestampRecord(ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp)
+	ptr := createProtectedTimestampRecord(
+		ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp,
+	)
 	return pts.Protect(ctx, ptr)
 }
 
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 25afbffbde2b..6bb745eb8f70 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -4166,6 +4166,53 @@ func TestChangefeedDataTTL(t *testing.T) {
 	cdcTestWithSystem(t, testFn, feedTestForceSink("sinkless"), feedTestNoTenants)
 }
 
+// TestChangefeedCanceledWhenPTSIsOld verifies that a paused changefeed job which
+// holds a PTS record gets canceled if it stays paused for too long.
+func TestChangefeedCanceledWhenPTSIsOld(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+
+		sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms';`)
+		// Create the data table; it will only contain a
+		// single row with multiple versions.
+		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`)
+
+		feed, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH protect_data_from_gc_on_pause, gc_protect_expires_after='24h'")
+		require.NoError(t, err)
+		defer func() {
+			closeFeed(t, feed)
+		}()
+
+		jobFeed := feed.(cdctest.EnterpriseTestFeed)
+		require.NoError(t, jobFeed.Pause())
+
+		// While the job is paused, take the opportunity to verify that ALTER
+		// CHANGEFEED works when setting the gc_protect_expires_after option.
+
+		// Verify we can set it to 0 -- i.e. disable.
+		sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '0s'", jobFeed.JobID()))
+		// Now, set it to something very small.
+		sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '250ms'", jobFeed.JobID()))
+
+		// Stale PTS record should trigger job cancellation.
+		require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool {
+			if s == jobs.StatusCanceled {
+				return true
+			}
+			// t.Logf("still waiting for job cancellation; current status %s", s)
+			return false
+		}))
+	}
+
+	// Ensure the metrics poller loop runs fast.
+	st := cluster.MakeTestingClusterSettings()
+	jobs.PollJobsMetricsInterval.Override(context.Background(), &st.SV, 100*time.Millisecond)
+	cdcTest(t, testFn, feedTestEnterpriseSinks, withSettings(st))
+}
+
 // TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case
 // where the feed has fallen behind the GC TTL of the table's schema.
 func TestChangefeedSchemaTTL(t *testing.T) {
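Aside (not part of the patch): a minimal, self-contained sketch of the semantics that `createChangefeedJobRecord` above gives the new `gc_protect_expires_after` option, assuming the behavior shown in this diff: a zero duration disables the limit, sub-hour values are accepted but draw a client notice, and larger values become the job's `MaximumPTSAge`. The helper name and sample values are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// describePTSExpiration is a hypothetical helper mirroring, in simplified form,
// how the changefeed treats gc_protect_expires_after: 0 means the protected
// timestamp record never expires, and values under an hour draw a notice
// because a paused changefeed must be resumed before the limit elapses.
func describePTSExpiration(exp time.Duration) string {
	switch {
	case exp == 0:
		return "no limit: the PTS record is held until the changefeed is resumed or canceled"
	case exp < time.Hour:
		return fmt.Sprintf("accepted, but %s is likely too low if the changefeed is ever paused", exp)
	default:
		return fmt.Sprintf("job is canceled if its PTS record becomes older than %s", exp)
	}
}

func main() {
	for _, exp := range []time.Duration{0, 15 * time.Minute, 24 * time.Hour} {
		fmt.Printf("%-8v -> %s\n", exp, describePTSExpiration(exp))
	}
}
```

As the test above does, the option is expected to be paired with `protect_data_from_gc_on_pause`, which is what causes a PTS record to be laid down while the feed is paused in the first place.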
diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go
index 3bfafde72e55..3112c5b4f396 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/options.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -90,6 +90,7 @@ const (
 	OptSchemaChangePolicy       = `schema_change_policy`
 	OptSplitColumnFamilies      = `split_column_families`
 	OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause`
+	OptExpirePTSAfter           = `gc_protect_expires_after`
 	OptWebhookAuthHeader        = `webhook_auth_header`
 	OptWebhookClientTimeout     = `webhook_client_timeout`
 	OptOnError                  = `on_error`
@@ -320,6 +321,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
 	OptNoInitialScan:            flagOption,
 	OptInitialScanOnly:          flagOption,
 	OptProtectDataFromGCOnPause: flagOption,
+	OptExpirePTSAfter:           durationOption.thatCanBeZero(),
 	OptKafkaSinkConfig:          jsonOption,
 	OptWebhookSinkConfig:        jsonOption,
 	OptWebhookAuthHeader:        stringOption,
@@ -339,7 +341,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
 	OptSchemaChangeEvents, OptSchemaChangePolicy,
 	OptProtectDataFromGCOnPause, OptOnError,
 	OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered,
-	OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics)
+	OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter)
 
 // SQLValidOptions is options exclusive to SQL sink
 var SQLValidOptions map[string]struct{} = nil
@@ -883,6 +885,19 @@ func (s StatementOptions) GetMinCheckpointFrequency() (*time.Duration, error) {
 	return s.getDurationValue(OptMinCheckpointFrequency)
 }
 
+// GetPTSExpiration returns the maximum age of the protected timestamp record.
+// Changefeeds that fail to update their records in time will be canceled.
+func (s StatementOptions) GetPTSExpiration() (time.Duration, error) {
+	exp, err := s.getDurationValue(OptExpirePTSAfter)
+	if err != nil {
+		return 0, err
+	}
+	if exp == nil {
+		return 0, nil
+	}
+	return *exp, nil
+}
+
 // ForceKeyInValue sets the encoding option KeyInValue to true and then validates the
 // resoluting encoding options.
 func (s StatementOptions) ForceKeyInValue() error {
diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go
index 387e0054892e..dc74054377d3 100644
--- a/pkg/ccl/changefeedccl/helpers_test.go
+++ b/pkg/ccl/changefeedccl/helpers_test.go
@@ -41,6 +41,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -395,12 +396,12 @@ func startTestFullServer(
 		DisableDefaultTestTenant: true,
 		UseDatabase:              `d`,
 		ExternalIODir:            options.externalIODir,
+		Settings:                 options.settings,
 	}
 
 	if options.argsFn != nil {
 		options.argsFn(&args)
 	}
-
 	resetRetry := testingUseFastRetry()
 	resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)
 	s, db, _ := serverutils.StartServer(t, args)
@@ -505,6 +506,7 @@ func startTestTenant(
 		UseDatabase:     `d`,
 		TestingKnobs:    knobs,
 		ExternalIODir:   options.externalIODir,
+		Settings:        options.settings,
 	}
 
 	tenantServer, tenantDB := serverutils.StartTenant(t, systemServer, tenantArgs)
@@ -534,6 +536,7 @@ type feedTestOptions struct {
 	allowedSinkTypes           []string
 	disabledSinkTypes          []string
 	disableSyntheticTimestamps bool
+	settings                   *cluster.Settings
 }
 
 type feedTestOption func(opts *feedTestOptions)
@@ -583,6 +586,14 @@ func withArgsFn(fn updateArgsFn) feedTestOption {
 	return func(opts *feedTestOptions) { opts.argsFn = fn }
 }
 
+// withSettings arranges for a feed test option to set the cluster settings for
+// both the system tenant and the test tenant.
+func withSettings(st *cluster.Settings) feedTestOption {
+	return func(opts *feedTestOptions) {
+		opts.settings = st
+	}
+}
+
 // withKnobsFn is a feedTestOption that allows the caller to modify
 // the testing knobs used by the test server. For multi-tenant
 // testing, these knobs are applied to both the kv and sql nodes.
diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go
index 4b3ebd86c327..be9d9d6444fd 100644
--- a/pkg/jobs/adopt.go
+++ b/pkg/jobs/adopt.go
@@ -522,7 +522,10 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
 				// update.
 				r.clearLeaseForJobID(id, txn, txn.KV())
 			}
-			md.Payload.Error = errJobCanceled.Error()
+			if md.Payload.Error == "" {
+				// Set default cancellation reason.
+				md.Payload.Error = errJobCanceled.Error()
+			}
 			encodedErr := errors.EncodeError(ctx, errJobCanceled)
 			md.Payload.FinalResumeError = &encodedErr
 			ju.UpdatePayload(md.Payload)
diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go
index daa860c426ff..52ca9d29eb30 100644
--- a/pkg/jobs/jobs.go
+++ b/pkg/jobs/jobs.go
@@ -17,6 +17,7 @@ import (
 	"fmt"
 	"reflect"
 	"sync/atomic"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/kv"
@@ -114,6 +115,9 @@ type Record struct {
 	// CreatedBy, if set, annotates this record with the information on
 	// this job creator.
 	CreatedBy *CreatedByInfo
	// MaximumPTSAge specifies the maximum age of PTS records held by this job.
	// 0 means no limit.
	MaximumPTSAge time.Duration
 }
 
 // AppendDescription appends description to this records Description with a
@@ -423,6 +427,16 @@ func (u Updater) Unpaused(ctx context.Context) error {
 // that it is in state StatusCancelRequested and will move it to state
 // StatusReverting.
 func (u Updater) CancelRequested(ctx context.Context) error {
+	return u.CancelRequestedWithReason(ctx, errJobCanceled)
+}
+
+// CancelRequestedWithReason sets the status of the tracked job to cancel-requested. It
+// does not directly cancel the job; like job.Paused, it expects the job to call
+// job.Progressed soon, observe a "job is cancel-requested" error, and abort.
+// Further, the node that runs the job will actively cancel it when it notices
+// that it is in state StatusCancelRequested and will move it to state
+// StatusReverting.
+func (u Updater) CancelRequestedWithReason(ctx context.Context, reason error) error {
 	return u.Update(ctx, func(txn isql.Txn, md JobMetadata, ju *JobUpdater) error {
 		if md.Payload.Noncancelable {
 			return errors.Newf("job %d: not cancelable", md.ID)
@@ -438,6 +452,8 @@ func (u Updater) CancelRequested(ctx context.Context) error {
 			return errors.Wrapf(decodedErr, "job %d is paused and has non-nil FinalResumeError "+
 				"hence cannot be canceled and should be reverted", md.ID)
 		}
+		md.Payload.Error = reason.Error()
+		ju.UpdatePayload(md.Payload)
 		ju.UpdateStatus(StatusCancelRequested)
 		return nil
 	})
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index aa3f73e84c76..b9af5cf2e7a8 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -1238,6 +1238,12 @@ message Payload {
   // cluster version, in case a job resuming later needs to use this information
   // to migrate or update the job.
   roachpb.Version creation_cluster_version = 36 [(gogoproto.nullable) = false];
+
+  // If a job lays down protected timestamp records, this optional field
+  // specifies how old such records may get before the job is canceled.
+  int64 maximum_pts_age = 40 [(gogoproto.casttype) = "time.Duration", (gogoproto.customname) = "MaximumPTSAge"];
+
+  // NEXT ID: 41
 }
 
 message Progress {
diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go
index dcb0e1fa2c74..b0de4b621b02 100644
--- a/pkg/jobs/metrics.go
+++ b/pkg/jobs/metrics.go
@@ -69,6 +69,7 @@ type JobTypeMetrics struct {
 	FailOrCancelFailed     *metric.Counter
 
 	NumJobsWithPTS *metric.Gauge
+	ExpiredPTS     *metric.Counter
 	ProtectedAge   *metric.Gauge
 }
 
@@ -181,8 +182,8 @@ func makeMetaProtectedCount(typeStr string) metric.Metadata {
 	return metric.Metadata{
 		Name:        fmt.Sprintf("jobs.%s.protected_record_count", typeStr),
 		Help:        fmt.Sprintf("Number of protected timestamp records held by %s jobs", typeStr),
-		Measurement: "bytes",
-		Unit:        metric.Unit_BYTES,
+		Measurement: "records",
+		Unit:        metric.Unit_COUNT,
 		MetricType:  io_prometheus_client.MetricType_GAUGE,
 	}
 }
@@ -197,6 +198,16 @@ func makeMetaProtectedAge(typeStr string) metric.Metadata {
 	}
 }
 
+func makeMetaExpiredPTS(typeStr string) metric.Metadata {
+	return metric.Metadata{
+		Name:        fmt.Sprintf("jobs.%s.expired_pts_records", typeStr),
+		Help:        fmt.Sprintf("Number of expired protected timestamp records owned by %s jobs", typeStr),
+		Measurement: "records",
+		Unit:        metric.Unit_COUNT,
+		MetricType:  io_prometheus_client.MetricType_COUNTER,
+	}
+}
+
 var (
 	metaAdoptIterations = metric.Metadata{
 		Name:        "jobs.adopt_iterations",
@@ -268,6 +279,7 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) {
 			FailOrCancelRetryError: metric.NewCounter(makeMetaFailOrCancelRetryError(typeStr)),
 			FailOrCancelFailed:     metric.NewCounter(makeMetaFailOrCancelFailed(typeStr)),
 			NumJobsWithPTS:         metric.NewGauge(makeMetaProtectedCount(typeStr)),
+			ExpiredPTS:             metric.NewCounter(makeMetaExpiredPTS(typeStr)),
 			ProtectedAge:           metric.NewGauge(makeMetaProtectedAge(typeStr)),
 		}
 		if opts, ok := options[jt]; ok && opts.metrics != nil {
diff --git a/pkg/jobs/metricspoller/job_statistics.go b/pkg/jobs/metricspoller/job_statistics.go
index f912606a14ad..14ad30e1ae62 100644
--- a/pkg/jobs/metricspoller/job_statistics.go
+++ b/pkg/jobs/metricspoller/job_statistics.go
@@ -23,6 +23,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 )
 
@@ -77,10 +79,14 @@ func updatePausedMetrics(ctx context.Context, execCtx sql.JobExecContext) error
 	return nil
 }
 
-// updatePTSStats update protected timestamp statistics per job type.
-func updatePTSStats(ctx context.Context, execCtx sql.JobExecContext) error {
+// manageJobsProtectedTimestamps manages protected timestamp records owned by various jobs.
+// This function mostly concerns itself with collecting statistics related to job PTS records.
+// It also detects PTS records that are too old (as configured by the owner job) and requests
+// job cancellation for those jobs.
+func manageJobsProtectedTimestamps(ctx context.Context, execCtx sql.JobExecContext) error {
 	type ptsStat struct {
 		numRecords int64
+		expired    int64
 		oldest     hlc.Timestamp
 	}
 	var ptsStats map[jobspb.Type]*ptsStat
@@ -105,17 +111,38 @@ func updatePTSStats(ctx context.Context, execCtx sql.JobExecContext) error {
 				continue
 			}
 			p := j.Payload()
-			stats := ptsStats[p.Type()]
+			jobType, err := p.CheckType()
+			if err != nil {
+				return err
+			}
+			stats := ptsStats[jobType]
 			if stats == nil {
 				stats = &ptsStat{}
-				ptsStats[p.Type()] = stats
+				ptsStats[jobType] = stats
 			}
 			stats.numRecords++
 			if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
 				stats.oldest = rec.Timestamp
 			}
-		}
+			// If MaximumPTSAge is set on the job payload, verify that the PTS record's
+			// timestamp is fresh enough. Note: we only look at paused jobs; if a
+			// running job wants to enforce an invariant with respect to PTS age,
+			// it can do so itself. This check is a safety mechanism to detect
+			// paused jobs that own protected timestamp records.
+			if j.Status() == jobs.StatusPaused &&
+				p.MaximumPTSAge > 0 &&
+				rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) {
+				stats.expired++
+				ptsExpired := errors.Newf(
+					"protected timestamp record %s as of %s (age %s) exceeds the job's configured limit of %s",
+					rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge)
+				if err := j.WithTxn(txn).CancelRequestedWithReason(ctx, ptsExpired); err != nil {
+					return err
+				}
+				log.Warningf(ctx, "job %d canceled due to %s", id, ptsExpired)
+			}
+		}
 
 		return nil
 	}); err != nil {
 		return err
@@ -130,6 +157,7 @@ func updatePTSStats(ctx context.Context, execCtx sql.JobExecContext) error {
 		stats, found := ptsStats[jobspb.Type(typ)]
 		if found {
 			m.NumJobsWithPTS.Update(stats.numRecords)
+			m.ExpiredPTS.Inc(stats.expired)
 			if stats.oldest.WallTime > 0 {
 				m.ProtectedAge.Update((execCfg.Clock.Now().WallTime - stats.oldest.WallTime) / 1e9)
 			} else {
@@ -137,6 +165,7 @@ func updatePTSStats(ctx context.Context, execCtx sql.JobExecContext) error {
 			}
 		} else {
 			// If we haven't found PTS records for a job type, then reset stats.
+			// (Note: we don't reset counter-based stats.)
 			m.NumJobsWithPTS.Update(0)
 			m.ProtectedAge.Update(0)
 		}
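Aside (not part of the patch): to make the cancellation condition in `manageJobsProtectedTimestamps` concrete, here is a hedged, standalone sketch of the age check it performs (`rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now())`), rewritten against the standard library only. The function name and sample values are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// ptsExpired mirrors the check performed by the metrics poller: a protected
// timestamp record is considered stale once its timestamp plus the job's
// MaximumPTSAge lies in the past. A MaximumPTSAge of 0 means "no limit".
func ptsExpired(recordTS time.Time, maxAge time.Duration, now time.Time) bool {
	if maxAge <= 0 {
		return false
	}
	return recordTS.Add(maxAge).Before(now)
}

func main() {
	now := time.Now()
	fmt.Println(ptsExpired(now.Add(-2*time.Hour), time.Hour, now))    // true: record is 2h old, limit is 1h
	fmt.Println(ptsExpired(now.Add(-30*time.Minute), time.Hour, now)) // false: still within the limit
	fmt.Println(ptsExpired(now.Add(-48*time.Hour), 0, now))           // false: 0 disables the limit
}
```

Per the comment in the diff, only paused jobs are subject to this check; a running changefeed is expected to keep its own record fresh.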
diff --git a/pkg/jobs/metricspoller/poller.go b/pkg/jobs/metricspoller/poller.go
index 186687f58607..6e63ad26f9f2 100644
--- a/pkg/jobs/metricspoller/poller.go
+++ b/pkg/jobs/metricspoller/poller.go
@@ -71,13 +71,11 @@ func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error
 			return ctx.Err()
 		case <-t.C:
 			t.Read = true
-			if err := runTask("paused-jobs", updatePausedMetrics); err != nil {
-				log.Errorf(ctx, "Periodic stats collector task paused-jobs completed with error %s", err)
-				metrics.numErrors.Inc(1)
-			}
-			if err := runTask("pts-stats", updatePTSStats); err != nil {
-				log.Errorf(ctx, "Periodic stats collector task pts-stats completed with error %s", err)
-				metrics.numErrors.Inc(1)
+			for name, task := range metricPollerTasks {
+				if err := runTask(name, task); err != nil {
+					log.Errorf(ctx, "Periodic stats collector task %s completed with error %s", name, err)
+					metrics.numErrors.Inc(1)
+				}
 			}
 		}
 	}
@@ -87,6 +85,13 @@ type pollerMetrics struct {
 	numErrors *metric.Counter
 }
 
+// metricPollerTasks lists the tasks performed on each iteration of the
+// metrics poller.
+var metricPollerTasks = map[string]func(ctx context.Context, execCtx sql.JobExecContext) error{
+	"paused-jobs": updatePausedMetrics,
+	"manage-pts":  manageJobsProtectedTimestamps,
+}
+
 func (m pollerMetrics) MetricStruct() {}
 
 func newPollerMetrics() metric.Struct {
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index 3062077873ac..a211692a5984 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -343,6 +343,7 @@ func (r *Registry) makePayload(ctx context.Context, record *Record) (jobspb.Payl
 		Noncancelable:          record.NonCancelable,
 		CreationClusterVersion: r.settings.Version.ActiveVersion(ctx).Version,
 		CreationClusterID:      r.clusterID.Get(),
+		MaximumPTSAge:          record.MaximumPTSAge,
 	}, nil
 }
 
diff --git a/pkg/ts/catalog/BUILD.bazel b/pkg/ts/catalog/BUILD.bazel
index f11f5787f140..4020c5dd8168 100644
--- a/pkg/ts/catalog/BUILD.bazel
+++ b/pkg/ts/catalog/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/ts/catalog",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/jobs/jobspb",
         "//pkg/ts/tspb",
         "//pkg/util/metric",
         "@com_github_cockroachdb_errors//:errors",