cdc, pkg: replace table ID with table name in metrics (#695)
* cdc, pkg: replace table ID with table name in metrics
* cherry-pick #692 bugfix

Signed-off-by: Neil Shen <overvenus@gmail.com>
overvenus committed Jun 29, 2020
1 parent 5cd12d3 commit 8276511
Showing 10 changed files with 78 additions and 50 deletions.
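
Across the files below, per-table metrics drop the numeric table ID label in favor of the fully qualified table name, which is easier to read on dashboards. A minimal sketch of the pattern with a hypothetical gauge (not one of the repository's actual metric variables); the real changes apply the same idea to the existing processor and puller metrics:

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

// syncTableGauge is a hypothetical per-table gauge with the same label layout
// as the processor metrics touched by this commit.
var syncTableGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "processor",
		Name:      "table_example_gauge",
		Help:      "Example per-table gauge (sketch).",
	},
	[]string{"changefeed", "capture", "table"},
)

func main() {
	prometheus.MustRegister(syncTableGauge)

	tableID := int64(41)
	tableName := "test.orders"

	// Before: the "table" label was the numeric ID, which is hard to read on dashboards.
	syncTableGauge.WithLabelValues("cf-1", "127.0.0.1:8300", strconv.FormatInt(tableID, 10)).Set(1)

	// After: the "table" label is the fully qualified table name.
	syncTableGauge.WithLabelValues("cf-1", "127.0.0.1:8300", tableName).Set(1)

	// Series must also be deleted by the exact label values used to create them.
	fmt.Println(syncTableGauge.DeleteLabelValues("cf-1", "127.0.0.1:8300", tableName))
}
```
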
22 changes: 15 additions & 7 deletions cdc/changefeed.go
@@ -323,26 +323,34 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model
}
newTaskStatus[captureID] = status
}
schemaSnapshot := c.schema
for tableID, op := range operation {
var orphanMarkTableID model.TableID
tableName, found := schemaSnapshot.GetTableNameByID(tableID)
if !found {
log.Warn("balance orphan tables delay, table not found",
zap.String("changefeed", c.id),
zap.Int64("tableID", tableID))
continue
}
if c.cyclicEnabled {
schemaSnapshot := c.schema
tableName, found := schemaSnapshot.GetTableNameByID(tableID)
if !found {
continue
}
markTableSchameName, markTableTableName := mark.GetMarkTableName(tableName.Schema, tableName.Table)
orphanMarkTableID, found = schemaSnapshot.GetTableIDByName(markTableSchameName, markTableTableName)
if !found {
// Mark table is not created yet, skip and wait.
log.Info("balance table info delay, wait mark table",
log.Warn("balance table info delay, wait mark table",
zap.String("changefeed", c.id),
zap.Int64("tableID", tableID),
zap.String("markTableName", markTableTableName))
continue
}
}
status.AddTable(tableID, &model.TableReplicaInfo{StartTs: op.BoundaryTs, MarkTableID: orphanMarkTableID}, op.BoundaryTs)
info := &model.TableReplicaInfo{
StartTs: op.BoundaryTs,
MarkTableID: orphanMarkTableID,
Name: tableName.String(),
}
status.AddTable(tableID, info, op.BoundaryTs)
addedTables[tableID] = struct{}{}
}
}
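
The replica info added above stores `tableName.String()` as the table's display name. A small sketch of how that name is assumed to be formed, using a simplified stand-in for `model.TableName` (the assumption is that `String()` joins schema and table with a dot):

```go
package main

import "fmt"

// TableName is a simplified stand-in for model.TableName; the assumption here
// is that String() joins schema and table with a dot, e.g. "test.orders".
type TableName struct {
	Schema string
	Table  string
}

func (t TableName) String() string {
	return fmt.Sprintf("%s.%s", t.Schema, t.Table)
}

func main() {
	tn := TableName{Schema: "test", Table: "orders"}
	fmt.Println(tn.String()) // test.orders — the value stored in TableReplicaInfo.Name
}
```
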
19 changes: 17 additions & 2 deletions cdc/entry/schema_storage.go
@@ -99,9 +99,18 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64) (*schemaSnap
tableInfo := WrapTableInfo(dbinfo.ID, dbinfo.Name.O, tableInfo)
snap.tables[tableInfo.ID] = tableInfo
snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID
if !tableInfo.IsEligible() {
isEligible := tableInfo.IsEligible()
if !isEligible {
snap.ineligibleTableID[tableInfo.ID] = struct{}{}
}
if pi := tableInfo.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
snap.partitionTable[partition.ID] = tableInfo
if !isEligible {
snap.ineligibleTableID[partition.ID] = struct{}{}
}
}
}
}
}
snap.currentTs = currentTs
@@ -364,7 +373,13 @@ func (ti *TableInfo) Clone() *TableInfo {
func (s *schemaSnapshot) GetTableNameByID(id int64) (model.TableName, bool) {
tableInfo, ok := s.tables[id]
if !ok {
return model.TableName{}, false
// Try partitions; the ID may refer to a partition of a partitioned table.
partInfo, ok := s.partitionTable[id]
if !ok {
return model.TableName{}, false
}
// There must exist a table that contains the partition.
tableInfo = s.tables[partInfo.ID]
}
return tableInfo.TableName, true
}
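
newSchemaSnapshotFromMeta now records every partition ID, and GetTableNameByID falls back to that map so a partition ID still resolves to its parent table's name. A minimal sketch of the same two-map lookup with simplified types (the real snapshot stores full TableInfo values rather than plain names):

```go
package main

import "fmt"

type tableName struct{ Schema, Table string }

type snapshot struct {
	tables         map[int64]tableName // physical table ID -> name
	partitionTable map[int64]int64     // partition ID -> parent table ID
}

// getTableNameByID mirrors the fallback added in this commit: if the ID is not
// a table, try to resolve it as a partition of some table.
func (s *snapshot) getTableNameByID(id int64) (tableName, bool) {
	if name, ok := s.tables[id]; ok {
		return name, true
	}
	parentID, ok := s.partitionTable[id]
	if !ok {
		return tableName{}, false
	}
	// The parent table must exist if the partition was recorded.
	return s.tables[parentID], true
}

func main() {
	s := &snapshot{
		tables:         map[int64]tableName{41: {"test", "orders"}},
		partitionTable: map[int64]int64{4101: 41, 4102: 41},
	}
	fmt.Println(s.getTableNameByID(41))   // {test orders} true
	fmt.Println(s.getTableNameByID(4102)) // {test orders} true: resolved via its parent
	fmt.Println(s.getTableNameByID(99))   // {  } false
}
```
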
8 changes: 0 additions & 8 deletions cdc/metrics_processor.go
@@ -69,13 +69,6 @@ var (
Help: "Bucketed histogram of processing time (s) of waiting event prepare in processor.",
Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10),
}, []string{"changefeed", "capture"})
tableInputChanSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "table_input_chan_size",
Help: "txn input channel size for a table",
}, []string{"changefeed", "capture", "table"})
tableOutputChanSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
@@ -100,7 +93,6 @@ func initProcessorMetrics(registry *prometheus.Registry) {
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(txnCounter)
registry.MustRegister(updateInfoDuration)
registry.MustRegister(tableInputChanSizeGauge)
registry.MustRegister(tableOutputChanSizeGauge)
registry.MustRegister(waitEventPrepareDuration)
registry.MustRegister(processorErrorCounter)
7 changes: 6 additions & 1 deletion cdc/model/owner.go
@@ -146,6 +146,7 @@ func (w *TaskWorkload) Marshal() (string, error) {
type TableReplicaInfo struct {
StartTs Ts `json:"start-ts"`
MarkTableID TableID `json:"mark-table-id"`
Name string `json:"-"`
}

// TaskStatus records the task information of a capture
@@ -237,7 +238,11 @@ func (ts *TaskStatus) Snapshot(cfID ChangeFeedID, captureID CaptureID, checkpoin
if ts < table.StartTs {
ts = table.StartTs
}
snap.Tables[tableID] = &TableReplicaInfo{StartTs: ts, MarkTableID: table.MarkTableID}
snap.Tables[tableID] = &TableReplicaInfo{
StartTs: ts,
MarkTableID: table.MarkTableID,
Name: table.Name,
}
}
return snap
}
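
TableReplicaInfo.Name is tagged `json:"-"`, so the name travels with the in-memory task status (including snapshots) but is never serialized. A small sketch of that behavior with simplified field types:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TableReplicaInfo mirrors the struct in cdc/model/owner.go (field types simplified).
type TableReplicaInfo struct {
	StartTs     uint64 `json:"start-ts"`
	MarkTableID int64  `json:"mark-table-id"`
	Name        string `json:"-"` // in-memory only, skipped by encoding/json
}

func main() {
	info := TableReplicaInfo{StartTs: 417, MarkTableID: 0, Name: "test.orders"}
	b, _ := json.Marshal(info)
	fmt.Println(string(b)) // {"start-ts":417,"mark-table-id":0} — Name is not persisted
}
```
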
2 changes: 1 addition & 1 deletion cdc/owner_operator.go
@@ -52,7 +52,7 @@ func newDDLHandler(pdCli pd.Client, kvStorage tidbkv.Storage, checkpointTS uint6
}
// Set it up so that one failed goroutine cancels all others sharing the same ctx
errg, ctx := errgroup.WithContext(ctx)
ctx = util.PutTableIDInCtx(ctx, -1)
ctx = util.PutTableInfoInCtx(ctx, -1, "")

// FIXME: users of ddlHandler cannot tell when an error happens.
errg.Go(func() error {
15 changes: 7 additions & 8 deletions cdc/processor.go
@@ -17,7 +17,6 @@ import (
"context"
"fmt"
"io"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -111,6 +110,7 @@ type processor struct {

type tableInfo struct {
id int64
name string
resolvedTs uint64
markTableID int64
mResolvedTs uint64
@@ -163,7 +163,6 @@ func newProcessor(
// so we set `needEncode` to false.
log.Info("start processor with startts", zap.Uint64("startts", checkpointTs))
ddlPuller := puller.NewPuller(pdCli, kvStorage, checkpointTs, []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}, false, limitter)
ctx = util.PutTableIDInCtx(ctx, 0)
filter, err := filter.NewFilter(changefeed.Config)
if err != nil {
return nil, errors.Trace(err)
@@ -214,7 +213,8 @@
func (p *processor) Run(ctx context.Context) {
wg, cctx := errgroup.WithContext(ctx)
p.wg = wg
ddlPullerCtx, ddlPullerCancel := context.WithCancel(util.PutTableIDInCtx(cctx, 0))
ddlPullerCtx, ddlPullerCancel :=
context.WithCancel(util.PutTableInfoInCtx(cctx, 0, "ticdc-processor-ddl"))
p.ddlPullerCancel = ddlPullerCancel

wg.Go(func() error {
@@ -495,9 +495,7 @@ func (p *processor) removeTable(tableID int64) {

table.cancel()
delete(p.tables, tableID)
tableIDStr := strconv.FormatInt(tableID, 10)
tableInputChanSizeGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableIDStr)
tableResolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableIDStr)
tableResolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, table.name)
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Dec()
}

@@ -743,7 +741,7 @@ func createTsRWriter(cli kv.CDCEtcdClient, changefeedID, captureID string) (stor
func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *model.TableReplicaInfo) {
p.stateMu.Lock()
defer p.stateMu.Unlock()
ctx = util.PutTableIDInCtx(ctx, tableID)
ctx = util.PutTableInfoInCtx(ctx, tableID, replicaInfo.Name)

log.Debug("Add table", zap.Int64("tableID", tableID), zap.Any("replicaInfo", replicaInfo))
if _, ok := p.tables[tableID]; ok {
@@ -754,6 +752,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
ctx, cancel := context.WithCancel(ctx)
table := &tableInfo{
id: tableID,
name: replicaInfo.Name,
resolvedTs: replicaInfo.StartTs,
cancel: cancel,
}
@@ -792,7 +791,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
}
}()
go func() {
resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, strconv.FormatInt(table.id, 10))
resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, table.name)
for {
select {
case <-ctx.Done():
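
In the processor, the per-table resolved-ts gauge is now keyed by the table name for its whole lifecycle: the series is created in addTable, updated by the table's goroutine, and deleted in removeTable. A condensed, hypothetical sketch of that lifecycle (function and field names are simplified, not the exact ones in cdc/processor.go):

```go
package main

import (
	"context"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var tableResolvedTsGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "processor",
		Name:      "table_resolved_ts",
		Help:      "Resolved ts of a table (sketch).",
	},
	[]string{"changefeed", "capture", "table"},
)

type tableInfo struct {
	resolvedTs uint64 // updated atomically by the table pipeline
	name       string
	cancel     context.CancelFunc
}

// addTable starts a goroutine that keeps the per-table gauge, labeled by the
// table name instead of the numeric ID, up to date until the table is removed.
func addTable(ctx context.Context, changefeed, capture string, t *tableInfo) {
	ctx, t.cancel = context.WithCancel(ctx)
	gauge := tableResolvedTsGauge.WithLabelValues(changefeed, capture, t.name)
	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				gauge.Set(float64(atomic.LoadUint64(&t.resolvedTs)))
			}
		}
	}()
}

// removeTable stops the goroutine and deletes the series using the same
// table-name label, so no stale series is left behind.
func removeTable(changefeed, capture string, t *tableInfo) {
	t.cancel()
	tableResolvedTsGauge.DeleteLabelValues(changefeed, capture, t.name)
}

func main() {
	prometheus.MustRegister(tableResolvedTsGauge)
	t := &tableInfo{name: "test.orders"}
	addTable(context.Background(), "cf-1", "127.0.0.1:8300", t)
	atomic.StoreUint64(&t.resolvedTs, 417)
	time.Sleep(300 * time.Millisecond)
	removeTable("cf-1", "127.0.0.1:8300", t)
}
```
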
13 changes: 6 additions & 7 deletions cdc/puller/entry_sorter.go
@@ -16,7 +16,6 @@ package puller
import (
"context"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -54,12 +53,12 @@ func NewEntrySorter() *EntrySorter {
func (es *EntrySorter) Run(ctx context.Context) error {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
tableIDStr := strconv.FormatInt(util.TableIDFromCtx(ctx), 10)
metricEntrySorterResolvedChanSizeGuage := entrySorterResolvedChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntrySorterOutputChanSizeGauge := entrySorterOutputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntryUnsortedSizeGauge := entrySorterUnsortedSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntrySorterSortDuration := entrySorterSortDuration.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntrySorterMergeDuration := entrySorterMergeDuration.WithLabelValues(captureAddr, changefeedID, tableIDStr)
_, tableName := util.TableIDFromCtx(ctx)
metricEntrySorterResolvedChanSizeGuage := entrySorterResolvedChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName)
metricEntrySorterOutputChanSizeGauge := entrySorterOutputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName)
metricEntryUnsortedSizeGauge := entrySorterUnsortedSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName)
metricEntrySorterSortDuration := entrySorterSortDuration.WithLabelValues(captureAddr, changefeedID, tableName)
metricEntrySorterMergeDuration := entrySorterMergeDuration.WithLabelValues(captureAddr, changefeedID, tableName)

lessFunc := func(i *model.PolymorphicEvent, j *model.PolymorphicEvent) bool {
if i.CRTs == j.CRTs {
16 changes: 7 additions & 9 deletions cdc/puller/puller.go
@@ -15,7 +15,6 @@ package puller

import (
"context"
"strconv"
"sync/atomic"
"time"

@@ -116,16 +115,15 @@ func (p *pullerImpl) Run(ctx context.Context) error {

captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
tableID := util.TableIDFromCtx(ctx)
tableIDStr := strconv.FormatInt(tableID, 10)
metricOutputChanSize := outputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEventChanSize := eventChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricMemBufferSize := memBufferSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricPullerResolvedTs := pullerResolvedTsGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
tableID, tableName := util.TableIDFromCtx(ctx)
metricOutputChanSize := outputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName)
metricEventChanSize := eventChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName)
metricMemBufferSize := memBufferSizeGauge.WithLabelValues(captureAddr, changefeedID, tableName)
metricPullerResolvedTs := pullerResolvedTsGauge.WithLabelValues(captureAddr, changefeedID, tableName)
metricEventCounterKv := kvEventCounter.WithLabelValues(captureAddr, changefeedID, "kv")
metricEventCounterResolved := kvEventCounter.WithLabelValues(captureAddr, changefeedID, "resolved")
metricTxnCollectCounterKv := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableIDStr, "kv")
metricTxnCollectCounterResolved := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableIDStr, "kv")
metricTxnCollectCounterKv := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableName, "kv")
metricTxnCollectCounterResolved := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableName, "kv")

g.Go(func() error {
for {
7 changes: 7 additions & 0 deletions cmd/changefeed.toml
@@ -12,7 +12,9 @@ case-sensitive = true
ignore-txn-start-ts = [1, 2]

# Filter rules
# Filter rule syntax: https://github.com/pingcap/tidb-tools/tree/master/pkg/table-filter#syntax
rules = ['*.*', '!test.*']

[mounter]
@@ -29,6 +31,11 @@ dispatchers = [
{matcher = ['test1.*', 'test2.*'], dispatcher = "ts"},
{matcher = ['test3.*', 'test4.*'], dispatcher = "rowid"},
]
# For MQ sinks, you can configure the protocol of the messages sent to MQ.
# Currently the supported protocols are default and canal; default is ticdc-open-protocol.
protocol = "default"

[cyclic-replication]
# Whether to enable cyclic replication
19 changes: 12 additions & 7 deletions pkg/util/ctx.go
@@ -48,18 +48,23 @@ func PutTimezoneInCtx(ctx context.Context, timezone *time.Location) context.Cont
return context.WithValue(ctx, ctxKeyTimezone, timezone)
}

// PutTableIDInCtx returns a new child context with the specified table ID stored.
func PutTableIDInCtx(ctx context.Context, tableID int64) context.Context {
return context.WithValue(ctx, ctxKeyTableID, tableID)
type tableinfo struct {
id int64
name string
}

// PutTableInfoInCtx returns a new child context with the specified table ID and name stored.
func PutTableInfoInCtx(ctx context.Context, tableID int64, tableName string) context.Context {
return context.WithValue(ctx, ctxKeyTableID, tableinfo{id: tableID, name: tableName})
}

// TableIDFromCtx returns the table ID and table name stored in the context
func TableIDFromCtx(ctx context.Context) int64 {
tableID, ok := ctx.Value(ctxKeyTableID).(int64)
func TableIDFromCtx(ctx context.Context) (int64, string) {
info, ok := ctx.Value(ctxKeyTableID).(tableinfo)
if !ok {
return 0
return 0, ""
}
return tableID
return info.id, info.name
}

// TimezoneFromCtx returns a timezone
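
PutTableInfoInCtx replaces PutTableIDInCtx so downstream components (pullers, sorters) can read both the table ID and name from the context and use the name as a metric label. A self-contained usage sketch mirroring pkg/util/ctx.go, assuming an unexported context key:

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey int

const ctxKeyTableID ctxKey = iota

type tableinfo struct {
	id   int64
	name string
}

// PutTableInfoInCtx returns a child context carrying both the table ID and name.
func PutTableInfoInCtx(ctx context.Context, tableID int64, tableName string) context.Context {
	return context.WithValue(ctx, ctxKeyTableID, tableinfo{id: tableID, name: tableName})
}

// TableIDFromCtx returns the stored table ID and name, or zero values if absent.
func TableIDFromCtx(ctx context.Context) (int64, string) {
	info, ok := ctx.Value(ctxKeyTableID).(tableinfo)
	if !ok {
		return 0, ""
	}
	return info.id, info.name
}

func main() {
	ctx := PutTableInfoInCtx(context.Background(), 41, "test.orders")
	id, name := TableIDFromCtx(ctx)
	fmt.Println(id, name) // 41 test.orders — the name becomes the metric label downstream
}
```
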
