cdc: retry internal context deadline exceeded (#8602) #8643

Merged
40 changes: 25 additions & 15 deletions cdc/capture/capture.go
@@ -195,13 +195,12 @@ func (c *captureImpl) GetEtcdClient() etcd.CDCEtcdClient {
func (c *captureImpl) reset(ctx context.Context) error {
lease, err := c.EtcdClient.GetEtcdClient().Grant(ctx, int64(c.config.CaptureSessionTTL))
if err != nil {
- return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+ return errors.Trace(err)
}
sess, err := concurrency.NewSession(
- c.EtcdClient.GetEtcdClient().Unwrap(),
- concurrency.WithLease(lease.ID))
+ c.EtcdClient.GetEtcdClient().Unwrap(), concurrency.WithLease(lease.ID))
if err != nil {
- return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+ return errors.Trace(err)
}

c.captureMu.Lock()
@@ -218,7 +217,7 @@ func (c *captureImpl) reset(ctx context.Context) error {
c.upstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID())
_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security)
if err != nil {
- return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+ return errors.Trace(err)
}

c.processorManager = c.newProcessorManager(c.info, c.upstreamManager, &c.liveness)
@@ -274,18 +273,23 @@ func (c *captureImpl) Run(ctx context.Context) error {
}
return errors.Trace(err)
}
- err = c.reset(ctx)
- if err != nil {
- log.Error("reset capture failed", zap.Error(err))
- return errors.Trace(err)
- }
err = c.run(ctx)
// if capture suicided, reset the capture and run again.
// if a canceled error is thrown, there are two possible scenarios:
- // 1. the internal context canceled, it means some error happened in the internal, and the routine is exited, we should restart the capture
- // 2. the parent context canceled, it means that the caller of the capture hope the capture to exit, and this loop will return in the above `select` block
- // TODO: make sure the internal cancel should return the real error instead of context.Canceled
- if cerror.ErrCaptureSuicide.Equal(err) || context.Canceled == errors.Cause(err) {
+ // 1. the internal context was canceled, meaning some internal error
+ //    occurred and the routine exited, so we should restart
+ //    the capture.
+ // 2. the parent context was canceled, meaning the caller of the
+ //    capture wants the capture to exit, in which case this loop
+ //    returns in the `select` block above.
+ // if an **internal** context deadline is exceeded (an IO/network
+ //    timeout), reset the capture and run again.
+ //
+ // TODO: make sure the internal cancel returns the real error
+ //    instead of context.Canceled.
+ if cerror.ErrCaptureSuicide.Equal(err) ||
+     context.Canceled == errors.Cause(err) ||
+     context.DeadlineExceeded == errors.Cause(err) {
log.Info("capture recovered", zap.String("captureID", c.info.ID))
continue
}
@@ -294,7 +298,13 @@ func (c *captureImpl) Run(ctx context.Context) error {
}

func (c *captureImpl) run(stdCtx context.Context) error {
- err := c.register(stdCtx)
+ err := c.reset(stdCtx)
+ if err != nil {
+ log.Error("reset capture failed", zap.Error(err))
+ return errors.Trace(err)
+ }
+
+ err = c.register(stdCtx)
if err != nil {
return errors.Trace(err)
}
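
For orientation, here is a minimal sketch of the control flow that Run ends up with after this change. It is paraphrased from the hunks above (with the surrounding select loop abbreviated), not a verbatim copy of the file:

	// Sketch only: paraphrased retry loop in (*captureImpl).Run after this PR.
	for {
		select {
		case <-ctx.Done(): // parent context canceled: the caller wants the capture to exit
			return nil
		default:
		}
		err := c.run(ctx) // run() now calls reset() itself before register()
		if cerror.ErrCaptureSuicide.Equal(err) ||
			context.Canceled == errors.Cause(err) ||
			context.DeadlineExceeded == errors.Cause(err) {
			// Internal failures, including IO/network timeouts surfaced as
			// context.DeadlineExceeded, restart the capture in place.
			log.Info("capture recovered", zap.String("captureID", c.info.ID))
			continue
		}
		return errors.Trace(err)
	}
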
50 changes: 50 additions & 0 deletions cdc/capture/capture_test.go
@@ -27,6 +27,7 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/etcd"
mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
@@ -77,6 +78,55 @@ func TestReset(t *testing.T) {
wg.Wait()
}

+ type mockEtcdClient struct {
+ etcd.CDCEtcdClient
+ clientv3.Lease
+ called chan struct{}
+ }
+
+ func (m *mockEtcdClient) GetEtcdClient() *etcd.Client {
+ cli := &clientv3.Client{Lease: m}
+ return etcd.Wrap(cli, map[string]prometheus.Counter{})
+ }
+
+ func (m *mockEtcdClient) Grant(_ context.Context, _ int64) (*clientv3.LeaseGrantResponse, error) {
+ select {
+ case m.called <- struct{}{}:
+ default:
+ }
+ return nil, context.DeadlineExceeded
+ }
+
+ func TestRetryInternalContextDeadlineExceeded(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ called := make(chan struct{}, 2)
+ cp := NewCapture4Test(nil)
+ // In the current implementation, the first RPC is Grant.
+ // The mock client always returns DeadlineExceeded for that RPC.
+ cp.EtcdClient = &mockEtcdClient{called: called}
+
+ errCh := make(chan error, 1)
+ go func() {
+ errCh <- cp.Run(ctx)
+ }()
+ time.Sleep(100 * time.Millisecond)
+ // Wait for Grant to be called.
+ <-called
+ time.Sleep(100 * time.Millisecond)
+ // Make sure it retries.
+ <-called
+
+ // Do not retry on context canceled.
+ cancel()
+ select {
+ case err := <-errCh:
+ require.NoError(t, err)
+ case <-time.After(5 * time.Second):
+ require.Fail(t, "timeout")
+ }
+ }
+
func TestInfo(t *testing.T) {
cp := NewCapture4Test(nil)
cp.info = nil
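
Side note on the mock above (editorial, not part of the PR): embedding the etcd.CDCEtcdClient and clientv3.Lease interfaces lets *mockEtcdClient satisfy both interfaces while overriding only GetEtcdClient and Grant; any other method would nil-panic if the test ever reached it. If you wanted the compiler to enforce that, a pair of hypothetical compile-time assertions would look like:

	// Hypothetical compile-time checks; not included in the PR.
	var (
		_ etcd.CDCEtcdClient = (*mockEtcdClient)(nil)
		_ clientv3.Lease     = (*mockEtcdClient)(nil)
	)
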
4 changes: 2 additions & 2 deletions cdc/server/server.go
@@ -179,12 +179,12 @@ func (s *server) prepare(ctx context.Context) error {
},
})
if err != nil {
- return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+ return errors.Trace(err)
}

cdcEtcdClient, err := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID)
if err != nil {
- return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+ return errors.Trace(err)
}
s.etcdClient = cdcEtcdClient

5 changes: 0 additions & 5 deletions errors.toml
@@ -701,11 +701,6 @@ error = '''
MySQL worker panic
'''

["CDC:ErrNewCaptureFailed"]
error = '''
new capture failed
'''

["CDC:ErrNewProcessorFailed"]
error = '''
new processor failed
4 changes: 0 additions & 4 deletions pkg/errors/cdc_errors.go
@@ -605,10 +605,6 @@ var (
"capture suicide",
errors.RFCCodeText("CDC:ErrCaptureSuicide"),
)
- ErrNewCaptureFailed = errors.Normalize(
- "new capture failed",
- errors.RFCCodeText("CDC:ErrNewCaptureFailed"),
- )
ErrCaptureRegister = errors.Normalize(
"capture register to etcd failed",
errors.RFCCodeText("CDC:ErrCaptureRegister"),
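
Why the WrapError → errors.Trace change above matters for this retry (editorial note, assuming the usual github.com/pingcap/errors semantics): Run classifies failures with errors.Cause, and Cause sees through Trace annotations, so the internal context.DeadlineExceeded coming out of reset stays detectable. A minimal, self-contained sketch:

	// Minimal sketch; assumes github.com/pingcap/errors semantics.
	package main

	import (
		"context"
		"fmt"

		"github.com/pingcap/errors"
	)

	func main() {
		err := errors.Trace(context.DeadlineExceeded)
		// Cause strips the Trace annotation, so the check in Run still
		// recognizes the internal deadline-exceeded error and retries.
		fmt.Println(errors.Cause(err) == context.DeadlineExceeded) // true
	}

Wrapping with cerror.WrapError(cerror.ErrNewCaptureFailed, err) would presumably have surfaced the normalized capture error instead, hiding the timeout from the retry check; with those call sites gone, ErrNewCaptureFailed has no remaining users, hence its removal from errors.toml and pkg/errors/cdc_errors.go.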