Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
dbea2c0
add logs to debug panic
3AceShowHand Sep 9, 2025
f185c70
skip the empty database info
3AceShowHand Sep 9, 2025
ae4f5fe
set default keyspace to system
3AceShowHand Sep 9, 2025
ee2fc73
add log to track send request region key
3AceShowHand Sep 10, 2025
574ad6e
hack the range
3AceShowHand Sep 10, 2025
22d0d17
hack the range
3AceShowHand Sep 10, 2025
882a0d3
set the keyspace prefix when build key range from table id
3AceShowHand Sep 10, 2025
597fe71
disable make it codec client
3AceShowHand Sep 10, 2025
e5118b2
add more debug log
3AceShowHand Sep 10, 2025
05e0135
remove prefix manually
3AceShowHand Sep 10, 2025
b663cfe
remove prefix manually
3AceShowHand Sep 10, 2025
4c52967
remove prefix manually
3AceShowHand Sep 10, 2025
60fe1f1
test classic tikv
3AceShowHand Sep 11, 2025
8c93e66
adjust logs
3AceShowHand Sep 11, 2025
4b74e7f
encode key when subscribe dml tables
3AceShowHand Sep 11, 2025
d6dcbe1
Merge branch 'master' into keyspace-pdclient
3AceShowHand Sep 11, 2025
33d73e9
add log to debug table trigger span
3AceShowHand Sep 11, 2025
62e7707
add more logs to debug commit ts fallback
3AceShowHand Sep 11, 2025
5e59b81
add more logs to debug commit ts fallback
3AceShowHand Sep 11, 2025
ce33177
fix data
3AceShowHand Sep 11, 2025
589b9fb
fix data
3AceShowHand Sep 11, 2025
b02e5e4
build key range with prefix
3AceShowHand Sep 11, 2025
f92d8d1
remove the key prefix
3AceShowHand Sep 12, 2025
2b4ba12
add committed entry logs
3AceShowHand Sep 15, 2025
5411c52
Merge branch 'master' into keyspace-pdclient
3AceShowHand Sep 15, 2025
dbf2617
set keyspace to kvstorage
3AceShowHand Sep 15, 2025
c5b9a10
Merge branch 'master' into keyspace-pdclient
3AceShowHand Sep 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions logservice/logpuller/region_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/spanz"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down Expand Up @@ -235,10 +236,19 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c
log.Fatal("The CommitTs must be greater than the resolvedTs",
zap.String("EventType", "COMMITTED"),
zap.Uint64("CommitTs", entry.CommitTs),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("regionID", regionID))
zap.Uint64("resolvedTs", resolvedTs))
}
span.kvEventsCache = append(span.kvEventsCache, assembleRowEvent(regionID, entry))
key := newMatchKey(entry)
if state.matcher.unmatchedValue != nil {
if _, exists := state.matcher.unmatchedValue[key]; exists {
log.Warn("receive a COMMITTED entry while there is still unmatched prewrite on the same key",
zap.Uint64("regionID", regionID),
zap.String("key", spanz.HexKey(entry.GetKey())),
zap.Uint64("startTs", entry.GetStartTs()),
zap.Uint64("commitTs", entry.GetCommitTs()))
}
}
case cdcpb.Event_PREWRITE:
state.matcher.putPrewriteRow(entry)
case cdcpb.Event_COMMIT:
Expand Down
10 changes: 9 additions & 1 deletion logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/spanz"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
"go.uber.org/zap"
Expand Down Expand Up @@ -406,12 +407,19 @@ func (s *regionRequestWorker) processRegionSendTask(
}

func (s *regionRequestWorker) createRegionRequest(region regionInfo) *cdcpb.ChangeDataRequest {
resolvedTs := region.resolvedTs()
log.Info("region worker start to send request to store",
zap.Uint64("regionID", region.verID.GetID()),
zap.Uint64("startTs", resolvedTs),
zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)),
zap.String("startKey", spanz.HexKey(region.span.StartKey)),
zap.String("endKey", spanz.HexKey(region.span.EndKey)))
return &cdcpb.ChangeDataRequest{
Header: &cdcpb.Header{ClusterId: s.client.clusterID, TicdcVersion: version.ReleaseSemver()},
RegionId: region.verID.GetID(),
RequestId: uint64(region.subscribedSpan.subID),
RegionEpoch: region.rpcCtx.Meta.RegionEpoch,
CheckpointTs: region.resolvedTs(),
CheckpointTs: resolvedTs,
StartKey: region.span.StartKey,
EndKey: region.span.EndKey,
ExtraOp: kvrpcpb.ExtraOp_ReadOldValue,
Expand Down
8 changes: 6 additions & 2 deletions logservice/logpuller/txn_matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/spanz"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -127,8 +128,11 @@ func (m *matcher) matchCachedRow(initialized bool) []*cdcpb.Event_Row {
// prewrite log before initialized, a committed log with
// the same key and start-ts must have been received.
log.Info("ignore commit event without prewrite",
zap.Binary("key", cacheEntry.GetKey()),
zap.Uint64("startTs", cacheEntry.GetStartTs()))
zap.String("key", spanz.HexKey(cacheEntry.GetKey())),
zap.Uint64("startTs", cacheEntry.GetStartTs()),
zap.Uint64("commitTs", cacheEntry.GetCommitTs()),
zap.Any("opType", cacheEntry.GetOpType()),
zap.Any("logType", cacheEntry.GetType()))
continue
}
cachedCommit[top] = cacheEntry
Expand Down
24 changes: 21 additions & 3 deletions logservice/schemastore/ddl_job_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/logpuller"
"github.com/pingcap/ticdc/pkg/common"
appctx "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/common/event"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/pkg/spanz"
"github.com/pingcap/ticdc/utils/heap"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -124,7 +127,7 @@ func (p *ddlJobFetcher) input(kvs []common.RawKVEntry, _ func()) bool {
for _, kv := range kvs {
job, err := p.unmarshalDDL(&kv)
if err != nil {
log.Fatal("unmarshal ddl failed", zap.Any("kv", kv), zap.Error(err))
log.Fatal("unmarshal ddl failed", zap.String("key", spanz.HexKey(kv.Key)), zap.Any("kv", kv), zap.Error(err))
}

if job == nil {
Expand Down Expand Up @@ -258,14 +261,29 @@ const (
)

func getAllDDLSpan() []heartbeatpb.TableSpan {
pdClient := appctx.GetService[pdutil.PDAPIClient](appctx.PDAPIClient)
keyspaceID, err := pdClient.GetKeyspaceID(context.Background(), "SYSTEM")
if err != nil {
log.Panic("get keyspace id from pd client failed", zap.Error(err))
}

spans := make([]heartbeatpb.TableSpan, 0, 2)
start, end := common.GetTableRange(JobTableID)
start, end, err := common.GetKeyspaceTableRange(keyspaceID, JobTableID)
if err != nil {
log.Panic("get keyspace table range failed",
zap.Uint32("keyspaceID", keyspaceID), zap.Error(err))
}
spans = append(spans, heartbeatpb.TableSpan{
TableID: JobTableID,
StartKey: common.ToComparableKey(start),
EndKey: common.ToComparableKey(end),
})
start, end = common.GetTableRange(JobHistoryID)

start, end, err = common.GetKeyspaceTableRange(keyspaceID, JobHistoryID)
if err != nil {
log.Panic("get keyspace table range failed",
zap.Uint32("keyspaceID", keyspaceID), zap.Error(err))
}
spans = append(spans, heartbeatpb.TableSpan{
TableID: JobHistoryID,
StartKey: common.ToComparableKey(start),
Expand Down
12 changes: 11 additions & 1 deletion maintainer/maintainer_controller_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/utils"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -209,7 +210,16 @@ func (c *Controller) processTableSpans(
// Add new table if not working
if isTableWorking {
// Handle existing table spans
span := common.TableIDToComparableSpan(table.TableID)
pdClient := appcontext.GetService[pdutil.PDAPIClient](appcontext.PDAPIClient)
keyspaceID, err := pdClient.GetKeyspaceID(context.Background(), "SYSTEM")
if err != nil {
log.Panic("get codec from pd client failed", zap.Error(err))
}
span, err := common.TableIDToComparableSpanWithKeyspace(keyspaceID, table.TableID)
if err != nil {
log.Panic("tableIDToComparableSpanWithKeyspace failed",
zap.Uint32("keyspaceID", keyspaceID), zap.Int64("tableID", table.TableID), zap.Error(err))
}
tableSpan := &heartbeatpb.TableSpan{
TableID: table.TableID,
StartKey: span.StartKey,
Expand Down
13 changes: 12 additions & 1 deletion maintainer/maintainer_controller_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/pingcap/ticdc/maintainer/span"
"github.com/pingcap/ticdc/maintainer/split"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
pkgoperator "github.com/pingcap/ticdc/pkg/scheduler/operator"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -165,7 +167,16 @@ func (c *Controller) splitTableByRegionCount(tableID int64, mode int64) error {
return nil
}

span := common.TableIDToComparableSpan(tableID)
pdClient := appcontext.GetService[pdutil.PDAPIClient](appcontext.PDAPIClient)
keyspaceID, err := pdClient.GetKeyspaceID(context.Background(), "SYSTEM")
if err != nil {
log.Panic("get codec from pd client failed", zap.Error(err))
}
span, err := common.TableIDToComparableSpanWithKeyspace(keyspaceID, tableID)
if err != nil {
log.Panic("tableIDToComparableSpanWithKeyspace failed",
zap.Uint32("keyspaceID", keyspaceID), zap.Int64("tableID", tableID), zap.Error(err))
}
wholeSpan := &heartbeatpb.TableSpan{
TableID: span.TableID,
StartKey: span.StartKey,
Expand Down
16 changes: 15 additions & 1 deletion maintainer/range_checker/table_span_range_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@ package range_checker

import (
"bytes"
"context"
"fmt"
"strings"
"sync/atomic"

"github.com/google/btree"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/pdutil"
"go.uber.org/zap"
)

// TableSpanRangeChecker is used to check if all ranges cover the start and end byte slices.
Expand All @@ -35,8 +40,17 @@ func NewTableSpanRangeChecker(tables []int64) *TableSpanRangeChecker {
tableSpans: make(map[int64]*SpanCoverageChecker),
covered: atomic.Bool{},
}
pdClient := appcontext.GetService[pdutil.PDAPIClient](appcontext.PDAPIClient)
keyspaceID, err := pdClient.GetKeyspaceID(context.Background(), "SYSTEM")
if err != nil {
log.Panic("get codec from pd client failed", zap.Error(err))
}
for _, table := range tables {
span := common.TableIDToComparableSpan(table)
span, err := common.TableIDToComparableSpanWithKeyspace(keyspaceID, table)
if err != nil {
log.Panic("tableIDToComparableSpanWithKeyspace failed",
zap.Uint32("keyspaceID", keyspaceID), zap.Error(err))
}
sc.tableSpans[table] = NewTableSpanCoverageChecker(span.StartKey, span.EndKey)
}
return sc
Expand Down
15 changes: 14 additions & 1 deletion maintainer/replica/replication_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ package replica

import (
"bytes"
"context"
"encoding/hex"
"sync"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/pkg/scheduler/replica"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -127,8 +130,18 @@ func (r *SpanReplication) initStatus(status *heartbeatpb.TableSpanStatus) {
func (r *SpanReplication) initGroupID() {
r.groupID = replica.DefaultGroupID
span := heartbeatpb.TableSpan{TableID: r.Span.TableID, StartKey: r.Span.StartKey, EndKey: r.Span.EndKey}
pdClient := appcontext.GetService[pdutil.PDAPIClient](appcontext.PDAPIClient)
keyspaceID, err := pdClient.GetKeyspaceID(context.Background(), "SYSTEM")
if err != nil {
log.Panic("get codec from pd client failed", zap.Error(err))
}
// check if the table is split
totalSpan := common.TableIDToComparableSpan(span.TableID)
totalSpan, err := common.TableIDToComparableSpanWithKeyspace(keyspaceID, span.TableID)
if err != nil {
log.Panic("tableIDToComparableSpanWithKeyspace failed",
zap.Uint32("keyspaceID", keyspaceID), zap.Int64("tableID", span.TableID), zap.Error(err))
}

if !common.IsSubSpan(span, totalSpan) {
log.Warn("invalid span range", zap.String("changefeedID", r.ChangefeedID.Name()),
zap.String("id", r.ID.String()), zap.Int64("tableID", span.TableID),
Expand Down
12 changes: 11 additions & 1 deletion maintainer/span/span_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
pkgreplica "github.com/pingcap/ticdc/pkg/scheduler/replica"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils"
Expand Down Expand Up @@ -142,7 +143,16 @@ func (c *Controller) AddNewTable(table commonEvent.Table, startTs uint64) {
zap.Int64("table", table.TableID))
return
}
span := common.TableIDToComparableSpan(table.TableID)
pdClient := appcontext.GetService[pdutil.PDAPIClient](appcontext.PDAPIClient)
keyspaceID, err := pdClient.GetKeyspaceID(context.Background(), "SYSTEM")
if err != nil {
log.Panic("get keyspaceID from pd client failed", zap.Error(err))
}
span, err := common.TableIDToComparableSpanWithKeyspace(keyspaceID, table.TableID)
if err != nil {
log.Panic("tableIDToComparableSpanWithKeyspace failed",
zap.Uint32("keyspaceID", keyspaceID), zap.Int64("tableID", table.TableID), zap.Error(err))
}
tableSpan := &heartbeatpb.TableSpan{
TableID: table.TableID,
StartKey: span.StartKey,
Expand Down
4 changes: 4 additions & 0 deletions maintainer/testutil/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func (m *MockPDAPIClient) ScanRegions(ctx context.Context, span heartbeatpb.Tabl
return m.scnaRegionsResult[fmt.Sprintf("%s-%s", span.StartKey, span.EndKey)], nil
}

func (m *MockPDAPIClient) GetKeyspaceID(ctx context.Context, keyspace string) (uint32, error) {
return 0, nil
}

func (m *MockPDAPIClient) Close() {
// Mock implementation - do nothing
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/common/event/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,24 @@ func IsLegacyFormatJob(rawKV *common.RawKVEntry) bool {
return bytes.HasPrefix(rawKV.Key, metaPrefix)
}

const (
keyspacePrefixLen = 4
apiV2TxnModePrefix byte = 'x'
)

// RemoveKeyspacePrefix is used to remove keyspace prefix from the key.
func RemoveKeyspacePrefix(key []byte) []byte {
if len(key) <= keyspacePrefixLen {
return key
}

if key[0] != apiV2TxnModePrefix {
log.Warn("the first byte of key is not 'x', it may not be in api v2 txn mode", zap.Any("byte", key[0]))
return key
}
return key[keyspacePrefixLen:]
}

// ParseDDLJob parses the job from the raw KV entry.
func ParseDDLJob(rawKV *common.RawKVEntry, ddlTableInfo *DDLTableInfo) (*model.Job, error) {
var v []byte
Expand All @@ -172,6 +190,7 @@ func ParseDDLJob(rawKV *common.RawKVEntry, ddlTableInfo *DDLTableInfo) (*model.J
return job, err
}

rawKV.Key = RemoveKeyspacePrefix(rawKV.Key)
recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/event/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package event

import (
"fmt"
"testing"
"time"

"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/integrity"
"github.com/pingcap/ticdc/pkg/spanz"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
Expand Down
Loading