From 8ab9c91300663dd0db4ca6ebf2165a92d611f645 Mon Sep 17 00:00:00 2001 From: wlwilliamx <53336371+wlwilliamx@users.noreply.github.com> Date: Tue, 19 May 2026 16:54:52 +0800 Subject: [PATCH] This is an automated cherry-pick of #5095 Signed-off-by: ti-chi-bot --- api/middleware/changefeed_operation.go | 212 ++++++++++++ api/middleware/changefeed_operation_test.go | 139 ++++++++ api/v1/api.go | 10 +- api/v2/api.go | 10 +- api/v2/changefeed.go | 25 +- metrics/grafana/ticdc_new_arch.json | 314 ++++++++++++++++++ .../ticdc_new_arch_next_gen.json | 314 ++++++++++++++++++ .../ticdc_new_arch_with_keyspace_name.json | 314 ++++++++++++++++++ pkg/metrics/changefeed.go | 28 ++ 9 files changed, 1354 insertions(+), 12 deletions(-) create mode 100644 api/middleware/changefeed_operation.go create mode 100644 api/middleware/changefeed_operation_test.go diff --git a/api/middleware/changefeed_operation.go b/api/middleware/changefeed_operation.go new file mode 100644 index 0000000000..e9524e7dbc --- /dev/null +++ b/api/middleware/changefeed_operation.go @@ -0,0 +1,212 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package middleware + +import ( + "fmt" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + "unicode/utf8" + + "github.com/gin-gonic/gin" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/api" + "github.com/pingcap/ticdc/pkg/metrics" + "go.uber.org/zap" +) + +const ( + changefeedOperationContextKey = "changefeed-operation-info" + changefeedOperationHistoryLimit = 100 + changefeedOperationMetricTextLimit = 256 +) + +var ( + recentChangefeedOperations = newRecentChangefeedOperationStore(changefeedOperationHistoryLimit) + changefeedOperationEventID uint64 +) + +type changefeedOperationInfo struct { + keyspace string + changefeed string + details string +} + +type changefeedOperationMetricLabels struct { + keyspace string + changefeed string + operation string + result string + username string + details string + err string + eventID string +} + +func (l changefeedOperationMetricLabels) labelValues() []string { + return []string{ + l.keyspace, + l.changefeed, + l.operation, + l.result, + l.username, + l.details, + l.err, + l.eventID, + } +} + +type recentChangefeedOperationStore struct { + mu sync.Mutex + limit int + events []changefeedOperationMetricLabels +} + +func newRecentChangefeedOperationStore(limit int) *recentChangefeedOperationStore { + return &recentChangefeedOperationStore{limit: limit} +} + +func (s *recentChangefeedOperationStore) record( + labels changefeedOperationMetricLabels, + operationTime time.Time, +) { + s.mu.Lock() + defer s.mu.Unlock() + + // The dashboard only needs a recent investigation window. Keep this cache + // bounded so user names and detail strings do not become unbounded metric + // cardinality over long-running clusters. + metrics.ChangefeedOperationTimeGauge.WithLabelValues(labels.labelValues()...).Set(float64(operationTime.UnixMilli())) + s.events = append(s.events, labels) + if len(s.events) <= s.limit { + return + } + + oldest := s.events[0] + s.events = s.events[1:] + metrics.ChangefeedOperationTimeGauge.DeleteLabelValues(oldest.labelValues()...) +} + +// ChangefeedOperationMiddleware records user initiated changefeed mutations for +// logs and the bounded recent-operation Grafana panel. +func ChangefeedOperationMiddleware(operation string) gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + c.Next() + + statusCode := c.Writer.Status() + lastError := c.Errors.Last() + result := "success" + if statusCode >= http.StatusBadRequest || lastError != nil { + result = "failed" + } + + info := getChangefeedOperationInfo(c) + if info.keyspace == "" { + info.keyspace = c.Query(api.APIOpVarKeyspace) + } + if info.changefeed == "" { + info.changefeed = c.Param(api.APIOpVarChangefeedID) + } + + username, _, _ := c.Request.BasicAuth() + if username == "" { + username = "anonymous" + } + + var operationErr error + if lastError != nil { + operationErr = lastError.Err + } + + operationTime := time.Now() + eventID := atomic.AddUint64(&changefeedOperationEventID, 1) + labels := changefeedOperationMetricLabels{ + keyspace: normalizeChangefeedOperationMetricText(info.keyspace), + changefeed: normalizeChangefeedOperationMetricText(info.changefeed), + operation: operation, + result: result, + username: normalizeChangefeedOperationMetricText(username), + details: normalizeChangefeedOperationMetricText(info.details), + err: normalizeChangefeedOperationMetricError(operationErr), + eventID: fmt.Sprintf("%d", eventID), + } + recentChangefeedOperations.record(labels, operationTime) + + log.Info("changefeed operation", + zap.String("operation", operation), + zap.String("result", result), + zap.String("keyspace", info.keyspace), + zap.String("changefeedID", info.changefeed), + zap.String("details", info.details), + zap.Int("status", statusCode), + zap.String("username", username), + zap.String("ip", c.ClientIP()), + zap.String("userAgent", c.Request.UserAgent()), + zap.String("clientVersion", c.Request.Header.Get(ClientVersionHeader)), + zap.Duration("duration", time.Since(start)), + zap.Error(operationErr), + ) + } +} + +// SetChangefeedOperationTarget attaches the logical changefeed identity that the +// current mutation request targets. +func SetChangefeedOperationTarget(c *gin.Context, keyspace, changefeed string) { + info := getChangefeedOperationInfo(c) + info.keyspace = keyspace + info.changefeed = changefeed + c.Set(changefeedOperationContextKey, info) +} + +// SetChangefeedOperationDetails attaches a concise, non-sensitive summary for +// the current mutation request. +func SetChangefeedOperationDetails(c *gin.Context, details string) { + info := getChangefeedOperationInfo(c) + info.details = details + c.Set(changefeedOperationContextKey, info) +} + +func getChangefeedOperationInfo(c *gin.Context) changefeedOperationInfo { + raw, ok := c.Get(changefeedOperationContextKey) + if !ok { + return changefeedOperationInfo{} + } + info, ok := raw.(changefeedOperationInfo) + if !ok { + return changefeedOperationInfo{} + } + return info +} + +func normalizeChangefeedOperationMetricError(err error) string { + if err == nil { + return "" + } + return normalizeChangefeedOperationMetricText(err.Error()) +} + +func normalizeChangefeedOperationMetricText(value string) string { + value = strings.Join(strings.Fields(value), " ") + if utf8.RuneCountInString(value) <= changefeedOperationMetricTextLimit { + return value + } + + runes := []rune(value) + return string(runes[:changefeedOperationMetricTextLimit-3]) + "..." +} diff --git a/api/middleware/changefeed_operation_test.go b/api/middleware/changefeed_operation_test.go new file mode 100644 index 0000000000..e95785bee0 --- /dev/null +++ b/api/middleware/changefeed_operation_test.go @@ -0,0 +1,139 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package middleware + +import ( + "errors" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/gin-gonic/gin" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +// TestChangefeedOperationMiddlewareRecordsSuccessfulMutation verifies that a +// successful request keeps the logical target, normalized detail summary, and +// authenticated user for the dashboard history row. +func TestChangefeedOperationMiddlewareRecordsSuccessfulMutation(t *testing.T) { + resetRecentChangefeedOperationStateForTest(t) + + router := gin.New() + router.POST("/api/v2/changefeeds/:changefeed_id/resume", + ChangefeedOperationMiddleware("resume"), + func(c *gin.Context) { + SetChangefeedOperationTarget(c, "ks1", "test") + SetChangefeedOperationDetails(c, "overwrite_checkpoint_ts=true\nnew_checkpoint_ts=123") + c.Status(http.StatusOK) + }, + ) + + recorder := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/api/v2/changefeeds/test/resume?keyspace=ks1", nil) + req.SetBasicAuth("alice", "secret") + router.ServeHTTP(recorder, req) + + require.Len(t, recentChangefeedOperations.events, 1) + require.Equal(t, changefeedOperationMetricLabels{ + keyspace: "ks1", + changefeed: "test", + operation: "resume", + result: "success", + username: "alice", + details: "overwrite_checkpoint_ts=true new_checkpoint_ts=123", + err: "", + eventID: "1", + }, recentChangefeedOperations.events[0]) +} + +// TestChangefeedOperationMiddlewareRecordsFailedMutation verifies that a failed +// request is still retained with its normalized error summary so oncall can see +// rejected user attempts as well as successful changes. +func TestChangefeedOperationMiddlewareRecordsFailedMutation(t *testing.T) { + resetRecentChangefeedOperationStateForTest(t) + + router := gin.New() + router.PUT("/api/v2/changefeeds/:changefeed_id", + ChangefeedOperationMiddleware("update"), + func(c *gin.Context) { + _ = c.Error(errors.New("update rejected\nbecause state is normal")) + c.Status(http.StatusBadRequest) + }, + ) + + recorder := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPut, "/api/v2/changefeeds/test?keyspace=ks1", nil) + router.ServeHTTP(recorder, req) + + require.Len(t, recentChangefeedOperations.events, 1) + require.Equal(t, "failed", recentChangefeedOperations.events[0].result) + require.Equal(t, "anonymous", recentChangefeedOperations.events[0].username) + require.Equal(t, "update rejected because state is normal", recentChangefeedOperations.events[0].err) +} + +// TestRecentChangefeedOperationStoreEvictsOldestMetric verifies that the recent +// operation cache preserves only the newest bounded rows and removes the evicted +// metric series to avoid unbounded label cardinality. +func TestRecentChangefeedOperationStoreEvictsOldestMetric(t *testing.T) { + resetRecentChangefeedOperationStateForTest(t) + store := newRecentChangefeedOperationStore(2) + now := time.UnixMilli(1_700_000_000_000) + + first := changefeedOperationMetricLabels{eventID: "1"} + second := changefeedOperationMetricLabels{eventID: "2"} + third := changefeedOperationMetricLabels{eventID: "3"} + t.Cleanup(func() { + for _, labels := range []changefeedOperationMetricLabels{first, second, third} { + metrics.ChangefeedOperationTimeGauge.DeleteLabelValues(labels.labelValues()...) + } + }) + store.record(first, now) + store.record(second, now.Add(time.Millisecond)) + store.record(third, now.Add(2*time.Millisecond)) + + require.Equal(t, []changefeedOperationMetricLabels{second, third}, store.events) + require.Equal(t, float64(0), testutil.ToFloat64(metricsGaugeForTest(first))) + require.Equal(t, float64(now.Add(time.Millisecond).UnixMilli()), testutil.ToFloat64(metricsGaugeForTest(second))) + require.Equal(t, float64(now.Add(2*time.Millisecond).UnixMilli()), testutil.ToFloat64(metricsGaugeForTest(third))) +} + +// TestNormalizeChangefeedOperationMetricText verifies that dashboard labels are +// kept single-line and bounded so a malformed request cannot create oversized +// operation-history fields. +func TestNormalizeChangefeedOperationMetricText(t *testing.T) { + longText := strings.Repeat("a", changefeedOperationMetricTextLimit+10) + require.Equal(t, "a b", normalizeChangefeedOperationMetricText("a\n b")) + require.Equal(t, changefeedOperationMetricTextLimit, len(normalizeChangefeedOperationMetricText(longText))) + require.True(t, strings.HasSuffix(normalizeChangefeedOperationMetricText(longText), "...")) +} + +func resetRecentChangefeedOperationStateForTest(t *testing.T) { + t.Helper() + for _, labels := range recentChangefeedOperations.events { + metrics.ChangefeedOperationTimeGauge.DeleteLabelValues(labels.labelValues()...) + } + recentChangefeedOperations = newRecentChangefeedOperationStore(changefeedOperationHistoryLimit) + atomic.StoreUint64(&changefeedOperationEventID, 0) +} + +func metricsGaugeForTest(labels changefeedOperationMetricLabels) prometheus.Gauge { + return metrics.ChangefeedOperationTimeGauge.WithLabelValues(labels.labelValues()...) +} diff --git a/api/v1/api.go b/api/v1/api.go index 7b2981d4b0..677517cba3 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -68,12 +68,12 @@ func RegisterOpenAPIV1Routes(router *gin.Engine, api OpenAPIV1) { changefeedGroup.GET("/:changefeed_id", coordinatorMiddleware, setV1Header, api.v2.GetChangeFeed) // These two APIs need to be adjusted to be compatible with the API v1. - changefeedGroup.POST("", coordinatorMiddleware, authenticateMiddleware, setV1Header, api.createChangefeed) - changefeedGroup.PUT("/:changefeed_id", coordinatorMiddleware, authenticateMiddleware, setV1Header, api.updateChangefeed) + changefeedGroup.POST("", coordinatorMiddleware, middleware.ChangefeedOperationMiddleware("create"), authenticateMiddleware, setV1Header, api.createChangefeed) + changefeedGroup.PUT("/:changefeed_id", coordinatorMiddleware, middleware.ChangefeedOperationMiddleware("update"), authenticateMiddleware, setV1Header, api.updateChangefeed) - changefeedGroup.POST("/:changefeed_id/pause", coordinatorMiddleware, authenticateMiddleware, setV1Header, api.v2.PauseChangefeed) - changefeedGroup.POST("/:changefeed_id/resume", coordinatorMiddleware, authenticateMiddleware, setV1Header, api.v2.ResumeChangefeed) - changefeedGroup.DELETE("/:changefeed_id", coordinatorMiddleware, authenticateMiddleware, setV1Header, api.v2.DeleteChangefeed) + changefeedGroup.POST("/:changefeed_id/pause", coordinatorMiddleware, middleware.ChangefeedOperationMiddleware("pause"), authenticateMiddleware, setV1Header, api.v2.PauseChangefeed) + changefeedGroup.POST("/:changefeed_id/resume", coordinatorMiddleware, middleware.ChangefeedOperationMiddleware("resume"), authenticateMiddleware, setV1Header, api.v2.ResumeChangefeed) + changefeedGroup.DELETE("/:changefeed_id", coordinatorMiddleware, middleware.ChangefeedOperationMiddleware("delete"), authenticateMiddleware, setV1Header, api.v2.DeleteChangefeed) // These two APIs are not useful in new arch cdc, we implement them for compatibility with old arch cdc only. changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", coordinatorMiddleware, authenticateMiddleware, setV1Header, api.rebalanceTables) diff --git a/api/v2/api.go b/api/v2/api.go index 8a2a6bd1c2..0a897faea9 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -63,12 +63,12 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { // The authenticateMiddleware will retire the KeyspaceMeta from the context, // which is set by the keyspaceCheckerMiddleware. // Therefore, the The authenticateMiddleware must be called after the keyspaceCheckerMiddleware. - changefeedGroup.POST("", coordinatorMiddleware, keyspaceCheckerMiddleware, authenticateMiddleware, api.CreateChangefeed) + changefeedGroup.POST("", coordinatorMiddleware, middleware.ChangefeedOperationMiddleware("create"), keyspaceCheckerMiddleware, authenticateMiddleware, api.CreateChangefeed) changefeedGroup.GET("", coordinatorMiddleware, keyspaceCheckerMiddleware, api.ListChangeFeeds) - changefeedGroup.PUT("/:changefeed_id", coordinatorMiddleware, keyspaceCheckerMiddleware, authenticateMiddleware, api.UpdateChangefeed) - changefeedGroup.POST("/:changefeed_id/resume", coordinatorMiddleware, keyspaceCheckerMiddleware, authenticateMiddleware, api.ResumeChangefeed) - changefeedGroup.POST("/:changefeed_id/pause", coordinatorMiddleware, keyspaceCheckerMiddleware, authenticateMiddleware, api.PauseChangefeed) - changefeedGroup.DELETE("/:changefeed_id", coordinatorMiddleware, keyspaceCheckerMiddleware, authenticateMiddleware, api.DeleteChangefeed) + changefeedGroup.PUT("/:changefeed_id", coordinatorMiddleware, middleware.ChangefeedOperationMiddleware("update"), keyspaceCheckerMiddleware, authenticateMiddleware, api.UpdateChangefeed) + changefeedGroup.POST("/:changefeed_id/resume", coordinatorMiddleware, middleware.ChangefeedOperationMiddleware("resume"), keyspaceCheckerMiddleware, authenticateMiddleware, api.ResumeChangefeed) + changefeedGroup.POST("/:changefeed_id/pause", coordinatorMiddleware, middleware.ChangefeedOperationMiddleware("pause"), keyspaceCheckerMiddleware, authenticateMiddleware, api.PauseChangefeed) + changefeedGroup.DELETE("/:changefeed_id", coordinatorMiddleware, middleware.ChangefeedOperationMiddleware("delete"), keyspaceCheckerMiddleware, authenticateMiddleware, api.DeleteChangefeed) changefeedGroup.GET("/:changefeed_id/status", coordinatorMiddleware, keyspaceCheckerMiddleware, authenticateMiddleware, api.status) changefeedGroup.GET("/:changefeed_id/synced", coordinatorMiddleware, keyspaceCheckerMiddleware, authenticateMiddleware, api.synced) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 7de7fa52fc..b6098f8d14 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -95,6 +95,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) { _ = c.Error(errors.ErrAPIInvalidParam.GenWithStack("invalid keyspace: %s", cfg.ID)) return } + middleware.SetChangefeedOperationTarget(c, changefeedID.Keyspace(), changefeedID.Name()) keyspaceMeta := middleware.GetKeyspaceFromContext(c) if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED { @@ -167,6 +168,8 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) { _ = c.Error(errors.ErrTargetTsBeforeStartTs.GenWithStackByArgs(cfg.TargetTs, cfg.StartTs)) return } + middleware.SetChangefeedOperationDetails(c, fmt.Sprintf( + "start_ts=%d target_ts=%d sink_scheme=%s", cfg.StartTs, cfg.TargetTs, scheme)) // fill replicaConfig replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig() @@ -606,7 +609,7 @@ func (h *OpenAPIV2) DeleteChangefeed(c *gin.Context) { return } - cfInfo, _, err := co.GetChangefeed(c, changefeedDisplayName) + cfInfo, status, err := co.GetChangefeed(c, changefeedDisplayName) if err != nil { if errors.ErrChangeFeedNotExists.Equal(err) { c.JSON(getStatus(c), nil) @@ -615,6 +618,13 @@ func (h *OpenAPIV2) DeleteChangefeed(c *gin.Context) { _ = c.Error(err) return } + middleware.SetChangefeedOperationTarget(c, cfInfo.ChangefeedID.Keyspace(), cfInfo.ChangefeedID.Name()) + var previousCheckpointTs uint64 + if status != nil { + previousCheckpointTs = status.CheckpointTs + } + middleware.SetChangefeedOperationDetails(c, fmt.Sprintf( + "previous_state=%s previous_checkpoint_ts=%d", cfInfo.State, previousCheckpointTs)) _, err = co.RemoveChangefeed(ctx, cfInfo.ChangefeedID) if err != nil { _ = c.Error(err) @@ -661,6 +671,9 @@ func (h *OpenAPIV2) PauseChangefeed(c *gin.Context) { _ = c.Error(err) return } + middleware.SetChangefeedOperationTarget(c, cfInfo.ChangefeedID.Keyspace(), cfInfo.ChangefeedID.Name()) + middleware.SetChangefeedOperationDetails(c, fmt.Sprintf( + "previous_state=%s", cfInfo.State)) err = co.PauseChangefeed(ctx, cfInfo.ChangefeedID) if err != nil { _ = c.Error(err) @@ -729,6 +742,7 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { _ = c.Error(err) return } + middleware.SetChangefeedOperationTarget(c, cfInfo.ChangefeedID.Keyspace(), cfInfo.ChangefeedID.Name()) // If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not. newCheckpointTs := status.CheckpointTs @@ -737,6 +751,8 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { newCheckpointTs = cfg.OverwriteCheckpointTs overwriteCheckpointTs = true } + middleware.SetChangefeedOperationDetails(c, fmt.Sprintf( + "overwrite_checkpoint_ts=%t new_checkpoint_ts=%d", overwriteCheckpointTs, newCheckpointTs)) keyspaceMeta := middleware.GetKeyspaceFromContext(c) @@ -885,6 +901,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { changefeedDisplayName.Name)) return } + middleware.SetChangefeedOperationTarget(c, changefeedDisplayName.Keyspace, changefeedDisplayName.Name) co, err := h.server.GetCoordinator() if err != nil { _ = c.Error(err) @@ -920,7 +937,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { return } - var configUpdated, sinkURIUpdated bool + var configUpdated, sinkURIUpdated, targetTsUpdated bool if updateCfConfig.TargetTs != 0 { if updateCfConfig.TargetTs <= oldCfInfo.StartTs { _ = c.Error(errors.ErrChangefeedUpdateRefused.GenWithStack( @@ -929,6 +946,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { return } oldCfInfo.TargetTs = updateCfConfig.TargetTs + targetTsUpdated = true } if updateCfConfig.ReplicaConfig != nil { configUpdated = true @@ -942,6 +960,9 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { _ = c.Error(errors.ErrAPIInvalidParam.GenWithStack("start_ts can not be updated")) return } + middleware.SetChangefeedOperationDetails(c, fmt.Sprintf( + "target_ts_changed=%t sink_uri_changed=%t replica_config_changed=%t", + targetTsUpdated, sinkURIUpdated, configUpdated)) var ( ineligibleTables []common.TableName diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index c73ee064f6..2251f9c575 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -4591,6 +4591,320 @@ "align": false, "alignLevel": null } +<<<<<<< HEAD +======= + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Current warning or failed reason of each changefeed. The metric message is normalized to a single line and truncated to 256 characters.", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": false + }, + "links": [], + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "namespace" + }, + "properties": [ + { + "id": "custom.width", + "value": 120 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "changefeed" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "state" + }, + "properties": [ + { + "id": "custom.width", + "value": 100 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "code" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "error_time" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 26 + }, + "id": 62010, + "options": { + "showHeader": true, + "sortBy": [] + }, + "pluginVersion": "7.5.17", + "targets": [ + { + "expr": "max by (namespace, changefeed, state, code, error_time, message) (ticdc_owner_changefeed_error_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"})", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Changefeed Error Details", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "Value": true, + "__name__": true + }, + "indexByName": { + "namespace": 0, + "changefeed": 1, + "state": 2, + "error_time": 3, + "code": 4, + "message": 5 + }, + "renameByName": {} + } + } + ], + "type": "table" + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Recent user initiated changefeed mutations retained in memory on the coordinator for oncall investigation. Use TiCDC logs for durable history beyond the latest 100 operations.", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": false + }, + "links": [], + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "namespace" + }, + "properties": [ + { + "id": "custom.width", + "value": 120 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "changefeed" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "operation_time" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + }, + { + "id": "unit", + "value": "dateTimeAsIso" + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "operation" + }, + "properties": [ + { + "id": "custom.width", + "value": 100 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "result" + }, + "properties": [ + { + "id": "custom.width", + "value": 90 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "username" + }, + "properties": [ + { + "id": "custom.width", + "value": 120 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "details" + }, + "properties": [ + { + "id": "custom.width", + "value": 320 + } + ] + } + ] + }, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 34 + }, + "id": 62042, + "options": { + "showHeader": true, + "sortBy": [ + { + "displayName": "operation_time", + "desc": true + } + ] + }, + "pluginVersion": "7.5.17", + "targets": [ + { + "expr": "max by (namespace, changefeed, operation, result, username, details, error, event_id) (ticdc_owner_changefeed_operation_time{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"})", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Changefeed Operation History", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "__name__": true, + "event_id": true + }, + "indexByName": { + "namespace": 0, + "changefeed": 1, + "Value": 2, + "operation": 3, + "result": 4, + "username": 5, + "details": 6, + "error": 7 + }, + "renameByName": { + "Value": "operation_time" + } + } + } + ], + "type": "table" +>>>>>>> 3a652c164 (api,metrics: add changefeed operation history (#5095)) } ], "title": "Changefeed", diff --git a/metrics/nextgengrafana/ticdc_new_arch_next_gen.json b/metrics/nextgengrafana/ticdc_new_arch_next_gen.json index 17755d4fc9..d5c875cb61 100644 --- a/metrics/nextgengrafana/ticdc_new_arch_next_gen.json +++ b/metrics/nextgengrafana/ticdc_new_arch_next_gen.json @@ -4591,6 +4591,320 @@ "align": false, "alignLevel": null } +<<<<<<< HEAD +======= + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Current warning or failed reason of each changefeed. The metric message is normalized to a single line and truncated to 256 characters.", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": false + }, + "links": [], + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "keyspace_name" + }, + "properties": [ + { + "id": "custom.width", + "value": 120 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "changefeed" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "state" + }, + "properties": [ + { + "id": "custom.width", + "value": 100 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "code" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "error_time" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 26 + }, + "id": 62010, + "options": { + "showHeader": true, + "sortBy": [] + }, + "pluginVersion": "7.5.17", + "targets": [ + { + "expr": "max by (keyspace_name, changefeed, state, code, error_time, message) (ticdc_owner_changefeed_error_info{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"})", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Changefeed Error Details", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "Value": true, + "__name__": true + }, + "indexByName": { + "keyspace_name": 0, + "changefeed": 1, + "state": 2, + "error_time": 3, + "code": 4, + "message": 5 + }, + "renameByName": {} + } + } + ], + "type": "table" + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Recent user initiated changefeed mutations retained in memory on the coordinator for oncall investigation. Use TiCDC logs for durable history beyond the latest 100 operations.", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": false + }, + "links": [], + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "keyspace_name" + }, + "properties": [ + { + "id": "custom.width", + "value": 120 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "changefeed" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "operation_time" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + }, + { + "id": "unit", + "value": "dateTimeAsIso" + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "operation" + }, + "properties": [ + { + "id": "custom.width", + "value": 100 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "result" + }, + "properties": [ + { + "id": "custom.width", + "value": 90 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "username" + }, + "properties": [ + { + "id": "custom.width", + "value": 120 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "details" + }, + "properties": [ + { + "id": "custom.width", + "value": 320 + } + ] + } + ] + }, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 34 + }, + "id": 62042, + "options": { + "showHeader": true, + "sortBy": [ + { + "displayName": "operation_time", + "desc": true + } + ] + }, + "pluginVersion": "7.5.17", + "targets": [ + { + "expr": "max by (keyspace_name, changefeed, operation, result, username, details, error, event_id) (ticdc_owner_changefeed_operation_time{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"})", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Changefeed Operation History", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "__name__": true, + "event_id": true + }, + "indexByName": { + "keyspace_name": 0, + "changefeed": 1, + "Value": 2, + "operation": 3, + "result": 4, + "username": 5, + "details": 6, + "error": 7 + }, + "renameByName": { + "Value": "operation_time" + } + } + } + ], + "type": "table" +>>>>>>> 3a652c164 (api,metrics: add changefeed operation history (#5095)) } ], "title": "Changefeed", diff --git a/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json b/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json index ba2f1b9c7b..08c2ed0463 100644 --- a/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json +++ b/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json @@ -2334,6 +2334,320 @@ "align": false, "alignLevel": null } +<<<<<<< HEAD +======= + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Current warning or failed reason of each changefeed. The metric message is normalized to a single line and truncated to 256 characters.", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": false + }, + "links": [], + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "keyspace_name" + }, + "properties": [ + { + "id": "custom.width", + "value": 120 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "changefeed" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "state" + }, + "properties": [ + { + "id": "custom.width", + "value": 100 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "code" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "error_time" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 26 + }, + "id": 62010, + "options": { + "showHeader": true, + "sortBy": [] + }, + "pluginVersion": "7.5.17", + "targets": [ + { + "expr": "max by (keyspace_name, changefeed, state, code, error_time, message) (ticdc_owner_changefeed_error_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"})", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Changefeed Error Details", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "Value": true, + "__name__": true + }, + "indexByName": { + "keyspace_name": 0, + "changefeed": 1, + "state": 2, + "error_time": 3, + "code": 4, + "message": 5 + }, + "renameByName": {} + } + } + ], + "type": "table" + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Recent user initiated changefeed mutations retained in memory on the coordinator for oncall investigation. Use TiCDC logs for durable history beyond the latest 100 operations.", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": false + }, + "links": [], + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "keyspace_name" + }, + "properties": [ + { + "id": "custom.width", + "value": 120 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "changefeed" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "operation_time" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + }, + { + "id": "unit", + "value": "dateTimeAsIso" + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "operation" + }, + "properties": [ + { + "id": "custom.width", + "value": 100 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "result" + }, + "properties": [ + { + "id": "custom.width", + "value": 90 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "username" + }, + "properties": [ + { + "id": "custom.width", + "value": 120 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "details" + }, + "properties": [ + { + "id": "custom.width", + "value": 320 + } + ] + } + ] + }, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 34 + }, + "id": 62042, + "options": { + "showHeader": true, + "sortBy": [ + { + "displayName": "operation_time", + "desc": true + } + ] + }, + "pluginVersion": "7.5.17", + "targets": [ + { + "expr": "max by (keyspace_name, changefeed, operation, result, username, details, error, event_id) (ticdc_owner_changefeed_operation_time{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"})", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Changefeed Operation History", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "__name__": true, + "event_id": true + }, + "indexByName": { + "keyspace_name": 0, + "changefeed": 1, + "Value": 2, + "operation": 3, + "result": 4, + "username": 5, + "details": 6, + "error": 7 + }, + "renameByName": { + "Value": "operation_time" + } + } + } + ], + "type": "table" +>>>>>>> 3a652c164 (api,metrics: add changefeed operation history (#5095)) } ], "title": "Changefeed", diff --git a/pkg/metrics/changefeed.go b/pkg/metrics/changefeed.go index 03e9514a9c..06fde2bf22 100644 --- a/pkg/metrics/changefeed.go +++ b/pkg/metrics/changefeed.go @@ -73,6 +73,29 @@ var ( Help: "The status of changefeeds", }, []string{getKeyspaceLabel(), "changefeed"}) +<<<<<<< HEAD +======= + // ChangefeedErrorInfoGauge records the current warning or failed reason and its occurrence time + // for each changefeed. + ChangefeedErrorInfoGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "changefeed_error_info", + Help: "The current warning or failed reason and occurrence time of changefeeds", + }, []string{getKeyspaceLabel(), "changefeed", "state", "error_time", "code", "message"}) + + // ChangefeedOperationTimeGauge records a bounded set of recent user initiated + // changefeed operation timestamps for the Grafana investigation panel. + ChangefeedOperationTimeGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "changefeed_operation_time", + Help: "Recent user initiated changefeed operation timestamps in Unix milliseconds", + }, []string{getKeyspaceLabel(), "changefeed", "operation", "result", "username", "details", "error", "event_id"}) + +>>>>>>> 3a652c164 (api,metrics: add changefeed operation history (#5095)) ChangefeedCheckpointTsLagGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -99,6 +122,11 @@ func initChangefeedMetrics(registry *prometheus.Registry) { registry.MustRegister(CoordinatorCounter) registry.MustRegister(MaintainerGauge) registry.MustRegister(ChangefeedStatusGauge) +<<<<<<< HEAD +======= + registry.MustRegister(ChangefeedErrorInfoGauge) + registry.MustRegister(ChangefeedOperationTimeGauge) +>>>>>>> 3a652c164 (api,metrics: add changefeed operation history (#5095)) registry.MustRegister(ChangefeedCheckpointTsLagGauge) registry.MustRegister(ChangefeedCheckpointTsGauge) }