Commit 5eef731
This is an automated cherry-pick of tikv#5551
close tikv#5207

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
CabinfeverB authored and ti-chi-bot committed Oct 10, 2022
1 parent d6d91a6 commit 5eef731
Showing 3 changed files with 321 additions and 0 deletions. Note that client/client.go is committed with unresolved merge-conflict markers (<<<<<<< HEAD ... >>>>>>> d50e5fe43), which appear verbatim in the diff below, apparently awaiting manual resolution of the cherry-pick.
client/client.go: 151 additions, 0 deletions
@@ -167,6 +167,7 @@ type lastTSO struct {
}

const (
<<<<<<< HEAD
defaultPDTimeout = 3 * time.Second
dialTimeout = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
@@ -175,6 +176,14 @@ const (
maxInitClusterRetries = 100
retryInterval = 1 * time.Second
maxRetryTimes = 5
=======
dialTimeout = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
tsLoopDCCheckInterval = time.Minute
defaultMaxTSOBatchSize = 10000 // should be higher if the client sends requests in bursts
retryInterval = 500 * time.Millisecond
maxRetryTimes = 6
>>>>>>> d50e5fe43 (client: fix Stream timeout logic (#5551))
)

// LeaderHealthCheckInterval might be changed in the unit test to shorten the testing time.
@@ -496,6 +505,7 @@ type streamCh chan struct {

func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) {
var (
<<<<<<< HEAD
err error
cancel context.CancelFunc
stream pdpb.PD_TsoClient
@@ -505,6 +515,16 @@ func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoD
streamCh streamCh
changedCh chan bool
connectionCtx connectionContext
=======
err error
streamAddr string
stream pdpb.PD_TsoClient
streamCtx context.Context
cancel context.CancelFunc
// addr -> connectionContext
connectionCtxs sync.Map
opts []opentracing.StartSpanOption
>>>>>>> d50e5fe43 (client: fix Stream timeout logic (#5551))
)
defer func() {
log.Info("[pd] exit tso dispatcher", zap.String("dc-location", dc))
@@ -536,7 +556,69 @@ func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoD
select {
case <-dispatcherCtx.Done():
return
<<<<<<< HEAD
default:
=======
case <-c.option.enableTSOFollowerProxyCh:
enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy()
if enableTSOFollowerProxy && updateTicker.C == nil {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
setNewUpdateTicker(time.NewTicker(memberUpdateInterval))
} else if !enableTSOFollowerProxy && updateTicker.C != nil {
// Because the TSO Follower Proxy is disabled,
// the periodic check needs to be turned off.
setNewUpdateTicker(&time.Ticker{})
} else {
// The status of the TSO Follower Proxy is unchanged, so updateConnectionCtxs is not triggered.
continue
}
case <-updateTicker.C:
case <-c.updateConnectionCtxsCh:
}
c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs)
}
}()
}
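
A side note on the ticker handling in the goroutine above: receiving from a nil channel blocks forever, so storing a zero-value time.Ticker (whose C field is nil) silently disables the periodic branch of the select, and installing a real ticker re-enables it. Below is a minimal, self-contained sketch of that idiom; it is illustrative only, and setNewUpdateTicker here is a simplified stand-in for the helper used above.

```go
// Sketch: enabling/disabling a select branch by swapping the ticker.
// A zero time.Ticker has a nil C, and a receive from a nil channel never
// fires, so the periodic case is effectively off until a real ticker is set.
package main

import (
	"fmt"
	"time"
)

func main() {
	updateTicker := &time.Ticker{} // disabled: C == nil
	setNewUpdateTicker := func(t *time.Ticker) {
		if updateTicker.C != nil {
			updateTicker.Stop() // stop the old ticker before replacing it
		}
		updateTicker = t
	}

	deadline := time.After(55 * time.Millisecond)
	ticks := 0
	setNewUpdateTicker(time.NewTicker(10 * time.Millisecond)) // enable the branch
	for {
		select {
		case <-updateTicker.C:
			ticks++
		case <-deadline:
			fmt.Println("ticks observed:", ticks) // roughly 5 with the intervals above
			return
		}
	}
}
```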

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
tsoBatchLoop:
for {
select {
case <-dispatcherCtx.Done():
return
default:
}
// Start to collect the TSO requests.
maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval()
if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil {
if err == context.Canceled {
log.Info("[pd] stop fetching the pending tso requests due to context canceled",
zap.String("dc-location", dc))
} else {
log.Error("[pd] fetch pending tso requests error",
zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSO, err))
}
return
}
if maxBatchWaitInterval >= 0 {
tbc.adjustBestBatchSize()
}
streamLoopTimer.Reset(c.option.timeout)
// Choose a stream to send the TSO gRPC request.
streamChoosingLoop:
for {
connectionCtx := c.chooseStream(&connectionCtxs)
if connectionCtx != nil {
streamAddr, stream, streamCtx, cancel = connectionCtx.streamAddr, connectionCtx.stream, connectionCtx.ctx, connectionCtx.cancel
}
// Check stream and retry if necessary.
if stream == nil {
log.Info("[pd] tso stream is not ready", zap.String("dc", dc))
if c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
continue streamChoosingLoop
>>>>>>> d50e5fe43 (client: fix Stream timeout logic (#5551))
}
log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientCreateTSOStream, err))
c.ScheduleCheckLeader()
@@ -545,7 +627,27 @@ func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoD
case <-time.After(time.Second):
case <-dispatcherCtx.Done():
return
<<<<<<< HEAD
}
=======
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.ScheduleCheckLeader()
c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
continue tsoBatchLoop
case <-time.After(retryInterval):
continue streamChoosingLoop
}
}
select {
case <-streamCtx.Done():
log.Info("[pd] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr))
// Set `stream` to nil and remove this stream from the `connectionCtxs` due to being canceled.
connectionCtxs.Delete(streamAddr)
cancel()
stream = nil
>>>>>>> d50e5fe43 (client: fix Stream timeout logic (#5551))
continue
}
}
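
The incoming side of this hunk carries the core of the fix: each batch of TSO requests is bounded by streamLoopTimer (reset to c.option.timeout per batch), and when it fires the collected requests fail fast with RetryTimeoutErr instead of blocking callers while stream creation retries forever. A minimal sketch of the pattern follows; chooseStream and failBatch are hypothetical stand-ins for the client's stream selection and finishTSORequest error path.

```go
// Sketch: bound the total time spent (re)choosing a TSO stream for one batch.
package main

import (
	"errors"
	"fmt"
	"time"
)

var errRetryTimeout = errors.New("create TSO stream failed, retry timeout")

func sendBatch(timeout, retryInterval time.Duration, chooseStream func() bool, failBatch func(error)) {
	streamLoopTimer := time.NewTimer(timeout) // reset once per batch in the real code
	defer streamLoopTimer.Stop()
	for {
		if chooseStream() {
			return // a healthy stream was found; the batch would be sent on it
		}
		select {
		case <-streamLoopTimer.C:
			// Fail the whole batch promptly rather than retrying forever.
			failBatch(errRetryTimeout)
			return
		case <-time.After(retryInterval):
			// Short pause, then try to pick a stream again.
		}
	}
}

func main() {
	sendBatch(50*time.Millisecond, 10*time.Millisecond,
		func() bool { return false }, // simulate: no stream ever becomes ready
		func(err error) { fmt.Println("batch failed:", err) },
	)
}
```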
@@ -614,16 +716,65 @@ type connectionContext struct {
changeCh chan bool
}

<<<<<<< HEAD
func (c *client) tryConnect(dispatcherCtx context.Context, dc string) (connectionContext, error) {
=======
func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
// Normal connection creation; it is affected by `enableForwarding`.
createTSOConnection := c.tryConnect
if c.allowTSOFollowerProxy(dc) {
createTSOConnection = c.tryConnectWithProxy
}
if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
log.Error("[pd] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
return false
}
return true
}

// tryConnect will try to connect to the TSO allocator leader. If the connection becomes unreachable
// and enableForwarding is true, it will create a new connection to a follower to do the forwarding,
// and it will also spawn a daemon that switches back to a normal leader connection as soon as the
// leader becomes reachable again.
func (c *client) tryConnect(
dispatcherCtx context.Context,
dc string,
connectionCtxs *sync.Map,
) error {
>>>>>>> d50e5fe43 (client: fix Stream timeout logic (#5551))
var (
url string
networkErrNum uint64
err error
cc *grpc.ClientConn
stream pdpb.PD_TsoClient
)
<<<<<<< HEAD
=======
updateAndClear := func(newAddr string, connectionCtx *connectionContext) {
if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded {
// If the previous connection still exists, we should close it first.
cc.(*connectionContext).cancel()
connectionCtxs.Store(newAddr, connectionCtx)
}
connectionCtxs.Range(func(addr, cc interface{}) bool {
if addr.(string) != newAddr {
cc.(*connectionContext).cancel()
connectionCtxs.Delete(addr)
}
return true
})
}
>>>>>>> d50e5fe43 (client: fix Stream timeout logic (#5551))
// Retry several times before falling back to the follower when a network problem happens.
for i := 0; i < maxRetryTimes; i++ {
<<<<<<< HEAD
=======
c.ScheduleCheckLeader()
>>>>>>> d50e5fe43 (client: fix Stream timeout logic (#5551))
cc, url = c.getAllocatorClientConnByDCLocation(dc)
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err = c.createTsoStream(cctx, cancel, pdpb.NewPDClient(cc))
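
The updateAndClear closure in the hunk above keeps connectionCtxs holding exactly one live connection when follower proxying is off: the new address's context replaces any previous entry for the same address, and every other entry is canceled and removed. A small self-contained sketch of the same sync.Map pattern, with a simplified connectionContext:

```go
// Sketch: keep a sync.Map of addr -> cancelable connection, retaining only
// the most recently established address. Simplified from the diff above.
package main

import (
	"context"
	"fmt"
	"sync"
)

type connectionContext struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func updateAndClear(m *sync.Map, newAddr string, newCtx *connectionContext) {
	if old, loaded := m.LoadOrStore(newAddr, newCtx); loaded {
		// A previous connection to the same address exists; close it first.
		old.(*connectionContext).cancel()
		m.Store(newAddr, newCtx)
	}
	m.Range(func(addr, cc interface{}) bool {
		if addr.(string) != newAddr {
			cc.(*connectionContext).cancel()
			m.Delete(addr)
		}
		return true
	})
}

func main() {
	var m sync.Map
	for _, addr := range []string{"pd-1:2379", "pd-2:2379"} {
		ctx, cancel := context.WithCancel(context.Background())
		updateAndClear(&m, addr, &connectionContext{ctx, cancel})
	}
	m.Range(func(addr, _ interface{}) bool {
		fmt.Println("kept:", addr) // only pd-2:2379 survives
		return true
	})
}
```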
client/errs/errno.go: 63 additions, 0 deletions (new file)
@@ -0,0 +1,63 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package errs

import "github.com/pingcap/errors"

const (
// NotLeaderErr indicates a non-leader member received a request that should be handled by the leader.
NotLeaderErr = "is not leader"
// MismatchLeaderErr indicates the leader ID carried by the request does not match the current leader.
MismatchLeaderErr = "mismatch leader id"
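// RetryTimeoutErr indicates a request failed because its retries timed out.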
RetryTimeoutErr = "retry timeout"
)

// client errors
var (
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
)

// grpcutil errors
var (
ErrSecurityConfig = errors.Normalize("security config error: %s", errors.RFCCodeText("PD:grpcutil:ErrSecurityConfig"))
)

// The third-party project error.
// url errors
var (
ErrURLParse = errors.Normalize("parse url error", errors.RFCCodeText("PD:url:ErrURLParse"))
)

// grpc errors
var (
ErrGRPCDial = errors.Normalize("dial error", errors.RFCCodeText("PD:grpc:ErrGRPCDial"))
ErrCloseGRPCConn = errors.Normalize("close gRPC connection failed", errors.RFCCodeText("PD:grpc:ErrCloseGRPCConn"))
)

// etcd errors
var (
ErrEtcdTLSConfig = errors.Normalize("etcd TLS config error", errors.RFCCodeText("PD:etcd:ErrEtcdTLSConfig"))
)

// crypto
var (
ErrCryptoX509KeyPair = errors.Normalize("x509 keypair error", errors.RFCCodeText("PD:crypto:ErrCryptoX509KeyPair"))
ErrCryptoAppendCertsFromPEM = errors.Normalize("cert pool append certs error", errors.RFCCodeText("PD:crypto:ErrCryptoAppendCertsFromPEM"))
)
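
These normalized errors are templates: client.go above instantiates them with FastGenByArgs, e.g. ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) when streamLoopTimer fires. A minimal usage sketch, relying only on the pingcap/errors API already visible in this file:

```go
// Sketch: filling a normalized, RFC-coded error template at the call site.
package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

var errCreateTSOStream = errors.Normalize(
	"create TSO stream failed, %s",
	errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"),
)

func main() {
	err := errCreateTSOStream.FastGenByArgs("retry timeout")
	fmt.Println(err) // prints the RFC code along with the filled-in message
}
```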
tests/client/client_test.go: 107 additions, 0 deletions
@@ -282,7 +282,114 @@ func (s *clientTestSuite) TestTSOAllocatorLeader(c *C) {
}
}

<<<<<<< HEAD
func (s *clientTestSuite) TestGlobalAndLocalTSO(c *C) {
=======
func TestTSOFollowerProxy(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 3)
re.NoError(err)
defer cluster.Destroy()

endpoints := runServer(re, cluster)
cli1 := setupCli(re, ctx, endpoints)
cli2 := setupCli(re, ctx, endpoints)
cli2.UpdateOption(pd.EnableTSOFollowerProxy, true)

var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
var lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
physical, logical, err := cli2.GetTS(context.Background())
re.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
// After requesting with the follower proxy, request with the leader directly.
physical, logical, err = cli1.GetTS(context.Background())
re.NoError(err)
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
}()
}
wg.Wait()
}

// TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207
func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 3)
re.NoError(err)
defer cluster.Destroy()

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)

var wg sync.WaitGroup
var maxUnavailableTime, leaderReadyTime time.Time
getTsoFunc := func() {
defer wg.Done()
var lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
var physical, logical int64
physical, logical, err = cli.GetTS(context.Background())
if err != nil {
maxUnavailableTime = time.Now()
continue
}
// Compose and check the TSO only after the request succeeded.
ts := tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
}

// Test resigning or stopping the PD leader.
wg.Add(1 + 1)
go getTsoFunc()
go func() {
defer wg.Done()
leader := cluster.GetServer(cluster.GetLeader())
leader.Stop()
cluster.WaitLeader()
leaderReadyTime = time.Now()
cluster.RunServers([]*tests.TestServer{leader})
}()
wg.Wait()
re.Less(maxUnavailableTime.UnixMilli(), leaderReadyTime.Add(1*time.Second).UnixMilli())

// Test killing the PD leader pod or making the leader's network unreachable.
wg.Add(1 + 1)
maxUnavailableTime, leaderReadyTime = time.Time{}, time.Time{}
go getTsoFunc()
go func() {
defer wg.Done()
leader := cluster.GetServer(cluster.GetLeader())
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
leader.Stop()
cluster.WaitLeader()
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
leaderReadyTime = time.Now()
}()
wg.Wait()
re.Less(maxUnavailableTime.UnixMilli(), leaderReadyTime.Add(1*time.Second).UnixMilli())
}

func TestGlobalAndLocalTSO(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
>>>>>>> d50e5fe43 (client: fix Stream timeout logic (#5551))
dcLocationConfig := map[string]string{
"pd1": "dc-1",
"pd2": "dc-2",
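
The second phase of TestUnavailableTimeAfterLeaderIsReady drives the unreachableNetwork failpoint via failpoint.Enable/Disable. The sketch below shows how such a failpoint is typically consumed on the client side; the injection site and helper names are hypothetical, and failpoint.Inject is an inert marker until the build is rewritten with failpoint-ctl.

```go
// Sketch: a failpoint-guarded dial path (hypothetical injection site).
// Without the failpoint-ctl rewrite this compiles but the Inject body never
// runs; with it, Enable("...unreachableNetwork", "return(true)") makes the
// body fire so the dial is skipped, simulating a dead network.
package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/failpoint"
)

var errUnreachable = errors.New("unreachable network (injected)")

func dial(addr string) error {
	var injected error
	failpoint.Inject("unreachableNetwork", func() {
		injected = errUnreachable
	})
	if injected != nil {
		return injected
	}
	fmt.Println("dialing", addr) // stand-in for the real gRPC dial
	return nil
}

func main() {
	fmt.Println(dial("127.0.0.1:2379"))
}
```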
