Skip to content

Commit

Permalink
Fix learner nodes got ignored in auto detect mode error
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
  • Loading branch information
v01dstar committed Jun 28, 2023
1 parent cd5b1ce commit 1414baf
Show file tree
Hide file tree
Showing 29 changed files with 402 additions and 185 deletions.
1 change: 1 addition & 0 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func BuildForwardContext(ctx context.Context, addr string) context.Context {
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tlsutil.TLSConfig, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(addr)
if ok {
// TODO: check the connection state.
return conn.(*grpc.ClientConn), nil
}
tlsConfig, err := tlsCfg.ToTLSConfig()
Expand Down
4 changes: 2 additions & 2 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type tsoClient struct {
tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest
// dc-location -> deadline
tsDeadline sync.Map // Same as map[string]chan deadline
// dc-location -> *lastTSO
lastTSMap sync.Map // Same as map[string]*lastTSO
// dc-location -> *tsoInfo while the tsoInfo is the last TSO info
lastTSOInfoMap sync.Map // Same as map[string]*tsoInfo

checkTSDeadlineCh chan struct{}
checkTSODispatcherCh chan struct{}
Expand Down
85 changes: 49 additions & 36 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ type tsoDispatcher struct {
tsoBatchController *tsoBatchController
}

type lastTSO struct {
keyspaceGroupID uint32
physical int64
logical int64
type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
respKeyspaceGroupID uint32
respReceivedAt time.Time
physical int64
logical int64
}

const (
Expand Down Expand Up @@ -709,62 +712,72 @@ func (c *tsoClient) processRequests(

requests := tbc.getCollectedRequests()
count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(),
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, requests, tbc.batchStartTime)
if err != nil {
c.finishRequest(requests, 0, 0, 0, err)
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, respKeyspaceGroupID, physical, firstLogical, suffixBits, count)
curTSOInfo := &tsoInfo{
tsoServer: stream.getServerAddr(),
reqKeyspaceGroupID: reqKeyspaceGroupID,
respKeyspaceGroupID: respKeyspaceGroupID,
respReceivedAt: time.Now(),
physical: physical,
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

func (c *tsoClient) compareAndSwapTS(
dcLocation string, respKeyspaceGroupID uint32,
physical, firstLogical int64, suffixBits uint32, count int64,
dcLocation string,
curTSOInfo *tsoInfo,
physical, firstLogical int64,
) {
largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits)
lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{
keyspaceGroupID: respKeyspaceGroupID,
physical: physical,
// Save the largest logical part here
logical: largestLogical,
})
val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo)
if !loaded {
return
}
lastTSOPointer := lastTSOInterface.(*lastTSO)
lastKeyspaceGroupID := lastTSOPointer.keyspaceGroupID
lastPhysical := lastTSOPointer.physical
lastLogical := lastTSOPointer.logical

if lastKeyspaceGroupID != respKeyspaceGroupID {
lastTSOInfo := val.(*tsoInfo)
if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dcLocation),
zap.Uint32("old-group-id", lastKeyspaceGroupID),
zap.Uint32("new-group-id", respKeyspaceGroupID))
zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
}

// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf(
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). "+
"last keyspace group: %d, keyspace in request: %d, "+
"keyspace group in request: %d, keyspace group in response: %d",
dcLocation, physical, firstLogical, lastPhysical, lastLogical,
lastKeyspaceGroupID, c.svcDiscovery.GetKeyspaceID(),
c.svcDiscovery.GetKeyspaceGroupID(), respKeyspaceGroupID))
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt),
)

Check warning on line 773 in client/tso_dispatcher.go

View check run for this annotation

Codecov / codecov/patch

client/tso_dispatcher.go#L760-L773

Added lines #L760 - L773 were not covered by tests
}
lastTSOPointer.keyspaceGroupID = respKeyspaceGroupID
lastTSOPointer.physical = physical
// Same as above, we save the largest logical part here.
lastTSOPointer.logical = largestLogical
lastTSOInfo.tsoServer = curTSOInfo.tsoServer
lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID
lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt
lastTSOInfo.physical = curTSOInfo.physical
lastTSOInfo.logical = curTSOInfo.logical
}

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
Expand Down
29 changes: 21 additions & 8 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ type tsoStreamBuilderFactory interface {
type pdTSOStreamBuilderFactory struct{}

func (f *pdTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc)}
return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc), serverAddr: cc.Target()}
}

type tsoTSOStreamBuilderFactory struct{}

func (f *tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc)}
return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc), serverAddr: cc.Target()}
}

// TSO Stream Builder
Expand All @@ -51,7 +51,8 @@ type tsoStreamBuilder interface {
}

type pdTSOStreamBuilder struct {
client pdpb.PDClient
serverAddr string
client pdpb.PDClient
}

func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) {
Expand All @@ -61,13 +62,14 @@ func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFun
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &pdTSOStream{stream: stream}, nil
return &pdTSOStream{stream: stream, serverAddr: b.serverAddr}, nil
}
return nil, err
}

type tsoTSOStreamBuilder struct {
client tsopb.TSOClient
serverAddr string
client tsopb.TSOClient
}

func (b *tsoTSOStreamBuilder) build(
Expand All @@ -79,7 +81,7 @@ func (b *tsoTSOStreamBuilder) build(
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &tsoTSOStream{stream: stream}, nil
return &tsoTSOStream{stream: stream, serverAddr: b.serverAddr}, nil
}
return nil, err
}
Expand All @@ -98,6 +100,7 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha
// TSO Stream

type tsoStream interface {
getServerAddr() string
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
Expand All @@ -106,7 +109,12 @@ type tsoStream interface {
}

type pdTSOStream struct {
stream pdpb.PD_TsoClient
serverAddr string
stream pdpb.PD_TsoClient
}

func (s *pdTSOStream) getServerAddr() string {
return s.serverAddr
}

func (s *pdTSOStream) processRequests(
Expand Down Expand Up @@ -155,7 +163,12 @@ func (s *pdTSOStream) processRequests(
}

type tsoTSOStream struct {
stream tsopb.TSO_TsoClient
serverAddr string
stream tsopb.TSO_TsoClient
}

func (s *tsoTSOStream) getServerAddr() string {
return s.serverAddr
}

func (s *tsoTSOStream) processRequests(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20230530111525-e4919c190b46
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5
github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/sasha-s/go-deadlock v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 h1:adV4kUWI7v+v/6joR7lfjFngHhS4eiqwr4g3dHCjHtA=
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM=
github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 h1:/jqjj+ydlqjp144LAlHDfHtr7eWJyaNIIXX5viv0RZo=
github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
12 changes: 6 additions & 6 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,9 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key
// Changing the state of default keyspace is not allowed.
if name == utils.DefaultKeyspaceName {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
zap.Error(ErrModifyDefaultKeyspace),
)
return nil, errModifyDefault
return nil, ErrModifyDefaultKeyspace
}
var meta *keyspacepb.KeyspaceMeta
err := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
Expand Down Expand Up @@ -567,9 +567,9 @@ func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.K
// Changing the state of default keyspace is not allowed.
if id == utils.DefaultKeyspaceID {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
zap.Error(ErrModifyDefaultKeyspace),

Check warning on line 570 in pkg/keyspace/keyspace.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/keyspace.go#L570

Added line #L570 was not covered by tests
)
return nil, errModifyDefault
return nil, ErrModifyDefaultKeyspace

Check warning on line 572 in pkg/keyspace/keyspace.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/keyspace.go#L572

Added line #L572 was not covered by tests
}
var meta *keyspacepb.KeyspaceMeta
var err error
Expand Down Expand Up @@ -702,10 +702,10 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(utils.DefaultKeyspaceGroupID)

Check warning on line 705 in pkg/keyspace/keyspace.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/keyspace.go#L705

Added line #L705 was not covered by tests
}
if defaultKeyspaceGroup.IsMerging() {
return ErrKeyspaceGroupInMerging
return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID)

Check warning on line 708 in pkg/keyspace/keyspace.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/keyspace.go#L708

Added line #L708 was not covered by tests
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps)
if err != nil {
Expand Down
Loading

0 comments on commit 1414baf

Please sign in to comment.