
Commit 1e2828f

timer
Signed-off-by: lhy1024 <admin@liudos.us>
lhy1024 committed Jun 29, 2023
1 parent 3b59a78 commit 1e2828f
Showing 16 changed files with 129 additions and 41 deletions.
4 changes: 3 additions & 1 deletion client/client.go
@@ -526,14 +526,16 @@ func newClientWithKeyspaceName(

 func (c *client) initRetry(f func(s string) error, str string) error {
 	var err error
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
 	for i := 0; i < c.option.maxRetryTimes; i++ {
 		if err = f(str); err == nil {
 			return nil
 		}
 		select {
 		case <-c.ctx.Done():
 			return err
-		case <-time.After(time.Second):
+		case <-ticker.C:
 		}
 	}
 	return errors.WithStack(err)
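Note: the hunk above is the template for the whole commit. Each `time.After(d)` call allocates a fresh runtime timer that is not reclaimed until it fires, so a retry loop produces one garbage timer per iteration; a single `time.Ticker` is allocated once and released by `Stop()`. A standalone before/after sketch of the shape (the `retryDo` helpers and their arguments are illustrative, not PD code):

package retrydemo

import (
	"context"
	"time"
)

// Before: one new runtime timer per iteration, each alive until it fires.
func retryDoBefore(ctx context.Context, attempts int, f func() error) (err error) {
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return err
		case <-time.After(time.Second): // allocates a timer on every loop
		}
	}
	return err
}

// After: one ticker, reused across iterations, released exactly once.
func retryDoAfter(ctx context.Context, attempts int, f func() error) (err error) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return err
		case <-ticker.C:
		}
	}
	return err
}

One behavioral nuance of the swap: `time.After` started a fresh wait after each failed attempt returned, while a ticker fires on a fixed schedule, so when `f` itself is slow the next `<-ticker.C` can return sooner than a full interval after `f` completes.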
4 changes: 3 additions & 1 deletion client/pd_service_discovery.go
@@ -206,14 +206,16 @@ func (c *pdServiceDiscovery) Init() error {

 func (c *pdServiceDiscovery) initRetry(f func() error) error {
 	var err error
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
 	for i := 0; i < c.option.maxRetryTimes; i++ {
 		if err = f(); err == nil {
 			return nil
 		}
 		select {
 		case <-c.ctx.Done():
 			return err
-		case <-time.After(time.Second):
+		case <-ticker.C:
 		}
 	}
 	return errors.WithStack(err)
4 changes: 3 additions & 1 deletion client/resource_manager_client.go
@@ -389,6 +389,8 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
 		err    error
 		stream rmpb.ResourceManager_AcquireTokenBucketsClient
 	)
+	ticker := time.NewTicker(retryInterval)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimes; i++ {
 		cc, err := c.resourceManagerClient()
 		if err != nil {
@@ -406,7 +408,7 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
 		select {
 		case <-ctx.Done():
 			return err
-		case <-time.After(retryInterval):
+		case <-ticker.C:
 		}
 	}
 	return err
58 changes: 44 additions & 14 deletions client/tso_dispatcher.go
@@ -138,12 +138,34 @@ func (c *tsoClient) updateTSODispatcher() {
 	})
 }

-type deadline struct {
-	timer  <-chan time.Time
+var timerPool = sync.Pool{
+	New: func() interface{} {
+		return time.NewTimer(defaultPDTimeout)
+	},
+}
+
+// TSDeadline is used to watch the deadline of each tso request.
+type TSDeadline struct {
+	timer  *time.Timer
 	done   chan struct{}
 	cancel context.CancelFunc
 }

+// NewTSDeadline creates a new TSDeadline.
+func NewTSDeadline(
+	timeout time.Duration,
+	done chan struct{},
+	cancel context.CancelFunc,
+) *TSDeadline {
+	timer := timerPool.Get().(*time.Timer)
+	timer.Reset(timeout)
+	return &TSDeadline{
+		timer:  timer,
+		done:   done,
+		cancel: cancel,
+	}
+}
+
 func (c *tsoClient) tsCancelLoop() {
 	defer c.wg.Done()
@@ -172,17 +194,21 @@ func (c *tsoClient) tsCancelLoop() {

 func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) {
 	if _, exist := c.tsDeadline.Load(dcLocation); !exist {
-		tsDeadlineCh := make(chan deadline, 1)
+		tsDeadlineCh := make(chan *TSDeadline, 1)
 		c.tsDeadline.Store(dcLocation, tsDeadlineCh)
-		go func(dc string, tsDeadlineCh <-chan deadline) {
+		go func(dc string, tsDeadlineCh <-chan *TSDeadline) {
 			for {
 				select {
 				case d := <-tsDeadlineCh:
 					select {
-					case <-d.timer:
+					case <-d.timer.C:
 						log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
 						d.cancel()
+						timerPool.Put(d.timer) // it's safe to put the timer back to the pool
 						continue
 					case <-d.done:
+						d.timer.Stop() // not received from timer.C, so we need to stop the timer
+						timerPool.Put(d.timer)
 						continue
 					case <-ctx.Done():
 						return
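A condensed sketch of the pooled-timer lifecycle introduced above: get a recycled timer, arm it for one request, then stop and return it on whichever path wins. Only the Get/Reset/Stop/Put sequence mirrors the diff; the surrounding function is illustrative:

package pooldemo

import (
	"sync"
	"time"
)

var timerPool = sync.Pool{
	New: func() interface{} {
		// The initial duration is a placeholder; Reset re-arms it per use.
		return time.NewTimer(3 * time.Second)
	},
}

func watchOne(done <-chan struct{}, timeout time.Duration, onTimeout func()) {
	timer := timerPool.Get().(*time.Timer)
	timer.Reset(timeout) // arm the recycled timer for this request
	select {
	case <-timer.C:
		onTimeout()
		timerPool.Put(timer) // the channel was drained by the receive above
	case <-done:
		// Not received from timer.C, so stop before recycling. The drain
		// covers the race where the timer fired between the two cases.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timerPool.Put(timer)
	}
}

The drain after a failed `Stop` is the standard hardening from the `time.Timer` documentation; the diff puts the timer back without it, which is worth keeping in mind if a recycled timer ever appears to fire immediately after `Reset`.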
@@ -234,6 +260,8 @@ func (c *tsoClient) checkAllocator(
 	}()
 	cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc)
 	healthCli := healthpb.NewHealthClient(cc)
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
 	for {
 		// the pd/allocator leader change, we need to re-establish the stream
 		if u != url {
@@ -259,7 +287,7 @@ func (c *tsoClient) checkAllocator(
 		select {
 		case <-dispatcherCtx.Done():
 			return
-		case <-time.After(time.Second):
+		case <-ticker.C:
 			// To ensure we can get the latest allocator leader
 			// and once the leader is changed, we can exit this function.
 			_, u = c.GetTSOAllocatorClientConnByDCLocation(dc)
@@ -403,16 +431,20 @@ tsoBatchLoop:
 			if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
 				continue streamChoosingLoop
 			}
+			timer := time.NewTimer(retryInterval)
 			select {
 			case <-dispatcherCtx.Done():
+				timer.Stop()
 				return
 			case <-streamLoopTimer.C:
 				err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
 				log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
 				c.svcDiscovery.ScheduleCheckMemberChanged()
 				c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
+				timer.Stop()
 				continue tsoBatchLoop
-			case <-time.After(retryInterval):
+			case <-timer.C:
+				timer.Stop()
 				continue streamChoosingLoop
 			}
 		}
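Note on the hunk above: this select sits inside a retry loop within a long-lived dispatcher goroutine, so a `defer timer.Stop()` would not run until the dispatcher exits; the timer is instead stopped explicitly on every branch. A minimal sketch of that shape (names are illustrative):

package tsodemo

import (
	"context"
	"time"
)

func chooseStream(ctx context.Context, retryInterval time.Duration, try func() bool) {
	for {
		if try() {
			return
		}
		// A fresh one-shot timer per attempt; defer is unsuitable here
		// because the enclosing goroutine may live for hours.
		timer := time.NewTimer(retryInterval)
		select {
		case <-ctx.Done():
			timer.Stop() // release the unfired timer before leaving
			return
		case <-timer.C:
			timer.Stop() // already fired; a no-op kept for symmetry with the diff
		}
	}
}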
@@ -429,11 +461,7 @@ tsoBatchLoop:
 			}
 		}
 		done := make(chan struct{})
-		dl := deadline{
-			timer:  time.After(c.option.timeout),
-			done:   done,
-			cancel: cancel,
-		}
+		dl := NewTSDeadline(c.option.timeout, done, cancel)
 		tsDeadlineCh, ok := c.tsDeadline.Load(dc)
 		for !ok || tsDeadlineCh == nil {
 			c.scheduleCheckTSDeadline()
@@ -443,7 +471,7 @@ tsoBatchLoop:
 		select {
 		case <-dispatcherCtx.Done():
 			return
-		case tsDeadlineCh.(chan deadline) <- dl:
+		case tsDeadlineCh.(chan *TSDeadline) <- dl:
 		}
 		opts = extractSpanReference(tbc, opts[:0])
 		err = c.processRequests(stream, dc, tbc, opts)
@@ -558,6 +586,8 @@ func (c *tsoClient) tryConnectToTSO(
 	}
 	// retry several times before falling back to the follower when the network problem happens

+	ticker := time.NewTicker(retryInterval)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimes; i++ {
 		c.svcDiscovery.ScheduleCheckMemberChanged()
 		cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
@@ -587,7 +617,7 @@ func (c *tsoClient) tryConnectToTSO(
 		select {
 		case <-dispatcherCtx.Done():
 			return err
-		case <-time.After(retryInterval):
+		case <-ticker.C:
 		}
 	}

8 changes: 6 additions & 2 deletions client/tso_service_discovery.go
@@ -209,14 +209,16 @@ func (c *tsoServiceDiscovery) retry(
 	maxRetryTimes int, retryInterval time.Duration, f func() error,
 ) error {
 	var err error
+	ticker := time.NewTicker(retryInterval)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimes; i++ {
 		if err = f(); err == nil {
 			return nil
 		}
 		select {
 		case <-c.ctx.Done():
 			return err
-		case <-time.After(retryInterval):
+		case <-ticker.C:
 		}
 	}
 	return errors.WithStack(err)
@@ -245,11 +247,13 @@ func (c *tsoServiceDiscovery) startCheckMemberLoop() {

 	ctx, cancel := context.WithCancel(c.ctx)
 	defer cancel()
+	ticker := time.NewTicker(memberUpdateInterval)
+	defer ticker.Stop()

 	for {
 		select {
 		case <-c.checkMembershipCh:
-		case <-time.After(memberUpdateInterval):
+		case <-ticker.C:
 		case <-ctx.Done():
 			log.Info("[tso] exit check member loop")
 			return
4 changes: 3 additions & 1 deletion client/tso_stream.go
@@ -87,10 +87,12 @@ func (b *tsoTSOStreamBuilder) build(
 }

 func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done chan struct{}, timeout time.Duration) {
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
 	select {
 	case <-done:
 		return
-	case <-time.After(timeout):
+	case <-timer.C:
 		cancel()
 	case <-ctx.Done():
 	}
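Here the wait happens exactly once per call, so `defer timer.Stop()` suffices, and the timer is released when the function returns rather than lingering until the full timeout elapses as `time.After` would. For comparison, some codebases express this single-wait shape with `context.WithTimeout` instead; a sketch of that alternative (illustrative, not PD's actual code):

package streamdemo

import (
	"context"
	"time"
)

// checkStreamTimeoutCtx is an illustrative equivalent of the function above
// built on context.WithTimeout; waitCancel releases the deadline timer as
// soon as the function returns.
func checkStreamTimeoutCtx(ctx context.Context, cancel context.CancelFunc, done chan struct{}, timeout time.Duration) {
	waitCtx, waitCancel := context.WithTimeout(ctx, timeout)
	defer waitCancel()
	select {
	case <-done:
	case <-waitCtx.Done():
		if waitCtx.Err() == context.DeadlineExceeded {
			cancel() // timed out, rather than parent-canceled
		}
	}
}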
5 changes: 4 additions & 1 deletion pkg/election/lease.go
@@ -109,6 +109,8 @@ func (l *lease) KeepAlive(ctx context.Context) {
 	timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)

 	var maxExpire time.Time
+	timer := time.NewTimer(l.leaseTimeout)
+	defer timer.Stop()
 	for {
 		select {
 		case t := <-timeCh:
@@ -122,7 +124,8 @@ func (l *lease) KeepAlive(ctx context.Context) {
 					l.expireTime.Store(t)
 				}
 			}
-		case <-time.After(l.leaseTimeout):
+			timer.Reset(l.leaseTimeout)
+		case <-timer.C:
 			log.Info("lease timeout", zap.Time("expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose))
 			return
 		case <-ctx.Done():
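This file needs more than a fixed cadence: the deadline must slide forward every time a keep-alive result arrives, which is why the hunk resets a single timer instead of switching to a ticker. A standalone sketch of that watchdog shape (channel and function names are illustrative):

package leasedemo

import (
	"context"
	"time"
)

// watchdog fires onExpire only after `timeout` of continuous silence;
// every heartbeat pushes the deadline out again.
func watchdog(ctx context.Context, heartbeat <-chan struct{}, timeout time.Duration, onExpire func()) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		select {
		case <-heartbeat:
			// Like the diff, this resets a timer that has not fired; see the
			// time.Timer.Reset docs for the drain caveat when a concurrent
			// fire cannot be ruled out.
			timer.Reset(timeout)
		case <-timer.C:
			onExpire()
			return
		case <-ctx.Done():
			return
		}
	}
}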
4 changes: 3 additions & 1 deletion pkg/mcs/tso/server/server.go
@@ -595,6 +595,8 @@ func (s *Server) waitAPIServiceReady() error {
 		ready bool
 		err   error
 	)
+	ticker := time.NewTicker(retryIntervalWaitAPIService)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimesWaitAPIService; i++ {
 		ready, err = s.isAPIServiceReady()
 		if err == nil && ready {
@@ -604,7 +606,7 @@ func (s *Server) waitAPIServiceReady() error {
 		select {
 		case <-s.ctx.Done():
 			return errors.New("context canceled while waiting api server ready")
-		case <-time.After(retryIntervalWaitAPIService):
+		case <-ticker.C:
 		}
 	}
 	if err != nil {
4 changes: 3 additions & 1 deletion pkg/mcs/utils/util.go
@@ -37,14 +37,16 @@ const (

 // InitClusterID initializes the cluster ID.
 func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err error) {
+	ticker := time.NewTicker(retryInterval)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimes; i++ {
 		if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 {
 			return clusterID, nil
 		}
 		select {
 		case <-ctx.Done():
 			return 0, err
-		case <-time.After(retryInterval):
+		case <-ticker.C:
 		}
 	}
 	return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes)
8 changes: 6 additions & 2 deletions pkg/replication/replication_mode.go
@@ -375,14 +375,18 @@ const (
 // Run starts the background job.
 func (m *ModeManager) Run(ctx context.Context) {
 	// Wait for a while when just start, in case tikv do not connect in time.
+	timer := time.NewTimer(idleTimeout)
+	defer timer.Stop()
 	select {
-	case <-time.After(idleTimeout):
+	case <-timer.C:
 	case <-ctx.Done():
 		return
 	}
+	ticker := time.NewTicker(tickInterval)
+	defer ticker.Stop()
 	for {
 		select {
-		case <-time.After(tickInterval):
+		case <-ticker.C:
 		case <-ctx.Done():
 			return
 		}
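Run combines both shapes from this commit: a one-shot timer for the start-up delay, then a ticker for the steady cadence. A condensed standalone sketch (durations and the work callback are illustrative):

package repldemo

import (
	"context"
	"time"
)

func runLoop(ctx context.Context, idle, tick time.Duration, work func()) {
	timer := time.NewTimer(idle) // one-shot initial delay
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-ctx.Done():
		return
	}
	ticker := time.NewTicker(tick) // steady cadence afterwards
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
		work()
	}
}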
4 changes: 3 additions & 1 deletion pkg/tso/keyspace_group_manager.go
@@ -535,6 +535,8 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() {
 		kgm.primaryPriorityCheckInterval = 200 * time.Millisecond
 	})

+	ticker := time.NewTicker(kgm.primaryPriorityCheckInterval)
+	defer ticker.Stop()
 	ctx, cancel := context.WithCancel(kgm.ctx)
 	defer cancel()
 	groupID := 0
@@ -543,7 +545,7 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() {
 		case <-ctx.Done():
 			log.Info("exit primary priority check loop")
 			return
-		case <-time.After(kgm.primaryPriorityCheckInterval):
+		case <-ticker.C:
 			// Every primaryPriorityCheckInterval, we only reset the primary of one keyspace group
 			member, kg, localPriority, nextGroupID := kgm.getNextPrimaryToReset(groupID, kgm.tsoServiceID.ServiceAddr)
 			if member != nil {
23 changes: 18 additions & 5 deletions pkg/utils/tsoutil/tso_dispatcher.go
@@ -195,9 +195,15 @@ func (s *TSODispatcher) finishRequest(requests []Request, physical, firstLogical
 	return nil
 }

+var timerPool = sync.Pool{
+	New: func() interface{} {
+		return time.NewTimer(DefaultTSOProxyTimeout)
+	},
+}
+
 // TSDeadline is used to watch the deadline of each tso request.
 type TSDeadline struct {
-	timer  <-chan time.Time
+	timer  *time.Timer
 	done   chan struct{}
 	cancel context.CancelFunc
 }
@@ -208,8 +214,10 @@ func NewTSDeadline(
 	done chan struct{},
 	cancel context.CancelFunc,
 ) *TSDeadline {
+	timer := timerPool.Get().(*time.Timer)
+	timer.Reset(timeout)
 	return &TSDeadline{
-		timer:  time.After(timeout),
+		timer:  timer,
 		done:   done,
 		cancel: cancel,
 	}
@@ -224,11 +232,15 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) {
 		select {
 		case d := <-tsDeadlineCh:
 			select {
-			case <-d.timer:
+			case <-d.timer.C:
 				log.Error("tso proxy request processing is canceled due to timeout",
 					errs.ZapError(errs.ErrProxyTSOTimeout))
 				d.cancel()
+				timerPool.Put(d.timer) // it's safe to put the timer back to the pool
 				continue
 			case <-d.done:
+				d.timer.Stop() // not received from timer.C, so we need to stop the timer
+				timerPool.Put(d.timer)
 				continue
 			case <-ctx.Done():
 				return
@@ -241,11 +253,12 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) {

 func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
 	defer logutil.LogPanic()
-
+	timer := time.NewTimer(3 * time.Second)
+	defer timer.Stop()
 	select {
 	case <-done:
 		return
-	case <-time.After(3 * time.Second):
+	case <-timer.C:
 		cancel()
 	case <-streamCtx.Done():
 	}
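For orientation, a sketch of how the pieces in this file fit together on the proxy side: one goroutine runs WatchTSDeadline, and each forwarded request registers a deadline and closes done when it finishes. Only NewTSDeadline and WatchTSDeadline come from this file; forwardTSORequest and the surrounding wiring are hypothetical, and the import path assumes PD's module path github.com/tikv/pd:

package proxydemo

import (
	"context"
	"time"

	"github.com/tikv/pd/pkg/utils/tsoutil"
)

func serve(ctx context.Context, timeout time.Duration) {
	tsDeadlineCh := make(chan *tsoutil.TSDeadline, 1)
	go tsoutil.WatchTSDeadline(ctx, tsDeadlineCh)

	for { // one iteration per forwarded request
		reqCtx, cancel := context.WithCancel(ctx)
		done := make(chan struct{})
		tsDeadlineCh <- tsoutil.NewTSDeadline(timeout, done, cancel)
		if err := forwardTSORequest(reqCtx); err != nil { // hypothetical forwarding step
			cancel()
			return
		}
		close(done) // lets the watcher stop and recycle the timer
		cancel()
	}
}

// forwardTSORequest stands in for the real proxy forwarding logic.
func forwardTSORequest(ctx context.Context) error { return ctx.Err() }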
