diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index 8d8e37916fb..9b0cc6a8652 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -104,7 +104,7 @@ type encodingWorkerGroup struct { workerNum int nextWorker atomic.Uint64 - closed chan struct{} + closed chan error } func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup { @@ -121,19 +121,20 @@ func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup { inputChs: inputChs, outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize), workerNum: workerNum, - closed: make(chan struct{}), + closed: make(chan error, 1), } } func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) { defer func() { - close(e.closed) + log.Warn("redo encoding workers closed", + zap.String("namespace", e.changefeed.Namespace), + zap.String("changefeed", e.changefeed.ID), + zap.Error(err)) if err != nil && errors.Cause(err) != context.Canceled { - log.Warn("redo fileWorkerGroup closed with error", - zap.String("namespace", e.changefeed.Namespace), - zap.String("changefeed", e.changefeed.ID), - zap.Error(err)) + e.closed <- err } + close(e.closed) }() eg, egCtx := errgroup.WithContext(ctx) for i := 0; i < e.workerNum; i++ { @@ -183,8 +184,8 @@ func (e *encodingWorkerGroup) input( select { case <-ctx.Done(): return ctx.Err() - case <-e.closed: - return errors.ErrRedoWriterStopped.GenWithStack("encoding worker is closed") + case err := <-e.closed: + return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") case e.inputChs[idx] <- event: return nil } @@ -196,8 +197,8 @@ func (e *encodingWorkerGroup) output( select { case <-ctx.Done(): return ctx.Err() - case <-e.closed: - return errors.ErrRedoWriterStopped.GenWithStack("encoding worker is closed") + case err := <-e.closed: + return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") case e.outputCh <- event: return nil } @@ -221,8 +222,8 @@ func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() - case <-e.closed: - return errors.ErrRedoWriterStopped.GenWithStack("encoding worker is closed") + case err := <-e.closed: + return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") case <-flushCh: } return nil @@ -245,8 +246,8 @@ func (e *encodingWorkerGroup) broadcastAndWaitEncoding(ctx context.Context) erro select { case <-ctx.Done(): return ctx.Err() - case <-e.closed: - return errors.ErrRedoWriterStopped.GenWithStack("encoding worker is closed") + case err := <-e.closed: + return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") case <-ch: } } diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go index 572a03bcc20..b92e37e59ee 100644 --- a/cdc/redo/writer/memory/file_worker.go +++ b/cdc/redo/writer/memory/file_worker.go @@ -155,12 +155,10 @@ func (f *fileWorkerGroup) Run( ) (err error) { defer func() { f.close() - if err != nil && errors.Cause(err) != context.Canceled { - log.Warn("redo file workers closed with error", - zap.String("namespace", f.cfg.ChangeFeedID.Namespace), - zap.String("changefeed", f.cfg.ChangeFeedID.ID), - zap.Error(err)) - } + log.Warn("redo file workers closed", + zap.String("namespace", f.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", f.cfg.ChangeFeedID.ID), + zap.Error(err)) }() eg, egCtx := errgroup.WithContext(ctx) diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index b20b2eb4131..33e1f426489 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/writer" - cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" @@ -101,11 +100,9 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { require.NoError(t, err) require.ErrorIs(t, lw.Close(), context.Canceled) - require.Eventually(t, func() bool { - err = lw.WriteEvents(ctx, events...) - return err != nil - }, 2*time.Second, 10*time.Millisecond) - require.ErrorIs(t, err, cerror.ErrRedoWriterStopped) + + err = lw.WriteEvents(ctx, events...) + require.NoError(t, err) err = lw.FlushLog(ctx) - require.ErrorIs(t, err, cerror.ErrRedoWriterStopped) + require.NoError(t, err) }