Skip to content

Commit

Permalink
Merge branch 'master' into kv-reduce-retry-log
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed Dec 28, 2021
2 parents a90e28c + 20e3f13 commit 8ccd0ab
Show file tree
Hide file tree
Showing 3 changed files with 3,070 additions and 2,635 deletions.
20 changes: 17 additions & 3 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type changefeed struct {

metricsChangefeedCheckpointTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func() DDLSink
Expand Down Expand Up @@ -292,6 +294,8 @@ LOOP:
// init metrics
c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.WithLabelValues(c.id)
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.WithLabelValues(c.id)

// create scheduler
c.scheduler, err = c.newScheduler(ctx, checkpointTs)
Expand Down Expand Up @@ -323,10 +327,17 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
}
c.wg.Wait()
c.scheduler.Close(ctx)

changefeedCheckpointTsGauge.DeleteLabelValues(c.id)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id)
c.metricsChangefeedCheckpointTsGauge = nil
c.metricsChangefeedCheckpointTsLagGauge = nil

changefeedResolvedTsGauge.DeleteLabelValues(c.id)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil

c.initialized = false
}

Expand Down Expand Up @@ -524,10 +535,13 @@ func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs mode
}
return status, changed, nil
})
phyTs := oracle.ExtractPhysical(checkpointTs)
phyCkpTs := oracle.ExtractPhysical(checkpointTs)
c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCkpTs) / 1e3)

c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3)
phyRTs := oracle.ExtractPhysical(resolvedTs)
c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyRTs) / 1e3)
}

func (c *changefeed) Close(ctx cdcContext.Context) {
Expand Down
18 changes: 17 additions & 1 deletion cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ var (
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_ts_lag",
Help: "checkpoint ts lag of changefeeds",
Help: "checkpoint ts lag of changefeeds in seconds",
}, []string{"changefeed"})
changefeedResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts",
Help: "resolved ts of changefeeds",
}, []string{"changefeed"})
changefeedResolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts_lag",
Help: "resolved ts lag of changefeeds in seconds",
}, []string{"changefeed"})
ownershipCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Expand Down Expand Up @@ -63,7 +77,9 @@ const (
// InitMetrics registers all metrics used in owner
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(changefeedCheckpointTsGauge)
registry.MustRegister(changefeedResolvedTsGauge)
registry.MustRegister(changefeedCheckpointTsLagGauge)
registry.MustRegister(changefeedResolvedTsLagGauge)
registry.MustRegister(ownershipCounter)
registry.MustRegister(ownerMaintainTableNumGauge)
registry.MustRegister(changefeedStatusGauge)
Expand Down

0 comments on commit 8ccd0ab

Please sign in to comment.