From 0a3285d9d74f85de846b840d68bb97dbfda3d655 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Tue, 14 Feb 2023 16:55:58 -0500
Subject: [PATCH] changefeedccl: Expire protected timestamps

Changefeeds utilize the protected timestamp system (PTS) to ensure that the
data targeted by a changefeed is not garbage collected prematurely. The PTS
record is managed by the running changefeed, which periodically advances the
record's timestamp so that data older than that timestamp may be GCed.

However, if the changefeed stops running because it is paused (either due to
operator action, or due to the `on_error=pause` option), the PTS record
remains in place so that the changefeed can be resumed at a later time. It is
also possible that the operator does not notice that the job has been paused
for too long, causing a buildup of un-garbage-collected data. Excessive
buildup of GC work is undesirable since it impacts overall cluster
performance, and, once GC can resume, its cost is proportional to how much GC
work needs to be done.

This PR introduces a new changefeed option `gc_protect_expires_after` to
automatically expire PTS records that are too old. This automatic expiration
is a safety mechanism in case a changefeed job gets paused by an operator or
due to an error while holding onto a PTS record because of the
`protect_data_from_gc_on_pause` option. The operator is still expected to
monitor changefeed jobs and to restart paused changefeeds promptly. If the
changefeed job remains paused and the underlying PTS record expires, the
changefeed job is canceled to prevent further buildup of GC work.

Epic: CRDB-21953

This PR does not add expiration to the job itself, as requested by #84598,
but it does accomplish the same goal.

Release note (enterprise change): Changefeeds will automatically expire PTS
records for paused jobs if the changefeed is configured with the
`gc_protect_expires_after` option.
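For illustration, here is a sketch of how the option might be used, based on the
statements exercised by this patch's tests (the table name, sink URI, and job ID
below are placeholders):

    CREATE CHANGEFEED FOR TABLE foo INTO '<sink-uri>'
      WITH protect_data_from_gc_on_pause, gc_protect_expires_after = '24h';

    -- Adjust the limit on an existing (possibly paused) changefeed; '0s' disables it:
    ALTER CHANGEFEED <job-id> SET gc_protect_expires_after = '0s';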
---
 .../changefeedccl/alter_changefeed_stmt.go    |  7 ++-
 .../changefeedccl/changefeed_processors.go    |  4 +-
 pkg/ccl/changefeedccl/changefeed_stmt.go      | 32 +++++++++++--
 pkg/ccl/changefeedccl/changefeed_test.go      | 47 +++++++++++++++++++
 .../changefeedccl/changefeedbase/options.go   | 17 ++++++-
 pkg/ccl/changefeedccl/helpers_test.go         | 13 ++++-
 pkg/jobs/adopt.go                             |  5 +-
 pkg/jobs/jobs.go                              | 16 +++++++
 pkg/jobs/jobspb/jobs.proto                    |  6 +++
 pkg/jobs/metrics.go                           | 16 ++++++-
 pkg/jobs/metricspoller/job_statistics.go      | 39 +++++++++++++--
 pkg/jobs/metricspoller/poller.go              | 19 +++++---
 pkg/jobs/registry.go                          |  1 +
 pkg/ts/catalog/BUILD.bazel                    |  1 +
 14 files changed, 199 insertions(+), 24 deletions(-)

diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
index bdf7071deea8..a1eb8224d1eb 100644
--- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
@@ -192,7 +192,11 @@ func alterChangefeedPlanHook(
 	newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
 	newPayload.Description = jobRecord.Description
 	newPayload.DescriptorIDs = jobRecord.DescriptorIDs
-
+	newExpiration, err := newOptions.GetPTSExpiration()
+	if err != nil {
+		return err
+	}
+	newPayload.MaximumPTSAge = newExpiration
 	j, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.InternalSQLTxn())
 	if err != nil {
 		return err
@@ -204,6 +208,7 @@ func alterChangefeedPlanHook(
 		if newProgress != nil {
 			ju.UpdateProgress(newProgress)
 		}
+
 		return nil
 	}); err != nil {
 		return err
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 5b1e487e0984..72986eb23996 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1369,7 +1369,9 @@ func (cf *changeFrontier) manageProtectedTimestamps(
 
 	recordID := progress.ProtectedTimestampRecord
 	if recordID == uuid.Nil {
-		ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
+		ptr := createProtectedTimestampRecord(
+			ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress,
+		)
 		if err := pts.Protect(ctx, ptr); err != nil {
 			return err
 		}
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index a456d8ce8855..2280769fd370 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -650,9 +650,28 @@ func createChangefeedJobRecord(
 	// TODO: Ideally those option validations would happen in validateDetails()
 	// earlier, like the others.
 	err = validateSink(ctx, p, jobID, details, opts)
-
+	if err != nil {
+		return nil, err
+	}
 	details.Opts = opts.AsMap()
 
+	ptsExpiration, err := opts.GetPTSExpiration()
+	if err != nil {
+		return nil, err
+	}
+
+	if ptsExpiration > 0 && ptsExpiration < time.Hour {
+		// This threshold is rather arbitrary. But we want to warn users about
+		// the potential impact of keeping this setting too low.
+		p.BufferClientNotice(ctx, pgnotice.Newf(
+			`the value of %s for changefeed option %s might be too low.
+Having a low value for this option should not have an adverse effect as long as the changefeed is
+running. However, should the changefeed be paused, it will need to be resumed before the expiration
+time. The value of this option should reflect how long the changefeed may remain paused before it
+is canceled; a few hours to a few days is an appropriate range.
+`, ptsExpiration, changefeedbase.OptExpirePTSAfter))
+	}
+
 	jr := &jobs.Record{
 		Description: jobDescription,
 		Username:    p.User(),
@@ -662,11 +681,12 @@ func createChangefeedJobRecord(
 			}
 			return sqlDescIDs
 		}(),
-		Details:   details,
-		CreatedBy: changefeedStmt.CreatedByInfo,
+		Details:       details,
+		CreatedBy:     changefeedStmt.CreatedByInfo,
+		MaximumPTSAge: ptsExpiration,
 	}
-	return jr, err
+	return jr, nil
 }
 
 func validateSettings(ctx context.Context, p sql.PlanHookState) error {
@@ -1206,7 +1226,9 @@ func (b *changefeedResumer) OnPauseRequest(
 		return nil
 	}
 	pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
-	ptr := createProtectedTimestampRecord(ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp)
+	ptr := createProtectedTimestampRecord(
+		ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp,
+	)
 	return pts.Protect(ctx, ptr)
 }
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 25afbffbde2b..6bb745eb8f70 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -4166,6 +4166,53 @@ func TestChangefeedDataTTL(t *testing.T) {
 	cdcTestWithSystem(t, testFn, feedTestForceSink("sinkless"), feedTestNoTenants)
 }
 
+// TestChangefeedCanceledWhenPTSIsOld verifies that a paused changefeed job which holds a PTS
+// record gets canceled if it stays paused for too long.
+func TestChangefeedCanceledWhenPTSIsOld(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+
+		sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms';`)
+		// Create the data table; it will only contain a
+		// single row with multiple versions.
+		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`)
+
+		feed, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH protect_data_from_gc_on_pause, gc_protect_expires_after='24h'")
+		require.NoError(t, err)
+		defer func() {
+			closeFeed(t, feed)
+		}()
+
+		jobFeed := feed.(cdctest.EnterpriseTestFeed)
+		require.NoError(t, jobFeed.Pause())
+
+		// While the job is paused, take the opportunity to test that altering the
+		// changefeed works when setting the gc_protect_expires_after option.
+
+		// Verify we can set it to 0 -- i.e. disable.
+		sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '0s'", jobFeed.JobID()))
+		// Now, set it to something very small.
+		sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '250ms'", jobFeed.JobID()))
+
+		// Stale PTS record should trigger job cancellation.
+		require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool {
+			if s == jobs.StatusCanceled {
+				return true
+			}
+			// t.Logf("still waiting for job cancellation; current status %s", s)
+			return false
+		}))
+	}
+
+	// Ensure metrics poller loop runs fast.
+	st := cluster.MakeTestingClusterSettings()
+	jobs.PollJobsMetricsInterval.Override(context.Background(), &st.SV, 100*time.Millisecond)
+	cdcTest(t, testFn, feedTestEnterpriseSinks, withSettings(st))
+}
+
 // TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case
 // where the feed has fallen behind the GC TTL of the table's schema.
func TestChangefeedSchemaTTL(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 3bfafde72e55..3112c5b4f396 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -90,6 +90,7 @@ const ( OptSchemaChangePolicy = `schema_change_policy` OptSplitColumnFamilies = `split_column_families` OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause` + OptExpirePTSAfter = `gc_protect_expires_after` OptWebhookAuthHeader = `webhook_auth_header` OptWebhookClientTimeout = `webhook_client_timeout` OptOnError = `on_error` @@ -320,6 +321,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{ OptNoInitialScan: flagOption, OptInitialScanOnly: flagOption, OptProtectDataFromGCOnPause: flagOption, + OptExpirePTSAfter: durationOption.thatCanBeZero(), OptKafkaSinkConfig: jsonOption, OptWebhookSinkConfig: jsonOption, OptWebhookAuthHeader: stringOption, @@ -339,7 +341,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope, OptSchemaChangeEvents, OptSchemaChangePolicy, OptProtectDataFromGCOnPause, OptOnError, OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered, - OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics) + OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter) // SQLValidOptions is options exclusive to SQL sink var SQLValidOptions map[string]struct{} = nil @@ -883,6 +885,19 @@ func (s StatementOptions) GetMinCheckpointFrequency() (*time.Duration, error) { return s.getDurationValue(OptMinCheckpointFrequency) } +// GetPTSExpiration returns the maximum age of the protected timestamp record. +// Changefeeds that fail to update their records in time will be canceled. +func (s StatementOptions) GetPTSExpiration() (time.Duration, error) { + exp, err := s.getDurationValue(OptExpirePTSAfter) + if err != nil { + return 0, err + } + if exp == nil { + return 0, nil + } + return *exp, nil +} + // ForceKeyInValue sets the encoding option KeyInValue to true and then validates the // resoluting encoding options. 
func (s StatementOptions) ForceKeyInValue() error { diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 387e0054892e..dc74054377d3 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -395,12 +396,12 @@ func startTestFullServer( DisableDefaultTestTenant: true, UseDatabase: `d`, ExternalIODir: options.externalIODir, + Settings: options.settings, } if options.argsFn != nil { options.argsFn(&args) } - resetRetry := testingUseFastRetry() resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency) s, db, _ := serverutils.StartServer(t, args) @@ -505,6 +506,7 @@ func startTestTenant( UseDatabase: `d`, TestingKnobs: knobs, ExternalIODir: options.externalIODir, + Settings: options.settings, } tenantServer, tenantDB := serverutils.StartTenant(t, systemServer, tenantArgs) @@ -534,6 +536,7 @@ type feedTestOptions struct { allowedSinkTypes []string disabledSinkTypes []string disableSyntheticTimestamps bool + settings *cluster.Settings } type feedTestOption func(opts *feedTestOptions) @@ -583,6 +586,14 @@ func withArgsFn(fn updateArgsFn) feedTestOption { return func(opts *feedTestOptions) { opts.argsFn = fn } } +// withSettingsFn arranges for a feed option to set the settings for +// both system and test tenant. +func withSettings(st *cluster.Settings) feedTestOption { + return func(opts *feedTestOptions) { + opts.settings = st + } +} + // withKnobsFn is a feedTestOption that allows the caller to modify // the testing knobs used by the test server. For multi-tenant // testing, these knobs are applied to both the kv and sql nodes. diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 4b3ebd86c327..be9d9d6444fd 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -522,7 +522,10 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes // update. r.clearLeaseForJobID(id, txn, txn.KV()) } - md.Payload.Error = errJobCanceled.Error() + if md.Payload.Error == "" { + // Set default cancellation reason. + md.Payload.Error = errJobCanceled.Error() + } encodedErr := errors.EncodeError(ctx, errJobCanceled) md.Payload.FinalResumeError = &encodedErr ju.UpdatePayload(md.Payload) diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index daa860c426ff..52ca9d29eb30 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -17,6 +17,7 @@ import ( "fmt" "reflect" "sync/atomic" + "time" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" @@ -114,6 +115,9 @@ type Record struct { // CreatedBy, if set, annotates this record with the information on // this job creator. CreatedBy *CreatedByInfo + // MaximumPTSAge specifies the maximum age of PTS record held by a job. + // 0 means no limit. + MaximumPTSAge time.Duration } // AppendDescription appends description to this records Description with a @@ -423,6 +427,16 @@ func (u Updater) Unpaused(ctx context.Context) error { // that it is in state StatusCancelRequested and will move it to state // StatusReverting. 
func (u Updater) CancelRequested(ctx context.Context) error { + return u.CancelRequestedWithReason(ctx, errJobCanceled) +} + +// CancelRequestedWithReason sets the status of the tracked job to cancel-requested. It +// does not directly cancel the job; like job.Paused, it expects the job to call +// job.Progressed soon, observe a "job is cancel-requested" error, and abort. +// Further the node the runs the job will actively cancel it when it notices +// that it is in state StatusCancelRequested and will move it to state +// StatusReverting. +func (u Updater) CancelRequestedWithReason(ctx context.Context, reason error) error { return u.Update(ctx, func(txn isql.Txn, md JobMetadata, ju *JobUpdater) error { if md.Payload.Noncancelable { return errors.Newf("job %d: not cancelable", md.ID) @@ -438,6 +452,8 @@ func (u Updater) CancelRequested(ctx context.Context) error { return errors.Wrapf(decodedErr, "job %d is paused and has non-nil FinalResumeError "+ "hence cannot be canceled and should be reverted", md.ID) } + md.Payload.Error = reason.Error() + ju.UpdatePayload(md.Payload) ju.UpdateStatus(StatusCancelRequested) return nil }) diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index aa3f73e84c76..b9af5cf2e7a8 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -1238,6 +1238,12 @@ message Payload { // cluster version, in case a job resuming later needs to use this information // to migrate or update the job. roachpb.Version creation_cluster_version = 36 [(gogoproto.nullable) = false]; + + // If a job lays protected timestamp records, this optional field + // specifies how old such record could get before this job is canceled. + int64 maximum_pts_age = 40 [(gogoproto.casttype) = "time.Duration", (gogoproto.customname) = "MaximumPTSAge"]; + + // NEXT ID: 41 } message Progress { diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go index dcb0e1fa2c74..b0de4b621b02 100644 --- a/pkg/jobs/metrics.go +++ b/pkg/jobs/metrics.go @@ -69,6 +69,7 @@ type JobTypeMetrics struct { FailOrCancelFailed *metric.Counter NumJobsWithPTS *metric.Gauge + ExpiredPTS *metric.Counter ProtectedAge *metric.Gauge } @@ -181,8 +182,8 @@ func makeMetaProtectedCount(typeStr string) metric.Metadata { return metric.Metadata{ Name: fmt.Sprintf("jobs.%s.protected_record_count", typeStr), Help: fmt.Sprintf("Number of protected timestamp records held by %s jobs", typeStr), - Measurement: "bytes", - Unit: metric.Unit_BYTES, + Measurement: "records", + Unit: metric.Unit_COUNT, MetricType: io_prometheus_client.MetricType_GAUGE, } } @@ -197,6 +198,16 @@ func makeMetaProtectedAge(typeStr string) metric.Metadata { } } +func makeMetaExpiredPTS(typeStr string) metric.Metadata { + return metric.Metadata{ + Name: fmt.Sprintf("jobs.%s.expired_pts_records", typeStr), + Help: fmt.Sprintf("Number of expired protected timestamp records owned by %s jobs", typeStr), + Measurement: "records", + Unit: metric.Unit_COUNT, + MetricType: io_prometheus_client.MetricType_COUNTER, + } +} + var ( metaAdoptIterations = metric.Metadata{ Name: "jobs.adopt_iterations", @@ -268,6 +279,7 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) { FailOrCancelRetryError: metric.NewCounter(makeMetaFailOrCancelRetryError(typeStr)), FailOrCancelFailed: metric.NewCounter(makeMetaFailOrCancelFailed(typeStr)), NumJobsWithPTS: metric.NewGauge(makeMetaProtectedCount(typeStr)), + ExpiredPTS: metric.NewCounter(makeMetaExpiredPTS(typeStr)), ProtectedAge: metric.NewGauge(makeMetaProtectedAge(typeStr)), } if opts, ok := 
options[jt]; ok && opts.metrics != nil { diff --git a/pkg/jobs/metricspoller/job_statistics.go b/pkg/jobs/metricspoller/job_statistics.go index f912606a14ad..14ad30e1ae62 100644 --- a/pkg/jobs/metricspoller/job_statistics.go +++ b/pkg/jobs/metricspoller/job_statistics.go @@ -23,6 +23,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -77,10 +79,14 @@ func updatePausedMetrics(ctx context.Context, execCtx sql.JobExecContext) error return nil } -// updatePTSStats update protected timestamp statistics per job type. -func updatePTSStats(ctx context.Context, execCtx sql.JobExecContext) error { +// manageJobsProtectedTimestamps manages protected timestamp records owned by various jobs. +// This function mostly concerns itself with collecting statistics related to job PTS records. +// It also detects PTS records that are too old (as configured by the owner job) and requests +// job cancellation for those jobs. +func manageJobsProtectedTimestamps(ctx context.Context, execCtx sql.JobExecContext) error { type ptsStat struct { numRecords int64 + expired int64 oldest hlc.Timestamp } var ptsStats map[jobspb.Type]*ptsStat @@ -105,17 +111,38 @@ func updatePTSStats(ctx context.Context, execCtx sql.JobExecContext) error { continue } p := j.Payload() - stats := ptsStats[p.Type()] + jobType, err := p.CheckType() + if err != nil { + return err + } + stats := ptsStats[jobType] if stats == nil { stats = &ptsStat{} - ptsStats[p.Type()] = stats + ptsStats[jobType] = stats } stats.numRecords++ if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) { stats.oldest = rec.Timestamp } - } + // If MaximumPTSAge is set on the job payload, verify if PTS record + // timestamp is fresh enough. Note: we only look at paused jobs. + // If the running job wants to enforce an invariant wrt to PTS age, + // it can do so itself. This check here is a safety mechanism to detect + // paused jobs that own protected timestamp records. + if j.Status() == jobs.StatusPaused && + p.MaximumPTSAge > 0 && + rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) { + stats.expired++ + ptsExpired := errors.Newf( + "protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s", + rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge) + if err := j.WithTxn(txn).CancelRequestedWithReason(ctx, ptsExpired); err != nil { + return err + } + log.Warningf(ctx, "job %d canceled due to %s", id, ptsExpired) + } + } return nil }); err != nil { return err @@ -130,6 +157,7 @@ func updatePTSStats(ctx context.Context, execCtx sql.JobExecContext) error { stats, found := ptsStats[jobspb.Type(typ)] if found { m.NumJobsWithPTS.Update(stats.numRecords) + m.ExpiredPTS.Inc(stats.expired) if stats.oldest.WallTime > 0 { m.ProtectedAge.Update((execCfg.Clock.Now().WallTime - stats.oldest.WallTime) / 1e9) } else { @@ -137,6 +165,7 @@ func updatePTSStats(ctx context.Context, execCtx sql.JobExecContext) error { } } else { // If we haven't found PTS records for a job type, then reset stats. 
+			// (note: we don't reset counter-based stats)
 			m.NumJobsWithPTS.Update(0)
 			m.ProtectedAge.Update(0)
 		}
diff --git a/pkg/jobs/metricspoller/poller.go b/pkg/jobs/metricspoller/poller.go
index 186687f58607..6e63ad26f9f2 100644
--- a/pkg/jobs/metricspoller/poller.go
+++ b/pkg/jobs/metricspoller/poller.go
@@ -71,13 +71,11 @@ func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error
 			return ctx.Err()
 		case <-t.C:
 			t.Read = true
-			if err := runTask("paused-jobs", updatePausedMetrics); err != nil {
-				log.Errorf(ctx, "Periodic stats collector task paused-jobs completed with error %s", err)
-				metrics.numErrors.Inc(1)
-			}
-			if err := runTask("pts-stats", updatePTSStats); err != nil {
-				log.Errorf(ctx, "Periodic stats collector task pts-stats completed with error %s", err)
-				metrics.numErrors.Inc(1)
+			for name, task := range metricPollerTasks {
+				if err := runTask(name, task); err != nil {
+					log.Errorf(ctx, "Periodic stats collector task %s completed with error %s", name, err)
+					metrics.numErrors.Inc(1)
+				}
 			}
 		}
 	}
@@ -87,6 +85,13 @@ type pollerMetrics struct {
 	numErrors *metric.Counter
 }
 
+// metricPollerTasks lists the tasks performed on each iteration
+// of the metrics poller.
+var metricPollerTasks = map[string]func(ctx context.Context, execCtx sql.JobExecContext) error{
+	"paused-jobs": updatePausedMetrics,
+	"manage-pts":  manageJobsProtectedTimestamps,
+}
+
 func (m pollerMetrics) MetricStruct() {}
 
 func newPollerMetrics() metric.Struct {
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index 3062077873ac..a211692a5984 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -343,6 +343,7 @@ func (r *Registry) makePayload(ctx context.Context, record *Record) (jobspb.Payl
 		Noncancelable:          record.NonCancelable,
 		CreationClusterVersion: r.settings.Version.ActiveVersion(ctx).Version,
 		CreationClusterID:      r.clusterID.Get(),
+		MaximumPTSAge:          record.MaximumPTSAge,
 	}, nil
 }
 
diff --git a/pkg/ts/catalog/BUILD.bazel b/pkg/ts/catalog/BUILD.bazel
index f11f5787f140..4020c5dd8168 100644
--- a/pkg/ts/catalog/BUILD.bazel
+++ b/pkg/ts/catalog/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/ts/catalog",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/jobs/jobspb",
         "//pkg/ts/tspb",
         "//pkg/util/metric",
         "@com_github_cockroachdb_errors//:errors",