From 8276511b28a28e1d15746fa9559c3550003f7896 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 29 Jun 2020 17:47:01 +0800 Subject: [PATCH] cdc, pkg: replace table ID with table name in metrics (#695) * cdc, pkg: replace table ID with table name in metrics * cherry-pick #692 bugfix Signed-off-by: Neil Shen --- cdc/changefeed.go | 22 +++++++++++++++------- cdc/entry/schema_storage.go | 19 +++++++++++++++++-- cdc/metrics_processor.go | 8 -------- cdc/model/owner.go | 7 ++++++- cdc/owner_operator.go | 2 +- cdc/processor.go | 15 +++++++-------- cdc/puller/entry_sorter.go | 13 ++++++------- cdc/puller/puller.go | 16 +++++++--------- cmd/changefeed.toml | 7 +++++++ pkg/util/ctx.go | 19 ++++++++++++------- 10 files changed, 78 insertions(+), 50 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 8fcb065863e..bed8097ae1b 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -323,26 +323,34 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model } newTaskStatus[captureID] = status } + schemaSnapshot := c.schema for tableID, op := range operation { var orphanMarkTableID model.TableID + tableName, found := schemaSnapshot.GetTableNameByID(tableID) + if !found { + log.Warn("balance orphan tables delay, table not found", + zap.String("changefeed", c.id), + zap.Int64("tableID", tableID)) + continue + } if c.cyclicEnabled { - schemaSnapshot := c.schema - tableName, found := schemaSnapshot.GetTableNameByID(tableID) - if !found { - continue - } markTableSchameName, markTableTableName := mark.GetMarkTableName(tableName.Schema, tableName.Table) orphanMarkTableID, found = schemaSnapshot.GetTableIDByName(markTableSchameName, markTableTableName) if !found { // Mark table is not created yet, skip and wait. - log.Info("balance table info delay, wait mark table", + log.Warn("balance table info delay, wait mark table", zap.String("changefeed", c.id), zap.Int64("tableID", tableID), zap.String("markTableName", markTableTableName)) continue } } - status.AddTable(tableID, &model.TableReplicaInfo{StartTs: op.BoundaryTs, MarkTableID: orphanMarkTableID}, op.BoundaryTs) + info := &model.TableReplicaInfo{ + StartTs: op.BoundaryTs, + MarkTableID: orphanMarkTableID, + Name: tableName.String(), + } + status.AddTable(tableID, info, op.BoundaryTs) addedTables[tableID] = struct{}{} } } diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index c1ffb4907b0..a5533cac59a 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -99,9 +99,18 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64) (*schemaSnap tableInfo := WrapTableInfo(dbinfo.ID, dbinfo.Name.O, tableInfo) snap.tables[tableInfo.ID] = tableInfo snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID - if !tableInfo.IsEligible() { + isEligible := tableInfo.IsEligible() + if !isEligible { snap.ineligibleTableID[tableInfo.ID] = struct{}{} } + if pi := tableInfo.GetPartitionInfo(); pi != nil { + for _, partition := range pi.Definitions { + snap.partitionTable[partition.ID] = tableInfo + if !isEligible { + snap.ineligibleTableID[partition.ID] = struct{}{} + } + } + } } } snap.currentTs = currentTs @@ -364,7 +373,13 @@ func (ti *TableInfo) Clone() *TableInfo { func (s *schemaSnapshot) GetTableNameByID(id int64) (model.TableName, bool) { tableInfo, ok := s.tables[id] if !ok { - return model.TableName{}, false + // Try partition, it could be a partition table. 
+ partInfo, ok := s.partitionTable[id] + if !ok { + return model.TableName{}, false + } + // Must exists an table that contains the partition. + tableInfo = s.tables[partInfo.ID] } return tableInfo.TableName, true } diff --git a/cdc/metrics_processor.go b/cdc/metrics_processor.go index cf149de3f61..96afc11ffc0 100644 --- a/cdc/metrics_processor.go +++ b/cdc/metrics_processor.go @@ -69,13 +69,6 @@ var ( Help: "Bucketed histogram of processing time (s) of waiting event prepare in processor.", Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10), }, []string{"changefeed", "capture"}) - tableInputChanSizeGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "table_input_chan_size", - Help: "txn input channel size for a table", - }, []string{"changefeed", "capture", "table"}) tableOutputChanSizeGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -100,7 +93,6 @@ func initProcessorMetrics(registry *prometheus.Registry) { registry.MustRegister(syncTableNumGauge) registry.MustRegister(txnCounter) registry.MustRegister(updateInfoDuration) - registry.MustRegister(tableInputChanSizeGauge) registry.MustRegister(tableOutputChanSizeGauge) registry.MustRegister(waitEventPrepareDuration) registry.MustRegister(processorErrorCounter) diff --git a/cdc/model/owner.go b/cdc/model/owner.go index a9650f3cbe8..879f2e2fc7f 100644 --- a/cdc/model/owner.go +++ b/cdc/model/owner.go @@ -146,6 +146,7 @@ func (w *TaskWorkload) Marshal() (string, error) { type TableReplicaInfo struct { StartTs Ts `json:"start-ts"` MarkTableID TableID `json:"mark-table-id"` + Name string `json:"-"` } // TaskStatus records the task information of a capture @@ -237,7 +238,11 @@ func (ts *TaskStatus) Snapshot(cfID ChangeFeedID, captureID CaptureID, checkpoin if ts < table.StartTs { ts = table.StartTs } - snap.Tables[tableID] = &TableReplicaInfo{StartTs: ts, MarkTableID: table.MarkTableID} + snap.Tables[tableID] = &TableReplicaInfo{ + StartTs: ts, + MarkTableID: table.MarkTableID, + Name: table.Name, + } } return snap } diff --git a/cdc/owner_operator.go b/cdc/owner_operator.go index a9af00e3130..b82a451112a 100644 --- a/cdc/owner_operator.go +++ b/cdc/owner_operator.go @@ -52,7 +52,7 @@ func newDDLHandler(pdCli pd.Client, kvStorage tidbkv.Storage, checkpointTS uint6 } // Set it up so that one failed goroutine cancels all others sharing the same ctx errg, ctx := errgroup.WithContext(ctx) - ctx = util.PutTableIDInCtx(ctx, -1) + ctx = util.PutTableInfoInCtx(ctx, -1, "") // FIXME: user of ddlHandler can't know error happen. errg.Go(func() error { diff --git a/cdc/processor.go b/cdc/processor.go index 9acbd3e2573..016cc6e1ea1 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "io" - "strconv" "strings" "sync" "sync/atomic" @@ -111,6 +110,7 @@ type processor struct { type tableInfo struct { id int64 + name string resolvedTs uint64 markTableID int64 mResolvedTs uint64 @@ -163,7 +163,6 @@ func newProcessor( // so we set `needEncode` to false. 
log.Info("start processor with startts", zap.Uint64("startts", checkpointTs)) ddlPuller := puller.NewPuller(pdCli, kvStorage, checkpointTs, []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}, false, limitter) - ctx = util.PutTableIDInCtx(ctx, 0) filter, err := filter.NewFilter(changefeed.Config) if err != nil { return nil, errors.Trace(err) @@ -214,7 +213,8 @@ func newProcessor( func (p *processor) Run(ctx context.Context) { wg, cctx := errgroup.WithContext(ctx) p.wg = wg - ddlPullerCtx, ddlPullerCancel := context.WithCancel(util.PutTableIDInCtx(cctx, 0)) + ddlPullerCtx, ddlPullerCancel := + context.WithCancel(util.PutTableInfoInCtx(cctx, 0, "ticdc-processor-ddl")) p.ddlPullerCancel = ddlPullerCancel wg.Go(func() error { @@ -495,9 +495,7 @@ func (p *processor) removeTable(tableID int64) { table.cancel() delete(p.tables, tableID) - tableIDStr := strconv.FormatInt(tableID, 10) - tableInputChanSizeGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableIDStr) - tableResolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableIDStr) + tableResolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, table.name) syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Dec() } @@ -743,7 +741,7 @@ func createTsRWriter(cli kv.CDCEtcdClient, changefeedID, captureID string) (stor func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *model.TableReplicaInfo) { p.stateMu.Lock() defer p.stateMu.Unlock() - ctx = util.PutTableIDInCtx(ctx, tableID) + ctx = util.PutTableInfoInCtx(ctx, tableID, replicaInfo.Name) log.Debug("Add table", zap.Int64("tableID", tableID), zap.Any("replicaInfo", replicaInfo)) if _, ok := p.tables[tableID]; ok { @@ -754,6 +752,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo ctx, cancel := context.WithCancel(ctx) table := &tableInfo{ id: tableID, + name: replicaInfo.Name, resolvedTs: replicaInfo.StartTs, cancel: cancel, } @@ -792,7 +791,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo } }() go func() { - resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, strconv.FormatInt(table.id, 10)) + resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, table.name) for { select { case <-ctx.Done(): diff --git a/cdc/puller/entry_sorter.go b/cdc/puller/entry_sorter.go index 6d43d644f9a..4a9252a8337 100644 --- a/cdc/puller/entry_sorter.go +++ b/cdc/puller/entry_sorter.go @@ -16,7 +16,6 @@ package puller import ( "context" "sort" - "strconv" "sync" "sync/atomic" "time" @@ -54,12 +53,12 @@ func NewEntrySorter() *EntrySorter { func (es *EntrySorter) Run(ctx context.Context) error { captureAddr := util.CaptureAddrFromCtx(ctx) changefeedID := util.ChangefeedIDFromCtx(ctx) - tableIDStr := strconv.FormatInt(util.TableIDFromCtx(ctx), 10) - metricEntrySorterResolvedChanSizeGuage := entrySorterResolvedChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr) - metricEntrySorterOutputChanSizeGauge := entrySorterOutputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr) - metricEntryUnsortedSizeGauge := entrySorterUnsortedSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr) - metricEntrySorterSortDuration := entrySorterSortDuration.WithLabelValues(captureAddr, changefeedID, tableIDStr) - metricEntrySorterMergeDuration := entrySorterMergeDuration.WithLabelValues(captureAddr, 
changefeedID, tableIDStr) + _, tableName := util.TableIDFromCtx(ctx) + metricEntrySorterResolvedChanSizeGuage := entrySorterResolvedChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName) + metricEntrySorterOutputChanSizeGauge := entrySorterOutputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName) + metricEntryUnsortedSizeGauge := entrySorterUnsortedSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName) + metricEntrySorterSortDuration := entrySorterSortDuration.WithLabelValues(captureAddr, changefeedID, tableName) + metricEntrySorterMergeDuration := entrySorterMergeDuration.WithLabelValues(captureAddr, changefeedID, tableName) lessFunc := func(i *model.PolymorphicEvent, j *model.PolymorphicEvent) bool { if i.CRTs == j.CRTs { diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index c1a0478b1b5..5e9b2909c32 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -15,7 +15,6 @@ package puller import ( "context" - "strconv" "sync/atomic" "time" @@ -116,16 +115,15 @@ func (p *pullerImpl) Run(ctx context.Context) error { captureAddr := util.CaptureAddrFromCtx(ctx) changefeedID := util.ChangefeedIDFromCtx(ctx) - tableID := util.TableIDFromCtx(ctx) - tableIDStr := strconv.FormatInt(tableID, 10) - metricOutputChanSize := outputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr) - metricEventChanSize := eventChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr) - metricMemBufferSize := memBufferSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr) - metricPullerResolvedTs := pullerResolvedTsGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr) + tableID, tableName := util.TableIDFromCtx(ctx) + metricOutputChanSize := outputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName) + metricEventChanSize := eventChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName) + metricMemBufferSize := memBufferSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName) + metricPullerResolvedTs := pullerResolvedTsGauge.WithLabelValues(captureAddr, changefeedID, tableName) metricEventCounterKv := kvEventCounter.WithLabelValues(captureAddr, changefeedID, "kv") metricEventCounterResolved := kvEventCounter.WithLabelValues(captureAddr, changefeedID, "resolved") - metricTxnCollectCounterKv := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableIDStr, "kv") - metricTxnCollectCounterResolved := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableIDStr, "kv") + metricTxnCollectCounterKv := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableName, "kv") + metricTxnCollectCounterResolved := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableName, "kv") g.Go(func() error { for { diff --git a/cmd/changefeed.toml b/cmd/changefeed.toml index dd655960582..617e8a6a160 100644 --- a/cmd/changefeed.toml +++ b/cmd/changefeed.toml @@ -12,7 +12,9 @@ case-sensitive = true ignore-txn-start-ts = [1, 2] # 过滤器规则 +# 过滤规则语法: https://github.com/pingcap/tidb-tools/tree/master/pkg/table-filter#syntax # The rules of the filter +# Filter rule syntax: https://github.com/pingcap/tidb-tools/tree/master/pkg/table-filter#syntax rules = ['*.*', '!test.*'] [mounter] @@ -29,6 +31,11 @@ dispatchers = [ {matcher = ['test1.*', 'test2.*'], dispatcher = "ts"}, {matcher = ['test3.*', 'test4.*'], dispatcher = "rowid"}, ] +# 对于 MQ 类的 Sink,可以指定消息的协议格式 +# 协议目前支持 default, canal 两种,default 为 ticdc-open-protocol +# For MQ Sinks, you can configure the protocol of the messages sending to 
MQ +# Currently the protocol support default and canal +protocol = "default" [cyclic-replication] # 是否开启环形复制 diff --git a/pkg/util/ctx.go b/pkg/util/ctx.go index 8bc5904ba8f..9b4f2c9ac1c 100644 --- a/pkg/util/ctx.go +++ b/pkg/util/ctx.go @@ -48,18 +48,23 @@ func PutTimezoneInCtx(ctx context.Context, timezone *time.Location) context.Cont return context.WithValue(ctx, ctxKeyTimezone, timezone) } -// PutTableIDInCtx returns a new child context with the specified table ID stored. -func PutTableIDInCtx(ctx context.Context, tableID int64) context.Context { - return context.WithValue(ctx, ctxKeyTableID, tableID) +type tableinfo struct { + id int64 + name string +} + +// PutTableInfoInCtx returns a new child context with the specified table ID and name stored. +func PutTableInfoInCtx(ctx context.Context, tableID int64, tableName string) context.Context { + return context.WithValue(ctx, ctxKeyTableID, tableinfo{id: tableID, name: tableName}) } // TableIDFromCtx returns a table ID -func TableIDFromCtx(ctx context.Context) int64 { - tableID, ok := ctx.Value(ctxKeyTableID).(int64) +func TableIDFromCtx(ctx context.Context) (int64, string) { + info, ok := ctx.Value(ctxKeyTableID).(tableinfo) if !ok { - return 0 + return 0, "" } - return tableID + return info.id, info.name } // TimezoneFromCtx returns a timezone
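
Usage sketch (not part of the patch): the core API change above is that pkg/util now stores the table ID together with its name in the context (PutTableInfoInCtx) and TableIDFromCtx returns both, so metric vectors can be labelled with the human-readable table name instead of a stringified table ID. The standalone program below illustrates that flow under stated assumptions: the gauge, its label values, and the "test.t1" table name are hypothetical placeholders; only the two util functions and their signatures come from this patch, and the import path assumes the github.com/pingcap/ticdc module of this era.

// demo.go -- minimal sketch of consuming the new context helpers.
package main

import (
	"context"
	"fmt"

	"github.com/pingcap/ticdc/pkg/util"
	"github.com/prometheus/client_golang/prometheus"
)

// Hypothetical gauge, mirroring the label layout of tableResolvedTsGauge
// ("changefeed", "capture", "table") used by the processor metrics.
var demoResolvedTsGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "demo",
		Name:      "table_resolved_ts",
		Help:      "resolved ts of a table (demo only)",
	}, []string{"changefeed", "capture", "table"})

func reportResolvedTs(ctx context.Context, changefeedID, captureAddr string, resolvedTs uint64) {
	// After this patch the context carries both the table ID and its name;
	// the name becomes the "table" label, replacing the old numeric ID string.
	_, tableName := util.TableIDFromCtx(ctx)
	demoResolvedTsGauge.
		WithLabelValues(changefeedID, captureAddr, tableName).
		Set(float64(resolvedTs)) // placeholder value; real code derives it from the table's resolved TSO
}

func main() {
	prometheus.MustRegister(demoResolvedTsGauge)

	// In the real processor the name comes from TableReplicaInfo.Name, which the
	// owner fills in from the schema snapshot; "test.t1" is a placeholder here.
	ctx := util.PutTableInfoInCtx(context.Background(), 100, "test.t1")
	reportResolvedTs(ctx, "demo-changefeed", "127.0.0.1:8300", 42)

	fmt.Println("metric labelled with table name instead of table ID")
}

One consequence worth noting for operators: dashboards or alerts that previously matched the "table" label against numeric table IDs will need to match schema-qualified table names after this change.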