sink(cdc): close table sinks when sink factory fails (#9449) #9464

Merged
cdc/processor/sinkmanager/manager.go (61 changes: 29 additions & 32 deletions)
@@ -273,6 +273,20 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
zap.Error(err))
m.clearSinkFactory()
sinkFactoryErrors = make(chan error, 16)

start := time.Now()
log.Info("Sink manager is closing all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
m.tableSinks.Range(func(key, value interface{}) bool {
value.(*tableSinkWrapper).closeTableSink()
m.sinkMemQuota.ClearTable(key.(model.TableID))
return true
})
log.Info("Sink manager has closed all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))
}

if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
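
The block added to `run` above is the core of this fix: when the sink factory fails, every cached table sink is closed and its memory-quota accounting is released before the factory gets rebuilt, so no table keeps buffering events against a dead backend. A minimal, self-contained sketch of that pattern — `tableSink`, `memQuota`, and `closeAll` are hypothetical stand-ins, not the tiflow types:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type tableSink struct{ id int64 }

func (t *tableSink) close() { fmt.Printf("table sink %d closed\n", t.id) }

// memQuota tracks per-table memory usage; clearTable drops a table's entry.
type memQuota struct{ byTable sync.Map } // tableID -> bytes

func (q *memQuota) clearTable(id int64) { q.byTable.Delete(id) }

// closeAll mirrors the m.tableSinks.Range(...) loop above: close each sink,
// then release its quota, and report how long the sweep took.
func closeAll(sinks *sync.Map, quota *memQuota) {
	start := time.Now()
	sinks.Range(func(key, value interface{}) bool {
		value.(*tableSink).close()
		quota.clearTable(key.(int64))
		return true
	})
	fmt.Println("closed all table sinks in", time.Since(start))
}

func main() {
	var sinks sync.Map
	quota := &memQuota{}
	for i := int64(1); i <= 3; i++ {
		sinks.Store(i, &tableSink{id: i})
		quota.byTable.Store(i, 1024)
	}
	closeAll(&sinks, quota)
}
```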
@@ -413,22 +427,17 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
}()
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
// Task upperbound is limited by barrierTs and schemaResolvedTs.
// But receivedSorterResolvedTs can be less than barrierTs, in which case
// the table is just scheduled to this node.
getUpperBound := func(
tableSinkUpperBoundTs model.Ts,
) engine.Position {
schemaTs := m.schemaStorage.ResolvedTs()
if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
// schemaTs == math.MaxUint64 means it's in tests.
tableSinkUpperBoundTs = schemaTs + 1
}
return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
func (m *SinkManager) getUpperBound(tableSinkUpperBoundTs model.Ts) engine.Position {
schemaTs := m.schemaStorage.ResolvedTs()
if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
// schemaTs == math.MaxUint64 means it's in tests.
tableSinkUpperBoundTs = schemaTs + 1
}
return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
dispatchTasks := func() error {
tables := make([]*tableSinkWrapper, 0, sinkWorkerNum)
progs := make([]*progress, 0, sinkWorkerNum)
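
Hoisting the closure into a method lets `generateSinkTasks` and `generateRedoTasks` share a single definition of the task upper bound: it is clamped to `schemaTs+1` so a task never fetches events beyond what the schema storage has resolved, with `math.MaxUint64` serving as the no-limit sentinel used in tests. A standalone sketch of just the clamping rule, where `Position` is a hypothetical stand-in for `engine.Position`:

```go
package main

import (
	"fmt"
	"math"
)

// Position marks a boundary as a (StartTs, CommitTs) pair.
type Position struct{ StartTs, CommitTs uint64 }

// clampUpperBound limits upperBoundTs to schemaTs+1; schemaTs ==
// math.MaxUint64 means "no schema limit" (the test sentinel).
func clampUpperBound(upperBoundTs, schemaTs uint64) Position {
	if schemaTs != math.MaxUint64 && upperBoundTs > schemaTs+1 {
		upperBoundTs = schemaTs + 1
	}
	return Position{StartTs: upperBoundTs - 1, CommitTs: upperBoundTs}
}

func main() {
	fmt.Println(clampUpperBound(100, 50))             // clamped: {50 51}
	fmt.Println(clampUpperBound(100, math.MaxUint64)) // unclamped: {99 100}
}
```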
@@ -476,7 +485,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink.getUpperBoundTs())
upperBound := m.getUpperBound(tableSink.getUpperBoundTs())
// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
m.sinkProgressHeap.push(slowestTableProgress)
@@ -502,7 +511,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
t := &sinkTask{
tableID: tableSink.tableID,
lowerBound: lowerBound,
getUpperBound: getUpperBound,
getUpperBound: m.getUpperBound,
tableSink: tableSink,
callback: func(lastWrittenPos engine.Position) {
p := &progress{
@@ -566,18 +575,6 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
}

func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
// We use the table's resolved ts as the upper bound to fetch events.
getUpperBound := func(tableSinkUpperBoundTs model.Ts) engine.Position {
// If a task carries events after schemaResolvedTs, mounter group threads
// can be blocked on waiting schemaResolvedTs get advanced.
schemaTs := m.schemaStorage.ResolvedTs()
if tableSinkUpperBoundTs > schemaTs+1 {
tableSinkUpperBoundTs = schemaTs + 1
}

return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
}

dispatchTasks := func() error {
tables := make([]*tableSinkWrapper, 0, redoWorkerNum)
progs := make([]*progress, 0, redoWorkerNum)
@@ -624,7 +621,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink.getReceivedSorterResolvedTs())
upperBound := m.getUpperBound(tableSink.getReceivedSorterResolvedTs())

// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
Expand All @@ -646,7 +643,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
t := &redoTask{
tableID: tableSink.tableID,
lowerBound: lowerBound,
getUpperBound: getUpperBound,
getUpperBound: m.getUpperBound,
tableSink: tableSink,
callback: func(lastWrittenPos engine.Position) {
p := &progress{
@@ -840,7 +837,7 @@ func (m *SinkManager) AsyncStopTable(tableID model.TableID) bool {
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
if tableSink.(*tableSinkWrapper).asyncClose() {
if tableSink.(*tableSinkWrapper).asyncStop() {
cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
@@ -910,7 +907,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
// again or not if it returns false. So we must retry `tableSink.asyncStop` here
// if necessary. It's better to remove the dirty logic in the future.
tableSink := wrapper.(*tableSinkWrapper)
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncClose() {
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncStop() {
cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
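
The comment above spells out the retry contract: `asyncStop` may return false with no callback ever firing, so a later poll (here, `GetTableState`) must simply call it again until it reports true. A toy illustration of that contract, with a hypothetical `tableSink` that needs a few polls to drain:

```go
package main

import "fmt"

type tableSink struct{ remainingBatches int }

// asyncStop returns true once nothing is left to flush. On false, callers get
// no callback and must retry on a later poll, mirroring the contract above.
func (t *tableSink) asyncStop() bool {
	if t.remainingBatches > 0 {
		t.remainingBatches-- // pretend one batch drains per poll
		return false
	}
	return true
}

func main() {
	t := &tableSink{remainingBatches: 2}
	for polls := 1; ; polls++ {
		if t.asyncStop() {
			fmt.Println("stopped after", polls, "polls")
			return
		}
	}
}
```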
cdc/processor/sinkmanager/table_sink_wrapper.go (31 changes: 17 additions & 14 deletions)
@@ -276,7 +276,7 @@ func (t *tableSinkWrapper) markAsClosed() {
}
}

func (t *tableSinkWrapper) asyncClose() bool {
func (t *tableSinkWrapper) asyncStop() bool {
t.markAsClosing()
if t.asyncCloseAndClearTableSink() {
t.markAsClosed()
@@ -285,7 +285,7 @@ func (t *tableSinkWrapper) asyncClose() bool {
return false
}

func (t *tableSinkWrapper) close() {
func (t *tableSinkWrapper) stop() {
t.markAsClosing()
t.closeAndClearTableSink()
t.markAsClosed()
@@ -302,29 +302,32 @@ func (t *tableSinkWrapper) initTableSink() bool {
return true
}

func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
func (t *tableSinkWrapper) asyncCloseTableSink() bool {
t.tableSinkMu.RLock()
defer t.tableSinkMu.RUnlock()
if t.tableSink == nil {
t.tableSinkMu.RUnlock()
return true
}
if !t.tableSink.AsyncClose() {
t.tableSinkMu.RUnlock()
return false
}
t.tableSinkMu.RUnlock()
t.doTableSinkClear()
return true
return t.tableSink.AsyncClose()
}

func (t *tableSinkWrapper) closeAndClearTableSink() {
func (t *tableSinkWrapper) closeTableSink() {
t.tableSinkMu.RLock()
defer t.tableSinkMu.RUnlock()
if t.tableSink == nil {
t.tableSinkMu.RUnlock()
return
}
t.tableSink.Close()
t.tableSinkMu.RUnlock()
}

func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
t.asyncCloseTableSink()
t.doTableSinkClear()
return true
}

func (t *tableSinkWrapper) closeAndClearTableSink() {
t.closeTableSink()
t.doTableSinkClear()
}
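
The reshuffle above separates "close" from "clear": `closeTableSink`/`asyncCloseTableSink` only shut the inner sink down, and the `...AndClear` variants compose them with `doTableSinkClear`. That is what lets the factory-failure path in `manager.go` call `closeTableSink` alone, closing the sink without clearing the wrapper's bookkeeping. A sketch of the same decomposition, assuming clearing just drops the inner reference — `wrapper` and `sink` below are hypothetical stand-ins, not the tiflow types:

```go
package main

import (
	"fmt"
	"sync"
)

type sink struct{ closed bool }

func (s *sink) Close() { s.closed = true }

type wrapper struct {
	mu   sync.RWMutex
	sink *sink
}

// closeSink shuts the inner sink down but keeps the reference, so state
// queries against the wrapper still work after an error-path close.
func (w *wrapper) closeSink() {
	w.mu.RLock()
	defer w.mu.RUnlock()
	if w.sink == nil {
		return
	}
	w.sink.Close()
}

// clearSink drops the reference; composing it with closeSink reproduces the
// old closeAndClearTableSink behavior.
func (w *wrapper) clearSink() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.sink = nil
}

func (w *wrapper) closeAndClear() {
	w.closeSink()
	w.clearSink()
}

func main() {
	w := &wrapper{sink: &sink{}}
	w.closeSink() // error path: close but keep the reference
	fmt.Println("closed:", w.sink.closed)
	w.closeAndClear() // stop path: close and drop it
	fmt.Println("cleared:", w.sink == nil)
}
```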

cdc/processor/sinkmanager/table_sink_wrapper_test.go (2 changes: 1 addition & 1 deletion)
@@ -89,7 +89,7 @@ func TestTableSinkWrapperClose(t *testing.T) {

wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
require.Equal(t, tablepb.TableStatePreparing, wrapper.getState())
wrapper.close()
wrapper.stop()
require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped")
}

cdc/sinkv2/tablesink/table_sink_impl.go (4 changes: 3 additions & 1 deletion)
@@ -130,7 +130,7 @@ func (e *EventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) error
// GetCheckpointTs returns the checkpoint ts of the table sink.
func (e *EventTableSink[E]) GetCheckpointTs() model.ResolvedTs {
if e.state.Load() == state.TableSinkStopping {
e.progressTracker.checkClosed(e.backendSink.Dead())
if e.progressTracker.checkClosed(e.backendSink.Dead()) {
e.markAsClosed()
}
}
return e.progressTracker.advance()
}
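
Previously the result of `checkClosed` was discarded here; now, when the progress tracker reports the sink closed — including because the backend is already dead — the table sink marks itself stopped, so `Close` does not hang waiting for callbacks that will never fire. A minimal sketch of that transition, with simplified stand-ins for the state enum and progress tracker:

```go
package main

import "fmt"

type state int

const (
	stopping state = iota
	stopped
)

type tracker struct{ pendingCallbacks int }

// checkClosed reports whether all callbacks have resolved, or the backend is
// already dead — in which case waiting for callbacks would block forever.
func (t *tracker) checkClosed(backendDead <-chan struct{}) bool {
	select {
	case <-backendDead:
		return true
	default:
		return t.pendingCallbacks == 0
	}
}

func main() {
	dead := make(chan struct{})
	close(dead) // simulate a dead backend with callbacks still pending

	s, tr := stopping, &tracker{pendingCallbacks: 3}
	if s == stopping && tr.checkClosed(dead) {
		s = stopped // mirrors markAsClosed() in the diff
	}
	fmt.Println("stopped:", s == stopped) // true
}
```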
cdc/sinkv2/tablesink/table_sink_impl_test.go (30 changes: 9 additions & 21 deletions)
@@ -381,26 +381,14 @@ func TestCheckpointTsFrozenWhenStopping(t *testing.T) {
require.Nil(t, err)
require.Len(t, sink.events, 7, "all events should be flushed")

go func() {
time.Sleep(time.Millisecond * 10)
sink.Close()
}()
// Table sink close should return even if callbacks are not called,
// because the backend sink is closed.
sink.Close()
tb.Close()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
tb.Close()
}()
require.Eventually(t, func() bool {
return state.TableSinkStopping == tb.state.Load()
}, time.Second, time.Microsecond, "table should be stopping")
wg.Add(1)
go func() {
defer wg.Done()
currentTs := tb.GetCheckpointTs()
sink.acknowledge(105)
require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
}()
wg.Wait()
require.Equal(t, state.TableSinkStopped, tb.state.Load())

currentTs := tb.GetCheckpointTs()
sink.acknowledge(105)
require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
}