From 50862a702109c1d8e6deddae5c89ed775be5afbd Mon Sep 17 00:00:00 2001
From: qupeng
Date: Tue, 1 Aug 2023 16:50:38 +0800
Subject: [PATCH] sink(cdc): close table sinks when sink factory fails (#9449)

close pingcap/tiflow#9450

Signed-off-by: qupeng
---
 cdc/processor/sinkmanager/manager.go          | 61 +++++++++----------
 .../sinkmanager/table_sink_wrapper.go         | 31 +++++-----
 .../sinkmanager/table_sink_wrapper_test.go    |  2 +-
 cdc/sinkv2/tablesink/table_sink_impl.go       |  4 +-
 cdc/sinkv2/tablesink/table_sink_impl_test.go  | 30 +++------
 5 files changed, 59 insertions(+), 69 deletions(-)

diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 99466f59796..0350953be5e 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -273,6 +273,20 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
             zap.Error(err))
         m.clearSinkFactory()
         sinkFactoryErrors = make(chan error, 16)
+
+        start := time.Now()
+        log.Info("Sink manager is closing all table sinks",
+            zap.String("namespace", m.changefeedID.Namespace),
+            zap.String("changefeed", m.changefeedID.ID))
+        m.tableSinks.Range(func(span tablepb.Span, value interface{}) bool {
+            value.(*tableSinkWrapper).closeTableSink()
+            m.sinkMemQuota.ClearTable(span)
+            return true
+        })
+        log.Info("Sink manager has closed all table sinks",
+            zap.String("namespace", m.changefeedID.Namespace),
+            zap.String("changefeed", m.changefeedID.ID),
+            zap.Duration("cost", time.Since(start)))
     }

     if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
@@ -413,22 +427,17 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
     }()
 }

-// generateSinkTasks generates tasks to fetch data from the source manager.
-func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
-    // Task upperbound is limited by barrierTs and schemaResolvedTs.
-    // But receivedSorterResolvedTs can be less than barrierTs, in which case
-    // the table is just scheduled to this node.
-    getUpperBound := func(
-        tableSinkUpperBoundTs model.Ts,
-    ) engine.Position {
-        schemaTs := m.schemaStorage.ResolvedTs()
-        if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
-            // schemaTs == math.MaxUint64 means it's in tests.
-            tableSinkUpperBoundTs = schemaTs + 1
-        }
-        return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
+func (m *SinkManager) getUpperBound(tableSinkUpperBoundTs model.Ts) engine.Position {
+    schemaTs := m.schemaStorage.ResolvedTs()
+    if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
+        // schemaTs == math.MaxUint64 means it's in tests.
+        tableSinkUpperBoundTs = schemaTs + 1
     }
+    return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
+}

+// generateSinkTasks generates tasks to fetch data from the source manager.
+func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
     dispatchTasks := func() error {
         tables := make([]*tableSinkWrapper, 0, sinkWorkerNum)
         progs := make([]*progress, 0, sinkWorkerNum)
@@ -476,7 +485,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
             tableSink := tables[i]
             slowestTableProgress := progs[i]
             lowerBound := slowestTableProgress.nextLowerBoundPos
-            upperBound := getUpperBound(tableSink.getUpperBoundTs())
+            upperBound := m.getUpperBound(tableSink.getUpperBoundTs())
             // The table has no available progress.
             if lowerBound.Compare(upperBound) >= 0 {
                 m.sinkProgressHeap.push(slowestTableProgress)
@@ -502,7 +511,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
             t := &sinkTask{
                 tableID:       tableSink.tableID,
                 lowerBound:    lowerBound,
-                getUpperBound: getUpperBound,
+                getUpperBound: m.getUpperBound,
                 tableSink:     tableSink,
                 callback: func(lastWrittenPos engine.Position) {
                     p := &progress{
@@ -566,18 +575,6 @@
 }

 func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
-    // We use the table's resolved ts as the upper bound to fetch events.
-    getUpperBound := func(tableSinkUpperBoundTs model.Ts) engine.Position {
-        // If a task carries events after schemaResolvedTs, mounter group threads
-        // can be blocked on waiting schemaResolvedTs get advanced.
-        schemaTs := m.schemaStorage.ResolvedTs()
-        if tableSinkUpperBoundTs > schemaTs+1 {
-            tableSinkUpperBoundTs = schemaTs + 1
-        }
-
-        return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
-    }
-
     dispatchTasks := func() error {
         tables := make([]*tableSinkWrapper, 0, redoWorkerNum)
         progs := make([]*progress, 0, redoWorkerNum)
@@ -624,7 +621,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
             tableSink := tables[i]
             slowestTableProgress := progs[i]
             lowerBound := slowestTableProgress.nextLowerBoundPos
-            upperBound := getUpperBound(tableSink.getReceivedSorterResolvedTs())
+            upperBound := m.getUpperBound(tableSink.getReceivedSorterResolvedTs())

             // The table has no available progress.
             if lowerBound.Compare(upperBound) >= 0 {
@@ -646,7 +643,7 @@
             t := &redoTask{
                 tableID:       tableSink.tableID,
                 lowerBound:    lowerBound,
-                getUpperBound: getUpperBound,
+                getUpperBound: m.getUpperBound,
                 tableSink:     tableSink,
                 callback: func(lastWrittenPos engine.Position) {
                     p := &progress{
@@ -840,7 +837,7 @@ func (m *SinkManager) AsyncStopTable(tableID model.TableID) bool {
             zap.String("changefeed", m.changefeedID.ID),
             zap.Int64("tableID", tableID))
     }
-    if tableSink.(*tableSinkWrapper).asyncClose() {
+    if tableSink.(*tableSinkWrapper).asyncStop() {
         cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
         cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
         log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
@@ -910,7 +907,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
     // again or not if it returns false. So we must retry `tableSink.asyncClose` here
     // if necessary. It's better to remove the dirty logic in the future.
     tableSink := wrapper.(*tableSinkWrapper)
-    if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncClose() {
+    if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncStop() {
         cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
         cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
         log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go
index 2d46c3bf679..18fc71ed0cf 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -276,7 +276,7 @@ func (t *tableSinkWrapper) markAsClosed() {
     }
 }

-func (t *tableSinkWrapper) asyncClose() bool {
+func (t *tableSinkWrapper) asyncStop() bool {
     t.markAsClosing()
     if t.asyncCloseAndClearTableSink() {
         t.markAsClosed()
@@ -285,7 +285,7 @@
     return false
 }

-func (t *tableSinkWrapper) close() {
+func (t *tableSinkWrapper) stop() {
     t.markAsClosing()
     t.closeAndClearTableSink()
     t.markAsClosed()
@@ -302,29 +302,32 @@ func (t *tableSinkWrapper) initTableSink() bool {
     return true
 }

-func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
+func (t *tableSinkWrapper) asyncCloseTableSink() bool {
     t.tableSinkMu.RLock()
+    defer t.tableSinkMu.RUnlock()
     if t.tableSink == nil {
-        t.tableSinkMu.RUnlock()
         return true
     }
-    if !t.tableSink.AsyncClose() {
-        t.tableSinkMu.RUnlock()
-        return false
-    }
-    t.tableSinkMu.RUnlock()
-    t.doTableSinkClear()
-    return true
+    return t.tableSink.AsyncClose()
 }

-func (t *tableSinkWrapper) closeAndClearTableSink() {
+func (t *tableSinkWrapper) closeTableSink() {
     t.tableSinkMu.RLock()
+    defer t.tableSinkMu.RUnlock()
     if t.tableSink == nil {
-        t.tableSinkMu.RUnlock()
         return
     }
     t.tableSink.Close()
-    t.tableSinkMu.RUnlock()
+}
+
+func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
+    t.asyncCloseTableSink()
+    t.doTableSinkClear()
+    return true
+}
+
+func (t *tableSinkWrapper) closeAndClearTableSink() {
+    t.closeTableSink()
     t.doTableSinkClear()
 }
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go
index 209801244b5..29d8cafbf7f 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -89,7 +89,7 @@ func TestTableSinkWrapperClose(t *testing.T) {

     wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
     require.Equal(t, tablepb.TableStatePreparing, wrapper.getState())
-    wrapper.close()
+    wrapper.stop()
     require.Equal(t, tablepb.TableStateStopped, wrapper.getState(),
         "table sink state should be stopped")
 }
diff --git a/cdc/sinkv2/tablesink/table_sink_impl.go b/cdc/sinkv2/tablesink/table_sink_impl.go
index affe7af76a4..a096725659e 100644
--- a/cdc/sinkv2/tablesink/table_sink_impl.go
+++ b/cdc/sinkv2/tablesink/table_sink_impl.go
@@ -130,7 +130,9 @@ func (e *EventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) error
 // GetCheckpointTs returns the checkpoint ts of the table sink.
 func (e *EventTableSink[E]) GetCheckpointTs() model.ResolvedTs {
     if e.state.Load() == state.TableSinkStopping {
-        e.progressTracker.checkClosed(e.backendSink.Dead())
+        if e.progressTracker.checkClosed(e.backendSink.Dead()) {
+            e.markAsClosed()
+        }
     }
     return e.progressTracker.advance()
 }
diff --git a/cdc/sinkv2/tablesink/table_sink_impl_test.go b/cdc/sinkv2/tablesink/table_sink_impl_test.go
index ae925dcbb40..358198eee13 100644
--- a/cdc/sinkv2/tablesink/table_sink_impl_test.go
+++ b/cdc/sinkv2/tablesink/table_sink_impl_test.go
@@ -381,26 +381,14 @@ func TestCheckpointTsFrozenWhenStopping(t *testing.T) {
     require.Nil(t, err)
     require.Len(t, sink.events, 7, "all events should be flushed")

-    go func() {
-        time.Sleep(time.Millisecond * 10)
-        sink.Close()
-    }()
+    // Table sink close should return even if callbacks are not called,
+    // because the backend sink is closed.
+    sink.Close()
+    tb.Close()

-    var wg sync.WaitGroup
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
-        tb.Close()
-    }()
-    require.Eventually(t, func() bool {
-        return state.TableSinkStopping == tb.state.Load()
-    }, time.Second, time.Microsecond, "table should be stopping")
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
-        currentTs := tb.GetCheckpointTs()
-        sink.acknowledge(105)
-        require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
-    }()
-    wg.Wait()
+    require.Equal(t, state.TableSinkStopped, tb.state.Load())
+
+    currentTs := tb.GetCheckpointTs()
+    sink.acknowledge(105)
+    require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
 }
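
Below is a minimal, self-contained Go sketch of the close-on-failure sweep this patch adds to SinkManager.run; it is illustrative only and not part of the patch. The type names tableSink and memQuota and the function closeAllTableSinks are hypothetical stand-ins for tableSinkWrapper, the sink memory quota, and the inline m.tableSinks.Range loop. Only the control flow mirrors the change: when the sink factory fails, every registered table sink is closed and its memory quota is cleared before the factory is rebuilt, so no sink keeps writing through a backend that has already failed.

package main

import (
    "fmt"
    "sync"
    "time"
)

// tableSink is a stand-in for tiflow's tableSinkWrapper.
type tableSink struct {
    span   string
    closed bool
}

func (t *tableSink) closeTableSink() { t.closed = true }

// memQuota is a stand-in for the sink memory quota; ClearTable releases the
// memory accounted to a single table.
type memQuota struct {
    mu    sync.Mutex
    bytes map[string]uint64
}

func (q *memQuota) ClearTable(span string) {
    q.mu.Lock()
    defer q.mu.Unlock()
    delete(q.bytes, span)
}

// closeAllTableSinks mirrors the sweep added right after clearSinkFactory():
// close every registered table sink, release its memory quota, and report
// how long the whole sweep took.
func closeAllTableSinks(tableSinks *sync.Map, quota *memQuota) {
    start := time.Now()
    tableSinks.Range(func(key, value any) bool {
        span := key.(string)
        value.(*tableSink).closeTableSink()
        quota.ClearTable(span)
        return true
    })
    fmt.Printf("closed all table sinks in %v\n", time.Since(start))
}

func main() {
    quota := &memQuota{bytes: map[string]uint64{"t1": 1024, "t2": 2048}}
    var sinks sync.Map
    sinks.Store("t1", &tableSink{span: "t1"})
    sinks.Store("t2", &tableSink{span: "t2"})

    // Simulate the sink factory failing: the manager closes every table
    // sink and clears its quota before recreating the factory.
    closeAllTableSinks(&sinks, quota)
}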