Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: add more debugging info to time fallback log. #6700

Merged
merged 3 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type tsoClient struct {
tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest
// dc-location -> deadline
tsDeadline sync.Map // Same as map[string]chan deadline
// dc-location -> *lastTSO
lastTSMap sync.Map // Same as map[string]*lastTSO
// dc-location -> *tsoInfo, which records the last TSO info received for that dc-location
lastTSOInfoMap sync.Map // Same as map[string]*tsoInfo

checkTSDeadlineCh chan struct{}
checkTSODispatcherCh chan struct{}
Expand Down
85 changes: 49 additions & 36 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@
tsoBatchController *tsoBatchController
}

type lastTSO struct {
keyspaceGroupID uint32
physical int64
logical int64
type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
respKeyspaceGroupID uint32
respReceivedAt time.Time
physical int64
logical int64
}

const (
Expand Down Expand Up @@ -709,62 +712,72 @@

requests := tbc.getCollectedRequests()
count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(),
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, requests, tbc.batchStartTime)
if err != nil {
c.finishRequest(requests, 0, 0, 0, err)
return err
}
// `logical` is the logical part of the largest ts in this batch, so we step it back by count-1 to get the first logical before finishing each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, respKeyspaceGroupID, physical, firstLogical, suffixBits, count)
curTSOInfo := &tsoInfo{
tsoServer: stream.getServerAddr(),
reqKeyspaceGroupID: reqKeyspaceGroupID,
respKeyspaceGroupID: respKeyspaceGroupID,
respReceivedAt: time.Now(),
physical: physical,
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

func (c *tsoClient) compareAndSwapTS(
dcLocation string, respKeyspaceGroupID uint32,
physical, firstLogical int64, suffixBits uint32, count int64,
dcLocation string,
curTSOInfo *tsoInfo,
physical, firstLogical int64,
) {
largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits)
lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{
keyspaceGroupID: respKeyspaceGroupID,
physical: physical,
// Save the largest logical part here
logical: largestLogical,
})
val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo)
if !loaded {
return
}
lastTSOPointer := lastTSOInterface.(*lastTSO)
lastKeyspaceGroupID := lastTSOPointer.keyspaceGroupID
lastPhysical := lastTSOPointer.physical
lastLogical := lastTSOPointer.logical

if lastKeyspaceGroupID != respKeyspaceGroupID {
lastTSOInfo := val.(*tsoInfo)
if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dcLocation),
zap.Uint32("old-group-id", lastKeyspaceGroupID),
zap.Uint32("new-group-id", respKeyspaceGroupID))
zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
}

// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf(
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). "+
"last keyspace group: %d, keyspace in request: %d, "+
"keyspace group in request: %d, keyspace group in response: %d",
dcLocation, physical, firstLogical, lastPhysical, lastLogical,
lastKeyspaceGroupID, c.svcDiscovery.GetKeyspaceID(),
c.svcDiscovery.GetKeyspaceGroupID(), respKeyspaceGroupID))
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt),
)

Check warning on line 773 in client/tso_dispatcher.go

View check run for this annotation

Codecov / codecov/patch

client/tso_dispatcher.go#L760-L773

Added lines #L760 - L773 were not covered by tests
}
lastTSOPointer.keyspaceGroupID = respKeyspaceGroupID
lastTSOPointer.physical = physical
// Same as above, we save the largest logical part here.
lastTSOPointer.logical = largestLogical
lastTSOInfo.tsoServer = curTSOInfo.tsoServer
lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID
lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt
lastTSOInfo.physical = curTSOInfo.physical
lastTSOInfo.logical = curTSOInfo.logical
}

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
Expand Down
29 changes: 21 additions & 8 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ type tsoStreamBuilderFactory interface {
type pdTSOStreamBuilderFactory struct{}

func (f *pdTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc)}
return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc), serverAddr: cc.Target()}
}

type tsoTSOStreamBuilderFactory struct{}

func (f *tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc)}
return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc), serverAddr: cc.Target()}
}

// TSO Stream Builder
Expand All @@ -51,7 +51,8 @@ type tsoStreamBuilder interface {
}

type pdTSOStreamBuilder struct {
client pdpb.PDClient
serverAddr string
client pdpb.PDClient
}

func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) {
Expand All @@ -61,13 +62,14 @@ func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFun
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &pdTSOStream{stream: stream}, nil
return &pdTSOStream{stream: stream, serverAddr: b.serverAddr}, nil
}
return nil, err
}

type tsoTSOStreamBuilder struct {
client tsopb.TSOClient
serverAddr string
client tsopb.TSOClient
}

func (b *tsoTSOStreamBuilder) build(
Expand All @@ -79,7 +81,7 @@ func (b *tsoTSOStreamBuilder) build(
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &tsoTSOStream{stream: stream}, nil
return &tsoTSOStream{stream: stream, serverAddr: b.serverAddr}, nil
}
return nil, err
}
Expand All @@ -98,6 +100,7 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha
// TSO Stream

type tsoStream interface {
getServerAddr() string
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
Expand All @@ -106,7 +109,12 @@ type tsoStream interface {
}

type pdTSOStream struct {
stream pdpb.PD_TsoClient
serverAddr string
stream pdpb.PD_TsoClient
}

// getServerAddr returns the server address stored in this stream's serverAddr field.
func (s *pdTSOStream) getServerAddr() string {
return s.serverAddr
}

func (s *pdTSOStream) processRequests(
Expand Down Expand Up @@ -155,7 +163,12 @@ func (s *pdTSOStream) processRequests(
}

type tsoTSOStream struct {
stream tsopb.TSO_TsoClient
serverAddr string
stream tsopb.TSO_TsoClient
}

// getServerAddr returns the server address stored in this stream's serverAddr field.
func (s *tsoTSOStream) getServerAddr() string {
return s.serverAddr
}

func (s *tsoTSOStream) processRequests(
Expand Down